Skip to content

Commit

Permalink
Merge pull request #131269 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.1-131244

release-24.1: changefeedccl/kvfeed: add logs for kv feed restarts
  • Loading branch information
andyyang890 authored Sep 24, 2024
2 parents d5a2de1 + 78949e4 commit fb8f162
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ func Run(ctx context.Context, cfg Config) error {
// provided buffer.
var scErr schemaChangeDetectedError
isChangefeedCompleted := errors.Is(err, errChangefeedCompleted)
if !(isChangefeedCompleted || errors.As(err, &scErr)) {
if !isChangefeedCompleted && !errors.As(err, &scErr) {
log.Errorf(ctx, "stopping kv feed due to error: %s", err)
// Regardless of whether we exited KV feed with or without an error, that error
// is not a schema change; so, close the writer and return.
return errors.CombineErrors(err, f.writer.CloseWithReason(ctx, err))
Expand Down Expand Up @@ -311,6 +312,8 @@ func newKVFeed(
var errChangefeedCompleted = errors.New("changefeed completed")

func (f *kvFeed) run(ctx context.Context) (err error) {
log.Infof(ctx, "kv feed run starting")

emitResolved := func(ts hlc.Timestamp, boundary jobspb.ResolvedSpan_BoundaryType) error {
for _, sp := range f.spans {
if err := f.writer.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, ts, boundary)); err != nil {
Expand Down Expand Up @@ -369,9 +372,10 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
f.checkpointTimestamp = hlc.Timestamp{}
}

highWater := rangeFeedResumeFrontier.Frontier()
boundaryTS := rangeFeedResumeFrontier.Frontier()
schemaChangeTS := boundaryTS.Next()
boundaryType := jobspb.ResolvedSpan_BACKFILL
events, err := f.tableFeed.Peek(ctx, highWater.Next())
events, err := f.tableFeed.Peek(ctx, schemaChangeTS)
if err != nil {
return err
}
Expand All @@ -398,15 +402,17 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
// we should do so.
if f.schemaChangePolicy != changefeedbase.OptSchemaChangePolicyNoBackfill ||
boundaryType == jobspb.ResolvedSpan_RESTART {
if err := emitResolved(highWater, boundaryType); err != nil {
if err := emitResolved(boundaryTS, boundaryType); err != nil {
return err
}
}

// Exit if the policy says we should.
if boundaryType == jobspb.ResolvedSpan_RESTART || boundaryType == jobspb.ResolvedSpan_EXIT {
return schemaChangeDetectedError{highWater.Next()}
return schemaChangeDetectedError{ts: schemaChangeTS}
}

log.Infof(ctx, "kv feed run loop restarting because of schema change at %s", schemaChangeTS)
}
}

Expand Down Expand Up @@ -498,12 +504,14 @@ func (f *kvFeed) scanIfShould(
return nil, hlc.Timestamp{}, err
}

if !isInitialScan && f.schemaChangePolicy == changefeedbase.OptSchemaChangePolicyNoBackfill {
return spansToScan, scanTime, nil
}

// If we have initial checkpoint information specified, filter out
// spans which we no longer need to scan.
spansToBackfill := filterCheckpointSpans(spansToScan, f.checkpoint)

if (!isInitialScan && f.schemaChangePolicy == changefeedbase.OptSchemaChangePolicyNoBackfill) ||
len(spansToBackfill) == 0 {
if len(spansToBackfill) == 0 {
return spansToScan, scanTime, nil
}

Expand All @@ -530,6 +538,11 @@ func (f *kvFeed) scanIfShould(
return spansToScan, scanTime, nil
}

// runUntilTableEvent starts rangefeeds for the spans being watched by
// the kv feed and runs until a table event (schema change) is encountered.
//
// If the function returns a nil error, resumeFrontier.Frontier() will be
// ts.Prev() where ts is the schema change timestamp.
func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Frontier) (err error) {
startFrom := resumeFrontier.Frontier()

Expand Down

0 comments on commit fb8f162

Please sign in to comment.