Skip to content

Commit

Permalink
[FLINK-36862][table] Address Comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yiyutian1 committed Dec 16, 2024
1 parent 750f965 commit 5335325
Show file tree
Hide file tree
Showing 15 changed files with 392 additions and 125 deletions.
8 changes: 4 additions & 4 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -681,13 +681,13 @@ temporal:
description: Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date.
- sql: TO_TIMESTAMP_LTZ(numeric[, precision])
table: toTimestampLtz(NUMERIC[, PRECISION])
description: "Converts a epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3). If no precision is provided, the default precision is 3."
description: Converts epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ. The valid precision is 0 or 3, where 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0) and 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3). If no precision is provided, the default precision is 3.
- sql: TO_TIMESTAMP_LTZ(string1[, string2])
table: toTimestampLtz(STRING1[, STRING2])
description: "Converts a timestamp string string1 with format string2 (by default 'yyyy-MM-dd HH:mm:ss.SSS') to a TIMESTAMP_LTZ".
- sql: TO_TIMESTAMP_TZ(string1, string2, string3)
description: Converts a timestamp string string1 with format string2 (by default 'yyyy-MM-dd HH:mm:ss.SSS') to a TIMESTAMP_LTZ.
- sql: TO_TIMESTAMP_LTZ(string1, string2, string3)
table: toTimestampLtz(STRING1, STRING2, STRING3)
description: "Converts a timestamp string string1 with format string2 to a TIMESTAMP_TZ in time zone string3."
description: Converts a timestamp string string1 with format string2 to a TIMESTAMP_LTZ in time zone string3.
- sql: TO_TIMESTAMP(string1[, string2])
table: toTimestamp(STRING1[, STRING2])
description: "Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone."
Expand Down
14 changes: 9 additions & 5 deletions docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -805,11 +805,15 @@ temporal:
- sql: TO_DATE(string1[, string2])
table: toDate(STRING1[, STRING2])
description: 将格式为 string2(默认为 'yyyy-MM-dd')的字符串 string1 转换为日期。
- sql: TO_TIMESTAMP_LTZ(numeric, precision)
table: toTimestampLtz(numeric, PRECISION)
description: |
将纪元秒或纪元毫秒转换为 TIMESTAMP_LTZ,有效精度为 0 或 3,0 代表 `TO_TIMESTAMP_LTZ(epochSeconds, 0)`,
3 代表 `TO_TIMESTAMP_LTZ(epochMilliseconds, 3)`。
- sql: TO_TIMESTAMP_LTZ(numeric[, precision])
table: toTimestampLtz(NUMERIC[, PRECISION])
description: Converts epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ. The valid precision is 0 or 3, where 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0) and 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3). If no precision is provided, the default precision is 3.
- sql: TO_TIMESTAMP_LTZ(string1[, string2])
table: toTimestampLtz(STRING1[, STRING2])
description: Converts a timestamp string string1 with format string2 (by default 'yyyy-MM-dd HH:mm:ss.SSS') to a TIMESTAMP_LTZ.
- sql: TO_TIMESTAMP_LTZ(string1, string2, string3)
table: toTimestampLtz(STRING1, STRING2, STRING3)
description: Converts a timestamp string string1 with format string2 to a TIMESTAMP_LTZ in time zone string3.
- sql: TO_TIMESTAMP(string1[, string2])
table: toTimestamp(STRING1[, STRING2])
description: 将格式为 string2(默认为:'yyyy-MM-dd HH:mm:ss')的字符串 string1 转换为 timestamp,不带时区。
Expand Down
37 changes: 28 additions & 9 deletions flink-python/pyflink/table/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,19 +306,38 @@ def to_timestamp(timestamp_str: Union[str, Expression[str]],
return _binary_op("toTimestamp", timestamp_str, format)


def to_timestamp_ltz(numeric_epoch_time, precision) -> Expression:
def to_timestamp_ltz(*args) -> Expression:
"""
Converts a numeric type epoch time to TIMESTAMP_LTZ.
Converts a value to a timestamp with local time zone.
The supported precision is 0 or 3:
0 means the numericEpochTime is in second.
3 means the numericEpochTime is in millisecond.
Supported signatures:
1. to_timestamp_ltz(numeric) -> timestamp_ltz
2. to_timestamp_ltz(numeric, precision) -> timestamp_ltz
3. to_timestamp_ltz(string) -> timestamp_ltz
4. to_timestamp_ltz(string, format) -> timestamp_ltz
5. to_timestamp_ltz(string, format, timezone) -> timestamp_ltz
:param numeric_epoch_time: The epoch time with numeric type
:param precision: The precision to indicate the epoch time is in second or millisecond
:return: The timestamp value with TIMESTAMP_LTZ type.
Example:
::
>>> table.select(to_timestamp_ltz(100)) # numeric with default precision
>>> table.select(to_timestamp_ltz(100, 0)) # numeric with second precision
>>> table.select(to_timestamp_ltz("2023-01-01 00:00:00")) # string with default format
>>> table.select(to_timestamp_ltz("01/01/2023", "MM/dd/yyyy")) # string with format
>>> table.select(to_timestamp_ltz("2023-01-01 00:00:00",
"yyyy-MM-dd HH:mm:ss",
"UTC")) # string with format and timezone
"""
return _binary_op("toTimestampLtz", numeric_epoch_time, precision)
if len(args) == 1:
return _unary_op("toTimestampLtz", lit(args[0]))

# For two arguments case (numeric + precision or string + format)
elif len(args) == 2:
return _binary_op("toTimestampLtz", lit(args[0]), lit(args[1]))

# For three arguments case (string + format + timezone)
elif len(args) == 3:
return _ternary_op("toTimestampLtz", lit(args[0]), lit(args[1]), lit(args[2]))


def temporal_overlaps(left_time_point,
Expand Down
9 changes: 8 additions & 1 deletion flink-python/pyflink/table/tests/test_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,14 @@ def test_expressions(self):
self.assertEqual("toDate('2018-03-18')", str(to_date('2018-03-18')))
self.assertEqual("toDate('2018-03-18', 'yyyy-MM-dd')",
str(to_date('2018-03-18', 'yyyy-MM-dd')))
self.assertEqual('toTimestampLtz(123, 0)', str(to_timestamp_ltz(123, 0)))
self.assertEqual('toTimestampLtz(100)', str(to_timestamp_ltz(100)))
self.assertEqual("toTimestampLtz('2023-01-01 00:00:00')",
str(to_timestamp_ltz('2023-01-01 00:00:00')))
self.assertEqual("toTimestampLtz('01/01/2023 00:00:00', 'MM/dd/yyyy HH:mm:ss')",
str(to_timestamp_ltz("01/01/2023 00:00:00", "MM/dd/yyyy HH:mm:ss")))
self.assertEqual("toTimestampLtz('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC')",
str(to_timestamp_ltz("2023-01-01 00:00:00", "yyyy-MM-dd HH:mm:ss", "UTC")))
self.assertEqual("toTimestampLtz(123, 0)", str(to_timestamp_ltz(123, 0)))
self.assertEqual("toTimestamp('1970-01-01 08:01:40')",
str(to_timestamp('1970-01-01 08:01:40')))
self.assertEqual("toTimestamp('1970-01-01 08:01:40', 'yyyy-MM-dd HH:mm:ss')",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2332,7 +2332,8 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)

public static final BuiltInFunctionDefinition TO_TIMESTAMP_LTZ =
BuiltInFunctionDefinition.newBuilder()
.name("TO_TIMESTAMP_LTZ")
.name("toTimestampLtz")
.sqlName("TO_TIMESTAMP_LTZ")
.kind(SCALAR)
.inputTypeStrategy(
or(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.TypeStrategy;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import java.util.List;
Expand All @@ -40,37 +43,56 @@ public Optional<DataType> inferType(CallContext callContext) {
int argCount = argumentTypes.size();

if (argCount < 1 || argCount > 3) {
throw new IllegalArgumentException(
"TO_TIMESTAMP_LTZ requires 1 to 3 arguments, but "
throw new ValidationException(
"Unsupported argument type. "
+ "TO_TIMESTAMP_LTZ requires 1 to 3 arguments, but "
+ argCount
+ "arguments were provided.");
+ " were provided.");
}

LogicalType firstType = argumentTypes.get(0).getLogicalType();
LogicalTypeRoot firstTypeRoot = firstType.getTypeRoot();

if (argCount == 1) {
// TO_TIMESTAMP_LTZ(numeric)
// TO_TIMESTAMP_LTZ(string)
return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION));
if (!isCharacterType(firstTypeRoot) && !firstType.is(LogicalTypeFamily.NUMERIC)) {
throw new ValidationException(
"Unsupported argument type. "
+ "When taking 1 argument, TO_TIMESTAMP_LTZ accepts <CHARACTER> or <NUMERIC>.");
}
} else if (argCount == 2) {
LogicalTypeRoot firstArgTypeRoot = argumentTypes.get(0).getLogicalType().getTypeRoot();
boolean isFirstArgNumeric =
firstArgTypeRoot == LogicalTypeRoot.TINYINT
|| firstArgTypeRoot == LogicalTypeRoot.SMALLINT
|| firstArgTypeRoot == LogicalTypeRoot.INTEGER
|| firstArgTypeRoot == LogicalTypeRoot.BIGINT
|| firstArgTypeRoot == LogicalTypeRoot.FLOAT
|| firstArgTypeRoot == LogicalTypeRoot.DOUBLE
|| firstArgTypeRoot == LogicalTypeRoot.DECIMAL;
// TO_TIMESTAMP_LTZ(numeric, precision)
if (callContext.isArgumentLiteral(1) && isFirstArgNumeric) {
final int precision = callContext.getArgumentValue(1, Integer.class).get();
return Optional.of(DataTypes.TIMESTAMP_LTZ(precision));
LogicalType secondType = argumentTypes.get(1).getLogicalType();
LogicalTypeRoot secondTypeRoot = secondType.getTypeRoot();
if (firstType.is(LogicalTypeFamily.NUMERIC)) {
if (secondTypeRoot != LogicalTypeRoot.INTEGER) {
throw new ValidationException(
"Unsupported argument type. "
+ "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) requires the second argument to be <INTEGER>.");
}
} else if (isCharacterType(firstTypeRoot)) {
if (!isCharacterType(secondTypeRoot)) {
throw new ValidationException(
"Unsupported argument type. "
+ "TO_TIMESTAMP_LTZ(<CHARACTER>, <CHARACTER>) requires the second argument to be <CHARACTER>.");
}
} else {
throw new ValidationException(
"Unsupported argument type. "
+ "When taking 2 arguments, TO_TIMESTAMP_LTZ requires the first argument to be <NUMERIC> or <CHARACTER>.");
}
} else if (argCount == 3) {
if (!isCharacterType(firstTypeRoot)
|| !isCharacterType(argumentTypes.get(1).getLogicalType().getTypeRoot())
|| !isCharacterType(argumentTypes.get(2).getLogicalType().getTypeRoot())) {
throw new ValidationException(
"Unsupported argument type. "
+ "When taking 3 arguments, TO_TIMESTAMP_LTZ requires all three arguments to be of type <CHARACTER>.");
}
// TO_TIMESTAMP_LTZ(string, format)
return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION));
} else {
// argCount == 3
// TO_TIMESTAMP_LTZ(string, format, timezone)
return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION));
}

