From 883650cbe584668b9b4492cc188e5ae5c4ef8e6b Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Sun, 15 Dec 2024 12:11:34 -0800 Subject: [PATCH] chore: always publish watermark to sink OT (#2288) --- pkg/sinks/forward/forward.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/pkg/sinks/forward/forward.go b/pkg/sinks/forward/forward.go index ac60ecddf..15b14daf3 100644 --- a/pkg/sinks/forward/forward.go +++ b/pkg/sinks/forward/forward.go @@ -230,7 +230,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) error { return nil } - // if the validation passed, we will publish the watermark to all the toBuffer partitions. + // if the validation passed, we will publish the idle watermark to SINK OT even though we do not use it today. idlehandler.PublishIdleWatermark(ctx, df.sinkWriter.GetPartitionIdx(), df.sinkWriter, df.wmPublisher, df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSink, df.vertexReplica, wmb.Watermark(time.UnixMilli(processorWMB.Watermark))) return nil } @@ -271,7 +271,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) error { } // write the messages to the sink - writeOffsets, fallbackMessages, err := df.writeToSink(ctx, df.sinkWriter, writeMessages, false) + _, fallbackMessages, err := df.writeToSink(ctx, df.sinkWriter, writeMessages, false) // error will not be nil only when we get ctx.Done() if err != nil { df.opts.logger.Errorw("failed to write to sink", zap.Error(err)) @@ -292,19 +292,13 @@ func (df *DataForward) forwardAChunk(ctx context.Context) error { } } - // FIXME: offsets are not supported for sink, so len(writeOffsets) > 0 will always fail - // in sink we don't drop any messages - // so len(dataMessages) should be the same as len(writeOffsets) - // if len(writeOffsets) is greater than 0, publish normal watermark - // if len(writeOffsets) is 0, meaning we only have control messages, - // we should not publish anything: the next len(readMessage) check will handle this idling situation - if len(writeOffsets) > 0 { - df.wmPublisher.PublishWatermark(processorWM, nil, int32(0)) - // reset because the toBuffer is no longer idling - df.idleManager.MarkActive(df.fromBufferPartition.GetPartitionIdx(), df.sinkWriter.GetName()) - } + // Always publish the watermark to SINK OT even though we do not use it today. + // There's no offset returned from sink writer. + df.wmPublisher.PublishWatermark(processorWM, nil, int32(0)) + // reset because the toBuffer is no longer idling + df.idleManager.MarkActive(df.fromBufferPartition.GetPartitionIdx(), df.sinkWriter.GetName()) - df.opts.logger.Debugw("write to sink completed") + df.opts.logger.Debugw("Write to sink completed") ackStart := time.Now() err = df.ackFromBuffer(ctx, readOffsets)