From edf93bd547a09df6b1d7a6d57ea3d719d9f63f4f Mon Sep 17 00:00:00 2001 From: ChunLiang Lu Date: Tue, 3 Sep 2024 21:49:46 +0800 Subject: [PATCH] [INLONG-10999][SDK] Support to return raw data by star sign in transformer SQL (#11004) * [INLONG-10999][SDK] Support to return raw data by star sign in transformer SQL * add more UT Case * fix code format problems * fix pom.xml problem --- .../sdk/transform/encode/CsvSinkEncoder.java | 6 +- .../sdk/transform/encode/KvSinkEncoder.java | 6 +- .../sdk/transform/encode/SinkEncoder.java | 2 + .../transform/process/TransformProcessor.java | 34 ++++++--- .../transform/process/ValueParserNode.java | 34 +++++++++ .../process/TestTransformProcessor.java | 76 +++++++++++++++++++ 6 files changed, 146 insertions(+), 12 deletions(-) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java index ce47a0072c7..89f6f364a08 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java @@ -66,7 +66,11 @@ public String encode(SinkData sinkData, Context context) { } else { for (String fieldName : sinkData.keyList()) { String fieldValue = sinkData.getField(fieldName); - EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue); + if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) { + builder.append(fieldValue); + } else { + EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue); + } builder.append(delimiter); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java index 7460ec95c29..2822374c412 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java @@ -63,7 +63,11 @@ public String encode(SinkData sinkData, Context context) { if (fields == null || fields.size() == 0) { for (String fieldName : sinkData.keyList()) { String fieldValue = sinkData.getField(fieldName); - builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter); + if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) { + builder.append(fieldValue).append(entryDelimiter); + } else { + builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter); + } } } else { for (FieldInfo field : fields) { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java index 7f845a99d61..a63f9702956 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java @@ -27,6 +27,8 @@ */ public interface SinkEncoder { + public static final String ALL_SOURCE_FIELD_SIGN = "*"; + Output encode(SinkData sinkData, Context context); List getFields(); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java index 9944268dda6..acb7e62e070 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java @@ -31,23 +31,23 @@ import com.google.common.collect.ImmutableMap; import net.sf.jsqlparser.JSQLParserException; import net.sf.jsqlparser.parser.CCJSqlParserManager; +import net.sf.jsqlparser.statement.select.AllColumns; import net.sf.jsqlparser.statement.select.PlainSelect; import net.sf.jsqlparser.statement.select.Select; import net.sf.jsqlparser.statement.select.SelectExpressionItem; import net.sf.jsqlparser.statement.select.SelectItem; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.StringReader; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; /** * TransformProcessor - * + * */ public class TransformProcessor { @@ -61,7 +61,9 @@ public class TransformProcessor { private PlainSelect transformSelect; private ExpressionOperator where; - private Map selectItemMap; + private List selectItems; + + private boolean includeAllSourceFields = false; public static TransformProcessor create( TransformConfig config, @@ -91,7 +93,7 @@ private void initTransformSql() throws JSQLParserException { this.transformSelect = (PlainSelect) select.getSelectBody(); this.where = OperatorTools.buildOperator(this.transformSelect.getWhere()); List items = this.transformSelect.getSelectItems(); - this.selectItemMap = new HashMap<>(items.size()); + this.selectItems = new ArrayList<>(items.size()); List fields = this.encoder.getFields(); for (int i = 0; i < items.size(); i++) { SelectItem item = items.get(i); @@ -108,8 +110,12 @@ private void initTransformSql() throws JSQLParserException { fieldName = exprItem.getAlias().getName(); } } - this.selectItemMap.put(fieldName, - OperatorTools.buildParser(exprItem.getExpression())); + this.selectItems + .add(new ValueParserNode(fieldName, OperatorTools.buildParser(exprItem.getExpression()))); + } else if (item instanceof AllColumns) { + fieldName = item.toString(); + this.encoder.getFields().clear(); + this.selectItems.add(new ValueParserNode(fieldName, null)); } } } @@ -137,10 +143,18 @@ public List transform(I input, Map extParams) { // parse value SinkData sinkData = new DefaultSinkData(); - for (Entry entry : this.selectItemMap.entrySet()) { - String fieldName = entry.getKey(); + for (ValueParserNode node : this.selectItems) { + String fieldName = node.getFieldName(); + ValueParser parser = node.getParser(); + if (parser == null && StringUtils.equals(fieldName, SinkEncoder.ALL_SOURCE_FIELD_SIGN)) { + if (input instanceof String) { + sinkData.addField(fieldName, (String) input); + } else { + sinkData.addField(fieldName, ""); + } + continue; + } try { - ValueParser parser = entry.getValue(); Object fieldValue = parser.parse(sourceData, i, context); sinkData.addField(fieldName, String.valueOf(fieldValue)); } catch (Throwable t) { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java new file mode 100644 index 00000000000..e36c0c9c6a9 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/ValueParserNode.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process; + +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * ValueParserNode + */ +@AllArgsConstructor +@Data +public class ValueParserNode { + + private String fieldName; + private ValueParser parser; +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java index 3413f1aca3e..8448260252d 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java @@ -350,4 +350,80 @@ public void testPb2CsvForNow() throws Exception { List output = processor.transform(srcBytes, new HashMap<>()); Assert.assertEquals(2, output.size()); } + @Test + public void testCsv2Star() throws Exception { + List fields = this.getTestFieldList("ftime", "extinfo"); + CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\', fields); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', new ArrayList<>()); + String transformSql = "select *"; + TransformConfig config = new TransformConfig(transformSql); + // case1 + TransformProcessor processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + + List output1 = processor1.transform("2024-04-28 00:00:00|ok", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok"); + // case2 + config.setTransformSql("select * from source where extinfo!='ok'"); + TransformProcessor processor2 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + + List output2 = processor2.transform("2024-04-28 00:00:00|ok", new HashMap<>()); + Assert.assertEquals(0, output2.size()); + // case3 + config.setTransformSql("select *,extinfo,ftime from source where extinfo!='ok'"); + TransformProcessor processor3 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); + + List output3 = processor3.transform("2024-04-28 00:00:00|nok", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "2024-04-28 00:00:00|nok|nok|2024-04-28 00:00:00"); + // case4 + CsvSourceInfo csvSourceNoField = new CsvSourceInfo("UTF-8", '|', '\\', new ArrayList<>()); + CsvSinkInfo csvSinkNoField = new CsvSinkInfo("UTF-8", '|', '\\', new ArrayList<>()); + config.setTransformSql("select *,$2,$1 from source where $2='nok'"); + TransformProcessor processor4 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSourceNoField), + SinkEncoderFactory.createCsvEncoder(csvSinkNoField)); + + List output4 = processor4.transform("2024-04-28 00:00:00|nok", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "2024-04-28 00:00:00|nok|nok|2024-04-28 00:00:00"); + } + + @Test + public void testKv2Star() throws Exception { + List fields = this.getTestFieldList("ftime", "extinfo"); + KvSourceInfo kvSource = new KvSourceInfo("UTF-8", fields); + KvSinkInfo kvSink = new KvSinkInfo("UTF-8", new ArrayList<>()); + String transformSql = "select *"; + TransformConfig config = new TransformConfig(transformSql); + // case1 + TransformProcessor processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "ftime=2024-04-28 00:00:00&extinfo=ok"); + // case2 + config.setTransformSql("select * from source where extinfo!='ok'"); + TransformProcessor processor2 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertEquals(0, output2.size()); + // case3 + config.setTransformSql("select *,extinfo e1,ftime f1 from source where extinfo!='ok'"); + TransformProcessor processor3 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + List output3 = processor3.transform("ftime=2024-04-28 00:00:00&extinfo=nok", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "ftime=2024-04-28 00:00:00&extinfo=nok&e1=nok&f1=2024-04-28 00:00:00"); + } }