Skip to content

Conversation

weiqingy
Copy link

@weiqingy weiqingy commented Oct 5, 2025

What is the purpose of the change

This PR fixes a watermark aggregation bug that can cause zero output from time-based operators (e.g., IntervalJoin) when some upstream subtasks finish while others later become temporarily idle.

Details of the issue are described in FLINK-38477.

Root cause:
Flink currently allows finished subtasks to dominate watermark aggregation by emitting Long.MAX_VALUE instead of being excluded like idle inputs. This prematurely advances downstream watermarks to infinity (Long.MAX_VALUE), halting event-time progress and eliminating output.

Brief change log

Goal: Ensure a finished input is excluded from operator watermark aggregation, just like an idle input. This prevents Long.MAX_VALUE from dominating aggregation when some channels are finished and others are idle.

Implementation

  • Introduced a new watermark status: FINISHED.
  • When an operator finishes, it emits:
    • status = FINISHED
    • watermark = Long.MAX_VALUE if the watermarks of all input channels are already Long.MAX_VALUE.
  • Adjusted aggregation rules:
    • If there are active channelsstatus = ACTIVE, watermark = min(all active channel watermarks).
    • Else if there are idle channelsstatus = IDLE, watermark = max(all idle channel watermarks).
    • Else (all channels finished) → status = FINISHED, watermark = Long.MAX_VALUE.

This ensures finished inputs are ignored during aggregation until every input has completed, at which point the operator watermark correctly advances to Long.MAX_VALUE.

Core issues identified

  • Current state model: ACTIVE ←→ IDLE

    • Cannot distinguish between:
      • IDLE = “temporarily no data, may resume”
      • FINISHED = “permanently done, never resuming”
  • Rejected approach: treating finished as idle.

    • Issue 1: IDLE status gets overwritten

      • StreamTask.endData() emits WatermarkStatus.IDLE when a source finishs.
      • Then operatorChain.finishOperators() calls WatermarkAssignerOperator.processWatermark().
      • When WatermarkAssignerOperator receives Long.MAX_VALUE, it forcibly emits WatermarkStatus.ACTIVE.
      • Result: finished channels are incorrectly marked as ACTIVE, so the finished tasks still participate in watermark aggregation.
    • Issue 2: Incorrect aggregation when all channels are IDLE

      • Even if Issue 1 is fixed, when all channels become IDLE (including finished tasks whose watermark is Long.MAX_VALUE):
      • StatusWatermarkValve calls findAndOutputMaxWatermarkAcrossAllChannels().
      • This still emits a Long.MAX_VALUE watermark.
      • It cannot distinguish between “all temporarily idle” vs. “all permanently finished.”

Conclusion:

  • Introducing the explicit FINISHED status is necessary to create a three-state WatermarkStatus (ACTIVE / IDLE / FINISHED) that properly distinguishes temporary idleness from permanent completion.

Long-term refactoring (future follow-up)

This PR fixes the immediate bug, but the broader problem is fragmented watermark lifecycle management.

Current problems

  • Multiple, conflicting emission points: For example:
StreamTask.endData():
  ├─ advanceToEndOfEventTime()
  │   └─ emits Long.MAX_VALUE watermark
  │
  └─ operatorChain.finishOperators():
      ├─ ContinuousFileReaderOperator.finish()
      │   └─ emits Long.MAX_VALUE (deprecated?)
      │
      ├─ WatermarkAssignerOperator.finish()
      │    ├─ switches IDLE → ACTIVE if idleness enabled 
      │    └─ emits Long.MAX_VALUE watermark
      └─ ...
  • Wrapper operators interfere with completion semantics.
  • Sources don’t fully control their own lifecycle, at least from the perspective of watermark and watermark status handling.

Proposed long-term direction

  • Source-owned completion and embedded watermark strategy:
    Sources should declare when they are active, idle, or finished, and emit both status and watermark atomically. Wrapper operators should not override this behavior.

    • Current Architecture:
      Source → WatermarkAssignerOperator (wrapper) → Downstream
               ↑ Adds watermark strategy
               ↑ Can conflict with source completion
      
    • Benefits:
      • No wrapper operator to conflict with source status.
      • Source and watermark strategy have consistent lifecycle.
      • Table API can configure watermark strategy at source creation, not via wrapper.

