Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class FinishedOnRestoreInput<IN> implements Input<IN> {
private final int inputCount;

private int watermarksSeen = 0;
private int finishedStatusSeen = 0;
Copy link
Contributor

@davidradl davidradl Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious on you thinking on how this new status effects https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream-v2/watermark/#emit-watermark customizations.


public FinishedOnRestoreInput(RecordWriterOutput<?>[] streamOutputs, int inputCount) {
this.streamOutputs = streamOutputs;
Expand Down Expand Up @@ -58,7 +59,22 @@ public void processWatermark(Watermark watermark) {

@Override
public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
throw new IllegalStateException();
// Tasks that finished on restore may receive FINISHED watermark status from upstream.
// Aggregate FINISHED status from all inputs and emit once all are received, similar to
// how MAX_WATERMARK is handled. This ensures proper watermark status propagation.
if (watermarkStatus.isFinished()) {
if (++finishedStatusSeen == inputCount) {
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.emitWatermarkStatus(watermarkStatus);
}
}
return;
}
// Other watermark statuses (ACTIVE, IDLE) should not occur for finished tasks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits:

  • you could could fail fast an issue the Exception if (!watermarkStatus.isFinished()) {.throw }
    then you would not need the return.
  • can we put out an identifier of the task in the Exception to help with diagnostics.

throw new IllegalStateException(
String.format(
"Unexpected watermark status [%s] received for finished on restore task",
watermarkStatus));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,9 @@ protected void advanceToEndOfEventTime() throws Exception {
sourceOutput.emitWatermark(Watermark.MAX_WATERMARK);
}
}

@Override
protected void emitFinishedStatus() {
emitFinishedStatusToOutputs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ protected void advanceToEndOfEventTime() {
output.emitWatermark(Watermark.MAX_WATERMARK);
}

@Override
protected void emitFinishedStatus() {
emitFinishedStatusToOutputs();
}

@Override
protected void declineCheckpoint(long checkpointId) {
cleanupCheckpoint(checkpointId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ public void triggerCheckpoint(long checkpointId) throws FlinkException {
recordWriter.setMaxOverdraftBuffersPerGate(0);
}

@Override
protected void emitFinishedStatus() {
emitFinishedStatusToOutputs();
}

@Override
protected void advanceToEndOfEventTime() throws Exception {
operatorChain.getMainOperatorOutput().emitWatermark(Watermark.MAX_WATERMARK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.flink.streaming.runtime.tasks.mailbox.PeriodTimer;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -697,6 +698,7 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
protected void endData(StopMode mode) throws Exception {

if (mode == StopMode.DRAIN) {
emitFinishedStatus();
advanceToEndOfEventTime();
}
// finish all operators in a chain effect way
Expand Down Expand Up @@ -746,6 +748,41 @@ private boolean isCurrentSyncSavepoint(long checkpointId) {
*/
protected void advanceToEndOfEventTime() throws Exception {}

/**
* Emits FINISHED watermark status to indicate this task has permanently completed and should be
* excluded from watermark aggregation in downstream operators.
*
* <p>This method is overridden by source tasks to emit FINISHED status directly to downstream
* via network outputs. Processing tasks (e.g., OneInputStreamTask, TwoInputStreamTask) do not
* override this method because they rely on StatusWatermarkValve to aggregate FINISHED status
* from upstream inputs and propagate it downstream automatically.
*/
protected void emitFinishedStatus() {
Copy link
Contributor

@davidradl davidradl Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we arrange the class hierarchy so that emitFinishedStatus() is only overridable for sources?

// Empty by default - only source tasks override this method.
}

/**
* Helper method to emit FINISHED watermark status to all stream outputs. Subclasses can call
* this from their emitFinishedStatus() override if needed.
*/
protected void emitFinishedStatusToOutputs() {
try {
if (operatorChain != null) {
RecordWriterOutput<?>[] streamOutputs = operatorChain.getStreamOutputs();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have no stream outputs? if so we could issue another LOG.warn("Cannot emit FINISHED watermark status: operator chain has no stream outputs");)

for (RecordWriterOutput<?> output : streamOutputs) {
output.emitWatermarkStatus(WatermarkStatus.FINISHED);
}
LOG.debug(
"Successfully emitted FINISHED watermark status via {} outputs",
streamOutputs.length);
} else {
LOG.warn("Cannot emit FINISHED watermark status: operator chain is null");
}
} catch (Exception e) {
LOG.warn("Failed to emit FINISHED watermark status", e);
}
}

// ------------------------------------------------------------------------
// Core work methods of the Stream Task
// ------------------------------------------------------------------------
Expand Down
Loading