Skip to content

Commit

Permalink
[FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions
Browse files Browse the repository at this point in the history
  • Loading branch information
yiyutian1 committed Dec 12, 2024
1 parent 6951686 commit 750f965
Show file tree
Hide file tree
Showing 8 changed files with 475 additions and 26 deletions.
10 changes: 8 additions & 2 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -679,9 +679,15 @@ temporal:
- sql: TO_DATE(string1[, string2])
table: toDate(STRING1[, STRING2])
description: Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date.
- sql: TO_TIMESTAMP_LTZ(numeric, precision)
- sql: TO_TIMESTAMP_LTZ(numeric[, precision])
table: toTimestampLtz(NUMERIC, PRECISION)
description: "Converts a epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3)."
description: "Converts a epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3). If no precision is provided, the default precision is 3."
- sql: TO_TIMESTAMP_LTZ(string1[, string2])
table: toTimestampLtz(STRING1[, STRING2])
description: "Converts a timestamp string string1 with format string2 (by default 'yyyy-MM-dd HH:mm:ss.SSS') to a TIMESTAMP_LTZ".
- sql: TO_TIMESTAMP_TZ(string1, string2, string3)
table: toTimestampLtz(STRING1, STRING2, STRING3)
description: "Converts a timestamp string string1 with format string2 to a TIMESTAMP_TZ in time zone string3."
- sql: TO_TIMESTAMP(string1[, string2])
table: toTimestamp(STRING1[, STRING2])
description: "Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,61 @@ public static ApiExpression toTimestampLtz(Object numericEpochTime, Object preci
return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, numericEpochTime, precision);
}

/**
* * Converts the given time string with the specified format to {@link
* DataTypes#TIMESTAMP_LTZ(int)}.
*
* @param timestampStr The timestamp string to convert.
* @param format The format of the string.
* @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type.
*/
public static ApiExpression toTimestampLtz(String timestampStr, String format) {
return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, timestampStr, format);
}

/**
* Converts a timestamp to {@link DataTypes#TIMESTAMP_LTZ(int)}.
*
* <p>This method takes an object representing a timestamp and converts it to a TIMESTAMP_LTZ
* using the built-in TO_TIMESTAMP_LTZ function definition.
*
* @param timeStamp The timestamp string to be converted.
* @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type.
*/
public static ApiExpression toTimestampLtz(String timeStamp) {
return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, timeStamp);
}

/**
* Converts a numeric type epoch time to {@link DataTypes#TIMESTAMP_LTZ(int)}.
*
* <p>This method takes an object representing an epoch time and converts it to a TIMESTAMP_LTZ
* using the built-in TO_TIMESTAMP_LTZ function definition.
*
* @param numericEpochTime The epoch time with numeric type.
* @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type.
*/
public static ApiExpression toTimestampLtz(Object numericEpochTime) {
return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, numericEpochTime);
}

/**
* Converts a string timestamp with the custom format and timezone to {@link
* DataTypes#TIMESTAMP_LTZ(int)}.
*
* <p>The timestamp string will be parsed using the custom format and timezone, and converted to
* a TIMESTAMP_LTZ value.
*
* @param timestampStr The timestamp string to convert.
* @param format The format pattern to parse the timestamp string.
* @param timezone The timezone to use for the conversion.
* @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type.
*/
public static ApiExpression toTimestampLtz(
Object timestampStr, Object format, Object timezone) {
return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, timestampStr, format, timezone);
}

