Skip to content

Commit

Permalink
Merge #134721
Browse files Browse the repository at this point in the history
134721: crosscluster/logical: don't set fraction once resolved ts exists r=dt a=dt

Release note: none.
Epic: none.

Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
craig[bot] and dt committed Nov 9, 2024
2 parents 820bcfa + 879a89a commit 8f5366d
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 12 deletions.
7 changes: 3 additions & 4 deletions pkg/ccl/crosscluster/logical/logical_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,17 +542,16 @@ func (rh *rowHandler) handleRow(ctx context.Context, row tree.Datums) error {

// TODO (msbutler): add ldr initial and lagging range timeseries metrics.
aggRangeStats, fractionCompleted, status := rh.rangeStats.RollupStats()
progress.RunningStatus = status

if rh.replicatedTimeAtStart.Less(replicatedTime) {
if replicatedTime.IsSet() {
prog.ReplicatedTime = replicatedTime
// The HighWater is for informational purposes
// only.
progress.Progress = &jobspb.Progress_HighWater{
HighWater: &replicatedTime,
}
}
progress.RunningStatus = status
if fractionCompleted > 0 && fractionCompleted < 1 {
} else if fractionCompleted > 0 && fractionCompleted < 1 {
// If 0, the coordinator has not gotten a complete range stats update
// from all nodes yet.
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/crosscluster/logical/range_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (r *rangeStatsByProcessorID) RollupStats() (streampb.StreamEvent_RangeStats
if total.LaggingRangeCount != 0 {
return total, fractionCompleted, fmt.Sprintf("catching up on %d out of %d ranges", total.LaggingRangeCount, total.RangeCount)
}
return total, 1, fmt.Sprintf("all %d ranges are caught up", total.RangeCount)
return total, 1, ""
}

func newRangeStatsCollector(processorCount int) rangeStatsByProcessorID {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/crosscluster/logical/range_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestRangeStats(t *testing.T) {
RangeCount: 10,
},
fraction: 1,
expectedMsg: "all 10 ranges are caught up",
expectedMsg: "",
inputStats: map[int32]*streampb.StreamEvent_RangeStats{
1: {RangeCount: 5},
2: {RangeCount: 3},
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/crosscluster/producer/range_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func startStatsPoller(
g: ctxgroup.WithContext(ctx),
}
poller.g.GoCtx(func(ctx context.Context) error {
timer := time.NewTimer(interval)
defer timer.Stop()
tick := time.NewTicker(interval)
defer tick.Stop()
for {
stats, err := computeRangeStats(ctx, spans, frontier, ranges)
if err != nil {
Expand All @@ -59,7 +59,7 @@ func startStatsPoller(
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
case <-tick.C:
//continue
}
}
Expand All @@ -85,14 +85,14 @@ func computeRangeStats(
frontier span.Frontier,
ranges rangedesc.IteratorFactory,
) (streampb.StreamEvent_RangeStats, error) {
now := timeutil.Now()
var stats streampb.StreamEvent_RangeStats
for _, initialSpan := range spans {
lazyIterator, err := ranges.NewLazyIterator(ctx, initialSpan, 100)
if err != nil {
return streampb.StreamEvent_RangeStats{}, err
}
for ; lazyIterator.Valid(); lazyIterator.Next() {
now := timeutil.Now()
rangeSpan := roachpb.Span{
Key: lazyIterator.CurRangeDescriptor().StartKey.AsRawKey(),
EndKey: lazyIterator.CurRangeDescriptor().EndKey.AsRawKey(),
Expand All @@ -104,7 +104,7 @@ func computeRangeStats(
if timestamp.IsEmpty() {
stats.ScanningRangeCount += 1
return span.StopMatch
} else if timestamp.GoTime().Before(now.Add(-laggingSpanThreshold)) {
} else if now.Sub(timestamp.GoTime()) > laggingSpanThreshold {
stats.LaggingRangeCount += 1
return span.StopMatch
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ui/workspaces/cluster-ui/src/jobs/util/jobOptions.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function jobToVisualForReplicationIngestion(job: Job): JobStatusVisual {
if (job.fraction_completed > 0 && job.status === JOB_STATUS_RUNNING) {
return JobStatusVisual.ProgressBarWithDuration;
}
return JobStatusVisual.BadgeOnly;
return JobStatusVisual.BadgeWithMessage;
}

export const JOB_STATUS_SUCCEEDED = "succeeded";
Expand Down

0 comments on commit 8f5366d

Please sign in to comment.