From c4bf54bbb1bb0d83429c1aa9b93ef721bf376e90 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Mon, 2 Sep 2024 20:24:25 +0800 Subject: [PATCH 01/11] [INLONG-10988][Manager] Data preview filters data in tubes based on streamId (#10989) --- .../manager/service/resource/queue/tubemq/TubeMQOperator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java index 050de078050..66d1fcf59a5 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java @@ -71,6 +71,7 @@ public class TubeMQOperator { private static final String CREATE_USER = "&createUser="; private static final String CONF_MOD_AUTH_TOKEN = "&confModAuthToken="; private static final String MSG_COUNT = "&msgCount="; + private static final String FILTER_CONDS = "&filterConds="; private static final String QUERY_TOPIC_PATH = "/webapi.htm?method=admin_query_cluster_topic_view"; private static final String QUERY_BROKER_PATH = "/webapi.htm?method=admin_query_broker_run_status"; @@ -288,7 +289,7 @@ public List queryLastMessage(TubeClusterInfo tubeCluster, String } String url = "http://" + brokerUrl + QUERY_MESSAGE_PATH + TOPIC_NAME + topicName + MSG_COUNT - + request.getMessageCount(); + + request.getMessageCount() + FILTER_CONDS + streamInfo.getInlongStreamId(); TubeMessageResponse response = HttpUtils.request(restTemplate, url, HttpMethod.GET, null, new HttpHeaders(), TubeMessageResponse.class); if (response.getErrCode() != SUCCESS_CODE && response.getErrCode() != 200) { From c08549eff5113d23b03f4436ca02a77949f4a49d Mon Sep 17 00:00:00 2001 From: emptyOVO <118812562+emptyOVO@users.noreply.github.com> Date: Tue, 3 Sep 2024 10:23:03 +0800 Subject: [PATCH 02/11] [INLONG-10984][SDK] Transform support RADIANS(x) function (#10987) * [INLONG-10984][SDK] Transform support RADIANS(x) function * fix: add description --- .../process/function/RadiansFunction.java | 49 +++++++++++++++++++ ...TransformArithmeticFunctionsProcessor.java | 17 +++++++ 2 files changed, 66 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/RadiansFunction.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/RadiansFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/RadiansFunction.java new file mode 100644 index 00000000000..635d18b9b8f --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/RadiansFunction.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +/** + * RadiansFunction + * description: + * - RADIANS(x)--returns radians of x, Convert degrees to radians + */ +@TransformFunction(names = {"radians"}) +public class RadiansFunction implements ValueParser { + + private ValueParser degreeParser; + + public RadiansFunction(Function expr) { + degreeParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object degreeObj = degreeParser.parse(sourceData, rowIndex, context); + if (degreeObj == null) { + return null; + } + return Math.toRadians(OperatorTools.parseBigDecimal(degreeObj).doubleValue()); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java index 1e0f14025f0..c36835e5792 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java @@ -422,6 +422,23 @@ public void testLnFunction() throws Exception { Assert.assertEquals(output2.get(0), "result=2.302585092994046"); } + @Test + public void testRadiansFunction() throws Exception { + String transformSql = "select radians(numeric1) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: radians(10) + List output1 = processor.transform("10|4|6|8", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0.17453292519943295"); + // case2: radians(18.97) + List output2 = processor.transform("18.97|4|6|8", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=0.33108895910332425"); + } + @Test public void testLog10Function() throws Exception { String transformSql = "select log10(numeric1) from source"; From bc6e9610bb84efe3e794cda5d79a7dda784b71e8 Mon Sep 17 00:00:00 2001 From: Xincheng Huang <60057611+ying-hua@users.noreply.github.com> Date: Tue, 3 Sep 2024 10:23:19 +0800 Subject: [PATCH 03/11] [INLONG-10982][SDK] Improve the test code structure (#10985) --- .../process/TestTransformProcessor.java | 67 +++++-------------- 1 file changed, 16 insertions(+), 51 deletions(-) diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java index 20af097de60..3413f1aca3e 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java @@ -44,13 +44,7 @@ public class TestTransformProcessor { @Test public void testCsv2Kv() throws Exception { - List fields = new ArrayList<>(); - FieldInfo ftime = new FieldInfo(); - ftime.setName("ftime"); - fields.add(ftime); - FieldInfo extinfo = new FieldInfo(); - extinfo.setName("extinfo"); - fields.add(extinfo); + List fields = this.getTestFieldList("ftime", "extinfo"); CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\', fields); KvSinkInfo kvSink = new KvSinkInfo("UTF-8", fields); String transformSql = "select ftime,extinfo from source where extinfo='ok'"; @@ -97,13 +91,7 @@ public void testCsv2KvNoField() throws Exception { @Test public void testKv2Csv() throws Exception { - List fields = new ArrayList<>(); - FieldInfo ftime = new FieldInfo(); - ftime.setName("ftime"); - fields.add(ftime); - FieldInfo extinfo = new FieldInfo(); - extinfo.setName("extinfo"); - fields.add(extinfo); + List fields = this.getTestFieldList("ftime", "extinfo"); KvSourceInfo kvSource = new KvSourceInfo("UTF-8", fields); CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); String transformSql = "select ftime,extinfo from source where extinfo='ok'"; @@ -148,7 +136,7 @@ public void testKv2CsvNoField() throws Exception { @Test public void testJson2Csv() throws Exception { - List fields1 = this.getTestFieldList(); + List fields1 = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); JsonSourceInfo jsonSource1 = new JsonSourceInfo("UTF-8", "msgs"); CsvSinkInfo csvSink1 = new CsvSinkInfo("UTF-8", '|', '\\', fields1); String transformSql1 = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source"; @@ -170,7 +158,7 @@ public void testJson2Csv() throws Exception { Assert.assertEquals(output1.get(0), "value1|value2|1713243918000|value4"); Assert.assertEquals(output1.get(1), "value1|value2|1713243918000|v4"); // case2 - List fields2 = this.getTestFieldList2(); + List fields2 = this.getTestFieldList("id", "itemId", "subItemId", "msg"); JsonSourceInfo jsonSource2 = new JsonSourceInfo("UTF-8", "items"); CsvSinkInfo csvSink2 = new CsvSinkInfo("UTF-8", '|', '\\', fields2); String transformSql2 = @@ -205,7 +193,7 @@ public void testJson2Csv() throws Exception { @Test public void testJson2CsvForOne() throws Exception { - List fields = this.getTestFieldList(); + List fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); JsonSourceInfo jsonSource = new JsonSourceInfo("UTF-8", ""); CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); String transformSql = "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source"; @@ -229,7 +217,7 @@ public void testJson2CsvForOne() throws Exception { @Test public void testPb2Csv() throws Exception { - List fields = this.getTestFieldList(); + List fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); String transformBase64 = this.getPbTestDescription(); PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); @@ -246,36 +234,13 @@ public void testPb2Csv() throws Exception { Assert.assertEquals(output.get(1), "sid|1|1713243918002|msgValue42"); } - private List getTestFieldList() { - List fields = new ArrayList<>(); - FieldInfo sid = new FieldInfo(); - sid.setName("sid"); - fields.add(sid); - FieldInfo packageID = new FieldInfo(); - packageID.setName("packageID"); - fields.add(packageID); - FieldInfo msgTime = new FieldInfo(); - msgTime.setName("msgTime"); - fields.add(msgTime); - FieldInfo msg = new FieldInfo(); - msg.setName("msg"); - fields.add(msg); - return fields; - } - private List getTestFieldList2() { + private List getTestFieldList(String... fieldNames) { List fields = new ArrayList<>(); - FieldInfo id = new FieldInfo(); - id.setName("id"); - fields.add(id); - FieldInfo itemId = new FieldInfo(); - itemId.setName("itemId"); - fields.add(itemId); - FieldInfo subItemId = new FieldInfo(); - subItemId.setName("subItemId"); - fields.add(subItemId); - FieldInfo msg = new FieldInfo(); - msg.setName("msg"); - fields.add(msg); + for (String fieldName : fieldNames) { + FieldInfo field = new FieldInfo(); + field.setName(fieldName); + fields.add(field); + } return fields; } @@ -309,7 +274,7 @@ private String getPbTestDescription() { @Test public void testPb2CsvForOne() throws Exception { - List fields = this.getTestFieldList(); + List fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); String transformBase64 = this.getPbTestDescription(); PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null); CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); @@ -327,7 +292,7 @@ public void testPb2CsvForOne() throws Exception { @Test public void testPb2CsvForAdd() throws Exception { - List fields = this.getTestFieldList(); + List fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); String transformBase64 = this.getPbTestDescription(); PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null); CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); @@ -351,7 +316,7 @@ public void testPb2CsvForAdd() throws Exception { @Test public void testPb2CsvForConcat() throws Exception { - List fields = this.getTestFieldList(); + List fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); String transformBase64 = this.getPbTestDescription(); PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); @@ -371,7 +336,7 @@ public void testPb2CsvForConcat() throws Exception { @Test public void testPb2CsvForNow() throws Exception { - List fields = this.getTestFieldList(); + List fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); String transformBase64 = this.getPbTestDescription(); PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); From dfd42dc88d7f5bb95d66165ae5cfcf6c92921bd7 Mon Sep 17 00:00:00 2001 From: Zkplo <87751516+Zkplo@users.noreply.github.com> Date: Tue, 3 Sep 2024 10:24:08 +0800 Subject: [PATCH 04/11] [INLONG-10809][SDK] Improvements to TypeConverter field types and CompareValue in OperatorTools (#10817) Co-authored-by: ZKpLo <14148880+zkplo@user.noreply.gitee.com> --- .../sdk/transform/decode/CsvSourceData.java | 10 +- .../transform/decode/CsvSourceDecoder.java | 8 +- .../sdk/transform/decode/SourceData.java | 2 +- .../inlong/sdk/transform/pojo/FieldInfo.java | 2 +- .../process/converter/DoubleConverter.java | 26 ++ .../process/converter/LongConverter.java | 26 ++ .../process/function/IfFunction.java | 55 +++ .../process/operator/OperatorTools.java | 21 +- .../process/parser/DoubleParser.java | 1 + ...TransformExpressionOperatorsProcessor.java | 354 ++++++++++++++++++ 10 files changed, 485 insertions(+), 20 deletions(-) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/DoubleConverter.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/LongConverter.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformExpressionOperatorsProcessor.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java index d4492b4b853..e0bd9f794ce 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java @@ -28,14 +28,14 @@ */ public class CsvSourceData implements SourceData { - private List> rows = new ArrayList<>(); + private List> rows = new ArrayList<>(); - private Map currentRow; + private Map currentRow; public CsvSourceData() { } - public void putField(String fieldName, String fieldValue) { + public void putField(String fieldName, Object fieldValue) { this.currentRow.put(fieldName, fieldValue); } @@ -50,11 +50,11 @@ public int getRowCount() { } @Override - public String getField(int rowNum, String fieldName) { + public Object getField(int rowNum, String fieldName) { if (rowNum >= this.rows.size()) { return null; } - Map targetRow = this.rows.get(rowNum); + Map targetRow = this.rows.get(rowNum); return targetRow.get(fieldName); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java index fb95dadc430..7b3dedb6373 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java @@ -76,9 +76,13 @@ public SourceData decode(String srcString, Context context) { int fieldIndex = 0; for (FieldInfo field : fields) { String fieldName = field.getName(); - String fieldValue = null; + Object fieldValue = null; if (fieldIndex < fieldValues.length) { - fieldValue = fieldValues[fieldIndex]; + try { + fieldValue = field.getConverter().convert(fieldValues[fieldIndex]); + } catch (Exception e) { + throw new RuntimeException(e); + } } sourceData.putField(fieldName, fieldValue); fieldIndex++; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java index 2c39948f2d0..cf5f9c0fbe8 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java @@ -26,5 +26,5 @@ public interface SourceData { int getRowCount(); - String getField(int rowNum, String fieldName); + Object getField(int rowNum, String fieldName); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java index 1027dad944b..2a7834112a1 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java @@ -28,7 +28,7 @@ public class FieldInfo { private String name; - private TypeConverter converter; + private TypeConverter converter = TypeConverter.DefaultTypeConverter(); public FieldInfo() { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/DoubleConverter.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/DoubleConverter.java new file mode 100644 index 00000000000..52afbda16b7 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/DoubleConverter.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.converter; + +public class DoubleConverter implements TypeConverter { + + @Override + public Object convert(String value) throws Exception { + return Double.parseDouble(value); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/LongConverter.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/LongConverter.java new file mode 100644 index 00000000000..5a18f8ee138 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/LongConverter.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.converter; + +public class LongConverter implements TypeConverter { + + @Override + public Object convert(String value) throws Exception { + return Long.parseLong(value); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfFunction.java new file mode 100644 index 00000000000..bda7b9301cc --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfFunction.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.ExpressionOperator; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.List; + +/** + * IfFunction + * description: if(expr,r1,r2) -- expr is an expression, if it holds, return r1; otherwise, return r2 + */ +@TransformFunction(names = {"if"}) +public class IfFunction implements ValueParser { + + private final ExpressionOperator expressionOperator; + private final ValueParser tureValueParser; + private final ValueParser falseValueParser; + + public IfFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + expressionOperator = OperatorTools.buildOperator(expressions.get(0)); + tureValueParser = OperatorTools.buildParser(expressions.get(1)); + falseValueParser = OperatorTools.buildParser(expressions.get(2)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + boolean condition = expressionOperator.check(sourceData, rowIndex, context); + return condition ? tureValueParser.parse(sourceData, rowIndex, context) + : falseValueParser.parse(sourceData, rowIndex, context); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index 02b24cdb6b4..bb35bb4490f 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -133,19 +133,18 @@ public static int compareValue(Comparable left, Comparable right) { if (right == null) { return 1; } - if (left instanceof String) { - if (right instanceof String) { - return ObjectUtils.compare(left, right); - } else { - BigDecimal leftValue = parseBigDecimal(left); - return ObjectUtils.compare(leftValue, right); - } + + if (((Object) left).getClass() == ((Object) right).getClass()) { + return ObjectUtils.compare(left, right); } else { - if (right instanceof String) { + try { + BigDecimal leftValue = parseBigDecimal(left); BigDecimal rightValue = parseBigDecimal(right); - return ObjectUtils.compare(left, rightValue); - } else { - return ObjectUtils.compare(left, right); + return ObjectUtils.compare(leftValue, rightValue); + } catch (Exception e) { + String leftValue = parseString(left); + String rightValue = parseString(right); + return ObjectUtils.compare(leftValue, rightValue); } } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java index ad39558a112..a88b17f6baa 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java @@ -24,6 +24,7 @@ /** * LongParser + * */ @TransformParser(values = DoubleValue.class) public class DoubleParser implements ValueParser { diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformExpressionOperatorsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformExpressionOperatorsProcessor.java new file mode 100644 index 00000000000..67e4e331d41 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformExpressionOperatorsProcessor.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.converter.DoubleConverter; +import org.apache.inlong.sdk.transform.process.converter.LongConverter; +import org.apache.inlong.sdk.transform.process.converter.TypeConverter; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * TestArithmeticFunctionsTransformProcessor + * description: test the arithmetic functions in transform processor + */ +public class TestTransformExpressionOperatorsProcessor { + + private static final List srcFields = new ArrayList<>(); + private static final List dstFields = new ArrayList<>(); + private static final CsvSourceInfo csvSource; + private static final KvSinkInfo kvSink; + + static { + srcFields.add(new FieldInfo("numeric1", new DoubleConverter())); + srcFields.add(new FieldInfo("string2", TypeConverter.DefaultTypeConverter())); + srcFields.add(new FieldInfo("numeric3", new DoubleConverter())); + srcFields.add(new FieldInfo("numeric4", new LongConverter())); + + FieldInfo field = new FieldInfo(); + field.setName("result"); + dstFields.add(field); + csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields); + kvSink = new KvSinkInfo("UTF-8", dstFields); + } + + @Test + public void testEqualsToOperator() throws Exception { + String transformSql = "select if(string2 = 4,1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|4a|4|8" + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor.transform("3.14159265358979323846|4a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0"); + // case2: "3.14159265358979323846|4|4|8" + List output2 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=1"); + + transformSql = "select if(numeric3 = 4,1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=1"); + // case4: "3.14159265358979323846|4|4.2|8" + List output4 = processor.transform("3.14159265358979323846|4|4.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=0"); + } + + @Test + public void testNotEqualsToOperator() throws Exception { + String transformSql = "select if(string2 != 4,1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|4a|4|8" + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor.transform("3.14159265358979323846|4a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=1"); + // case2: "3.14159265358979323846|4|4|8" + List output2 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=0"); + + transformSql = "select if(numeric3 != 4,1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=0"); + // case4: "3.14159265358979323846|4|4.2|8" + List output4 = processor.transform("3.14159265358979323846|4|4.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=1"); + } + + @Test + public void testGreaterThanEqualsOperator() throws Exception { + String transformSql = "select if(string2 >= 4,1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|8" + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor.transform("3.14159265358979323846|3a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0"); + // case2: "3.14159265358979323846|5|4|8" + List output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=1"); + + transformSql = "select if(numeric3 >= 4,1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=1"); + // case4: "3.14159265358979323846|4|3.2|8" + List output4 = processor.transform("3.14159265358979323846|4|3.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=0"); + } + + @Test + public void testGreaterThanOperator() throws Exception { + String transformSql = "select if(string2 > 4.1,1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|8" + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor.transform("3.14159265358979323846|3a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0"); + // case2: "3.14159265358979323846|5|4|8" + List output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=1"); + + transformSql = "select if(numeric3 > 4.1,1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=0"); + // case4: "3.14159265358979323846|4|4.2|8" + List output4 = processor.transform("3.14159265358979323846|4|4.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=1"); + } + + @Test + public void testMinorThanEqualsOperator() throws Exception { + String transformSql = "select if(string2 <= 4,1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|8" + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor.transform("3.14159265358979323846|3a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=1"); + // case2: "3.14159265358979323846|5|4|8" + List output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=0"); + + transformSql = "select if(numeric3 <= 4,1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=1"); + // case4: "3.14159265358979323846|4|4.2|8" + List output4 = processor.transform("3.14159265358979323846|4|4.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=0"); + } + + @Test + public void testMinorThanOperator() throws Exception { + String transformSql = "select if(string2 < 4.1,1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|8" + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor.transform("3.14159265358979323846|3a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=1"); + // case2: "3.14159265358979323846|5|4|8" + List output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=0"); + + transformSql = "select if(numeric3 < 4,1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=0"); + // case4: "3.14159265358979323846|4|3.2|8" + List output4 = processor.transform("3.14159265358979323846|4|3.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=1"); + } + + @Test + public void testNotOperator() throws Exception { + String transformSql = "select if(!(string2 < 4),1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|8" + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor.transform("3.14159265358979323846|3a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0"); + // case2: "3.14159265358979323846|5|4|8" + List output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=1"); + + transformSql = "select if(!(numeric3 < 3.9),1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=1"); + // case4: "3.14159265358979323846|4|3.2|8" + List output4 = processor.transform("3.14159265358979323846|4|3.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=0"); + } + + @Test + public void testOrOperator() throws Exception { + String transformSql = "select if((string2 < 4) or (numeric4 > 5),1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|8" + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor.transform("3.14159265358979323846|3a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=1"); + // case2: "3.14159265358979323846|5|4|8" + List output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=1"); + // case3: "3.14159265358979323846|5|4|4" + List output3 = processor.transform("3.14159265358979323846|5|4|4"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=0"); + + transformSql = "select if((numeric3 < 4) or (numeric4 > 5),1,0) from source"; + config = new TransformConfig(transformSql); + // case4: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output4 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=1"); + // case5: "3.14159265358979323846|4|3.2|8" + List output5 = processor.transform("3.14159265358979323846|4|3.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output5.get(0), "result=1"); + // case6: "3.14159265358979323846|4|4.2|5" + List output6 = processor.transform("3.14159265358979323846|4|4.2|5"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output6.get(0), "result=0"); + } + + @Test + public void testAndOperator() throws Exception { + String transformSql = "select if((string2 < 4) and (numeric4 > 5),1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|4" + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor.transform("3.14159265358979323846|3a|4|4"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0"); + // case2: "3.14159265358979323846|5|4|8" + List output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=0"); + // case3: "3.14159265358979323846|3|4|8" + List output3 = processor.transform("3.14159265358979323846|3|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=1"); + + transformSql = "select if((numeric3 < 4) and (numeric4 > 5),1,0) from source"; + config = new TransformConfig(transformSql); + // case4: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output4 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=0"); + // case5: "3.14159265358979323846|4|3.2|4" + List output5 = processor.transform("3.14159265358979323846|4|3.2|4"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output5.get(0), "result=0"); + // case6: "3.14159265358979323846|4|3.2|8" + List output6 = processor.transform("3.14159265358979323846|4|3.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output6.get(0), "result=1"); + } +} From 0a394f49ea026b2536ef0b2a90e3a9183e4a51de Mon Sep 17 00:00:00 2001 From: emptyOVO <118812562+emptyOVO@users.noreply.github.com> Date: Tue, 3 Sep 2024 10:25:35 +0800 Subject: [PATCH 05/11] [INLONG-10979][SDK] Transform support PI() function (#10983) --- .../process/function/PiFunction.java | 40 +++++++++++++++++++ ...TransformArithmeticFunctionsProcessor.java | 13 ++++++ 2 files changed, 53 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PiFunction.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PiFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PiFunction.java new file mode 100644 index 00000000000..722955ebf51 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PiFunction.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; +/** + * PiFunction + * returns the mathematical constant PI + */ +@TransformFunction(names = {"pi"}) +public class PiFunction implements ValueParser { + + public PiFunction(Function expr) { + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + return String.valueOf(Math.PI); + } + +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java index c36835e5792..4007a9a8760 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java @@ -728,4 +728,17 @@ public void testHexFunction() throws Exception { Assert.assertEquals(output5.get(0), "result=616263"); } + @Test + public void testPiFunction() throws Exception { + String transformSql1 = "select pi() from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case: pi() + List output1 = processor1.transform("1007|4|6|8", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=3.141592653589793"); + } + } From 3e7f81d41c0e1ab8294d93b374e54515d0b7936b Mon Sep 17 00:00:00 2001 From: Xincheng Huang <60057611+ying-hua@users.noreply.github.com> Date: Tue, 3 Sep 2024 10:25:52 +0800 Subject: [PATCH 06/11] [INLONG-10971][SDK] Transform support INSERT function (#10972) --- .../process/function/InsertFunction.java | 101 ++++++++++++++++++ ...TestTransformStringFunctionsProcessor.java | 34 ++++++ 2 files changed, 135 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/InsertFunction.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/InsertFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/InsertFunction.java new file mode 100644 index 00000000000..3473bc9d793 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/InsertFunction.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.List; +/** + * InsertFunction + * + * Description: + * Returns a string where a specified substring is replaced by another string, starting at a given position and for a specified length. + * If the position is out of the string's bounds, the original string is returned. + * If the length exceeds the remaining length of the string from the given position, the replacement continues to the end of the string. + * If any argument is null, the function returns null. + * + * Arguments: + * - str: The original string. + * - pos: The position to start the replacement (1-based index). + * - len: The number of characters to replace. + * - newstr: The string to insert. + * + * Examples: + * - INSERT('12345678', 3, 4, 'word') = '12word78' + * - INSERT('12345678', -1, 4, 'word') = '12345678' + * - INSERT('12345678', 3, 100, 'word') = '12word' + */ +@TransformFunction(names = {"insert"}) +public class InsertFunction implements ValueParser { + + private ValueParser strParser; + private ValueParser posParser; + private ValueParser lenParser; + private ValueParser newStrParser; + + public InsertFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + strParser = OperatorTools.buildParser(expressions.get(0)); + posParser = OperatorTools.buildParser(expressions.get(1)); + lenParser = OperatorTools.buildParser(expressions.get(2)); + newStrParser = OperatorTools.buildParser(expressions.get(3)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object strObject = strParser.parse(sourceData, rowIndex, context); + Object posObject = posParser.parse(sourceData, rowIndex, context); + Object lenObject = lenParser.parse(sourceData, rowIndex, context); + Object newStrObject = newStrParser.parse(sourceData, rowIndex, context); + + if (strObject == null || posObject == null || lenObject == null || newStrObject == null) { + return null; + } + + String str = OperatorTools.parseString(strObject); + int pos = OperatorTools.parseBigDecimal(posObject).intValue(); + int len = OperatorTools.parseBigDecimal(lenObject).intValue(); + String newStr = OperatorTools.parseString(newStrObject); + + if (str == null || newStr == null) { + return null; + } + + if (pos < 1 || pos > str.length()) { + return str; + } + + int startIndex = pos - 1; + int endIndex = Math.min(startIndex + len, str.length()); + + StringBuilder result = new StringBuilder(); + result.append(str, 0, startIndex); + result.append(newStr); + if (endIndex < str.length()) { + result.append(str, endIndex, str.length()); + } + + return result.toString(); + } +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java index 14511946070..a3099d09f2a 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java @@ -690,4 +690,38 @@ public void testTranslateFunction() throws Exception { Assert.assertEquals(output3.get(0), "result=Apache Inlong"); } + @Test + public void testInsertFunction() throws Exception { + String transformSql1 = "select insert(string1, numeric1, numeric2, string2) from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: insert('12345678', 3, 4, 'word') -> '12word78' + List output1 = processor1.transform("12345678|word|cloud|3|4|0", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=12word78", output1.get(0)); + + // case2: insert('12345678', -1, 4, 'word') -> '12345678' + List output2 = processor1.transform("12345678|word|cloud|-1|4|0", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=12345678", output2.get(0)); + + // case3: insert('12345678', 3, 100, 'word') -> '12word' + List output3 = processor1.transform("12345678|word|cloud|3|100|0", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals("result=12word", output3.get(0)); + + // case4: insert('', 3, 4, 'word') -> '' + List output4 = processor1.transform("|word|cloud|3|4|0", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals("result=", output4.get(0)); + + // case5: insert('12345678', 3, 4, '') -> '1278' + List output5 = processor1.transform("12345678||cloud|3|4|0", new HashMap<>()); + Assert.assertEquals(1, output5.size()); + Assert.assertEquals("result=1278", output5.get(0)); + } + } From cb4b61b5a889cbb035b51ad1dd48f6a02790573f Mon Sep 17 00:00:00 2001 From: Zkplo <87751516+Zkplo@users.noreply.github.com> Date: Tue, 3 Sep 2024 10:26:19 +0800 Subject: [PATCH 07/11] [INLONG-10965][SDK] Transform support Bitwise operation (#10970) Co-authored-by: ZKpLo <14148880+zkplo@user.noreply.gitee.com> --- .../process/parser/BitwiseAndParser.java | 61 ++++++ .../parser/BitwiseLeftShiftParser.java | 67 +++++++ .../process/parser/BitwiseOrParser.java | 62 +++++++ .../parser/BitwiseRightShiftParser.java | 66 +++++++ .../process/parser/BitwiseXorParser.java | 62 +++++++ .../transform/process/parser/SignParser.java | 26 ++- ...TransformArithmeticFunctionsProcessor.java | 175 ++++++++++++++++++ 7 files changed, 513 insertions(+), 6 deletions(-) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseAndParser.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseLeftShiftParser.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseOrParser.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseRightShiftParser.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseXorParser.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseAndParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseAndParser.java new file mode 100644 index 00000000000..b8920434ef2 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseAndParser.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import lombok.extern.slf4j.Slf4j; +import net.sf.jsqlparser.expression.operators.arithmetic.BitwiseAnd; + +import java.math.BigInteger; + +/** + * BitwiseAndParser + */ +@Slf4j +@TransformParser(values = BitwiseAnd.class) +public class BitwiseAndParser implements ValueParser { + + private final ValueParser left; + + private final ValueParser right; + + public BitwiseAndParser(BitwiseAnd expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + try { + Object leftObj = this.left.parse(sourceData, rowIndex, context); + Object rightObj = this.right.parse(sourceData, rowIndex, context); + if (leftObj == null || rightObj == null) { + return null; + } + BigInteger leftValue = OperatorTools.parseBigDecimal(leftObj).toBigInteger(); + BigInteger rightValue = OperatorTools.parseBigDecimal(rightObj).toBigInteger(); + return Long.toUnsignedString(leftValue.and(rightValue).longValue()); + } catch (Exception e) { + log.error("Value parsing failed", e); + return null; + } + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseLeftShiftParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseLeftShiftParser.java new file mode 100644 index 00000000000..c68bc89e543 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseLeftShiftParser.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import lombok.extern.slf4j.Slf4j; +import net.sf.jsqlparser.expression.operators.arithmetic.BitwiseLeftShift; + +import java.math.BigInteger; + +/** + * BitwiseLeftShiftParser + * + */ +@Slf4j +@TransformParser(values = BitwiseLeftShift.class) +public class BitwiseLeftShiftParser implements ValueParser { + + private final ValueParser left; + + private final ValueParser right; + + public BitwiseLeftShiftParser(BitwiseLeftShift expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + try { + Object leftObj = this.left.parse(sourceData, rowIndex, context); + Object rightObj = this.right.parse(sourceData, rowIndex, context); + if (leftObj == null || rightObj == null) { + return null; + } + BigInteger leftValue = OperatorTools.parseBigDecimal(leftObj).toBigInteger(); + String unsignedRight = Long.toUnsignedString(OperatorTools.parseBigDecimal(rightObj).longValue()); + int cmp = new BigInteger(unsignedRight).compareTo(new BigInteger("65")); + if (cmp >= 0) { + return Long.toUnsignedString(leftValue.shiftLeft(65).longValue()); + } else { + return Long.toUnsignedString(leftValue.shiftLeft(Integer.parseInt(unsignedRight)).longValue()); + } + } catch (Exception e) { + log.error("Value parsing failed", e); + return null; + } + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseOrParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseOrParser.java new file mode 100644 index 00000000000..2cbf5e9d09c --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseOrParser.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import lombok.extern.slf4j.Slf4j; +import net.sf.jsqlparser.expression.operators.arithmetic.BitwiseOr; + +import java.math.BigInteger; + +/** + * BitwiseOrParser + * + */ +@Slf4j +@TransformParser(values = BitwiseOr.class) +public class BitwiseOrParser implements ValueParser { + + private final ValueParser left; + + private final ValueParser right; + + public BitwiseOrParser(BitwiseOr expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + try { + Object leftObj = this.left.parse(sourceData, rowIndex, context); + Object rightObj = this.right.parse(sourceData, rowIndex, context); + if (leftObj == null || rightObj == null) { + return null; + } + BigInteger leftValue = OperatorTools.parseBigDecimal(leftObj).toBigInteger(); + BigInteger rightValue = OperatorTools.parseBigDecimal(rightObj).toBigInteger(); + return Long.toUnsignedString(leftValue.or(rightValue).longValue()); + } catch (Exception e) { + log.error("Value parsing failed", e); + return null; + } + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseRightShiftParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseRightShiftParser.java new file mode 100644 index 00000000000..7b6085725b8 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseRightShiftParser.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import lombok.extern.slf4j.Slf4j; +import net.sf.jsqlparser.expression.operators.arithmetic.BitwiseRightShift; + +import java.math.BigInteger; + +/** + * BitwiseRightShiftParser + */ +@Slf4j +@TransformParser(values = BitwiseRightShift.class) +public class BitwiseRightShiftParser implements ValueParser { + + private final ValueParser left; + + private final ValueParser right; + + public BitwiseRightShiftParser(BitwiseRightShift expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + try { + Object leftObj = this.left.parse(sourceData, rowIndex, context); + Object rightObj = this.right.parse(sourceData, rowIndex, context); + if (leftObj == null || rightObj == null) { + return null; + } + BigInteger leftValue = OperatorTools.parseBigDecimal(leftObj).toBigInteger(); + String unsignedRight = Long.toUnsignedString(OperatorTools.parseBigDecimal(rightObj).longValue()); + int cmp = new BigInteger(unsignedRight).compareTo(new BigInteger("65")); + if (cmp >= 0) { + return Long.toUnsignedString(leftValue.shiftRight(65).longValue()); + } else { + return Long.toUnsignedString(leftValue.shiftRight(Integer.parseInt(unsignedRight)).longValue()); + } + } catch (Exception e) { + log.error("Value parsing failed", e); + return null; + } + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseXorParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseXorParser.java new file mode 100644 index 00000000000..3277bd91edc --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/BitwiseXorParser.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import lombok.extern.slf4j.Slf4j; +import net.sf.jsqlparser.expression.operators.arithmetic.BitwiseXor; + +import java.math.BigInteger; + +/** + * BitwiseXorParser + * + */ +@Slf4j +@TransformParser(values = BitwiseXor.class) +public class BitwiseXorParser implements ValueParser { + + private final ValueParser left; + + private final ValueParser right; + + public BitwiseXorParser(BitwiseXor expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + try { + Object leftObj = this.left.parse(sourceData, rowIndex, context); + Object rightObj = this.right.parse(sourceData, rowIndex, context); + if (leftObj == null || rightObj == null) { + return null; + } + BigInteger leftValue = OperatorTools.parseBigDecimal(leftObj).toBigInteger(); + BigInteger rightValue = OperatorTools.parseBigDecimal(rightObj).toBigInteger(); + return Long.toUnsignedString(leftValue.xor(rightValue).longValue()); + } catch (Exception e) { + log.error("Value parsing failed", e); + return null; + } + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SignParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SignParser.java index ff97aadfdb2..7a744f90154 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SignParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SignParser.java @@ -21,29 +21,43 @@ import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import lombok.extern.slf4j.Slf4j; import net.sf.jsqlparser.expression.SignedExpression; import java.math.BigDecimal; /** * SignParser - * */ +@Slf4j @TransformParser(values = SignedExpression.class) public class SignParser implements ValueParser { - private final Integer sign; + private final char sign; private final ValueParser number; public SignParser(SignedExpression expr) { - sign = expr.getSign() == '-' ? -1 : 1; + sign = expr.getSign(); number = OperatorTools.buildParser(expr.getExpression()); } @Override public Object parse(SourceData sourceData, int rowIndex, Context context) { - Object numberObject = number.parse(sourceData, rowIndex, context); - BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObject); - return numberValue.multiply(new BigDecimal(sign)); + try { + Object numberObject = number.parse(sourceData, rowIndex, context); + if (numberObject == null) { + return null; + } + BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObject); + switch (sign) { + case '-': + return numberValue.multiply(new BigDecimal(-1)); + case '~': + return Long.toUnsignedString(numberValue.toBigInteger().not().longValue()); + } + } catch (Exception e) { + log.error("Value parsing failed", e); + } + return null; } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java index 4007a9a8760..dcdb2d61d45 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java @@ -320,6 +320,181 @@ public void testMd5Function() throws Exception { Assert.assertEquals("result=null", output4.get(0)); } + @Test + public void testBitwiseInversionOperator() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + // case1: ~-4 + transformSql = "select ~numeric1 from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "-4|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=3", output.get(0)); + + // case2: ~4 + data = "4|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=18446744073709551611", output.get(0)); + + // case3: ~0 + data = "0|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=18446744073709551615", output.get(0)); + + // case4: ~~-4 + transformSql = "select ~(~numeric1) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "-4|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=18446744073709551612", output.get(0)); + + } + @Test + public void testBitwiseAndOperator() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + // case1: 18446744073709551615 & -1 + transformSql = "select numeric1 & numeric2 from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "18446744073709551615|-1|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=18446744073709551615", output.get(0)); + + // case2: 18446744073709551615 & 0 + data = "18446744073709551615|0|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=0", output.get(0)); + } + @Test + public void testBitwiseOrOperator() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + // case1: 18446744073709551615 | -1 + transformSql = "select numeric1 | numeric2 from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "18446744073709551615|-1|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=18446744073709551615", output.get(0)); + + // case2: 4 | 3 + data = "4|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=7", output.get(0)); + } + @Test + public void testBitwiseRightShiftOperator() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + // case1: 4 >> -1 + transformSql = "select numeric1 >> numeric2 from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "4|-1|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=0", output.get(0)); + + // case2: 9223372036854775808 >> 2 + data = "9223372036854775808|2|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2305843009213693952", output.get(0)); + + // case3: 9223372036854775808 >> 9223372036854775808 + data = "9223372036854775808|9223372036854775808|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=0", output.get(0)); + } + @Test + public void testBitwiseLeftShiftOperator() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + // case1: 9223372036854775807 << 1 + transformSql = "select numeric1 << numeric2 from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "9223372036854775807|1|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=18446744073709551614", output.get(0)); + + // case2: 18446744073709551615 << 18446744073709551615 + data = "18446744073709551615|18446744073709551615|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=0", output.get(0)); + + // case3: 9223372036854775807 << -1 + data = "9223372036854775807|-1|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=0", output.get(0)); + } + @Test + public void testBitwiseXorOperator() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + // case1: 4 ^ 3 + transformSql = "select numeric1 ^ numeric2 from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "4|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=7", output.get(0)); + + // case2: 4 ^ -1 + data = "4|-1|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=18446744073709551611", output.get(0)); + } + @Test public void testRoundFunction() throws Exception { String transformSql = "select round(numeric1) from source"; From f1285ed4b1c9ec1ff16942c6324f6fad46c6898c Mon Sep 17 00:00:00 2001 From: emptyOVO <118812562+emptyOVO@users.noreply.github.com> Date: Tue, 3 Sep 2024 10:33:10 +0800 Subject: [PATCH 08/11] [INLONG-10968][SDK] Support Inlong Transform operator annotation (#10969) --- .../process/operator/AndOperator.java | 1 + .../process/operator/EqualsToOperator.java | 1 + .../operator/GreaterThanEqualsOperator.java | 1 + .../process/operator/GreaterThanOperator.java | 1 + .../operator/MinorThanEqualsOperator.java | 1 + .../process/operator/MinorThanOperator.java | 1 + .../process/operator/NotEqualsToOperator.java | 1 + .../process/operator/NotOperator.java | 1 + .../process/operator/OperatorTools.java | 89 ++++++++++++------- .../process/operator/OrOperator.java | 1 + .../process/operator/ParenthesisOperator.java | 1 + .../process/operator/TransformOperator.java | 31 +++++++ 12 files changed, 100 insertions(+), 30 deletions(-) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/TransformOperator.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java index a9dcd426062..f438d4295cd 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java @@ -26,6 +26,7 @@ * AndOperator * */ +@TransformOperator(values = AndExpression.class) public class AndOperator implements ExpressionOperator { private final ExpressionOperator left; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java index 709537e8a01..13010c854bf 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java @@ -27,6 +27,7 @@ * EqualsToOperator * */ +@TransformOperator(values = EqualsTo.class) public class EqualsToOperator implements ExpressionOperator { private final ValueParser left; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java index 3a53968e10e..e703afdbda7 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java @@ -27,6 +27,7 @@ * GreaterThanEqualsOperator * */ +@TransformOperator(values = GreaterThanEquals.class) public class GreaterThanEqualsOperator implements ExpressionOperator { private final ValueParser left; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java index a1cd8c2ea20..ba73ef4c4df 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java @@ -27,6 +27,7 @@ * GreaterThanOperator * */ +@TransformOperator(values = GreaterThan.class) public class GreaterThanOperator implements ExpressionOperator { private final ValueParser left; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java index 4248cf1d366..e8104e2cc3b 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java @@ -27,6 +27,7 @@ * MinorThanEqualsOperator * */ +@TransformOperator(values = MinorThanEquals.class) public class MinorThanEqualsOperator implements ExpressionOperator { private final ValueParser left; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java index 21ecc0400a3..ecae167c5c4 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java @@ -27,6 +27,7 @@ * MinorThanOperator * */ +@TransformOperator(values = MinorThan.class) public class MinorThanOperator implements ExpressionOperator { private final ValueParser left; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java index 98bf102b4f4..d3fec5c8c2c 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java @@ -27,6 +27,7 @@ * NotEqualsToOperator * */ +@TransformOperator(values = NotEqualsTo.class) public class NotEqualsToOperator implements ExpressionOperator { private final ValueParser left; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java index d8b9ff07e06..306177ffbab 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java @@ -26,6 +26,7 @@ * NotOperator * */ +@TransformOperator(values = NotExpression.class) public class NotOperator implements ExpressionOperator { private final ExpressionOperator node; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index bb35bb4490f..9982b374189 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -22,54 +22,83 @@ import org.apache.inlong.sdk.transform.process.parser.ParserTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; import net.sf.jsqlparser.expression.Expression; import net.sf.jsqlparser.expression.Function; -import net.sf.jsqlparser.expression.NotExpression; -import net.sf.jsqlparser.expression.Parenthesis; -import net.sf.jsqlparser.expression.operators.conditional.AndExpression; -import net.sf.jsqlparser.expression.operators.conditional.OrExpression; -import net.sf.jsqlparser.expression.operators.relational.EqualsTo; -import net.sf.jsqlparser.expression.operators.relational.GreaterThan; -import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals; -import net.sf.jsqlparser.expression.operators.relational.MinorThan; -import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals; -import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; import org.apache.commons.lang.ObjectUtils; +import org.reflections.Reflections; +import org.reflections.scanners.Scanners; +import java.lang.reflect.Constructor; import java.math.BigDecimal; import java.sql.Date; import java.sql.Timestamp; +import java.util.Map; +import java.util.Set; /** * OperatorTools * */ +@Slf4j public class OperatorTools { + private static final String OPERATOR_PATH = "org.apache.inlong.sdk.transform.process.operator"; + + private final static Map, Class> operatorMap = Maps.newConcurrentMap(); + public static final String ROOT_KEY = "$root"; public static final String CHILD_KEY = "$child"; + + static { + init(); + } + + private static void init() { + Reflections reflections = new Reflections(OPERATOR_PATH, Scanners.TypesAnnotated); + Set> clazzSet = reflections.getTypesAnnotatedWith(TransformOperator.class); + for (Class clazz : clazzSet) { + if (ExpressionOperator.class.isAssignableFrom(clazz)) { + TransformOperator annotation = clazz.getAnnotation(TransformOperator.class); + if (annotation == null) { + continue; + } + Class[] values = annotation.values(); + for (Class value : values) { + operatorMap.compute(value, (key, former) -> { + if (former != null) { + log.warn("find a conflict for parser class [{}], the former one is [{}], new one is [{}]", + key, former.getName(), clazz.getName()); + } + return clazz; + }); + } + } + } + } + + public static ExpressionOperator getTransformOperator(Expression expr) { + Class clazz = operatorMap.get(expr.getClass()); + if (clazz == null) { + return null; + } + try { + Constructor constructor = clazz.getDeclaredConstructor(expr.getClass()); + return (ExpressionOperator) constructor.newInstance(expr); + } catch (NoSuchMethodException e) { + log.error("transform operator {} needs one constructor that accept one params whose type is {}", + clazz.getName(), expr.getClass().getName(), e); + throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public static ExpressionOperator buildOperator(Expression expr) { - if (expr instanceof AndExpression) { - return new AndOperator((AndExpression) expr); - } else if (expr instanceof OrExpression) { - return new OrOperator((OrExpression) expr); - } else if (expr instanceof Parenthesis) { - return new ParenthesisOperator((Parenthesis) expr); - } else if (expr instanceof NotExpression) { - return new NotOperator((NotExpression) expr); - } else if (expr instanceof EqualsTo) { - return new EqualsToOperator((EqualsTo) expr); - } else if (expr instanceof NotEqualsTo) { - return new NotEqualsToOperator((NotEqualsTo) expr); - } else if (expr instanceof GreaterThan) { - return new GreaterThanOperator((GreaterThan) expr); - } else if (expr instanceof GreaterThanEquals) { - return new GreaterThanEqualsOperator((GreaterThanEquals) expr); - } else if (expr instanceof MinorThan) { - return new MinorThanOperator((MinorThan) expr); - } else if (expr instanceof MinorThanEquals) { - return new MinorThanEqualsOperator((MinorThanEquals) expr); + if (expr != null) { + return getTransformOperator(expr); } return null; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java index b5de7f279ed..9efccba3f22 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java @@ -26,6 +26,7 @@ * OrOperator * */ +@TransformOperator(values = OrExpression.class) public class OrOperator implements ExpressionOperator { private final ExpressionOperator left; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java index 0ca1334fcee..de71c84b4d5 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java @@ -26,6 +26,7 @@ * ParenthesisOperator * */ +@TransformOperator(values = Parenthesis.class) public class ParenthesisOperator implements ExpressionOperator { private final ExpressionOperator node; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/TransformOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/TransformOperator.java new file mode 100644 index 00000000000..ee9c676665e --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/TransformOperator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.operator; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target(TYPE) +public @interface TransformOperator { + + Class[] values(); +} From e394d740769730f6960e7022e0bc8f4dee1e2e9b Mon Sep 17 00:00:00 2001 From: Zkplo <87751516+Zkplo@users.noreply.github.com> Date: Tue, 3 Sep 2024 10:39:27 +0800 Subject: [PATCH 09/11] [INLONG-10956][SDK] Transform SQL supports SHA encryption algorithm (#10996) Co-authored-by: ZKpLo <14148880+zkplo@user.noreply.gitee.com> --- .../process/function/Sha2Function.java | 75 ++++++++++++++++++ .../process/function/ShaFunction.java | 54 +++++++++++++ ...TransformArithmeticFunctionsProcessor.java | 78 +++++++++++++++++++ 3 files changed, 207 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Sha2Function.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ShaFunction.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Sha2Function.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Sha2Function.java new file mode 100644 index 00000000000..ad120712bfd --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Sha2Function.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; +import org.apache.commons.codec.digest.DigestUtils; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.apache.commons.codec.digest.MessageDigestAlgorithms.SHA_224; + +/** + * Sha2Function + * description: SHA2(str, hash_length): Calculates the SHA-2 family of hash functions (SHA-224, SHA-256, SHA-384, and SHA-512) + * return NULL If either argument is NULL or the hash length(224 256 384 512) is not one of the permitted values + * return a hash value containing the desired number of bits. + */ +@TransformFunction(names = {"sha2"}) +public class Sha2Function implements ValueParser { + + private final ValueParser msgParser; + private final ValueParser lenParser; + + public Sha2Function(Function expr) { + List expressions = expr.getParameters().getExpressions(); + msgParser = OperatorTools.buildParser(expressions.get(0)); + lenParser = OperatorTools.buildParser(expressions.get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object msgObj = msgParser.parse(sourceData, rowIndex, context); + Object lenObj = lenParser.parse(sourceData, rowIndex, context); + if (msgObj == null || lenObj == null) { + return null; + } + String msg = msgObj.toString(); + int len = Integer.parseInt(lenObj.toString()); + switch (len) { + case 0: + case 256: + return DigestUtils.sha256Hex(msg.getBytes(StandardCharsets.UTF_8)); + case 224: + return new DigestUtils(SHA_224).digestAsHex(msg.getBytes(StandardCharsets.UTF_8)); + case 384: + return DigestUtils.sha384Hex(msg.getBytes(StandardCharsets.UTF_8)); + case 512: + return DigestUtils.sha512Hex(msg.getBytes(StandardCharsets.UTF_8)); + default: + return null; + } + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ShaFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ShaFunction.java new file mode 100644 index 00000000000..7da5e6c3a9d --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ShaFunction.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; +import org.apache.commons.codec.digest.DigestUtils; + +import java.nio.charset.StandardCharsets; + +/** + * ShaFunction + * description: sha(string): Compute the SHA-1 160 bit checksum of a string. + * return NULL if the parameter is NULL + * return a string of 40 hexadecimal digits. + */ +@TransformFunction(names = {"sha"}) +public class ShaFunction implements ValueParser { + + private final ValueParser msgParser; + + public ShaFunction(Function expr) { + msgParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object msgObj = msgParser.parse(sourceData, rowIndex, context); + if (msgObj == null) { + return null; + } + String msg = msgObj.toString(); + return DigestUtils.sha1Hex(msg.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java index dcdb2d61d45..738ef2e0f6d 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java @@ -287,6 +287,84 @@ public void testIfNullFunction() throws Exception { Assert.assertEquals("result=null", output.get(0)); } + @Test + public void testShaFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + // case1: sha("") + transformSql = "select sha(numeric1) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "|3|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=da39a3ee5e6b4b0d3255bfef95601890afd80709", output.get(0)); + + // case2: sha("5") + data = "5|3|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=ac3478d69a3c81fa62e60f5c3696165a4e5e6ac4", output.get(0)); + + // case3: sha(null) + transformSql = "select sha(xxd) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "3|3|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + } + + @Test + public void testSha2Function() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + // case1: sha2("",3) + transformSql = "select sha2(numeric1,numeric2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "|3|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + + // case2: sha2("5",224) + data = "5|224|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=b51d18b551043c1f145f22dbde6f8531faeaf68c54ed9dd79ce24d17", output.get(0)); + + // case3: sha2("5",0) + data = "5|0|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=ef2d127de37b942baad06145e54b0c619a1f22327b2ebbcfbec78f5564afe39d", output.get(0)); + + // case4: sha2(null,224) + transformSql = "select sha2(xxd,224) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + data = "3|224|3|5"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=null", output.get(0)); + } + @Test public void testMd5Function() throws Exception { String transformSql = "select md5(numeric1) from source"; From e1884106681a4062555d413f6f2cdc24ef741229 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Tue, 3 Sep 2024 14:07:56 +0800 Subject: [PATCH 10/11] [INLONG-10997][Manager] Incorrect setting of transformSQL in dataflowconfig (#10998) --- .../manager/service/resource/sort/DefaultSortConfigOperator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index 116a8f0ea21..17ca4b9bad2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -175,6 +175,7 @@ private DataFlowConfig getDataFlowConfig(InlongGroupInfo groupInfo, InlongStream .dataflowId(String.valueOf(sink.getId())) .sourceConfig(getSourceConfig(groupInfo, streamInfo, sink)) .auditTag(String.valueOf(sink.getId())) + .transformSql(sink.getTransformSql()) .sinkConfig(getSinkConfig(groupInfo, streamInfo, sink)) .inlongGroupId(groupInfo.getInlongGroupId()) .inlongStreamId(streamInfo.getInlongStreamId()) From edf93bd547a09df6b1d7a6d57ea3d719d9f63f4f Mon Sep 17 00:00:00 2001 From: ChunLiang Lu Date: Tue, 3 Sep 2024 21:49:46 +0800 Subject: [PATCH 11/11] [INLONG-10999][SDK] Support to return raw data by star sign in transformer SQL (#11004) * [INLONG-10999][SDK] Support to return raw data by star sign in transformer SQL * add more UT Case * fix code format problems * fix pom.xml problem --- .../sdk/transform/encode/CsvSinkEncoder.java | 6 +- .../sdk/transform/encode/KvSinkEncoder.java | 6 +- .../sdk/transform/encode/SinkEncoder.java | 2 + .../transform/process/TransformProcessor.java | 34 ++++++--- .../transform/process/ValueParserNode.java | 34 +++++++++ .../process/TestTransformProcessor.java | 76 +++++++++++++++++++ 6 files changed, 146 insertions(+), 12 deletions(-) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java index ce47a0072c7..89f6f364a08 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java @@ -66,7 +66,11 @@ public String encode(SinkData sinkData, Context context) { } else { for (String fieldName : sinkData.keyList()) { String fieldValue = sinkData.getField(fieldName); - EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue); + if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) { + builder.append(fieldValue); + } else { + EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue); + } builder.append(delimiter); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java index 7460ec95c29..2822374c412 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java @@ -63,7 +63,11 @@ public String encode(SinkData sinkData, Context context) { if (fields == null || fields.size() == 0) { for (String fieldName : sinkData.keyList()) { String fieldValue = sinkData.getField(fieldName); - builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter); + if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) { + builder.append(fieldValue).append(entryDelimiter); + } else { + builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter); + } } } else { for (FieldInfo field : fields) { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java index 7f845a99d61..a63f9702956 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java @@ -27,6 +27,8 @@ */ public interface SinkEncoder { + public static final String ALL_SOURCE_FIELD_SIGN = "*"; + Output encode(SinkData sinkData, Context context); List getFields(); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java index 9944268dda6..acb7e62e070 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java @@ -31,23 +31,23 @@ import com.google.common.collect.ImmutableMap; import net.sf.jsqlparser.JSQLParserException; import net.sf.jsqlparser.parser.CCJSqlParserManager; +import net.sf.jsqlparser.statement.select.AllColumns; import net.sf.jsqlparser.statement.select.PlainSelect; import net.sf.jsqlparser.statement.select.Select; import net.sf.jsqlparser.statement.select.SelectExpressionItem; import net.sf.jsqlparser.statement.select.SelectItem; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.StringReader; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; /** * TransformProcessor - * + * */ public class TransformProcessor { @@ -61,7 +61,9 @@ public class TransformProcessor { private PlainSelect transformSelect; private ExpressionOperator where; - private Map selectItemMap; + private List selectItems; + + private boolean includeAllSourceFields = false; public static TransformProcessor create( TransformConfig config, @@ -91,7 +93,7 @@ private void initTransformSql() throws JSQLParserException { this.transformSelect = (PlainSelect) select.getSelectBody(); this.where = OperatorTools.buildOperator(this.transformSelect.getWhere()); List items = this.transformSelect.getSelectItems(); - this.selectItemMap = new HashMap<>(items.size()); + this.selectItems = new ArrayList<>(items.size()); List fields = this.encoder.getFields(); for (int i = 0; i < items.size(); i++) { SelectItem item = items.get(i); @@ -108,8 +110,12 @@ private void initTransformSql() throws JSQLParserException { fieldName = exprItem.getAlias().getName(); } } - this.selectItemMap.put(fieldName, - OperatorTools.buildParser(exprItem.getExpression())); + this.selectItems + .add(new ValueParserNode(fieldName, OperatorTools.buildParser(exprItem.getExpression()))); + } else if (item instanceof AllColumns) { + fieldName = item.toString(); + this.encoder.getFields().clear(); + this.selectItems.add(new ValueParserNode(fieldName, null)); } } } @@ -137,10 +143,18 @@ public List transform(I input, Map extParams) { // parse value SinkData sinkData = new DefaultSinkData(); - for (Entry entry : this.selectItemMap.entrySet()) { - String fieldName = entry.getKey(); + for (ValueParserNode node : this.selectItems) { + String fieldName = node.getFieldName(); + ValueParser parser = node.getParser(); + if (parser == null && StringUtils.equals(fieldName, SinkEncoder.ALL_SOURCE_FIELD_SIGN)) { + if (input instanceof String) { + sinkData.addField(fieldName, (String) input); + } else { + sinkData.addField(fieldName, ""); + } + continue; + } try { - ValueParser parser = entry.getValue(); Object fieldValue = parser.parse(sourceData, i, context); sinkData.addField(fieldName, String.valueOf(fieldValue)); } catch (Throwable t) { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java new file mode 100644 index 00000000000..e36c0c9c6a9 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process; + +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * ValueParserNode + */ +@AllArgsConstructor +@Data +public class ValueParserNode { + + private String fieldName; + private ValueParser parser; +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java index 3413f1aca3e..8448260252d 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java @@ -350,4 +350,80 @@ public void testPb2CsvForNow() throws Exception { List output = processor.transform(srcBytes, new HashMap<>()); Assert.assertEquals(2, output.size()); } + @Test + public void testCsv2Star() throws Exception { + List fields = this.getTestFieldList("ftime", "extinfo"); + CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\', fields); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', new ArrayList<>()); + String transformSql = "select *"; + TransformConfig config = new TransformConfig(transformSql); + // case1 + TransformProcessor processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + + List output1 = processor1.transform("2024-04-28 00:00:00|ok", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok"); + // case2 + config.setTransformSql("select * from source where extinfo!='ok'"); + TransformProcessor processor2 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + + List output2 = processor2.transform("2024-04-28 00:00:00|ok", new HashMap<>()); + Assert.assertEquals(0, output2.size()); + // case3 + config.setTransformSql("select *,extinfo,ftime from source where extinfo!='ok'"); + TransformProcessor processor3 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + + List output3 = processor3.transform("2024-04-28 00:00:00|nok", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "2024-04-28 00:00:00|nok|nok|2024-04-28 00:00:00"); + // case4 + CsvSourceInfo csvSourceNoField = new CsvSourceInfo("UTF-8", '|', '\\', new ArrayList<>()); + CsvSinkInfo csvSinkNoField = new CsvSinkInfo("UTF-8", '|', '\\', new ArrayList<>()); + config.setTransformSql("select *,$2,$1 from source where $2='nok'"); + TransformProcessor processor4 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSourceNoField), + SinkEncoderFactory.createCsvEncoder(csvSinkNoField)); + + List output4 = processor4.transform("2024-04-28 00:00:00|nok", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "2024-04-28 00:00:00|nok|nok|2024-04-28 00:00:00"); + } + + @Test + public void testKv2Star() throws Exception { + List fields = this.getTestFieldList("ftime", "extinfo"); + KvSourceInfo kvSource = new KvSourceInfo("UTF-8", fields); + KvSinkInfo kvSink = new KvSinkInfo("UTF-8", new ArrayList<>()); + String transformSql = "select *"; + TransformConfig config = new TransformConfig(transformSql); + // case1 + TransformProcessor processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "ftime=2024-04-28 00:00:00&extinfo=ok"); + // case2 + config.setTransformSql("select * from source where extinfo!='ok'"); + TransformProcessor processor2 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertEquals(0, output2.size()); + // case3 + config.setTransformSql("select *,extinfo e1,ftime f1 from source where extinfo!='ok'"); + TransformProcessor processor3 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + List output3 = processor3.transform("ftime=2024-04-28 00:00:00&extinfo=nok", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "ftime=2024-04-28 00:00:00&extinfo=nok&e1=nok&f1=2024-04-28 00:00:00"); + } }