Skip to content

Commit

Permalink
Merge pull request #135170 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.2-135142

release-24.2: ttljob: log progress in TTL job
  • Loading branch information
rafiss authored Nov 15, 2024
2 parents 8b60dec + c5f906f commit 70c2086
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 14 deletions.
12 changes: 8 additions & 4 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1244,17 +1244,21 @@ message RowLevelTTLDetails {

message RowLevelTTLProgress {

// JobRowCount is the number of deleted rows for the entire TTL job.
int64 job_row_count = 1;
// JobDeletedRowCount is the number of rows deleted by TTL job so far.
int64 job_deleted_row_count = 1;

// ProcessorProgresses is the progress per DistSQL processor.
repeated RowLevelTTLProcessorProgress processor_progresses = 2 [(gogoproto.nullable)=false];

// UseDistSQL is no longer used in v23.1+ as all TTL jobs are using DistSQL.
reserved 3;

// JobSpanCount is the number of spans for the entire TTL job.
int64 job_span_count = 4;
// JobTotalSpanCount is the number of spans for the entire TTL job.
int64 job_total_span_count = 4;

// JobProcessedSpanCount is the number of spans that have been processed by
// the TTL job so far.
int64 job_processed_span_count = 5;
}

message RowLevelTTLProcessorProgress {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (re
func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
progress := md.Progress
rowLevelTTL := progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL
rowLevelTTL.JobSpanCount = int64(jobSpanCount)
rowLevelTTL.JobTotalSpanCount = int64(jobSpanCount)
ju.UpdateProgress(progress)
return nil
},
Expand Down
44 changes: 38 additions & 6 deletions pkg/sql/ttl/ttljob/ttljob_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,21 @@ func (t *ttlProcessor) work(ctx context.Context) error {
if processorSpanCount < processorConcurrency {
processorConcurrency = processorSpanCount
}
var processorRowCount int64
var rowsDeletedSoFar atomic.Int64
var spansProccessedSoFar atomic.Int64

// Log progress approximately every 1% of spans processed.
updateEvery := max(1, processorSpanCount/100)
logProgress := func() error {
processorID := t.ProcessorID
log.Infof(
ctx,
"TTL progress processorID=%d tableID=%d deletedRowCount=%d processedSpanCountForProcessor=%d totalSpanCountForProcessor=%d",
processorID, tableID, rowsDeletedSoFar.Load(), spansProccessedSoFar.Load(), processorSpanCount,
)
return nil
}

err = func() error {
boundsChan := make(chan QueryBounds, processorConcurrency)
defer close(boundsChan)
Expand Down Expand Up @@ -205,7 +219,8 @@ func (t *ttlProcessor) work(ctx context.Context) error {
deleteBuilder,
)
// add before returning err in case of partial success
atomic.AddInt64(&processorRowCount, spanRowCount)
rowsDeletedSoFar.Add(spanRowCount)
spansProccessedSoFar.Add(1)
if err != nil {
// Continue until channel is fully read.
// Otherwise, the keys input will be blocked.
Expand Down Expand Up @@ -238,6 +253,15 @@ func (t *ttlProcessor) work(ctx context.Context) error {
} else if hasRows {
// Only process bounds from spans with rows inside them.
boundsChan <- bounds
} else {
// If the span has no rows, we still need to increment the processed
// count.
spansProccessedSoFar.Add(1)
}
if spansProccessedSoFar.Load() >= updateEvery {
if err := logProgress(); err != nil {
return err
}
}
}
return nil
Expand All @@ -249,6 +273,9 @@ func (t *ttlProcessor) work(ctx context.Context) error {
if err := group.Wait(); err != nil {
return err
}
if err := logProgress(); err != nil {
return err
}

sqlInstanceID := flowCtx.NodeID.SQLInstanceID()
jobID := ttlSpec.JobID
Expand All @@ -259,21 +286,26 @@ func (t *ttlProcessor) work(ctx context.Context) error {
func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
progress := md.Progress
rowLevelTTL := progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL
rowLevelTTL.JobRowCount += processorRowCount
processorID := t.ProcessorID
fractionCompleted := float32(processorSpanCount) / float32(rowLevelTTL.JobTotalSpanCount)
progress.Progress = &jobspb.Progress_FractionCompleted{
FractionCompleted: fractionCompleted,
}
rowLevelTTL.JobDeletedRowCount += rowsDeletedSoFar.Load()
rowLevelTTL.JobProcessedSpanCount += spansProccessedSoFar.Load()
rowLevelTTL.ProcessorProgresses = append(rowLevelTTL.ProcessorProgresses, jobspb.RowLevelTTLProcessorProgress{
ProcessorID: processorID,
SQLInstanceID: sqlInstanceID,
ProcessorRowCount: processorRowCount,
ProcessorRowCount: rowsDeletedSoFar.Load(),
ProcessorSpanCount: processorSpanCount,
ProcessorConcurrency: processorConcurrency,
})
ju.UpdateProgress(progress)
log.VInfof(
ctx,
2, /* level */
"TTL processorRowCount updated jobID=%d processorID=%d sqlInstanceID=%d tableID=%d jobRowCount=%d processorRowCount=%d",
jobID, processorID, sqlInstanceID, tableID, rowLevelTTL.JobRowCount, processorRowCount,
"TTL processorRowCount updated processorID=%d sqlInstanceID=%d tableID=%d jobRowCount=%d processorRowCount=%d fractionCompleted=%.3f",
processorID, sqlInstanceID, tableID, rowLevelTTL.JobDeletedRowCount, rowsDeletedSoFar.Load(), fractionCompleted,
)
return nil
},
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/ttl/ttljob/ttljob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (h *rowLevelTTLTestJobTestHelper) verifyExpiredRowsJobOnly(
var progress jobspb.Progress
require.NoError(t, protoutil.Unmarshal(progressBytes, &progress))

actualNumExpiredRows := progress.UnwrapDetails().(jobspb.RowLevelTTLProgress).JobRowCount
actualNumExpiredRows := progress.UnwrapDetails().(jobspb.RowLevelTTLProgress).JobDeletedRowCount
require.Equal(t, int64(expectedNumExpiredRows), actualNumExpiredRows)
jobCount++
}
Expand Down Expand Up @@ -270,8 +270,9 @@ func (h *rowLevelTTLTestJobTestHelper) verifyExpiredRows(
require.Equal(t, expectedProcessorRowCount, processorProgress.ProcessorRowCount)
expectedJobRowCount += expectedProcessorRowCount
}
require.Equal(t, expectedJobSpanCount, rowLevelTTLProgress.JobSpanCount)
require.Equal(t, expectedJobRowCount, rowLevelTTLProgress.JobRowCount)
require.Equal(t, expectedJobSpanCount, rowLevelTTLProgress.JobProcessedSpanCount)
require.Equal(t, expectedJobSpanCount, rowLevelTTLProgress.JobTotalSpanCount)
require.Equal(t, expectedJobRowCount, rowLevelTTLProgress.JobDeletedRowCount)
jobCount++
}
require.Equal(t, 1, jobCount)
Expand Down

0 comments on commit 70c2086

Please sign in to comment.