diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index b99061bfe90ec..2574d571699b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -862,15 +862,17 @@ private CompletableFuture restoreStateAndGates( INITIALIZE_STATE_DURATION, initializeStateEndTs - readOutputDataTs); IndexedInputGate[] inputGates = getEnvironment().getAllInputGates(); - channelIOExecutor.execute( - () -> { - try { - reader.readInputData(inputGates); - } catch (Exception e) { - asyncExceptionHandler.handleAsyncException( - "Unable to read channel state", e); - } - }); + if (inputGates.length > 0) { + channelIOExecutor.execute( + () -> { + try { + reader.readInputData(inputGates); + } catch (Exception e) { + asyncExceptionHandler.handleAsyncException( + "Unable to read channel state", e); + } + }); + } // We wait for all input channel state to recover before we go into RUNNING state, and thus // start checkpointing. If we implement incremental checkpointing of input channel state