From 389b41d81b644ef3899117714ac09f144f2b1b1c Mon Sep 17 00:00:00 2001 From: Zkplo <87751516+Zkplo@users.noreply.github.com> Date: Wed, 28 Aug 2024 14:12:35 +0800 Subject: [PATCH] [INLONG-10879][SDK] Transform support TIMESTAMPADD() function (#10880) --- .../function/TimestampAddFunction.java | 117 ++++++++++++++++++ .../process/operator/OperatorTools.java | 2 + ...stTransformTemporalFunctionsProcessor.java | 40 ++++++ 3 files changed, 159 insertions(+) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampAddFunction.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampAddFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampAddFunction.java new file mode 100644 index 00000000000..e09512cdfd2 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/TimestampAddFunction.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; + +/** + * TimestampAddFunction + * Description: Add integer expression intervals to the date or date time expression expr. + * The unit of the time interval is specified by the unit parameter, which should be one of the following values: + * FRAC_SECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, or YEAR. + */ +public class TimestampAddFunction implements ValueParser { + + private ValueParser intervalParser; + private ValueParser amountParser; + private ValueParser datetimeParser; + private static final DateTimeFormatter DEFAULT_FORMAT_DATE_TIME = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private static final DateTimeFormatter DEFAULT_FORMAT_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + public TimestampAddFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + intervalParser = OperatorTools.buildParser(expressions.get(0)); + amountParser = OperatorTools.buildParser(expressions.get(1)); + datetimeParser = OperatorTools.buildParser(expressions.get(2)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + String interval = intervalParser.parse(sourceData, rowIndex, context).toString(); + Long amount = Long.parseLong(amountParser.parse(sourceData, rowIndex, context).toString()); + String dateString = datetimeParser.parse(sourceData, rowIndex, context).toString(); + return evalDate(dateString, interval, amount); + } + + private String evalDate(String dateString, String interval, Long amount) { + DateTimeFormatter formatter = null; + LocalDateTime dateTime = null; + boolean hasTime = true; + if (dateString.indexOf(' ') != -1) { + formatter = DEFAULT_FORMAT_DATE_TIME; + dateTime = LocalDateTime.parse(dateString, formatter); + } else { + formatter = DEFAULT_FORMAT_DATE; + dateTime = LocalDate.parse(dateString, formatter).atStartOfDay(); + hasTime = false; + } + + switch (interval.toUpperCase()) { + case "FRAC_SECOND": + hasTime = true; + dateTime = dateTime.plusNanos(amount * 1000_000); + break; + case "SECOND": + hasTime = true; + dateTime = dateTime.plusSeconds(amount); + break; + case "MINUTE": + hasTime = true; + dateTime = dateTime.plusMinutes(amount); + break; + case "HOUR": + hasTime = true; + dateTime = dateTime.plusHours(amount); + break; + case "DAY": + dateTime = dateTime.plusDays(amount); + break; + case "WEEK": + dateTime = dateTime.plusWeeks(amount); + break; + case "MONTH": + dateTime = dateTime.plusMonths(amount); + break; + case "QUARTER": + dateTime = dateTime.plusMonths(amount * 3); + break; + case "YEAR": + dateTime = dateTime.plusYears(amount); + break; + } + + String result = dateTime.toLocalDate().toString(); + if (hasTime) { + result += " " + dateTime.toLocalTime().format(DateTimeFormatter.ofPattern("HH:mm:ss")); + } + + return result; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index 029089c646b..66c59a888bc 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -53,6 +53,7 @@ import org.apache.inlong.sdk.transform.process.function.SqrtFunction; import org.apache.inlong.sdk.transform.process.function.SubstringFunction; import org.apache.inlong.sdk.transform.process.function.TanFunction; +import org.apache.inlong.sdk.transform.process.function.TimestampAddFunction; import org.apache.inlong.sdk.transform.process.function.TimestampExtractFunction; import org.apache.inlong.sdk.transform.process.function.ToBase64Function; import org.apache.inlong.sdk.transform.process.function.ToDateFunction; @@ -175,6 +176,7 @@ public class OperatorTools { functionMap.put("replace", ReplaceFunction::new); functionMap.put("left", LeftFunction::new); functionMap.put("right", RightFunction::new); + functionMap.put("timestampadd", TimestampAddFunction::new); } public static ExpressionOperator buildOperator(Expression expr) { diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java index 67027c4b25d..6ccfc357b34 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformTemporalFunctionsProcessor.java @@ -47,6 +47,7 @@ public class TestTransformTemporalFunctionsProcessor { private static final List dstFields = new ArrayList<>(); private static final CsvSourceInfo csvSource; private static final KvSinkInfo kvSink; + static { for (int i = 1; i < 4; i++) { FieldInfo field = new FieldInfo(); @@ -409,4 +410,43 @@ public void testLocalTimeFunction() throws Exception { Assert.assertEquals(1, output3.size()); Assert.assertTrue(duration3.getSeconds() < 1); } + + @Test + public void testTimestampAdd() throws Exception { + String transformSql1 = "select timestampadd('day',string2,string1) from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: timestampadd('day',3,'1970-01-01') + List output1 = processor1.transform("1970-01-01|3", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=1970-01-04", output1.get(0)); + + // case2: timestampadd('day',-3,'1970-01-01 00:00:44') + List output2 = processor1.transform("1970-01-01 00:00:44|-3", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=1969-12-29 00:00:44", output2.get(0)); + + String transformSql2 = "select timestampadd('MINUTE',string2,string1) from source"; + TransformConfig config2 = new TransformConfig(transformSql2); + TransformProcessor processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: timestampadd('MINUTE',3,'1970-01-01 00:00:44') + List output3 = processor2.transform("1970-01-01 00:00:44|3", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals("result=1970-01-01 00:03:44", output3.get(0)); + + // case4: timestampadd('MINUTE',-3,'1970-01-01 00:00:44') + List output4 = processor2.transform("1970-01-01 00:00:44|-3", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals("result=1969-12-31 23:57:44", output4.get(0)); + + // case5: timestampadd('MINUTE',-3,'1970-01-01') + List output5 = processor2.transform("1970-01-01|-3", new HashMap<>()); + Assert.assertEquals(1, output5.size()); + Assert.assertEquals("result=1969-12-31 23:57:00", output5.get(0)); + } }