diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOnRestoreInput.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOnRestoreInput.java index 16dc78447842b..788837c7304d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOnRestoreInput.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOnRestoreInput.java @@ -30,6 +30,7 @@ public class FinishedOnRestoreInput implements Input { private final int inputCount; private int watermarksSeen = 0; + private int finishedStatusSeen = 0; public FinishedOnRestoreInput(RecordWriterOutput[] streamOutputs, int inputCount) { this.streamOutputs = streamOutputs; @@ -58,7 +59,22 @@ public void processWatermark(Watermark watermark) { @Override public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception { - throw new IllegalStateException(); + // Tasks that finished on restore may receive FINISHED watermark status from upstream. + // Aggregate FINISHED status from all inputs and emit once all are received, similar to + // how MAX_WATERMARK is handled. This ensures proper watermark status propagation. + if (watermarkStatus.isFinished()) { + if (++finishedStatusSeen == inputCount) { + for (RecordWriterOutput streamOutput : streamOutputs) { + streamOutput.emitWatermarkStatus(watermarkStatus); + } + } + return; + } + // Other watermark statuses (ACTIVE, IDLE) should not occur for finished tasks + throw new IllegalStateException( + String.format( + "Unexpected watermark status [%s] received for finished on restore task", + watermarkStatus)); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java index 6e45024e1ca55..85b8e14adca36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java @@ -340,4 +340,9 @@ protected void advanceToEndOfEventTime() throws Exception { sourceOutput.emitWatermark(Watermark.MAX_WATERMARK); } } + + @Override + protected void emitFinishedStatus() { + emitFinishedStatusToOutputs(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java index 8c60a02e8e466..0b66ae814610f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java @@ -218,6 +218,11 @@ protected void advanceToEndOfEventTime() { output.emitWatermark(Watermark.MAX_WATERMARK); } + @Override + protected void emitFinishedStatus() { + emitFinishedStatusToOutputs(); + } + @Override protected void declineCheckpoint(long checkpointId) { cleanupCheckpoint(checkpointId); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 8768ded79c668..44a0fcef26ea0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -179,6 +179,11 @@ public void triggerCheckpoint(long checkpointId) throws FlinkException { recordWriter.setMaxOverdraftBuffersPerGate(0); } + @Override + protected void emitFinishedStatus() { + emitFinishedStatusToOutputs(); + } + @Override protected void advanceToEndOfEventTime() throws Exception { operatorChain.getMainOperatorOutput().emitWatermark(Watermark.MAX_WATERMARK); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index a7e02d458464f..c02b0a092946e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -106,6 +106,7 @@ import org.apache.flink.streaming.runtime.tasks.mailbox.PeriodTimer; import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox; import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FatalExitExceptionHandler; import org.apache.flink.util.FlinkException; @@ -697,6 +698,7 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E protected void endData(StopMode mode) throws Exception { if (mode == StopMode.DRAIN) { + emitFinishedStatus(); advanceToEndOfEventTime(); } // finish all operators in a chain effect way @@ -746,6 +748,41 @@ private boolean isCurrentSyncSavepoint(long checkpointId) { */ protected void advanceToEndOfEventTime() throws Exception {} + /** + * Emits FINISHED watermark status to indicate this task has permanently completed and should be + * excluded from watermark aggregation in downstream operators. + * + *

This method is overridden by source tasks to emit FINISHED status directly to downstream + * via network outputs. Processing tasks (e.g., OneInputStreamTask, TwoInputStreamTask) do not + * override this method because they rely on StatusWatermarkValve to aggregate FINISHED status + * from upstream inputs and propagate it downstream automatically. + */ + protected void emitFinishedStatus() { + // Empty by default - only source tasks override this method. + } + + /** + * Helper method to emit FINISHED watermark status to all stream outputs. Subclasses can call + * this from their emitFinishedStatus() override if needed. + */ + protected void emitFinishedStatusToOutputs() { + try { + if (operatorChain != null) { + RecordWriterOutput[] streamOutputs = operatorChain.getStreamOutputs(); + for (RecordWriterOutput output : streamOutputs) { + output.emitWatermarkStatus(WatermarkStatus.FINISHED); + } + LOG.debug( + "Successfully emitted FINISHED watermark status via {} outputs", + streamOutputs.length); + } else { + LOG.warn("Cannot emit FINISHED watermark status: operator chain is null"); + } + } catch (Exception e) { + LOG.warn("Failed to emit FINISHED watermark status", e); + } + } + // ------------------------------------------------------------------------ // Core work methods of the Stream Task // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java index 29f63c959d476..5519f2c83d35b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java @@ -28,6 +28,9 @@ import org.apache.flink.streaming.runtime.watermarkstatus.HeapPriorityQueue.HeapPriorityQueueElement; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -47,6 +50,8 @@ @Internal public class StatusWatermarkValve { + private static final Logger LOG = LoggerFactory.getLogger(StatusWatermarkValve.class); + // ------------------------------------------------------------------------ // Runtime state for watermark & watermark status output determination // ------------------------------------------------------------------------ @@ -153,22 +158,37 @@ public StatusWatermarkValve(ResultSubpartitionIndexSet[] subpartitionIndexSets) public void inputWatermark(Watermark watermark, int channelIndex, DataOutput output) throws Exception { final SubpartitionStatus subpartitionStatus; + final int subpartitionIndex; if (watermark instanceof InternalWatermark) { - int subpartitionStatusIndex = ((InternalWatermark) watermark).getSubpartitionIndex(); - subpartitionStatus = - subpartitionStatuses.get(channelIndex).get(subpartitionStatusIndex); + subpartitionIndex = ((InternalWatermark) watermark).getSubpartitionIndex(); + subpartitionStatus = subpartitionStatuses.get(channelIndex).get(subpartitionIndex); } else { - subpartitionStatus = - subpartitionStatuses.get(channelIndex).get(subpartitionIndexes[channelIndex]); + subpartitionIndex = subpartitionIndexes[channelIndex]; + subpartitionStatus = subpartitionStatuses.get(channelIndex).get(subpartitionIndex); } - // ignore the input watermark if its subpartition, or all subpartitions are idle (i.e. - // overall the valve is idle). - if (lastOutputWatermarkStatus.isActive() && subpartitionStatus.watermarkStatus.isActive()) { + WatermarkStatus currentSubpartitionStatus = subpartitionStatus.watermarkStatus; + + // FINISHED subpartitions can only accept Long.MAX_VALUE from upstream to preserve message + // ordering + if (currentSubpartitionStatus.isFinished()) { + if (watermark.getTimestamp() == Long.MAX_VALUE) { + subpartitionStatus.watermark = Long.MAX_VALUE; + tryEmitNewWatermark(output); + } else { + // Ignore non-MAX_VALUE watermarks + LOG.error( + "Channel {} subpartition {} in FINISHED state received a non-MAX watermark ({})." + + " Ignoring it - only MAX_WATERMARK is expected.", + channelIndex, + subpartitionIndex, + watermark.getTimestamp()); + } + } else if (currentSubpartitionStatus.isActive()) { long watermarkMillis = watermark.getTimestamp(); // if the input watermark's value is less than the last received watermark for its - // subpartition, ignore it also. + // subpartition, ignore it. if (watermarkMillis > subpartitionStatus.watermark) { subpartitionStatus.watermark = watermarkMillis; @@ -180,9 +200,20 @@ public void inputWatermark(Watermark watermark, int channelIndex, DataOutput markWatermarkAligned(subpartitionStatus); } - // now, attempt to find a new min watermark across all aligned subpartitions - findAndOutputNewMinWatermarkAcrossAlignedSubpartitions(output); + tryEmitNewWatermark(output); } + } else if (currentSubpartitionStatus.isIdle()) { + // Ignore watermark if subpartition is IDLE + LOG.debug( + "Channel {} subpartition {} is IDLE. Ignoring received watermark ({}).", + channelIndex, + subpartitionIndex, + watermark.getTimestamp()); + } else { + throw new IllegalStateException( + String.format( + "Unknown watermark status for channel %d subpartition %d: %s", + channelIndex, subpartitionIndex, currentSubpartitionStatus)); } } @@ -199,9 +230,17 @@ public void inputWatermark(Watermark watermark, int channelIndex, DataOutput public void inputWatermarkStatus( WatermarkStatus watermarkStatus, int channelIndex, DataOutput output) throws Exception { - // Shared input channel is only enabled in batch jobs, which do not have watermark status - // events. - Preconditions.checkState(!isInputChannelShared); + // Shared input channel is only enabled in batch jobs. Batch jobs do not perform + // watermark-based processing, so watermark status events can be safely ignored. + // Note: Tasks may emit FINISHED status on completion regardless of execution mode. + if (isInputChannelShared) { + LOG.debug( + "Ignoring watermark status {} on channel {} (shared input channels indicate batch mode)", + watermarkStatus, + channelIndex); + return; + } + SubpartitionStatus subpartitionStatus = subpartitionStatuses.get(channelIndex).get(subpartitionIndexes[channelIndex]); @@ -209,79 +248,200 @@ public void inputWatermarkStatus( // consumes multiple subpartitions, so we do not need to map channelIndex into // subpartitionStatusIndex for now, like what is done on Watermarks. - // only account for watermark status inputs that will result in a status change for the - // subpartition - if (watermarkStatus.isIdle() && subpartitionStatus.watermarkStatus.isActive()) { - // handle active -> idle toggle for the subpartition - subpartitionStatus.watermarkStatus = WatermarkStatus.IDLE; - - // the subpartition is now idle, therefore not aligned - markWatermarkUnaligned(subpartitionStatus); - - // if all subpartitions of the valve are now idle, we need to output an idle stream - // status from the valve (this also marks the valve as idle) - if (!SubpartitionStatus.hasActiveSubpartitions(subpartitionStatuses)) { - - // now that all subpartitions are idle and no subpartitions will continue to advance - // its - // watermark, - // we should "flush" all watermarks across all subpartitions; effectively, this - // means - // emitting - // the max watermark across all subpartitions as the new watermark. Also, since we - // already try to advance - // the min watermark as subpartitions individually become IDLE, here we only need to - // perform the flush - // if the watermark of the last active subpartition that just became idle is the - // current - // min watermark. - if (subpartitionStatus.watermark == lastOutputWatermark) { - findAndOutputMaxWatermarkAcrossAllSubpartitions(output); - } + WatermarkStatus currentSubpartitionStatus = subpartitionStatus.watermarkStatus; + + // Ignore if no change + if (watermarkStatus.equals(currentSubpartitionStatus)) { + return; + } + + // Handle all valid status transitions + if (currentSubpartitionStatus.isActive()) { + // Check if this subpartition contributes to current output watermark + boolean shouldUpdateWatermark = subpartitionStatus.watermark == lastOutputWatermark; - lastOutputWatermarkStatus = WatermarkStatus.IDLE; - output.emitWatermarkStatus(lastOutputWatermarkStatus); - } else if (subpartitionStatus.watermark == lastOutputWatermark) { - // if the watermark of the subpartition that just became idle equals the last output - // watermark (the previous overall min watermark), we may be able to find a new - // min watermark from the remaining aligned subpartitions - findAndOutputNewMinWatermarkAcrossAlignedSubpartitions(output); + if (watermarkStatus.isIdle()) { + subpartitionStatus.watermarkStatus = WatermarkStatus.IDLE; + markWatermarkUnaligned(subpartitionStatus); + + // Emit watermark first (final progression before going idle) + if (shouldUpdateWatermark) { + tryEmitNewWatermark(output); + } + // Then update and emit global watermark status + tryEmitNewGlobalWatermarkStatus(output); + } else if (watermarkStatus.isFinished()) { + subpartitionStatus.watermarkStatus = WatermarkStatus.FINISHED; + markWatermarkUnaligned(subpartitionStatus); + + // Emit watermark first if this subpartition was contributing + if (shouldUpdateWatermark) { + tryEmitNewWatermark(output); + } + // Then update and emit global watermark status + tryEmitNewGlobalWatermarkStatus(output); + } else { + throw new IllegalStateException( + "Invalid ACTIVE -> " + + watermarkStatus + + " transition for channel " + + channelIndex + + " subpartition " + + subpartitionIndexes[channelIndex]); } - } else if (watermarkStatus.isActive() && subpartitionStatus.watermarkStatus.isIdle()) { - // handle idle -> active toggle for the subpartition - subpartitionStatus.watermarkStatus = WatermarkStatus.ACTIVE; - - // if the last watermark of the subpartition, before it was marked idle, is still - // larger than - // the overall last output watermark of the valve, then we can set the subpartition to - // be - // aligned already. - if (subpartitionStatus.watermark >= lastOutputWatermark) { - markWatermarkAligned(subpartitionStatus); + } else if (currentSubpartitionStatus.isIdle()) { + // When transitioning from IDLE, no watermark emission is needed because: + // - If there are ACTIVE subpartitions: this IDLE subpartition wasn't contributing to + // min watermark calculation, so its transition doesn't change the min + // - If all subpartitions are IDLE: lastOutputWatermark = max(idle subpartitions), + // and this subpartition's watermark <= lastOutputWatermark (by definition of max) + // - Watermarks never rewind, so no new watermark can be emitted at this transition + + if (watermarkStatus.isActive()) { + subpartitionStatus.watermarkStatus = WatermarkStatus.ACTIVE; + // Check if watermark has caught up + if (subpartitionStatus.watermark >= lastOutputWatermark) { + markWatermarkAligned(subpartitionStatus); + } + // Update and emit global watermark status + tryEmitNewGlobalWatermarkStatus(output); + } else if (watermarkStatus.isFinished()) { + subpartitionStatus.watermarkStatus = WatermarkStatus.FINISHED; + markWatermarkUnaligned(subpartitionStatus); + // Update and emit global watermark status + tryEmitNewGlobalWatermarkStatus(output); + } else { + throw new IllegalStateException( + "Invalid IDLE -> " + + watermarkStatus + + " transition for channel " + + channelIndex + + " subpartition " + + subpartitionIndexes[channelIndex]); } + } else if (currentSubpartitionStatus.isFinished()) { + LOG.debug( + "Channel {} subpartition {} is in FINISHED state. Ignoring transition to {}.", + channelIndex, + subpartitionIndexes[channelIndex], + watermarkStatus); + } else { + throw new IllegalStateException( + "Invalid status transition for channel " + + channelIndex + + " subpartition " + + subpartitionIndexes[channelIndex] + + ": currentStatus=" + + currentSubpartitionStatus + + ", newStatus=" + + watermarkStatus); + } + } - // if the valve was previously marked to be idle, mark it as active and output an active - // stream - // status because at least one of the subpartitions is now active - if (lastOutputWatermarkStatus.isIdle()) { - lastOutputWatermarkStatus = WatermarkStatus.ACTIVE; - output.emitWatermarkStatus(lastOutputWatermarkStatus); + // Calculate and emit new watermark if it progresses. + private void tryEmitNewWatermark(DataOutput output) throws Exception { + Long newWatermark = calculateWatermarkByAggregationRules(); + if (newWatermark != null && newWatermark > lastOutputWatermark) { + lastOutputWatermark = newWatermark; + output.emitWatermark(new Watermark(lastOutputWatermark)); + } + } + + // Calculate and emit new operator-level (global) status. + private void tryEmitNewGlobalWatermarkStatus(DataOutput output) throws Exception { + WatermarkStatus newGlobalStatus = calculateGlobalWatermarkStatus(); + if (!newGlobalStatus.equals(lastOutputWatermarkStatus)) { + lastOutputWatermarkStatus = newGlobalStatus; + output.emitWatermarkStatus(newGlobalStatus); + } + } + + /** + * Calculates watermark based on the clear aggregation rules. + * + *

    + *
  1. If there are ACTIVE subpartitions: watermark = min(active_subpartitions) + *
  2. Else if there are IDLE subpartitions: watermark = max(idle_subpartitions) + *
  3. Else (all subpartitions FINISHED): watermark = Long.MAX_VALUE if all have received it + *
+ */ + private Long calculateWatermarkByAggregationRules() { + if (SubpartitionStatus.hasActiveSubpartitions(subpartitionStatuses)) { + // Rule 1: ACTIVE subpartitions exist -> min(active_subpartitions) + return calculateMinWatermarkFromAlignedChannels(); + } else if (SubpartitionStatus.hasIdleSubpartitions(subpartitionStatuses)) { + // Rule 2: Only IDLE subpartitions (no active) -> max(idle_subpartitions) + return calculateMaxWatermarkFromNonFinishedChannels(); + } else { + // Rule 3: All subpartitions FINISHED -> emit Long.MAX_VALUE only when all have received + // it + if (allSubpartitionsFinishedWithMaxValue()) { + return Long.MAX_VALUE; + } else { + return null; // Wait for all subpartitions to receive Long.MAX_VALUE from upstream } } } - private void findAndOutputNewMinWatermarkAcrossAlignedSubpartitions(DataOutput output) - throws Exception { + /** + * Calculates operator-level (global) watermark status by aggregating subpartition states. + * + *
    + *
  1. If there are ACTIVE subpartitions: status = ACTIVE + *
  2. Else if there are IDLE subpartitions: status = IDLE + *
  3. Else (all subpartitions FINISHED): status = FINISHED + *
+ */ + private WatermarkStatus calculateGlobalWatermarkStatus() { + if (SubpartitionStatus.hasActiveSubpartitions(subpartitionStatuses)) { + // Rule 1: At least one ACTIVE subpartition -> operator is ACTIVE + return WatermarkStatus.ACTIVE; + } else if (SubpartitionStatus.hasIdleSubpartitions(subpartitionStatuses)) { + // Rule 2: No ACTIVE, at least one IDLE subpartition -> operator is IDLE + return WatermarkStatus.IDLE; + } else { + // Rule 3: All subpartitions FINISHED -> operator is FINISHED + // (Implicitly validated: if not ACTIVE and not IDLE, must be FINISHED) + return WatermarkStatus.FINISHED; + } + } + + // Calculates minimum watermark from aligned (active) subpartitions. + private Long calculateMinWatermarkFromAlignedChannels() { boolean hasAlignedSubpartitions = !alignedSubpartitionStatuses.isEmpty(); + return hasAlignedSubpartitions ? alignedSubpartitionStatuses.peek().watermark : null; + } - // we acknowledge and output the new overall watermark if it really is aggregated - // from some remaining aligned subpartition, and is also larger than the last output - // watermark - if (hasAlignedSubpartitions - && alignedSubpartitionStatuses.peek().watermark > lastOutputWatermark) { - lastOutputWatermark = alignedSubpartitionStatuses.peek().watermark; - output.emitWatermark(new Watermark(lastOutputWatermark)); + // Calculates maximum watermark from non-finished (idle) subpartitions. + private Long calculateMaxWatermarkFromNonFinishedChannels() { + long maxWatermark = Long.MIN_VALUE; + boolean hasNonFinishedChannels = false; + + for (Map map : subpartitionStatuses) { + for (SubpartitionStatus subpartitionStatus : map.values()) { + if (!subpartitionStatus.watermarkStatus.isFinished()) { + hasNonFinishedChannels = true; + maxWatermark = Math.max(subpartitionStatus.watermark, maxWatermark); + } + } + } + + return hasNonFinishedChannels ? maxWatermark : null; + } + + /** + * Checks if all subpartitions are FINISHED and have received Long.MAX_VALUE watermark. This is + * the condition for emitting Long.MAX_VALUE downstream. + */ + private boolean allSubpartitionsFinishedWithMaxValue() { + for (Map map : subpartitionStatuses) { + for (SubpartitionStatus status : map.values()) { + if (!status.watermarkStatus.isFinished() || status.watermark != Long.MAX_VALUE) { + return false; + } + } } + return true; } /** @@ -322,22 +482,6 @@ private void adjustAlignedSubpartitionStatuses(SubpartitionStatus subpartitionSt alignedSubpartitionStatuses.adjustModifiedElement(subpartitionStatus); } - private void findAndOutputMaxWatermarkAcrossAllSubpartitions(DataOutput output) - throws Exception { - long maxWatermark = Long.MIN_VALUE; - - for (Map map : subpartitionStatuses) { - for (SubpartitionStatus subpartitionStatus : map.values()) { - maxWatermark = Math.max(subpartitionStatus.watermark, maxWatermark); - } - } - - if (maxWatermark > lastOutputWatermark) { - lastOutputWatermark = maxWatermark; - output.emitWatermark(new Watermark(lastOutputWatermark)); - } - } - /** * An {@code SubpartitionStatus} keeps track of a subpartition's last watermark, stream status, * and whether or not the subpartition's current watermark is aligned with the overall watermark @@ -382,6 +526,21 @@ private static boolean hasActiveSubpartitions( return false; } + /** + * Utility to check if at least one subpartition in a given array of subpartitions is idle. + */ + private static boolean hasIdleSubpartitions( + List> subpartitionStatuses) { + for (Map map : subpartitionStatuses) { + for (SubpartitionStatus status : map.values()) { + if (status.watermarkStatus.isIdle()) { + return true; + } + } + } + return false; + } + @Override public int getInternalIndex() { return heapIndex; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/WatermarkStatus.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/WatermarkStatus.java index 0a389a92643e0..6312fb42026ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/WatermarkStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/WatermarkStatus.java @@ -81,21 +81,25 @@ public final class WatermarkStatus extends StreamElement { public static final int IDLE_STATUS = -1; public static final int ACTIVE_STATUS = 0; + public static final int FINISHED_STATUS = 1; public static final WatermarkStatus IDLE = new WatermarkStatus(IDLE_STATUS); public static final WatermarkStatus ACTIVE = new WatermarkStatus(ACTIVE_STATUS); + public static final WatermarkStatus FINISHED = new WatermarkStatus(FINISHED_STATUS); public final int status; public WatermarkStatus(int status) { - if (status != IDLE_STATUS && status != ACTIVE_STATUS) { + if (status != IDLE_STATUS && status != ACTIVE_STATUS && status != FINISHED_STATUS) { throw new IllegalArgumentException( "Invalid status value for WatermarkStatus; " + "allowed values are " + ACTIVE_STATUS - + " (for ACTIVE) and " + + " (for ACTIVE), " + IDLE_STATUS - + " (for IDLE)."); + + " (for IDLE), and " + + FINISHED_STATUS + + " (for FINISHED)."); } this.status = status; @@ -106,7 +110,11 @@ public boolean isIdle() { } public boolean isActive() { - return !isIdle(); + return this.status == ACTIVE_STATUS; + } + + public boolean isFinished() { + return this.status == FINISHED_STATUS; } public int getStatus() { @@ -128,7 +136,20 @@ public int hashCode() { @Override public String toString() { - String statusStr = (status == ACTIVE_STATUS) ? "ACTIVE" : "IDLE"; + String statusStr; + switch (status) { + case ACTIVE_STATUS: + statusStr = "ACTIVE"; + break; + case IDLE_STATUS: + statusStr = "IDLE"; + break; + case FINISHED_STATUS: + statusStr = "FINISHED"; + break; + default: + statusStr = "UNKNOWN"; + } return "WatermarkStatus(" + statusStr + ")"; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValveTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValveTest.java index e139f4c48661c..faf41bb83a508 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValveTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValveTest.java @@ -431,6 +431,302 @@ void testUnalignedActiveChannelBecomesIdle() throws Exception { assertThat(valveOutput.popLastSeenOutput()).isNull(); } + /** + * Tests that FINISHED channels are excluded from min watermark calculation, allowing remaining + * channels to advance the watermark. + */ + @Test + void testFinishedChannelExcludedFromMinWatermark() throws Exception { + StatusWatermarkValve valve = new StatusWatermarkValve(2); + StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); + + valve.inputWatermark(new Watermark(5), 0, valveOutput); + valve.inputWatermark(new Watermark(10), 1, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(5)); + + // Mark channel 0's subpartition as FINISHED - should exclude it from min calculation + valve.inputWatermarkStatus(WatermarkStatus.FINISHED, 0, valveOutput); + assertThat(valveOutput.popLastSeenOutput()) + .isEqualTo(new Watermark(10)); // Now min is 10, not 5 + + assertThat(valveOutput.popLastSeenOutput()).isNull(); + } + + /** + * Tests that FINISHED channels are excluded from max watermark calculation when all other + * channels become IDLE. + */ + @Test + void testFinishedExcludedFromIdleMaxCalculation() throws Exception { + StatusWatermarkValve valve = new StatusWatermarkValve(3); + StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); + + valve.inputWatermark(new Watermark(10), 0, valveOutput); + valve.inputWatermark(new Watermark(20), 1, valveOutput); + valve.inputWatermark(new Watermark(5), 2, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(5)); + + // Mark channel 1's subpartition (watermark 20) as FINISHED + valve.inputWatermarkStatus(WatermarkStatus.FINISHED, 1, valveOutput); + // No watermark change since min of remaining subpartitions (10,5) is still 5 + assertThat(valveOutput.popLastSeenOutput()).isNull(); + + // Mark remaining subpartitions as IDLE + valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput); + valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput); + + // Should emit max of non-FINISHED subpartitions (10,5) = 10, not including FINISHED + // subpartition (20) + assertThat(valveOutput.popLastSeenOutput()) + .isEqualTo(new Watermark(10)); // max(10,5) = 10, excludes 20 + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.IDLE); + + assertThat(valveOutput.popLastSeenOutput()).isNull(); + } + + /** + * Comprehensive test for FINISHED channels. + * + *
    + *
  1. Reject non-MAX_VALUE watermarks + *
  2. Accept MAX_VALUE + *
  3. Aggregate across channels to emit Long.MAX_VALUE once when all FINISHED channels + * receive it + *
  4. Prevent duplicate emissions + *
+ */ + @Test + void testFinishedChannelWatermarkHandling() throws Exception { + StatusWatermarkValve valve = new StatusWatermarkValve(2); + StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); + + // Setup: both channels active with watermarks + valve.inputWatermark(new Watermark(100), 0, valveOutput); + valve.inputWatermark(new Watermark(200), 1, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(100)); + + // Mark both channels as FINISHED + valve.inputWatermarkStatus(WatermarkStatus.FINISHED, 0, valveOutput); + // FINISHED status does NOT change watermark - remains at 200 + // FINISHED status should NOT modify the channel's watermark + // Watermark remains unchanged until Long.MAX_VALUE flows from upstream + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(200)); + // Channel should be marked as unaligned + assertThat(valve.getSubpartitionStatus(0).isWatermarkAligned).isFalse(); + + valve.inputWatermarkStatus(WatermarkStatus.FINISHED, 1, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.FINISHED); + // No watermark emitted. Long.MAX_VALUE will come via inputWatermark() to preserve message + // ordering + assertThat(valveOutput.popLastSeenOutput()).isNull(); + + // FINISHED channels ignore non-MAX_VALUE watermarks (preserve message ordering) + valve.inputWatermark(new Watermark(150), 0, valveOutput); + valve.inputWatermark(new Watermark(250), 1, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isNull(); // Both ignored + assertThat(valve.getSubpartitionStatus(0).watermark).isEqualTo(100L); // Unchanged + assertThat(valve.getSubpartitionStatus(1).watermark).isEqualTo(200L); // Unchanged + + // Channel 0 receives Long.MAX_VALUE from upstream + valve.inputWatermark(new Watermark(Long.MAX_VALUE), 0, valveOutput); + assertThat(valve.getSubpartitionStatus(0).watermark).isEqualTo(Long.MAX_VALUE); + assertThat(valveOutput.popLastSeenOutput()).isNull(); // Not all channels have it yet + + // Channel 1 receives Long.MAX_VALUE from upstream + valve.inputWatermark(new Watermark(Long.MAX_VALUE), 1, valveOutput); + assertThat(valve.getSubpartitionStatus(1).watermark).isEqualTo(Long.MAX_VALUE); + + // Now ALL FINISHED channels have Long.MAX_VALUE - emit once + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(Long.MAX_VALUE)); + assertThat(valveOutput.popLastSeenOutput()).isNull(); + + // Duplicate Long.MAX_VALUE should NOT emit again + valve.inputWatermark(new Watermark(Long.MAX_VALUE), 0, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isNull(); + } + + /** + * Tests that FINISHED is a terminal state and channels cannot transition from FINISHED back to + * ACTIVE or IDLE. + */ + @Test + void testFinishedIsTerminalState() throws Exception { + StatusWatermarkValve valve = new StatusWatermarkValve(1); + StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); + + valve.inputWatermarkStatus(WatermarkStatus.FINISHED, 0, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.FINISHED); + + // Attempt FINISHED -> ACTIVE (should be ignored) + valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, valveOutput); + assertThat(valve.getSubpartitionStatus(0).watermarkStatus.isFinished()).isTrue(); + assertThat(valveOutput.popLastSeenOutput()).isNull(); + + // Attempt FINISHED -> IDLE (should be ignored) + valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput); + assertThat(valve.getSubpartitionStatus(0).watermarkStatus.isFinished()).isTrue(); + assertThat(valveOutput.popLastSeenOutput()).isNull(); + } + + /** + * Tests ACTIVE to IDLE and ACTIVE to FINISHED transitions in various configurations, including + * global status changes. + * + *
    + *
  1. ACTIVE → IDLE when other ACTIVE channels exist (global stays ACTIVE) + *
  2. ACTIVE → FINISHED when other ACTIVE channels exist (global stays ACTIVE) + *
  3. Last ACTIVE → IDLE (global becomes IDLE) + *
  4. Last ACTIVE → FINISHED when IDLE channels exist (global becomes IDLE) + *
  5. All IDLE → FINISHED (global becomes FINISHED) + *
+ */ + @Test + void testActiveTransitions() throws Exception { + StatusWatermarkValve valve = new StatusWatermarkValve(4); + StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); + + valve.inputWatermark(new Watermark(10), 0, valveOutput); + valve.inputWatermark(new Watermark(20), 1, valveOutput); + valve.inputWatermark(new Watermark(15), 2, valveOutput); + valve.inputWatermark(new Watermark(12), 3, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(10)); + + // Case 1: ACTIVE → IDLE when other ACTIVE channels exist (global stays ACTIVE) + valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(12)); + assertThat(valveOutput.popLastSeenOutput()).isNull(); + + // Case 2: ACTIVE → FINISHED when other ACTIVE channels exist (global stays ACTIVE) + valve.inputWatermarkStatus(WatermarkStatus.FINISHED, 3, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(15)); + assertThat(valve.getSubpartitionStatus(3).isWatermarkAligned).isFalse(); + assertThat(valveOutput.popLastSeenOutput()).isNull(); + + // Case 3: Last ACTIVE → IDLE (global becomes IDLE) + valve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, valveOutput); + valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(20)); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.IDLE); + + // Setup for next case: bring one channel back to ACTIVE + valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 1, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.ACTIVE); + + // Case 4: Last ACTIVE → FINISHED when IDLE channels exist (global becomes IDLE) + valve.inputWatermarkStatus(WatermarkStatus.FINISHED, 1, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.IDLE); + assertThat(valve.getSubpartitionStatus(1).isWatermarkAligned).isFalse(); + + // Case 5: All IDLE → FINISHED (global becomes FINISHED) + valve.inputWatermarkStatus(WatermarkStatus.FINISHED, 0, valveOutput); + valve.inputWatermarkStatus(WatermarkStatus.FINISHED, 2, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.FINISHED); + assertThat(valveOutput.popLastSeenOutput()).isNull(); + } + + /** + * Tests IDLE to ACTIVE transitions with both aligned and unaligned watermarks, verifying + * realignment behavior. + * + *
    + *
  1. IDLE → ACTIVE when already caught up (watermark >= lastOutput) + *
  2. IDLE → ACTIVE when behind (watermark < lastOutput) + *
+ */ + @Test + void testIdleToActiveTransition() throws Exception { + StatusWatermarkValve valve = new StatusWatermarkValve(3); + StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); + + valve.inputWatermark(new Watermark(10), 0, valveOutput); + valve.inputWatermark(new Watermark(20), 1, valveOutput); + valve.inputWatermark(new Watermark(15), 2, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(10)); + + // All channels go IDLE + valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput); + assertThat(valveOutput.popLastSeenOutput()) + .isEqualTo(new Watermark(15)); // min of remaining active (Ch1=20, Ch2=15) + valve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, valveOutput); + valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(20)); // max of all idle + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.IDLE); + + // Case 1: IDLE -> ACTIVE when already caught up (watermark >= lastOutput) + valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 1, valveOutput); + assertThat(valveOutput.popLastSeenOutput()) + .isEqualTo(WatermarkStatus.ACTIVE); // Global becomes ACTIVE + assertThat(valve.getSubpartitionStatus(1).isWatermarkAligned) + .isTrue(); // 20 >= 20, aligned immediately + + // Case 2: IDLE -> ACTIVE when behind (watermark < lastOutput) + valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isNull(); // No status change (already ACTIVE) + assertThat(valve.getSubpartitionStatus(0).isWatermarkAligned) + .isFalse(); // 10 < 20, unaligned + + // Verify unaligned channel doesn't affect min watermark + valve.inputWatermark(new Watermark(25), 1, valveOutput); + assertThat(valveOutput.popLastSeenOutput()) + .isEqualTo(new Watermark(25)); // Only Ch1 aligned + + // Ch0 catches up and becomes aligned (but min doesn't change, so no watermark emission) + valve.inputWatermark(new Watermark(26), 0, valveOutput); + assertThat(valve.getSubpartitionStatus(0).isWatermarkAligned).isTrue(); + assertThat(valveOutput.popLastSeenOutput()).isNull(); // min still 25, no progression + } + + /** + * Tests IDLE to FINISHED transitions in various configurations, verifying global status changes + * and watermark progression. + * + *
    + *
  1. IDLE → FINISHED when ACTIVE channels exist (global stays ACTIVE) + *
  2. IDLE → FINISHED when other IDLE channels exist (global stays IDLE) + *
  3. IDLE → FINISHED as last channel (global becomes FINISHED) + *
+ */ + @Test + void testIdleToFinishedTransition() throws Exception { + StatusWatermarkValve valve = new StatusWatermarkValve(3); + StatusWatermarkOutput valveOutput = new StatusWatermarkOutput(); + + valve.inputWatermark(new Watermark(10), 0, valveOutput); + valve.inputWatermark(new Watermark(15), 1, valveOutput); + valve.inputWatermark(new Watermark(12), 2, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(10)); + + // Ch0 goes IDLE + valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(12)); + + // Case 1: IDLE -> FINISHED when ACTIVE channels exist (global stays ACTIVE) + valve.inputWatermarkStatus(WatermarkStatus.FINISHED, 0, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isNull(); // No watermark, no status change + assertThat(valve.getSubpartitionStatus(0).watermarkStatus.isFinished()).isTrue(); + assertThat(valve.getSubpartitionStatus(0).isWatermarkAligned).isFalse(); + + // Ch1 goes IDLE (no watermark change since Ch2 still has min=12) + valve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isNull(); + + // Ch2 goes IDLE (all non-FINISHED are IDLE, emit max of idle channels, then global becomes + // IDLE) + valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput); + assertThat(valveOutput.popLastSeenOutput()) + .isEqualTo(new Watermark(15)); // max of idle (Ch1=15, Ch2=12) + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.IDLE); + + // Case 2: IDLE -> FINISHED when other IDLE channels exist (global stays IDLE) + valve.inputWatermarkStatus(WatermarkStatus.FINISHED, 1, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isNull(); // Global stays IDLE (Ch2 still IDLE) + + // Case 3: IDLE -> FINISHED as last channel (global becomes FINISHED) + valve.inputWatermarkStatus(WatermarkStatus.FINISHED, 2, valveOutput); + assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.FINISHED); + assertThat(valveOutput.popLastSeenOutput()).isNull(); + } + private static class StatusWatermarkOutput implements PushingAsyncDataInput.DataOutput { private BlockingQueue allOutputs = new LinkedBlockingQueue<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/WatermarkStatusTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/WatermarkStatusTest.java index 401a67ec2bae2..9377d46a7e65d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/WatermarkStatusTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/WatermarkStatusTest.java @@ -36,14 +36,22 @@ void testIllegalCreationThrowsException() { void testEquals() { WatermarkStatus idleStatus = new WatermarkStatus(WatermarkStatus.IDLE_STATUS); WatermarkStatus activeStatus = new WatermarkStatus(WatermarkStatus.ACTIVE_STATUS); + WatermarkStatus finishedStatus = new WatermarkStatus(WatermarkStatus.FINISHED_STATUS); assertThat(idleStatus).isEqualTo(WatermarkStatus.IDLE); assertThat(idleStatus.isIdle()).isTrue(); assertThat(idleStatus.isActive()).isFalse(); + assertThat(idleStatus.isFinished()).isFalse(); assertThat(activeStatus).isEqualTo(WatermarkStatus.ACTIVE); assertThat(activeStatus.isActive()).isTrue(); assertThat(activeStatus.isIdle()).isFalse(); + assertThat(activeStatus.isFinished()).isFalse(); + + assertThat(finishedStatus).isEqualTo(WatermarkStatus.FINISHED); + assertThat(finishedStatus.isFinished()).isTrue(); + assertThat(finishedStatus.isActive()).isFalse(); + assertThat(finishedStatus.isIdle()).isFalse(); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java index 9605e0fa6637e..020a1e849e6a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java @@ -21,6 +21,7 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import java.util.ArrayList; import java.util.Arrays; @@ -29,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -48,6 +50,23 @@ public static List getRawElementsFromOutput(Queue output) { return resultElements; } + /** + * Filters out {@link WatermarkStatus} elements from the output queue. + * + *

This is useful for tests that focus on data processing or event handling behavior and + * should not be affected by task lifecycle signals. {@link WatermarkStatus#FINISHED} is an + * implementation detail emitted by tasks when they finish, and is not part of the core feature + * being tested in most cases. + * + * @param output the output queue to filter + * @return a new queue with all {@link WatermarkStatus} elements removed + */ + public static ConcurrentLinkedQueue filterOutWatermarkStatus(Queue output) { + return output.stream() + .filter(o -> !(o instanceof WatermarkStatus)) + .collect(Collectors.toCollection(ConcurrentLinkedQueue::new)); + } + /** * Compare the two queues containing operator/task output by converting them to an array first. */ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java index 93d526a4d0422..56a8f87589aaa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java @@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.ExceptionUtils; import org.junit.jupiter.api.Test; @@ -49,8 +50,10 @@ void testEmitMaxWatermarkForFiniteSource() throws Exception { testHarness.invoke(); testHarness.waitForTaskCompletion(); - assertThat(testHarness.getOutput()).hasSize(1); - assertThat(testHarness.getOutput().peek()).isEqualTo(Watermark.MAX_WATERMARK); + // Source tasks emit FINISHED status before MAX_WATERMARK when completing with drain + assertThat(testHarness.getOutput()).hasSize(2); + assertThat(testHarness.getOutput().poll()).isEqualTo(WatermarkStatus.FINISHED); + assertThat(testHarness.getOutput().poll()).isEqualTo(Watermark.MAX_WATERMARK); } @Test @@ -66,6 +69,9 @@ void testDisabledProgressiveWatermarksForFiniteSource() throws Exception { // sent by source function assertThat(testHarness.getOutput().poll()).isEqualTo(Watermark.MAX_WATERMARK); + // Source tasks emit FINISHED status when completing with drain + assertThat(testHarness.getOutput().poll()).isEqualTo(WatermarkStatus.FINISHED); + // sent by framework assertThat(testHarness.getOutput().poll()).isEqualTo(Watermark.MAX_WATERMARK); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index f7bbd997ba059..61948fca22da8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -543,8 +543,15 @@ void testOvertakingCheckpointBarriers() throws Exception { testHarness.processAll(); + // Filter out WatermarkStatus (e.g., FINISHED watermark status when task finishes), + // which is a task lifecycle signal (an implementation detail emitted by tasks when they + // finish), not part of the feature being tested. This test focuses on checkpoint + // barrier handling. + ConcurrentLinkedQueue actualOutput = + TestHarnessUtil.filterOutWatermarkStatus(testHarness.getOutput()); + TestHarnessUtil.assertOutputEquals( - "Output was not correct.", expectedOutput, testHarness.getOutput()); + "Output was not correct.", expectedOutput, actualOutput); // Then give the earlier barrier, these should be ignored testHarness.processEvent( @@ -565,8 +572,12 @@ void testOvertakingCheckpointBarriers() throws Exception { testHarness.waitForTaskCompletion(); + // Filter again for final comparison + ConcurrentLinkedQueue finalOutput = + TestHarnessUtil.filterOutWatermarkStatus(testHarness.getOutput()); + TestHarnessUtil.assertOutputEquals( - "Output was not correct.", expectedOutput, testHarness.getOutput()); + "Output was not correct.", expectedOutput, finalOutput); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java index 5648fd96466d3..8b1e83ec1e656 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java @@ -58,6 +58,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.LifeCycleMonitor.LifeCyclePhase; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.SerializedValue; import org.assertj.core.api.Assertions; @@ -132,7 +133,9 @@ void testSnapshotAndAdvanceToEndOfEventTime() throws Exception { CheckpointStorageLocationReference.getDefault()); triggerCheckpointWaitForFinish(testHarness, checkpointId, checkpointOptions); + // Source tasks emit FINISHED status before MAX_WATERMARK when completing with drain Queue expectedOutput = new LinkedList<>(); + expectedOutput.add(WatermarkStatus.FINISHED); expectedOutput.add(Watermark.MAX_WATERMARK); expectedOutput.add(new EndOfData(StopMode.DRAIN)); expectedOutput.add( @@ -148,7 +151,9 @@ void testEmittingMaxWatermarkAfterReadingAllRecords() throws Exception { testHarness.processAll(); testHarness.finishProcessing(); + // Source tasks emit FINISHED status before MAX_WATERMARK when completing with drain Queue expectedOutput = new LinkedList<>(); + expectedOutput.add(WatermarkStatus.FINISHED); expectedOutput.add(Watermark.MAX_WATERMARK); expectedOutput.add(new EndOfData(StopMode.DRAIN)); assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput); @@ -279,8 +284,12 @@ public void notifyEndOfData(StopMode mode) throws IOException { testHarness.getStreamTask().invoke(); testHarness.processAll(); + // Source tasks emit FINISHED status before MAX_WATERMARK when completing with drain assertThat(output) - .containsExactly(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)); + .containsExactly( + WatermarkStatus.FINISHED, + Watermark.MAX_WATERMARK, + new EndOfData(StopMode.DRAIN)); LifeCycleMonitorSourceReader sourceReader = (LifeCycleMonitorSourceReader) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index 66a2cafb733ec..e7806e09d2fef 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -65,6 +65,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.LifeCycleMonitor.LifeCyclePhase; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; @@ -241,11 +242,13 @@ void testClosingAllOperatorsOnChainProperly() throws Exception { testHarness.invoke(); testHarness.waitForTaskCompletion(); + // Source tasks emit FINISHED status before MAX_WATERMARK when completing with drain ArrayList expected = new ArrayList<>(); Collections.addAll( expected, new StreamRecord<>("Hello"), new StreamRecord<>("[Source0]: End of input"), + WatermarkStatus.FINISHED, Watermark.MAX_WATERMARK, new StreamRecord<>("[Source0]: Finish"), new StreamRecord<>("[Operator1]: End of input"), @@ -647,8 +650,12 @@ public void notifyEndOfData(StopMode mode) throws IOException { harness.processAll(); harness.streamTask.getCompletionFuture().get(); + // Source tasks emit FINISHED status before MAX_WATERMARK when completing with drain assertThat(output) - .containsExactly(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)); + .containsExactly( + WatermarkStatus.FINISHED, + Watermark.MAX_WATERMARK, + new EndOfData(StopMode.DRAIN)); LifeCycleMonitorSource source = (LifeCycleMonitorSource) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java index a71f8827d6b8c..870a26eddb613 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java @@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -106,7 +107,8 @@ private void stopWithSavepointStreamTaskTestHelper(final boolean shouldTerminate if (shouldTerminate) { // if we are in TERMINATE mode, we expect the source task - // to emit MAX_WM before the SYNC_SAVEPOINT barrier. + // to emit FINISHED status and then MAX_WM before the SYNC_SAVEPOINT barrier. + verifyWatermarkStatus(srcTaskTestHarness.getOutput(), WatermarkStatus.FINISHED); verifyWatermark(srcTaskTestHarness.getOutput(), Watermark.MAX_WATERMARK); } @@ -170,6 +172,14 @@ private void verifyWatermark(Queue output, Watermark expectedWatermark) assertThat(next).as("wrong watermark").isEqualTo(expectedWatermark); } + private void verifyWatermarkStatus(Queue output, WatermarkStatus expectedStatus) { + Object next = output.remove(); + assertThat(next) + .as("next element is not a watermark status") + .isInstanceOf(WatermarkStatus.class); + assertThat(next).as("wrong watermark status").isEqualTo(expectedStatus); + } + private void verifyEvent(Queue output, AbstractEvent expectedEvent) { Object next = output.remove(); assertThat(next).isInstanceOf(expectedEvent.getClass()).isEqualTo(expectedEvent); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java index 9a9c8c44e3ee6..909d27b9fb59b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.TestAnyModeMultipleInputStreamOperator; import org.apache.flink.streaming.util.TestAnyModeMultipleInputStreamOperator.ToStringInput; +import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.streaming.util.TestSequentialMultipleInputStreamOperator; import org.junit.jupiter.api.Test; @@ -169,11 +170,15 @@ private void testInputSelection( } testHarness.waitForTaskCompletion(); + // Filter out WatermarkStatus (e.g., FINISHED watermark status when task finishes) - + // this test focuses on selective reading behavior. + Queue filteredOutput = + TestHarnessUtil.filterOutWatermarkStatus(testHarness.getOutput()); + if (orderedCheck) { - assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput); + assertThat(filteredOutput).containsExactlyElementsOf(expectedOutput); } else { - assertThat(testHarness.getOutput()) - .containsExactlyInAnyOrderElementsOf(expectedOutput); + assertThat(filteredOutput).containsExactlyInAnyOrderElementsOf(expectedOutput); } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java index 7e32663b053ca..103cfa98d1de5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java @@ -38,7 +38,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -176,9 +175,14 @@ private void testBase( } testHarness.waitForTaskCompletion(10_000L); - LinkedBlockingQueue output = testHarness.getOutput(); + // Filter out WatermarkStatus (e.g., FINISHED watermark status when task finishes) - this + // test focuses on selective reading behavior. + ConcurrentLinkedQueue filteredOutput = + TestHarnessUtil.filterOutWatermarkStatus(testHarness.getOutput()); + if (orderedCheck) { - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, output); + TestHarnessUtil.assertOutputEquals( + "Output was not correct.", expectedOutput, filteredOutput); } else { String[] expectedResult = expectedOutput.stream() @@ -187,7 +191,7 @@ private void testBase( Arrays.sort(expectedResult); String[] result = - output.stream() + filteredOutput.stream() .map(StreamTaskSelectiveReadingTest::elementToString) .toArray(String[]::new); Arrays.sort(result); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 710f465a935e4..cf5b6c4ed19c9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -430,8 +430,15 @@ void testOvertakingCheckpointBarriers() throws Exception { testHarness.processAll(); + // Filter out WatermarkStatus (e.g., FINISHED watermark status when task finishes), + // which is a task lifecycle signal (an implementation detail emitted by tasks when they + // finish), not part of the feature being tested. This test focuses on checkpoint + // barrier handling. + ConcurrentLinkedQueue actualOutput = + TestHarnessUtil.filterOutWatermarkStatus(testHarness.getOutput()); + TestHarnessUtil.assertOutputEquals( - "Output was not correct.", expectedOutput, testHarness.getOutput()); + "Output was not correct.", expectedOutput, actualOutput); // Then give the earlier barrier, these should be ignored testHarness.processEvent( @@ -452,8 +459,12 @@ void testOvertakingCheckpointBarriers() throws Exception { testHarness.waitForTaskCompletion(); + // Filter again for final comparison + ConcurrentLinkedQueue finalOutput = + TestHarnessUtil.filterOutWatermarkStatus(testHarness.getOutput()); + TestHarnessUtil.assertOutputEquals( - "Output was not correct.", expectedOutput, testHarness.getOutput()); + "Output was not correct.", expectedOutput, finalOutput); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java index 6548608e3e40a..83f7a632c57d9 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.DefaultOpenContext; import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; @@ -198,10 +199,26 @@ public void processWatermark(Watermark mark) throws Exception { // if we receive a Long.MAX_VALUE watermark we forward it since it is used // to signal the end of input and to not block watermark progress downstream if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) { + // Only emit ACTIVE when transitioning from IDLE with configured timeout + // and NOT when current status is FINISHED if (isIdlenessEnabled() && currentStatus.equals(WatermarkStatus.IDLE)) { // mark the channel active + Environment env = getContainingTask().getEnvironment(); + LOG.debug( + "WatermarkStatus change: IDLE -> ACTIVE due to MAX_WATERMARK. tmHost={}, " + + "taskName={}, subtask={}, operator={}, prevStatus={}, " + + "currentWatermark={}, upstreamWatermark={}", + env.getTaskManagerInfo().getTaskManagerExternalAddress(), + env.getTaskInfo().getTaskName(), + env.getTaskInfo().getIndexOfThisSubtask(), + this.getClass().getSimpleName(), + currentStatus, + currentWatermark, + mark.getTimestamp()); emitWatermarkStatus(WatermarkStatus.ACTIVE); } + // If current status is FINISHED, preserve it - don't emit ACTIVE + currentWatermark = Long.MAX_VALUE; output.emitWatermark(mark); }