Skip to content

Commit

Permalink
crosscluster/logical: use poller-collection for replicated time
Browse files Browse the repository at this point in the history
Release note: none.
Epic: none.
  • Loading branch information
dt committed Sep 17, 2024
1 parent 3877686 commit 00c4c06
Showing 1 changed file with 3 additions and 5 deletions.
8 changes: 3 additions & 5 deletions pkg/ccl/crosscluster/logical/logical_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,6 @@ func (rh *rowHandler) handleRow(ctx context.Context, row tree.Datums) error {
case <-ctx.Done():
return ctx.Err()
}

rh.metrics.ReplicatedTimeSeconds.Update(replicatedTime.GoTime().Unix())
return nil
}

Expand Down Expand Up @@ -601,8 +599,6 @@ func (r *logicalReplicationResumer) OnFailOrCancel(
ctx context.Context, execCtx interface{}, _ error,
) error {
execCfg := execCtx.(sql.JobExecContext).ExecCfg()
metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics)
metrics.ReplicatedTimeSeconds.Update(0)

// Remove the LDR job ID from the destination table descriptors.
details := r.job.Details().(jobspb.LogicalReplicationDetails)
Expand Down Expand Up @@ -689,14 +685,16 @@ func getRetryPolicy(knobs *sql.StreamingTestingKnobs) retry.Options {
}

func init() {
m := MakeMetrics(base.DefaultHistogramWindowInterval())
jobs.RegisterConstructor(
jobspb.TypeLogicalReplication,
func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return &logicalReplicationResumer{
job: job,
}
},
jobs.WithJobMetrics(MakeMetrics(base.DefaultHistogramWindowInterval())),
jobs.WithJobMetrics(m),
jobs.WithResolvedMetric(m.(*Metrics).ReplicatedTimeSeconds),
jobs.UsesTenantCostControl,
)
}

0 comments on commit 00c4c06

Please sign in to comment.