2727import org .apache .flink .configuration .Configuration ;
2828import org .apache .flink .configuration .ExternalizedCheckpointRetention ;
2929import org .apache .flink .configuration .MemorySize ;
30+ import org .apache .flink .configuration .RestartStrategyOptions ;
3031import org .apache .flink .configuration .StateRecoveryOptions ;
3132import org .apache .flink .configuration .TaskManagerOptions ;
3233import org .apache .flink .connector .datagen .source .DataGeneratorSource ;
5859import java .util .Collections ;
5960import java .util .Random ;
6061
62+ import static org .apache .flink .configuration .RestartStrategyOptions .RestartStrategyType .NO_RESTART_STRATEGY ;
63+
6164/**
6265 * Integration test for rescaling jobs with mixed (UC-supported and UC-unsupported) exchanges from
6366 * an unaligned checkpoint.
@@ -81,7 +84,8 @@ public static Collection<ExecuteJobViaEnv> parameter() {
8184 UnalignedCheckpointRescaleWithMixedExchangesITCase ::createMultiOutputDAG ,
8285 UnalignedCheckpointRescaleWithMixedExchangesITCase ::createMultiInputDAG ,
8386 UnalignedCheckpointRescaleWithMixedExchangesITCase ::createRescalePartitionerDAG ,
84- UnalignedCheckpointRescaleWithMixedExchangesITCase ::createMixedComplexityDAG );
87+ UnalignedCheckpointRescaleWithMixedExchangesITCase ::createMixedComplexityDAG ,
88+ UnalignedCheckpointRescaleWithMixedExchangesITCase ::createPartEmptyHashExchangeDAG );
8589 }
8690
8791 @ Before
@@ -138,6 +142,7 @@ private StreamExecutionEnvironment getUnalignedCheckpointEnv(@Nullable String re
138142 conf .set (CheckpointingOptions .CHECKPOINTING_INTERVAL , Duration .ofSeconds (1 ));
139143 // Disable aligned timeout to ensure it works with unaligned checkpoint directly
140144 conf .set (CheckpointingOptions .ALIGNED_CHECKPOINT_TIMEOUT , Duration .ofSeconds (0 ));
145+ conf .set (RestartStrategyOptions .RESTART_STRATEGY , NO_RESTART_STRATEGY .getMainValue ());
141146 conf .set (
142147 CheckpointingOptions .EXTERNALIZED_CHECKPOINT_RETENTION ,
143148 ExternalizedCheckpointRetention .RETAIN_ON_CANCELLATION );
@@ -337,6 +342,53 @@ private static JobClient createMixedComplexityDAG(StreamExecutionEnvironment env
337342 return env .executeAsync ();
338343 }
339344
345+ /**
346+ * Creates a DAG where the downstream MapAfterKeyBy task receives input from two hash exchanges:
347+ * one with actual data and one that is empty due to filtering. This tests unaligned checkpoint
348+ * rescaling with mixed empty and non-empty hash partitions.
349+ */
350+ private static JobClient createPartEmptyHashExchangeDAG (StreamExecutionEnvironment env )
351+ throws Exception {
352+ int source1Parallelism = getRandomParallelism ();
353+ DataGeneratorSource <Long > source1 =
354+ new DataGeneratorSource <>(
355+ index -> index ,
356+ Long .MAX_VALUE ,
357+ RateLimiterStrategy .perSecond (5000 ),
358+ Types .LONG );
359+ DataStream <Long > sourceStream1 =
360+ env .fromSource (source1 , WatermarkStrategy .noWatermarks (), "Source 1" )
361+ .setParallelism (source1Parallelism );
362+
363+ int source2Parallelism = getRandomParallelism ();
364+ DataGeneratorSource <Long > source2 =
365+ new DataGeneratorSource <>(
366+ index -> index ,
367+ Long .MAX_VALUE ,
368+ RateLimiterStrategy .perSecond (5000 ),
369+ Types .LONG );
370+
371+ // Filter all records to simulate empty state exchange
372+ DataStream <Long > sourceStream2 =
373+ env .fromSource (source2 , WatermarkStrategy .noWatermarks (), "Source 2" )
374+ .setParallelism (source2Parallelism )
375+ .filter (value -> false )
376+ .setParallelism (source2Parallelism );
377+
378+ sourceStream1
379+ .union (sourceStream2 )
380+ .keyBy ((KeySelector <Long , Long >) value -> value )
381+ .map (
382+ x -> {
383+ Thread .sleep (5 );
384+ return x ;
385+ })
386+ .name ("MapAfterKeyBy" )
387+ .setParallelism (getRandomParallelism ());
388+
389+ return env .executeAsync ();
390+ }
391+
340392 private static int getRandomParallelism () {
341393 return RANDOM .nextInt (MAX_SLOTS ) + 1 ;
342394 }
0 commit comments