Skip to content

Commit

Permalink
[FLLINK-36862][table] Fix CI errors
Browse files Browse the repository at this point in the history
  • Loading branch information
yiyutian1 committed Dec 13, 2024
1 parent 750f965 commit afbf75f
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 87 deletions.
6 changes: 3 additions & 3 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -681,13 +681,13 @@ temporal:
description: Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date.
- sql: TO_TIMESTAMP_LTZ(numeric[, precision])
table: toTimestampLtz(NUMERIC, PRECISION)
description: "Converts a epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3). If no precision is provided, the default precision is 3."
description: "Converts an epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3). If no precision is provided, the default precision is 3."
- sql: TO_TIMESTAMP_LTZ(string1[, string2])
table: toTimestampLtz(STRING1[, STRING2])
description: "Converts a timestamp string string1 with format string2 (by default 'yyyy-MM-dd HH:mm:ss.SSS') to a TIMESTAMP_LTZ".
description: "Converts a timestamp string string1 with format string2 (by default 'yyyy-MM-dd HH:mm:ss.SSS') to a TIMESTAMP_LTZ."
- sql: TO_TIMESTAMP_TZ(string1, string2, string3)
table: toTimestampLtz(STRING1, STRING2, STRING3)
description: "Converts a timestamp string string1 with format string2 to a TIMESTAMP_TZ in time zone string3."
description: "Converts a timestamp string string1 with format string2 to a TIMESTAMP_LTZ in time zone string3."
- sql: TO_TIMESTAMP(string1[, string2])
table: toTimestamp(STRING1[, STRING2])
description: "Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone."
Expand Down
37 changes: 28 additions & 9 deletions flink-python/pyflink/table/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,19 +306,38 @@ def to_timestamp(timestamp_str: Union[str, Expression[str]],
return _binary_op("toTimestamp", timestamp_str, format)


def to_timestamp_ltz(numeric_epoch_time, precision) -> Expression:
def to_timestamp_ltz(*args) -> Expression:
"""
Converts a numeric type epoch time to TIMESTAMP_LTZ.
Converts a value to a timestamp with local time zone.
The supported precision is 0 or 3:
0 means the numericEpochTime is in second.
3 means the numericEpochTime is in millisecond.
Supported signatures:
1. to_timestamp_ltz(numeric) -> timestamp_ltz
2. to_timestamp_ltz(numeric, precision) -> timestamp_ltz
3. to_timestamp_ltz(string) -> timestamp_ltz
4. to_timestamp_ltz(string, format) -> timestamp_ltz
5. to_timestamp_ltz(string, format, timezone) -> timestamp_ltz
:param numeric_epoch_time: The epoch time with numeric type
:param precision: The precision to indicate the epoch time is in second or millisecond
:return: The timestamp value with TIMESTAMP_LTZ type.
Example:
::
>>> table.select(to_timestamp_ltz(100)) # numeric with default precision
>>> table.select(to_timestamp_ltz(100, 0)) # numeric with second precision
>>> table.select(to_timestamp_ltz("2023-01-01 00:00:00")) # string with default format
>>> table.select(to_timestamp_ltz("01/01/2023", "MM/dd/yyyy")) # string with format
>>> table.select(to_timestamp_ltz("2023-01-01 00:00:00",
"yyyy-MM-dd HH:mm:ss",
"UTC")) # string with format and timezone
"""
return _binary_op("toTimestampLtz", numeric_epoch_time, precision)
if len(args) == 1:
return _unary_op("toTimestampLtz", lit(args[0]))

# For two arguments case (numeric + precision or string + format)
elif len(args) == 2:
return _binary_op("toTimestampLtz", lit(args[0]), lit(args[1]))

# For three arguments case (string + format + timezone)
elif len(args) == 3:
return _ternary_op("toTimestampLtz", lit(args[0]), lit(args[1]), lit(args[2]))


def temporal_overlaps(left_time_point,
Expand Down
9 changes: 8 additions & 1 deletion flink-python/pyflink/table/tests/test_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,14 @@ def test_expressions(self):
self.assertEqual("toDate('2018-03-18')", str(to_date('2018-03-18')))
self.assertEqual("toDate('2018-03-18', 'yyyy-MM-dd')",
str(to_date('2018-03-18', 'yyyy-MM-dd')))
self.assertEqual('toTimestampLtz(123, 0)', str(to_timestamp_ltz(123, 0)))
self.assertEqual('toTimestampLtz(100)', str(to_timestamp_ltz(100)))
self.assertEqual("toTimestampLtz('2023-01-01 00:00:00')",
str(to_timestamp_ltz('2023-01-01 00:00:00')))
self.assertEqual("toTimestampLtz('01/01/2023 00:00:00', 'MM/dd/yyyy HH:mm:ss')",
str(to_timestamp_ltz("01/01/2023 00:00:00", "MM/dd/yyyy HH:mm:ss")))
self.assertEqual("toTimestampLtz('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC')",
str(to_timestamp_ltz("2023-01-01 00:00:00", "yyyy-MM-dd HH:mm:ss", "UTC")))
self.assertEqual("toTimestampLtz(123, 0)", str(to_timestamp_ltz(123, 0)))
self.assertEqual("toTimestamp('1970-01-01 08:01:40')",
str(to_timestamp('1970-01-01 08:01:40')))
self.assertEqual("toTimestamp('1970-01-01 08:01:40', 'yyyy-MM-dd HH:mm:ss')",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2332,7 +2332,9 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)

public static final BuiltInFunctionDefinition TO_TIMESTAMP_LTZ =
BuiltInFunctionDefinition.newBuilder()
.name("TO_TIMESTAMP_LTZ")
.name("toTimestampLtz")
.sqlName("TO_TIMESTAMP_LTZ")
// .name("TO_TIMESTAMP_LTZ")
.kind(SCALAR)
.inputTypeStrategy(
or(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.TypeStrategy;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import java.util.List;
import java.util.Optional;
Expand All @@ -43,34 +42,9 @@ public Optional<DataType> inferType(CallContext callContext) {
throw new IllegalArgumentException(
"TO_TIMESTAMP_LTZ requires 1 to 3 arguments, but "
+ argCount
+ "arguments were provided.");
+ " were provided.");
}

if (argCount == 1) {
// TO_TIMESTAMP_LTZ(numeric)
// TO_TIMESTAMP_LTZ(string)
return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION));
} else if (argCount == 2) {
LogicalTypeRoot firstArgTypeRoot = argumentTypes.get(0).getLogicalType().getTypeRoot();
boolean isFirstArgNumeric =
firstArgTypeRoot == LogicalTypeRoot.TINYINT
|| firstArgTypeRoot == LogicalTypeRoot.SMALLINT
|| firstArgTypeRoot == LogicalTypeRoot.INTEGER
|| firstArgTypeRoot == LogicalTypeRoot.BIGINT
|| firstArgTypeRoot == LogicalTypeRoot.FLOAT
|| firstArgTypeRoot == LogicalTypeRoot.DOUBLE
|| firstArgTypeRoot == LogicalTypeRoot.DECIMAL;
// TO_TIMESTAMP_LTZ(numeric, precision)
if (callContext.isArgumentLiteral(1) && isFirstArgNumeric) {
final int precision = callContext.getArgumentValue(1, Integer.class).get();
return Optional.of(DataTypes.TIMESTAMP_LTZ(precision));
}
// TO_TIMESTAMP_LTZ(string, format)
return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION));
} else {
// argCount == 3
// TO_TIMESTAMP_LTZ(string, format, timezone)
return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION));
}
return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -406,18 +406,6 @@ public static TimestampData toTimestampData(DecimalData v, int precision) {
}
}