/**
* Determines whether two anchored time intervals overlap. Time point and temporal are
* transformed into a range defined by two time points (start, end). The function evaluates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2332,14 +2332,25 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)

public static final BuiltInFunctionDefinition TO_TIMESTAMP_LTZ =
BuiltInFunctionDefinition.newBuilder()
.name("toTimestampLtz")
.sqlName("TO_TIMESTAMP_LTZ")
.name("TO_TIMESTAMP_LTZ")
.kind(SCALAR)
.inputTypeStrategy(
sequence(
logical(LogicalTypeFamily.NUMERIC),
logical(LogicalTypeFamily.INTEGER_NUMERIC, false)))
or(
sequence(logical(LogicalTypeFamily.CHARACTER_STRING)),
sequence(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.CHARACTER_STRING)),
sequence(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.CHARACTER_STRING)),
sequence(logical(LogicalTypeFamily.NUMERIC)),
sequence(
logical(LogicalTypeFamily.NUMERIC),
logical(LogicalTypeFamily.INTEGER_NUMERIC, false))))
.outputTypeStrategy(SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.ToTimestampLtzFunction")
.build();

public static final BuiltInFunctionDefinition TO_TIMESTAMP =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,54 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.TypeStrategy;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import java.util.List;
import java.util.Optional;

/** Type strategy of {@code TO_TIMESTAMP_LTZ}. */
@Internal
public class ToTimestampLtzTypeStrategy implements TypeStrategy {

private static final int DEFAULT_PRECISION = 3;

@Override
public Optional<DataType> inferType(CallContext callContext) {
if (callContext.isArgumentLiteral(1)) {
final int precision = callContext.getArgumentValue(1, Integer.class).get();
return Optional.of(DataTypes.TIMESTAMP_LTZ(precision));
List<DataType> argumentTypes = callContext.getArgumentDataTypes();
int argCount = argumentTypes.size();

if (argCount < 1 || argCount > 3) {
throw new IllegalArgumentException(
"TO_TIMESTAMP_LTZ requires 1 to 3 arguments, but "
+ argCount
+ "arguments were provided.");
}

if (argCount == 1) {
// TO_TIMESTAMP_LTZ(numeric)
// TO_TIMESTAMP_LTZ(string)
return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION));
} else if (argCount == 2) {
LogicalTypeRoot firstArgTypeRoot = argumentTypes.get(0).getLogicalType().getTypeRoot();
boolean isFirstArgNumeric =
firstArgTypeRoot == LogicalTypeRoot.TINYINT
|| firstArgTypeRoot == LogicalTypeRoot.SMALLINT
|| firstArgTypeRoot == LogicalTypeRoot.INTEGER
|| firstArgTypeRoot == LogicalTypeRoot.BIGINT
|| firstArgTypeRoot == LogicalTypeRoot.FLOAT
|| firstArgTypeRoot == LogicalTypeRoot.DOUBLE
|| firstArgTypeRoot == LogicalTypeRoot.DECIMAL;
// TO_TIMESTAMP_LTZ(numeric, precision)
if (callContext.isArgumentLiteral(1) && isFirstArgNumeric) {
final int precision = callContext.getArgumentValue(1, Integer.class).get();
return Optional.of(DataTypes.TIMESTAMP_LTZ(precision));
}
// TO_TIMESTAMP_LTZ(string, format)
return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION));
} else {
// argCount == 3
// TO_TIMESTAMP_LTZ(string, format, timezone)
return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION));
}
return Optional.of(DataTypes.TIMESTAMP_LTZ(3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,26 @@ public static TimestampData toTimestampData(double v, int precision) {
}
}

public static TimestampData toTimestampData(int v, int precision) {
switch (precision) {
case 0:
if (MIN_EPOCH_SECONDS <= v && v <= MAX_EPOCH_SECONDS) {
return timestampDataFromEpochMills((v * MILLIS_PER_SECOND));
} else {
return null;
}
case 3:
return timestampDataFromEpochMills(v);
default:
throw new TableException(
"The precision value '"
+ precision
+ "' for function "
+ "TO_TIMESTAMP_LTZ(numeric, precision) is unsupported,"
+ " the supported value is '0' for second or '3' for millisecond.");
}
}

public static TimestampData toTimestampData(DecimalData v, int precision) {
long epochMills;
switch (precision) {
Expand All @@ -386,6 +406,18 @@ public static TimestampData toTimestampData(DecimalData v, int precision) {
}
}

public static TimestampData toTimestampData(long epoch) {
return toTimestampData(epoch, 3);
}

public static TimestampData toTimestampData(double epoch) {
return toTimestampData(epoch, 3);
}

public static TimestampData toTimestampData(DecimalData epoch) {
return toTimestampData(epoch, 3);
}

private static TimestampData timestampDataFromEpochMills(long epochMills) {
if (MIN_EPOCH_MILLS <= epochMills && epochMills <= MAX_EPOCH_MILLS) {
return TimestampData.fromEpochMillis(epochMills);
Expand Down Expand Up @@ -421,6 +453,21 @@ public static TimestampData parseTimestampData(String dateStr, int precision, Ti
.toInstant());
}

public static TimestampData toTimestampData(String dateStr, String format, String timezone) {
if (dateStr == null || format == null || timezone == null) {
return null;
}

TimestampData ts = parseTimestampData(dateStr, format);
if (ts == null) {
return null;
}

ZonedDateTime utcZoned = ts.toLocalDateTime().atZone(ZoneId.of("UTC"));
ZonedDateTime targetTime = utcZoned.withZoneSameInstant(ZoneId.of(timezone));
return TimestampData.fromInstant(targetTime.toInstant());
}

public static TimestampData parseTimestampData(String dateStr, String format) {
DateTimeFormatter formatter = DATETIME_FORMATTER_CACHE.get(format);

Expand Down
Loading

0 comments on commit 750f965

Please sign in to comment.