From 8a2766453099e281e3a4d9f490207737a9f7cda1 Mon Sep 17 00:00:00 2001 From: LeeWY <61183968+yfsn666@users.noreply.github.com> Date: Tue, 6 Aug 2024 19:03:46 +0800 Subject: [PATCH] [INLONG-10618][SDK] Transform SQL support common functions(Including substring, locate, to_date and date_format) (#10744) * [INLONG-10618][SDK] Transform SQL support common functions(Including substring, locate, to_date and date_format) * [INLONG-10618][SDK] Fix unit test failure, set time zone to Shanghai * [INLONG-10618][SDK] Make the DateTimeFormatter object reusable to avoid creating multiple identical DateTimeFormatter objects. * [INLONG-10618][SDK] Make the SimpleDateFormat object reusable to avoid creating multiple identical SimpleDateFormat objects. --------- Co-authored-by: jameswyli --- .../process/function/DateFormatFunction.java | 90 +++++++++++++ .../process/function/LocateFunction.java | 87 +++++++++++++ .../process/function/SubstringFunction.java | 81 ++++++++++++ .../process/function/ToDateFunction.java | 93 +++++++++++++ .../process/operator/OperatorTools.java | 8 ++ ...TestTransformStringFunctionsProcessor.java | 121 +++++++++++++++++ ...stTransformTemporalFunctionsProcessor.java | 123 ++++++++++++++++++ 7 files changed, 603 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateFormatFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocateFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ToDateFunction.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateFormatFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateFormatFunction.java new file mode 100644 index 00000000000..9e233ff8824 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateFormatFunction.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * DateFormatFunction + * description: date_format(timestamp,format)--converts timestamp(in seconds) to a value of string in the format + * specified by the date format string. The format string is compatible with Java’s SimpleDateFormat + */ +public class DateFormatFunction implements ValueParser { + + private ValueParser timestampParser; + private ValueParser formatParser; + private static final Map SIMPLE_DATE_FORMATS = new ConcurrentHashMap<>(); + + /** + * Constructor + * + * @param expr + */ + public DateFormatFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + timestampParser = OperatorTools.buildParser(expressions.get(0)); + formatParser = OperatorTools.buildParser(expressions.get(1)); + } + + /** + * parse + * + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object timestampObj = timestampParser.parse(sourceData, rowIndex, context); + Object formatObj = formatParser.parse(sourceData, rowIndex, context); + BigDecimal timestamp = OperatorTools.parseBigDecimal(timestampObj); + String format = OperatorTools.parseString(formatObj); + SimpleDateFormat sdf = getSimpleDateFormat(format); + // the timestamp is in seconds, multiply 1000 to get milliseconds + Date date = new Date(timestamp.longValue() * 1000); + return sdf.format(date); + } + + /** + * getSimpleDateFormat + * + * @param pattern + * @return + */ + private SimpleDateFormat getSimpleDateFormat(String pattern) { + SimpleDateFormat sdf = SIMPLE_DATE_FORMATS.get(pattern); + if (sdf == null) { + sdf = new SimpleDateFormat(pattern); + SIMPLE_DATE_FORMATS.put(pattern, sdf); + } + return sdf; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocateFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocateFunction.java new file mode 100644 index 00000000000..e300815eec9 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LocateFunction.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.List; + +/** + * LocateFunction + * description: locate(string1, string2[, integer]) + * - returns the position of the first occurrence of string1 in string2 after position integer + * - returns 0 if not found + * - returns NULL if any of arguments is NULL + */ +public class LocateFunction implements ValueParser { + + private ValueParser stringParser1; + private ValueParser stringParser2; + private ValueParser startPositionParser; + + /** + * Constructor + * + * @param expr + */ + public LocateFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + // Determine the number of arguments and build parser + stringParser1 = OperatorTools.buildParser(expressions.get(0)); + stringParser2 = OperatorTools.buildParser(expressions.get(1)); + if (expressions.size() == 3) { + startPositionParser = OperatorTools.buildParser(expressions.get(2)); + } + } + + /** + * parse + * + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object stringObj1 = stringParser1.parse(sourceData, rowIndex, context); + Object stringObj2 = stringParser2.parse(sourceData, rowIndex, context); + // If any of arguments is null, return null + if (stringObj1 == null || stringObj2 == null) { + return null; + } + String str1 = OperatorTools.parseString(stringObj1); + String str2 = OperatorTools.parseString(stringObj2); + if (startPositionParser != null) { + Object startPositionObj = startPositionParser.parse(sourceData, rowIndex, context); + // if startPositionObj is null, return null + if (startPositionObj == null) { + return null; + } + int startPosition = OperatorTools.parseBigDecimal(startPositionObj).intValue(); + return str2.indexOf(str1, startPosition - 1) + 1; + } else { + return str2.indexOf(str1) + 1; + } + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java new file mode 100644 index 00000000000..063686aa55b --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SubstringFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.List; + +/** + * SubstringFunction + * description: substring(string FROM INT1 [ FOR INT2 ])--returns a substring of STRING starting from position INT1 with + * length INT2 (to the end by default) + */ +public class SubstringFunction implements ValueParser { + + private ValueParser stringParser; + private ValueParser startPositionParser; + private ValueParser lengthParser; + + /** + * Constructor + * @param expr + */ + public SubstringFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + // Determine the number of arguments and build parser + stringParser = OperatorTools.buildParser(expressions.get(0)); + startPositionParser = OperatorTools.buildParser(expressions.get(1)); + if (expressions.size() == 3) { + lengthParser = OperatorTools.buildParser(expressions.get(2)); + } + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object stringObj = stringParser.parse(sourceData, rowIndex, context); + Object startPositionObj = startPositionParser.parse(sourceData, rowIndex, context); + String str = OperatorTools.parseString(stringObj); + int start = OperatorTools.parseBigDecimal(startPositionObj).intValue(); + if (start > str.length()) { + return ""; + } + if (lengthParser != null) { + Object lengthObj = lengthParser.parse(sourceData, rowIndex, context); + int len = OperatorTools.parseBigDecimal(lengthObj).intValue(); + if (len <= 0) { + return ""; + } + return str.substring(Math.max(start - 1, 0), Math.min(start - 1 + len, str.length())); + } else { + return str.substring(Math.max(start - 1, 0)); + } + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ToDateFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ToDateFunction.java new file mode 100644 index 00000000000..5a03f1d2e42 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ToDateFunction.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * ToDateFunction + * description: to_date(string1[, string2])--converts a date string string1 with format string2 (by default ‘yyyy-MM-dd’) to a date + */ +public class ToDateFunction implements ValueParser { + + private ValueParser stringParser1; + private ValueParser stringParser2; + private static final Map INPUT_FORMATTERS = new ConcurrentHashMap<>(); + private static final DateTimeFormatter OUTPUT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + /** + * Constructor + * + * @param expr + */ + public ToDateFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + // Determine the number of arguments and build parser + stringParser1 = OperatorTools.buildParser(expressions.get(0)); + if (expressions.size() == 2) { + stringParser2 = OperatorTools.buildParser(expressions.get(1)); + } + } + + /** + * parse + * + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object stringObj1 = stringParser1.parse(sourceData, rowIndex, context); + String str1 = OperatorTools.parseString(stringObj1); + String str2 = "yyyy-MM-dd"; + if (stringParser2 != null) { + Object stringObj2 = stringParser2.parse(sourceData, rowIndex, context); + str2 = OperatorTools.parseString(stringObj2); + } + LocalDate date = LocalDate.parse(str1, getDateTimeFormatter(str2)); + return date.format(OUTPUT_FORMATTER); + } + + /** + * getDateTimeFormatter + * + * @param pattern + * @return + */ + private DateTimeFormatter getDateTimeFormatter(String pattern) { + DateTimeFormatter formatter = INPUT_FORMATTERS.get(pattern); + if (formatter == null) { + formatter = DateTimeFormatter.ofPattern(pattern); + INPUT_FORMATTERS.put(pattern, formatter); + } + return formatter; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index 94cbba090c1..bc97d111486 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -19,14 +19,18 @@ import org.apache.inlong.sdk.transform.process.function.AbsFunction; import org.apache.inlong.sdk.transform.process.function.ConcatFunction; +import org.apache.inlong.sdk.transform.process.function.DateFormatFunction; import org.apache.inlong.sdk.transform.process.function.ExpFunction; import org.apache.inlong.sdk.transform.process.function.LnFunction; +import org.apache.inlong.sdk.transform.process.function.LocateFunction; import org.apache.inlong.sdk.transform.process.function.Log10Function; import org.apache.inlong.sdk.transform.process.function.Log2Function; import org.apache.inlong.sdk.transform.process.function.LogFunction; import org.apache.inlong.sdk.transform.process.function.NowFunction; import org.apache.inlong.sdk.transform.process.function.PowerFunction; import org.apache.inlong.sdk.transform.process.function.SqrtFunction; +import org.apache.inlong.sdk.transform.process.function.SubstringFunction; +import org.apache.inlong.sdk.transform.process.function.ToDateFunction; import org.apache.inlong.sdk.transform.process.parser.AdditionParser; import org.apache.inlong.sdk.transform.process.parser.ColumnParser; import org.apache.inlong.sdk.transform.process.parser.DivisionParser; @@ -85,6 +89,10 @@ public class OperatorTools { functionMap.put("log2", Log2Function::new); functionMap.put("log", LogFunction::new); functionMap.put("exp", ExpFunction::new); + functionMap.put("substring", SubstringFunction::new); + functionMap.put("locate", LocateFunction::new); + functionMap.put("to_date", ToDateFunction::new); + functionMap.put("date_format", DateFormatFunction::new); } public static ExpressionOperator buildOperator(Expression expr) { diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java new file mode 100644 index 00000000000..8dabad12f6c --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * TestTransformStringFunctionsProcessor + * description: test the string functions in transform processor + */ +public class TestTransformStringFunctionsProcessor { + + private static final List srcFields = new ArrayList<>(); + private static final List dstFields = new ArrayList<>(); + private static final CsvSourceInfo csvSource; + private static final KvSinkInfo kvSink; + static { + for (int i = 1; i < 4; i++) { + FieldInfo field = new FieldInfo(); + field.setName("string" + i); + srcFields.add(field); + } + for (int i = 1; i < 4; i++) { + FieldInfo field = new FieldInfo(); + field.setName("numeric" + i); + srcFields.add(field); + } + FieldInfo field = new FieldInfo(); + field.setName("result"); + dstFields.add(field); + csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields); + kvSink = new KvSinkInfo("UTF-8", dstFields); + } + + @Test + public void testSubstringFunction() throws Exception { + String transformSql1 = "select substring(string2, numeric1) from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: substring('banana', 2) + List output1 = processor1.transform("apple|banana|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=anana"); + String transformSql2 = "select substring(string1, numeric1, numeric3) from source"; + TransformConfig config2 = new TransformConfig(transformSql2); + TransformProcessor processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case2: substring('apple', 1, 3) + List output2 = processor2.transform("apple|banana|cloud|1|1|3", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=app"); + // case3: substring('apple', 2, 9) + List output3 = processor2.transform("apple|banana|cloud|2|1|9", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=pple"); + } + + @Test + public void testLocateFunction() throws Exception { + String transformSql1 = "select locate(string1, string2) from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: locate('app', 'apple') + List output1 = processor1.transform("app|apple|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=1"); + // case2: locate('ape', 'apple') + List output2 = processor1.transform("ape|apple|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=0"); + String transformSql2 = "select locate(string1, string2, numeric1) from source"; + TransformConfig config2 = new TransformConfig(transformSql2); + TransformProcessor processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case3: locate('app', 'appapp', 2) + List output3 = processor2.transform("app|appapp|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=4"); + // case4: locate('app', 'appape', 2) + List output4 = processor2.transform("app|appape|cloud|2|1|9", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "result=0"); + // case5: locate('app', null) + List output5 = processor1.transform("app", new HashMap<>()); + Assert.assertEquals(1, output5.size()); + Assert.assertEquals(output5.get(0), "result=null"); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java new file mode 100644 index 00000000000..25675b25f38 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.TimeZone; + +/** + * TestTransformTemporalFunctionsProcessor + * description: test the temporal functions in transform processor + */ +public class TestTransformTemporalFunctionsProcessor { + + private static final List srcFields = new ArrayList<>(); + private static final List dstFields = new ArrayList<>(); + private static final CsvSourceInfo csvSource; + private static final KvSinkInfo kvSink; + static { + for (int i = 1; i < 4; i++) { + FieldInfo field = new FieldInfo(); + field.setName("string" + i); + srcFields.add(field); + } + for (int i = 1; i < 4; i++) { + FieldInfo field = new FieldInfo(); + field.setName("numeric" + i); + srcFields.add(field); + } + FieldInfo field = new FieldInfo(); + field.setName("result"); + dstFields.add(field); + csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields); + kvSink = new KvSinkInfo("UTF-8", dstFields); + } + + @Before + public void setUp() { + TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai")); + } + + @Test + public void testToDateFunction() throws Exception { + String transformSql1 = "select to_date(string1) from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: to_date('2024-08-15') + List output1 = processor1.transform("2024-08-15|apple|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=2024-08-15"); + String transformSql2 = "select to_date(string1, string2) from source"; + TransformConfig config2 = new TransformConfig(transformSql2); + TransformProcessor processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case2: to_date('20240815', 'yyyyMMdd') + List output2 = processor2.transform("20240815|yyyyMMdd|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=2024-08-15"); + // case3: to_date('08152024', 'MMddyyyy') + List output3 = processor2.transform("08152024|MMddyyyy|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=2024-08-15"); + // case4: to_date('2024/08/15', 'yyyy/MM/dd') + List output4 = processor2.transform("2024/08/15|yyyy/MM/dd|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "result=2024-08-15"); + } + + @Test + public void testDateFormatFunction() throws Exception { + String transformSql = "select date_format(numeric1, string1) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: date_format(1722524216, 'yyyy-MM-dd HH:mm:ss') + List output1 = processor1.transform("yyyy-MM-dd HH:mm:ss|apple|cloud|1722524216|1|3", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=2024-08-01 22:56:56"); + // case2: date_format(1722524216, 'yyyy-MM-dd') + List output2 = processor1.transform("yyyy-MM-dd|apple|cloud|1722524216|1|3", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=2024-08-01"); + // case3: date_format(1722524216, 'yyyyMMddHHmmss') + List output3 = processor1.transform("yyyyMMddHHmmss|apple|cloud|1722524216|1|3", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=20240801225656"); + // case1: date_format(1722524216, 'yyyy/MM/dd HH:mm:ss') + List output4 = processor1.transform("yyyy/MM/dd HH:mm:ss|apple|cloud|1722524216|1|3", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "result=2024/08/01 22:56:56"); + } +}