public static TimestampData toTimestampData(long epoch) {
return toTimestampData(epoch, 3);
}

public static TimestampData toTimestampData(double epoch) {
return toTimestampData(epoch, 3);
}

public static TimestampData toTimestampData(DecimalData epoch) {
return toTimestampData(epoch, 3);
}

private static TimestampData timestampDataFromEpochMills(long epochMills) {
if (MIN_EPOCH_MILLS <= epochMills && epochMills <= MAX_EPOCH_MILLS) {
return TimestampData.fromEpochMillis(epochMills);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ private Stream<TestSetSpec> toTimestampLtzTestCases() {
LocalDateTime.of(1970, 1, 1, 0, 1, 40)
.atZone(ZoneOffset.UTC)
.toInstant(),
TIMESTAMP_LTZ(0).nullable())
TIMESTAMP_LTZ(3).nullable())
.testResult(
toTimestampLtz($("f1"), literal(3)),
"TO_TIMESTAMP_LTZ(f1, 3)",
Expand All @@ -774,7 +774,7 @@ private Stream<TestSetSpec> toTimestampLtzTestCases() {
LocalDateTime.of(1969, 12, 31, 23, 58, 20)
.atZone(ZoneOffset.UTC)
.toInstant(),
TIMESTAMP_LTZ(0).nullable())
TIMESTAMP_LTZ(3).nullable())
.testResult(
toTimestampLtz($("f3"), literal(3)),
"TO_TIMESTAMP_LTZ(f3, 3)",
Expand All @@ -784,7 +784,7 @@ private Stream<TestSetSpec> toTimestampLtzTestCases() {
toTimestampLtz($("f4"), literal(0)),
"TO_TIMESTAMP_LTZ(-" + Double.MAX_VALUE + ", 0)",
null, // expecting NULL result
TIMESTAMP_LTZ(0).nullable())
TIMESTAMP_LTZ(3).nullable())
.testResult(
toTimestampLtz("2023-01-01 00:00:00"),
"TO_TIMESTAMP_LTZ('2023-01-01 00:00:00')",
Expand Down Expand Up @@ -862,6 +862,14 @@ private Stream<TestSetSpec> toTimestampLtzTestCases() {
LocalDateTime.of(2023, 1, 1, 0, 0, 0)
.atZone(ZoneId.of("America/Los_Angeles"))
.toInstant(),
TIMESTAMP_LTZ(3).nullable())
.testResult(
toTimestampLtz(
"un-parsable timestamp",
literal("yyyy-MM-dd HH:mm:ss"),
literal("UTC")),
"TO_TIMESTAMP_LTZ('invalid timestamp', 'yyyy-MM-dd HH:mm:ss', 'UTC')",
null,
TIMESTAMP_LTZ(3).nullable()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import org.apache.flink.table.api._
import org.apache.flink.table.expressions.TimeIntervalUnit
import org.apache.flink.table.planner.codegen.CodeGenException
import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
import org.apache.flink.table.planner.utils.{DateTimeTestUtil, TableConfigUtils}
import org.apache.flink.table.planner.utils.DateTimeTestUtil._
import org.apache.flink.table.planner.utils.{DateTimeTestUtil, TableConfigUtils}
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row

import org.junit.jupiter.api.Test

import java.lang.{Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong}
Expand Down Expand Up @@ -1140,41 +1139,41 @@ class TemporalTypesTest extends ExpressionTestBase {
tableConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

// INT -> TIMESTAMP_LTZ
testAllApis(toTimestampLtz(100, 0), "TO_TIMESTAMP_LTZ(100, 0)", "1970-01-01 08:01:40")
testAllApis(toTimestampLtz(100, 0), "TO_TIMESTAMP_LTZ(100, 0)", "1970-01-01 08:01:40.000")

// TINYINT -> TIMESTAMP_LTZ
testAllApis(
toTimestampLtz(100.cast(DataTypes.TINYINT()), 0),
"TO_TIMESTAMP_LTZ(CAST(100 AS TINYINT), 0)",
"1970-01-01 08:01:40")
"1970-01-01 08:01:40.000")

// BIGINT -> TIMESTAMP_LTZ
testAllApis(
toTimestampLtz(100.cast(DataTypes.BIGINT()), 0),
"TO_TIMESTAMP_LTZ(CAST(100 AS BIGINT), 0)",
"1970-01-01 08:01:40")
"1970-01-01 08:01:40.000")

// FLOAT -> TIMESTAMP_LTZ
testAllApis(
toTimestampLtz(100.01.cast(DataTypes.FLOAT()), 0),
"TO_TIMESTAMP_LTZ(CAST(100.01 AS FLOAT), 0)",
"1970-01-01 08:01:40")
"1970-01-01 08:01:40.000")

// DOUBLE -> TIMESTAMP_LTZ
testAllApis(
toTimestampLtz(100.123.cast(DataTypes.DOUBLE()), 0),
"TO_TIMESTAMP_LTZ(CAST(100.123 AS DOUBLE), 0)",
"1970-01-01 08:01:40")
"1970-01-01 08:01:40.123")

