diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job.go b/pkg/ccl/crosscluster/logical/logical_replication_job.go index 36228eb3100e..74bed8ed40cc 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job.go @@ -540,8 +540,6 @@ func (rh *rowHandler) handleRow(ctx context.Context, row tree.Datums) error { case <-ctx.Done(): return ctx.Err() } - - rh.metrics.ReplicatedTimeSeconds.Update(replicatedTime.GoTime().Unix()) return nil } @@ -601,8 +599,6 @@ func (r *logicalReplicationResumer) OnFailOrCancel( ctx context.Context, execCtx interface{}, _ error, ) error { execCfg := execCtx.(sql.JobExecContext).ExecCfg() - metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics) - metrics.ReplicatedTimeSeconds.Update(0) // Remove the LDR job ID from the destination table descriptors. details := r.job.Details().(jobspb.LogicalReplicationDetails) @@ -689,6 +685,7 @@ func getRetryPolicy(knobs *sql.StreamingTestingKnobs) retry.Options { } func init() { + m := MakeMetrics(base.DefaultHistogramWindowInterval()) jobs.RegisterConstructor( jobspb.TypeLogicalReplication, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { @@ -696,7 +693,8 @@ func init() { job: job, } }, - jobs.WithJobMetrics(MakeMetrics(base.DefaultHistogramWindowInterval())), + jobs.WithJobMetrics(m), + jobs.WithResolvedMetric(m.(*Metrics).ReplicatedTimeSeconds), jobs.UsesTenantCostControl, ) }