@@ -200,6 +200,7 @@ def _test_transform_with_state_basic(
200200 q .awaitTermination (10 )
201201 self .assertTrue (q .exception () is None )
202202
203+ @unittest .skip ("Temporarily disabled for testing" )
203204 def test_transform_with_state_basic (self ):
204205 def check_results (batch_df , batch_id ):
205206 batch_df .collect ()
@@ -216,6 +217,7 @@ def check_results(batch_df, batch_id):
216217
217218 self ._test_transform_with_state_basic (SimpleStatefulProcessorFactory (), check_results )
218219
220+ @unittest .skip ("Temporarily disabled for testing" )
219221 def test_transform_with_state_non_exist_value_state (self ):
220222 def check_results (batch_df , _ ):
221223 batch_df .collect ()
@@ -228,6 +230,7 @@ def check_results(batch_df, _):
228230 InvalidSimpleStatefulProcessorFactory (), check_results , True
229231 )
230232
233+ @unittest .skip ("Temporarily disabled for testing" )
231234 def test_transform_with_state_query_restarts (self ):
232235 root_path = tempfile .mkdtemp ()
233236 input_path = root_path + "/input"
@@ -302,6 +305,7 @@ def test_transform_with_state_query_restarts(self):
302305 Row (id = "1" , countAsString = "2" ),
303306 }
304307
308+ @unittest .skip ("Temporarily disabled for testing" )
305309 def test_transform_with_state_list_state (self ):
306310 def check_results (batch_df , _ ):
307311 batch_df .collect ()
@@ -314,6 +318,7 @@ def check_results(batch_df, _):
314318 ListStateProcessorFactory (), check_results , True , "processingTime"
315319 )
316320
321+ @unittest .skip ("Temporarily disabled for testing" )
317322 def test_transform_with_state_list_state_large_list (self ):
318323 def check_results (batch_df , batch_id ):
319324 batch_df .collect ()
@@ -389,6 +394,7 @@ def check_results(batch_df, batch_id):
389394 self .assertTrue (q .exception () is None )
390395
391396 # test list state with ttl has the same behavior as list state when state doesn't expire.
397+ @unittest .skip ("Temporarily disabled for testing" )
392398 def test_transform_with_state_list_state_large_ttl (self ):
393399 def check_results (batch_df , batch_id ):
394400 batch_df .collect ()
@@ -401,6 +407,7 @@ def check_results(batch_df, batch_id):
401407 ListStateLargeTTLProcessorFactory (), check_results , True , "processingTime"
402408 )
403409
410+ @unittest .skip ("Temporarily disabled for testing" )
404411 def test_transform_with_state_map_state (self ):
405412 def check_results (batch_df , _ ):
406413 batch_df .collect ()
@@ -412,6 +419,7 @@ def check_results(batch_df, _):
412419 self ._test_transform_with_state_basic (MapStateProcessorFactory (), check_results , True )
413420
414421 # test map state with ttl has the same behavior as map state when state doesn't expire.
422+ @unittest .skip ("Temporarily disabled for testing" )
415423 def test_transform_with_state_map_state_large_ttl (self ):
416424 def check_results (batch_df , batch_id ):
417425 batch_df .collect ()
@@ -426,6 +434,7 @@ def check_results(batch_df, batch_id):
426434
427435 # test value state with ttl has the same behavior as value state when
428436 # state doesn't expire.
437+ @unittest .skip ("Temporarily disabled for testing" )
429438 def test_value_state_ttl_basic (self ):
430439 def check_results (batch_df , batch_id ):
431440 batch_df .collect ()
@@ -446,6 +455,7 @@ def check_results(batch_df, batch_id):
446455
447456 # TODO SPARK-50908 holistic fix for TTL suite
448457 @unittest .skip ("test is flaky and it is only a timing issue, skipping until we can resolve" )
458+ @unittest .skip ("Temporarily disabled for testing" )
449459 def test_value_state_ttl_expiration (self ):
450460 def check_results (batch_df , batch_id ):
451461 batch_df .collect ()
@@ -595,6 +605,7 @@ def _test_transform_with_state_proc_timer(self, stateful_processor_factory, chec
595605 q .awaitTermination (10 )
596606 self .assertTrue (q .exception () is None )
597607
608+ @unittest .skip ("Temporarily disabled for testing" )
598609 def test_transform_with_state_proc_timer (self ):
599610 def check_results (batch_df , batch_id ):
600611 batch_df .collect ()
@@ -712,6 +723,7 @@ def prepare_batch3(input_path):
712723 q .awaitTermination (10 )
713724 self .assertTrue (q .exception () is None )
714725
726+ @unittest .skip ("Temporarily disabled for testing" )
715727 def test_transform_with_state_event_time (self ):
716728 def check_results (batch_df , batch_id ):
717729 batch_df .collect ()
@@ -744,6 +756,7 @@ def check_results(batch_df, batch_id):
744756 EventTimeStatefulProcessorFactory (), check_results
745757 )
746758
759+ @unittest .skip ("Temporarily disabled for testing" )
747760 def test_transform_with_state_with_wmark_and_non_event_time (self ):
748761 def check_results (batch_df , batch_id ):
749762 batch_df .collect ()
@@ -842,6 +855,7 @@ def _test_transform_with_state_init_state(
842855 q .awaitTermination (10 )
843856 self .assertTrue (q .exception () is None )
844857
858+ @unittest .skip ("Temporarily disabled for testing" )
845859 def test_transform_with_state_init_state (self ):
846860 def check_results (batch_df , batch_id ):
847861 batch_df .collect ()
@@ -866,6 +880,7 @@ def check_results(batch_df, batch_id):
866880 SimpleStatefulProcessorWithInitialStateFactory (), check_results
867881 )
868882
883+ @unittest .skip ("Temporarily disabled for testing" )
869884 def test_transform_with_state_init_state_with_extra_transformation (self ):
870885 def check_results (batch_df , batch_id ):
871886 batch_df .collect ()
@@ -945,6 +960,7 @@ def _test_transform_with_state_non_contiguous_grouping_cols(
945960 q .awaitTermination (10 )
946961 self .assertTrue (q .exception () is None )
947962
963+ @unittest .skip ("Temporarily disabled for testing" )
948964 def test_transform_with_state_non_contiguous_grouping_cols (self ):
949965 def check_results (batch_df , batch_id ):
950966 batch_df .collect ()
@@ -957,6 +973,7 @@ def check_results(batch_df, batch_id):
957973 SimpleStatefulProcessorWithInitialStateFactory (), check_results
958974 )
959975
976+ @unittest .skip ("Temporarily disabled for testing" )
960977 def test_transform_with_state_non_contiguous_grouping_cols_with_init_state (self ):
961978 def check_results (batch_df , batch_id ):
962979 batch_df .collect ()
@@ -1040,6 +1057,7 @@ def _test_transform_with_state_chaining_ops(
10401057 q .processAllAvailable ()
10411058 q .awaitTermination (10 )
10421059
1060+ @unittest .skip ("Temporarily disabled for testing" )
10431061 def test_transform_with_state_chaining_ops (self ):
10441062 def check_results (batch_df , batch_id ):
10451063 batch_df .collect ()
@@ -1076,6 +1094,7 @@ def check_results(batch_df, batch_id):
10761094 ["outputTimestamp" , "id" ],
10771095 )
10781096
1097+ @unittest .skip ("Temporarily disabled for testing" )
10791098 def test_transform_with_state_init_state_with_timers (self ):
10801099 def check_results (batch_df , batch_id ):
10811100 batch_df .collect ()
@@ -1106,6 +1125,7 @@ def check_results(batch_df, batch_id):
11061125 StatefulProcessorWithInitialStateTimersFactory (), check_results , "processingTime"
11071126 )
11081127
1128+ @unittest .skip ("Temporarily disabled for testing" )
11091129 def test_transform_with_state_batch_query (self ):
11101130 data = [("0" , 123 ), ("0" , 46 ), ("1" , 146 ), ("1" , 346 )]
11111131 df = self .spark .createDataFrame (data , "id string, temperature int" )
@@ -1137,6 +1157,7 @@ def test_transform_with_state_batch_query(self):
11371157 Row (id = "1" , countAsString = "2" ),
11381158 }
11391159
1160+ @unittest .skip ("Temporarily disabled for testing" )
11401161 def test_transform_with_state_batch_query_initial_state (self ):
11411162 data = [("0" , 123 ), ("0" , 46 ), ("1" , 146 ), ("1" , 346 )]
11421163 df = self .spark .createDataFrame (data , "id string, temperature int" )
@@ -1181,9 +1202,11 @@ def test_transform_with_state_batch_query_initial_state(self):
11811202 @unittest .skipIf (
11821203 "COVERAGE_PROCESS_START" in os .environ , "Flaky with coverage enabled, skipping for now."
11831204 )
1205+ @unittest .skip ("Temporarily disabled for testing" )
11841206 def test_transform_with_map_state_metadata (self ):
11851207 self ._test_transform_with_map_state_metadata (None )
11861208
1209+ @unittest .skip ("Temporarily disabled for testing" )
11871210 def test_transform_with_map_state_metadata_with_init_state (self ):
11881211 # run the same test suite again but with no-op initial state
11891212 # TWS with initial state is using a different python runner
@@ -1316,6 +1339,7 @@ def check_results(batch_df, batch_id):
13161339 )
13171340
13181341 # This test covers multiple list state variables and flatten option
1342+ @unittest .skip ("Temporarily disabled for testing" )
13191343 def test_transform_with_list_state_metadata (self ):
13201344 checkpoint_path = tempfile .mktemp ()
13211345
@@ -1396,6 +1420,7 @@ def check_results(batch_df, batch_id):
13961420
13971421 # This test covers value state variable and read change feed,
13981422 # snapshotStartBatchId related options
1423+ @unittest .skip ("Temporarily disabled for testing" )
13991424 def test_transform_with_value_state_metadata (self ):
14001425 checkpoint_path = tempfile .mktemp ()
14011426
@@ -1486,6 +1511,7 @@ def check_results(batch_df, batch_id):
14861511 checkpoint_path = checkpoint_path ,
14871512 )
14881513
1514+ @unittest .skip ("Temporarily disabled for testing" )
14891515 def test_transform_with_state_restart_with_multiple_rows_init_state (self ):
14901516 def check_results (batch_df , _ ):
14911517 batch_df .collect ()
@@ -1545,6 +1571,7 @@ def dataframe_to_value_list(output_df):
15451571 initial_state = init_df ,
15461572 )
15471573
1574+ @unittest .skip ("Temporarily disabled for testing" )
15481575 def test_transform_with_state_in_pandas_composite_type (self ):
15491576 def check_results (batch_df , batch_id ):
15501577 if batch_id == 0 :
@@ -1904,6 +1931,7 @@ def check_results(batch_df, batch_id):
19041931 )
19051932
19061933 # run the same test suites again but with single shuffle partition
1934+ @unittest .skip ("Temporarily disabled for testing" )
19071935 def test_transform_with_state_with_timers_single_partition (self ):
19081936 with self .sql_conf ({"spark.sql.shuffle.partitions" : "1" }):
19091937 self .test_transform_with_state_init_state_with_timers ()
@@ -1952,6 +1980,7 @@ def _run_evolution_test(self, processor_factory, checkpoint_dir, check_results,
19521980 q .processAllAvailable ()
19531981 q .awaitTermination (10 )
19541982
1983+ @unittest .skip ("Temporarily disabled for testing" )
19551984 def test_schema_evolution_scenarios (self ):
19561985 """Test various schema evolution scenarios"""
19571986 with self .sql_conf ({"spark.sql.streaming.stateStore.encodingFormat" : "avro" }):
@@ -2020,6 +2049,7 @@ def check_upcast(batch_df, batch_id):
20202049
20212050 # This test case verifies that an exception is thrown when downcasting, which violates
20222051 # Avro's schema evolution rules
2052+ @unittest .skip ("Temporarily disabled for testing" )
20232053 def test_schema_evolution_fails (self ):
20242054 with self .sql_conf ({"spark.sql.streaming.stateStore.encodingFormat" : "avro" }):
20252055 with tempfile .TemporaryDirectory () as checkpoint_dir :
@@ -2072,6 +2102,7 @@ def check_basic_state(batch_df, batch_id):
20722102 and "Schema evolution is not possible" in error_msg
20732103 )
20742104
2105+ @unittest .skip ("Temporarily disabled for testing" )
20752106 def test_not_nullable_fails (self ):
20762107 with self .sql_conf ({"spark.sql.streaming.stateStore.encodingFormat" : "avro" }):
20772108 with tempfile .TemporaryDirectory () as checkpoint_dir :
@@ -2104,6 +2135,7 @@ def check_basic_state(batch_df, batch_id):
21042135 and "column family state must be nullable" in error_msg
21052136 )
21062137
2138+ @unittest .skip ("Temporarily disabled for testing" )
21072139 def test_transform_with_state_int_to_decimal_coercion (self ):
21082140 if not self .use_pandas ():
21092141 return
0 commit comments