// DECIMAL -> TIMESTAMP_LTZ
testAllApis(
toTimestampLtz(100.cast(DataTypes.DECIMAL(38, 18)), 0),
"TO_TIMESTAMP_LTZ(100, 0)",
"1970-01-01 08:01:40")
"1970-01-01 08:01:40.000")
testAllApis(
toTimestampLtz(-100.cast(DataTypes.DECIMAL(38, 18)), 0),
"TO_TIMESTAMP_LTZ(-100, 0)",
"1970-01-01 07:58:20")
"1970-01-01 07:58:20.000")

// keep scale
testAllApis(toTimestampLtz(1234, 3), "TO_TIMESTAMP_LTZ(1234, 3)", "1970-01-01 08:00:01.234")
Expand All @@ -1185,13 +1184,13 @@ class TemporalTypesTest extends ExpressionTestBase {
@Test
def testToTimestampLtzUTC(): Unit = {
tableConfig.setLocalTimeZone(ZoneId.of("UTC"))
testAllApis(toTimestampLtz(100, 0), "TO_TIMESTAMP_LTZ(100, 0)", "1970-01-01 00:01:40")
testAllApis(toTimestampLtz(100, 0), "TO_TIMESTAMP_LTZ(100, 0)", "1970-01-01 00:01:40.000")

testAllApis(toTimestampLtz(100, 0), "TO_TIMESTAMP_LTZ(100, 0)", "1970-01-01 00:01:40")
testAllApis(toTimestampLtz(100, 0), "TO_TIMESTAMP_LTZ(100, 0)", "1970-01-01 00:01:40.000")

testAllApis(toTimestampLtz(1234, 3), "TO_TIMESTAMP_LTZ(1234, 3)", "1970-01-01 00:00:01.234")

testAllApis(toTimestampLtz(-100, 0), "TO_TIMESTAMP_LTZ(-100, 0)", "1969-12-31 23:58:20")
testAllApis(toTimestampLtz(-100, 0), "TO_TIMESTAMP_LTZ(-100, 0)", "1969-12-31 23:58:20.000")
}

@Test
Expand All @@ -1202,21 +1201,21 @@ class TemporalTypesTest extends ExpressionTestBase {
testAllApis(
toTimestampLtz(JInt.MIN_VALUE.cast(DataTypes.INT()), 0),
s"TO_TIMESTAMP_LTZ(CAST(${JInt.MIN_VALUE} AS INTEGER), 0)",
"1901-12-13 20:45:52")
"1901-12-13 20:45:52.000")
testAllApis(
toTimestampLtz(JInt.MAX_VALUE.cast(DataTypes.INT()), 0),
s"TO_TIMESTAMP_LTZ(CAST(${JInt.MAX_VALUE} AS INTEGER), 0)",
"2038-01-19 03:14:07")
"2038-01-19 03:14:07.000")

