From d651c6b91f2cbd0b22ae87a22c06317cdee12462 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 8 Nov 2023 11:18:42 +0800 Subject: [PATCH 1/7] core: batch get region size (#7252) close tikv/pd#7248 Signed-off-by: nolouch Co-authored-by: nolouch Co-authored-by: ShuNing --- pkg/core/region.go | 36 ++++++++--- pkg/core/region_test.go | 120 +++++++++++++++++++++++++++++++++++ server/cluster/cluster.go | 3 +- tests/server/api/api_test.go | 6 +- 4 files changed, 152 insertions(+), 13 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 2ac323a12722..c9daa69c477c 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -41,7 +41,10 @@ import ( "go.uber.org/zap" ) -const randomRegionMaxRetry = 10 +const ( + randomRegionMaxRetry = 10 + scanRegionLimit = 1000 +) // errRegionIsStale is error info for region is stale. func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error { @@ -1610,16 +1613,31 @@ func (r *RegionsInfo) ScanRegionWithIterator(startKey []byte, iterator func(regi // GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 { - r.t.RLock() - defer r.t.RUnlock() var size int64 - r.tree.scanRange(startKey, func(region *RegionInfo) bool { - if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { - return false + for { + r.t.RLock() + var cnt int + r.tree.scanRange(startKey, func(region *RegionInfo) bool { + if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { + return false + } + if cnt >= scanRegionLimit { + return false + } + cnt++ + startKey = region.GetEndKey() + size += region.GetApproximateSize() + return true + }) + r.t.RUnlock() + if cnt == 0 { + break } - size += region.GetApproximateSize() - return true - }) + if len(startKey) == 0 { + break + } + } + return size } diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 50302de920e3..508e7aa59aae 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -18,8 +18,10 @@ import ( "crypto/rand" "fmt" "math" + mrand "math/rand" "strconv" "testing" + "time" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" @@ -658,6 +660,124 @@ func BenchmarkRandomRegion(b *testing.B) { } } +func BenchmarkRandomSetRegion(b *testing.B) { + regions := NewRegionsInfo() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + items = append(items, region) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + item := items[i%len(items)] + item.approximateKeys = int64(200000) + item.approximateSize = int64(20) + origin, overlaps, rangeChanged := regions.SetRegion(item) + regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + } +} + +func TestGetRegionSizeByRange(t *testing.T) { + regions := NewRegionsInfo() + nums := 1000010 + for i := 0; i < nums; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + endKey := []byte(fmt.Sprintf("%20d", i+1)) + if i == nums-1 { + endKey = []byte("") + } + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), 
+ EndKey: endKey, + }, peer, SetApproximateSize(10)) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + } + totalSize := regions.GetRegionSizeByRange([]byte(""), []byte("")) + require.Equal(t, int64(nums*10), totalSize) + for i := 1; i < 10; i++ { + verifyNum := nums / i + endKey := fmt.Sprintf("%20d", verifyNum) + totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(endKey)) + require.Equal(t, int64(verifyNum*10), totalSize) + } +} + +func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) { + regions := NewRegionsInfo() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer, SetApproximateSize(10)) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + items = append(items, region) + } + b.ResetTimer() + go func() { + for { + regions.GetRegionSizeByRange([]byte(""), []byte("")) + time.Sleep(time.Millisecond) + } + }() + for i := 0; i < b.N; i++ { + item := items[i%len(items)] + item.approximateKeys = int64(200000) + origin, overlaps, rangeChanged := regions.SetRegion(item) + regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + } +} + +func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) { + regions := NewRegionsInfo() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + items = append(items, region) + } + b.ResetTimer() + go func() { + for { + regions.GetRegionSizeByRange([]byte(""), []byte("")) + time.Sleep(time.Millisecond) + } + }() + + b.RunParallel( + func(pb *testing.PB) { + for pb.Next() { + item := items[mrand.Intn(len(items))] + n := item.Clone(SetApproximateSize(20)) + origin, overlaps, rangeChanged := regions.SetRegion(n) + regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + } + }, + ) +} + const keyLength = 100 func randomBytes(n int) []byte { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 25a47a7fca9b..8362ee9f3314 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1846,12 +1846,13 @@ func (c *RaftCluster) checkStores() { if err := c.ReadyToServe(storeID); err != nil { log.Error("change store to serving failed", zap.Stringer("store", store.GetMeta()), + zap.Int("region-count", c.GetTotalRegionCount()), errs.ZapError(err)) } } else if c.IsPrepared() { threshold := c.getThreshold(stores, store) - log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold)) regionSize := float64(store.GetRegionSize()) + log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize)) if regionSize >= threshold { if err := c.ReadyToServe(storeID); err != nil { log.Error("change store to serving failed", diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 
ff430f1b848b..04bcdc0d4614 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -914,7 +914,7 @@ func TestPreparingProgress(t *testing.T) { tests.MustPutStore(re, cluster, store) } for i := 0; i < 100; i++ { - tests.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10)) + tests.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10)) } // no store preparing output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound) @@ -941,8 +941,8 @@ func TestPreparingProgress(t *testing.T) { re.Equal(math.MaxFloat64, p.LeftSeconds) // update size - tests.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10)) - tests.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40)) + tests.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10)) + tests.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40)) time.Sleep(2 * time.Second) output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK) re.NoError(json.Unmarshal(output, &p)) From 4457ac2717644a39a1ccfaeeb5cfb7ecd0542e99 Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Wed, 8 Nov 2023 12:14:13 +0800 Subject: [PATCH 2/7] mcs/scheduling: fix typo (#7333) ref tikv/pd#5839 Signed-off-by: Cabinfever_B Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/scheduling/server/grpc_service.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index 79c5c293ee7e..b865e917d75c 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -65,7 +65,7 @@ type Service struct { *Server } -// NewService creates a new TSO service. +// NewService creates a new scheduling service. func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService { server, ok := svr.(*Server) if !ok { @@ -118,7 +118,7 @@ func (s *heartbeatServer) Recv() (*schedulingpb.RegionHeartbeatRequest, error) { return req, nil } -// RegionHeartbeat implements gRPC PDServer. +// RegionHeartbeat implements gRPC SchedulingServer. func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeatServer) error { var ( server = &heartbeatServer{stream: stream} @@ -168,7 +168,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat } } -// StoreHeartbeat implements gRPC PDServer. +// StoreHeartbeat implements gRPC SchedulingServer. func (s *Service) StoreHeartbeat(ctx context.Context, request *schedulingpb.StoreHeartbeatRequest) (*schedulingpb.StoreHeartbeatResponse, error) { c := s.GetCluster() if c == nil { @@ -202,7 +202,7 @@ func (s *Service) SplitRegions(ctx context.Context, request *schedulingpb.SplitR }, nil } -// ScatterRegions implements gRPC PDServer. +// ScatterRegions implements gRPC SchedulingServer. 
func (s *Service) ScatterRegions(ctx context.Context, request *schedulingpb.ScatterRegionsRequest) (*schedulingpb.ScatterRegionsResponse, error) { c := s.GetCluster() if c == nil { @@ -261,7 +261,7 @@ func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOper }, nil } -// AskBatchSplit implements gRPC PDServer. +// AskBatchSplit implements gRPC SchedulingServer. func (s *Service) AskBatchSplit(ctx context.Context, request *schedulingpb.AskBatchSplitRequest) (*schedulingpb.AskBatchSplitResponse, error) { c := s.GetCluster() if c == nil { From a98295c22490b9fdb0e2cb052b68691a9d56f6dc Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 8 Nov 2023 16:05:14 +0800 Subject: [PATCH 3/7] mcs: fix participant name (#7335) close tikv/pd#7336 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/resourcemanager/server/config.go | 20 ++++++++++++++++++++ pkg/mcs/resourcemanager/server/server.go | 6 +++--- pkg/mcs/scheduling/server/config/config.go | 15 +++++++++++++++ pkg/mcs/scheduling/server/server.go | 8 ++++---- 4 files changed, 42 insertions(+), 7 deletions(-) diff --git a/pkg/mcs/resourcemanager/server/config.go b/pkg/mcs/resourcemanager/server/config.go index 3f64b2987fd6..10e916128424 100644 --- a/pkg/mcs/resourcemanager/server/config.go +++ b/pkg/mcs/resourcemanager/server/config.go @@ -250,6 +250,26 @@ func (c *Config) adjustLog(meta *configutil.ConfigMetaData) { } } +// GetName returns the Name +func (c *Config) GetName() string { + return c.Name +} + +// GeBackendEndpoints returns the BackendEndpoints +func (c *Config) GeBackendEndpoints() string { + return c.BackendEndpoints +} + +// GetListenAddr returns the ListenAddr +func (c *Config) GetListenAddr() string { + return c.ListenAddr +} + +// GetAdvertiseListenAddr returns the AdvertiseListenAddr +func (c *Config) GetAdvertiseListenAddr() string { + return c.AdvertiseListenAddr +} + // GetTLSConfig returns the TLS config. 
func (c *Config) GetTLSConfig() *grpcutil.TLSConfig { return &c.Security.TLSConfig diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 47248208c8a2..7b660c076055 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -296,14 +296,14 @@ func (s *Server) startServer() (err error) { // different service modes provided by the same pd-server binary serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) - uniqueName := s.cfg.ListenAddr + uniqueName := s.cfg.GetAdvertiseListenAddr() uniqueID := memberutil.GenerateUniqueID(uniqueName) log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) s.participant = member.NewParticipant(s.GetClient(), utils.ResourceManagerServiceName) p := &resource_manager.Participant{ Name: uniqueName, Id: uniqueID, // id is unique among all participants - ListenUrls: []string{s.cfg.AdvertiseListenAddr}, + ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()}, } s.participant.InitInfo(p, endpoint.ResourceManagerSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election") @@ -312,7 +312,7 @@ func (s *Server) startServer() (err error) { manager: NewManager[*Server](s), } - if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil { + if err := s.InitListener(s.GetTLSConfig(), s.cfg.GetListenAddr()); err != nil { return err } diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 772eab835f19..a211c989c64f 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -164,11 +164,26 @@ func (c *Config) adjustLog(meta *configutil.ConfigMetaData) { } } +// GetName returns the Name +func (c *Config) GetName() string { + return c.Name +} + +// GeBackendEndpoints returns the BackendEndpoints +func (c *Config) GeBackendEndpoints() string { + return c.BackendEndpoints +} + // GetListenAddr returns the ListenAddr func (c *Config) GetListenAddr() string { return c.ListenAddr } +// GetAdvertiseListenAddr returns the AdvertiseListenAddr +func (c *Config) GetAdvertiseListenAddr() string { + return c.AdvertiseListenAddr +} + // GetTLSConfig returns the TLS config. 
func (c *Config) GetTLSConfig() *grpcutil.TLSConfig { return &c.Security.TLSConfig diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 1790cb2b4be2..4304ffb218a8 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -405,21 +405,21 @@ func (s *Server) startServer() (err error) { // different service modes provided by the same pd-server binary serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) - uniqueName := s.cfg.ListenAddr + uniqueName := s.cfg.GetAdvertiseListenAddr() uniqueID := memberutil.GenerateUniqueID(uniqueName) log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) s.participant = member.NewParticipant(s.GetClient(), utils.SchedulingServiceName) p := &schedulingpb.Participant{ Name: uniqueName, Id: uniqueID, // id is unique among all participants - ListenUrls: []string{s.cfg.AdvertiseListenAddr}, + ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()}, } s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election") s.service = &Service{Server: s} s.AddServiceReadyCallback(s.startCluster) s.AddServiceExitCallback(s.stopCluster) - if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil { + if err := s.InitListener(s.GetTLSConfig(), s.cfg.GetListenAddr()); err != nil { return err } @@ -443,7 +443,7 @@ func (s *Server) startServer() (err error) { return err } s.serviceRegister = discovery.NewServiceRegister(s.Context(), s.GetClient(), strconv.FormatUint(s.clusterID, 10), - utils.SchedulingServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds) + utils.SchedulingServiceName, s.cfg.GetAdvertiseListenAddr(), serializedEntry, discovery.DefaultLeaseInSeconds) if err := s.serviceRegister.Register(); err != nil { log.Error("failed to register the service", zap.String("service-name", utils.SchedulingServiceName), errs.ZapError(err)) return err From d189a42894f5e2d957f9f4da0790cbe1468558da Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 8 Nov 2023 17:08:12 +0800 Subject: [PATCH 4/7] mcs: solve stream error when forward tso (#7327) close tikv/pd#7320 Signed-off-by: lhy1024 --- pkg/utils/grpcutil/grpcutil.go | 14 +++++++ server/gc_service.go | 11 +----- server/grpc_service.go | 71 +++++++++++++++++----------------- 3 files changed, 50 insertions(+), 46 deletions(-) diff --git a/pkg/utils/grpcutil/grpcutil.go b/pkg/utils/grpcutil/grpcutil.go index ee9d85a4ee19..44d45ff4c70d 100644 --- a/pkg/utils/grpcutil/grpcutil.go +++ b/pkg/utils/grpcutil/grpcutil.go @@ -18,7 +18,9 @@ import ( "context" "crypto/tls" "crypto/x509" + "io" "net/url" + "strings" "time" "github.com/pingcap/errors" @@ -28,6 +30,7 @@ import ( "go.etcd.io/etcd/pkg/transport" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" ) @@ -221,3 +224,14 @@ func CheckStream(ctx context.Context, cancel context.CancelFunc, done chan struc } <-done } + +// NeedRebuildConnection checks if the error is a connection error. +func NeedRebuildConnection(err error) bool { + return err == io.EOF || + strings.Contains(err.Error(), codes.Unavailable.String()) || // Unavailable indicates the service is currently unavailable. This is a most likely a transient condition. 
+ strings.Contains(err.Error(), codes.DeadlineExceeded.String()) || // DeadlineExceeded means operation expired before completion. + strings.Contains(err.Error(), codes.Internal.String()) || // Internal errors. + strings.Contains(err.Error(), codes.Unknown.String()) || // Unknown error. + strings.Contains(err.Error(), codes.ResourceExhausted.String()) // ResourceExhausted is returned when either the client or the server has exhausted their resources. + // Besides, we don't need to rebuild the connection if the code is Canceled, which means the client cancelled the request. +} diff --git a/server/gc_service.go b/server/gc_service.go index d8a0158920d8..90333654e5e6 100644 --- a/server/gc_service.go +++ b/server/gc_service.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/tsoutil" "go.etcd.io/etcd/clientv3" @@ -107,15 +106,7 @@ func (s *GrpcServer) UpdateServiceSafePointV2(ctx context.Context, request *pdpb return rsp.(*pdpb.UpdateServiceSafePointV2Response), err } - var ( - nowTSO pdpb.Timestamp - err error - ) - if s.IsAPIServiceMode() { - nowTSO, err = s.getGlobalTSOFromTSOServer(ctx) - } else { - nowTSO, err = s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1) - } + nowTSO, err := s.getGlobalTSO(ctx) if err != nil { return nil, err } diff --git a/server/grpc_service.go b/server/grpc_service.go index 4aa6dc5b1da8..05ec38919cbe 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -2002,15 +2002,7 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb return nil, err } } - var ( - nowTSO pdpb.Timestamp - err error - ) - if s.IsAPIServiceMode() { - nowTSO, err = s.getGlobalTSOFromTSOServer(ctx) - } else { - nowTSO, err = s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1) - } + nowTSO, err := s.getGlobalTSO(ctx) if err != nil { return nil, err } @@ -2608,7 +2600,10 @@ func forwardReportBucketClientToServer(forwardStream pdpb.PD_ReportBucketsClient } } -func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timestamp, error) { +func (s *GrpcServer) getGlobalTSO(ctx context.Context) (pdpb.Timestamp, error) { + if !s.IsAPIServiceMode() { + return s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1) + } request := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: s.clusterID, @@ -2622,9 +2617,28 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest forwardStream tsopb.TSO_TsoClient ts *tsopb.TsoResponse err error + ok bool ) + handleStreamError := func(err error) (needRetry bool) { + if strings.Contains(err.Error(), errs.NotLeaderErr) { + s.tsoPrimaryWatcher.ForceLoad() + log.Warn("force to load tso primary address due to error", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return true + } + if grpcutil.NeedRebuildConnection(err) { + s.tsoClientPool.Lock() + delete(s.tsoClientPool.clients, forwardedHost) + s.tsoClientPool.Unlock() + log.Warn("client connection removed due to error", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return true + } + return false + } for i := 0; i < maxRetryTimesRequestTSOServer; i++ { - forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName) + if i > 0 { + time.Sleep(retryIntervalRequestTSOServer) + } + forwardedHost, ok = s.GetServicePrimaryAddr(ctx, utils.TSOServiceName) if !ok || forwardedHost == "" { return 
pdpb.Timestamp{}, ErrNotFoundTSOAddr } @@ -2632,32 +2646,25 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest if err != nil { return pdpb.Timestamp{}, err } - err := forwardStream.Send(request) + err = forwardStream.Send(request) if err != nil { - s.tsoClientPool.Lock() - delete(s.tsoClientPool.clients, forwardedHost) - s.tsoClientPool.Unlock() - continue + if needRetry := handleStreamError(err); needRetry { + continue + } + log.Error("send request to tso primary server failed", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return pdpb.Timestamp{}, err } ts, err = forwardStream.Recv() if err != nil { - if strings.Contains(err.Error(), errs.NotLeaderErr) { - s.tsoPrimaryWatcher.ForceLoad() - time.Sleep(retryIntervalRequestTSOServer) - continue - } - if strings.Contains(err.Error(), codes.Unavailable.String()) { - s.tsoClientPool.Lock() - delete(s.tsoClientPool.clients, forwardedHost) - s.tsoClientPool.Unlock() + if needRetry := handleStreamError(err); needRetry { continue } - log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso-addr", forwardedHost)) + log.Error("receive response from tso primary server failed", zap.Error(err), zap.String("tso-addr", forwardedHost)) return pdpb.Timestamp{}, err } return *ts.GetTimestamp(), nil } - log.Error("get global tso from tso service primary addr failed after retry", zap.Error(err), zap.String("tso-addr", forwardedHost)) + log.Error("get global tso from tso primary server failed after retry", zap.Error(err), zap.String("tso-addr", forwardedHost)) return pdpb.Timestamp{}, err } @@ -2906,15 +2913,7 @@ func (s *GrpcServer) SetExternalTimestamp(ctx context.Context, request *pdpb.Set return rsp.(*pdpb.SetExternalTimestampResponse), nil } - var ( - nowTSO pdpb.Timestamp - err error - ) - if s.IsAPIServiceMode() { - nowTSO, err = s.getGlobalTSOFromTSOServer(ctx) - } else { - nowTSO, err = s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1) - } + nowTSO, err := s.getGlobalTSO(ctx) if err != nil { return nil, err } From 2c07c241114fe9afabd9927ecbee61c4252f2d8e Mon Sep 17 00:00:00 2001 From: Sparkle <1284531+baurine@users.noreply.github.com> Date: Wed, 8 Nov 2023 18:18:42 +0800 Subject: [PATCH 5/7] chore(dashboard): update tidb dashboard verstion to v2023.11.08.1 (#7339) close tikv/pd#7340 Signed-off-by: baurine <2008.hbl@gmail.com> --- go.mod | 2 +- go.sum | 4 ++-- tests/integrations/client/go.mod | 2 +- tests/integrations/client/go.sum | 4 ++-- tests/integrations/mcs/go.mod | 2 +- tests/integrations/mcs/go.sum | 4 ++-- tests/integrations/tso/go.mod | 2 +- tests/integrations/tso/go.sum | 4 ++-- 8 files changed, 12 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index e8da2542be20..0306d70f7a35 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 - github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9 + github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 github.com/prometheus/client_golang v1.11.1 github.com/prometheus/common v0.26.0 github.com/sasha-s/go-deadlock v0.2.0 diff --git a/go.sum b/go.sum index 28e210ef1cd0..fb1783218640 100644 --- a/go.sum +++ b/go.sum @@ -446,8 +446,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log 
v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9 h1:xIeaDUq2ItkYMIgpWXAYKC/N3hs8aurfFvvz79lhHYE= -github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= +github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 h1:wnHt7ETIB0vm+gbLx8QhcIEmRtrT4QlWlfpcI9vjxOk= +github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index b9b868cf8e35..a4aca195f3fb 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -119,7 +119,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 81fa6fd7b390..ef9c4d2a5f3a 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -410,8 +410,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9 h1:xIeaDUq2ItkYMIgpWXAYKC/N3hs8aurfFvvz79lhHYE= -github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= +github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 h1:wnHt7ETIB0vm+gbLx8QhcIEmRtrT4QlWlfpcI9vjxOk= +github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index c2dfdbe96ef4..f6df0eb4de0d 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -119,7 
+119,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index d1b0962ab550..fc1dc1bbea5a 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -414,8 +414,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9 h1:xIeaDUq2ItkYMIgpWXAYKC/N3hs8aurfFvvz79lhHYE= -github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= +github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 h1:wnHt7ETIB0vm+gbLx8QhcIEmRtrT4QlWlfpcI9vjxOk= +github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index e5131f15d911..7e833943e6ee 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -117,7 +117,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index 576c3e75765a..65a7f3e3558a 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -408,8 +408,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9 h1:xIeaDUq2ItkYMIgpWXAYKC/N3hs8aurfFvvz79lhHYE= -github.com/pingcap/tidb-dashboard 
v0.0.0-20231102083420-865955cd15d9/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= +github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 h1:wnHt7ETIB0vm+gbLx8QhcIEmRtrT4QlWlfpcI9vjxOk= +github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= From 0c352271d7413bdf6ac948e11f1a3fb905fe2ccb Mon Sep 17 00:00:00 2001 From: disksing Date: Fri, 10 Nov 2023 12:03:42 +0800 Subject: [PATCH 6/7] dr-autosync: add recover timeout (#6295) ref tikv/pd#4399 Signed-off-by: husharp --- pkg/replication/replication_mode.go | 15 +++++- pkg/replication/replication_mode_test.go | 60 ++++++++++++++++-------- server/config/config.go | 15 +++--- 3 files changed, 62 insertions(+), 28 deletions(-) diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index 30b34e4596a9..9093f911901a 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -212,6 +212,7 @@ const ( type drAutoSyncStatus struct { State string `json:"state,omitempty"` StateID uint64 `json:"state_id,omitempty"` + AsyncStartTime *time.Time `json:"async_start,omitempty"` RecoverStartTime *time.Time `json:"recover_start,omitempty"` TotalRegions int `json:"total_regions,omitempty"` SyncedRegions int `json:"synced_regions,omitempty"` @@ -262,7 +263,8 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err } - dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores} + now := time.Now() + dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores, AsyncStartTime: &now} if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -272,6 +274,15 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error { return nil } +func (m *ModeManager) drDurationSinceAsyncStart() time.Duration { + m.RLock() + defer m.RUnlock() + if m.drAutoSync.AsyncStartTime == nil { + return 0 + } + return time.Since(*m.drAutoSync.AsyncStartTime) +} + func (m *ModeManager) drSwitchToSyncRecover() error { m.Lock() defer m.Unlock() @@ -477,7 +488,7 @@ func (m *ModeManager) tickUpdateState() { m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateAsync: - if canSync { + if canSync && m.drDurationSinceAsyncStart() > m.config.DRAutoSync.WaitRecoverTimeout.Duration { m.drSwitchToSyncRecover() break } diff --git a/pkg/replication/replication_mode_test.go b/pkg/replication/replication_mode_test.go index e01fb7a0b9a5..5cf9f1a14504 100644 --- a/pkg/replication/replication_mode_test.go +++ b/pkg/replication/replication_mode_test.go @@ -16,6 +16,7 @@ package replication import ( "context" + "encoding/json" "errors" "fmt" "testing" @@ -159,6 +160,20 @@ func newMockReplicator(ids []uint64) *mockFileReplicator { } } +func assertLastData(t *testing.T, data string, state string, stateID uint64, availableStores []uint64) { + type status struct { + State string 
`json:"state"` + StateID uint64 `json:"state_id"` + AvailableStores []uint64 `json:"available_stores"` + } + var s status + err := json.Unmarshal([]byte(data), &s) + require.NoError(t, err) + require.Equal(t, state, s.State) + require.Equal(t, stateID, s.StateID) + require.Equal(t, availableStores, s.AvailableStores) +} + func TestStateSwitch(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) @@ -190,7 +205,7 @@ func TestStateSwitch(t *testing.T) { stateID := rep.drAutoSync.StateID re.NotEqual(uint64(0), stateID) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "sync", stateID, nil) assertStateIDUpdate := func() { re.NotEqual(stateID, rep.drAutoSync.StateID) stateID = rep.drAutoSync.StateID @@ -207,7 +222,7 @@ func TestStateSwitch(t *testing.T) { re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4}) re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit()) conf.DRAutoSync.PauseRegionSplit = true @@ -218,7 +233,7 @@ func TestStateSwitch(t *testing.T) { rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4}) // add new store in dr zone. cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"}) @@ -268,18 +283,19 @@ func TestStateSwitch(t *testing.T) { rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() + rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4}) setStoreState(cluster, "down", "up", "up", "up", "down", "down") rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{2, 3, 4}) setStoreState(cluster, "up", "down", "up", "up", "down", "down") rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 3, 4}) // async_wait -> async rep.tickUpdateState() @@ -291,26 +307,32 @@ func TestStateSwitch(t *testing.T) { rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4}) // async -> async setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() // store 2 won't be available before it syncs status. 
rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4}) syncStoreStatus(1, 2, 3, 4) rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4}) // async -> sync_recover setStoreState(cluster, "up", "up", "up", "up", "up", "up") rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) assertStateIDUpdate() + rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5}) + rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(time.Hour) + rep.tickUpdateState() + re.Equal(drStateAsync, rep.drGetState()) // wait recover timeout + + rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(0) setStoreState(cluster, "down", "up", "up", "up", "up", "up") rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) @@ -387,27 +409,27 @@ func TestReplicateState(t *testing.T) { stateID := rep.drAutoSync.StateID // replicate after initialized rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "sync", stateID, nil) // repliate state to new member replicator.memberIDs = append(replicator.memberIDs, 2, 3) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2]) - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[3]) + assertLastData(t, replicator.lastData[2], "sync", stateID, nil) + assertLastData(t, replicator.lastData[3], "sync", stateID, nil) // inject error replicator.errors[2] = errors.New("failed to persist") rep.tickUpdateState() // switch async_wait since there is only one zone newStateID := rep.drAutoSync.StateID rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[1]) - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2]) - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[3]) + assertLastData(t, replicator.lastData[1], "async_wait", newStateID, []uint64{1, 2}) + assertLastData(t, replicator.lastData[2], "sync", stateID, nil) + assertLastData(t, replicator.lastData[3], "async_wait", newStateID, []uint64{1, 2}) // clear error, replicate to node 2 next time delete(replicator.errors, 2) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[2]) + assertLastData(t, replicator.lastData[2], "async_wait", newStateID, []uint64{1, 2}) } func TestAsynctimeout(t *testing.T) { @@ -637,7 +659,7 @@ func TestComplexPlacementRules(t *testing.T) { rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4,5,6]}`, rep.drAutoSync.StateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4, 5, 6}) // reset to sync setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "up", "up", "up") @@ -698,7 +720,7 @@ func 
TestComplexPlacementRules2(t *testing.T) { rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4}) } func TestComplexPlacementRules3(t *testing.T) { @@ -737,7 +759,7 @@ func TestComplexPlacementRules3(t *testing.T) { rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4}) } func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.RegionInfo { diff --git a/server/config/config.go b/server/config/config.go index 0485e077c676..da6b0e29e075 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -831,13 +831,14 @@ func NormalizeReplicationMode(m string) string { // DRAutoSyncReplicationConfig is the configuration for auto sync mode between 2 data centers. type DRAutoSyncReplicationConfig struct { - LabelKey string `toml:"label-key" json:"label-key"` - Primary string `toml:"primary" json:"primary"` - DR string `toml:"dr" json:"dr"` - PrimaryReplicas int `toml:"primary-replicas" json:"primary-replicas"` - DRReplicas int `toml:"dr-replicas" json:"dr-replicas"` - WaitStoreTimeout typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"` - PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"` + LabelKey string `toml:"label-key" json:"label-key"` + Primary string `toml:"primary" json:"primary"` + DR string `toml:"dr" json:"dr"` + PrimaryReplicas int `toml:"primary-replicas" json:"primary-replicas"` + DRReplicas int `toml:"dr-replicas" json:"dr-replicas"` + WaitStoreTimeout typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"` + WaitRecoverTimeout typeutil.Duration `toml:"wait-recover-timeout" json:"wait-recover-timeout"` + PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"` } func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) { From f1cee6c3971e18c6ab201e50555261a8c51c3041 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 10 Nov 2023 15:48:43 +0800 Subject: [PATCH 7/7] mcs/resourcemanager: delete expire tokenSlot (#7344) close tikv/pd#7346 Signed-off-by: guo-shaoge --- .../resourcemanager/server/token_buckets.go | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/pkg/mcs/resourcemanager/server/token_buckets.go b/pkg/mcs/resourcemanager/server/token_buckets.go index a0acba3b54dc..05a93c326733 100644 --- a/pkg/mcs/resourcemanager/server/token_buckets.go +++ b/pkg/mcs/resourcemanager/server/token_buckets.go @@ -20,6 +20,8 @@ import ( "github.com/gogo/protobuf/proto" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/log" + "go.uber.org/zap" ) const ( @@ -31,6 +33,7 @@ const ( defaultReserveRatio = 0.5 defaultLoanCoefficient = 2 maxAssignTokens = math.MaxFloat64 / 1024 // assume max client connect is 1024 + slotExpireTimeout = 10 * time.Minute ) // GroupTokenBucket is a token bucket for a resource group. @@ -62,6 +65,7 @@ type TokenSlot struct { // tokenCapacity is the number of tokens in the slot. 
tokenCapacity float64 lastTokenCapacity float64 + lastReqTime time.Time } // GroupTokenBucketState is the running state of TokenBucket. @@ -75,7 +79,8 @@ type GroupTokenBucketState struct { LastUpdate *time.Time `json:"last_update,omitempty"` Initialized bool `json:"initialized"` // settingChanged is used to avoid that the number of tokens returned is jitter because of changing fill rate. - settingChanged bool + settingChanged bool + lastCheckExpireSlot time.Time } // Clone returns the copy of GroupTokenBucketState @@ -95,6 +100,7 @@ func (gts *GroupTokenBucketState) Clone() *GroupTokenBucketState { Initialized: gts.Initialized, tokenSlots: tokenSlots, clientConsumptionTokensSum: gts.clientConsumptionTokensSum, + lastCheckExpireSlot: gts.lastCheckExpireSlot, } } @@ -119,16 +125,18 @@ func (gts *GroupTokenBucketState) balanceSlotTokens( clientUniqueID uint64, settings *rmpb.TokenLimitSettings, requiredToken, elapseTokens float64) { + now := time.Now() slot, exist := gts.tokenSlots[clientUniqueID] if !exist { // Only slots that require a positive number will be considered alive, // but still need to allocate the elapsed tokens as well. if requiredToken != 0 { - slot = &TokenSlot{} + slot = &TokenSlot{lastReqTime: now} gts.tokenSlots[clientUniqueID] = slot gts.clientConsumptionTokensSum = 0 } } else { + slot.lastReqTime = now if gts.clientConsumptionTokensSum >= maxAssignTokens { gts.clientConsumptionTokensSum = 0 } @@ -139,6 +147,16 @@ func (gts *GroupTokenBucketState) balanceSlotTokens( } } + if time.Since(gts.lastCheckExpireSlot) >= slotExpireTimeout { + gts.lastCheckExpireSlot = now + for clientUniqueID, slot := range gts.tokenSlots { + if time.Since(slot.lastReqTime) >= slotExpireTimeout { + delete(gts.tokenSlots, clientUniqueID) + log.Info("delete resource group slot because expire", zap.Time("last-req-time", slot.lastReqTime), + zap.Any("expire timeout", slotExpireTimeout), zap.Any("del client id", clientUniqueID), zap.Any("len", len(gts.tokenSlots))) + } + } + } if len(gts.tokenSlots) == 0 { return } @@ -264,6 +282,7 @@ func (gtb *GroupTokenBucket) init(now time.Time, clientID uint64) { lastTokenCapacity: gtb.Tokens, } gtb.LastUpdate = &now + gtb.lastCheckExpireSlot = now gtb.Initialized = true }
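
The slot-expiration change in PATCH 7 boils down to a lazy sweep keyed on each slot's last request time: every request stamps its slot, and at most once per slotExpireTimeout window the slot map is walked and idle entries are dropped. The standalone Go sketch below illustrates that pattern under simplified assumptions; bucket, slot, and touch here are illustrative stand-ins rather than PD's GroupTokenBucketState API, and a real caller would hold the appropriate lock around the map.

package main

import (
	"fmt"
	"time"
)

const slotExpireTimeout = 10 * time.Minute

type slot struct {
	lastReqTime time.Time // timestamp of the most recent request from this client
}

type bucket struct {
	slots           map[uint64]*slot // per-client token slots
	lastExpireCheck time.Time        // last time the expiration sweep ran
}

// touch records a request from clientID and lazily evicts slots that have been
// idle for at least slotExpireTimeout, sweeping at most once per timeout window.
func (b *bucket) touch(clientID uint64, now time.Time) {
	s, ok := b.slots[clientID]
	if !ok {
		s = &slot{}
		b.slots[clientID] = s
	}
	s.lastReqTime = now

	if now.Sub(b.lastExpireCheck) < slotExpireTimeout {
		return
	}
	b.lastExpireCheck = now
	for id, sl := range b.slots {
		if now.Sub(sl.lastReqTime) >= slotExpireTimeout {
			delete(b.slots, id) // deleting while ranging over a map is safe in Go
		}
	}
}

func main() {
	b := &bucket{slots: make(map[uint64]*slot)}
	start := time.Now()
	b.touch(1, start)
	b.touch(2, start)
	// Only client 2 keeps sending requests; client 1's idle slot is swept.
	b.touch(2, start.Add(15*time.Minute))
	fmt.Println(len(b.slots)) // prints 1
}

Running the sweep inside the request path, gated by the last-check timestamp, mirrors the patch's approach and avoids a separate background goroutine and its extra synchronization.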