diff --git a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go index 9c2c3a492409..c7c9fa09f707 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go @@ -319,6 +319,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) { lrw.subscription = sub lrw.workerGroup.GoCtx(func(_ context.Context) error { if err := sub.Subscribe(subscriptionCtx); err != nil { + log.Infof(lrw.Ctx(), "subscription completed. Error: %s", err) lrw.sendError(errors.Wrap(err, "subscription")) } return nil @@ -326,6 +327,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) { lrw.workerGroup.GoCtx(func(ctx context.Context) error { defer close(lrw.checkpointCh) if err := lrw.consumeEvents(ctx); err != nil { + log.Infof(lrw.Ctx(), "consumer completed. Error: %s", err) lrw.sendError(errors.Wrap(err, "consume events")) } return nil @@ -391,10 +393,10 @@ func (lrw *logicalReplicationWriterProcessor) ConsumerClosed() { func (lrw *logicalReplicationWriterProcessor) close() { streampb.UnregisterActiveLogicalConsumerStatus(&lrw.debug) - if lrw.Closed { return } + log.Infof(lrw.Ctx(), "logical replication writer processor closing") defer lrw.frontier.Release() if lrw.streamPartitionClient != nil { @@ -443,12 +445,19 @@ func (lrw *logicalReplicationWriterProcessor) sendError(err error) { // the event channel has closed. func (lrw *logicalReplicationWriterProcessor) consumeEvents(ctx context.Context) error { before := timeutil.Now() + lastLog := timeutil.Now() for event := range lrw.subscription.Events() { lrw.debug.RecordRecv(timeutil.Since(before)) before = timeutil.Now() if err := lrw.handleEvent(ctx, event); err != nil { return err } + if timeutil.Since(lastLog) > 5*time.Minute { + lastLog = timeutil.Now() + if !lrw.frontier.Frontier().GoTime().After(timeutil.Now().Add(-5 * time.Minute)) { + log.Infof(lrw.Ctx(), "lagging frontier: %s with span %s", lrw.frontier.Frontier(), lrw.frontier.PeekFrontierSpan()) + } + } } return lrw.subscription.Err() }