Skip to content

Commit

Permalink
crosscluster/logical: add more processor logging
Browse files Browse the repository at this point in the history
Epic: none

Release note: none
  • Loading branch information
msbutler committed Sep 25, 2024
1 parent d644373 commit 452a3a8
Showing 1 changed file with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,15 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {
lrw.subscription = sub
lrw.workerGroup.GoCtx(func(_ context.Context) error {
if err := sub.Subscribe(subscriptionCtx); err != nil {
log.Infof(lrw.Ctx(), "subscription completed. Error: %s", err)
lrw.sendError(errors.Wrap(err, "subscription"))
}
return nil
})
lrw.workerGroup.GoCtx(func(ctx context.Context) error {
defer close(lrw.checkpointCh)
if err := lrw.consumeEvents(ctx); err != nil {
log.Infof(lrw.Ctx(), "consumer completed. Error: %s", err)
lrw.sendError(errors.Wrap(err, "consume events"))
}
return nil
Expand Down Expand Up @@ -391,10 +393,10 @@ func (lrw *logicalReplicationWriterProcessor) ConsumerClosed() {

func (lrw *logicalReplicationWriterProcessor) close() {
streampb.UnregisterActiveLogicalConsumerStatus(&lrw.debug)

if lrw.Closed {
return
}
log.Infof(lrw.Ctx(), "logical replication writer processor closing")
defer lrw.frontier.Release()

if lrw.streamPartitionClient != nil {
Expand Down Expand Up @@ -443,12 +445,19 @@ func (lrw *logicalReplicationWriterProcessor) sendError(err error) {
// the event channel has closed.
func (lrw *logicalReplicationWriterProcessor) consumeEvents(ctx context.Context) error {
before := timeutil.Now()
lastLog := timeutil.Now()
for event := range lrw.subscription.Events() {
lrw.debug.RecordRecv(timeutil.Since(before))
before = timeutil.Now()
if err := lrw.handleEvent(ctx, event); err != nil {
return err
}
if timeutil.Since(lastLog) > 5*time.Minute {
lastLog = timeutil.Now()
if !lrw.frontier.Frontier().GoTime().After(timeutil.Now().Add(-5 * time.Minute)) {
log.Infof(lrw.Ctx(), "lagging frontier: %s with span %s", lrw.frontier.Frontier(), lrw.frontier.PeekFrontierSpan())
}
}
}
return lrw.subscription.Err()
}
Expand Down

0 comments on commit 452a3a8

Please sign in to comment.