From ffb7b1b6d50d970dfb0e8785c428dc025ed709e0 Mon Sep 17 00:00:00 2001 From: husharp Date: Wed, 31 Jul 2024 10:36:55 +0800 Subject: [PATCH] refine code Signed-off-by: husharp --- client/http/api.go | 5 - client/http/interface.go | 17 --- client/http/request_info.go | 1 - pkg/election/leadership.go | 5 +- pkg/election/lease.go | 2 + pkg/mcs/discovery/discover.go | 31 +++-- pkg/mcs/scheduling/server/apis/v1/api.go | 37 ++++++ pkg/mcs/scheduling/server/server.go | 52 +------- pkg/mcs/tso/server/apis/v1/api.go | 51 ++++++++ pkg/mcs/utils/expected_primary.go | 114 ++++++++++++++++++ pkg/mcs/utils/util.go | 39 ------ pkg/member/member.go | 8 +- pkg/member/participant.go | 24 +++- pkg/tso/allocator_manager.go | 2 - pkg/tso/global_allocator.go | 63 +++------- server/apiv2/handlers/micro_service.go | 54 --------- tests/integrations/mcs/members/member_test.go | 66 +++++++--- 17 files changed, 322 insertions(+), 249 deletions(-) create mode 100644 pkg/mcs/utils/expected_primary.go diff --git a/client/http/api.go b/client/http/api.go index f787327a97b..3376a48770d 100644 --- a/client/http/api.go +++ b/client/http/api.go @@ -206,11 +206,6 @@ func MicroServicePrimary(service string) string { return fmt.Sprintf("%s/primary/%s", microServicePrefix, service) } -// MicroServicePrimaryTransfer returns the path of PD HTTP API to transfer the primary of microservice. -func MicroServicePrimaryTransfer(service string) string { - return fmt.Sprintf("%s/primary/transfer/%s", microServicePrefix, service) -} - // GetUpdateKeyspaceConfigURL returns the path of PD HTTP API to update keyspace config. func GetUpdateKeyspaceConfigURL(keyspaceName string) string { return fmt.Sprintf(KeyspaceConfig, keyspaceName) diff --git a/client/http/interface.go b/client/http/interface.go index 108e1a25e40..f90ab19624f 100644 --- a/client/http/interface.go +++ b/client/http/interface.go @@ -103,7 +103,6 @@ type Client interface { /* Micro Service interfaces */ GetMicroServiceMembers(context.Context, string) ([]MicroServiceMember, error) GetMicroServicePrimary(context.Context, string) (string, error) - TransferMicroServicePrimary(context.Context, string, string) error DeleteOperators(context.Context) error /* Keyspace interface */ @@ -960,22 +959,6 @@ func (c *client) GetMicroServicePrimary(ctx context.Context, service string) (st return primary, err } -func (c *client) TransferMicroServicePrimary(ctx context.Context, service, newPrimary string) error { - reqData, err := json.Marshal(struct { - NewPrimary string `json:"new_primary"` - }{ - NewPrimary: newPrimary, - }) - if err != nil { - return errors.Trace(err) - } - return c.request(ctx, newRequestInfo(). - WithName(transferMicroServicePrimaryName). - WithURI(MicroServicePrimaryTransfer(service)). - WithMethod(http.MethodPost). - WithBody(reqData)) -} - // GetPDVersion gets the release version of the PD binary. func (c *client) GetPDVersion(ctx context.Context) (string, error) { var ver struct { diff --git a/client/http/request_info.go b/client/http/request_info.go index 9aa2b50f1b4..783220bcc60 100644 --- a/client/http/request_info.go +++ b/client/http/request_info.go @@ -77,7 +77,6 @@ const ( getMinResolvedTSByStoresIDsName = "GetMinResolvedTSByStoresIDs" getMicroServiceMembersName = "GetMicroServiceMembers" getMicroServicePrimaryName = "GetMicroServicePrimary" - transferMicroServicePrimaryName = "TransferMicroServicePrimary" getPDVersionName = "GetPDVersion" resetTSName = "ResetTS" resetBaseAllocIDName = "ResetBaseAllocID" diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 52e14eb9880..b706407a9c2 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -390,12 +390,13 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { } for _, ev := range wresp.Events { - if ev.Type == mvccpb.DELETE { + // ensure `{service}/primary/transfer` API will not meet this condition. + if ev.Type == mvccpb.DELETE && !ls.IsPrimary() { log.Info("current leadership is deleted", zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose)) return } - // ONLY `/ms/primary/transfer` API update primary will meet this condition. + // ONLY `{service}/primary/transfer` API update primary will meet this condition. if ev.Type == mvccpb.PUT && ls.IsPrimary() { log.Info("current leadership is updated", zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.ByteString("cur-value", ev.Kv.Value), diff --git a/pkg/election/lease.go b/pkg/election/lease.go index 055e7da41de..c2e9eb97117 100644 --- a/pkg/election/lease.go +++ b/pkg/election/lease.go @@ -48,6 +48,7 @@ type Lease struct { expireTime atomic.Value } +// NewLease creates a new Lease instance. func NewLease(client *clientv3.Client, purpose string) *Lease { return &Lease{ Purpose: purpose, @@ -117,6 +118,7 @@ func (l *Lease) KeepAlive(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3) + defer log.Info("lease keep alive stopped", zap.String("purpose", l.Purpose)) var maxExpire time.Time timer := time.NewTimer(l.leaseTimeout) diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index a6cd44eb367..c95f8944835 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" @@ -82,8 +83,13 @@ func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistr } // TransferPrimary transfers the primary of the specified service. -func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimary string, keyspaceGroupID uint32) error { - log.Info("transfer primary", zap.String("service", serviceName), zap.String("from", oldPrimary), zap.String("to", newPrimary)) +// keyspaceGroupID is optional, only used for TSO service. +func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName, + oldPrimaryAddr, newPrimary string, keyspaceGroupID uint32) error { + if lease == nil { + return errors.New("current lease is nil, please check leadership") + } + log.Info("try to transfer primary", zap.String("service", serviceName), zap.String("from", oldPrimaryAddr), zap.String("to", newPrimary)) entries, err := GetMSMembers(serviceName, client) if err != nil { return err @@ -96,12 +102,13 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar var primaryIDs []string for _, member := range entries { - if (newPrimary == "" && member.ServiceAddr != oldPrimary) || (newPrimary != "" && member.Name == newPrimary) { + // TODO: judged by `addr` and `name` now, should unify them to `name` in the future. + if (newPrimary == "" && member.ServiceAddr != oldPrimaryAddr) || (newPrimary != "" && member.Name == newPrimary) { primaryIDs = append(primaryIDs, member.ServiceAddr) } } if len(primaryIDs) == 0 { - return errors.Errorf("no valid secondary to transfer primary, from %s to %s", oldPrimary, newPrimary) + return errors.Errorf("no valid secondary to transfer primary, from %s to %s", oldPrimaryAddr, newPrimary) } r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -112,6 +119,17 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar return errors.Errorf("failed to get cluster ID: %v", err) } + // update expected primary flag + grantResp, err := client.Grant(client.Ctx(), utils.DefaultLeaderLease) + if err != nil { + return errors.Errorf("failed to grant lease for expected primary, err: %v", err) + } + + // revoke current primary's lease to ensure keepalive goroutine of primary exits. + if err := lease.Close(); err != nil { + return errors.Errorf("failed to revoke current primary's lease: %v", err) + } + var primaryPath string switch serviceName { case utils.SchedulingServiceName: @@ -120,11 +138,6 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar tsoRootPath := endpoint.TSOSvcRootPath(clusterID) primaryPath = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID) } - - grantResp, err := client.Grant(client.Ctx(), utils.DefaultLeaderLease) - if err != nil { - return errors.Errorf("failed to grant lease for expected primary, err: %v", err) - } _, err = utils.MarkExpectedPrimaryFlag(client, primaryPath, primaryIDs[nextPrimaryID], grantResp.ID) if err != nil { return errors.Errorf("failed to mark expected primary flag for %s, err: %v", serviceName, err) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 39aa11927ca..9e0280e7781 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/discovery" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/response" @@ -119,6 +120,7 @@ func NewService(srv *scheserver.Service) *Service { s.RegisterHotspotRouter() s.RegisterRegionsRouter() s.RegisterStoresRouter() + s.RegisterPrimaryRouter() return s } @@ -225,6 +227,12 @@ func (s *Service) RegisterConfigRouter() { regions.GET("/:id/labels", getRegionLabels) } +// RegisterPrimaryRouter registers the router of the config handler. +func (s *Service) RegisterPrimaryRouter() { + router := s.root.Group("primary") + router.POST("transfer", transferPrimary) +} + // @Tags admin // @Summary Change the log level. // @Produce json @@ -1477,3 +1485,32 @@ func getRegionByID(c *gin.Context) { } c.Data(http.StatusOK, "application/json", b) } + +// TransferPrimary transfers the primary member. +// @Tags primary +// @Summary Transfer the primary member of the specified service. +// @Produce json +// @Param service path string true "service name" +// @Param new_primary body string false "new primary name" +// @Success 200 string string +// @Router /primary/transfer [post] +func transferPrimary(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + var input map[string]string + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + newPrimary := "" + if v, ok := input["new_primary"]; ok { + newPrimary = v + } + + if err := discovery.TransferPrimary(svr.GetClient(), svr.GetParticipant().GetExpectedPrimaryLease(), + mcsutils.SchedulingServiceName, svr.GetAddr(), newPrimary, 0); err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, "success") +} diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 573d4153262..603ca696233 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -41,7 +41,6 @@ import ( bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/cache" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" @@ -62,7 +61,6 @@ import ( "github.com/tikv/pd/pkg/utils/memberutil" "github.com/tikv/pd/pkg/utils/metricutil" "github.com/tikv/pd/pkg/versioninfo" - "go.etcd.io/etcd/clientv3" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -253,7 +251,7 @@ func (s *Server) primaryElectionLoop() { // To make sure the expected primary(if existed) and new primary are on the same server. expectedPrimary := utils.GetExpectedPrimaryFlag(s.GetClient(), s.participant.GetLeaderPath()) // skip campaign the primary if the expected primary is not empty and not equal to the current memberValue. - // expected primary ONLY SET BY `/ms/primary/transfer` API. + // expected primary ONLY SET BY `{service}/primary/transfer` API. if expectedPrimary != "" && !strings.Contains(s.participant.MemberValue(), expectedPrimary) { log.Info("skip campaigning of scheduling primary and check later", zap.String("server-name", s.Name()), @@ -309,12 +307,13 @@ func (s *Server) campaignLeader() { }() // check expected primary and watch the primary. exitPrimary := make(chan struct{}) - expectedLease, revision, err := s.keepExpectedPrimaryAlive(ctx) + lease, err := utils.KeepExpectedPrimaryAlive(ctx, s.GetClient(), exitPrimary, + s.cfg.LeaderLease, s.participant.GetLeaderPath(), s.participant.MemberValue(), utils.SchedulingServiceName) if err != nil { - log.Error("prepare primary watch error", errs.ZapError(err)) + log.Error("prepare scheduling primary watch error", errs.ZapError(err)) return } - go s.expectedPrimaryWatch(ctx, expectedLease, revision+1, exitPrimary) + s.participant.SetExpectedPrimaryLease(lease) s.participant.EnableLeader() member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1) @@ -341,47 +340,6 @@ func (s *Server) campaignLeader() { } } -// keepExpectedPrimaryAlive keeps the expected primary alive. -// We use lease to keep `expected primary` healthy. -// ONLY reset by the following conditions: -// - changed by`/ms/primary/transfer` API. -// - server closed. -func (s *Server) keepExpectedPrimaryAlive(ctx context.Context) (*election.Leadership, int64, error) { - const purpose = "scheduling-primary-watch" - lease := election.NewLease(s.GetClient(), purpose) - if err := lease.Grant(s.cfg.LeaderLease); err != nil { - log.Error("grant lease for expected primary error", errs.ZapError(err)) - return nil, 0, err - } - revision, err := utils.MarkExpectedPrimaryFlag(s.GetClient(), s.participant.GetLeaderPath(), s.participant.MemberValue(), - lease.ID.Load().(clientv3.LeaseID)) - if err != nil { - log.Error("mark expected primary error", errs.ZapError(err)) - return nil, 0, err - } - // Keep alive the current primary leadership to indicate that the server is still alive. - // Watch the expected primary path to check whether the expected primary has changed. - expectedPrimary := election.NewLeadership(s.GetClient(), utils.ExpectedPrimaryPath(s.participant.GetLeaderPath()), purpose) - expectedPrimary.SetLease(lease) - expectedPrimary.Keep(ctx) - return expectedPrimary, revision, nil -} - -// expectedPrimaryWatch watches `/ms/primary/transfer` API whether changed the expected primary. -func (s *Server) expectedPrimaryWatch(ctx context.Context, expectedPrimary *election.Leadership, revision int64, exitPrimary chan struct{}) { - log.Info("scheduling primary start to watch the expected primary", zap.String("scheduling-primary", s.participant.MemberValue())) - expectedPrimary.SetPrimaryWatch(true) - expectedPrimary.Watch(ctx, revision) - expectedPrimary.Reset() - defer log.Info("scheduling primary exit the expected primary watch loop") - select { - case <-ctx.Done(): - return - case exitPrimary <- struct{}{}: - return - } -} - // Close closes the server. func (s *Server) Close() { if !atomic.CompareAndSwapInt64(&s.isRunning, 1, 0) { diff --git a/pkg/mcs/tso/server/apis/v1/api.go b/pkg/mcs/tso/server/apis/v1/api.go index 44f4b353d58..8d0b656ba41 100644 --- a/pkg/mcs/tso/server/apis/v1/api.go +++ b/pkg/mcs/tso/server/apis/v1/api.go @@ -26,9 +26,11 @@ import ( "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/discovery" tsoserver "github.com/tikv/pd/pkg/mcs/tso/server" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" "github.com/tikv/pd/pkg/utils/logutil" @@ -97,6 +99,7 @@ func NewService(srv *tsoserver.Service) *Service { s.RegisterKeyspaceGroupRouter() s.RegisterHealthRouter() s.RegisterConfigRouter() + s.RegisterPrimaryRouter() return s } @@ -125,6 +128,12 @@ func (s *Service) RegisterConfigRouter() { router.GET("", getConfig) } +// RegisterPrimaryRouter registers the router of the config handler. +func (s *Service) RegisterPrimaryRouter() { + router := s.root.Group("primary") + router.POST("transfer", transferPrimary) +} + func changeLogLevel(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) var level string @@ -265,3 +274,45 @@ func getConfig(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) c.IndentedJSON(http.StatusOK, svr.GetConfig()) } + +// TransferPrimary transfers the primary member of the specified service. +// @Tags primary +// @Summary Transfer the primary member of the specified service. +// @Produce json +// @Param service path string true "service name" +// @Param new_primary body string false "new primary name" +// @Param keyspace_group_id body string false "keyspace group id" +// @Success 200 string string +// @Router /primary/transfer [post] +func transferPrimary(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) + var input map[string]string + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + newPrimary, keyspaceGroupID := "", utils.DefaultKeyspaceGroupID + if v, ok := input["new_primary"]; ok { + newPrimary = v + } + + allocator, err := svr.GetTSOAllocatorManager(keyspaceGroupID) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + + globalAllocator, err := allocator.GetAllocator(tso.GlobalDCLocation) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + + if err := discovery.TransferPrimary(svr.GetClient(), globalAllocator.(*tso.GlobalTSOAllocator).GetExpectedPrimaryLease(), + utils.TSOServiceName, svr.GetAddr(), newPrimary, keyspaceGroupID); err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, "success") +} diff --git a/pkg/mcs/utils/expected_primary.go b/pkg/mcs/utils/expected_primary.go new file mode 100644 index 00000000000..f91f3a1a175 --- /dev/null +++ b/pkg/mcs/utils/expected_primary.go @@ -0,0 +1,114 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "context" + "fmt" + + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/election" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/etcdutil" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" +) + +// ExpectedPrimaryFlag is the flag to indicate the expected primary. +// 1. When the primary was campaigned successfully, it will set the `expected_primary` flag. +// 2. Using `{service}/primary/transfer` API will revoke the previous lease and set a new `expected_primary` flag. +// This flag used to help new primary to campaign successfully while other secondaries can skip the campaign. +const ExpectedPrimaryFlag = "expected_primary" + +// ExpectedPrimaryPath formats the primary path with the expected primary flag. +func ExpectedPrimaryPath(primaryPath string) string { + return fmt.Sprintf("%s/%s", primaryPath, ExpectedPrimaryFlag) +} + +// GetExpectedPrimaryFlag gets the expected primary flag. +func GetExpectedPrimaryFlag(client *clientv3.Client, primaryPath string) string { + path := ExpectedPrimaryPath(primaryPath) + primary, err := etcdutil.GetValue(client, path) + if err != nil { + log.Error("get expected primary flag error", errs.ZapError(err), zap.String("primary-path", path)) + return "" + } + + return string(primary) +} + +// MarkExpectedPrimaryFlag marks the expected primary flag when the primary is specified. +func MarkExpectedPrimaryFlag(client *clientv3.Client, primaryPath string, leaderRaw string, leaseID clientv3.LeaseID) (int64, error) { + path := ExpectedPrimaryPath(primaryPath) + log.Info("set expected primary flag", zap.String("primary-path", path), zap.String("leader-raw", leaderRaw)) + // write a flag to indicate the expected primary. + resp, err := kv.NewSlowLogTxn(client). + Then(clientv3.OpPut(ExpectedPrimaryPath(primaryPath), leaderRaw, clientv3.WithLease(leaseID))). + Commit() + if err != nil || !resp.Succeeded { + log.Error("mark expected primary error", errs.ZapError(err), zap.String("primary-path", path)) + return 0, err + } + return resp.Header.Revision, nil +} + +// KeepExpectedPrimaryAlive keeps the expected primary alive. +// We use lease to keep `expected primary` healthy. +// ONLY reset by the following conditions: +// - changed by `{service}/primary/transfer` API. +// - leader lease expired. +// ONLY primary called this function. +func KeepExpectedPrimaryAlive(ctx context.Context, cli *clientv3.Client, exitPrimary chan struct{}, + leaseTimeout int64, leaderPath, memberValue, service string) (*election.Lease, error) { + log.Info("primary start to watch the expected primary", zap.String("service", service), zap.String("primary-value", memberValue)) + service = fmt.Sprintf("%s-expected-primary", service) + lease := election.NewLease(cli, service) + if err := lease.Grant(leaseTimeout); err != nil { + return nil, err + } + + revision, err := MarkExpectedPrimaryFlag(cli, leaderPath, memberValue, lease.ID.Load().(clientv3.LeaseID)) + if err != nil { + log.Error("mark expected primary error", errs.ZapError(err)) + return nil, err + } + // Keep alive the current expected primary leadership to indicate that the server is still alive. + // Watch the expected primary path to check whether the expected primary has changed by `{service}/primary/transfer` API. + expectedPrimary := election.NewLeadership(cli, ExpectedPrimaryPath(leaderPath), service) + expectedPrimary.SetLease(lease) + expectedPrimary.Keep(ctx) + + go watchExpectedPrimary(ctx, expectedPrimary, revision+1, exitPrimary) + return lease, nil +} + +// watchExpectedPrimary watches `{service}/primary/transfer` API whether changed the expected primary. +func watchExpectedPrimary(ctx context.Context, + expectedPrimary *election.Leadership, revision int64, exitPrimary chan struct{}) { + expectedPrimary.SetPrimaryWatch(true) + // ONLY exited watch by the following conditions: + // - changed by `{service}/primary/transfer` API. + // - leader lease expired. + expectedPrimary.Watch(ctx, revision) + expectedPrimary.Reset() + defer log.Info("primary exit the primary watch loop") + select { + case <-ctx.Done(): + return + case exitPrimary <- struct{}{}: + return + } +} diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index d724fb28010..fb78f0b4be3 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -16,7 +16,6 @@ package utils import ( "context" - "fmt" "net" "net/http" "os" @@ -33,7 +32,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/soheilhy/cmux" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" "github.com/tikv/pd/pkg/utils/etcdutil" @@ -53,13 +51,6 @@ const ( ClusterIDPath = "/pd/cluster_id" // retryInterval is the interval to retry. retryInterval = time.Second - // ExpectedPrimaryFlag is the flag to indicate the expected primary, ONLY marked BY `/ms/primary/transfer` API. - // This flag likes a fence to avoid exited 2 primaries in the cluster simultaneously. - // 1. Since follower will campaign a new primary when it found the `leader_key` is deleted. - // **We can ensure `expected_primary` is set before deleting the `leader_key`.** - // 2. Old primary will mark `expected_primary` firstly, - // then delete the `leader_key` which will trigger the follower to campaign a new primary. - ExpectedPrimaryFlag = "expected_primary" ) // InitClusterID initializes the cluster ID. @@ -79,36 +70,6 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err return 0, errors.Errorf("failed to init cluster ID after retrying %d times", maxRetryTimes) } -// ExpectedPrimaryPath formats the primary path with the expected primary flag. -func ExpectedPrimaryPath(primaryPath string) string { - return fmt.Sprintf("%s/%s", primaryPath, ExpectedPrimaryFlag) -} - -// GetExpectedPrimaryFlag gets the expected primary flag. -func GetExpectedPrimaryFlag(client *clientv3.Client, primaryPath string) string { - primary, err := etcdutil.GetValue(client, ExpectedPrimaryPath(primaryPath)) - if err != nil { - log.Error("get expected primary flag error", errs.ZapError(err)) - return "" - } - - return string(primary) -} - -// MarkExpectedPrimaryFlag marks the expected primary flag when the primary is specified. -func MarkExpectedPrimaryFlag(client *clientv3.Client, primaryPath string, leaderRaw string, leaseID clientv3.LeaseID) (int64, error) { - log.Info("set expected primary flag", zap.String("leader-path", ExpectedPrimaryPath(primaryPath))) - // write a flag to indicate the current primary has exited - resp, err := kv.NewSlowLogTxn(client). - Then(clientv3.OpPut(ExpectedPrimaryPath(primaryPath), leaderRaw, clientv3.WithLease(leaseID))). - Commit() - if err != nil || !resp.Succeeded { - log.Error("mark expected primary error", errs.ZapError(err)) - return 0, err - } - return resp.Header.Revision, nil -} - // PromHandler is a handler to get prometheus metrics. func PromHandler() gin.HandlerFunc { return func(c *gin.Context) { diff --git a/pkg/member/member.go b/pkg/member/member.go index 7a58a976f28..32dab54b4b2 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -148,8 +148,8 @@ func (m *EmbeddedEtcdMember) setLeader(member *pdpb.Member) { m.lastLeaderUpdatedTime.Store(time.Now()) } -// UnsetLeader unsets the member's PD leader. -func (m *EmbeddedEtcdMember) UnsetLeader() { +// unsetLeader unsets the member's PD leader. +func (m *EmbeddedEtcdMember) unsetLeader() { m.leader.Store(&pdpb.Member{}) m.lastLeaderUpdatedTime.Store(time.Now()) } @@ -270,14 +270,14 @@ func (m *EmbeddedEtcdMember) CheckLeader() (ElectionLeader, bool) { func (m *EmbeddedEtcdMember) WatchLeader(ctx context.Context, leader *pdpb.Member, revision int64) { m.setLeader(leader) m.leadership.Watch(ctx, revision) - m.UnsetLeader() + m.unsetLeader() } // ResetLeader is used to reset the PD member's current leadership. // Basically it will reset the leader lease and unset leader info. func (m *EmbeddedEtcdMember) ResetLeader() { m.leadership.Reset() - m.UnsetLeader() + m.unsetLeader() } // CheckPriority checks whether the etcd leader should be moved according to the priority. diff --git a/pkg/member/participant.go b/pkg/member/participant.go index d70dd43aa9c..6f9c44d8c27 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -67,6 +67,8 @@ type Participant struct { campaignChecker atomic.Value // Store as leadershipCheckFunc // lastLeaderUpdatedTime is the last time when the leader is updated. lastLeaderUpdatedTime atomic.Value + // expectedPrimaryLease is the expected lease for the primary. + expectedPrimaryLease atomic.Value // stored as *election.Lease } // NewParticipant create a new Participant. @@ -154,8 +156,8 @@ func (m *Participant) setLeader(member participant) { m.lastLeaderUpdatedTime.Store(time.Now()) } -// UnsetLeader unsets the member's leader. -func (m *Participant) UnsetLeader() { +// unsetLeader unsets the member's leader. +func (m *Participant) unsetLeader() { leader := NewParticipantByService(m.serviceName) m.leader.Store(leader) m.lastLeaderUpdatedTime.Store(time.Now()) @@ -264,14 +266,14 @@ func (m *Participant) CheckLeader() (ElectionLeader, bool) { func (m *Participant) WatchLeader(ctx context.Context, leader participant, revision int64) { m.setLeader(leader) m.leadership.Watch(ctx, revision) - m.UnsetLeader() + m.unsetLeader() } // ResetLeader is used to reset the member's current leadership. // Basically it will reset the leader lease and unset leader info. func (m *Participant) ResetLeader() { m.leadership.Reset() - m.UnsetLeader() + m.unsetLeader() } // IsSameLeader checks whether a server is the leader itself. @@ -374,6 +376,20 @@ func (m *Participant) SetCampaignChecker(checker leadershipCheckFunc) { m.campaignChecker.Store(checker) } +// SetExpectedPrimaryLease sets the expected lease for the primary. +func (m *Participant) SetExpectedPrimaryLease(lease *election.Lease) { + m.expectedPrimaryLease.Store(lease) +} + +// GetExpectedPrimaryLease gets the expected lease for the primary. +func (m *Participant) GetExpectedPrimaryLease() *election.Lease { + l := m.expectedPrimaryLease.Load() + if l == nil { + return nil + } + return l.(*election.Lease) +} + // NewParticipantByService creates a new participant by service name. func NewParticipantByService(serviceName string) (p participant) { switch serviceName { diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index bc369a9e297..62a4fb97a57 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -145,8 +145,6 @@ type ElectionMember interface { GetDCLocationPath(id uint64) string // PreCheckLeader does some pre-check before checking whether it's the leader. PreCheckLeader() error - // UnsetLeader unsets the member's leader. - UnsetLeader() } // AllocatorManager is used to manage the TSO Allocators a PD server holds. diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 4988d7e4ae0..b7bff866b27 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -37,7 +37,6 @@ import ( "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" - "go.etcd.io/etcd/clientv3" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -81,8 +80,10 @@ type GlobalTSOAllocator struct { // for global TSO synchronization am *AllocatorManager // for election use - member ElectionMember - timestampOracle *timestampOracle + member ElectionMember + // expectedPrimaryLease is used to store the expected primary lease. + expectedPrimaryLease atomic.Value // store as *election.LeaderLease + timestampOracle *timestampOracle // syncRTT is the RTT duration a SyncMaxTS RPC call will cost, // which is used to estimate the MaxTS in a Global TSO generation // to reduce the gRPC network IO latency. @@ -566,7 +567,7 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() { // To make sure the expected primary(if existed) and new primary are on the same server. expectedPrimary := mcsutils.GetExpectedPrimaryFlag(gta.member.Client(), gta.member.GetLeaderPath()) // skip campaign the primary if the expected primary is not empty and not equal to the current memberValue. - // expected primary ONLY SET BY `/ms/primary/transfer` API. + // expected primary ONLY SET BY `{service}/primary/transfer` API. if expectedPrimary != "" && !strings.Contains(gta.member.MemberValue(), expectedPrimary) { log.Info("skip campaigning of tso primary and check later", zap.String("server-name", gta.member.Name()), @@ -639,12 +640,13 @@ func (gta *GlobalTSOAllocator) campaignLeader() { // check expected primary and watch the primary. exitPrimary := make(chan struct{}) - expectedLease, revision, err := gta.keepExpectedPrimaryAlive(ctx) + lease, err := mcsutils.KeepExpectedPrimaryAlive(ctx, gta.member.Client(), exitPrimary, + gta.am.leaderLease, gta.member.GetLeaderPath(), gta.member.MemberValue(), mcsutils.TSOServiceName) if err != nil { - log.Error("prepare primary watch error", errs.ZapError(err)) + log.Error("prepare tso primary watch error", errs.ZapError(err)) return } - go gta.expectedPrimaryWatch(ctx, expectedLease, revision+1, exitPrimary) + gta.expectedPrimaryLease.Store(lease) gta.member.EnableLeader() tsoLabel := fmt.Sprintf("TSO Service Group %d", gta.getGroupID()) @@ -684,48 +686,13 @@ func (gta *GlobalTSOAllocator) campaignLeader() { } } -// keepExpectedPrimaryAlive keeps the expected primary alive. -// We use lease to keep `expected primary` healthy. -// ONLY reset by the following conditions: -// - changed by`/ms/primary/transfer` API. -// - server closed. -func (gta *GlobalTSOAllocator) keepExpectedPrimaryAlive(ctx context.Context) (*election.Leadership, int64, error) { - const purpose = "tso-primary-watch" - cli := gta.member.Client() - newLease := election.NewLease(cli, purpose) - if err := newLease.Grant(gta.am.leaderLease); err != nil { - return nil, 0, err - } - - revision, err := mcsutils.MarkExpectedPrimaryFlag(cli, gta.member.GetLeaderPath(), gta.member.MemberValue(), - newLease.ID.Load().(clientv3.LeaseID)) - if err != nil { - log.Error("mark expected primary error", errs.ZapError(err)) - return nil, 0, err - } - // Keep alive the current primary leadership to indicate that the server is still alive. - // Watch the expected primary path to check whether the expected primary has changed. - expectedLease := election.NewLeadership(cli, mcsutils.ExpectedPrimaryPath(gta.member.GetLeaderPath()), purpose) - expectedLease.SetLease(newLease) - expectedLease.Keep(ctx) - return expectedLease, revision, nil -} - -// primaryWatch watches `/ms/primary/transfer` API whether changed the expected primary. -func (gta *GlobalTSOAllocator) expectedPrimaryWatch(ctx context.Context, expectedLease *election.Leadership, revision int64, exitPrimary chan struct{}) { - log.Info("tso primary start to watch the primary", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - zap.String("campaign-tso-primary-name", gta.member.Name())) - expectedLease.SetPrimaryWatch(true) - expectedLease.Watch(ctx, revision) - expectedLease.Reset() - defer log.Info("tso primary exit the primary watch loop") - select { - case <-ctx.Done(): - return - case exitPrimary <- struct{}{}: - return +// GetExpectedPrimaryLease returns the expected primary lease. +func (gta *GlobalTSOAllocator) GetExpectedPrimaryLease() *election.Lease { + l := gta.expectedPrimaryLease.Load() + if l == nil { + return nil } + return l.(*election.Lease) } func (gta *GlobalTSOAllocator) getMetrics() *tsoMetrics { diff --git a/server/apiv2/handlers/micro_service.go b/server/apiv2/handlers/micro_service.go index b1c290a1fb1..fd44665530f 100644 --- a/server/apiv2/handlers/micro_service.go +++ b/server/apiv2/handlers/micro_service.go @@ -16,11 +16,9 @@ package handlers import ( "net/http" - "strconv" "github.com/gin-gonic/gin" "github.com/tikv/pd/pkg/mcs/discovery" - "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/server" "github.com/tikv/pd/server/apiv2/middlewares" ) @@ -30,7 +28,6 @@ func RegisterMicroService(r *gin.RouterGroup) { router := r.Group("ms") router.GET("members/:service", GetMembers) router.GET("primary/:service", GetPrimary) - router.POST("primary/transfer/:service", TransferPrimary) } // GetMembers gets all members of the cluster for the specified service. @@ -80,54 +77,3 @@ func GetPrimary(c *gin.Context) { c.AbortWithStatusJSON(http.StatusInternalServerError, "please specify service") } - -// TransferPrimary transfers the primary member of the specified service. -// @Tags primary -// @Summary Transfer the primary member of the specified service. -// @Produce json -// @Param service path string true "service name" -// @Param new_primary body string false "new primary name" -// @Success 200 string string -// @Router /ms/primary/transfer/{service} [post] -func TransferPrimary(c *gin.Context) { - svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) - if !svr.IsAPIServiceMode() { - c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service") - return - } - - if service := c.Param("service"); len(service) > 0 { - var input map[string]string - if err := c.BindJSON(&input); err != nil { - c.String(http.StatusBadRequest, err.Error()) - return - } - - newPrimary, keyspaceGroupID := "", utils.DefaultKeyspaceGroupID - if v, ok := input["new_primary"]; ok { - newPrimary = v - } - - if v, ok := input["keyspace_group_id"]; ok { - keyspaceGroupIDRaw, err := strconv.ParseUint(v, 10, 32) - if err != nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) - return - } - keyspaceGroupID = uint32(keyspaceGroupIDRaw) - } - oldPrimary, _ := svr.GetServicePrimaryAddr(c.Request.Context(), service) - if oldPrimary == newPrimary { - c.AbortWithStatusJSON(http.StatusInternalServerError, "new primary is the same as the old one") - return - } - if err := discovery.TransferPrimary(svr.GetClient(), service, oldPrimary, newPrimary, keyspaceGroupID); err != nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) - return - } - c.IndentedJSON(http.StatusOK, "success") - return - } - - c.AbortWithStatusJSON(http.StatusInternalServerError, "please specify service") -} diff --git a/tests/integrations/mcs/members/member_test.go b/tests/integrations/mcs/members/member_test.go index e5fadf73b59..30f18eaf719 100644 --- a/tests/integrations/mcs/members/member_test.go +++ b/tests/integrations/mcs/members/member_test.go @@ -15,7 +15,11 @@ package members_test import ( + "bytes" "context" + "encoding/json" + "fmt" + "net/http" "testing" "time" @@ -158,10 +162,6 @@ func (suite *memberTestSuite) TestCampaignPrimaryWhileServerClose() { func (suite *memberTestSuite) TestTransferPrimary() { re := suite.Require() - primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, "tso") - re.NoError(err) - re.NotEmpty(primary) - supportedServices := []string{"tso", "scheduling"} for _, service := range supportedServices { var nodes map[string]bs.Server @@ -173,10 +173,17 @@ func (suite *memberTestSuite) TestTransferPrimary() { } // Test resign primary by random - primary, err = suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) re.NoError(err) - err = suite.pdClient.TransferMicroServicePrimary(suite.ctx, service, "") + + newPrimaryData := make(map[string]any) + newPrimaryData["new_primary"] = "" + data, _ := json.Marshal(newPrimaryData) + resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + "application/json", bytes.NewBuffer(data)) re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode) + resp.Body.Close() testutil.Eventually(re, func() bool { for _, member := range nodes { @@ -187,7 +194,7 @@ func (suite *memberTestSuite) TestTransferPrimary() { return false }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) - primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + primary, err = suite.pdClient.GetMicroServicePrimary(suite.ctx, service) re.NoError(err) // Test transfer primary to a specific node @@ -198,10 +205,16 @@ func (suite *memberTestSuite) TestTransferPrimary() { break } } - err = suite.pdClient.TransferMicroServicePrimary(suite.ctx, service, newPrimary) + newPrimaryData["new_primary"] = newPrimary + data, _ = json.Marshal(newPrimaryData) + resp, err = tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + "application/json", bytes.NewBuffer(data)) re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode) + resp.Body.Close() testutil.Eventually(re, func() bool { + println("newPrimary", newPrimary, nodes[newPrimary].IsServing()) return nodes[newPrimary].IsServing() }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) @@ -211,17 +224,18 @@ func (suite *memberTestSuite) TestTransferPrimary() { // Test transfer primary to a non-exist node newPrimary = "http://" - err = suite.pdClient.TransferMicroServicePrimary(suite.ctx, service, newPrimary) - re.Error(err) + newPrimaryData["new_primary"] = newPrimary + data, _ = json.Marshal(newPrimaryData) + resp, err = tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + "application/json", bytes.NewBuffer(data)) + re.NoError(err) + re.Equal(http.StatusInternalServerError, resp.StatusCode) + resp.Body.Close() } } func (suite *memberTestSuite) TestCampaignPrimaryAfterTransfer() { re := suite.Require() - primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, "tso") - re.NoError(err) - re.NotEmpty(primary) - supportedServices := []string{"tso", "scheduling"} for _, service := range supportedServices { var nodes map[string]bs.Server @@ -243,8 +257,14 @@ func (suite *memberTestSuite) TestCampaignPrimaryAfterTransfer() { break } } - err = suite.pdClient.TransferMicroServicePrimary(suite.ctx, service, newPrimary) + newPrimaryData := make(map[string]any) + newPrimaryData["new_primary"] = newPrimary + data, _ := json.Marshal(newPrimaryData) + resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + "application/json", bytes.NewBuffer(data)) re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode) + resp.Body.Close() tests.WaitForPrimaryServing(re, nodes) newPrimary, err = suite.pdClient.GetMicroServicePrimary(suite.ctx, service) @@ -292,8 +312,14 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpired() { } // Mock the new primary can not grant leader which means the lease will expire re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/skipGrantLeader", `return()`)) - err = suite.pdClient.TransferMicroServicePrimary(suite.ctx, service, newPrimary) + newPrimaryData := make(map[string]any) + newPrimaryData["new_primary"] = newPrimary + data, _ := json.Marshal(newPrimaryData) + resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + "application/json", bytes.NewBuffer(data)) re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode) + resp.Body.Close() // Wait for the old primary exit and new primary campaign // cannot check newPrimary isServing when skipGrantLeader is enabled @@ -338,8 +364,14 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpiredAndServerDown( } // Mock the new primary can not grant leader which means the lease will expire re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/skipGrantLeader", `return()`)) - err = suite.pdClient.TransferMicroServicePrimary(suite.ctx, service, newPrimary) + newPrimaryData := make(map[string]any) + newPrimaryData["new_primary"] = "" + data, _ := json.Marshal(newPrimaryData) + resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + "application/json", bytes.NewBuffer(data)) re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode) + resp.Body.Close() // Wait for the old primary exit and new primary campaign // cannot check newPrimary isServing when skipGrantLeader is enabled