This document describes how to understand, develop, and contribute a transform.
It also covers the transform e2e tests, which verify the data a transform receives and the data it outputs.
With SeaTunnel you can read and write data through connectors. If you need to process your data after reading it or before writing it, use a transform.
Use a transform to make simple edits to your data rows or fields, such as splitting a field, changing field values, or adding and removing fields.
A transform receives a datatype from upstream (a source or another transform) and outputs a new datatype to downstream (a sink or another transform). This process is called datatype transform.
Example 1: Remove fields

Input:

| A | B | C |
|-----------|-----------|-----------|
| STRING | INT | BOOLEAN |

Output:

| A | B |
|-----------|-----------|
| STRING | INT |
Example 2: Sort fields

Input:

| B | C | A |
|-----------|-----------|-----------|
| INT | BOOLEAN | STRING |

Output:

| A | B | C |
|-----------|-----------|-----------|
| STRING | INT | BOOLEAN |
Example 3: Update field datatypes

Input:

| A | B | C |
|-----------|-----------|-----------|
| STRING | INT | BOOLEAN |

Output:

| A | B | C |
|-----------|-----------|-----------|
| STRING | STRING | STRING |
Example 4: Add new fields

Input:

| A | B | C |
|-----------|-----------|-----------|
| STRING | INT | BOOLEAN |

Output:

| A | B | C | D |
|-----------|-----------|-----------|-----------|
| STRING | INT | BOOLEAN | DOUBLE |
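The following plain-Java sketch makes the datatype-transform idea concrete. `FieldType`, `RowType`, and `DatatypeTransforms` are hypothetical stand-ins written for this illustration and are not SeaTunnel classes. A row type is modelled as parallel arrays of field names and types. The sketch covers Example 1 (remove a field) and Example 4 (add a field).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified types for illustration only (not the SeaTunnel API).
enum FieldType { STRING, INT, BOOLEAN, DOUBLE }

final class RowType {
    final String[] names;
    final FieldType[] types;

    RowType(String[] names, FieldType[] types) {
        if (names.length != types.length) {
            throw new IllegalArgumentException("names and types must have the same length");
        }
        this.names = names.clone();
        this.types = types.clone();
    }
}

final class DatatypeTransforms {
    // Example 1: drop the named field and keep the remaining fields in their order.
    static RowType removeField(RowType in, String field) {
        List<String> names = new ArrayList<>();
        List<FieldType> types = new ArrayList<>();
        for (int i = 0; i < in.names.length; i++) {
            if (!in.names[i].equals(field)) {
                names.add(in.names[i]);
                types.add(in.types[i]);
            }
        }
        return new RowType(names.toArray(new String[0]), types.toArray(new FieldType[0]));
    }

    // Example 4: append a new field with the given type after the existing ones.
    static RowType addField(RowType in, String field, FieldType type) {
        String[] names = Arrays.copyOf(in.names, in.names.length + 1);
        FieldType[] types = Arrays.copyOf(in.types, in.types.length + 1);
        names[in.names.length] = field;
        types[in.types.length] = type;
        return new RowType(names, types);
    }
}
```

Both methods return a new `RowType` and leave the input unchanged. This mirrors how a transform describes its output schema without touching the upstream one.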
After the datatype is transformed, the transform receives data rows from upstream (a source or another transform), converts each one into a data row with the new datatype, and outputs it to downstream (a sink or another transform). This process is called data transform.
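The following minimal sketch shows the data side of Example 3. Once the datatype step has declared every field as STRING, each value in a row is converted to match. Rows are modelled as plain `Object[]` here, and `DataTransforms` is a hypothetical helper, not SeaTunnel's `SeaTunnelRow` API.

```java
// Hypothetical helper: a data row is modelled as a plain Object[] (not SeaTunnelRow).
final class DataTransforms {
    // Example 3 applied to data: after the datatype step turned every field into
    // STRING, each value is converted to its string form. null stays null.
    static Object[] fieldsToString(Object[] row) {
        Object[] out = new Object[row.length];
        for (int i = 0; i < row.length; i++) {
            out[i] = row[i] == null ? null : String.valueOf(row[i]);
        }
        return out;
    }
}
```

For example, the row `{"test", 1, false}` becomes `{"test", "1", "false"}`, which matches the `STRING | STRING | STRING` output type.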
Transforms are decoupled from the execution engine: any transform implementation can run on all engines without changing its code or config. This relies on a translation layer that adapts between the transform and the execution engine.
Example: Translating datatypes and data

Original:

| A | B | C |
|-----------|-----------|-----------|
| STRING | INT | BOOLEAN |

Datatype translation:

| A | B | C |
|-------------------|-------------------|-------------------|
| ENGINE<STRING> | ENGINE<INT> | ENGINE<BOOLEAN> |

Data translation:

| A | B | C |
|-------------------|-------------------|-------------------|
| ENGINE<"test"> | ENGINE<1> | ENGINE<false> |
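The following hypothetical sketch shows what a translation layer does in principle. `EngineRow` and `TranslationAdapter` are invented names, and `ENGINE<...>` is only the placeholder notation from the tables, not a real engine type. The transform itself only sees engine-neutral rows. The adapter wraps and unwraps around it, so the same transform logic can be reused under any engine.

```java
import java.util.function.UnaryOperator;

// Hypothetical engine-specific row; a real engine (e.g. Flink or Spark) has its own row class.
final class EngineRow {
    final Object[] values;

    EngineRow(Object[] values) {
        this.values = values.clone();
    }
}

// Sketch of a translation layer: the wrapped transform only sees engine-neutral Object[] rows,
// and the adapter converts rows (and type names) between the engine and the transform.
final class TranslationAdapter {
    private final UnaryOperator<Object[]> transform;

    TranslationAdapter(UnaryOperator<Object[]> transform) {
        this.transform = transform;
    }

    // Datatype translation, mirroring the table: STRING -> ENGINE<STRING>.
    static String[] toEngineTypes(String[] types) {
        String[] out = new String[types.length];
        for (int i = 0; i < types.length; i++) {
            out[i] = "ENGINE<" + types[i] + ">";
        }
        return out;
    }

    // Data translation: unwrap the engine row, run the transform, wrap the result again.
    EngineRow apply(EngineRow input) {
        Object[] neutral = input.values.clone();
        Object[] transformed = transform.apply(neutral);
        return new EngineRow(transformed);
    }
}
```

Only the adapter knows about `EngineRow`. Supporting another engine would mean writing another adapter, not changing the transform.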
`SeaTunnelTransform`: provides all the major APIs; implement it to build any kind of transform.
```java
/**
 * Set the data type info of input data.
 *
 * @param inputDataType The data type info of upstream input.
 */
void setTypeInfo(SeaTunnelDataType<T> inputDataType);

/**
 * Get the data type of the records produced by this transform.
 *
 * @return Produced data type.
 */
SeaTunnelDataType<T> getProducedType();

/**
 * Transform the input data into data of type {@link #getProducedType()}.
 *
 * @param row the data to be transformed.
 * @return transformed data.
 */
T map(T row);
```
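The following sketch illustrates the contract of these three methods. `SimpleTransform` and `UpperCaseTransform` are hypothetical and use simplified types: a "data type" is an array of type names and a row is an `Object[]`. The sketch assumes the engine calls `setTypeInfo` before `getProducedType` or `map`. That order is what the three methods suggest, but this is not a quote of SeaTunnel's lifecycle.

```java
import java.util.Locale;

// Hypothetical, simplified mirror of the three SeaTunnelTransform methods above.
// A "data type" is just an array of type names and a row is an Object[].
interface SimpleTransform {
    void setTypeInfo(String[] inputTypes);

    String[] getProducedType();

    Object[] map(Object[] row);
}

// Upper-cases every STRING field and leaves the row type unchanged.
final class UpperCaseTransform implements SimpleTransform {
    private String[] types;

    @Override
    public void setTypeInfo(String[] inputTypes) {
        // Remember the upstream types so map() knows which fields are strings.
        this.types = inputTypes.clone();
    }

    @Override
    public String[] getProducedType() {
        // No schema change: the produced type equals the input type.
        return types.clone();
    }

    @Override
    public Object[] map(Object[] row) {
        Object[] out = row.clone();
        for (int i = 0; i < out.length; i++) {
            if ("STRING".equals(types[i]) && out[i] != null) {
                out[i] = ((String) out[i]).toUpperCase(Locale.ROOT);
            }
        }
        return out;
    }
}
```

The datatype step (`setTypeInfo` / `getProducedType`) happens once per pipeline. `map` then runs once per row using the type information captured earlier.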
`SingleFieldOutputTransform`: an abstract operator for transforms that output a single new field.
```java
/**
 * Outputs the new field name.
 *
 * @return the name of the output field
 */
protected abstract String getOutputFieldName();

/**
 * Outputs the new field datatype.
 *
 * @return the datatype of the output field
 */
protected abstract SeaTunnelDataType getOutputFieldDataType();

/**
 * Outputs the new field value.
 *
 * @param inputRow The inputRow of upstream input.
 * @return the value of the output field for this row
 */
protected abstract Object getOutputFieldValue(SeaTunnelRowAccessor inputRow);
```
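The following simplified sketch shows how a single-field base class can turn those three answers into an output schema and output rows. `SimpleSingleFieldTransform` and `NameLengthTransform` are hypothetical. The sketch assumes the new field is appended after the existing ones; the real `SingleFieldOutputTransform` may differ in details such as field placement or type handling.

```java
import java.util.Arrays;

// Hypothetical, simplified single-field-output base class (not the SeaTunnel API):
// subclasses describe ONE new field and the base appends it to the schema and to every row.
abstract class SimpleSingleFieldTransform {
    protected abstract String getOutputFieldName();

    protected abstract String getOutputFieldDataType();

    protected abstract Object getOutputFieldValue(Object[] inputRow);

    final String[] outputFieldNames(String[] inputNames) {
        return append(inputNames, getOutputFieldName());
    }

    final String[] outputFieldTypes(String[] inputTypes) {
        return append(inputTypes, getOutputFieldDataType());
    }

    final Object[] map(Object[] inputRow) {
        return append(inputRow, getOutputFieldValue(inputRow));
    }

    // Copies the array with one extra slot at the end holding `extra`.
    private static <T> T[] append(T[] values, T extra) {
        T[] out = Arrays.copyOf(values, values.length + 1);
        out[values.length] = extra;
        return out;
    }
}

// Example subclass: new INT field D holding the length of the STRING field at index 0.
final class NameLengthTransform extends SimpleSingleFieldTransform {
    @Override
    protected String getOutputFieldName() {
        return "D";
    }

    @Override
    protected String getOutputFieldDataType() {
        return "INT";
    }

    @Override
    protected Object getOutputFieldValue(Object[] inputRow) {
        Object first = inputRow[0];
        if (first == null) {
            return null;
        }
        return ((String) first).length();
    }
}
```

A subclass only answers "what is the new field and how is it computed". The base class owns the bookkeeping of widening the schema and the rows.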
`MultipleFieldOutputTransform`: an abstract operator for transforms that output multiple new fields.
```java
/**
 * Outputs the new field names.
 *
 * @return the names of the output fields
 */
protected abstract String[] getOutputFieldNames();

/**
 * Outputs the new field datatypes.
 *
 * @return the datatypes of the output fields
 */
protected abstract SeaTunnelDataType[] getOutputFieldDataTypes();

/**
 * Outputs the new field values.
 *
 * @param inputRow The inputRow of upstream input.
 * @return the values of the output fields for this row
 */
protected abstract Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow);
```
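The following sketch applies the same idea to several output fields, using the "split field" use case from the introduction. `SimpleMultipleFieldTransform` and `SplitTransform` are hypothetical simplifications. As in the single-field sketch, appending the new fields at the end is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical, simplified multiple-field-output base class (not the SeaTunnel API):
// subclasses describe several new fields and the base appends their values to each row.
abstract class SimpleMultipleFieldTransform {
    protected abstract String[] getOutputFieldNames();

    protected abstract Object[] getOutputFieldValues(Object[] inputRow);

    final Object[] map(Object[] inputRow) {
        Object[] extra = getOutputFieldValues(inputRow);
        Object[] out = Arrays.copyOf(inputRow, inputRow.length + extra.length);
        System.arraycopy(extra, 0, out, inputRow.length, extra.length);
        return out;
    }
}

// Splits one STRING field into a fixed number of new fields.
final class SplitTransform extends SimpleMultipleFieldTransform {
    private final int srcIndex;
    private final String separator;
    private final String[] outputNames;

    SplitTransform(int srcIndex, String separator, String... outputNames) {
        this.srcIndex = srcIndex;
        this.separator = separator;
        this.outputNames = outputNames.clone();
    }

    @Override
    protected String[] getOutputFieldNames() {
        return outputNames.clone();
    }

    @Override
    protected Object[] getOutputFieldValues(Object[] inputRow) {
        // One slot per output field; slots without a matching part stay null.
        Object[] values = new Object[outputNames.length];
        Object src = inputRow[srcIndex];
        if (src == null) {
            return values;
        }
        // The limit keeps at most outputNames.length parts; extra separators stay in the last part.
        String[] parts = ((String) src).split(Pattern.quote(separator), outputNames.length);
        System.arraycopy(parts, 0, values, 0, parts.length);
        return values;
    }
}
```

`Pattern.quote` makes the separator literal, so characters such as `.` or `|` are not interpreted as regular expressions.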
`AbstractSeaTunnelTransform`: an abstract operator for transforms that change both the datatype and the fields of a row.
```java
/**
 * Outputs the transformed row type.
 *
 * @param inputRowType upstream input row type
 * @return the row type produced by this transform
 */
protected abstract SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType);

/**
 * Outputs the transformed row data.
 *
 * @param inputRow upstream input row data
 * @return the transformed row
 */
protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow);
```
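With this API, the subclass decides both the output schema and the output rows. The following hypothetical sketch (`SortFieldsTransform`, using plain arrays instead of `SeaTunnelRowType`/`SeaTunnelRow`) implements Example 2. The row-type step computes a field permutation once, and the row step reuses it for every row.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical, simplified counterpart of AbstractSeaTunnelTransform (not the real API):
// the class decides both the output row type and the output row. Here it implements
// Example 2 by sorting fields by name.
final class SortFieldsTransform {
    // order[i] is the input position of output field i; set by transformRowType.
    private int[] order;

    // Datatype step: sort field names and remember the resulting permutation.
    String[] transformRowType(String[] inputNames) {
        Integer[] idx = new Integer[inputNames.length];
        for (int i = 0; i < idx.length; i++) {
            idx[i] = i;
        }
        Arrays.sort(idx, Comparator.comparing((Integer k) -> inputNames[k]));
        order = new int[idx.length];
        String[] outputNames = new String[idx.length];
        for (int i = 0; i < idx.length; i++) {
            order[i] = idx[i];
            outputNames[i] = inputNames[idx[i]];
        }
        return outputNames;
    }

    // Data step: reorder each row with the same permutation.
    Object[] transformRow(Object[] inputRow) {
        if (order == null) {
            throw new IllegalStateException("transformRowType must be called first");
        }
        Object[] out = new Object[order.length];
        for (int i = 0; i < order.length; i++) {
            out[i] = inputRow[order[i]];
        }
        return out;
    }
}
```

Computing the permutation in the row-type step keeps the per-row work to a simple array copy.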
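A new transform must implement one of the following APIs:

- `SingleFieldOutputTransform`
- `MultipleFieldOutputTransform`
- `AbstractSeaTunnelTransform`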
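Add your implementation subclass to the `seatunnel-transforms-v2` module. For example, the following `CopyFieldTransform` copies the value of one field into a new field: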
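```java
import com.google.auto.service.AutoService;

import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;

// @AutoService registers the class so it can be discovered through Java's ServiceLoader.
@AutoService(SeaTunnelTransform.class)
public class CopyFieldTransform extends SingleFieldOutputTransform {
    private String srcField;
    private int srcFieldIndex;
    private SeaTunnelDataType srcFieldDataType;
    private String destField;

    @Override
    public String getPluginName() {
        // Name used to reference this transform in the job config.
        return "Copy";
    }

    @Override
    protected void setConfig(Config pluginConfig) {
        // User options: the field to copy and the name of the new field.
        this.srcField = pluginConfig.getString("src_field");
        this.destField = pluginConfig.getString("dest_field");
    }

    @Override
    protected void setInputRowType(SeaTunnelRowType inputRowType) {
        // Resolve the source field's position and type once, from the upstream schema.
        srcFieldIndex = inputRowType.indexOf(srcField);
        srcFieldDataType = inputRowType.getFieldType(srcFieldIndex);
    }

    @Override
    protected String getOutputFieldName() {
        return destField;
    }

    @Override
    protected SeaTunnelDataType getOutputFieldDataType() {
        // The copy keeps the source field's datatype.
        return srcFieldDataType;
    }

    @Override
    protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
        // The copy's value is the source field's value in the current row.
        return inputRow.getField(srcFieldIndex);
    }
}
```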
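`@AutoService` and `getPluginName` in `CopyFieldTransform` together let a transform be found by name. The following hypothetical sketch shows that lookup pattern with Java's standard `ServiceLoader`. `NamedPlugin` and `PluginRegistry` are invented for illustration, and SeaTunnel's own plugin discovery code may be organized differently.

```java
import java.util.Optional;
import java.util.ServiceLoader;

// Hypothetical plugin contract reduced to the name lookup (not the SeaTunnel API).
interface NamedPlugin {
    String getPluginName();
}

final class PluginRegistry {
    // Returns the first plugin whose getPluginName() equals the requested name, e.g. "Copy".
    static Optional<NamedPlugin> find(Iterable<? extends NamedPlugin> plugins, String name) {
        for (NamedPlugin plugin : plugins) {
            if (plugin.getPluginName().equals(name)) {
                return Optional.of(plugin);
            }
        }
        return Optional.empty();
    }

    // ServiceLoader lists every implementation declared in a
    // META-INF/services/<fully qualified NamedPlugin name> file on the classpath,
    // which is the kind of file @AutoService generates at compile time.
    static Optional<NamedPlugin> findOnClasspath(String name) {
        return find(ServiceLoader.load(NamedPlugin.class), name);
    }
}
```

Because `find` accepts any `Iterable`, it can be exercised with a plain list in tests and with `ServiceLoader` at runtime.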
1. The `getPluginName` method is used to identify the transform name.
2. `@AutoService` is used to generate the `META-INF/services/org.apache.seatunnel.api.transform.SeaTunnelTransform` file automatically.
3. The `setConfig` method is used to inject user configs.

Once you add a new plugin, it is recommended to add e2e tests for it.
The `seatunnel-e2e/seatunnel-transforms-v2-e2e` module helps you do this.
For example, to add an e2e test for `CopyFieldTransform`, create a new test in the `seatunnel-e2e/seatunnel-transforms-v2-e2e` module that extends the `TestSuiteBase` class.
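```java
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

public class TestCopyFieldTransformIT extends TestSuiteBase {
    @TestTemplate
    public void testCopyFieldTransform(TestContainer container) throws Exception {
        // Submit the job in copy_transform.conf and expect a successful exit code.
        Container.ExecResult execResult = container.executeJob("/copy_transform.conf");
        Assertions.assertEquals(0, execResult.getExitCode());
    }
}
```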
Once your test case extends `TestSuiteBase` and its test method is annotated with `@TestTemplate`, the job runs on all engines.
You only need to call the `executeJob` method with your SeaTunnel configuration file to submit the SeaTunnel job.
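The following hypothetical sketch illustrates what "runs on all engines" amounts to: the same config is submitted to each engine and every exit code must be 0. `EngineContainer`, `FakeEngine`, and `AllEnginesRunner` are invented for illustration, and the engine names used in the usage note are only labels. In the real e2e module, `TestSuiteBase` and `@TestTemplate` supply actual `TestContainer` instances instead of fakes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an engine under test (not SeaTunnel's TestContainer).
interface EngineContainer {
    String name();

    // Submits the job described by confFile and returns its exit code.
    int executeJob(String confFile);
}

// Fake engine for illustration: returns a fixed exit code instead of running a job.
final class FakeEngine implements EngineContainer {
    private final String name;
    private final int exitCode;

    FakeEngine(String name, int exitCode) {
        this.name = name;
        this.exitCode = exitCode;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public int executeJob(String confFile) {
        return exitCode;
    }
}

final class AllEnginesRunner {
    // Runs the same job config on every engine and records each exit code by engine name.
    static Map<String, Integer> runEverywhere(List<? extends EngineContainer> engines, String confFile) {
        Map<String, Integer> exitCodes = new LinkedHashMap<>();
        for (EngineContainer engine : engines) {
            exitCodes.put(engine.name(), engine.executeJob(confFile));
        }
        return exitCodes;
    }

    // The job passes only if it exited with 0 on every engine.
    static boolean allSucceeded(Map<String, Integer> exitCodes) {
        return exitCodes.values().stream().allMatch(code -> code == 0);
    }
}
```

For example, running `/copy_transform.conf` against two fake engines labelled `flink` and `spark` that both return 0 makes `allSucceeded` true. If either returns a non-zero code, the whole check fails, which is the behaviour the per-engine `assertEquals(0, ...)` gives you.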