This refactoring would prevent the entire class of bugs we've been fixing and make watermark completion semantics clear and maintainable.

Verifying this change

Unit tests (new)

  • Test that FINISHED channels are excluded from min watermark calculation, allowing remaining channels to advance the watermark.
  • Test that FINISHED channels are excluded from max watermark calculation when all other channels become IDLE.
  • Test that FINISHED channels reject non-MAX_VALUE watermarks, accept Long.MAX_VALUE, and properly aggregate across channels to emit Long.MAX_VALUE once when all FINISHED channels receive it.
  • Test that FINISHED is a terminal state and channels cannot transition from FINISHED back to ACTIVE or IDLE.
  • Tests ACTIVE to IDLE and ACTIVE to FINISHED transitions in various configurations, including global status changes
  • Tests IDLE to ACTIVE transitions with both aligned and unaligned watermarks, verifying realignment behavior
  • Tests IDLE to FINISHED transitions in various configurations, verifying global status changes and watermark progression.
  • Aggregation rule selection verified:
    • Active → min(active watermarks).
    • Idle (with no active) → idle status; finished excluded.
    • All finished → operator watermark = Long.MAX_VALUE.

Manual/e2e tests

  1. Set job parallelism greater than Kafka partitions; disable dynamic partition discovery.
  2. Some subtasks finish immediately, others keep running.
  3. Stall the running subtasks until they become idle (table.exec.source.idle-timeout=10s).
  4. Before fix: operator watermark = Long.MAX_VALUE, all records dropped → no output.
    After fix: watermark does not jump; resumed records are processed correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies: no
  • Public API (@Public(Evolving)): no
  • Serializers: no
  • Runtime per-record code paths: yes (watermark aggregation; minimal guarded changes)
  • Deployment/recovery (JM, checkpointing, K8s/Yarn, ZooKeeper): no
  • S3 file system connector: no

Documentation

  • New feature: no (bug fix only; introduces internal FINISHED watermark status)
  • Documentation: not applicable

@flinkbot
Copy link
Collaborator

flinkbot commented Oct 5, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@weiqingy
Copy link
Author

weiqingy commented Oct 7, 2025

@flinkbot run azure

}
return;
}
// Other watermark statuses (ACTIVE, IDLE) should not occur for finished tasks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits:

  • you could could fail fast an issue the Exception if (!watermarkStatus.isFinished()) {.throw }
    then you would not need the return.
  • can we put out an identifier of the task in the Exception to help with diagnostics.

* override this method because they rely on StatusWatermarkValve to aggregate FINISHED status
* from upstream inputs and propagate it downstream automatically.
*/
protected void emitFinishedStatus() {
Copy link
Contributor

@davidradl davidradl Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we arrange the class hierarchy so that emitFinishedStatus() is only overridable for sources?

protected void emitFinishedStatusToOutputs() {
try {
if (operatorChain != null) {
RecordWriterOutput<?>[] streamOutputs = operatorChain.getStreamOutputs();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have no stream outputs? if so we could issue another LOG.warn("Cannot emit FINISHED watermark status: operator chain has no stream outputs");)

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Oct 8, 2025

public static final int IDLE_STATUS = -1;
public static final int ACTIVE_STATUS = 0;
public static final int FINISHED_STATUS = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we turn this into an enum?


public WatermarkStatus(int status) {
if (status != IDLE_STATUS && status != ACTIVE_STATUS) {
if (status != IDLE_STATUS && status != ACTIVE_STATUS && status != FINISHED_STATUS) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how can we get into this situation?

private final int inputCount;

private int watermarksSeen = 0;
private int finishedStatusSeen = 0;
Copy link
Contributor

@davidradl davidradl Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious on you thinking on how this new status effects https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream-v2/watermark/#emit-watermark customizations.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants