Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvclient(ticdc): add table id to kv client logs #11622

Merged
merged 4 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/kv/region_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type regionInfo struct {
lockedRangeState *regionlock.LockedRangeState
}

func (s regionInfo) isStoped() bool {
// isStopped reports whether this regionInfo carries no locked range state.
// lockedRangeState is nil only when the region's subscribedTable is stopped.
func (s regionInfo) isStopped() bool {
	stopped := s.lockedRangeState == nil
	return stopped
}
Expand Down
59 changes: 44 additions & 15 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,14 @@
s.totalSpans.Lock()
s.totalSpans.v[subID] = rt
s.totalSpans.Unlock()

s.rangeTaskCh.In() <- rangeTask{span: span, subscribedTable: rt}
log.Info("event feed subscribes table success",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID),
zap.String("span", rt.span.String()))
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
zap.Int64("tableID", rt.span.TableID),
zap.Any("startKey", rt.span.StartKey),
zap.Any("endKey", rt.span.EndKey))
}

// Unsubscribe the given table span. All covered regions will be deregistered asynchronously.
Expand All @@ -306,13 +307,19 @@
s.totalSpans.Unlock()
if rt != nil {
s.setTableStopped(rt)
log.Info("event feed unsubscribes table",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID),
zap.Int64("tableID", rt.span.TableID),
zap.Any("startKey", rt.span.StartKey),
zap.Any("endKey", rt.span.EndKey))
return

Check warning on line 317 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L310-L317

Added lines #L310 - L317 were not covered by tests
}

log.Info("event feed unsubscribes table",
log.Warn("event feed unsubscribes table, but not found",

Check warning on line 319 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L319

Added line #L319 was not covered by tests
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID),
zap.Bool("exists", rt != nil))
zap.Any("subscriptionID", subID))

Check warning on line 322 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L322

Added line #L322 was not covered by tests
}

// ResolveLock is a function. If outsider subscribers find a span resolved timestamp is
Expand Down Expand Up @@ -382,7 +389,8 @@
log.Info("event feed starts to stop table",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID))
zap.Any("subscriptionID", rt.subscriptionID),
zap.Int64("tableID", rt.span.TableID))

Check warning on line 393 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L392-L393

Added lines #L392 - L393 were not covered by tests

// Set stopped to true so we can stop handling region events from the table.
// Then send a special singleRegionInfo to regionRouter to deregister the table
Expand All @@ -399,7 +407,8 @@
log.Info("event feed stop table is finished",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID))
zap.Any("subscriptionID", rt.subscriptionID),
zap.Int64("tableID", rt.span.TableID))

Check warning on line 411 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L410-L411

Added lines #L410 - L411 were not covered by tests

s.totalSpans.Lock()
defer s.totalSpans.Unlock()
Expand All @@ -418,7 +427,7 @@
case <-ctx.Done():
return errors.Trace(ctx.Err())
case region := <-s.regionCh.Out():
if region.isStoped() {
if region.isStopped() {
for _, rs := range s.stores {
s.broadcastRequest(rs, region)
}
Expand All @@ -440,7 +449,10 @@
zap.String("changefeed", s.changefeed.ID),
zap.Uint64("streamID", stream.streamID),
zap.Any("subscriptionID", region.subscribedTable.subscriptionID),
zap.Int64("tableID", region.span.TableID),
zap.Uint64("regionID", region.verID.GetID()),
zap.Any("startKey", region.span.StartKey),
zap.Any("endKey", region.span.EndKey),
zap.Uint64("storeID", store.storeID),
zap.String("addr", store.storeAddr))
}
Expand Down Expand Up @@ -554,7 +566,9 @@
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscribedTable.subscriptionID),
zap.Any("span", nextSpan),
zap.Int64("tableID", nextSpan.TableID),
zap.Any("startKey", nextSpan.StartKey),
zap.Any("endKey", nextSpan.EndKey),

Check warning on line 571 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L569-L571

Added lines #L569 - L571 were not covered by tests
zap.Error(err))
backoffBeforeLoad = true
continue
Expand All @@ -572,7 +586,9 @@
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscribedTable.subscriptionID),
zap.Any("span", nextSpan))
zap.Int64("tableID", nextSpan.TableID),
zap.Any("startKey", nextSpan.StartKey),
zap.Any("endKey", nextSpan.EndKey))

Check warning on line 591 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L589-L591

Added lines #L589 - L591 were not covered by tests
backoffBeforeLoad = true
continue
}
Expand All @@ -590,7 +606,10 @@
log.Panic("event feed check spans intersect shouldn't fail",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscribedTable.subscriptionID))
zap.Any("subscriptionID", subscribedTable.subscriptionID),
zap.Int64("tableID", nextSpan.TableID),
zap.Any("startKey", nextSpan.StartKey),
zap.Any("endKey", nextSpan.EndKey))

