@@ -200,7 +200,6 @@ def _test_transform_with_state_basic(
         q.awaitTermination(10)
         self.assertTrue(q.exception() is None)
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_basic(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -217,7 +216,6 @@ def check_results(batch_df, batch_id):
 
         self._test_transform_with_state_basic(SimpleStatefulProcessorFactory(), check_results)
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_non_exist_value_state(self):
         def check_results(batch_df, _):
             batch_df.collect()
@@ -230,7 +228,6 @@ def check_results(batch_df, _):
             InvalidSimpleStatefulProcessorFactory(), check_results, True
         )
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_query_restarts(self):
         root_path = tempfile.mkdtemp()
         input_path = root_path + "/input"
@@ -305,7 +302,6 @@ def test_transform_with_state_query_restarts(self):
             Row(id="1", countAsString="2"),
         }
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_list_state(self):
         def check_results(batch_df, _):
             batch_df.collect()
@@ -318,7 +314,6 @@ def check_results(batch_df, _):
             ListStateProcessorFactory(), check_results, True, "processingTime"
         )
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_list_state_large_list(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -394,7 +389,6 @@ def check_results(batch_df, batch_id):
         self.assertTrue(q.exception() is None)
 
     # test list state with ttl has the same behavior as list state when state doesn't expire.
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_list_state_large_ttl(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -407,7 +401,6 @@ def check_results(batch_df, batch_id):
             ListStateLargeTTLProcessorFactory(), check_results, True, "processingTime"
         )
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_map_state(self):
         def check_results(batch_df, _):
             batch_df.collect()
@@ -419,7 +412,6 @@ def check_results(batch_df, _):
         self._test_transform_with_state_basic(MapStateProcessorFactory(), check_results, True)
 
     # test map state with ttl has the same behavior as map state when state doesn't expire.
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_map_state_large_ttl(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -434,7 +426,6 @@ def check_results(batch_df, batch_id):
 
     # test value state with ttl has the same behavior as value state when
     # state doesn't expire.
-    @unittest.skip("Temporarily disabled for testing")
     def test_value_state_ttl_basic(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -455,7 +446,6 @@ def check_results(batch_df, batch_id):
 
     # TODO SPARK-50908 holistic fix for TTL suite
     @unittest.skip("test is flaky and it is only a timing issue, skipping until we can resolve")
-    @unittest.skip("Temporarily disabled for testing")
     def test_value_state_ttl_expiration(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -605,7 +595,6 @@ def _test_transform_with_state_proc_timer(self, stateful_processor_factory, chec
         q.awaitTermination(10)
         self.assertTrue(q.exception() is None)
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_proc_timer(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -723,7 +712,6 @@ def prepare_batch3(input_path):
         q.awaitTermination(10)
         self.assertTrue(q.exception() is None)
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_event_time(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -756,7 +744,6 @@ def check_results(batch_df, batch_id):
             EventTimeStatefulProcessorFactory(), check_results
         )
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_with_wmark_and_non_event_time(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -855,7 +842,6 @@ def _test_transform_with_state_init_state(
         q.awaitTermination(10)
         self.assertTrue(q.exception() is None)
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_init_state(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -880,7 +866,6 @@ def check_results(batch_df, batch_id):
             SimpleStatefulProcessorWithInitialStateFactory(), check_results
         )
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_init_state_with_extra_transformation(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -960,7 +945,6 @@ def _test_transform_with_state_non_contiguous_grouping_cols(
         q.awaitTermination(10)
         self.assertTrue(q.exception() is None)
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_non_contiguous_grouping_cols(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -973,7 +957,6 @@ def check_results(batch_df, batch_id):
             SimpleStatefulProcessorWithInitialStateFactory(), check_results
        )
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_non_contiguous_grouping_cols_with_init_state(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -1057,7 +1040,6 @@ def _test_transform_with_state_chaining_ops(
         q.processAllAvailable()
         q.awaitTermination(10)
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_chaining_ops(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -1094,7 +1076,6 @@ def check_results(batch_df, batch_id):
             ["outputTimestamp", "id"],
         )
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_init_state_with_timers(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -1125,7 +1106,6 @@ def check_results(batch_df, batch_id):
             StatefulProcessorWithInitialStateTimersFactory(), check_results, "processingTime"
         )
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_batch_query(self):
         data = [("0", 123), ("0", 46), ("1", 146), ("1", 346)]
         df = self.spark.createDataFrame(data, "id string, temperature int")
@@ -1157,7 +1137,6 @@ def test_transform_with_state_batch_query(self):
             Row(id="1", countAsString="2"),
         }
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_batch_query_initial_state(self):
         data = [("0", 123), ("0", 46), ("1", 146), ("1", 346)]
         df = self.spark.createDataFrame(data, "id string, temperature int")
@@ -1202,11 +1181,9 @@ def test_transform_with_state_batch_query_initial_state(self):
     @unittest.skipIf(
         "COVERAGE_PROCESS_START" in os.environ, "Flaky with coverage enabled, skipping for now."
     )
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_map_state_metadata(self):
         self._test_transform_with_map_state_metadata(None)
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_map_state_metadata_with_init_state(self):
         # run the same test suite again but with no-op initial state
         # TWS with initial state is using a different python runner
@@ -1339,7 +1316,6 @@ def check_results(batch_df, batch_id):
         )
 
     # This test covers multiple list state variables and flatten option
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_list_state_metadata(self):
         checkpoint_path = tempfile.mktemp()
 
@@ -1420,7 +1396,6 @@ def check_results(batch_df, batch_id):
 
     # This test covers value state variable and read change feed,
     # snapshotStartBatchId related options
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_value_state_metadata(self):
         checkpoint_path = tempfile.mktemp()
 
@@ -1511,7 +1486,6 @@ def check_results(batch_df, batch_id):
             checkpoint_path=checkpoint_path,
         )
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_restart_with_multiple_rows_init_state(self):
         def check_results(batch_df, _):
             batch_df.collect()
@@ -1571,7 +1545,6 @@ def dataframe_to_value_list(output_df):
             initial_state=init_df,
         )
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_in_pandas_composite_type(self):
         def check_results(batch_df, batch_id):
             if batch_id == 0:
@@ -1931,7 +1904,6 @@ def check_results(batch_df, batch_id):
         )
 
     # run the same test suites again but with single shuffle partition
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_with_timers_single_partition(self):
         with self.sql_conf({"spark.sql.shuffle.partitions": "1"}):
             self.test_transform_with_state_init_state_with_timers()
@@ -1980,7 +1952,6 @@ def _run_evolution_test(self, processor_factory, checkpoint_dir, check_results,
         q.processAllAvailable()
         q.awaitTermination(10)
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_schema_evolution_scenarios(self):
         """Test various schema evolution scenarios"""
         with self.sql_conf({"spark.sql.streaming.stateStore.encodingFormat": "avro"}):
@@ -2049,7 +2020,6 @@ def check_upcast(batch_df, batch_id):
 
     # This test case verifies that an exception is thrown when downcasting, which violates
     # Avro's schema evolution rules
-    @unittest.skip("Temporarily disabled for testing")
     def test_schema_evolution_fails(self):
         with self.sql_conf({"spark.sql.streaming.stateStore.encodingFormat": "avro"}):
             with tempfile.TemporaryDirectory() as checkpoint_dir:
@@ -2102,7 +2072,6 @@ def check_basic_state(batch_df, batch_id):
                     and "Schema evolution is not possible" in error_msg
                 )
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_not_nullable_fails(self):
         with self.sql_conf({"spark.sql.streaming.stateStore.encodingFormat": "avro"}):
             with tempfile.TemporaryDirectory() as checkpoint_dir:
@@ -2135,7 +2104,6 @@ def check_basic_state(batch_df, batch_id):
                     and "column family state must be nullable" in error_msg
                 )
 
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_int_to_decimal_coercion(self):
         if not self.use_pandas():
             return