Skip to content

Commit

Permalink
add table id to some logs
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Sep 24, 2024
1 parent 152d2f2 commit 0dd9516
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 14 deletions.
22 changes: 17 additions & 5 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,8 @@ func (s *SharedClient) setTableStopped(rt *subscribedTable) {
log.Info("event feed starts to stop table",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID))
zap.Any("subscriptionID", rt.subscriptionID),
zap.Int64("tableID", rt.span.TableID))

Check warning on line 393 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L392-L393

Added lines #L392 - L393 were not covered by tests

// Set stopped to true so we can stop handling region events from the table.
// Then send a special singleRegionInfo to regionRouter to deregister the table
Expand All @@ -406,7 +407,8 @@ func (s *SharedClient) onTableDrained(rt *subscribedTable) {
log.Info("event feed stop table is finished",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID))
zap.Any("subscriptionID", rt.subscriptionID),
zap.Int64("tableID", rt.span.TableID))

Check warning on line 411 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L410-L411

Added lines #L410 - L411 were not covered by tests

s.totalSpans.Lock()
defer s.totalSpans.Unlock()
Expand Down Expand Up @@ -693,6 +695,8 @@ func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInf
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID),
zap.Uint64("regionID", errInfo.verID.GetID()),
zap.Int64("tableID", errInfo.span.TableID),

Check warning on line 699 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L698-L699

Added lines #L698 - L699 were not covered by tests
zap.Stringer("error", innerErr))

if notLeader := innerErr.GetNotLeader(); notLeader != nil {
Expand Down Expand Up @@ -737,6 +741,8 @@ func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInf
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID),
zap.Uint64("regionID", errInfo.verID.GetID()),
zap.Int64("tableID", errInfo.span.TableID),

Check warning on line 745 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L744-L745

Added lines #L744 - L745 were not covered by tests
zap.Stringer("error", innerErr))
metricFeedUnknownErrorCounter.Inc()
s.scheduleRegionRequest(ctx, errInfo.regionInfo)
Expand All @@ -757,6 +763,8 @@ func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInf
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID),
zap.Uint64("regionID", errInfo.verID.GetID()),
zap.Int64("tableID", errInfo.span.TableID),

Check warning on line 767 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L766-L767

Added lines #L766 - L767 were not covered by tests
zap.Error(err))
return err
}
Expand Down Expand Up @@ -833,7 +841,7 @@ func (s *SharedClient) logSlowRegions(ctx context.Context) error {

currTime := s.pdClock.CurrentTime()
s.totalSpans.RLock()
slowInitializeRegion := 0
var slowInitializeRegionCount int

Check warning on line 844 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L844

Added line #L844 was not covered by tests
for subscriptionID, rt := range s.totalSpans.v {
attr := rt.rangeLock.IterAll(nil)
ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.ResolvedTs)
Expand All @@ -843,32 +851,36 @@ func (s *SharedClient) logSlowRegions(ctx context.Context) error {
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", rt.span.TableID),

Check warning on line 854 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L854

Added line #L854 was not covered by tests
zap.Any("slowRegion", attr.SlowestRegion))
}
} else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute {
slowInitializeRegion += 1
slowInitializeRegionCount += 1

Check warning on line 858 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L858

Added line #L858 was not covered by tests
log.Info("event feed initializes a region too slow",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", rt.span.TableID),

Check warning on line 863 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L863

Added line #L863 was not covered by tests
zap.Any("slowRegion", attr.SlowestRegion))
} else if currTime.Sub(ckptTime) > 10*time.Minute {
log.Info("event feed finds a uninitialized slow region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", rt.span.TableID),

Check warning on line 870 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L870

Added line #L870 was not covered by tests
zap.Any("slowRegion", attr.SlowestRegion))
}
if len(attr.UnLockedRanges) > 0 {
log.Info("event feed holes exist",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", rt.span.TableID),

Check warning on line 878 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L878

Added line #L878 was not covered by tests
zap.Any("holes", attr.UnLockedRanges))
}
}
s.totalSpans.RUnlock()
s.metrics.slowInitializeRegion.Set(float64(slowInitializeRegion))
s.metrics.slowInitializeRegion.Set(float64(slowInitializeRegionCount))

Check warning on line 883 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L883

Added line #L883 was not covered by tests
}
}

Expand Down
10 changes: 3 additions & 7 deletions cdc/kv/shared_region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, str
zap.Uint64("streamID", stream.streamID),
zap.Any("subscriptionID", state.getRegionID()),
zap.Uint64("regionID", state.region.verID.GetID()),
zap.Int64("tableID", state.region.span.TableID),
zap.Bool("reschedule", stepsToRemoved),
zap.Error(err))
}
Expand Down Expand Up @@ -228,12 +229,6 @@ func (w *sharedRegionWorker) handleEventEntry(ctx context.Context, x *cdcpb.Even
}
}
tableID := state.region.subscribedTable.span.TableID
log.Debug("region worker get an Event",
zap.String("namespace", w.changefeed.Namespace),
zap.String("changefeed", w.changefeed.ID),
zap.Any("subscriptionID", state.region.subscribedTable.subscriptionID),
zap.Int64("tableID", tableID),
zap.Int("rows", len(x.Entries.GetEntries())))
return handleEventEntry(x, startTs, state, w.metrics, emit, w.changefeed, tableID, w.client.logRegionDetails)
}

Expand All @@ -250,7 +245,7 @@ func handleEventEntry(
regionID, regionSpan, _ := state.getRegionMeta()
for _, entry := range x.Entries.GetEntries() {
// NOTE: from TiKV 7.0.0, entries are already filtered out in TiKV side.
// We can remove the check in future.
// We can remove the check in the future.
comparableKey := spanz.ToComparableKey(entry.GetKey())
if entry.Type != cdcpb.Event_INITIALIZED &&
!spanz.KeyInSpan(comparableKey, regionSpan) {
Expand All @@ -266,6 +261,7 @@ func handleEventEntry(
zap.String("changefeed", changefeed.ID),
zap.Int64("tableID", tableID),
zap.Uint64("regionID", regionID),
zap.Int64("tableID", state.region.span.TableID),
zap.Uint64("requestID", state.requestID),
zap.Stringer("span", &state.region.span))

Expand Down
9 changes: 7 additions & 2 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,14 +469,19 @@ func (s *requestedStream) sendRegionChangeEvents(
state := s.getState(subscriptionID, regionID)
switch x := event.Event.(type) {
case *cdcpb.Event_Error:
s.logRegionDetails("event feed receives a region error",
fields := []zap.Field{

Check warning on line 472 in cdc/kv/shared_stream.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_stream.go#L472

Added line #L472 was not covered by tests
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", event.RegionId),
zap.Bool("stateIsNil", state == nil),
zap.Any("error", x.Error))
zap.Any("error", x.Error),
}
if state != nil {
fields = append(fields, zap.Int64("tableID", state.region.span.TableID))
}
s.logRegionDetails("event feed receives a region error", fields...)

Check warning on line 484 in cdc/kv/shared_stream.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_stream.go#L479-L484

Added lines #L479 - L484 were not covered by tests
}

if state != nil {
Expand Down

0 comments on commit 0dd9516

Please sign in to comment.