// TINYINT
testAllApis(
toTimestampLtz(-128.cast(DataTypes.TINYINT()), 0),
s"TO_TIMESTAMP_LTZ(CAST(-128 AS TINYINT), 0)",
"1969-12-31 23:57:52")
"1969-12-31 23:57:52.000")
testAllApis(
toTimestampLtz(127.cast(DataTypes.TINYINT()), 0),
s"TO_TIMESTAMP_LTZ(CAST(127 AS TINYINT), 0)",
"1970-01-01 00:02:07")
"1970-01-01 00:02:07.000")

// BIGINT
testAllApis(
Expand Down Expand Up @@ -1285,7 +1284,8 @@ class TemporalTypesTest extends ExpressionTestBase {
// test invalid number of arguments
testExpectedSqlException(
"TO_TIMESTAMP_LTZ(123)",
"Invalid number of arguments to function 'TO_TIMESTAMP_LTZ'. Was expecting 2 arguments")
"Invalid number of arguments to function 'TO_TIMESTAMP_LTZ'. Was expecting 2 arguments"
)

// invalid precision
testExpectedAllApisException(
Expand Down Expand Up @@ -1313,10 +1313,11 @@ class TemporalTypesTest extends ExpressionTestBase {
" 'TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>)'",
classOf[ValidationException]
)

testExpectedTableApiException(
toTimestampLtz("test_string_type", 0),
"Unsupported argument type. " +
"Expected type of family 'NUMERIC' but actual type was 'CHAR(16) NOT NULL'"
"Invalid function call:\n" +
"toTimestampLtz(CHAR(16) NOT NULL, INT NOT NULL"
)

// invalid type for the second input
Expand All @@ -1329,8 +1330,8 @@ class TemporalTypesTest extends ExpressionTestBase {

testExpectedTableApiException(
toTimestampLtz(123, "test_string_type"),
"Unsupported argument type. " +
"Expected type of family 'INTEGER_NUMERIC' but actual type was 'CHAR(16) NOT NULL'"
"Invalid function call:\n" +
"toTimestampLtz(INT NOT NULL, CHAR(16) NOT NULL"
)
}

Expand Down
Loading

0 comments on commit afbf75f

Please sign in to comment.