Skip to content

Commit

Permalink
add table id to some logs
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Sep 24, 2024
1 parent 0874d36 commit 152d2f2
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 50 deletions.
2 changes: 1 addition & 1 deletion cdc/kv/region_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type regionInfo struct {
lockedRangeState *regionlock.LockedRangeState
}

func (s regionInfo) isStoped() bool {
func (s regionInfo) isStopped() bool {
// lockedRange only nil when the region's subscribedTable is stopped.
return s.lockedRangeState == nil
}
Expand Down
37 changes: 27 additions & 10 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,14 @@ func (s *SharedClient) Subscribe(subID SubscriptionID, span tablepb.Span, startT
s.totalSpans.Lock()
s.totalSpans.v[subID] = rt
s.totalSpans.Unlock()

s.rangeTaskCh.In() <- rangeTask{span: span, subscribedTable: rt}
log.Info("event feed subscribes table success",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID),
zap.String("span", rt.span.String()))
zap.Int64("tableID", rt.span.TableID),
zap.Any("startKey", rt.span.StartKey),
zap.Any("endKey", rt.span.EndKey))
}

// Unsubscribe the given table span. All covered regions will be deregistered asynchronously.
Expand All @@ -306,13 +307,19 @@ func (s *SharedClient) Unsubscribe(subID SubscriptionID) {
s.totalSpans.Unlock()
if rt != nil {
s.setTableStopped(rt)
log.Info("event feed unsubscribes table",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID),
zap.Int64("tableID", rt.span.TableID),
zap.Any("startKey", rt.span.StartKey),
zap.Any("endKey", rt.span.EndKey))
return
}

log.Info("event feed unsubscribes table",
log.Warn("event feed unsubscribes table, but not found",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", rt.subscriptionID),
zap.Bool("exists", rt != nil))
zap.Any("subscriptionID", subID))
}

// ResolveLock is a function. If outsider subscribers find a span resolved timestamp is
Expand Down Expand Up @@ -418,7 +425,7 @@ func (s *SharedClient) handleRegions(ctx context.Context, eg *errgroup.Group) er
case <-ctx.Done():
return errors.Trace(ctx.Err())
case region := <-s.regionCh.Out():
if region.isStoped() {
if region.isStopped() {
for _, rs := range s.stores {
s.broadcastRequest(rs, region)
}
Expand All @@ -440,7 +447,10 @@ func (s *SharedClient) handleRegions(ctx context.Context, eg *errgroup.Group) er
zap.String("changefeed", s.changefeed.ID),
zap.Uint64("streamID", stream.streamID),
zap.Any("subscriptionID", region.subscribedTable.subscriptionID),
zap.Int64("tableID", region.span.TableID),
zap.Uint64("regionID", region.verID.GetID()),
zap.Any("startKey", region.span.StartKey),
zap.Any("endKey", region.span.EndKey),
zap.Uint64("storeID", store.storeID),
zap.String("addr", store.storeAddr))
}
Expand Down Expand Up @@ -554,7 +564,9 @@ func (s *SharedClient) divideSpanAndScheduleRegionRequests(
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscribedTable.subscriptionID),
zap.Any("span", nextSpan),
zap.Int64("tableID", nextSpan.TableID),
zap.Any("startKey", nextSpan.StartKey),
zap.Any("endKey", nextSpan.EndKey),
zap.Error(err))
backoffBeforeLoad = true
continue
Expand All @@ -572,7 +584,9 @@ func (s *SharedClient) divideSpanAndScheduleRegionRequests(
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscribedTable.subscriptionID),
zap.Any("span", nextSpan))
zap.Int64("tableID", nextSpan.TableID),
zap.Any("startKey", nextSpan.StartKey),
zap.Any("endKey", nextSpan.EndKey))
backoffBeforeLoad = true
continue
}
Expand All @@ -590,7 +604,10 @@ func (s *SharedClient) divideSpanAndScheduleRegionRequests(
log.Panic("event feed check spans intersect shouldn't fail",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", subscribedTable.subscriptionID))
zap.Any("subscriptionID", subscribedTable.subscriptionID),
zap.Int64("tableID", nextSpan.TableID),
zap.Any("startKey", nextSpan.StartKey),
zap.Any("endKey", nextSpan.EndKey))
}

verID := tikv.NewRegionVerID(regionMeta.Id, regionMeta.RegionEpoch.ConfVer, regionMeta.RegionEpoch.Version)
Expand Down
64 changes: 25 additions & 39 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
case <-ctx.Done():
return ctx.Err()
case region := <-stream.requests.Out():
if !region.isStoped() {
if !region.isStopped() {
stream.preFetchForConnecting = new(regionInfo)
*stream.preFetchForConnecting = region
return nil
Expand Down Expand Up @@ -104,7 +104,7 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
// Why we need to re-schedule pending regions? This because the store can
// fail forever, and all regions are scheduled to other stores.
for _, region := range stream.clearPendingRegions() {
if region.isStoped() {
if region.isStopped() {
// It means it's a special task for stopping the table.
continue
}
Expand Down Expand Up @@ -254,30 +254,6 @@ func (s *requestedStream) receive(
}

func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *requestedStore) (err error) {
doSend := func(cc *sharedconn.ConnAndClient, req *cdcpb.ChangeDataRequest, subscriptionID SubscriptionID) error {
if err := cc.Client().Send(req); err != nil {
log.Warn("event feed send request to grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", req.RegionId),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))
return errors.Trace(err)
}
log.Debug("event feed send request to grpc stream success",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", req.RegionId),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr))
return nil
}

fetchMoreReq := func() (regionInfo, error) {
waitReqTicker := time.NewTicker(60 * time.Second)
defer waitReqTicker.Stop()
Expand Down Expand Up @@ -329,23 +305,24 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
s.preFetchForConnecting = nil
for {
subscriptionID := region.subscribedTable.subscriptionID
log.Debug("event feed gets a singleRegionInfo",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", region.verID.GetID()),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr))
// It means it's a special task for stopping the table.
if region.isStoped() {
if region.isStopped() {
if s.multiplexing != nil {
req := &cdcpb.ChangeDataRequest{
RequestId: uint64(subscriptionID),
Request: &cdcpb.ChangeDataRequest_Deregister_{},
}
if err = doSend(s.multiplexing, req, subscriptionID); err != nil {
return err
if err = s.multiplexing.Client().Send(req); err != nil {
log.Warn("event feed send deregister request to grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Int64("tableID", region.span.TableID),
zap.Uint64("regionID", req.RegionId),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))
}
} else if cc := tableExclusives[subscriptionID]; cc != nil {
delete(tableExclusives, subscriptionID)
Expand Down Expand Up @@ -385,8 +362,17 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
} else if cc, err = getTableExclusiveConn(subscriptionID); err != nil {
return err
}
if err = doSend(cc, c.createRegionRequest(region), subscriptionID); err != nil {
return err
if err = cc.Client().Send(c.createRegionRequest(region)); err != nil {
log.Warn("event feed send request to grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Any("subscriptionID", subscriptionID),
zap.Uint64("regionID", region.verID.GetID()),
zap.Int64("tableID", region.span.TableID),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))
}
}

Expand Down

0 comments on commit 152d2f2

Please sign in to comment.