From ef829aaada712af5fbf1b002f11bb9ecc4d79d68 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 29 Apr 2024 15:56:32 +0800 Subject: [PATCH] improve region request log for diagnose (#1300) * improve region request log for diagnose Signed-off-by: crazycs520 * rename struct Signed-off-by: crazycs520 * refine region error metric with store id label and add rpc error metric Signed-off-by: crazycs520 * refine comment Signed-off-by: crazycs520 * refine code Signed-off-by: crazycs520 * restrict log Signed-off-by: crazycs520 * refine code Signed-off-by: crazycs520 * refine Signed-off-by: crazycs520 * refine Signed-off-by: crazycs520 * refine log Signed-off-by: crazycs520 * refine code Signed-off-by: crazycs520 * fix test Signed-off-by: crazycs520 * address comment Signed-off-by: crazycs520 * refine Signed-off-by: crazycs520 * refine Signed-off-by: crazycs520 * refine log Signed-off-by: crazycs520 --------- Signed-off-by: crazycs520 --- integration_tests/main_test.go | 1 + integration_tests/snapshot_test.go | 8 +- internal/locate/region_request.go | 351 +++++++++++++++++++----- internal/locate/region_request3_test.go | 26 +- internal/locate/region_request_test.go | 44 +++ metrics/metrics.go | 13 +- tikv/region.go | 7 +- txnkv/txnsnapshot/client_helper.go | 6 +- txnkv/txnsnapshot/snapshot.go | 44 ++- txnkv/txnsnapshot/test_probe.go | 5 +- 10 files changed, 386 insertions(+), 119 deletions(-) diff --git a/integration_tests/main_test.go b/integration_tests/main_test.go index fcd7a050a7..5998804baf 100644 --- a/integration_tests/main_test.go +++ b/integration_tests/main_test.go @@ -27,6 +27,7 @@ func TestMain(m *testing.M) { opts := []goleak.Option{ goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb.(*DB).mpoolDrain"), goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"), // TODO: fix ttlManager goroutine leak + goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.(*Config).createBackoffFn.newBackoffFn.func2"), } goleak.VerifyTestMain(m, opts...) 
diff --git a/integration_tests/snapshot_test.go b/integration_tests/snapshot_test.go index a8244ed4fa..aee209543e 100644 --- a/integration_tests/snapshot_test.go +++ b/integration_tests/snapshot_test.go @@ -303,13 +303,13 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe() { func (s *testSnapshotSuite) TestSnapshotRuntimeStats() { reqStats := tikv.NewRegionRequestRuntimeStats() - tikv.RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Second) - tikv.RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Millisecond) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Second) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Millisecond) snapshot := s.store.GetSnapshot(0) runtimeStats := &txnkv.SnapshotRuntimeStats{} snapshot.SetRuntimeStats(runtimeStats) - snapshot.MergeRegionRequestStats(reqStats.Stats) - snapshot.MergeRegionRequestStats(reqStats.Stats) + snapshot.MergeRegionRequestStats(reqStats) + snapshot.MergeRegionRequestStats(reqStats) bo := tikv.NewBackofferWithVars(context.Background(), 2000, nil) err := bo.BackoffWithMaxSleepTxnLockFast(5, errors.New("test")) s.Nil(err) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index be3103d055..07dda82e5a 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -35,9 +35,9 @@ package locate import ( - "bytes" "context" "fmt" + "maps" "math" "math/rand" "strconv" @@ -113,7 +113,8 @@ type RegionRequestSender struct { replicaSelector ReplicaSelector failStoreIDs map[uint64]struct{} failProxyStoreIDs map[uint64]struct{} - RegionRequestRuntimeStats + Stats *RegionRequestRuntimeStats + AccessStats *ReplicaAccessStats } func (s *RegionRequestSender) String() string { @@ -125,13 +126,22 @@ func (s *RegionRequestSender) String() string { // RegionRequestRuntimeStats records the runtime stats of send region requests. type RegionRequestRuntimeStats struct { - Stats map[tikvrpc.CmdType]*RPCRuntimeStats + RPCStats map[tikvrpc.CmdType]*RPCRuntimeStats + RequestErrorStats +} + +// RequestErrorStats records the request error (region error and RPC error) counts. +type RequestErrorStats struct { + // ErrStats records region errors and RPC errors, together with their counts. + // Attention: to keep the number of error types bounded, ErrStats only records the first 16 distinct errors. + ErrStats map[string]int + OtherErrCnt int } // NewRegionRequestRuntimeStats returns a new RegionRequestRuntimeStats. -func NewRegionRequestRuntimeStats() RegionRequestRuntimeStats { - return RegionRequestRuntimeStats{ - Stats: make(map[tikvrpc.CmdType]*RPCRuntimeStats), +func NewRegionRequestRuntimeStats() *RegionRequestRuntimeStats { + return &RegionRequestRuntimeStats{ + RPCStats: make(map[tikvrpc.CmdType]*RPCRuntimeStats), } } @@ -142,14 +152,44 @@ type RPCRuntimeStats struct { Consume int64 } +// RecordRPCRuntimeStats records the RPC count and duration stats. +func (r *RegionRequestRuntimeStats) RecordRPCRuntimeStats(cmd tikvrpc.CmdType, d time.Duration) { + stat, ok := r.RPCStats[cmd] + if !ok { + r.RPCStats[cmd] = &RPCRuntimeStats{ + Count: 1, + Consume: int64(d), + } + return + } + stat.Count++ + stat.Consume += int64(d) +} + +// RecordRPCErrorStats records the request error (region error label or RPC error) and its count. +func (r *RequestErrorStats) RecordRPCErrorStats(errLabel string) { + if r.ErrStats == nil { + // lazy init to avoid unnecessary allocation. + r.ErrStats = make(map[string]int) + } + if len(r.ErrStats) < 16 { + // Avoid recording too many distinct errors. 
+ r.ErrStats[errLabel]++ + } else { + r.OtherErrCnt++ + } +} + // String implements fmt.Stringer interface. func (r *RegionRequestRuntimeStats) String() string { + if r == nil { + return "" + } var builder strings.Builder - for k, v := range r.Stats { + for k, v := range r.RPCStats { if builder.Len() > 0 { builder.WriteByte(',') } - // append string: fmt.Sprintf("%s:{num_rpc:%v, total_time:%s}", k.String(), v.Count, util.FormatDuration(time.Duration(v.Consume))") builder.WriteString(k.String()) builder.WriteString(":{num_rpc:") builder.WriteString(strconv.FormatInt(v.Count, 10)) @@ -157,27 +197,57 @@ func (r *RegionRequestRuntimeStats) String() string { builder.WriteString(util.FormatDuration(time.Duration(v.Consume))) builder.WriteString("}") } + if errStatsStr := r.RequestErrorStats.String(); errStatsStr != "" { + builder.WriteString(", rpc_errors:") + builder.WriteString(errStatsStr) + } + return builder.String() +} + +// String implements fmt.Stringer interface. +func (r *RequestErrorStats) String() string { + if len(r.ErrStats) == 0 { + return "" + } + var builder strings.Builder + builder.WriteString("{") + for err, cnt := range r.ErrStats { + if builder.Len() > 2 { + builder.WriteString(", ") + } + builder.WriteString(err) + builder.WriteString(":") + builder.WriteString(strconv.Itoa(cnt)) + } + if r.OtherErrCnt > 0 { + builder.WriteString(", other_error:") + builder.WriteString(strconv.Itoa(r.OtherErrCnt)) + } + builder.WriteString("}") return builder.String() } // Clone returns a copy of itself. -func (r *RegionRequestRuntimeStats) Clone() RegionRequestRuntimeStats { +func (r *RegionRequestRuntimeStats) Clone() *RegionRequestRuntimeStats { newRs := NewRegionRequestRuntimeStats() - for cmd, v := range r.Stats { - newRs.Stats[cmd] = &RPCRuntimeStats{ - Count: v.Count, - Consume: v.Consume, - } + maps.Copy(newRs.RPCStats, r.RPCStats) + if len(r.ErrStats) > 0 { + newRs.ErrStats = make(map[string]int) + maps.Copy(newRs.ErrStats, r.ErrStats) + newRs.OtherErrCnt = r.OtherErrCnt } return newRs } // Merge merges other RegionRequestRuntimeStats. -func (r *RegionRequestRuntimeStats) Merge(rs RegionRequestRuntimeStats) { - for cmd, v := range rs.Stats { - stat, ok := r.Stats[cmd] +func (r *RegionRequestRuntimeStats) Merge(rs *RegionRequestRuntimeStats) { + if rs == nil { + return + } + for cmd, v := range rs.RPCStats { + stat, ok := r.RPCStats[cmd] if !ok { - r.Stats[cmd] = &RPCRuntimeStats{ + r.RPCStats[cmd] = &RPCRuntimeStats{ Count: v.Count, Consume: v.Consume, } @@ -186,20 +256,114 @@ func (r *RegionRequestRuntimeStats) Merge(rs RegionRequestRuntimeStats) { stat.Count += v.Count stat.Consume += v.Consume } + if len(rs.ErrStats) > 0 { + if r.ErrStats == nil { + r.ErrStats = make(map[string]int) + } + for err, cnt := range rs.ErrStats { + r.ErrStats[err] += cnt + } + r.OtherErrCnt += rs.OtherErrCnt + } +} + +// ReplicaAccessStats records the replica access info. +type ReplicaAccessStats struct { + // AccessInfos records the access info. + AccessInfos []ReplicaAccessInfo + // To avoid consuming too much memory, after more than 5 records, the remaining accesses are counted by peerID in the `OverflowAccessStat` map. + OverflowAccessStat map[uint64]*RequestErrorStats } -// RecordRegionRequestRuntimeStats records request runtime stats. -func RecordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { - stat, ok := stats[cmd] +// ReplicaAccessInfo indicates the access path detail info of a request. 
+type ReplicaAccessInfo struct { + Peer uint64 + Store uint64 + ReqReadTp ReqReadType + Err string +} + +type ReqReadType byte + +const ( + ReqLeader ReqReadType = iota + ReqReplicaRead + ReqStaleRead +) + +func (s *ReplicaAccessStats) recordReplicaAccessInfo(staleRead, replicaRead bool, peerID, storeID uint64, err string) { + if len(s.AccessInfos) < 5 { + tp := ReqLeader + if replicaRead { + tp = ReqReplicaRead + } else if staleRead { + tp = ReqStaleRead + } + s.AccessInfos = append(s.AccessInfos, ReplicaAccessInfo{ + Peer: peerID, + Store: storeID, + ReqReadTp: tp, + Err: err, + }) + return + } + if s.OverflowAccessStat == nil { + s.OverflowAccessStat = make(map[uint64]*RequestErrorStats) + } + stat, ok := s.OverflowAccessStat[peerID] if !ok { - stats[cmd] = &RPCRuntimeStats{ - Count: 1, - Consume: int64(d), + stat = &RequestErrorStats{} + s.OverflowAccessStat[peerID] = stat + } + stat.RecordRPCErrorStats(err) +} + +// String implements fmt.Stringer interface. +func (s *ReplicaAccessStats) String() string { + if s == nil { + return "" + } + var builder strings.Builder + for i, info := range s.AccessInfos { + if i > 0 { + builder.WriteString(", ") + } + switch info.ReqReadTp { + case ReqLeader: + builder.WriteString("{") + case ReqReplicaRead: + builder.WriteString("{replica_read, ") + case ReqStaleRead: + builder.WriteString("{stale_read, ") + } + builder.WriteString("peer:") + builder.WriteString(strconv.FormatUint(info.Peer, 10)) + builder.WriteString(", store:") + builder.WriteString(strconv.FormatUint(info.Store, 10)) + builder.WriteString(", err:") + builder.WriteString(info.Err) + builder.WriteString("}") + } + if len(s.OverflowAccessStat) > 0 { + builder.WriteString(", overflow_count:{") + cnt := 0 + for peerID, stat := range s.OverflowAccessStat { + if stat == nil { + continue + } + if cnt > 0 { + builder.WriteString(", ") + } + builder.WriteString("{peer:") + builder.WriteString(strconv.FormatUint(peerID, 10)) + builder.WriteString(", error_stats:") + builder.WriteString(stat.String()) + builder.WriteString("}") + cnt++ } - return + builder.WriteString("}") } - stat.Count++ - stat.Consume += int64(d) + return builder.String() } // NewRegionRequestSender creates a new sender. 
@@ -259,6 +423,16 @@ func (s *RegionRequestSender) SendReq( return resp, retryTimes, err } +func (s *RegionRequestSender) recordRPCAccessInfo(req *tikvrpc.Request, rpcCtx *RPCContext, err string) { + if req == nil || rpcCtx == nil || rpcCtx.Peer == nil || rpcCtx.Store == nil { + return + } + if s.AccessStats == nil { + s.AccessStats = &ReplicaAccessStats{} + } + s.AccessStats.recordReplicaAccessInfo(req.StaleRead, req.ReplicaRead, rpcCtx.Peer.GetId(), rpcCtx.Store.storeID, err) +} + type replica struct { store *Store peer *metapb.Peer @@ -383,11 +557,12 @@ func (s *baseReplicaSelector) String() string { } for _, replica := range s.replicas { replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, "+ - "attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v", + "attempts: %v, attempts_time: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v", replica.peer.GetId(), replica.store.storeID, replica.isEpochStale(), replica.attempts, + util.FormatDuration(replica.attemptedTime), replica.getEpoch(), atomic.LoadUint32(&replica.store.epoch), replica.store.getResolveState(), @@ -1453,6 +1628,8 @@ func IsFakeRegionError(err *errorpb.Error) bool { return err != nil && err.GetEpochNotMatch() != nil && len(err.GetEpochNotMatch().CurrentRegions) == 0 } +const slowLogSendReqTime = 100 * time.Millisecond + // SendReqCtx sends a request to tikv server and return response and RPCCtx of this RPC. func (s *RegionRequestSender) SendReqCtx( bo *retry.Backoffer, @@ -1519,6 +1696,8 @@ func (s *RegionRequestSender) SendReqCtx( } s.reset() + startTime := time.Now() + startBackOff := bo.GetTotalSleep() retryTimes = 0 defer func() { if retryTimes > 0 { @@ -1538,7 +1717,6 @@ func (s *RegionRequestSender) SendReqCtx( }() } - totalErrors := make(map[string]int) for { if retryTimes > 0 { if retryTimes%100 == 0 { @@ -1570,11 +1748,13 @@ func (s *RegionRequestSender) SendReqCtx( // TODO: Change the returned error to something like "region missing in cache", // and handle this error like EpochNotMatch, which means to re-split the request and retry. 
- s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, retryTimes, req, totalErrors) if s.replicaSelector != nil { if err := s.replicaSelector.getBaseReplicaSelector().backoffOnNoCandidate(bo); err != nil { return nil, nil, retryTimes, err } + if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout { + s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) + } } resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}) return resp, nil, retryTimes, err @@ -1616,8 +1796,10 @@ func (s *RegionRequestSender) SendReqCtx( resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout) req.IsRetryRequest = true if err != nil { - msg := fmt.Sprintf("send request failed, err: %v", err.Error()) - s.logSendReqError(bo, msg, regionID, retryTimes, req, totalErrors) + if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout { + msg := fmt.Sprintf("send request failed, err: %v", err.Error()) + s.logSendReqError(bo, msg, regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) + } return nil, nil, retryTimes, err } @@ -1648,19 +1830,21 @@ func (s *RegionRequestSender) SendReqCtx( return nil, nil, retryTimes, err } if regionErr != nil { - regionErrLogging := regionErrorToLogging(rpcCtx.Peer.GetId(), regionErr) - totalErrors[regionErrLogging]++ retry, err = s.onRegionError(bo, rpcCtx, req, regionErr) if err != nil { - msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error()) - s.logSendReqError(bo, msg, regionID, retryTimes, req, totalErrors) + if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout { + msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error()) + s.logSendReqError(bo, msg, regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) + } return nil, nil, retryTimes, err } if retry { retryTimes++ continue } - s.logSendReqError(bo, "send request meet region error without retry", regionID, retryTimes, req, totalErrors) + if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout { + s.logSendReqError(bo, "send request meet region error without retry", regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) + } } else { if s.replicaSelector != nil { s.replicaSelector.onSendSuccess(req) @@ -1673,16 +1857,40 @@ func (s *RegionRequestSender) SendReqCtx( } } -func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, totalErrors map[string]int) { - var totalErrorStr bytes.Buffer - for err, cnt := range totalErrors { - if totalErrorStr.Len() > 0 { - totalErrorStr.WriteString(", ") - } - totalErrorStr.WriteString(err) - totalErrorStr.WriteString(":") - totalErrorStr.WriteString(strconv.Itoa(cnt)) +func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, cost time.Duration, currentBackoffMs int, timeout time.Duration) { + var builder strings.Builder + // build the total round stats string. 
+ builder.WriteString("{total-backoff: ") + builder.WriteString(util.FormatDuration(time.Duration(bo.GetTotalSleep() * int(time.Millisecond)))) + builder.WriteString(", total-backoff-times: ") + builder.WriteString(strconv.Itoa(bo.GetTotalBackoffTimes())) + if s.Stats != nil { + builder.WriteString(", total-rpc: {") + builder.WriteString(s.Stats.String()) + builder.WriteString("}") } + builder.WriteString("}") + totalRoundStats := builder.String() + + // build the current round stats string. + builder.Reset() + builder.WriteString("{time: ") + builder.WriteString(util.FormatDuration(cost)) + builder.WriteString(", backoff: ") + builder.WriteString(util.FormatDuration(time.Duration(currentBackoffMs * int(time.Millisecond)))) + builder.WriteString(", timeout: ") + builder.WriteString(util.FormatDuration(timeout)) + builder.WriteString(", req-max-exec-timeout: ") + builder.WriteString(util.FormatDuration(time.Duration(int64(req.Context.MaxExecutionDurationMs) * int64(time.Millisecond)))) + builder.WriteString(", retry-times: ") + builder.WriteString(strconv.Itoa(retryTimes)) + if s.AccessStats != nil { + builder.WriteString(", replica-access: {") + builder.WriteString(s.AccessStats.String()) + builder.WriteString("}") + } + builder.WriteString("}") + currentRoundStats := builder.String() logutil.Logger(bo.GetCtx()).Info(msg, zap.Uint64("req-ts", req.GetStartTS()), zap.String("req-type", req.Type.String()), @@ -1690,11 +1898,8 @@ func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, r zap.String("replica-read-type", req.ReplicaReadType.String()), zap.Bool("stale-read", req.StaleRead), zap.Stringer("request-sender", s), - zap.Int("retry-times", retryTimes), - zap.Int("total-backoff-ms", bo.GetTotalSleep()), - zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()), - zap.Uint64("max-exec-timeout-ms", req.Context.MaxExecutionDurationMs), - zap.String("total-region-errors", totalErrorStr.String())) + zap.String("total-round-stats", totalRoundStats), + zap.String("current-round-stats", currentRoundStats)) } // RPCCancellerCtxKey is context key attach rpc send cancelFunc collector to ctx. @@ -1842,7 +2047,7 @@ func (s *RegionRequestSender) sendReqToRegion( rpcCtx.Store.healthStatus.recordClientSideSlowScoreStat(rpcDuration) } if s.Stats != nil { - RecordRegionRequestRuntimeStats(s.Stats, req.Type, rpcDuration) + s.Stats.RecordRPCRuntimeStats(req.Type, rpcDuration) if val, fpErr := util.EvalFailpoint("tikvStoreRespResult"); fpErr == nil { if val.(bool) { if req.Type == tikvrpc.CmdCop && bo.GetTotalSleep() == 0 { @@ -1909,11 +2114,16 @@ func (s *RegionRequestSender) sendReqToRegion( if err != nil { s.rpcError = err - + if s.Stats != nil { + errStr := errors.Cause(err).Error() + s.Stats.RecordRPCErrorStats(errStr) + s.recordRPCAccessInfo(req, rpcCtx, errStr) + } // Because in rpc logic, context.Cancel() will be transferred to rpcContext.Cancel error. For rpcContext cancel, // we need to retry the request. But for context cancel active, for example, limitExec gets the required rows, // we shouldn't retry the request, it will go to backoff and hang in retry logic. 
if ctx.Err() != nil && errors.Cause(ctx.Err()) == context.Canceled { + metrics.TiKVRPCErrorCounter.WithLabelValues("context-canceled", storeIDLabel(rpcCtx)).Inc() return nil, false, errors.WithStack(ctx.Err()) } @@ -1930,6 +2140,13 @@ func (s *RegionRequestSender) sendReqToRegion( return } +func storeIDLabel(rpcCtx *RPCContext) string { + if rpcCtx != nil && rpcCtx.Store != nil { + return strconv.FormatUint(rpcCtx.Store.storeID, 10) + } + return "nil" +} + func (s *RegionRequestSender) getStoreToken(st *Store, limit int64) error { // Checking limit is not thread safe, preferring this for avoiding load in loop. count := st.tokenCount.Load() @@ -1958,19 +2175,25 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r defer span1.Finish() bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } + storeLabel := storeIDLabel(ctx) // If it failed because the context is cancelled by ourself, don't retry. if errors.Cause(err) == context.Canceled { + metrics.TiKVRPCErrorCounter.WithLabelValues("context-canceled", storeLabel).Inc() return errors.WithStack(err) } else if LoadShuttingDown() > 0 { + metrics.TiKVRPCErrorCounter.WithLabelValues("shutting-down", storeLabel).Inc() return errors.WithStack(tikverr.ErrTiDBShuttingDown) } else if isCauseByDeadlineExceeded(err) { if s.replicaSelector != nil && s.replicaSelector.onReadReqConfigurableTimeout(req) { + errLabel := "read-timeout-" + strconv.FormatUint(req.MaxExecutionDurationMs, 10) + "ms" + metrics.TiKVRPCErrorCounter.WithLabelValues(errLabel, storeLabel).Inc() return nil } } if status.Code(errors.Cause(err)) == codes.Canceled { select { case <-bo.GetCtx().Done(): + metrics.TiKVRPCErrorCounter.WithLabelValues("grpc-canceled", storeLabel).Inc() return errors.WithStack(err) default: // If we don't cancel, but the error code is Canceled, it may be canceled by keepalive or gRPC remote. @@ -1988,6 +2211,7 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r } } } + metrics.TiKVRPCErrorCounter.WithLabelValues(errors.Cause(err).Error(), storeLabel).Inc() if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashCompute { s.regionCache.InvalidateTiFlashComputeStoresIfGRPCError(err) @@ -2064,17 +2288,17 @@ func (s *RegionRequestSender) NeedReloadRegion(ctx *RPCContext) (need bool) { } // regionErrorToLogging constructs the logging content with extra information like returned leader peer id. -func regionErrorToLogging(peerID uint64, e *errorpb.Error) string { - str := regionErrorToLabel(e) +func regionErrorToLogging(e *errorpb.Error, errLabel string) string { + str := errLabel if e.GetNotLeader() != nil { notLeader := e.GetNotLeader() if notLeader.GetLeader() != nil { - str = fmt.Sprintf("%v-%v", str, notLeader.GetLeader().GetId()) + str = fmt.Sprintf("%v_with_leader_%v", str, notLeader.GetLeader().GetId()) } else { - str = fmt.Sprintf("%v-nil", str) + str = fmt.Sprintf("%v_with_no_leader", str) } } - return fmt.Sprintf("%v-%v", peerID, str) + return str } func regionErrorToLabel(e *errorpb.Error) string { @@ -2140,13 +2364,14 @@ func (s *RegionRequestSender) onRegionError( bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } - // NOTE: Please add the region error handler in the same order of errorpb.Error. 
- isInternal := false - if req != nil { - isInternal = util.IsInternalRequest(req.GetRequestSource()) + regionErrLabel := regionErrorToLabel(regionErr) + metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrLabel, storeIDLabel(ctx)).Inc() + if s.Stats != nil { + s.Stats.RecordRPCErrorStats(regionErrLabel) + s.recordRPCAccessInfo(req, ctx, regionErrorToLogging(regionErr, regionErrLabel)) } - metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr), strconv.FormatBool(isInternal)).Inc() + // NOTE: Please add the region error handler in the same order of errorpb.Error. if notLeader := regionErr.GetNotLeader(); notLeader != nil { // Retry if error is `NotLeader`. logutil.Logger(bo.GetCtx()).Debug( diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 32ed218d7b..8a72dba3ff 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -1219,7 +1219,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown() func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { leaderAddr := "" reqTargetAddrs := make(map[string]struct{}) - s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats() + s.regionRequestSender.Stats = NewRegionRequestRuntimeStats() bo := retry.NewBackoffer(context.Background(), 10000) mockRPCClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { reqTargetAddrs[addr] = struct{}{} @@ -1243,7 +1243,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { resetStats := func() { reqTargetAddrs = make(map[string]struct{}) s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient) - s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats() + s.regionRequestSender.Stats = NewRegionRequestRuntimeStats() } //Test different read type. @@ -1266,10 +1266,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { regionErr, err := resp.GetRegionError() s.Nil(err) s.True(IsFakeRegionError(regionErr)) - s.Equal(1, len(s.regionRequestSender.Stats)) - s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 3 rpc - s.Equal(3, len(reqTargetAddrs)) // each rpc to a different store. - s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. + s.Equal(1, len(s.regionRequestSender.Stats.RPCStats)) + s.Equal(int64(3), s.regionRequestSender.Stats.RPCStats[tikvrpc.CmdGet].Count) // 3 rpc + s.Equal(3, len(reqTargetAddrs)) // each rpc to a different store. + s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. // warn: must rest MaxExecutionDurationMs before retry. resetStats() if staleRead { @@ -1285,9 +1285,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { s.Nil(err) s.Nil(regionErr) s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value) - s.Equal(1, len(s.regionRequestSender.Stats)) - s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc - s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. + s.Equal(1, len(s.regionRequestSender.Stats.RPCStats)) + s.Equal(int64(1), s.regionRequestSender.Stats.RPCStats[tikvrpc.CmdGet].Count) // 1 rpc + s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. 
} } @@ -1482,7 +1482,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadTryFollowerAfterTimeo return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil }} s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient) - s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats() + s.regionRequestSender.Stats = NewRegionRequestRuntimeStats() getLocFn := func() *KeyLocation { loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("a")) s.Nil(err) @@ -1504,9 +1504,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadTryFollowerAfterTimeo s.Nil(err) s.Nil(regionErr) s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value) - s.Equal(1, len(s.regionRequestSender.Stats)) - s.Equal(int64(2), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc - s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. + s.Equal(1, len(s.regionRequestSender.Stats.RPCStats)) + s.Equal(int64(2), s.regionRequestSender.Stats.RPCStats[tikvrpc.CmdGet].Count) // 2 rpc + s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. } func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() { diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index a86de2a121..797b739b57 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -40,6 +40,7 @@ import ( "math" "math/rand" "net" + "strconv" "sync" "sync/atomic" "testing" @@ -828,3 +829,46 @@ func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestSenderString() { sender.SendReqCtx(s.bo, tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}), loc.Region, time.Second, tikvrpc.TiKV) s.Equal("{rpcError:, replicaSelector: }", sender.String()) } + +func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestStats() { + reqStats := NewRegionRequestRuntimeStats() + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Second) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Millisecond) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdCop, time.Second*2) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdCop, time.Millisecond*200) + reqStats.RecordRPCErrorStats("context canceled") + reqStats.RecordRPCErrorStats("context canceled") + reqStats.RecordRPCErrorStats("region_not_found") + reqStats.Merge(NewRegionRequestRuntimeStats()) + reqStats2 := NewRegionRequestRuntimeStats() + reqStats2.Merge(reqStats.Clone()) + expecteds := []string{ + // Since map iteration order is random, we need to check all possible orders. 
+ "Get:{num_rpc:2, total_time:1s},Cop:{num_rpc:2, total_time:2.2s}, rpc_errors:{region_not_found:1, context canceled:2}", + "Get:{num_rpc:2, total_time:1s},Cop:{num_rpc:2, total_time:2.2s}, rpc_errors:{context canceled:2, region_not_found:1}", + "Cop:{num_rpc:2, total_time:2.2s},Get:{num_rpc:2, total_time:1s}, rpc_errors:{context canceled:2, region_not_found:1}", + "Cop:{num_rpc:2, total_time:2.2s},Get:{num_rpc:2, total_time:1s}, rpc_errors:{region_not_found:1, context canceled:2}", + } + s.Contains(expecteds, reqStats.String()) + s.Contains(expecteds, reqStats2.String()) + for i := 0; i < 50; i++ { + reqStats.RecordRPCErrorStats("err_" + strconv.Itoa(i)) + } + s.Regexp("{.*err_.*:1.*, other_error:36}", reqStats.RequestErrorStats.String()) + s.Regexp(".*num_rpc.*total_time.*, rpc_errors:{.*err.*, other_error:36}", reqStats.String()) + + access := &ReplicaAccessStats{} + access.recordReplicaAccessInfo(true, false, 1, 2, "data_not_ready") + access.recordReplicaAccessInfo(false, false, 3, 4, "not_leader") + access.recordReplicaAccessInfo(false, true, 5, 6, "server_is_Busy") + s.Equal("{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}", access.String()) + for i := 0; i < 20; i++ { + access.recordReplicaAccessInfo(false, false, 5+uint64(i)%2, 6, "server_is_Busy") + } + expecteds = []string{ + // Since map iteration order is random, we need to check all possible orders. + "{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}, {peer:5, store:6, err:server_is_Busy}, {peer:6, store:6, err:server_is_Busy}, overflow_count:{{peer:5, error_stats:{server_is_Busy:9}}, {peer:6, error_stats:{server_is_Busy:9}}}", + "{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}, {peer:5, store:6, err:server_is_Busy}, {peer:6, store:6, err:server_is_Busy}, overflow_count:{{peer:6, error_stats:{server_is_Busy:9}}, {peer:5, error_stats:{server_is_Busy:9}}}", + } + s.Contains(expecteds, access.String()) +} diff --git a/metrics/metrics.go b/metrics/metrics.go index f5e8827b07..6d6c007203 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -50,6 +50,7 @@ var ( TiKVCoprocessorHistogram *prometheus.HistogramVec TiKVLockResolverCounter *prometheus.CounterVec TiKVRegionErrorCounter *prometheus.CounterVec + TiKVRPCErrorCounter *prometheus.CounterVec TiKVTxnWriteKVCountHistogram *prometheus.HistogramVec TiKVTxnWriteSizeHistogram *prometheus.HistogramVec TiKVRawkvCmdHistogram *prometheus.HistogramVec @@ -221,7 +222,16 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Name: "region_err_total", Help: "Counter of region errors.", ConstLabels: constLabels, - }, []string{LblType, LblScope}) + }, []string{LblType, LblStore}) + + TiKVRPCErrorCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "rpc_err_total", + Help: "Counter of rpc errors.", + ConstLabels: constLabels, + }, []string{LblType, LblStore}) TiKVTxnWriteKVCountHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -805,6 +815,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVCoprocessorHistogram) prometheus.MustRegister(TiKVLockResolverCounter) prometheus.MustRegister(TiKVRegionErrorCounter) + prometheus.MustRegister(TiKVRPCErrorCounter) prometheus.MustRegister(TiKVTxnWriteKVCountHistogram) 
prometheus.MustRegister(TiKVTxnWriteSizeHistogram) prometheus.MustRegister(TiKVRawkvCmdHistogram) diff --git a/tikv/region.go b/tikv/region.go index 7753d4478b..52da6b6bfa 100644 --- a/tikv/region.go +++ b/tikv/region.go @@ -135,11 +135,6 @@ const ( NullspaceID KeyspaceID = apicodec.NullspaceID ) -// RecordRegionRequestRuntimeStats records request runtime stats. -func RecordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { - locate.RecordRegionRequestRuntimeStats(stats, cmd, d) -} - // Store contains a kv process's address. type Store = locate.Store @@ -190,7 +185,7 @@ func WithMatchStores(stores []uint64) StoreSelectorOption { } // NewRegionRequestRuntimeStats returns a new RegionRequestRuntimeStats. -func NewRegionRequestRuntimeStats() RegionRequestRuntimeStats { +func NewRegionRequestRuntimeStats() *RegionRequestRuntimeStats { return locate.NewRegionRequestRuntimeStats() } diff --git a/txnkv/txnsnapshot/client_helper.go b/txnkv/txnsnapshot/client_helper.go index bd9278069d..6944c75536 100644 --- a/txnkv/txnsnapshot/client_helper.go +++ b/txnkv/txnsnapshot/client_helper.go @@ -62,7 +62,7 @@ type ClientHelper struct { committedLocks *util.TSSet client client.Client resolveLite bool - locate.RegionRequestRuntimeStats + Stats *locate.RegionRequestRuntimeStats } // NewClientHelper creates a helper instance. @@ -81,7 +81,7 @@ func NewClientHelper(store kvstore, resolvedLocks *util.TSSet, committedLocks *u func (ch *ClientHelper) ResolveLocksWithOpts(bo *retry.Backoffer, opts txnlock.ResolveLocksOptions) (txnlock.ResolveLockResult, error) { if ch.Stats != nil { defer func(start time.Time) { - locate.RecordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start)) + ch.Stats.RecordRPCRuntimeStats(tikvrpc.CmdResolveLock, time.Since(start)) }(time.Now()) } opts.ForRead = true @@ -103,7 +103,7 @@ func (ch *ClientHelper) ResolveLocksWithOpts(bo *retry.Backoffer, opts txnlock.R func (ch *ClientHelper) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*txnlock.Lock) (int64, error) { if ch.Stats != nil { defer func(start time.Time) { - locate.RecordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start)) + ch.Stats.RecordRPCRuntimeStats(tikvrpc.CmdResolveLock, time.Since(start)) }(time.Now()) } msBeforeTxnExpired, resolvedLocks, committedLocks, err := ch.lockResolver.ResolveLocksForRead(bo, callerStartTS, locks, ch.resolveLite) diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index f40e6bc984..58c3a755fb 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -408,7 +408,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, cli := NewClientHelper(s.store, &s.resolvedLocks, &s.committedLocks, false) s.mu.RLock() if s.mu.stats != nil { - cli.Stats = make(map[tikvrpc.CmdType]*locate.RPCRuntimeStats) + cli.Stats = locate.NewRegionRequestRuntimeStats() defer func() { s.mergeRegionRequestStats(cli.Stats) }() @@ -657,7 +657,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] cli := NewClientHelper(s.store, &s.resolvedLocks, &s.committedLocks, true) s.mu.RLock() if s.mu.stats != nil { - cli.Stats = make(map[tikvrpc.CmdType]*locate.RPCRuntimeStats) + cli.Stats = locate.NewRegionRequestRuntimeStats() defer func() { s.mergeRegionRequestStats(cli.Stats) }() @@ -1080,25 +1080,17 @@ func (s *KVSnapshot) recordBackoffInfo(bo *retry.Backoffer) { } } -func (s *KVSnapshot) 
mergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats) { +func (s *KVSnapshot) mergeRegionRequestStats(rpcStats *locate.RegionRequestRuntimeStats) { s.mu.Lock() defer s.mu.Unlock() if s.mu.stats == nil { return } - if s.mu.stats.rpcStats.Stats == nil { - s.mu.stats.rpcStats.Stats = stats + if s.mu.stats.rpcStats == nil { + s.mu.stats.rpcStats = rpcStats return } - for k, v := range stats { - stat, ok := s.mu.stats.rpcStats.Stats[k] - if !ok { - s.mu.stats.rpcStats.Stats[k] = v - continue - } - stat.Count += v.Count - stat.Consume += v.Consume - } + s.mu.stats.rpcStats.Merge(rpcStats) } // SetKVReadTimeout sets timeout for individual KV read operations under this snapshot @@ -1133,7 +1125,7 @@ func (s *KVSnapshot) SetPipelined(ts uint64) { // SnapshotRuntimeStats records the runtime stats of snapshot. type SnapshotRuntimeStats struct { - rpcStats locate.RegionRequestRuntimeStats + rpcStats *locate.RegionRequestRuntimeStats backoffSleepMS map[string]int backoffTimes map[string]int scanDetail *util.ScanDetail @@ -1143,11 +1135,9 @@ type SnapshotRuntimeStats struct { // Clone implements the RuntimeStats interface. func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats { - newRs := SnapshotRuntimeStats{rpcStats: locate.NewRegionRequestRuntimeStats()} - if rs.rpcStats.Stats != nil { - for k, v := range rs.rpcStats.Stats { - newRs.rpcStats.Stats[k] = v - } + newRs := SnapshotRuntimeStats{} + if rs.rpcStats != nil { + newRs.rpcStats = rs.rpcStats.Clone() } if len(rs.backoffSleepMS) > 0 { newRs.backoffSleepMS = make(map[string]int) @@ -1177,9 +1167,9 @@ func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats { // Merge implements the RuntimeStats interface. func (rs *SnapshotRuntimeStats) Merge(other *SnapshotRuntimeStats) { - if other.rpcStats.Stats != nil { - if rs.rpcStats.Stats == nil { - rs.rpcStats.Stats = make(map[tikvrpc.CmdType]*locate.RPCRuntimeStats, len(other.rpcStats.Stats)) + if other.rpcStats != nil { + if rs.rpcStats == nil { + rs.rpcStats = locate.NewRegionRequestRuntimeStats() } rs.rpcStats.Merge(other.rpcStats) } @@ -1202,7 +1192,9 @@ func (rs *SnapshotRuntimeStats) Merge(other *SnapshotRuntimeStats) { // String implements fmt.Stringer interface. func (rs *SnapshotRuntimeStats) String() string { var buf bytes.Buffer - buf.WriteString(rs.rpcStats.String()) + if rs.rpcStats != nil { + buf.WriteString(rs.rpcStats.String()) + } for k, v := range rs.backoffTimes { if buf.Len() > 0 { buf.WriteByte(',') @@ -1226,11 +1218,11 @@ func (rs *SnapshotRuntimeStats) String() string { // GetCmdRPCCount returns the count of the corresponding kind of rpc requests func (rs *SnapshotRuntimeStats) GetCmdRPCCount(cmd tikvrpc.CmdType) int64 { - if rs.rpcStats.Stats == nil { + if rs.rpcStats == nil || len(rs.rpcStats.RPCStats) == 0 { return 0 } - stats, ok := rs.rpcStats.Stats[cmd] + stats, ok := rs.rpcStats.RPCStats[cmd] if !ok { return 0 } diff --git a/txnkv/txnsnapshot/test_probe.go b/txnkv/txnsnapshot/test_probe.go index 8900243d7b..0d06769eb3 100644 --- a/txnkv/txnsnapshot/test_probe.go +++ b/txnkv/txnsnapshot/test_probe.go @@ -18,7 +18,6 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/tikv/client-go/v2/config/retry" "github.com/tikv/client-go/v2/internal/locate" - "github.com/tikv/client-go/v2/tikvrpc" ) // SnapshotProbe exposes some snapshot utilities for testing purpose. @@ -27,8 +26,8 @@ type SnapshotProbe struct { } // MergeRegionRequestStats merges RPC runtime stats into snapshot's stats. 
-func (s SnapshotProbe) MergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats) { - s.mergeRegionRequestStats(stats) +func (s SnapshotProbe) MergeRegionRequestStats(rpcStats *locate.RegionRequestRuntimeStats) { + s.mergeRegionRequestStats(rpcStats) } // RecordBackoffInfo records backoff stats into snapshot's stats.
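Usage note (not part of the diff): the sketch below illustrates how the reworked runtime-stats API introduced by this patch might be exercised from client code, mirroring the updated integration test above. It assumes the tikv package continues to re-export RegionRequestRuntimeStats as shown in the tikv/region.go hunk, and the String() output shown in the comments is only indicative.

package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/tikvrpc"
)

func main() {
	// Record per-command RPC counts and durations on the new pointer-based stats type.
	stats := tikv.NewRegionRequestRuntimeStats()
	stats.RecordRPCRuntimeStats(tikvrpc.CmdGet, 120*time.Millisecond)
	stats.RecordRPCRuntimeStats(tikvrpc.CmdGet, 80*time.Millisecond)

	// Region-error labels and RPC errors are now counted as well; only the first
	// 16 distinct labels are kept, the rest fall into OtherErrCnt.
	stats.RecordRPCErrorStats("not_leader")
	stats.RecordRPCErrorStats("context canceled")

	// Merge now takes the stats object itself instead of the raw map, and String()
	// renders both parts, roughly "Get:{num_rpc:2, total_time:200ms}, rpc_errors:{...}".
	merged := tikv.NewRegionRequestRuntimeStats()
	merged.Merge(stats)
	fmt.Println(merged.String())
}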