diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 07c3fb55ae1..3e87a9fd98a 100755 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -956,8 +956,6 @@ func (c *changefeed) preflightCheck(captures map[model.CaptureID]*model.CaptureI // barrierTs is used to control the data that can be flush to downstream. func (c *changefeed) handleBarrier(ctx cdcContext.Context, barrier *schedulepb.BarrierWithMinTs) error { barrierTp, barrierTs := c.barriers.Min() - c.metricsChangefeedBarrierTsGauge.Set(float64(oracle.ExtractPhysical(barrierTs))) - // It means: // 1. All data before the barrierTs was sent to downstream. // 2. No more data after barrierTs was sent to downstream. @@ -998,6 +996,8 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context, barrier *schedulepb.B barrier.MinTableBarrierTs = barrierTs } + // MinTableBarrierTs is always the next barrier that blocking the global resolvedTs. + c.metricsChangefeedBarrierTsGauge.Set(float64(oracle.ExtractPhysical(barrier.MinTableBarrierTs))) return nil }