return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION).nullable());
}

private boolean isCharacterType(LogicalTypeRoot typeRoot) {
return typeRoot == LogicalTypeRoot.CHAR || typeRoot == LogicalTypeRoot.VARCHAR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static java.time.temporal.ChronoField.NANO_OF_SECOND;
import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;
import static java.time.temporal.ChronoField.YEAR;
import static org.apache.flink.table.types.logical.YearMonthIntervalType.DEFAULT_PRECISION;

/**
* Utility functions for datetime types: date, time, timestamp.
Expand Down Expand Up @@ -142,6 +143,8 @@ public class DateTimeUtils {
.optionalEnd()
.toFormatter();

private static final Integer DEFAULT_PRECISION = 3;

/**
* A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat is not thread-safe.
* (string_format) => formatter
Expand Down Expand Up @@ -385,6 +388,18 @@ public static TimestampData toTimestampData(int v, int precision) {
}
}

public static TimestampData toTimestampData(long epoch) {
return toTimestampData(epoch, DEFAULT_PRECISION);
}

public static TimestampData toTimestampData(double epoch) {
return toTimestampData(epoch, DEFAULT_PRECISION);
}

public static TimestampData toTimestampData(DecimalData epoch) {
return toTimestampData(epoch, DEFAULT_PRECISION);
}

public static TimestampData toTimestampData(DecimalData v, int precision) {
long epochMills;
switch (precision) {
Expand All @@ -406,18 +421,6 @@ public static TimestampData toTimestampData(DecimalData v, int precision) {
}
}

public static TimestampData toTimestampData(long epoch) {
return toTimestampData(epoch, 3);
}

public static TimestampData toTimestampData(double epoch) {
return toTimestampData(epoch, 3);
}

public static TimestampData toTimestampData(DecimalData epoch) {
return toTimestampData(epoch, 3);
}

private static TimestampData timestampDataFromEpochMills(long epochMills) {
if (MIN_EPOCH_MILLS <= epochMills && epochMills <= MAX_EPOCH_MILLS) {
return TimestampData.fromEpochMillis(epochMills);
Expand Down Expand Up @@ -453,7 +456,7 @@ public static TimestampData parseTimestampData(String dateStr, int precision, Ti
.toInstant());
}

public static TimestampData toTimestampData(String dateStr, String format, String timezone) {
public static TimestampData parseTimestampData(String dateStr, String format, String timezone) {
if (dateStr == null || format == null || timezone == null) {
return null;
}
Expand Down
Loading

0 comments on commit 5335325

Please sign in to comment.