diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java index 0ffefeea221..2b470cdc246 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java @@ -17,6 +17,8 @@ package org.apache.inlong.sdk.transform.encode; +import lombok.Data; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -26,6 +28,7 @@ * DefaultSinkData * */ +@Data public class DefaultSinkData implements SinkData { private List keyList = new ArrayList<>(); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java index 2ce813ec033..307df39d166 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java @@ -35,16 +35,37 @@ public class TransformConfig { @JsonProperty("configuration") private Map configuration; + @JsonProperty("strictOrder") + private boolean strictOrder = true; + + @JsonProperty("ignoreConfigError") + private boolean ignoreConfigError = true; + @JsonCreator public TransformConfig(@JsonProperty("transformSql") String transformSql) { - this(transformSql, ImmutableMap.of()); + this(transformSql, ImmutableMap.of(), true, true); } @JsonCreator public TransformConfig(@JsonProperty("transformSql") String transformSql, @JsonProperty("configuration") Map configuration) { + this(transformSql, configuration, true, true); + } + @JsonCreator + public TransformConfig(@JsonProperty("transformSql") String transformSql, + @JsonProperty("strictOrder") boolean strictOrder) { + this(transformSql, ImmutableMap.of(), strictOrder, true); + } + + @JsonCreator + public TransformConfig(@JsonProperty("transformSql") String transformSql, + @JsonProperty("configuration") Map configuration, + @JsonProperty("strictOrder") boolean strictOrder, + @JsonProperty("ignoreConfigError") boolean ignoreConfigError) { this.transformSql = Preconditions.checkNotNull(transformSql, "transform sql should not be null"); this.configuration = configuration; + this.strictOrder = strictOrder; + this.ignoreConfigError = ignoreConfigError; } /** @@ -61,6 +82,16 @@ public Map getConfiguration() { return configuration; } + @JsonProperty("strictOrder") + public boolean isStrictOrder() { + return strictOrder; + } + + @JsonProperty("ignoreConfigError") + public boolean isIgnoreConfigError() { + return ignoreConfigError; + } + /** * set transformSql * @param transformSql the transformSql to set diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java index acb7e62e070..ba8888adde2 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java @@ -20,7 +20,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.decode.SourceDecoder; import org.apache.inlong.sdk.transform.encode.DefaultSinkData; -import org.apache.inlong.sdk.transform.encode.SinkData; import org.apache.inlong.sdk.transform.encode.SinkEncoder; import org.apache.inlong.sdk.transform.pojo.FieldInfo; import org.apache.inlong.sdk.transform.pojo.TransformConfig; @@ -63,7 +62,7 @@ public class TransformProcessor { private ExpressionOperator where; private List selectItems; - private boolean includeAllSourceFields = false; + private List sinkFieldList; public static TransformProcessor create( TransformConfig config, @@ -84,6 +83,11 @@ private TransformProcessor( } private void init() throws JSQLParserException { + if (!config.isStrictOrder() && encoder != null && encoder.getFields() != null) { + List fields = encoder.getFields(); + this.sinkFieldList = new ArrayList<>(fields.size()); + fields.forEach(v -> this.sinkFieldList.add(v.getName())); + } this.initTransformSql(); } @@ -98,7 +102,7 @@ private void initTransformSql() throws JSQLParserException { for (int i = 0; i < items.size(); i++) { SelectItem item = items.get(i); String fieldName = null; - if (i < fields.size()) { + if (config.isStrictOrder() && i < fields.size()) { fieldName = fields.get(i).getName(); } if (item instanceof SelectExpressionItem) { @@ -109,6 +113,10 @@ private void initTransformSql() throws JSQLParserException { } else { fieldName = exprItem.getAlias().getName(); } + if (!this.checkSelectField(fieldName)) { + throw new JSQLParserException( + String.format("Field name:%s can not be found in sink field list.", fieldName)); + } } this.selectItems .add(new ValueParserNode(fieldName, OperatorTools.buildParser(exprItem.getExpression()))); @@ -120,6 +128,13 @@ private void initTransformSql() throws JSQLParserException { } } + public boolean checkSelectField(String fieldName) { + if (config.isIgnoreConfigError()) { + return true; + } + return this.sinkFieldList != null && this.sinkFieldList.contains(fieldName); + } + public List transform(I input) { return this.transform(input, EMPTY_EXT_PARAMS); } @@ -142,7 +157,7 @@ public List transform(I input, Map extParams) { } // parse value - SinkData sinkData = new DefaultSinkData(); + DefaultSinkData sinkData = new DefaultSinkData(); for (ValueParserNode node : this.selectItems) { String fieldName = node.getFieldName(); ValueParser parser = node.getParser(); @@ -163,6 +178,9 @@ public List transform(I input, Map extParams) { } } + if (this.sinkFieldList != null) { + sinkData.setKeyList(this.sinkFieldList); + } // encode sinkDatas.add(this.encoder.encode(sinkData, context)); } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java new file mode 100644 index 00000000000..d03d1127769 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.processor; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import net.sf.jsqlparser.JSQLParserException; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestCsv2CsvForErrorOrderProcessor extends AbstractProcessorTestBase { + + @Test + public void testCsv2CsvForErrorOrder() throws Exception { + List sourceFields = this.getTestFieldList("ftime", "extinfo", "data"); + CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\', sourceFields); + List sinkFields = this.getTestFieldList("field1", "field2", "field3"); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', sinkFields); + String transformSql = "select ftime as field2,data as field3,extinfo as field4 from source where extinfo='ok'"; + TransformConfig config = new TransformConfig(transformSql, false); + // case1 + TransformProcessor processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + + List output1 = processor1.transform("2024-04-28 00:00:00|ok|data1", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("|2024-04-28 00:00:00|data1", output1.get(0)); + } + + @Test + public void testKv2KvForErrorOrder() throws Exception { + List sourceFields = this.getTestFieldList("key1", "key2", "key3", "key4"); + KvSourceInfo kvSource = new KvSourceInfo("UTF-8", sourceFields); + List sinkFields = this.getTestFieldList("field1", "field2", "field3"); + KvSinkInfo kvSink = new KvSinkInfo("UTF-8", sinkFields); + String transformSql = "select key4 as field3, key2 as field6, key1 as field1"; + TransformConfig config = new TransformConfig(transformSql, false); + // case1 + TransformProcessor processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + List output1 = processor1.transform("key1=string11&key2=string12&key3=number11&key4=number12", + new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("field1=string11&field2=&field3=number12", output1.get(0)); + } + + @Test + public void testKv2KvForConfigError() throws Exception { + List sourceFields = this.getTestFieldList("key1", "key2", "key3", "key4"); + KvSourceInfo kvSource = new KvSourceInfo("UTF-8", sourceFields); + List sinkFields = this.getTestFieldList("field1", "field2", "field3"); + KvSinkInfo kvSink = new KvSinkInfo("UTF-8", sinkFields); + String transformSql = "select key4 as field3, key2 as field6, key1 as field1"; + TransformConfig config = new TransformConfig(transformSql, new HashMap<>(), false, false); + // case1 + Assert.assertThrows(JSQLParserException.class, () -> { + TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + }); + } +}