From 34386cee0f532bb0838f78d6dd8c7ce961b68609 Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Sun, 25 Jun 2023 20:31:06 -0700 Subject: [PATCH 1/8] mcs, tso: fix expensive async forwardTSORequest() and its timeout mechanism. (#6664) ref tikv/pd#6659 Fix expensive async forwardTSORequest() and its timeout mechanism. In order to handle the timeout case for forwardStream send/recv, the existing logic is to create context.withTimeout(forwardCtx,...) for every request, then start a new goroutine "forwardTSORequest", which is very expensive as shown by the profiling in #6659. This change create a watchDeadline routine per forward stream and reuse it for all the forward requests in which forwardTSORequest is called synchronously. Compared to the existing logic, the new change is much cheaper and the latency is much stable. Signed-off-by: Bin Shi --- pkg/utils/tsoutil/tso_dispatcher.go | 33 +++++--- server/grpc_service.go | 122 ++++++++++++++-------------- 2 files changed, 81 insertions(+), 74 deletions(-) diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go index 152a3996538..69baf4b1e41 100644 --- a/pkg/utils/tsoutil/tso_dispatcher.go +++ b/pkg/utils/tsoutil/tso_dispatcher.go @@ -58,7 +58,7 @@ func NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize prometheus.Histo return tsoDispatcher } -// DispatchRequest is the entry point for dispatching/forwarding a tso request to the detination host +// DispatchRequest is the entry point for dispatching/forwarding a tso request to the destination host func (s *TSODispatcher) DispatchRequest( ctx context.Context, req Request, @@ -69,9 +69,9 @@ func (s *TSODispatcher) DispatchRequest( val, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan Request, maxMergeRequests)) reqCh := val.(chan Request) if !loaded { - tsDeadlineCh := make(chan deadline, 1) + tsDeadlineCh := make(chan *TSDeadline, 1) go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh, tsoPrimaryWatchers...) - go watchTSDeadline(ctx, tsDeadlineCh) + go WatchTSDeadline(ctx, tsDeadlineCh) } reqCh <- req } @@ -82,7 +82,7 @@ func (s *TSODispatcher) dispatch( forwardedHost string, clientConn *grpc.ClientConn, tsoRequestCh <-chan Request, - tsDeadlineCh chan<- deadline, + tsDeadlineCh chan<- *TSDeadline, doneCh <-chan struct{}, errCh chan<- error, tsoPrimaryWatchers ...*etcdutil.LoopWatcher) { @@ -121,11 +121,7 @@ func (s *TSODispatcher) dispatch( requests[i] = <-tsoRequestCh } done := make(chan struct{}) - dl := deadline{ - timer: time.After(DefaultTSOProxyTimeout), - done: done, - cancel: cancel, - } + dl := NewTSDeadline(DefaultTSOProxyTimeout, done, cancel) select { case tsDeadlineCh <- dl: case <-dispatcherCtx.Done(): @@ -199,13 +195,28 @@ func (s *TSODispatcher) finishRequest(requests []Request, physical, firstLogical return nil } -type deadline struct { +// TSDeadline is used to watch the deadline of each tso request. +type TSDeadline struct { timer <-chan time.Time done chan struct{} cancel context.CancelFunc } -func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) { +// NewTSDeadline creates a new TSDeadline. +func NewTSDeadline( + timeout time.Duration, + done chan struct{}, + cancel context.CancelFunc, +) *TSDeadline { + return &TSDeadline{ + timer: time.After(timeout), + done: done, + cancel: cancel, + } +} + +// WatchTSDeadline watches the deadline of each tso request. 
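// Editor's note: illustrative usage sketch, not part of the patch. With this API a
// forwarding stream (see the server/grpc_service.go hunks below) starts one deadline
// watcher per stream and reuses it, pushing one TSDeadline per request and performing the
// forward synchronously, instead of creating a context.WithTimeout plus a goroutine per
// request:
//
//	tsDeadlineCh := make(chan *tsoutil.TSDeadline, 1)
//	go tsoutil.WatchTSDeadline(stream.Context(), tsDeadlineCh)
//	// per forwarded request:
//	done := make(chan struct{})
//	dl := tsoutil.NewTSDeadline(tsoutil.DefaultTSOProxyTimeout, done, cancelForward)
//	tsDeadlineCh <- dl
//	resp, err := s.forwardTSORequest(forwardCtx, request, forwardStream) // synchronous now
//	close(done)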
+func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) { defer logutil.LogPanic() ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/server/grpc_service.go b/server/grpc_service.go index 3096357654b..1badabb19d8 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -406,16 +406,17 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { var ( server = &tsoServer{stream: stream} forwardStream tsopb.TSO_TsoClient - cancel context.CancelFunc + forwardCtx context.Context + cancelForward context.CancelFunc lastForwardedHost string ) defer func() { s.concurrentTSOProxyStreamings.Add(-1) - // cancel the forward stream - if cancel != nil { - cancel() + if cancelForward != nil { + cancelForward() } }() + maxConcurrentTSOProxyStreamings := int32(s.GetMaxConcurrentTSOProxyStreamings()) if maxConcurrentTSOProxyStreamings >= 0 { if newCount := s.concurrentTSOProxyStreamings.Add(1); newCount > maxConcurrentTSOProxyStreamings { @@ -423,6 +424,9 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { } } + tsDeadlineCh := make(chan *tsoutil.TSDeadline, 1) + go tsoutil.WatchTSDeadline(stream.Context(), tsDeadlineCh) + for { select { case <-s.ctx.Done(): @@ -449,22 +453,24 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { return errors.WithStack(ErrNotFoundTSOAddr) } if forwardStream == nil || lastForwardedHost != forwardedHost { - if cancel != nil { - cancel() + if cancelForward != nil { + cancelForward() } clientConn, err := s.getDelegateClient(s.ctx, forwardedHost) if err != nil { return errors.WithStack(err) } - forwardStream, cancel, err = s.createTSOForwardStream(clientConn) + forwardStream, forwardCtx, cancelForward, err = + s.createTSOForwardStream(stream.Context(), clientConn) if err != nil { return errors.WithStack(err) } lastForwardedHost = forwardedHost } - tsopbResp, err := s.forwardTSORequestWithDeadLine(stream.Context(), request, forwardStream) + tsopbResp, err := s.forwardTSORequestWithDeadLine( + forwardCtx, cancelForward, forwardStream, request, tsDeadlineCh) if err != nil { return errors.WithStack(err) } @@ -502,37 +508,39 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { } func (s *GrpcServer) forwardTSORequestWithDeadLine( - ctx context.Context, request *pdpb.TsoRequest, forwardStream tsopb.TSO_TsoClient, + forwardCtx context.Context, + cancelForward context.CancelFunc, + forwardStream tsopb.TSO_TsoClient, + request *pdpb.TsoRequest, + tsDeadlineCh chan<- *tsoutil.TSDeadline, ) (*tsopb.TsoResponse, error) { - defer logutil.LogPanic() - // Create a context with deadline for forwarding TSO request to TSO service. 
- ctxTimeout, cancel := context.WithTimeout(ctx, tsoutil.DefaultTSOProxyTimeout) - defer cancel() - - tsoProxyBatchSize.Observe(float64(request.GetCount())) + done := make(chan struct{}) + dl := tsoutil.NewTSDeadline(tsoutil.DefaultTSOProxyTimeout, done, cancelForward) + select { + case tsDeadlineCh <- dl: + case <-forwardCtx.Done(): + return nil, forwardCtx.Err() + } - // used to receive the result from doSomething function - tsoRespCh := make(chan *tsopbTSOResponse, 1) start := time.Now() - go s.forwardTSORequestAsync(ctxTimeout, request, forwardStream, tsoRespCh) - select { - case <-ctxTimeout.Done(): - tsoProxyForwardTimeoutCounter.Inc() - return nil, ErrForwardTSOTimeout - case tsoResp := <-tsoRespCh: - if tsoResp.err == nil { - tsoProxyHandleDuration.Observe(time.Since(start).Seconds()) + resp, err := s.forwardTSORequest(forwardCtx, request, forwardStream) + close(done) + if err != nil { + if strings.Contains(err.Error(), errs.NotLeaderErr) { + s.tsoPrimaryWatcher.ForceLoad() } - return tsoResp.response, tsoResp.err + return nil, err } + tsoProxyBatchSize.Observe(float64(request.GetCount())) + tsoProxyHandleDuration.Observe(time.Since(start).Seconds()) + return resp, nil } -func (s *GrpcServer) forwardTSORequestAsync( - ctxTimeout context.Context, +func (s *GrpcServer) forwardTSORequest( + ctx context.Context, request *pdpb.TsoRequest, forwardStream tsopb.TSO_TsoClient, - tsoRespCh chan<- *tsopbTSOResponse, -) { +) (*tsopb.TsoResponse, error) { tsopbReq := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -545,46 +553,32 @@ func (s *GrpcServer) forwardTSORequestAsync( } failpoint.Inject("tsoProxySendToTSOTimeout", func() { - <-ctxTimeout.Done() - failpoint.Return() + // block until watchDeadline routine cancels the context. + <-ctx.Done() }) - if err := forwardStream.Send(tsopbReq); err != nil { - select { - case <-ctxTimeout.Done(): - return - case tsoRespCh <- &tsopbTSOResponse{err: err}: - } - return - } - select { - case <-ctxTimeout.Done(): - return + case <-ctx.Done(): + return nil, ctx.Err() default: } + if err := forwardStream.Send(tsopbReq); err != nil { + return nil, err + } + failpoint.Inject("tsoProxyRecvFromTSOTimeout", func() { - <-ctxTimeout.Done() - failpoint.Return() + // block until watchDeadline routine cancels the context. 
+ <-ctx.Done() }) - response, err := forwardStream.Recv() - if err != nil { - if strings.Contains(err.Error(), errs.NotLeaderErr) { - s.tsoPrimaryWatcher.ForceLoad() - } - } select { - case <-ctxTimeout.Done(): - return - case tsoRespCh <- &tsopbTSOResponse{response: response, err: err}: + case <-ctx.Done(): + return nil, ctx.Err() + default: } -} -type tsopbTSOResponse struct { - response *tsopb.TsoResponse - err error + return forwardStream.Recv() } // tsoServer wraps PD_TsoServer to ensure when any error @@ -2140,13 +2134,15 @@ func forwardRegionHeartbeatClientToServer(forwardStream pdpb.PD_RegionHeartbeatC } } -func (s *GrpcServer) createTSOForwardStream(client *grpc.ClientConn) (tsopb.TSO_TsoClient, context.CancelFunc, error) { +func (s *GrpcServer) createTSOForwardStream( + ctx context.Context, client *grpc.ClientConn, +) (tsopb.TSO_TsoClient, context.Context, context.CancelFunc, error) { done := make(chan struct{}) - ctx, cancel := context.WithCancel(s.ctx) - go checkStream(ctx, cancel, done) - forwardStream, err := tsopb.NewTSOClient(client).Tso(ctx) + forwardCtx, cancelForward := context.WithCancel(ctx) + go checkStream(forwardCtx, cancelForward, done) + forwardStream, err := tsopb.NewTSOClient(client).Tso(forwardCtx) done <- struct{}{} - return forwardStream, cancel, err + return forwardStream, forwardCtx, cancelForward, err } func (s *GrpcServer) createReportBucketsForwardStream(client *grpc.ClientConn) (pdpb.PD_ReportBucketsClient, context.CancelFunc, error) { From b4b1fccc5b4271f301d2eb865ff0d0c6b7b49be6 Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Sun, 25 Jun 2023 20:59:35 -0700 Subject: [PATCH 2/8] mcs, tso: support weighted-election for TSO keyspace group primary election (#6617) close tikv/pd#6616 Add the tso server registry watch loop in tso's keyspace group manager. re-distribute TSO keyspace group primaries according to their replica priorities Signed-off-by: Bin Shi --- pkg/mcs/discovery/discover.go | 2 +- pkg/mcs/discovery/key_path.go | 8 +- pkg/mcs/discovery/register.go | 2 +- pkg/mcs/tso/server/server.go | 3 +- pkg/mcs/utils/constant.go | 3 + pkg/tso/keyspace_group_manager.go | 250 ++++++++++++-- pkg/tso/keyspace_group_manager_test.go | 325 ++++++++++++++---- tests/integrations/mcs/cluster.go | 3 +- .../mcs/tso/keyspace_group_manager_test.go | 4 +- 9 files changed, 498 insertions(+), 102 deletions(-) diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index d3c06ad2cc8..6d939fde540 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -21,7 +21,7 @@ import ( // Discover is used to get all the service instances of the specified service name. func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) { - key := discoveryPath(clusterID, serviceName) + "/" + key := ServicePath(clusterID, serviceName) + "/" endKey := clientv3.GetPrefixRangeEnd(key) withRange := clientv3.WithRange(endKey) diff --git a/pkg/mcs/discovery/key_path.go b/pkg/mcs/discovery/key_path.go index 0e53b21c9fe..4eb339dd5db 100644 --- a/pkg/mcs/discovery/key_path.go +++ b/pkg/mcs/discovery/key_path.go @@ -24,15 +24,17 @@ const ( registryKey = "registry" ) -func registryPath(clusterID, serviceName, serviceAddr string) string { +// RegistryPath returns the full path to store microservice addresses. 
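// Editor's note: illustrative examples, not part of the patch. Assuming registryPrefix is
// "/ms" and registryKey is "registry" (per the constants in this file and the key layout
// documented elsewhere in this series), a hypothetical cluster ID "7001" and tso server
// address "http://10.0.1.5:3379" would map to:
//
//	ServicePath("7001", "tso")                          // "/ms/7001/tso/registry"
//	RegistryPath("7001", "tso", "http://10.0.1.5:3379") // "/ms/7001/tso/registry/http://10.0.1.5:3379"
//	TSOPath(7001)                                       // "/ms/7001/tso/registry/"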
+func RegistryPath(clusterID, serviceName, serviceAddr string) string { return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey, serviceAddr}, "/") } -func discoveryPath(clusterID, serviceName string) string { +// ServicePath returns the path to store microservice addresses. +func ServicePath(clusterID, serviceName string) string { return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/") } // TSOPath returns the path to store TSO addresses. func TSOPath(clusterID uint64) string { - return discoveryPath(strconv.FormatUint(clusterID, 10), "tso") + "/" + return ServicePath(strconv.FormatUint(clusterID, 10), "tso") + "/" } diff --git a/pkg/mcs/discovery/register.go b/pkg/mcs/discovery/register.go index 617c1520b8d..3e08d9b49cf 100644 --- a/pkg/mcs/discovery/register.go +++ b/pkg/mcs/discovery/register.go @@ -41,7 +41,7 @@ type ServiceRegister struct { // NewServiceRegister creates a new ServiceRegister. func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister { cctx, cancel := context.WithCancel(ctx) - serviceKey := registryPath(clusterID, serviceName, serviceAddr) + serviceKey := RegistryPath(clusterID, serviceName, serviceAddr) return &ServiceRegister{ ctx: cctx, cancel: cancel, diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 1b4b5c36374..a0904f4dc7b 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -539,7 +539,8 @@ func (s *Server) startServer() (err error) { tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID) s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( - s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, legacySvcRootPath, tsoSvcRootPath, s.cfg) + s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, + discovery.TSOPath(s.clusterID), legacySvcRootPath, tsoSvcRootPath, s.cfg) if err := s.keyspaceGroupManager.Initialize(); err != nil { return err } diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index 21a4a655afe..c87cec16a64 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -76,5 +76,8 @@ const ( DefaultKeyspaceGroupReplicaCount = 2 // DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica. + // It's used in keyspace group primary weighted-election to balance primaries' distribution. + // Among multiple replicas of a keyspace group, the higher the priority, the more likely + // the replica is to be elected as primary. DefaultKeyspaceGroupReplicaPriority = 0 ) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 6ed925f971a..a82376430fa 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "math" "net/http" "path" "regexp" @@ -27,6 +28,7 @@ import ( "time" perrors "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/election" @@ -55,6 +57,10 @@ const ( // mergingCheckInterval is the interval for merging check to see if the keyspace groups // merging process could be moved forward. 
mergingCheckInterval = 5 * time.Second + // defaultPrimaryPriorityCheckInterval is the default interval for checking if the priorities + // of the primaries on this TSO server/pod have changed. A goroutine will periodically check + // do this check and re-distribute the primaries if necessary. + defaultPrimaryPriorityCheckInterval = 10 * time.Second ) type state struct { @@ -153,6 +159,41 @@ func (s *state) getKeyspaceGroupMetaWithCheck( mcsutils.DefaultKeyspaceGroupID, nil } +func (s *state) getNextPrimaryToReset( + groupID int, localAddress string, +) (member ElectionMember, kg *endpoint.KeyspaceGroup, localPriority, nextGroupID int) { + s.RLock() + defer s.RUnlock() + + // Both s.ams and s.kgs are arrays with the fixed size defined by the const value MaxKeyspaceGroupCountInUse. + groupSize := int(mcsutils.MaxKeyspaceGroupCountInUse) + groupID %= groupSize + for j := 0; j < groupSize; groupID, j = (groupID+1)%groupSize, j+1 { + am := s.ams[groupID] + kg := s.kgs[groupID] + if am != nil && kg != nil && am.GetMember().IsLeader() { + maxPriority := math.MinInt32 + localPriority := math.MaxInt32 + for _, member := range kg.Members { + if member.Priority > maxPriority { + maxPriority = member.Priority + } + if member.Address == localAddress { + localPriority = member.Priority + } + } + + if localPriority < maxPriority { + // return here and reset the primary outside of the critical section + // as resetting the primary may take some time. + return am.GetMember(), kg, localPriority, (groupID + 1) % groupSize + } + } + } + + return nil, nil, 0, groupID +} + // kgPrimaryPathBuilder builds the path for keyspace group primary election. // default keyspace group: "/ms/{cluster_id}/tso/00000/primary". // non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary". @@ -198,6 +239,10 @@ type KeyspaceGroupManager struct { // which participate in the election of its keyspace group's primary, in the format of // "electionNamePrefix:keyspace-group-id" electionNamePrefix string + // tsoServiceKey is the path for storing the registered tso servers. + // Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress} + // Value: discover.ServiceRegistryEntry + tsoServiceKey string // legacySvcRootPath defines the legacy root path for all etcd paths which derives from // the PD/API service. It's in the format of "/pd/{cluster_id}". // The main paths for different usages include: @@ -238,17 +283,26 @@ type KeyspaceGroupManager struct { loadKeyspaceGroupsBatchSize int64 loadFromEtcdMaxRetryTimes int - // compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id in the - // keyspace group membership path. + // compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id + // in the keyspace group membership path. compiledKGMembershipIDRegexp *regexp.Regexp // groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry. groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup groupWatcher *etcdutil.LoopWatcher - primaryPathBuilder *kgPrimaryPathBuilder - // mergeCheckerCancelMap is the cancel function map for the merge checker of each keyspace group. mergeCheckerCancelMap sync.Map // GroupID -> context.CancelFunc + + primaryPathBuilder *kgPrimaryPathBuilder + primaryPriorityCheckInterval time.Duration + + // tsoNodes is the registered tso servers. 
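// Editor's note: an illustrative decision table for the weighted election, not part of the
// patch. The priority check loop only resigns the local primary when some other member of
// the same keyspace group has a strictly higher priority AND is currently registered in
// tsoNodes below. With two hypothetical members A and B of one group:
//
//	A: priority 200, registered   B: priority 100, current primary -> B resigns, A expected to win
//	A: priority 200, offline      B: priority 100, current primary -> B keeps the primary
//	A: priority 100, registered   B: priority 100, current primary -> no change (no higher priority)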
+ tsoNodes sync.Map // store as map[string]struct{} + // serviceRegistryMap stores the mapping from the service registry key to the service address. + // Note: it is only used in tsoNodesWatcher. + serviceRegistryMap map[string]string + // tsoNodesWatcher is the watcher for the registered tso servers. + tsoNodesWatcher *etcdutil.LoopWatcher } // NewKeyspaceGroupManager creates a new Keyspace Group Manager. @@ -258,6 +312,7 @@ func NewKeyspaceGroupManager( etcdClient *clientv3.Client, httpClient *http.Client, electionNamePrefix string, + tsoServiceKey string, legacySvcRootPath string, tsoSvcRootPath string, cfg ServiceConfig, @@ -270,16 +325,19 @@ func NewKeyspaceGroupManager( ctx, cancel := context.WithCancel(ctx) kgm := &KeyspaceGroupManager{ - ctx: ctx, - cancel: cancel, - tsoServiceID: tsoServiceID, - etcdClient: etcdClient, - httpClient: httpClient, - electionNamePrefix: electionNamePrefix, - legacySvcRootPath: legacySvcRootPath, - tsoSvcRootPath: tsoSvcRootPath, - cfg: cfg, - groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup), + ctx: ctx, + cancel: cancel, + tsoServiceID: tsoServiceID, + etcdClient: etcdClient, + httpClient: httpClient, + electionNamePrefix: electionNamePrefix, + tsoServiceKey: tsoServiceKey, + legacySvcRootPath: legacySvcRootPath, + tsoSvcRootPath: tsoSvcRootPath, + primaryPriorityCheckInterval: defaultPrimaryPriorityCheckInterval, + cfg: cfg, + groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup), + serviceRegistryMap: make(map[string]string), } kgm.legacySvcStorage = endpoint.NewStorageEndpoint( kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil) @@ -296,6 +354,100 @@ func NewKeyspaceGroupManager( // Initialize this KeyspaceGroupManager func (kgm *KeyspaceGroupManager) Initialize() error { + if err := kgm.InitializeTSOServerWatchLoop(); err != nil { + log.Error("failed to initialize tso server watch loop", zap.Error(err)) + kgm.Close() // Close the manager to clean up the allocated resources. + return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err) + } + if err := kgm.InitializeGroupWatchLoop(); err != nil { + log.Error("failed to initialize group watch loop", zap.Error(err)) + kgm.Close() // Close the manager to clean up the loaded keyspace groups. + return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err) + } + + kgm.wg.Add(1) + go kgm.primaryPriorityCheckLoop() + + return nil +} + +// Close this KeyspaceGroupManager +func (kgm *KeyspaceGroupManager) Close() { + log.Info("closing keyspace group manager") + + // Note: don't change the order. We need to cancel all service loops in the keyspace group manager + // before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups + // during critical periods such as service shutdown and online keyspace group, while the former requires + // snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is + // added/initialized after that. + kgm.cancel() + kgm.wg.Wait() + kgm.state.deinitialize() + + log.Info("keyspace group manager closed") +} + +// GetServiceConfig returns the service config. +func (kgm *KeyspaceGroupManager) GetServiceConfig() ServiceConfig { + return kgm.cfg +} + +// InitializeTSOServerWatchLoop initializes the watch loop monitoring the path for storing the +// registered tso servers. 
+// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress} +// Value: discover.ServiceRegistryEntry +func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { + tsoServiceEndKey := clientv3.GetPrefixRangeEnd(kgm.tsoServiceKey) + "/" + + putFn := func(kv *mvccpb.KeyValue) error { + s := &discovery.ServiceRegistryEntry{} + if err := json.Unmarshal(kv.Value, s); err != nil { + log.Warn("failed to unmarshal service registry entry", + zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) + return err + } + kgm.tsoNodes.Store(s.ServiceAddr, struct{}{}) + kgm.serviceRegistryMap[string(kv.Key)] = s.ServiceAddr + return nil + } + deleteFn := func(kv *mvccpb.KeyValue) error { + key := string(kv.Key) + if serviceAddr, ok := kgm.serviceRegistryMap[key]; ok { + delete(kgm.serviceRegistryMap, key) + kgm.tsoNodes.Delete(serviceAddr) + return nil + } + return perrors.Errorf("failed to find the service address for key %s", key) + } + + kgm.tsoNodesWatcher = etcdutil.NewLoopWatcher( + kgm.ctx, + &kgm.wg, + kgm.etcdClient, + "tso-nodes-watcher", + kgm.tsoServiceKey, + putFn, + deleteFn, + func() error { return nil }, + clientv3.WithRange(tsoServiceEndKey), + ) + + kgm.wg.Add(1) + go kgm.tsoNodesWatcher.StartWatchLoop() + + if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil { + log.Error("failed to load the registered tso servers", errs.ZapError(err)) + return err + } + + return nil +} + +// InitializeGroupWatchLoop initializes the watch loop monitoring the path for storing keyspace group +// membership/distribution metadata. +// Key: /pd/{cluster_id}/tso/keyspace_groups/membership/{group} +// Value: endpoint.KeyspaceGroup +func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { rootPath := kgm.legacySvcRootPath startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(mcsutils.DefaultKeyspaceGroupID)}, "/") endKey := strings.Join( @@ -375,20 +527,66 @@ func (kgm *KeyspaceGroupManager) Initialize() error { return nil } -// Close this KeyspaceGroupManager -func (kgm *KeyspaceGroupManager) Close() { - log.Info("closing keyspace group manager") +func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() { + defer logutil.LogPanic() + defer kgm.wg.Done() - // Note: don't change the order. We need to cancel all service loops in the keyspace group manager - // before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups - // during critical periods such as service shutdown and online keyspace group, while the former requires - // snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is - // added/initialized after that. 
- kgm.cancel() - kgm.wg.Wait() - kgm.state.deinitialize() + failpoint.Inject("fastPrimaryPriorityCheck", func() { + kgm.primaryPriorityCheckInterval = 200 * time.Millisecond + }) - log.Info("keyspace group manager closed") + ctx, cancel := context.WithCancel(kgm.ctx) + defer cancel() + groupID := 0 + for { + select { + case <-ctx.Done(): + log.Info("exit primary priority check loop") + return + case <-time.After(kgm.primaryPriorityCheckInterval): + // Every primaryPriorityCheckInterval, we only reset the primary of one keyspace group + member, kg, localPriority, nextGroupID := kgm.getNextPrimaryToReset(groupID, kgm.tsoServiceID.ServiceAddr) + if member != nil { + aliveTSONodes := make(map[string]struct{}) + kgm.tsoNodes.Range(func(key, _ interface{}) bool { + aliveTSONodes[key.(string)] = struct{}{} + return true + }) + if len(aliveTSONodes) == 0 { + log.Warn("no alive tso node", zap.String("local-address", kgm.tsoServiceID.ServiceAddr)) + continue + } + // If there is a alive member with higher priority, reset the leader. + resetLeader := false + for _, member := range kg.Members { + if member.Priority <= localPriority { + continue + } + if _, ok := aliveTSONodes[member.Address]; ok { + resetLeader = true + break + } + } + if resetLeader { + select { + case <-ctx.Done(): + default: + member.ResetLeader() + log.Info("reset primary", + zap.String("local-address", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("keyspace-group-id", kg.ID), + zap.Int("local-priority", localPriority)) + } + } else { + log.Warn("no need to reset primary as the replicas with higher priority are offline", + zap.String("local-address", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("keyspace-group-id", kg.ID), + zap.Int("local-priority", localPriority)) + } + } + groupID = nextGroupID + } + } } func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool { diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 1e7d072ade3..2e03418bae7 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -22,7 +22,6 @@ import ( "path" "reflect" "sort" - "strconv" "strings" "sync" "testing" @@ -36,8 +35,9 @@ import ( "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/utils/memberutil" + "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/utils/tsoutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/goleak" @@ -51,6 +51,7 @@ type keyspaceGroupManagerTestSuite struct { suite.Suite ctx context.Context cancel context.CancelFunc + ClusterID uint64 backendEndpoints string etcdClient *clientv3.Client clean func() @@ -64,13 +65,23 @@ func TestKeyspaceGroupManagerTestSuite(t *testing.T) { func (suite *keyspaceGroupManagerTestSuite) SetupSuite() { t := suite.T() suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.ClusterID = rand.Uint64() suite.backendEndpoints, suite.etcdClient, suite.clean = startEmbeddedEtcd(t) + suite.cfg = suite.createConfig() +} + +func (suite *keyspaceGroupManagerTestSuite) TearDownSuite() { + suite.clean() + suite.cancel() +} - suite.cfg = &TestServiceConfig{ - Name: "tso-test-name", +func (suite *keyspaceGroupManagerTestSuite) createConfig() *TestServiceConfig { + addr := tempurl.Alloc() + return &TestServiceConfig{ + Name: "tso-test-name-default", BackendEndpoints: suite.backendEndpoints, - ListenAddr: 
"http://127.0.0.1:3379", - AdvertiseListenAddr: "http://127.0.0.1:3379", + ListenAddr: addr, + AdvertiseListenAddr: addr, LeaderLease: mcsutils.DefaultLeaderLease, LocalTSOEnabled: false, TSOUpdatePhysicalInterval: 50 * time.Millisecond, @@ -80,11 +91,6 @@ func (suite *keyspaceGroupManagerTestSuite) SetupSuite() { } } -func (suite *keyspaceGroupManagerTestSuite) TearDownSuite() { - suite.clean() - suite.cancel() -} - // TestNewKeyspaceGroupManager tests the initialization of KeyspaceGroupManager. // It should initialize the allocator manager with the desired configurations and parameters. func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { @@ -92,11 +98,15 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr} guid := uuid.New().String() + tsoServiceKey := discovery.ServicePath(guid, "tso") + "/" legacySvcRootPath := path.Join("/pd", guid) tsoSvcRootPath := path.Join("/ms", guid, "tso") electionNamePrefix := "tso-server-" + guid - kgm := suite.newKeyspaceGroupManager(tsoServiceID, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath) + kgm := NewKeyspaceGroupManager( + suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, + tsoServiceKey, legacySvcRootPath, tsoSvcRootPath, suite.cfg) + defer kgm.Close() err := kgm.Initialize() re.NoError(err) @@ -116,8 +126,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { re.Equal(legacySvcRootPath, am.rootPath) re.Equal(time.Duration(mcsutils.DefaultLeaderLease)*time.Second, am.saveInterval) re.Equal(time.Duration(50)*time.Millisecond, am.updatePhysicalInterval) - - kgm.Close() } // TestLoadKeyspaceGroupsAssignment tests the loading of the keyspace group assignment. @@ -174,8 +182,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() { defer mgr.Close() addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0}) // Set the timeout to 1 second and inject the delayLoad to return 3 seconds to let // the loading sleep 3 seconds. @@ -197,8 +205,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsSucceedWithTem defer mgr.Close() addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0}) // Set the max retry times to 3 and inject the loadTemporaryFail to return 2 to let // loading from etcd fail 2 times but the whole initialization still succeeds. @@ -219,8 +227,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsFailed() { defer mgr.Close() addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0}) // Set the max retry times to 3 and inject the loadTemporaryFail to return 3 to let // loading from etcd fail 3 times which should cause the whole initialization to fail. 
@@ -388,9 +396,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestGetKeyspaceGroupMetaWithCheck() // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(0), []uint32{0, 1, 2}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0, 1, 2}) err = mgr.Initialize() re.NoError(err) @@ -461,14 +468,12 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 1, 2}) + suite.ctx, suite.etcdClient, mcsutils.DefaultKeyspaceGroupID, rootPath, + []string{svcAddr}, []int{0}, []uint32{mcsutils.DefaultKeyspaceID, 1, 2}) // Create keyspace group 3 which contains keyspace 3, 4. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(3), []uint32{3, 4}) + suite.ctx, suite.etcdClient, uint32(3), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{3, 4}) err = mgr.Initialize() re.NoError(err) @@ -536,14 +541,12 @@ func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceMovementConsistency() { // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 10, 20}) + suite.ctx, suite.etcdClient, mcsutils.DefaultKeyspaceGroupID, + rootPath, []string{svcAddr}, []int{0}, []uint32{mcsutils.DefaultKeyspaceID, 10, 20}) // Create keyspace group 1 which contains keyspace 3, 4. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(1), []uint32{11, 21}) + suite.ctx, suite.etcdClient, uint32(1), rootPath, + []string{svcAddr}, []int{0}, []uint32{11, 21}) err = mgr.Initialize() re.NoError(err) @@ -591,9 +594,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembers // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(0), []uint32{0, 1, 2}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0, 1, 2}) err := mgr.Initialize() re.NoError(err) @@ -681,15 +683,6 @@ func (suite *keyspaceGroupManagerTestSuite) applyEtcdEvents( } } -func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( - tsoServiceID *discovery.ServiceRegistryEntry, - electionNamePrefix, legacySvcRootPath, tsoSvcRootPath string, -) *KeyspaceGroupManager { - return NewKeyspaceGroupManager( - suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, - legacySvcRootPath, tsoSvcRootPath, suite.cfg) -} - // runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment. 
func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( re *require.Assertions, @@ -727,10 +720,16 @@ func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( expectedGroupIDs = append(expectedGroupIDs, uint32(j)) mux.Unlock() } + + svcAddrs := make([]string, 0) + if assignToMe { + svcAddrs = append(svcAddrs, mgr.tsoServiceID.ServiceAddr) + } else { + svcAddrs = append(svcAddrs, uuid.NewString()) + } addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, - assignToMe, mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(j), []uint32{uint32(j)}) + suite.ctx, suite.etcdClient, uint32(j), mgr.legacySvcRootPath, + svcAddrs, []int{0}, []uint32{uint32(j)}) } }(i) } @@ -756,19 +755,27 @@ func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( func (suite *keyspaceGroupManagerTestSuite) newUniqueKeyspaceGroupManager( loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value ) *KeyspaceGroupManager { - tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr} - uniqueID := memberutil.GenerateUniqueID(uuid.New().String()) - uniqueStr := strconv.FormatUint(uniqueID, 10) + return suite.newKeyspaceGroupManager(loadKeyspaceGroupsBatchSize, uuid.New().String(), suite.cfg) +} + +func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( + loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value + uniqueStr string, + cfg *TestServiceConfig, +) *KeyspaceGroupManager { + tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.GetAdvertiseListenAddr()} + tsoServiceKey := discovery.ServicePath(uniqueStr, "tso") + "/" legacySvcRootPath := path.Join("/pd", uniqueStr) tsoSvcRootPath := path.Join("/ms", uniqueStr, "tso") - electionNamePrefix := "kgm-test-" + uniqueStr - - keyspaceGroupManager := suite.newKeyspaceGroupManager(tsoServiceID, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath) + electionNamePrefix := "kgm-test-" + cfg.GetAdvertiseListenAddr() + kgm := NewKeyspaceGroupManager( + suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, + tsoServiceKey, legacySvcRootPath, tsoSvcRootPath, cfg) if loadKeyspaceGroupsBatchSize != 0 { - keyspaceGroupManager.loadKeyspaceGroupsBatchSize = loadKeyspaceGroupsBatchSize + kgm.loadKeyspaceGroupsBatchSize = loadKeyspaceGroupsBatchSize } - return keyspaceGroupManager + return kgm } // putKeyspaceGroupToEtcd puts a keyspace group to etcd. @@ -805,19 +812,21 @@ func deleteKeyspaceGroupInEtcd( // addKeyspaceGroupAssignment adds a keyspace group assignment to etcd. 
func addKeyspaceGroupAssignment( - ctx context.Context, etcdClient *clientv3.Client, - assignToMe bool, rootPath, svcAddr string, - groupID uint32, keyspaces []uint32, + ctx context.Context, + etcdClient *clientv3.Client, + groupID uint32, + rootPath string, + svcAddrs []string, + priorites []int, + keyspaces []uint32, ) error { - var location string - if assignToMe { - location = svcAddr - } else { - location = uuid.NewString() + members := make([]endpoint.KeyspaceGroupMember, len(svcAddrs)) + for i, svcAddr := range svcAddrs { + members[i] = endpoint.KeyspaceGroupMember{Address: svcAddr, Priority: priorites[i]} } group := &endpoint.KeyspaceGroup{ ID: groupID, - Members: []endpoint.KeyspaceGroupMember{{Address: location}}, + Members: members, Keyspaces: keyspaces, } @@ -990,3 +999,185 @@ func (suite *keyspaceGroupManagerTestSuite) TestGroupSplitUpdateRetry() { return reflect.DeepEqual(expectedGroupIDs, assignedGroupIDs) }) } + +// TestPrimaryPriorityChange tests the case that the primary priority of a keyspace group changes +// and the locations of the primaries should be updated accordingly. +func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastPrimaryPriorityCheck", `return(true)`)) + defer func() { + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastPrimaryPriorityCheck")) + }() + + var err error + defaultPriority := mcsutils.DefaultKeyspaceGroupReplicaPriority + uniqueStr := uuid.New().String() + rootPath := path.Join("/pd", uniqueStr) + cfg1 := suite.createConfig() + cfg2 := suite.createConfig() + svcAddr1 := cfg1.GetAdvertiseListenAddr() + svcAddr2 := cfg2.GetAdvertiseListenAddr() + + // Register TSO server 1 + err = suite.registerTSOServer(re, uniqueStr, svcAddr1, cfg1) + re.NoError(err) + defer func() { + re.NoError(suite.deregisterTSOServer(uniqueStr, svcAddr1)) + }() + + // Create three keyspace groups on two TSO servers with default replica priority. + ids := []uint32{0, mcsutils.MaxKeyspaceGroupCountInUse / 2, mcsutils.MaxKeyspaceGroupCountInUse - 1} + for _, id := range ids { + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, id, rootPath, + []string{svcAddr1, svcAddr2}, []int{defaultPriority, defaultPriority}, []uint32{id}) + } + + // Create the first TSO server which loads all three keyspace groups created above. + // All primaries should be on the first TSO server. + mgr1 := suite.newKeyspaceGroupManager(1, uniqueStr, cfg1) + re.NotNil(mgr1) + defer mgr1.Close() + err = mgr1.Initialize() + re.NoError(err) + // Wait until all keyspace groups are ready for serving tso requests. + waitForPrimariesServing(re, []*KeyspaceGroupManager{mgr1, mgr1, mgr1}, ids) + + // We increase the priority of the TSO server 2 which hasn't started yet. The primaries + // on the TSO server 1 shouldn't move. + for _, id := range ids { + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, id, rootPath, + []string{svcAddr1, svcAddr2}, []int{defaultPriority, defaultPriority + 1}, []uint32{id}) + } + + // And the primaries on TSO Server 1 should continue to serve TSO requests without any failures. 
+ for i := 0; i < 100; i++ { + for _, id := range ids { + _, keyspaceGroupBelongTo, err := mgr1.HandleTSORequest(id, id, GlobalDCLocation, 1) + re.NoError(err) + re.Equal(id, keyspaceGroupBelongTo) + } + } + + // Continually sending TSO requests to the TSO server 1 to make sure the primaries will move back + // to it at the end of test + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + checkTSO(ctx, re, &wg, mgr1, ids) + + // Create the Second TSO server. + err = suite.registerTSOServer(re, uniqueStr, svcAddr2, cfg2) + re.NoError(err) + mgr2 := suite.newKeyspaceGroupManager(1, uniqueStr, cfg2) + re.NotNil(mgr2) + err = mgr2.Initialize() + re.NoError(err) + // All primaries should eventually move to the second TSO server because of the higher priority. + waitForPrimariesServing(re, []*KeyspaceGroupManager{mgr2, mgr2, mgr2}, ids) + + // Shutdown the second TSO server. + mgr2.Close() + re.NoError(suite.deregisterTSOServer(uniqueStr, svcAddr2)) + // The primaries should move back to the first TSO server. + waitForPrimariesServing(re, []*KeyspaceGroupManager{mgr1, mgr1, mgr1}, ids) + + // Restart the Second TSO server. + err = suite.registerTSOServer(re, uniqueStr, svcAddr2, cfg2) + re.NoError(err) + defer func() { + re.NoError(suite.deregisterTSOServer(uniqueStr, svcAddr2)) + }() + mgr2 = suite.newKeyspaceGroupManager(1, uniqueStr, cfg2) + re.NotNil(mgr2) + defer mgr2.Close() + err = mgr2.Initialize() + re.NoError(err) + // All primaries should eventually move to the second TSO server because of the higher priority. + waitForPrimariesServing(re, []*KeyspaceGroupManager{mgr2, mgr2, mgr2}, ids) + + mgrs := []*KeyspaceGroupManager{mgr2, mgr2, mgr2} + for i, id := range ids { + // Set the keyspace group replica on the first TSO server to have higher priority. + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, id, rootPath, + []string{svcAddr1, svcAddr2}, []int{defaultPriority - 1, defaultPriority - 2}, []uint32{id}) + // The primary of this keyspace group should move back to the first TSO server. + mgrs[i] = mgr1 + waitForPrimariesServing(re, mgrs, ids) + } + + cancel() + wg.Wait() +} + +// Register TSO server. +func (suite *keyspaceGroupManagerTestSuite) registerTSOServer( + re *require.Assertions, clusterID, svcAddr string, cfg *TestServiceConfig, +) error { + // Register TSO server 1 + serviceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.GetAdvertiseListenAddr()} + serializedEntry, err := serviceID.Serialize() + re.NoError(err) + serviceKey := discovery.RegistryPath(clusterID, mcsutils.TSOServiceName, svcAddr) + _, err = suite.etcdClient.Put(suite.ctx, serviceKey, serializedEntry) + return err +} + +// Deregister TSO server. 
+func (suite *keyspaceGroupManagerTestSuite) deregisterTSOServer(clusterID, svcAddr string) error { + serviceKey := discovery.RegistryPath(clusterID, mcsutils.TSOServiceName, svcAddr) + if _, err := suite.etcdClient.Delete(suite.ctx, serviceKey); err != nil { + return err + } + return nil +} + +func checkTSO( + ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, + mgr *KeyspaceGroupManager, ids []uint32, +) { + wg.Add(len(ids)) + for _, id := range ids { + go func(id uint32) { + defer wg.Done() + var ts, lastTS uint64 + for { + select { + case <-ctx.Done(): + // Make sure the lastTS is not empty + re.NotEmpty(lastTS) + return + default: + } + respTS, respGroupID, err := mgr.HandleTSORequest(id, id, GlobalDCLocation, 1) + // omit the error check since there are many kinds of errors during primaries movement + if err != nil { + continue + } + re.Equal(id, respGroupID) + ts = tsoutil.ComposeTS(respTS.Physical, respTS.Logical) + re.Less(lastTS, ts) + lastTS = ts + } + }(id) + } +} + +func waitForPrimariesServing( + re *require.Assertions, mgrs []*KeyspaceGroupManager, ids []uint32, +) { + testutil.Eventually(re, func() bool { + for i := 0; i < 100; i++ { + for j, id := range ids { + if member, err := mgrs[j].GetElectionMember(id, id); err != nil || !member.IsLeader() { + return false + } + if _, _, err := mgrs[j].HandleTSORequest(id, id, GlobalDCLocation, 1); err != nil { + return false + } + } + } + return true + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) +} diff --git a/tests/integrations/mcs/cluster.go b/tests/integrations/mcs/cluster.go index 3cda8c1888a..65a6bf293c3 100644 --- a/tests/integrations/mcs/cluster.go +++ b/tests/integrations/mcs/cluster.go @@ -202,7 +202,8 @@ func (tc *TestTSOCluster) GetServers() map[string]*tso.Server { func (tc *TestTSOCluster) GetKeyspaceGroupMember() (members []endpoint.KeyspaceGroupMember) { for _, server := range tc.servers { members = append(members, endpoint.KeyspaceGroupMember{ - Address: server.GetAddr(), + Address: server.GetAddr(), + Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority, }) } return diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index d265d8fc73b..98c6b90ca28 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -497,7 +497,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) // Init api server config but not start. - tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) { + tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = []string{ "keyspace_a", "keyspace_b", } @@ -536,7 +536,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { // Wait pd clients are ready. 
testutil.Eventually(re, func() bool { count := 0 - clients.Range(func(key, value interface{}) bool { + clients.Range(func(_, _ interface{}) bool { count++ return true }) From 4d26c8b0aebdb437cd4d7788211e6fe49cbb199b Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 26 Jun 2023 12:53:34 +0800 Subject: [PATCH 3/8] tools: add merge commands for pd-ctl (#6675) ref tikv/pd#6589 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- tests/pdctl/keyspace/keyspace_group_test.go | 73 +++++++++++++++ .../pdctl/command/keyspace_group_command.go | 92 +++++++++++++++++++ 2 files changed, 165 insertions(+) diff --git a/tests/pdctl/keyspace/keyspace_group_test.go b/tests/pdctl/keyspace/keyspace_group_test.go index 0c44700f48e..80f9eaa8420 100644 --- a/tests/pdctl/keyspace/keyspace_group_test.go +++ b/tests/pdctl/keyspace/keyspace_group_test.go @@ -277,3 +277,76 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { re.NoError(err) re.Contains(string(output), "Failed to parse the priority") } + +func TestMergeKeyspaceGroup(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`)) + keyspaces := make([]string, 0) + // we test the case which exceed the default max txn ops limit in etcd, which is 128. + for i := 0; i < 129; i++ { + keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) + } + tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, serverName string) { + conf.Keyspace.PreAlloc = keyspaces + }) + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + pdAddr := tc.GetConfig().GetClientURL() + + _, tsoServerCleanup1, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) + defer tsoServerCleanup1() + re.NoError(err) + _, tsoServerCleanup2, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) + defer tsoServerCleanup2() + re.NoError(err) + cmd := pdctlCmd.GetRootCmd() + + tc.WaitLeader() + leaderServer := tc.GetServer(tc.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + + // split keyspace group. + testutil.Eventually(re, func() bool { + args := []string{"-u", pdAddr, "keyspace-group", "split", "0", "1", "2"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + return strings.Contains(string(output), "Success") + }) + + args := []string{"-u", pdAddr, "keyspace-group", "finish-split", "0"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + strings.Contains(string(output), "Success") + args = []string{"-u", pdAddr, "keyspace-group", "finish-split", "1"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + strings.Contains(string(output), "Success") + + // merge keyspace group. + testutil.Eventually(re, func() bool { + args := []string{"-u", pdAddr, "keyspace-group", "merge", "0", "1"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + return strings.Contains(string(output), "Success") + }) + + args = []string{"-u", pdAddr, "keyspace-group", "finish-merge", "0"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + strings.Contains(string(output), "Success") + args = []string{"-u", pdAddr, "keyspace-group", "0"} + output, err = pdctl.ExecuteCommand(cmd, args...) 
+ re.NoError(err) + var keyspaceGroup endpoint.KeyspaceGroup + err = json.Unmarshal(output, &keyspaceGroup) + re.NoError(err) + re.Len(keyspaceGroup.Keyspaces, 130) + re.Nil(keyspaceGroup.MergeState) + + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) +} diff --git a/tools/pd-ctl/pdctl/command/keyspace_group_command.go b/tools/pd-ctl/pdctl/command/keyspace_group_command.go index 662a4aa157e..3e46df39e63 100644 --- a/tools/pd-ctl/pdctl/command/keyspace_group_command.go +++ b/tools/pd-ctl/pdctl/command/keyspace_group_command.go @@ -33,6 +33,9 @@ func NewKeyspaceGroupCommand() *cobra.Command { Run: showKeyspaceGroupCommandFunc, } cmd.AddCommand(newSplitKeyspaceGroupCommand()) + cmd.AddCommand(newFinishSplitKeyspaceGroupCommand()) + cmd.AddCommand(newMergeKeyspaceGroupCommand()) + cmd.AddCommand(newFinishMergeKeyspaceGroupCommand()) cmd.AddCommand(newSetNodesKeyspaceGroupCommand()) cmd.AddCommand(newSetPriorityKeyspaceGroupCommand()) return cmd @@ -47,6 +50,35 @@ func newSplitKeyspaceGroupCommand() *cobra.Command { return r } +func newFinishSplitKeyspaceGroupCommand() *cobra.Command { + r := &cobra.Command{ + Use: "finish-split ", + Short: "finish split the keyspace group with the given ID", + Run: finishSplitKeyspaceGroupCommandFunc, + Hidden: true, + } + return r +} + +func newMergeKeyspaceGroupCommand() *cobra.Command { + r := &cobra.Command{ + Use: "merge []", + Short: "merge the keyspace group with the given IDs into the target one", + Run: mergeKeyspaceGroupCommandFunc, + } + return r +} + +func newFinishMergeKeyspaceGroupCommand() *cobra.Command { + r := &cobra.Command{ + Use: "finish-merge ", + Short: "finish merge the keyspace group with the given ID", + Run: finishMergeKeyspaceGroupCommandFunc, + Hidden: true, + } + return r +} + func newSetNodesKeyspaceGroupCommand() *cobra.Command { r := &cobra.Command{ Use: "set-node [...]", @@ -108,6 +140,66 @@ func splitKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { }) } +func finishSplitKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { + if len(args) < 1 { + cmd.Usage() + return + } + _, err := strconv.ParseUint(args[0], 10, 32) + if err != nil { + cmd.Printf("Failed to parse the keyspace group ID: %s\n", err) + return + } + _, err = doRequest(cmd, fmt.Sprintf("%s/%s/split", keyspaceGroupsPrefix, args[0]), http.MethodDelete, http.Header{}) + if err != nil { + cmd.Println(err) + return + } + cmd.Println("Success!") +} + +func mergeKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { + if len(args) < 2 { + cmd.Usage() + return + } + _, err := strconv.ParseUint(args[0], 10, 32) + if err != nil { + cmd.Printf("Failed to parse the target keyspace group ID: %s\n", err) + return + } + groups := make([]uint32, 0, len(args)-1) + for _, arg := range args[1:] { + id, err := strconv.ParseUint(arg, 10, 32) + if err != nil { + cmd.Printf("Failed to parse the keyspace ID: %s\n", err) + return + } + groups = append(groups, uint32(id)) + } + postJSON(cmd, fmt.Sprintf("%s/%s/merge", keyspaceGroupsPrefix, args[0]), map[string]interface{}{ + "merge-list": groups, + }) +} + +func finishMergeKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { + if len(args) < 1 { + cmd.Usage() + return + } + _, err := strconv.ParseUint(args[0], 10, 32) + if err != nil { + cmd.Printf("Failed to parse the keyspace group ID: %s\n", err) + return + } + _, err = doRequest(cmd, fmt.Sprintf("%s/%s/merge", 
keyspaceGroupsPrefix, args[0]), http.MethodDelete, http.Header{}) + if err != nil { + cmd.Println(err) + return + } + cmd.Println("Success!") +} + func setNodesKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { if len(args) < 2 { cmd.Usage() From cd5b1cebe27b2af25ae0fac991a43c3a249dc01b Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 26 Jun 2023 20:26:34 +0800 Subject: [PATCH 4/8] pd-ctl: fix hot region show (#6650) close tikv/pd#6649 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/schedule/coordinator.go | 27 ++++++++++++++++++++++----- pkg/statistics/hot_peer_cache.go | 4 ++-- pkg/statistics/hot_regions_stat.go | 17 ++--------------- 3 files changed, 26 insertions(+), 22 deletions(-) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 07e519dac7a..419b6a7adae 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -519,11 +519,28 @@ func (c *Coordinator) GetHotRegionsByType(typ statistics.RWType) *statistics.Sto default: } // update params `IsLearner` and `LastUpdateTime` - for _, stores := range []statistics.StoreHotPeersStat{infos.AsLeader, infos.AsPeer} { - for _, store := range stores { - for _, hotPeer := range store.Stats { - region := c.cluster.GetRegion(hotPeer.RegionID) - hotPeer.UpdateHotPeerStatShow(region) + s := []statistics.StoreHotPeersStat{infos.AsLeader, infos.AsPeer} + for i, stores := range s { + for j, store := range stores { + for k := range store.Stats { + h := &s[i][j].Stats[k] + region := c.cluster.GetRegion(h.RegionID) + if region != nil { + h.IsLearner = core.IsLearner(region.GetPeer(h.StoreID)) + } + switch typ { + case statistics.Write: + if region != nil { + h.LastUpdateTime = time.Unix(int64(region.GetInterval().GetEndTimestamp()), 0) + } + case statistics.Read: + store := c.cluster.GetStore(h.StoreID) + if store != nil { + ts := store.GetMeta().GetLastHeartbeat() + h.LastUpdateTime = time.Unix(ts/1e9, ts%1e9) + } + default: + } } } } diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index 478a9f506d1..16c64b752e0 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -234,8 +234,8 @@ func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInf actionType: Update, stores: make([]uint64, len(peers)), } - for _, peer := range peers { - newItem.stores = append(newItem.stores, peer.GetStoreId()) + for i, peer := range peers { + newItem.stores[i] = peer.GetStoreId() } if oldItem == nil { diff --git a/pkg/statistics/hot_regions_stat.go b/pkg/statistics/hot_regions_stat.go index d606a0d8bb4..d30a153492b 100644 --- a/pkg/statistics/hot_regions_stat.go +++ b/pkg/statistics/hot_regions_stat.go @@ -14,11 +14,7 @@ package statistics -import ( - "time" - - "github.com/tikv/pd/pkg/core" -) +import "time" // HotPeersStat records all hot regions statistics type HotPeersStat struct { @@ -44,14 +40,5 @@ type HotPeerStatShow struct { KeyRate float64 `json:"flow_keys"` QueryRate float64 `json:"flow_query"` AntiCount int `json:"anti_count"` - LastUpdateTime time.Time `json:"last_update_time"` -} - -// UpdateHotPeerStatShow updates the region information, such as `IsLearner` and `LastUpdateTime`. 
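// Editor's note (explanatory, not part of the patch): this method is removed because the
// old caller in pkg/schedule/coordinator.go ranged over store.Stats by value, so the
// IsLearner/LastUpdateTime it set appear to have been written to a copy and dropped. The
// replacement loop above writes through &s[i][j].Stats[k], i.e. into the original slice
// elements, which is what lets the pd-ctl hot region show output carry these fields again.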
-func (h *HotPeerStatShow) UpdateHotPeerStatShow(region *core.RegionInfo) { - if region == nil { - return - } - h.IsLearner = core.IsLearner(region.GetPeer(h.StoreID)) - h.LastUpdateTime = time.Unix(int64(region.GetInterval().GetEndTimestamp()), 0) + LastUpdateTime time.Time `json:"last_update_time,omitempty"` } From c07dd199f091dcbd1fec17a959bebfccd073c815 Mon Sep 17 00:00:00 2001 From: Hu# Date: Tue, 27 Jun 2023 11:01:05 +0800 Subject: [PATCH 5/8] dashboard: update version to show all instance cpu limit (#6678) ref tikv/pd#6415 Signed-off-by: husharp Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- tests/integrations/client/go.mod | 2 +- tests/integrations/client/go.sum | 4 ++-- tests/integrations/mcs/go.mod | 2 +- tests/integrations/mcs/go.sum | 4 ++-- tests/integrations/tso/go.mod | 2 +- tests/integrations/tso/go.sum | 4 ++-- 8 files changed, 12 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 5cc8b55950c..0dccd506aad 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/pingcap/kvproto v0.0.0-20230530111525-e4919c190b46 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 - github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 + github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45 github.com/prometheus/client_golang v1.11.1 github.com/prometheus/common v0.26.0 github.com/sasha-s/go-deadlock v0.2.0 diff --git a/go.sum b/go.sum index ac034473b3b..2962b6ca897 100644 --- a/go.sum +++ b/go.sum @@ -431,8 +431,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 h1:adV4kUWI7v+v/6joR7lfjFngHhS4eiqwr4g3dHCjHtA= -github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= +github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45 h1:/jqjj+ydlqjp144LAlHDfHtr7eWJyaNIIXX5viv0RZo= +github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index d6e94929f6f..00c6bd4daaf 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -119,7 +119,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect 
github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 848c249f032..0e51f7e6dec 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -395,8 +395,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 h1:adV4kUWI7v+v/6joR7lfjFngHhS4eiqwr4g3dHCjHtA= -github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= +github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45 h1:/jqjj+ydlqjp144LAlHDfHtr7eWJyaNIIXX5viv0RZo= +github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 2b225f0ced2..70f9bcc3a9a 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -119,7 +119,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index 178a6dfb953..0fd5cbf2d6f 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -394,8 +394,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 h1:adV4kUWI7v+v/6joR7lfjFngHhS4eiqwr4g3dHCjHtA= -github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= +github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45 h1:/jqjj+ydlqjp144LAlHDfHtr7eWJyaNIIXX5viv0RZo= +github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb 
v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index 06b3c93887e..9b82706d7a4 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -117,7 +117,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index 5af6e71ae4d..cdffbdfe8df 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -392,8 +392,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 h1:adV4kUWI7v+v/6joR7lfjFngHhS4eiqwr4g3dHCjHtA= -github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= +github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45 h1:/jqjj+ydlqjp144LAlHDfHtr7eWJyaNIIXX5viv0RZo= +github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= From 317c1bcdd0d369598b2ec340a105a50e5ed3db57 Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Mon, 26 Jun 2023 20:34:05 -0700 Subject: [PATCH 6/8] Add split-range cmd and fix duplicate keyspaces (#6689) close tikv/pd#6687, close tikv/pd#6688 Add split-range cmd and fix duplicate keyspaces 1. Add split-range cmd to support StartKeyspaceID and EndKeyspaceID parameters. 2. Fix "split 0 2 2 2" generate duplicate keyspaces in the keyspace list of the group" Signed-off-by: Bin Shi --- pkg/keyspace/tso_keyspace_group.go | 10 ++++- tools/pd-ctl/pdctl/command/global.go | 2 +- .../pdctl/command/keyspace_group_command.go | 42 +++++++++++++++++++ 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index fe91443bb95..c4916f5f526 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -615,7 +615,15 @@ func buildSplitKeyspaces( oldSplit = append(oldSplit, keyspace) } } - return oldSplit, new, nil + // Dedup new keyspaces if it's necessary. 
+ if newNum == len(newKeyspaceMap) { + return oldSplit, new, nil + } + newSplit := make([]uint32, 0, len(newKeyspaceMap)) + for keyspace := range newKeyspaceMap { + newSplit = append(newSplit, keyspace) + } + return oldSplit, newSplit, nil } // Split according to the start and end keyspace ID. if startKeyspaceID == 0 && endKeyspaceID == 0 { diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index 623ab3edfba..8d888b60b1f 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -192,7 +192,7 @@ func postJSON(cmd *cobra.Command, prefix string, input map[string]interface{}) { return nil }) if err != nil { - cmd.Printf("Failed! %s", err) + cmd.Printf("Failed! %s\n", err) return } cmd.Println("Success!") diff --git a/tools/pd-ctl/pdctl/command/keyspace_group_command.go b/tools/pd-ctl/pdctl/command/keyspace_group_command.go index 3e46df39e63..b5ccaf01e2b 100644 --- a/tools/pd-ctl/pdctl/command/keyspace_group_command.go +++ b/tools/pd-ctl/pdctl/command/keyspace_group_command.go @@ -33,6 +33,7 @@ func NewKeyspaceGroupCommand() *cobra.Command { Run: showKeyspaceGroupCommandFunc, } cmd.AddCommand(newSplitKeyspaceGroupCommand()) + cmd.AddCommand(newSplitRangeKeyspaceGroupCommand()) cmd.AddCommand(newFinishSplitKeyspaceGroupCommand()) cmd.AddCommand(newMergeKeyspaceGroupCommand()) cmd.AddCommand(newFinishMergeKeyspaceGroupCommand()) @@ -50,6 +51,15 @@ func newSplitKeyspaceGroupCommand() *cobra.Command { return r } +func newSplitRangeKeyspaceGroupCommand() *cobra.Command { + r := &cobra.Command{ + Use: "split-range ", + Short: "split the keyspace group with the given ID and transfer the keyspaces in the given range (both ends inclusive) into the newly split one", + Run: splitRangeKeyspaceGroupCommandFunc, + } + return r +} + func newFinishSplitKeyspaceGroupCommand() *cobra.Command { r := &cobra.Command{ Use: "finish-split ", @@ -140,6 +150,38 @@ func splitKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { }) } +func splitRangeKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { + if len(args) < 4 { + cmd.Usage() + return + } + _, err := strconv.ParseUint(args[0], 10, 32) + if err != nil { + cmd.Printf("Failed to parse the old keyspace group ID: %s\n", err) + return + } + newID, err := strconv.ParseUint(args[1], 10, 32) + if err != nil { + cmd.Printf("Failed to parse the new keyspace group ID: %s\n", err) + return + } + startKeyspaceID, err := strconv.ParseUint(args[2], 10, 32) + if err != nil { + cmd.Printf("Failed to parse the start keyspace ID: %s\n", err) + return + } + endKeyspaceID, err := strconv.ParseUint(args[3], 10, 32) + if err != nil { + cmd.Printf("Failed to parse the end keyspace ID: %s\n", err) + return + } + postJSON(cmd, fmt.Sprintf("%s/%s/split", keyspaceGroupsPrefix, args[0]), map[string]interface{}{ + "new-id": uint32(newID), + "start-keyspace-id": uint32(startKeyspaceID), + "end-keyspace-id": uint32(endKeyspaceID), + }) +} + func finishSplitKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { if len(args) < 1 { cmd.Usage() From 7e2cf0455acddcaac531e70939eae4d8c40a9712 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 27 Jun 2023 12:01:05 +0800 Subject: [PATCH 7/8] *: add group id to error logs (#6695) close tikv/pd#6685 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/keyspace/keyspace.go | 4 +- pkg/keyspace/tso_keyspace_group.go | 69 +++++++++++++------------ pkg/keyspace/tso_keyspace_group_test.go | 18 
+++---- pkg/keyspace/util.go | 20 +++++-- 4 files changed, 62 insertions(+), 49 deletions(-) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 6220166d409..6fa4941b178 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -702,10 +702,10 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID return errors.Errorf("default keyspace group %d not found", utils.DefaultKeyspaceGroupID) } if defaultKeyspaceGroup.IsSplitting() { - return ErrKeyspaceGroupInSplit + return ErrKeyspaceGroupInSplit(utils.DefaultKeyspaceGroupID) } if defaultKeyspaceGroup.IsMerging() { - return ErrKeyspaceGroupInMerging + return ErrKeyspaceGroupInMerging(utils.DefaultKeyspaceGroupID) } keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, maxEtcdTxnOps) if err != nil { diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index c4916f5f526..a1cc9a0e9b0 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -310,7 +310,7 @@ func (m *GroupManager) DeleteKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGro return nil } if kg.IsSplitting() { - return ErrKeyspaceGroupInSplit + return ErrKeyspaceGroupInSplit(id) } return m.store.DeleteKeyspaceGroup(txn, id) }); err != nil { @@ -339,10 +339,10 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro return ErrKeyspaceGroupExists } if oldKG.IsSplitting() && overwrite { - return ErrKeyspaceGroupInSplit + return ErrKeyspaceGroupInSplit(keyspaceGroup.ID) } if oldKG.IsMerging() && overwrite { - return ErrKeyspaceGroupInMerging + return ErrKeyspaceGroupInMerging(keyspaceGroup.ID) } newKG := &endpoint.KeyspaceGroup{ ID: keyspaceGroup.ID, @@ -414,13 +414,13 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind, groupID uint64, keyspaceID uint32, mutation int) error { kg := m.groups[userKind].Get(uint32(groupID)) if kg == nil { - return errors.Errorf("keyspace group %d not found", groupID) + return ErrKeyspaceGroupNotExists(uint32(groupID)) } if kg.IsSplitting() { - return ErrKeyspaceGroupInSplit + return ErrKeyspaceGroupInSplit(uint32(groupID)) } if kg.IsMerging() { - return ErrKeyspaceGroupInMerging + return ErrKeyspaceGroupInMerging(uint32(groupID)) } changed := false @@ -473,11 +473,14 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse if newKG == nil { return errors.Errorf("keyspace group %s not found in %s group", newGroupID, newUserKind) } - if oldKG.IsSplitting() || newKG.IsSplitting() { - return ErrKeyspaceGroupInSplit - } - if oldKG.IsMerging() || newKG.IsMerging() { - return ErrKeyspaceGroupInMerging + if oldKG.IsSplitting() { + return ErrKeyspaceGroupInSplit(uint32(oldID)) + } else if newKG.IsSplitting() { + return ErrKeyspaceGroupInSplit(uint32(newID)) + } else if oldKG.IsMerging() { + return ErrKeyspaceGroupInMerging(uint32(oldID)) + } else if newKG.IsMerging() { + return ErrKeyspaceGroupInMerging(uint32(newID)) } var updateOld, updateNew bool @@ -523,15 +526,15 @@ func (m *GroupManager) SplitKeyspaceGroupByID( return err } if splitSourceKg == nil { - return ErrKeyspaceGroupNotExists + return ErrKeyspaceGroupNotExists(splitSourceID) } // A keyspace group can not take part in multiple split processes. 
if splitSourceKg.IsSplitting() { - return ErrKeyspaceGroupInSplit + return ErrKeyspaceGroupInSplit(splitSourceID) } // A keyspace group can not be split when it is in merging. if splitSourceKg.IsMerging() { - return ErrKeyspaceGroupInMerging + return ErrKeyspaceGroupInMerging(splitSourceID) } // Check if the source keyspace group has enough replicas. if len(splitSourceKg.Members) < utils.DefaultKeyspaceGroupReplicaCount { @@ -661,11 +664,11 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error { return err } if splitTargetKg == nil { - return ErrKeyspaceGroupNotExists + return ErrKeyspaceGroupNotExists(splitTargetID) } // Check if it's in the split state. if !splitTargetKg.IsSplitTarget() { - return ErrKeyspaceGroupNotInSplit + return ErrKeyspaceGroupNotInSplit(splitTargetID) } // Load the split source keyspace group then. splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetKg.SplitSource()) @@ -673,10 +676,10 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error { return err } if splitSourceKg == nil { - return ErrKeyspaceGroupNotExists + return ErrKeyspaceGroupNotExists(splitTargetKg.SplitSource()) } if !splitSourceKg.IsSplitSource() { - return ErrKeyspaceGroupNotInSplit + return ErrKeyspaceGroupNotInSplit(splitTargetKg.SplitSource()) } splitTargetKg.SplitState = nil splitSourceKg.SplitState = nil @@ -721,13 +724,13 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount return err } if kg == nil { - return ErrKeyspaceGroupNotExists + return ErrKeyspaceGroupNotExists(id) } if kg.IsSplitting() { - return ErrKeyspaceGroupInSplit + return ErrKeyspaceGroupInSplit(id) } if kg.IsMerging() { - return ErrKeyspaceGroupInMerging + return ErrKeyspaceGroupInMerging(id) } exists := make(map[string]struct{}) for _, member := range kg.Members { @@ -783,13 +786,13 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error return err } if kg == nil { - return ErrKeyspaceGroupNotExists + return ErrKeyspaceGroupNotExists(id) } if kg.IsSplitting() { - return ErrKeyspaceGroupInSplit + return ErrKeyspaceGroupInSplit(id) } if kg.IsMerging() { - return ErrKeyspaceGroupInMerging + return ErrKeyspaceGroupInMerging(id) } members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes)) for _, node := range nodes { @@ -820,13 +823,13 @@ func (m *GroupManager) SetPriorityForKeyspaceGroup(id uint32, node string, prior return err } if kg == nil { - return ErrKeyspaceGroupNotExists + return ErrKeyspaceGroupNotExists(id) } if kg.IsSplitting() { - return ErrKeyspaceGroupInSplit + return ErrKeyspaceGroupInSplit(id) } if kg.IsMerging() { - return ErrKeyspaceGroupInMerging + return ErrKeyspaceGroupInMerging(id) } inKeyspaceGroup := false members := make([]endpoint.KeyspaceGroupMember, 0, len(kg.Members)) @@ -891,15 +894,15 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin return err } if kg == nil { - return ErrKeyspaceGroupNotExists + return ErrKeyspaceGroupNotExists(kgID) } // A keyspace group can not be merged if it's in splitting. if kg.IsSplitting() { - return ErrKeyspaceGroupInSplit + return ErrKeyspaceGroupInSplit(kgID) } // A keyspace group can not be split when it is in merging. 
if kg.IsMerging() { - return ErrKeyspaceGroupInMerging + return ErrKeyspaceGroupInMerging(kgID) } groups[kgID] = kg } @@ -955,11 +958,11 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error { return err } if mergeTargetKg == nil { - return ErrKeyspaceGroupNotExists + return ErrKeyspaceGroupNotExists(mergeTargetID) } // Check if it's in the merging state. if !mergeTargetKg.IsMergeTarget() { - return ErrKeyspaceGroupNotInMerging + return ErrKeyspaceGroupNotInMerging(mergeTargetID) } // Make sure all merging keyspace groups are deleted. for _, kgID := range mergeTargetKg.MergeState.MergeList { @@ -968,7 +971,7 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error { return err } if kg != nil { - return ErrKeyspaceGroupNotInMerging + return ErrKeyspaceGroupNotInMerging(kgID) } } mergeTargetKg.MergeState = nil diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go index 42c8918e78b..0eb71d2d5c4 100644 --- a/pkg/keyspace/tso_keyspace_group_test.go +++ b/pkg/keyspace/tso_keyspace_group_test.go @@ -276,25 +276,25 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { // finish the split of the keyspace group 2 err = suite.kgm.FinishSplitKeyspaceByID(2) - re.ErrorIs(err, ErrKeyspaceGroupNotInSplit) + re.ErrorContains(err, ErrKeyspaceGroupNotInSplit(2).Error()) // finish the split of a non-existing keyspace group err = suite.kgm.FinishSplitKeyspaceByID(5) - re.ErrorIs(err, ErrKeyspaceGroupNotExists) + re.ErrorContains(err, ErrKeyspaceGroupNotExists(5).Error()) // split the in-split keyspace group err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{333}) - re.ErrorIs(err, ErrKeyspaceGroupInSplit) + re.ErrorContains(err, ErrKeyspaceGroupInSplit(2).Error()) // remove the in-split keyspace group kg2, err = suite.kgm.DeleteKeyspaceGroupByID(2) re.Nil(kg2) - re.ErrorIs(err, ErrKeyspaceGroupInSplit) + re.ErrorContains(err, ErrKeyspaceGroupInSplit(2).Error()) kg4, err = suite.kgm.DeleteKeyspaceGroupByID(4) re.Nil(kg4) - re.ErrorIs(err, ErrKeyspaceGroupInSplit) + re.ErrorContains(err, ErrKeyspaceGroupInSplit(4).Error()) // update the in-split keyspace group err = suite.kg.kgm.UpdateKeyspaceForGroup(endpoint.Standard, "2", 444, opAdd) - re.ErrorIs(err, ErrKeyspaceGroupInSplit) + re.ErrorContains(err, ErrKeyspaceGroupInSplit(2).Error()) err = suite.kg.kgm.UpdateKeyspaceForGroup(endpoint.Standard, "4", 444, opAdd) - re.ErrorIs(err, ErrKeyspaceGroupInSplit) + re.ErrorContains(err, ErrKeyspaceGroupInSplit(4).Error()) // finish the split of keyspace group 4 err = suite.kgm.FinishSplitKeyspaceByID(4) @@ -314,7 +314,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { // split a non-existing keyspace group err = suite.kgm.SplitKeyspaceGroupByID(3, 5, nil) - re.ErrorIs(err, ErrKeyspaceGroupNotExists) + re.ErrorContains(err, ErrKeyspaceGroupNotExists(3).Error()) // split into an existing keyspace group err = suite.kgm.SplitKeyspaceGroupByID(2, 4, nil) re.ErrorIs(err, ErrKeyspaceGroupExists) @@ -442,7 +442,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() { // merge a non-existing keyspace group err = suite.kgm.MergeKeyspaceGroups(4, []uint32{5}) - re.ErrorIs(err, ErrKeyspaceGroupNotExists) + re.ErrorContains(err, ErrKeyspaceGroupNotExists(5).Error()) // merge with the number of keyspace groups exceeds the limit err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, maxEtcdTxnOps/2)) re.ErrorIs(err, ErrExceedMaxEtcdTxnOps) diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index 
8e29e728328..ac8168aa5f2 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -49,15 +49,25 @@ var ( // ErrKeyspaceGroupExists indicates target keyspace group already exists. ErrKeyspaceGroupExists = errors.New("keyspace group already exists") // ErrKeyspaceGroupNotExists is used to indicate target keyspace group does not exist. - ErrKeyspaceGroupNotExists = errors.New("keyspace group does not exist") + ErrKeyspaceGroupNotExists = func(groupID uint32) error { + return errors.Errorf("keyspace group %v does not exist", groupID) + } // ErrKeyspaceGroupInSplit is used to indicate target keyspace group is in split state. - ErrKeyspaceGroupInSplit = errors.New("keyspace group is in split state") + ErrKeyspaceGroupInSplit = func(groupID uint32) error { + return errors.Errorf("keyspace group %v is in split state", groupID) + } // ErrKeyspaceGroupNotInSplit is used to indicate target keyspace group is not in split state. - ErrKeyspaceGroupNotInSplit = errors.New("keyspace group is not in split state") + ErrKeyspaceGroupNotInSplit = func(groupID uint32) error { + return errors.Errorf("keyspace group %v is not in split state", groupID) + } // ErrKeyspaceGroupInMerging is used to indicate target keyspace group is in merging state. - ErrKeyspaceGroupInMerging = errors.New("keyspace group is in merging state") + ErrKeyspaceGroupInMerging = func(groupID uint32) error { + return errors.Errorf("keyspace group %v is in merging state", groupID) + } // ErrKeyspaceGroupNotInMerging is used to indicate target keyspace group is not in merging state. - ErrKeyspaceGroupNotInMerging = errors.New("keyspace group is not in merging state") + ErrKeyspaceGroupNotInMerging = func(groupID uint32) error { + return errors.Errorf("keyspace group %v is not in merging state", groupID) + } // ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group. ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group") // ErrNodeNotInKeyspaceGroup is used to indicate the tso node is not in this keyspace group. 
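
Note on the pattern used in the patch above ("*: add group id to error logs"): the shared sentinel errors in pkg/keyspace/util.go are replaced by constructor functions that embed the keyspace group ID in the message, which is why the tests switch from re.ErrorIs to re.ErrorContains. The following is a minimal, self-contained Go sketch of that trade-off; the names (errGroupInSplit, errGroupInSplitSentinel) are illustrative and not part of the PD code.

package main

import (
	"errors"
	"fmt"
)

// Before: one sentinel shared by every keyspace group.
var errGroupInSplitSentinel = errors.New("keyspace group is in split state")

// After: a constructor that carries the offending group ID in the message.
var errGroupInSplit = func(groupID uint32) error {
	return fmt.Errorf("keyspace group %v is in split state", groupID)
}

func main() {
	err := errGroupInSplit(2)

	// errors.Is only matches the exact sentinel value, so it no longer applies...
	fmt.Println(errors.Is(err, errGroupInSplitSentinel)) // false

	// ...which is why the tests above compare messages (ErrorContains) instead.
	fmt.Println(err.Error() == errGroupInSplit(2).Error()) // true
	fmt.Println(err)                                       // keyspace group 2 is in split state
}

The gain is that every log line and assertion now names the concrete group, at the cost of losing errors.Is identity against a single sentinel.
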
From c1601ca3f2fbcc37b268f16a84b364050da1ed07 Mon Sep 17 00:00:00 2001 From: buffer <1045931706@qq.com> Date: Wed, 28 Jun 2023 13:29:35 +0800 Subject: [PATCH 8/8] operator: log the cancel reason (#6676) ref tikv/pd#6605, close tikv/pd#6677 log the cancel reason Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/checker/merge_checker_test.go | 4 +- pkg/schedule/operator/operator.go | 38 +++++++- pkg/schedule/operator/operator_controller.go | 89 ++++++++++++------- .../operator/operator_controller_test.go | 2 +- pkg/schedule/scatter/region_scatterer.go | 2 +- pkg/schedule/schedulers/hot_region_test.go | 8 +- pkg/schedule/schedulers/split_bucket.go | 3 - server/handler.go | 2 +- 8 files changed, 101 insertions(+), 47 deletions(-) diff --git a/pkg/schedule/checker/merge_checker_test.go b/pkg/schedule/checker/merge_checker_test.go index f6f2dd69868..9c38d677619 100644 --- a/pkg/schedule/checker/merge_checker_test.go +++ b/pkg/schedule/checker/merge_checker_test.go @@ -484,7 +484,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() { suite.NotNil(ops) suite.True(oc.AddOperator(ops...)) for _, op := range ops { - oc.RemoveOperator(op) + oc.RemoveOperator(op, operator.ExceedStoreLimit) } } regions[2] = regions[2].Clone( @@ -498,7 +498,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() { suite.NotNil(ops) suite.True(oc.AddOperator(ops...)) for _, op := range ops { - oc.RemoveOperator(op) + oc.RemoveOperator(op, operator.ExceedStoreLimit) } } { diff --git a/pkg/schedule/operator/operator.go b/pkg/schedule/operator/operator.go index 4a9f9e8cbbd..02a2f1a8a5f 100644 --- a/pkg/schedule/operator/operator.go +++ b/pkg/schedule/operator/operator.go @@ -32,6 +32,39 @@ const ( // OperatorExpireTime is the duration that when an operator is not started // after it, the operator will be considered expired. OperatorExpireTime = 3 * time.Second + cancelReason = "cancel-reason" +) + +// CancelReasonType is the type of cancel reason. +type CancelReasonType string + +var ( + // RegionNotFound is the cancel reason when the region is not found. + RegionNotFound CancelReasonType = "region not found" + // EpochNotMatch is the cancel reason when the region epoch is not match. + EpochNotMatch CancelReasonType = "epoch not match" + // AlreadyExist is the cancel reason when the operator is running. + AlreadyExist CancelReasonType = "already exist" + // AdminStop is the cancel reason when the operator is stopped by adminer. + AdminStop CancelReasonType = "admin stop" + // NotInRunningState is the cancel reason when the operator is not in running state. + NotInRunningState CancelReasonType = "not in running state" + // Succeed is the cancel reason when the operator is finished successfully. + Succeed CancelReasonType = "succeed" + // Timeout is the cancel reason when the operator is timeout. + Timeout CancelReasonType = "timeout" + // Expired is the cancel reason when the operator is expired. + Expired CancelReasonType = "expired" + // NotInCreateStatus is the cancel reason when the operator is not in create status. + NotInCreateStatus CancelReasonType = "not in create status" + // StaleStatus is the cancel reason when the operator is in a stale status. + StaleStatus CancelReasonType = "stale status" + // ExceedStoreLimit is the cancel reason when the operator exceeds the store limit. + ExceedStoreLimit CancelReasonType = "exceed store limit" + // ExceedWaitLimit is the cancel reason when the operator exceeds the waiting queue limit. 
+ ExceedWaitLimit CancelReasonType = "exceed wait limit" + // Unknown is the cancel reason when the operator is cancelled by an unknown reason. + Unknown CancelReasonType = "unknown" ) // Operator contains execution steps generated by scheduler. @@ -227,7 +260,10 @@ func (o *Operator) CheckSuccess() bool { } // Cancel marks the operator canceled. -func (o *Operator) Cancel() bool { +func (o *Operator) Cancel(reason CancelReasonType) bool { + if _, ok := o.AdditionalInfos[cancelReason]; !ok { + o.AdditionalInfos[cancelReason] = string(reason) + } return o.status.To(CANCELED) } diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index cbda045130a..b1e40a35e58 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -125,7 +125,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS if op.ContainNonWitnessStep() { recordOpStepWithTTL(op.RegionID()) } - if oc.RemoveOperator(op) { + if oc.RemoveOperator(op, Succeed) { operatorWaitCounter.WithLabelValues(op.Desc(), "promote-success").Inc() oc.PromoteWaitingOperator() } @@ -134,7 +134,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS oc.pushFastOperator(op) } case TIMEOUT: - if oc.RemoveOperator(op) { + if oc.RemoveOperator(op, Timeout) { operatorCounter.WithLabelValues(op.Desc(), "promote-timeout").Inc() oc.PromoteWaitingOperator() } @@ -150,7 +150,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS failpoint.Inject("unexpectedOperator", func() { panic(op) }) - _ = op.Cancel() + _ = op.Cancel(NotInRunningState) oc.buryOperator(op) operatorWaitCounter.WithLabelValues(op.Desc(), "promote-unexpected").Inc() oc.PromoteWaitingOperator() @@ -162,7 +162,8 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS func (oc *Controller) checkStaleOperator(op *Operator, step OpStep, region *core.RegionInfo) bool { err := step.CheckInProgress(oc.cluster, oc.config, region) if err != nil { - if oc.RemoveOperator(op, zap.String("reason", err.Error())) { + log.Info("operator is stale", zap.Uint64("region-id", op.RegionID()), errs.ZapError(err)) + if oc.RemoveOperator(op, StaleStatus) { operatorCounter.WithLabelValues(op.Desc(), "stale").Inc() operatorWaitCounter.WithLabelValues(op.Desc(), "promote-stale").Inc() oc.PromoteWaitingOperator() @@ -177,11 +178,13 @@ func (oc *Controller) checkStaleOperator(op *Operator, step OpStep, region *core latest := region.GetRegionEpoch() changes := latest.GetConfVer() - origin.GetConfVer() if changes > op.ConfVerChanged(region) { + log.Info("operator is stale", + zap.Uint64("region-id", op.RegionID()), + zap.Uint64("diff", changes), + zap.Reflect("latest-epoch", region.GetRegionEpoch())) if oc.RemoveOperator( op, - zap.String("reason", "stale operator, confver does not meet expectations"), - zap.Reflect("latest-epoch", region.GetRegionEpoch()), - zap.Uint64("diff", changes), + EpochNotMatch, ) { operatorCounter.WithLabelValues(op.Desc(), "stale").Inc() operatorWaitCounter.WithLabelValues(op.Desc(), "promote-stale").Inc() @@ -220,7 +223,7 @@ func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { r = oc.cluster.GetRegion(regionID) if r == nil { _ = oc.removeOperatorLocked(op) - if op.Cancel() { + if op.Cancel(RegionNotFound) { log.Warn("remove operator because region disappeared", zap.Uint64("region-id", op.RegionID()), zap.Stringer("operator", op)) @@ -285,14 +288,14 @@ 
func (oc *Controller) AddWaitingOperator(ops ...*Operator) int { } isMerge = true } - if !oc.checkAddOperator(false, op) { - _ = op.Cancel() + if pass, reason := oc.checkAddOperator(false, op); !pass { + _ = op.Cancel(reason) oc.buryOperator(op) if isMerge { // Merge operation have two operators, cancel them all i++ next := ops[i] - _ = next.Cancel() + _ = next.Cancel(reason) oc.buryOperator(next) } continue @@ -327,9 +330,16 @@ func (oc *Controller) AddOperator(ops ...*Operator) bool { // note: checkAddOperator uses false param for `isPromoting`. // This is used to keep check logic before fixing issue #4946, // but maybe user want to add operator when waiting queue is busy - if oc.exceedStoreLimitLocked(ops...) || !oc.checkAddOperator(false, ops...) { + if oc.exceedStoreLimitLocked(ops...) { for _, op := range ops { - _ = op.Cancel() + _ = op.Cancel(ExceedStoreLimit) + oc.buryOperator(op) + } + return false + } + if pass, reason := oc.checkAddOperator(false, ops...); !pass { + for _, op := range ops { + _ = op.Cancel(reason) oc.buryOperator(op) } return false @@ -354,11 +364,20 @@ func (oc *Controller) PromoteWaitingOperator() { return } operatorWaitCounter.WithLabelValues(ops[0].Desc(), "get").Inc() + if oc.exceedStoreLimitLocked(ops...) { + for _, op := range ops { + operatorWaitCounter.WithLabelValues(op.Desc(), "promote-canceled").Inc() + _ = op.Cancel(ExceedStoreLimit) + oc.buryOperator(op) + } + oc.wopStatus.ops[ops[0].Desc()]-- + continue + } - if oc.exceedStoreLimitLocked(ops...) || !oc.checkAddOperator(true, ops...) { + if pass, reason := oc.checkAddOperator(true, ops...); !pass { for _, op := range ops { operatorWaitCounter.WithLabelValues(op.Desc(), "promote-canceled").Inc() - _ = op.Cancel() + _ = op.Cancel(reason) oc.buryOperator(op) } oc.wopStatus.ops[ops[0].Desc()]-- @@ -382,14 +401,14 @@ func (oc *Controller) PromoteWaitingOperator() { // - The region already has a higher priority or same priority // - Exceed the max number of waiting operators // - At least one operator is expired. 
-func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool { +func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool, CancelReasonType) { for _, op := range ops { region := oc.cluster.GetRegion(op.RegionID()) if region == nil { log.Debug("region not found, cancel add operator", zap.Uint64("region-id", op.RegionID())) operatorWaitCounter.WithLabelValues(op.Desc(), "not-found").Inc() - return false + return false, RegionNotFound } if region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() || region.GetRegionEpoch().GetConfVer() != op.RegionEpoch().GetConfVer() { @@ -398,14 +417,14 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool zap.Reflect("old", region.GetRegionEpoch()), zap.Reflect("new", op.RegionEpoch())) operatorWaitCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc() - return false + return false, EpochNotMatch } if old := oc.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) { log.Debug("already have operator, cancel add operator", zap.Uint64("region-id", op.RegionID()), zap.Reflect("old", old)) operatorWaitCounter.WithLabelValues(op.Desc(), "already-have").Inc() - return false + return false, AlreadyExist } if op.Status() != CREATED { log.Error("trying to add operator with unexpected status", @@ -416,26 +435,26 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool panic(op) }) operatorWaitCounter.WithLabelValues(op.Desc(), "unexpected-status").Inc() - return false + return false, NotInCreateStatus } if !isPromoting && oc.wopStatus.ops[op.Desc()] >= oc.config.GetSchedulerMaxWaitingOperator() { log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.ops[op.Desc()]), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator())) operatorWaitCounter.WithLabelValues(op.Desc(), "exceed-max").Inc() - return false + return false, ExceedWaitLimit } if op.SchedulerKind() == OpAdmin || op.IsLeaveJointStateOperator() { continue } } - expired := false + var reason CancelReasonType for _, op := range ops { if op.CheckExpired() { - expired = true + reason = Expired operatorWaitCounter.WithLabelValues(op.Desc(), "expired").Inc() } } - return !expired + return reason != Expired, reason } func isHigherPriorityOperator(new, old *Operator) bool { @@ -521,18 +540,24 @@ func (oc *Controller) ack(op *Operator) { } // RemoveOperator removes an operator from the running operators. -func (oc *Controller) RemoveOperator(op *Operator, extraFields ...zap.Field) bool { +func (oc *Controller) RemoveOperator(op *Operator, reasons ...CancelReasonType) bool { oc.Lock() removed := oc.removeOperatorLocked(op) oc.Unlock() + var cancelReason CancelReasonType + if len(reasons) > 0 { + cancelReason = reasons[0] + } else { + cancelReason = Unknown + } if removed { - if op.Cancel() { + if op.Cancel(cancelReason) { log.Info("operator removed", zap.Uint64("region-id", op.RegionID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op)) } - oc.buryOperator(op, extraFields...) 
+ oc.buryOperator(op) } return removed } @@ -555,7 +580,7 @@ func (oc *Controller) removeOperatorLocked(op *Operator) bool { return false } -func (oc *Controller) buryOperator(op *Operator, extraFields ...zap.Field) { +func (oc *Controller) buryOperator(op *Operator) { st := op.Status() if !IsEndStatus(st) { @@ -567,7 +592,7 @@ func (oc *Controller) buryOperator(op *Operator, extraFields ...zap.Field) { panic(op) }) operatorCounter.WithLabelValues(op.Desc(), "unexpected").Inc() - _ = op.Cancel() + _ = op.Cancel(Unknown) } switch st { @@ -603,15 +628,11 @@ func (oc *Controller) buryOperator(op *Operator, extraFields ...zap.Field) { zap.String("additional-info", op.GetAdditionalInfo())) operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc() case CANCELED: - fields := []zap.Field{ + log.Info("operator canceled", zap.Uint64("region-id", op.RegionID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op), zap.String("additional-info", op.GetAdditionalInfo()), - } - fields = append(fields, extraFields...) - log.Info("operator canceled", - fields..., ) operatorCounter.WithLabelValues(op.Desc(), "cancel").Inc() } diff --git a/pkg/schedule/operator/operator_controller_test.go b/pkg/schedule/operator/operator_controller_test.go index eb2d69db944..112e5a11f9c 100644 --- a/pkg/schedule/operator/operator_controller_test.go +++ b/pkg/schedule/operator/operator_controller_test.go @@ -248,7 +248,7 @@ func (suite *operatorControllerTestSuite) TestCheckAddUnexpectedStatus() { // finished op canceled op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2}) suite.True(oc.checkAddOperator(false, op)) - suite.True(op.Cancel()) + suite.True(op.Cancel(AdminStop)) suite.False(oc.checkAddOperator(false, op)) } { diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 54fc291b363..de90228f7f6 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -264,7 +264,7 @@ func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, fa } failpoint.Inject("scatterHbStreamsDrain", func() { r.opController.GetHBStreams().Drain(1) - r.opController.RemoveOperator(op) + r.opController.RemoveOperator(op, operator.AdminStop) }) } delete(failures, region.GetID()) diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 362758d4ebf..e4cf6b121f8 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -145,7 +145,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) { } justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence { infl := notDoneOpInfluence(region, ty) - infl.op.Cancel() + infl.op.Cancel(operator.AdminStop) return infl } shouldRemoveOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence { @@ -957,7 +957,7 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 4) cnt++ if cnt == 3 { - re.True(op.Cancel()) + re.True(op.Cancel(operator.AdminStop)) } default: re.FailNow("wrong op: " + op.String()) @@ -1367,14 +1367,14 @@ func checkHotReadRegionScheduleWithPendingInfluence(re *require.Assertions, dim op2 := ops[0] operatorutil.CheckTransferPeer(re, op2, operator.OpHotRegion, 1, 4) // After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | 6 | (5, 6) - re.True(op2.Cancel()) + 
re.True(op2.Cancel(operator.AdminStop)) ops, _ = hb.Schedule(tc, false) op2 = ops[0] operatorutil.CheckTransferPeer(re, op2, operator.OpHotRegion, 1, 4) // After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5) - re.True(op1.Cancel()) + re.True(op1.Cancel(operator.AdminStop)) // store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5) ops, _ = hb.Schedule(tc, false) diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index fef20974ef0..7452d2ceafa 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -23,7 +23,6 @@ import ( "github.com/gorilla/mux" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/tikv/pd/pkg/core" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" @@ -251,8 +250,6 @@ func (s *splitBucketScheduler) splitBucket(plan *splitBucketPlan) []*operator.Op return nil } splitBucketNewOperatorCounter.Inc() - op.AdditionalInfos["region-start-key"] = core.HexRegionKeyStr(region.GetStartKey()) - op.AdditionalInfos["region-end-key"] = core.HexRegionKeyStr(region.GetEndKey()) op.AdditionalInfos["hot-degree"] = strconv.FormatInt(int64(splitBucket.HotDegree), 10) return []*operator.Operator{op} } diff --git a/server/handler.go b/server/handler.go index 1fc543827b2..635901fb04a 100644 --- a/server/handler.go +++ b/server/handler.go @@ -419,7 +419,7 @@ func (h *Handler) RemoveOperator(regionID uint64) error { return ErrOperatorNotFound } - _ = c.RemoveOperator(op) + _ = c.RemoveOperator(op, operator.AdminStop) return nil }
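
Note on the pattern used in PATCH 8/8 ("operator: log the cancel reason"): Cancel now takes a typed reason, the first reason recorded wins, and RemoveOperator accepts an optional reason that defaults to an unknown value so existing call sites keep working. Below is a condensed, standalone Go sketch of that flow; the types and names are toy stand-ins, not the actual pkg/schedule/operator implementation.

package main

import "fmt"

// CancelReasonType mirrors the string-typed reasons added by the patch.
type CancelReasonType string

const (
	Succeed   CancelReasonType = "succeed"
	Timeout   CancelReasonType = "timeout"
	AdminStop CancelReasonType = "admin stop"
	Unknown   CancelReasonType = "unknown"

	cancelReasonKey = "cancel-reason"
)

// Operator is a toy stand-in carrying only what the sketch needs.
type Operator struct {
	AdditionalInfos map[string]string
	canceled        bool
}

// Cancel records the reason once and marks the operator canceled.
func (o *Operator) Cancel(reason CancelReasonType) bool {
	if _, ok := o.AdditionalInfos[cancelReasonKey]; !ok {
		o.AdditionalInfos[cancelReasonKey] = string(reason)
	}
	if o.canceled {
		return false
	}
	o.canceled = true
	return true
}

// RemoveOperator takes an optional reason, defaulting to Unknown,
// so callers that pass nothing still compile and still get a logged reason.
func RemoveOperator(op *Operator, reasons ...CancelReasonType) {
	reason := Unknown
	if len(reasons) > 0 {
		reason = reasons[0]
	}
	if op.Cancel(reason) {
		fmt.Printf("operator canceled, cancel-reason=%s\n", op.AdditionalInfos[cancelReasonKey])
	}
}

func main() {
	op := &Operator{AdditionalInfos: map[string]string{}}
	RemoveOperator(op, Timeout)   // first reason wins: "timeout"
	RemoveOperator(op, AdminStop) // already canceled, reason not overwritten
	fmt.Println(op.AdditionalInfos[cancelReasonKey]) // timeout
}

Keeping only the first reason avoids a later bookkeeping call (for example a generic removal path) overwriting the more specific cause that actually canceled the operator, which is what ends up in the "operator canceled" log line.
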