From b2926869dd7ca06bdd5d853c3ecf767acbbe2e1f Mon Sep 17 00:00:00 2001 From: MOONSakura0614 <151456101+MOONSakura0614@users.noreply.github.com> Date: Mon, 9 Sep 2024 20:35:44 +0800 Subject: [PATCH] [INLONG-10881][SDK] Transform SQL support concat_ws function (#10992) --- .../process/function/ConcatWsFunction.java | 94 +++++++++++++++ ...estTransformConcatWsFunctionProcessor.java | 111 ++++++++++++++++++ 2 files changed, 205 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatWsFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformConcatWsFunctionProcessor.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatWsFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatWsFunction.java new file mode 100644 index 00000000000..8901d3f9b3b --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatWsFunction.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; +import net.sf.jsqlparser.expression.NullValue; + +import java.util.ArrayList; +import java.util.List; + +/** + * ConcatWsFunction + * description: concat_ws(string1, string2, string3,…)--Returns a string that concatenates STRING2, + * STRING3, … with a separator STRING1. The separator is added between the strings to be concatenated. + * Returns NULL If STRING1 is NULL. + */ +@TransformFunction(names = {"concat_ws"}) +public class ConcatWsFunction implements ValueParser { + + private final String separator; + private final List nodeList; + + public ConcatWsFunction(Function expr) { + List params = expr.getParameters().getExpressions(); + if (params == null || params.isEmpty()) { + this.separator = null; + this.nodeList = new ArrayList<>(); + return; + } + + // Handle the case where the separator is NULL + Expression separatorExpr = params.get(0); + if (separatorExpr instanceof NullValue) { + this.separator = null; + } else { + this.separator = OperatorTools.parseString( + OperatorTools.buildParser(separatorExpr).parse(null, 0, null)); + } + + this.nodeList = new ArrayList<>(params.size() - 1); + for (int i = 1; i < params.size(); i++) { + Expression paramExpr = params.get(i); + if (paramExpr instanceof NullValue) { + // Add null to the list to be handled in parse method + nodeList.add(null); + } else { + nodeList.add(OperatorTools.buildParser(paramExpr)); + } + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + if (separator == null) { + return null; + } + StringBuilder result = new StringBuilder(); + boolean firstStrFlag = true; + for (ValueParser node : nodeList) { + if (node != null) { + Object parsedValue = node.parse(sourceData, rowIndex, context); + if (parsedValue != null) { + if (!firstStrFlag) { + result.append(separator); + } + result.append(parsedValue); + firstStrFlag = false; + } + } + } + return result.toString(); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformConcatWsFunctionProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformConcatWsFunctionProcessor.java new file mode 100644 index 00000000000..76c253ea611 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformConcatWsFunctionProcessor.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * TestTransformConcatWsFunctionProcessor + * description: test the concat_ws function in transform processor + */ +public class TestTransformConcatWsFunctionProcessor { + + private static final List srcFields = new ArrayList<>(); + private static final List dstFields = new ArrayList<>(); + private static final CsvSourceInfo csvSource; + private static final KvSinkInfo kvSink; + + static { + for (int i = 1; i < 4; i++) { + FieldInfo field = new FieldInfo(); + field.setName("string" + i); + srcFields.add(field); + } + FieldInfo field = new FieldInfo(); + field.setName("result"); + dstFields.add(field); + csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields); + kvSink = new KvSinkInfo("UTF-8", dstFields); + } + + @Test + public void testConcatWsFunction() throws Exception { + // case 1: concat_ws('-', 'apple', 'banana', 'cloud') + String transformSql1 = "select concat_ws('-', string1, string2, string3) from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output1 = processor1.transform("apple|banana|cloud|extra", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=apple-banana-cloud"); + + // case 2: concat_ws('-', 'apple', '', 'cloud') + String transformSql2 = "select concat_ws('-', string1, string2, string3) from source"; + TransformConfig config2 = new TransformConfig(transformSql2); + TransformProcessor processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output2 = processor2.transform("apple||cloud|extra", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=apple--cloud"); + + // case 3: concat_ws('-', 'apple', null, 'cloud') + String transformSql3 = "select concat_ws('-', string1, stringX, string3) from source"; + TransformConfig config3 = new TransformConfig(transformSql3); + TransformProcessor processor3 = TransformProcessor + .create(config3, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output3 = processor3.transform("apple|banana|cloud|extra", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=apple-cloud"); + + // case 4: concat_ws(null, 'apple', 'banana', 'cloud') + String transformSql4 = "select concat_ws(null, string1, string2, string3) from source"; + TransformConfig config4 = new TransformConfig(transformSql4); + TransformProcessor processor4 = TransformProcessor + .create(config4, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output4 = processor4.transform("apple|null|cloud|extra", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "result=null"); + + // case 5: concat_ws('-', '', '', '') + String transformSql5 = "select concat_ws('-', string1, string2, string3) from source"; + TransformConfig config5 = new TransformConfig(transformSql5); + TransformProcessor processor5 = TransformProcessor + .create(config5, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output5 = processor5.transform("|||", new HashMap<>()); + Assert.assertEquals(1, output5.size()); + Assert.assertEquals(output5.get(0), "result=--"); + } + +}