Check warning on line 612 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L609-L612

Added lines #L609 - L612 were not covered by tests
}

verID := tikv.NewRegionVerID(regionMeta.Id, regionMeta.RegionEpoch.ConfVer, regionMeta.RegionEpoch.Version)
Expand Down Expand Up @@ -676,6 +695,8 @@
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID),
zap.Uint64("regionID", errInfo.verID.GetID()),
zap.Int64("tableID", errInfo.span.TableID),

Check warning on line 699 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L698-L699

Added lines #L698 - L699 were not covered by tests
zap.Stringer("error", innerErr))

if notLeader := innerErr.GetNotLeader(); notLeader != nil {
Expand Down Expand Up @@ -720,6 +741,8 @@
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID),
zap.Uint64("regionID", errInfo.verID.GetID()),
zap.Int64("tableID", errInfo.span.TableID),

Check warning on line 745 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L744-L745

Added lines #L744 - L745 were not covered by tests
zap.Stringer("error", innerErr))
metricFeedUnknownErrorCounter.Inc()
s.scheduleRegionRequest(ctx, errInfo.regionInfo)
Expand All @@ -740,6 +763,8 @@
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID),
zap.Uint64("regionID", errInfo.verID.GetID()),
zap.Int64("tableID", errInfo.span.TableID),

Check warning on line 767 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L766-L767

Added lines #L766 - L767 were not covered by tests
zap.Error(err))
return err
}
Expand Down Expand Up @@ -816,7 +841,7 @@

currTime := s.pdClock.CurrentTime()
s.totalSpans.RLock()
slowInitializeRegion := 0
var slowInitializeRegionCount int

Check warning on line 844 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L844

Added line #L844 was not covered by tests
for subscriptionID, rt := range s.totalSpans.v {
attr := rt.rangeLock.IterAll(nil)
ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.ResolvedTs)
Expand All @@ -826,32 +851,36 @@
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", rt.span.TableID),

Check warning on line 854 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L854

Added line #L854 was not covered by tests
zap.Any("slowRegion", attr.SlowestRegion))
}
} else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute {
slowInitializeRegion += 1
slowInitializeRegionCount += 1

Check warning on line 858 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L858

Added line #L858 was not covered by tests
log.Info("event feed initializes a region too slow",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", rt.span.TableID),

Check warning on line 863 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L863

Added line #L863 was not covered by tests
zap.Any("slowRegion", attr.SlowestRegion))
} else if currTime.Sub(ckptTime) > 10*time.Minute {
log.Info("event feed finds a uninitialized slow region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", rt.span.TableID),

Check warning on line 870 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L870

Added line #L870 was not covered by tests
zap.Any("slowRegion", attr.SlowestRegion))
}
if len(attr.UnLockedRanges) > 0 {
log.Info("event feed holes exist",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", rt.span.TableID),

Check warning on line 878 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L878

Added line #L878 was not covered by tests
zap.Any("holes", attr.UnLockedRanges))
}
}
s.totalSpans.RUnlock()
s.metrics.slowInitializeRegion.Set(float64(slowInitializeRegion))
s.metrics.slowInitializeRegion.Set(float64(slowInitializeRegionCount))

Check warning on line 883 in cdc/kv/shared_client.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_client.go#L883

Added line #L883 was not covered by tests
}
}

Expand Down
10 changes: 3 additions & 7 deletions cdc/kv/shared_region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, str
zap.Uint64("streamID", stream.streamID),
zap.Any("subscriptionID", state.getRegionID()),
zap.Uint64("regionID", state.region.verID.GetID()),
zap.Int64("tableID", state.region.span.TableID),
zap.Bool("reschedule", stepsToRemoved),
zap.Error(err))
}
Expand Down Expand Up @@ -228,12 +229,6 @@ func (w *sharedRegionWorker) handleEventEntry(ctx context.Context, x *cdcpb.Even
}
}
tableID := state.region.subscribedTable.span.TableID
log.Debug("region worker get an Event",
zap.String("namespace", w.changefeed.Namespace),
zap.String("changefeed", w.changefeed.ID),
zap.Any("subscriptionID", state.region.subscribedTable.subscriptionID),
zap.Int64("tableID", tableID),
zap.Int("rows", len(x.Entries.GetEntries())))
return handleEventEntry(x, startTs, state, w.metrics, emit, w.changefeed, tableID, w.client.logRegionDetails)
}

