From 94730d75addaadbc23a0e15d6815fd475806b875 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Thu, 19 Sep 2024 13:09:33 -0400 Subject: [PATCH] crosscluster/physical: use correct lag replan cluster setting Previously the `stream_replication.replan_flow_frequency` setting determined the frequency that the frontier processor checked for lagging nodes, not the `stream_replication.lag_check_frequency` setting, as intended. The latter didn't control anything. This patch fixes this. Epic: none Release note: none --- .../physical/stream_ingestion_frontier_processor.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/ccl/crosscluster/physical/stream_ingestion_frontier_processor.go b/pkg/ccl/crosscluster/physical/stream_ingestion_frontier_processor.go index 74d518bc52cf..ed36848a9a5b 100644 --- a/pkg/ccl/crosscluster/physical/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_frontier_processor.go @@ -435,11 +435,7 @@ func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error { func (sf *streamIngestionFrontier) maybeCheckForLaggingNodes() error { ctx := sf.Ctx() - - // We halve the frequency relative to the ReplanFrequency setting (i.e. - // check twice as often), because the node lag checker will only restart the - // distSQL plan if a node is lagging for 2 checks in a row. - checkFreq := crosscluster.ReplanFrequency.Get(&sf.FlowCtx.Cfg.Settings.SV) / 2 + checkFreq := crosscluster.LagCheckFrequency.Get(&sf.FlowCtx.Cfg.Settings.SV) maxLag := crosscluster.InterNodeLag.Get(&sf.FlowCtx.Cfg.Settings.SV) if sf.persistedReplicatedTime.IsEmpty() { log.VEvent(ctx, 2, "skipping lag replanning check: no persisted replicated time")