Skip to content

Commit

Permalink
chore: always publish watermark to sink OT (#2288)
Browse files (browse the repository at this point in the history)
  • Loading branch information
whynowy authored Dec 15, 2024
1 parent a32ebbc commit 883650c
Showing 1 changed file with 8 additions and 14 deletions.
22 changes: 8 additions & 14 deletions pkg/sinks/forward/forward.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -230,7 +230,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) error {
return nil
}

// if the validation passed, we will publish the watermark to all the toBuffer partitions.
// if the validation passed, we will publish the idle watermark to SINK OT even though we do not use it today.
idlehandler.PublishIdleWatermark(ctx, df.sinkWriter.GetPartitionIdx(), df.sinkWriter, df.wmPublisher, df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSink, df.vertexReplica, wmb.Watermark(time.UnixMilli(processorWMB.Watermark)))
return nil
}
Expand Down Expand Up @@ -271,7 +271,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) error {
}

// write the messages to the sink
writeOffsets, fallbackMessages, err := df.writeToSink(ctx, df.sinkWriter, writeMessages, false)
_, fallbackMessages, err := df.writeToSink(ctx, df.sinkWriter, writeMessages, false)
// error will not be nil only when we get ctx.Done()
if err != nil {
df.opts.logger.Errorw("failed to write to sink", zap.Error(err))
Expand All @@ -292,19 +292,13 @@ func (df *DataForward) forwardAChunk(ctx context.Context) error {
}
}

// FIXME: offsets are not supported for sink, so len(writeOffsets) > 0 will always fail
// in sink we don't drop any messages
// so len(dataMessages) should be the same as len(writeOffsets)
// if len(writeOffsets) is greater than 0, publish normal watermark
// if len(writeOffsets) is 0, meaning we only have control messages,
// we should not publish anything: the next len(readMessage) check will handle this idling situation
if len(writeOffsets) > 0 {
df.wmPublisher.PublishWatermark(processorWM, nil, int32(0))
// reset because the toBuffer is no longer idling
df.idleManager.MarkActive(df.fromBufferPartition.GetPartitionIdx(), df.sinkWriter.GetName())
}
// Always publish the watermark to SINK OT even though we do not use it today.
// There's no offset returned from sink writer.
df.wmPublisher.PublishWatermark(processorWM, nil, int32(0))
// reset because the toBuffer is no longer idling
df.idleManager.MarkActive(df.fromBufferPartition.GetPartitionIdx(), df.sinkWriter.GetName())

df.opts.logger.Debugw("write to sink completed")
df.opts.logger.Debugw("Write to sink completed")

ackStart := time.Now()
err = df.ackFromBuffer(ctx, readOffsets)
Expand Down

0 comments on commit 883650c

Please sign in to comment.