Expand All @@ -250,7 +245,7 @@ func handleEventEntry(
regionID, regionSpan, _ := state.getRegionMeta()
for _, entry := range x.Entries.GetEntries() {
// NOTE: from TiKV 7.0.0, entries are already filtered out in TiKV side.
// We can remove the check in future.
// We can remove the check in the future.
comparableKey := spanz.ToComparableKey(entry.GetKey())
if entry.Type != cdcpb.Event_INITIALIZED &&
!spanz.KeyInSpan(comparableKey, regionSpan) {
Expand All @@ -266,6 +261,7 @@ func handleEventEntry(
zap.String("changefeed", changefeed.ID),
zap.Int64("tableID", tableID),
zap.Uint64("regionID", regionID),
zap.Int64("tableID", state.region.span.TableID),
zap.Uint64("requestID", state.requestID),
zap.Stringer("span", &state.region.span))

Expand Down
73 changes: 32 additions & 41 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
case <-ctx.Done():
return ctx.Err()
case region := <-stream.requests.Out():
if !region.isStoped() {
if !region.isStopped() {
stream.preFetchForConnecting = new(regionInfo)
*stream.preFetchForConnecting = region
return nil
Expand Down Expand Up @@ -104,7 +104,7 @@
// Why do we need to re-schedule pending regions? Because the store can
// fail forever, and all regions are scheduled to other stores.
for _, region := range stream.clearPendingRegions() {
if region.isStoped() {
if region.isStopped() {
// It means it's a special task for stopping the table.
continue
}
Expand Down Expand Up @@ -254,30 +254,6 @@
}

func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *requestedStore) (err error) {
doSend := func(cc *sharedconn.ConnAndClient, req *cdcpb.ChangeDataRequest, subscriptionID SubscriptionID) error {
if err := cc.Client().Send(req); err != nil {
log.Warn("event feed send request to grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", req.RegionId),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))
return errors.Trace(err)
}
log.Debug("event feed send request to grpc stream success",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", req.RegionId),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr))
return nil
}

fetchMoreReq := func() (regionInfo, error) {
waitReqTicker := time.NewTicker(60 * time.Second)
defer waitReqTicker.Stop()
Expand Down Expand Up @@ -329,23 +305,24 @@
s.preFetchForConnecting = nil
for {
subscriptionID := region.subscribedTable.subscriptionID
log.Debug("event feed gets a singleRegionInfo",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", region.verID.GetID()),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr))
// It means it's a special task for stopping the table.
if region.isStoped() {
if region.isStopped() {
if s.multiplexing != nil {
req := &cdcpb.ChangeDataRequest{
RequestId: uint64(subscriptionID),
Request: &cdcpb.ChangeDataRequest_Deregister_{},
}
if err = doSend(s.multiplexing, req, subscriptionID); err != nil {
return err
if err = s.multiplexing.Client().Send(req); err != nil {
log.Warn("event feed send deregister request to grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", region.span.TableID),
zap.Uint64("regionID", req.RegionId),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))

Check warning on line 325 in cdc/kv/shared_stream.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_stream.go#L315-L325

Added lines #L315 - L325 were not covered by tests
}
} else if cc := tableExclusives[subscriptionID]; cc != nil {
delete(tableExclusives, subscriptionID)
Expand Down Expand Up @@ -385,8 +362,17 @@
} else if cc, err = getTableExclusiveConn(subscriptionID); err != nil {
return err
}
if err = doSend(cc, c.createRegionRequest(region), subscriptionID); err != nil {
return err
if err = cc.Client().Send(c.createRegionRequest(region)); err != nil {
log.Warn("event feed send request to grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", region.verID.GetID()),
zap.Int64("tableID", region.span.TableID),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))

Check warning on line 375 in cdc/kv/shared_stream.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_stream.go#L366-L375

Added lines #L366 - L375 were not covered by tests
}
}

Expand Down Expand Up @@ -483,14 +469,19 @@
state := s.getState(subscriptionID, regionID)
switch x := event.Event.(type) {
case *cdcpb.Event_Error:
s.logRegionDetails("event feed receives a region error",
fields := []zap.Field{

Check warning on line 472 in cdc/kv/shared_stream.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_stream.go#L472

Added line #L472 was not covered by tests
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", event.RegionId),
zap.Bool("stateIsNil", state == nil),
zap.Any("error", x.Error))
zap.Any("error", x.Error),
}
if state != nil {
fields = append(fields, zap.Int64("tableID", state.region.span.TableID))
}
s.logRegionDetails("event feed receives a region error", fields...)

Check warning on line 484 in cdc/kv/shared_stream.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/shared_stream.go#L479-L484

Added lines #L479 - L484 were not covered by tests
}

if state != nil {
Expand Down
Loading
Loading