@@ -364,6 +364,13 @@ def timestamp_func(timestamp_param):
364364 'timestamp_param is wrong value %s !' % timestamp_param
365365 return timestamp_param
366366
367+ @udf (result_type = DataTypes .TIMESTAMP_LTZ (3 ))
368+ def timestamp_ltz_func (timestamp_ltz_param ):
369+ from datetime import datetime , timezone
370+ assert timestamp_ltz_param == datetime (2018 , 3 , 11 , 3 , 0 , 0 , 123000 , timezone .utc ), \
371+ 'timestamp_ltz_param is wrong value %s !' % timestamp_ltz_param
372+ return timestamp_ltz_param
373+
367374 @udf (result_type = DataTypes .ARRAY (DataTypes .BIGINT ()))
368375 def array_func (array_param ):
369376 assert array_param == [[1 , 2 , 3 ]] or array_param == ((1 , 2 , 3 ),), \
@@ -427,7 +434,8 @@ def varchar_func(varchar_param):
427434 q DECIMAL(38, 18),
428435 r BINARY(5),
429436 s CHAR(7),
430- t VARCHAR(10)
437+ t VARCHAR(10),
438+ u TIMESTAMP_LTZ(3)
431439 ) WITH ('connector'='test-sink')
432440 """
433441 self .t_env .execute_sql (sink_table_ddl )
@@ -441,7 +449,8 @@ def varchar_func(varchar_param):
441449 datetime .datetime (2018 , 3 , 11 , 3 , 0 , 0 , 123000 ), [[1 , 2 , 3 ]],
442450 {1 : 'flink' , 2 : 'pyflink' }, decimal .Decimal ('1000000000000000000.05' ),
443451 decimal .Decimal ('1000000000000000000.05999999999999999899999999999' ),
444- bytearray (b'flink' ), 'pyflink' , 'pyflink' )],
452+ bytearray (b'flink' ), 'pyflink' , 'pyflink' ,
453+ datetime .datetime (2018 , 3 , 11 , 3 , 0 , 0 , 123000 , datetime .timezone .utc ))],
445454 DataTypes .ROW (
446455 [DataTypes .FIELD ("a" , DataTypes .BIGINT ()),
447456 DataTypes .FIELD ("b" , DataTypes .BIGINT ()),
@@ -462,7 +471,8 @@ def varchar_func(varchar_param):
462471 DataTypes .FIELD ("q" , DataTypes .DECIMAL (38 , 18 )),
463472 DataTypes .FIELD ("r" , DataTypes .BINARY (5 )),
464473 DataTypes .FIELD ("s" , DataTypes .CHAR (7 )),
465- DataTypes .FIELD ("t" , DataTypes .VARCHAR (10 ))]))
474+ DataTypes .FIELD ("t" , DataTypes .VARCHAR (10 )),
475+ DataTypes .FIELD ("u" , DataTypes .TIMESTAMP_LTZ (3 ))]))
466476
467477 t .select (
468478 bigint_func (t .a ),
@@ -484,7 +494,8 @@ def varchar_func(varchar_param):
484494 decimal_cut_func (t .q ),
485495 binary_func (t .r ),
486496 char_func (t .s ),
487- varchar_func (t .t )) \
497+ varchar_func (t .t ),
498+ timestamp_ltz_func (t .u )) \
488499 .execute_insert (sink_table ).wait ()
489500 actual = source_sink_utils .results ()
490501 # Currently the sink result precision of DataTypes.TIME(precision) only supports 0.
@@ -494,7 +505,7 @@ def varchar_func(varchar_param):
494505 "2018-03-11T03:00:00.123, [1, 2, 3], "
495506 "{1=flink, 2=pyflink}, 1000000000000000000.050000000000000000, "
496507 "1000000000000000000.059999999999999999, [102, 108, 105, 110, 107], "
497- "pyflink, pyflink]" ])
508+ "pyflink, pyflink, 2018-03-11T03:00:00.123Z ]" ])
498509
499510 def test_all_data_types (self ):
500511 def boolean_func (bool_param ):
@@ -995,7 +1006,7 @@ def local_zoned_timestamp_func(local_zoned_timestamp_param):
9951006 .execute_insert (sink_table ) \
9961007 .wait ()
9971008 actual = source_sink_utils .results ()
998- self .assert_equals (actual , ["+I[1970-01-01T00 :00:00 .123Z]" ])
1009+ self .assert_equals (actual , ["+I[1969-12-31T16 :00:01 .123Z]" ])
9991010
10001011 def test_execute_from_json_plan (self ):
10011012 # create source file path
@@ -1161,6 +1172,8 @@ def echo(i: str):
11611172if __name__ == '__main__' :
11621173 import unittest
11631174
1175+ os .environ ['_python_worker_execution_mode' ] = "loopback"
1176+
11641177 try :
11651178 import xmlrunner
11661179
0 commit comments