diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateAddFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateAddFunction.java new file mode 100644 index 0000000000..11fbfcda38 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/DateAddFunction.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; +import org.apache.inlong.sdk.transform.process.utils.DateUtil; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; +import org.apache.commons.lang3.tuple.Pair; + +import java.time.temporal.ChronoField; +import java.util.List; +import java.util.Map; + +/** + * DateAddFunction + * Description: + * - return NULL if date is NULL. + * - return DATE if the date argument is a DATE value and your calculations involve only YEAR, MONTH, and DAY parts + * (that is, no time parts). + * - return TIME if the date argument is a TIME value and the calculations involve only HOURS, MINUTES, + * and SECONDS parts (that is, no date parts). + * - return DATETIME if the first argument is a DATETIME (or TIMESTAMP) value, or if the first argument is a DATE + * and the unit value uses HOURS, MINUTES, or SECONDS, or if the first argument is of type TIME and the + * unit value uses YEAR, MONTH, or DAY. + * - return If the first argument is a dynamic parameter (for example, of a prepared statement), its resolved type + * is DATE if the second argument is an interval that contains some combination of YEAR, MONTH, or DAY values + * only; otherwise, its type is DATETIME. + * - return String otherwise (type VARCHAR). + */ +@TransformFunction(names = {"date_add"}) +public class DateAddFunction implements ValueParser { + + private ValueParser datetimeParser; + private ValueParser intervalParser; + + public DateAddFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + datetimeParser = OperatorTools.buildParser(expressions.get(0)); + intervalParser = OperatorTools.buildParser(expressions.get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object intervalPairObj = intervalParser.parse(sourceData, rowIndex, context); + Object dateObj = datetimeParser.parse(sourceData, rowIndex, context); + if (intervalPairObj == null || dateObj == null) { + return null; + } + return DateUtil.dateAdd(OperatorTools.parseString(dateObj), + (Pair>) intervalPairObj, 1); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestDateAddFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestDateAddFunction.java new file mode 100644 index 0000000000..fae86d1276 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/temporal/TestDateAddFunction.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.temporal; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestDateAddFunction extends AbstractFunctionTemporalTestBase { + + @Test + public void testDateAddFunction() throws Exception { + String transformSql = null; + TransformConfig config = null; + TransformProcessor processor = null; + List output = null; + + transformSql = "select DATE_ADD(string1, INTERVAL string2 DAY) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: date_add('2020-12-31 23:59:59',INTERVAL 999 DAY) + output = processor.transform("2020-12-31 23:59:59|999", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2023-09-26 23:59:59", output.get(0)); + + // case2: date_add('2020-12-31 23:59:59',INTERVAL -999 DAY) + output = processor.transform("2020-12-31 23:59:59|-999", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2018-04-07 23:59:59", output.get(0)); + + transformSql = "select DATE_ADD(string1, INTERVAL string2 MINUTE_SECOND) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case3: date_add('2020-12-31',INTERVAL '-1:1' MINUTE_SECOND) + output = processor.transform("2020-12-31|-1:1", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=2020-12-30 23:58:59", output.get(0)); + + transformSql = "select DATE_ADD(string1, INTERVAL string2 SECOND_MICROSECOND) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case4: DATE_ADD('1992-12-31 23:59:59', INTERVAL '-1.999999' SECOND_MICROSECOND) + output = processor.transform("1992-12-31 23:59:59|-1.999999", new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=1992-12-31 23:59:57.000001", output.get(0)); + + } +}