From aba23e60746f7a3ea0fc6621ff311b90a2e62726 Mon Sep 17 00:00:00 2001 From: Andy Yang Date: Fri, 20 Sep 2024 12:57:58 -0400 Subject: [PATCH] changefeedccl/kvfeed: add logs for kv feed restarts This patch adds some additional logs for when a kv feed starts running and when it restarts its internal loop due to a schema change. Release note: None --- pkg/ccl/changefeedccl/kvfeed/kv_feed.go | 29 ++++++++++++++++++------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 6bb2855fcdd0..6a4f25927359 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -143,7 +143,8 @@ func Run(ctx context.Context, cfg Config) error { // provided buffer. var scErr schemaChangeDetectedError isChangefeedCompleted := errors.Is(err, errChangefeedCompleted) - if !(isChangefeedCompleted || errors.As(err, &scErr)) { + if !isChangefeedCompleted && !errors.As(err, &scErr) { + log.Errorf(ctx, "stopping kv feed due to error: %s", err) // Regardless of whether we exited KV feed with or without an error, that error // is not a schema change; so, close the writer and return. return errors.CombineErrors(err, f.writer.CloseWithReason(ctx, err)) @@ -311,6 +312,8 @@ func newKVFeed( var errChangefeedCompleted = errors.New("changefeed completed") func (f *kvFeed) run(ctx context.Context) (err error) { + log.Infof(ctx, "kv feed run starting") + emitResolved := func(ts hlc.Timestamp, boundary jobspb.ResolvedSpan_BoundaryType) error { for _, sp := range f.spans { if err := f.writer.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, ts, boundary)); err != nil { @@ -369,9 +372,10 @@ func (f *kvFeed) run(ctx context.Context) (err error) { f.checkpointTimestamp = hlc.Timestamp{} } - highWater := rangeFeedResumeFrontier.Frontier() + boundaryTS := rangeFeedResumeFrontier.Frontier() + schemaChangeTS := boundaryTS.Next() boundaryType := jobspb.ResolvedSpan_BACKFILL - events, err := f.tableFeed.Peek(ctx, highWater.Next()) + events, err := f.tableFeed.Peek(ctx, schemaChangeTS) if err != nil { return err } @@ -398,15 +402,17 @@ func (f *kvFeed) run(ctx context.Context) (err error) { // we should do so. if f.schemaChangePolicy != changefeedbase.OptSchemaChangePolicyNoBackfill || boundaryType == jobspb.ResolvedSpan_RESTART { - if err := emitResolved(highWater, boundaryType); err != nil { + if err := emitResolved(boundaryTS, boundaryType); err != nil { return err } } // Exit if the policy says we should. if boundaryType == jobspb.ResolvedSpan_RESTART || boundaryType == jobspb.ResolvedSpan_EXIT { - return schemaChangeDetectedError{highWater.Next()} + return schemaChangeDetectedError{ts: schemaChangeTS} } + + log.Infof(ctx, "kv feed run loop restarting because of schema change at %s", schemaChangeTS) } } @@ -498,12 +504,14 @@ func (f *kvFeed) scanIfShould( return nil, hlc.Timestamp{}, err } + if !isInitialScan && f.schemaChangePolicy == changefeedbase.OptSchemaChangePolicyNoBackfill { + return spansToScan, scanTime, nil + } + // If we have initial checkpoint information specified, filter out // spans which we no longer need to scan. spansToBackfill := filterCheckpointSpans(spansToScan, f.checkpoint) - - if (!isInitialScan && f.schemaChangePolicy == changefeedbase.OptSchemaChangePolicyNoBackfill) || - len(spansToBackfill) == 0 { + if len(spansToBackfill) == 0 { return spansToScan, scanTime, nil } @@ -530,6 +538,11 @@ func (f *kvFeed) scanIfShould( return spansToScan, scanTime, nil } +// runUntilTableEvent starts rangefeeds for the spans being watched by +// the kv feed and runs until a table event (schema change) is encountered. +// +// If the function returns a nil error, resumeFrontier.Frontier() will be +// ts.Prev() where ts is the schema change timestamp. func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Frontier) (err error) { startFrom := resumeFrontier.Frontier()