From 750f9655cf37e5a8810d23a13f43ff89aeb38c98 Mon Sep 17 00:00:00 2001 From: yiyutian1 Date: Sun, 8 Dec 2024 23:07:49 -0800 Subject: [PATCH] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions --- docs/data/sql_functions.yml | 10 +- .../apache/flink/table/api/Expressions.java | 55 ++++++ .../functions/BuiltInFunctionDefinitions.java | 21 ++- .../ToTimestampLtzTypeStrategy.java | 43 ++++- .../flink/table/utils/DateTimeUtils.java | 47 +++++ .../functions/TimeFunctionsITCase.java | 132 +++++++++++++- .../expressions/TemporalTypesTest.scala | 28 +-- .../scalar/ToTimestampLtzFunction.java | 165 ++++++++++++++++++ 8 files changed, 475 insertions(+), 26 deletions(-) create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index eda6d3de753d4..f2915c462bb70 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -679,9 +679,15 @@ temporal: - sql: TO_DATE(string1[, string2]) table: toDate(STRING1[, STRING2]) description: Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. - - sql: TO_TIMESTAMP_LTZ(numeric, precision) + - sql: TO_TIMESTAMP_LTZ(numeric[, precision]) table: toTimestampLtz(NUMERIC, PRECISION) - description: "Converts a epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3)." + description: "Converts a epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3). If no precision is provided, the default precision is 3." + - sql: TO_TIMESTAMP_LTZ(string1[, string2]) + table: toTimestampLtz(STRING1[, STRING2]) + description: "Converts a timestamp string string1 with format string2 (by default 'yyyy-MM-dd HH:mm:ss.SSS') to a TIMESTAMP_LTZ". + - sql: TO_TIMESTAMP_TZ(string1, string2, string3) + table: toTimestampLtz(STRING1, STRING2, STRING3) + description: "Converts a timestamp string string1 with format string2 to a TIMESTAMP_TZ in time zone string3." - sql: TO_TIMESTAMP(string1[, string2]) table: toTimestamp(STRING1[, STRING2]) description: "Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone." diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java index 9b687ae65a00b..a4a4f064dd404 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java @@ -366,6 +366,61 @@ public static ApiExpression toTimestampLtz(Object numericEpochTime, Object preci return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, numericEpochTime, precision); } + /** + * * Converts the given time string with the specified format to {@link + * DataTypes#TIMESTAMP_LTZ(int)}. + * + * @param timestampStr The timestamp string to convert. + * @param format The format of the string. + * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type. + */ + public static ApiExpression toTimestampLtz(String timestampStr, String format) { + return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, timestampStr, format); + } + + /** + * Converts a timestamp to {@link DataTypes#TIMESTAMP_LTZ(int)}. + * + *

This method takes an object representing a timestamp and converts it to a TIMESTAMP_LTZ + * using the built-in TO_TIMESTAMP_LTZ function definition. + * + * @param timeStamp The timestamp string to be converted. + * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type. + */ + public static ApiExpression toTimestampLtz(String timeStamp) { + return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, timeStamp); + } + + /** + * Converts a numeric type epoch time to {@link DataTypes#TIMESTAMP_LTZ(int)}. + * + *

This method takes an object representing an epoch time and converts it to a TIMESTAMP_LTZ + * using the built-in TO_TIMESTAMP_LTZ function definition. + * + * @param numericEpochTime The epoch time with numeric type. + * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type. + */ + public static ApiExpression toTimestampLtz(Object numericEpochTime) { + return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, numericEpochTime); + } + + /** + * Converts a string timestamp with the custom format and timezone to {@link + * DataTypes#TIMESTAMP_LTZ(int)}. + * + *

The timestamp string will be parsed using the custom format and timezone, and converted to + * a TIMESTAMP_LTZ value. + * + * @param timestampStr The timestamp string to convert. + * @param format The format pattern to parse the timestamp string. + * @param timezone The timezone to use for the conversion. + * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type. + */ + public static ApiExpression toTimestampLtz( + Object timestampStr, Object format, Object timezone) { + return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, timestampStr, format, timezone); + } + /** * Determines whether two anchored time intervals overlap. Time point and temporal are * transformed into a range defined by two time points (start, end). The function evaluates diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index ec4e42910894a..9803544e3168f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -2332,14 +2332,25 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) public static final BuiltInFunctionDefinition TO_TIMESTAMP_LTZ = BuiltInFunctionDefinition.newBuilder() - .name("toTimestampLtz") - .sqlName("TO_TIMESTAMP_LTZ") + .name("TO_TIMESTAMP_LTZ") .kind(SCALAR) .inputTypeStrategy( - sequence( - logical(LogicalTypeFamily.NUMERIC), - logical(LogicalTypeFamily.INTEGER_NUMERIC, false))) + or( + sequence(logical(LogicalTypeFamily.CHARACTER_STRING)), + sequence( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING)), + sequence( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING)), + sequence(logical(LogicalTypeFamily.NUMERIC)), + sequence( + logical(LogicalTypeFamily.NUMERIC), + logical(LogicalTypeFamily.INTEGER_NUMERIC, false)))) .outputTypeStrategy(SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.ToTimestampLtzFunction") .build(); public static final BuiltInFunctionDefinition TO_TIMESTAMP = diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java index 722dd63e51d98..26bd99d0255c4 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java @@ -23,19 +23,54 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import java.util.List; import java.util.Optional; /** Type strategy of {@code TO_TIMESTAMP_LTZ}. */ @Internal public class ToTimestampLtzTypeStrategy implements TypeStrategy { + private static final int DEFAULT_PRECISION = 3; + @Override public Optional inferType(CallContext callContext) { - if (callContext.isArgumentLiteral(1)) { - final int precision = callContext.getArgumentValue(1, Integer.class).get(); - return Optional.of(DataTypes.TIMESTAMP_LTZ(precision)); + List argumentTypes = callContext.getArgumentDataTypes(); + int argCount = argumentTypes.size(); + + if (argCount < 1 || argCount > 3) { + throw new IllegalArgumentException( + "TO_TIMESTAMP_LTZ requires 1 to 3 arguments, but " + + argCount + + "arguments were provided."); + } + + if (argCount == 1) { + // TO_TIMESTAMP_LTZ(numeric) + // TO_TIMESTAMP_LTZ(string) + return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION)); + } else if (argCount == 2) { + LogicalTypeRoot firstArgTypeRoot = argumentTypes.get(0).getLogicalType().getTypeRoot(); + boolean isFirstArgNumeric = + firstArgTypeRoot == LogicalTypeRoot.TINYINT + || firstArgTypeRoot == LogicalTypeRoot.SMALLINT + || firstArgTypeRoot == LogicalTypeRoot.INTEGER + || firstArgTypeRoot == LogicalTypeRoot.BIGINT + || firstArgTypeRoot == LogicalTypeRoot.FLOAT + || firstArgTypeRoot == LogicalTypeRoot.DOUBLE + || firstArgTypeRoot == LogicalTypeRoot.DECIMAL; + // TO_TIMESTAMP_LTZ(numeric, precision) + if (callContext.isArgumentLiteral(1) && isFirstArgNumeric) { + final int precision = callContext.getArgumentValue(1, Integer.class).get(); + return Optional.of(DataTypes.TIMESTAMP_LTZ(precision)); + } + // TO_TIMESTAMP_LTZ(string, format) + return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION)); + } else { + // argCount == 3 + // TO_TIMESTAMP_LTZ(string, format, timezone) + return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION)); } - return Optional.of(DataTypes.TIMESTAMP_LTZ(3)); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java index 3a71144fcb790..abd5369c2c401 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java @@ -365,6 +365,26 @@ public static TimestampData toTimestampData(double v, int precision) { } } + public static TimestampData toTimestampData(int v, int precision) { + switch (precision) { + case 0: + if (MIN_EPOCH_SECONDS <= v && v <= MAX_EPOCH_SECONDS) { + return timestampDataFromEpochMills((v * MILLIS_PER_SECOND)); + } else { + return null; + } + case 3: + return timestampDataFromEpochMills(v); + default: + throw new TableException( + "The precision value '" + + precision + + "' for function " + + "TO_TIMESTAMP_LTZ(numeric, precision) is unsupported," + + " the supported value is '0' for second or '3' for millisecond."); + } + } + public static TimestampData toTimestampData(DecimalData v, int precision) { long epochMills; switch (precision) { @@ -386,6 +406,18 @@ public static TimestampData toTimestampData(DecimalData v, int precision) { } } + public static TimestampData toTimestampData(long epoch) { + return toTimestampData(epoch, 3); + } + + public static TimestampData toTimestampData(double epoch) { + return toTimestampData(epoch, 3); + } + + public static TimestampData toTimestampData(DecimalData epoch) { + return toTimestampData(epoch, 3); + } + private static TimestampData timestampDataFromEpochMills(long epochMills) { if (MIN_EPOCH_MILLS <= epochMills && epochMills <= MAX_EPOCH_MILLS) { return TimestampData.fromEpochMillis(epochMills); @@ -421,6 +453,21 @@ public static TimestampData parseTimestampData(String dateStr, int precision, Ti .toInstant()); } + public static TimestampData toTimestampData(String dateStr, String format, String timezone) { + if (dateStr == null || format == null || timezone == null) { + return null; + } + + TimestampData ts = parseTimestampData(dateStr, format); + if (ts == null) { + return null; + } + + ZonedDateTime utcZoned = ts.toLocalDateTime().atZone(ZoneId.of("UTC")); + ZonedDateTime targetTime = utcZoned.withZoneSameInstant(ZoneId.of(timezone)); + return TimestampData.fromInstant(targetTime.toInstant()); + } + public static TimestampData parseTimestampData(String dateStr, String format) { DateTimeFormatter formatter = DATETIME_FORMATTER_CACHE.get(format); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java index f3b706282749f..b53101913a99c 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java @@ -18,7 +18,9 @@ package org.apache.flink.table.planner.functions; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.JsonExistsOnError; +import org.apache.flink.table.data.DecimalDataUtils; import org.apache.flink.table.expressions.TimeIntervalUnit; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; @@ -27,12 +29,15 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.ZoneId; +import java.time.ZoneOffset; import java.util.stream.Stream; import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.BOOLEAN; import static org.apache.flink.table.api.DataTypes.DATE; import static org.apache.flink.table.api.DataTypes.DAY; +import static org.apache.flink.table.api.DataTypes.DOUBLE; import static org.apache.flink.table.api.DataTypes.HOUR; import static org.apache.flink.table.api.DataTypes.INT; import static org.apache.flink.table.api.DataTypes.INTERVAL; @@ -43,6 +48,8 @@ import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; import static org.apache.flink.table.api.Expressions.temporalOverlaps; +import static org.apache.flink.table.api.Expressions.toTimestampLtz; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal; /** Test time-related built-in functions. */ class TimeFunctionsITCase extends BuiltInFunctionTestBase { @@ -53,7 +60,8 @@ Stream getTestSetSpecs() { extractTestCases(), temporalOverlapsTestCases(), ceilTestCases(), - floorTestCases()) + floorTestCases(), + toTimestampLtzTestCases()) .flatMap(s -> s); } @@ -734,4 +742,126 @@ private Stream floorTestCases() { LocalDateTime.of(2001, 1, 1, 0, 0), TIMESTAMP().nullable())); } + + private Stream toTimestampLtzTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ) + .onFieldsWithData( + 100, + 1234, + -100, + null, + DecimalDataUtils.castFrom(-Double.MAX_VALUE, 38, 18)) + .andDataTypes( + DOUBLE(), BIGINT(), BIGINT(), BIGINT(), DataTypes.DECIMAL(38, 18)) + .testResult( + toTimestampLtz($("f0"), literal(0)), + "TO_TIMESTAMP_LTZ(f0, 0)", + LocalDateTime.of(1970, 1, 1, 0, 1, 40) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(0).nullable()) + .testResult( + toTimestampLtz($("f1"), literal(3)), + "TO_TIMESTAMP_LTZ(f1, 3)", + LocalDateTime.of(1970, 1, 1, 0, 0, 1, 234000000) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz($("f2"), literal(0)), + "TO_TIMESTAMP_LTZ(f2, 0)", + LocalDateTime.of(1969, 12, 31, 23, 58, 20) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(0).nullable()) + .testResult( + toTimestampLtz($("f3"), literal(3)), + "TO_TIMESTAMP_LTZ(f3, 3)", + null, + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz($("f4"), literal(0)), + "TO_TIMESTAMP_LTZ(-" + Double.MAX_VALUE + ", 0)", + null, // expecting NULL result + TIMESTAMP_LTZ(0).nullable()) + .testResult( + toTimestampLtz("2023-01-01 00:00:00"), + "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00')", + LocalDateTime.of(2023, 1, 1, 0, 0, 0) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("01/01/2023 00:00:00", "dd/MM/yyyy HH:mm:ss"), + "TO_TIMESTAMP_LTZ('01/01/2023 00:00:00', 'dd/MM/yyyy HH:mm:ss')", + LocalDateTime.of(2023, 1, 1, 0, 0, 0) + .atZone(ZoneOffset.UTC) // why UTC? + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("1970-01-01 00:00:00.123456789"), + "TO_TIMESTAMP_LTZ('1970-01-01 00:00:00.123456789')", + LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123000000) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz( + "1970-01-01 00:00:00.12345", "yyyy-MM-dd HH:mm:ss.SSSSS"), + "TO_TIMESTAMP_LTZ('1970-01-01 00:00:00.12345', 'yyyy-MM-dd HH:mm:ss.SSSSS')", + LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123000000) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("20000202 59:59.1234567", "yyyyMMdd mm:ss.SSSSSSS"), + "TO_TIMESTAMP_LTZ('20000202 59:59.1234567', 'yyyyMMdd mm:ss.SSSSSSS')", + LocalDateTime.of(2000, 2, 2, 0, 59, 59, 123000000) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("1234567", "SSSSSSS"), + "TO_TIMESTAMP_LTZ('1234567', 'SSSSSSS')", + LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123000000) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz( + "2017-09-15 00:00:00.12345", "yyyy-MM-dd HH:mm:ss.SSS"), + "TO_TIMESTAMP_LTZ('2017-09-15 00:00:00.12345', 'yyyy-MM-dd HH:mm:ss.SSS')", + LocalDateTime.of(2017, 9, 15, 0, 0, 0, 123000000) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz( + "2023-01-01 00:00:00", + "yyyy-MM-dd HH:mm:ss", + "Asia/Shanghai"), + "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'Asia/Shanghai')", + LocalDateTime.of(2023, 1, 1, 8, 0, 0) + .atZone(ZoneId.of("Asia/Shanghai")) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("2023-01-01 00:00:00", "yyyy-MM-dd HH:mm:ss", "UTC"), + "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC')", + LocalDateTime.of(2023, 1, 1, 0, 0, 0) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz( + "01/01/2023 08:00:00", + "dd/MM/yyyy HH:mm:ss", + "America/Los_Angeles"), + "TO_TIMESTAMP_LTZ('01/01/2023 08:00:00', 'dd/MM/yyyy HH:mm:ss', 'America/Los_Angeles')", + LocalDateTime.of(2023, 1, 1, 0, 0, 0) + .atZone(ZoneId.of("America/Los_Angeles")) + .toInstant(), + TIMESTAMP_LTZ(3).nullable())); + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala index 4fba6e34a08de..39ced7abdb29b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala @@ -1140,41 +1140,41 @@ class TemporalTypesTest extends ExpressionTestBase { tableConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) // INT -> TIMESTAMP_LTZ - testAllApis(toTimestampLtz(100, 0), "TO_TIMESTAMP_LTZ(100, 0)", "1970-01-01 08:01:40.000") + testAllApis(toTimestampLtz(100, 0), "TO_TIMESTAMP_LTZ(100, 0)", "1970-01-01 08:01:40") // TINYINT -> TIMESTAMP_LTZ testAllApis( toTimestampLtz(100.cast(DataTypes.TINYINT()), 0), "TO_TIMESTAMP_LTZ(CAST(100 AS TINYINT), 0)", - "1970-01-01 08:01:40.000") + "1970-01-01 08:01:40") // BIGINT -> TIMESTAMP_LTZ testAllApis( toTimestampLtz(100.cast(DataTypes.BIGINT()), 0), "TO_TIMESTAMP_LTZ(CAST(100 AS BIGINT), 0)", - "1970-01-01 08:01:40.000") + "1970-01-01 08:01:40") // FLOAT -> TIMESTAMP_LTZ testAllApis( toTimestampLtz(100.01.cast(DataTypes.FLOAT()), 0), "TO_TIMESTAMP_LTZ(CAST(100.01 AS FLOAT), 0)", - "1970-01-01 08:01:40.010") + "1970-01-01 08:01:40") // DOUBLE -> TIMESTAMP_LTZ testAllApis( toTimestampLtz(100.123.cast(DataTypes.DOUBLE()), 0), "TO_TIMESTAMP_LTZ(CAST(100.123 AS DOUBLE), 0)", - "1970-01-01 08:01:40.123") + "1970-01-01 08:01:40") // DECIMAL -> TIMESTAMP_LTZ testAllApis( toTimestampLtz(100.cast(DataTypes.DECIMAL(38, 18)), 0), "TO_TIMESTAMP_LTZ(100, 0)", - "1970-01-01 08:01:40.000") + "1970-01-01 08:01:40") testAllApis( toTimestampLtz(-100.cast(DataTypes.DECIMAL(38, 18)), 0), "TO_TIMESTAMP_LTZ(-100, 0)", - "1970-01-01 07:58:20.000") + "1970-01-01 07:58:20") // keep scale testAllApis(toTimestampLtz(1234, 3), "TO_TIMESTAMP_LTZ(1234, 3)", "1970-01-01 08:00:01.234") @@ -1185,13 +1185,13 @@ class TemporalTypesTest extends ExpressionTestBase { @Test def testToTimestampLtzUTC(): Unit = { tableConfig.setLocalTimeZone(ZoneId.of("UTC")) - testAllApis(toTimestampLtz(100, 0), "TO_TIMESTAMP_LTZ(100, 0)", "1970-01-01 00:01:40.000") + testAllApis(toTimestampLtz(100, 0), "TO_TIMESTAMP_LTZ(100, 0)", "1970-01-01 00:01:40") - testAllApis(toTimestampLtz(100, 0), "TO_TIMESTAMP_LTZ(100, 0)", "1970-01-01 00:01:40.000") + testAllApis(toTimestampLtz(100, 0), "TO_TIMESTAMP_LTZ(100, 0)", "1970-01-01 00:01:40") testAllApis(toTimestampLtz(1234, 3), "TO_TIMESTAMP_LTZ(1234, 3)", "1970-01-01 00:00:01.234") - testAllApis(toTimestampLtz(-100, 0), "TO_TIMESTAMP_LTZ(-100, 0)", "1969-12-31 23:58:20.000") + testAllApis(toTimestampLtz(-100, 0), "TO_TIMESTAMP_LTZ(-100, 0)", "1969-12-31 23:58:20") } @Test @@ -1202,21 +1202,21 @@ class TemporalTypesTest extends ExpressionTestBase { testAllApis( toTimestampLtz(JInt.MIN_VALUE.cast(DataTypes.INT()), 0), s"TO_TIMESTAMP_LTZ(CAST(${JInt.MIN_VALUE} AS INTEGER), 0)", - "1901-12-13 20:45:52.000") + "1901-12-13 20:45:52") testAllApis( toTimestampLtz(JInt.MAX_VALUE.cast(DataTypes.INT()), 0), s"TO_TIMESTAMP_LTZ(CAST(${JInt.MAX_VALUE} AS INTEGER), 0)", - "2038-01-19 03:14:07.000") + "2038-01-19 03:14:07") // TINYINT testAllApis( toTimestampLtz(-128.cast(DataTypes.TINYINT()), 0), s"TO_TIMESTAMP_LTZ(CAST(-128 AS TINYINT), 0)", - "1969-12-31 23:57:52.000") + "1969-12-31 23:57:52") testAllApis( toTimestampLtz(127.cast(DataTypes.TINYINT()), 0), s"TO_TIMESTAMP_LTZ(CAST(127 AS TINYINT), 0)", - "1970-01-01 00:02:07.000") + "1970-01-01 00:02:07") // BIGINT testAllApis( diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java new file mode 100644 index 0000000000000..edb662d97002d --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + *

Supported function signatures: TO_TIMESTAMP_LTZ(numeric, precision) -> + * TIMESTAMP_LTZ(precision) TO_TIMESTAMP_LTZ(string) -> TIMESTAMP_LTZ(3) TO_TIMESTAMP_LTZ(string, + * format) -> TIMESTAMP_LTZ(3) TO_TIMESTAMP_LTZ(string, format, timezone) -> TIMESTAMP_LTZ(3) + * TO_TIMESTAMP_LTZ(numeric) -> TIMESTAMP_LTZ(3) + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + + public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); + } + + public TimestampData eval(Integer epoch, Integer precision) { + if (epoch == null || precision == null) { + return null; + } + + return DateTimeUtils.toTimestampData(epoch, precision); + } + + public TimestampData eval(Long epoch, Integer precision) { + if (epoch == null || precision == null) { + return null; + } + + return DateTimeUtils.toTimestampData(epoch, precision); + } + + public TimestampData eval(Double epoch, Integer precision) { + if (epoch == null || precision == null) { + return null; + } + + return DateTimeUtils.toTimestampData(epoch, precision); + } + + public TimestampData eval(Float value, Integer precision) { + if (value == null || precision == null) { + return null; + } + return DateTimeUtils.toTimestampData(value.longValue(), precision); + } + + public TimestampData eval(Byte value, Integer precision) { + if (value == null || precision == null) { + return null; + } + return DateTimeUtils.toTimestampData(value.longValue(), precision); + } + + public TimestampData eval(DecimalData epoch, Integer precision) { + if (epoch == null || precision == null) { + return null; + } + + return DateTimeUtils.toTimestampData(epoch, precision); + } + + public TimestampData eval(Integer epoch) { + if (epoch == null) { + return null; + } + + return eval(epoch, 3); + } + + public TimestampData eval(Long epoch) { + if (epoch == null) { + return null; + } + + return eval(epoch, 3); + } + + public TimestampData eval(Float epoch) { + if (epoch == null) { + return null; + } + + return eval(epoch, 3); + } + + public TimestampData eval(Byte epoch) { + if (epoch == null) { + return null; + } + + return eval(epoch, 3); + } + + public TimestampData eval(Double epoch) { + if (epoch == null) { + return null; + } + + return eval(epoch, 3); + } + + public TimestampData eval(DecimalData epoch) { + if (epoch == null) { + return null; + } + + return eval(epoch, 3); + } + + // Parse timestamp with default format + public TimestampData eval(StringData timestamp) { + if (timestamp == null) { + return null; + } + + return DateTimeUtils.parseTimestampData(timestamp.toString()); + } + + // Parse timestamp with custom format + public TimestampData eval(StringData timestamp, StringData format) { + if (timestamp == null || format == null) { + return null; + } + + return DateTimeUtils.parseTimestampData(timestamp.toString(), format.toString()); + } + + // Parse timestamp with format and timezone + public TimestampData eval(StringData timestamp, StringData format, StringData timezone) { + if (timestamp == null || format == null || timezone == null) { + return null; + } + + return DateTimeUtils.toTimestampData( + timestamp.toString(), format.toString(), timezone.toString()); + } +}