@@ -194,7 +194,6 @@ def _test_transform_with_state_basic(
         q.awaitTermination(10)
         self.assertTrue(q.exception() is None)

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_basic(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -211,7 +210,6 @@ def check_results(batch_df, batch_id):

         self._test_transform_with_state_basic(SimpleStatefulProcessorFactory(), check_results)

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_non_exist_value_state(self):
         def check_results(batch_df, _):
             batch_df.collect()
@@ -224,7 +222,6 @@ def check_results(batch_df, _):
             InvalidSimpleStatefulProcessorFactory(), check_results, True
         )

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_query_restarts(self):
         root_path = tempfile.mkdtemp()
         input_path = root_path + "/input"
@@ -299,7 +296,6 @@ def test_transform_with_state_query_restarts(self):
             Row(id="1", countAsString="2"),
         }

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_list_state(self):
         def check_results(batch_df, _):
             batch_df.collect()
@@ -312,7 +308,6 @@ def check_results(batch_df, _):
             ListStateProcessorFactory(), check_results, True, "processingTime"
         )

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_list_state_large_list(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -388,7 +383,6 @@ def check_results(batch_df, batch_id):
         self.assertTrue(q.exception() is None)

     # test list state with ttl has the same behavior as list state when state doesn't expire.
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_list_state_large_ttl(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -401,7 +395,6 @@ def check_results(batch_df, batch_id):
             ListStateLargeTTLProcessorFactory(), check_results, True, "processingTime"
         )

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_map_state(self):
         def check_results(batch_df, _):
             batch_df.collect()
@@ -413,7 +406,6 @@ def check_results(batch_df, _):
         self._test_transform_with_state_basic(MapStateProcessorFactory(), check_results, True)

     # test map state with ttl has the same behavior as map state when state doesn't expire.
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_map_state_large_ttl(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -428,7 +420,6 @@ def check_results(batch_df, batch_id):

     # test value state with ttl has the same behavior as value state when
     # state doesn't expire.
-    @unittest.skip("Temporarily disabled for testing")
     def test_value_state_ttl_basic(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -449,7 +440,6 @@ def check_results(batch_df, batch_id):

     # TODO SPARK-50908 holistic fix for TTL suite
     @unittest.skip("test is flaky and it is only a timing issue, skipping until we can resolve")
-    @unittest.skip("Temporarily disabled for testing")
     def test_value_state_ttl_expiration(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -599,7 +589,6 @@ def _test_transform_with_state_proc_timer(self, stateful_processor_factory, chec
         q.awaitTermination(10)
         self.assertTrue(q.exception() is None)

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_proc_timer(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -717,7 +706,6 @@ def prepare_batch3(input_path):
         q.awaitTermination(10)
         self.assertTrue(q.exception() is None)

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_event_time(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -750,7 +738,6 @@ def check_results(batch_df, batch_id):
             EventTimeStatefulProcessorFactory(), check_results
         )

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_with_wmark_and_non_event_time(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -849,7 +836,6 @@ def _test_transform_with_state_init_state(
         q.awaitTermination(10)
         self.assertTrue(q.exception() is None)

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_init_state(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -874,7 +860,6 @@ def check_results(batch_df, batch_id):
             SimpleStatefulProcessorWithInitialStateFactory(), check_results
         )

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_init_state_with_extra_transformation(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -954,7 +939,6 @@ def _test_transform_with_state_non_contiguous_grouping_cols(
         q.awaitTermination(10)
         self.assertTrue(q.exception() is None)

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_non_contiguous_grouping_cols(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -967,7 +951,6 @@ def check_results(batch_df, batch_id):
             SimpleStatefulProcessorWithInitialStateFactory(), check_results
         )

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_non_contiguous_grouping_cols_with_init_state(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -1051,7 +1034,6 @@ def _test_transform_with_state_chaining_ops(
         q.processAllAvailable()
         q.awaitTermination(10)

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_chaining_ops(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -1088,7 +1070,6 @@ def check_results(batch_df, batch_id):
             ["outputTimestamp", "id"],
         )

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_init_state_with_timers(self):
         def check_results(batch_df, batch_id):
             batch_df.collect()
@@ -1119,7 +1100,6 @@ def check_results(batch_df, batch_id):
             StatefulProcessorWithInitialStateTimersFactory(), check_results, "processingTime"
         )

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_batch_query(self):
         data = [("0", 123), ("0", 46), ("1", 146), ("1", 346)]
         df = self.spark.createDataFrame(data, "id string, temperature int")
@@ -1151,7 +1131,6 @@ def test_transform_with_state_batch_query(self):
             Row(id="1", countAsString="2"),
         }

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_batch_query_initial_state(self):
         data = [("0", 123), ("0", 46), ("1", 146), ("1", 346)]
         df = self.spark.createDataFrame(data, "id string, temperature int")
@@ -1196,11 +1175,9 @@ def test_transform_with_state_batch_query_initial_state(self):
     @unittest.skipIf(
         "COVERAGE_PROCESS_START" in os.environ, "Flaky with coverage enabled, skipping for now."
     )
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_map_state_metadata(self):
         self._test_transform_with_map_state_metadata(None)

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_map_state_metadata_with_init_state(self):
         # run the same test suite again but with no-op initial state
         # TWS with initial state is using a different python runner
@@ -1333,7 +1310,6 @@ def check_results(batch_df, batch_id):
         )

     # This test covers multiple list state variables and flatten option
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_list_state_metadata(self):
         checkpoint_path = tempfile.mktemp()

@@ -1414,7 +1390,6 @@ def check_results(batch_df, batch_id):

     # This test covers value state variable and read change feed,
     # snapshotStartBatchId related options
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_value_state_metadata(self):
         checkpoint_path = tempfile.mktemp()

@@ -1505,7 +1480,6 @@ def check_results(batch_df, batch_id):
             checkpoint_path=checkpoint_path,
         )

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_restart_with_multiple_rows_init_state(self):
         def check_results(batch_df, _):
             batch_df.collect()
@@ -1565,7 +1539,6 @@ def dataframe_to_value_list(output_df):
             initial_state=init_df,
         )

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_in_pandas_composite_type(self):
         def check_results(batch_df, batch_id):
             if batch_id == 0:
@@ -1624,7 +1597,6 @@ def check_results(batch_df, batch_id):
         )

     # run the same test suites again but with single shuffle partition
-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_with_timers_single_partition(self):
         with self.sql_conf({"spark.sql.shuffle.partitions": "1"}):
             self.test_transform_with_state_init_state_with_timers()
@@ -1673,7 +1645,6 @@ def _run_evolution_test(self, processor_factory, checkpoint_dir, check_results,
         q.processAllAvailable()
         q.awaitTermination(10)

-    @unittest.skip("Temporarily disabled for testing")
     def test_schema_evolution_scenarios(self):
         """Test various schema evolution scenarios"""
         with self.sql_conf({"spark.sql.streaming.stateStore.encodingFormat": "avro"}):
@@ -1742,7 +1713,6 @@ def check_upcast(batch_df, batch_id):

     # This test case verifies that an exception is thrown when downcasting, which violates
     # Avro's schema evolution rules
-    @unittest.skip("Temporarily disabled for testing")
     def test_schema_evolution_fails(self):
         with self.sql_conf({"spark.sql.streaming.stateStore.encodingFormat": "avro"}):
             with tempfile.TemporaryDirectory() as checkpoint_dir:
@@ -1795,7 +1765,6 @@ def check_basic_state(batch_df, batch_id):
                     and "Schema evolution is not possible" in error_msg
                 )

-    @unittest.skip("Temporarily disabled for testing")
     def test_not_nullable_fails(self):
         with self.sql_conf({"spark.sql.streaming.stateStore.encodingFormat": "avro"}):
             with tempfile.TemporaryDirectory() as checkpoint_dir:
@@ -1828,7 +1797,6 @@ def check_basic_state(batch_df, batch_id):
                     and "column family state must be nullable" in error_msg
                 )

-    @unittest.skip("Temporarily disabled for testing")
     def test_transform_with_state_int_to_decimal_coercion(self):
         if not self.use_pandas():
             return