Skip to content

Commit

Permalink
refine code
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Jul 31, 2024
1 parent ea8d9e3 commit ffb7b1b
Show file tree
Hide file tree
Showing 17 changed files with 322 additions and 249 deletions.
5 changes: 0 additions & 5 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,6 @@ func MicroServicePrimary(service string) string {
return fmt.Sprintf("%s/primary/%s", microServicePrefix, service)
}

// MicroServicePrimaryTransfer returns the path of PD HTTP API to transfer the primary of microservice.
func MicroServicePrimaryTransfer(service string) string {
return fmt.Sprintf("%s/primary/transfer/%s", microServicePrefix, service)
}

// GetUpdateKeyspaceConfigURL returns the path of PD HTTP API to update keyspace config.
func GetUpdateKeyspaceConfigURL(keyspaceName string) string {
return fmt.Sprintf(KeyspaceConfig, keyspaceName)
Expand Down
17 changes: 0 additions & 17 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ type Client interface {
/* Micro Service interfaces */
GetMicroServiceMembers(context.Context, string) ([]MicroServiceMember, error)
GetMicroServicePrimary(context.Context, string) (string, error)
TransferMicroServicePrimary(context.Context, string, string) error
DeleteOperators(context.Context) error

/* Keyspace interface */
Expand Down Expand Up @@ -960,22 +959,6 @@ func (c *client) GetMicroServicePrimary(ctx context.Context, service string) (st
return primary, err
}

func (c *client) TransferMicroServicePrimary(ctx context.Context, service, newPrimary string) error {
reqData, err := json.Marshal(struct {
NewPrimary string `json:"new_primary"`
}{
NewPrimary: newPrimary,
})
if err != nil {
return errors.Trace(err)
}
return c.request(ctx, newRequestInfo().
WithName(transferMicroServicePrimaryName).
WithURI(MicroServicePrimaryTransfer(service)).
WithMethod(http.MethodPost).
WithBody(reqData))
}

// GetPDVersion gets the release version of the PD binary.
func (c *client) GetPDVersion(ctx context.Context) (string, error) {
var ver struct {
Expand Down
1 change: 0 additions & 1 deletion client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ const (
getMinResolvedTSByStoresIDsName = "GetMinResolvedTSByStoresIDs"
getMicroServiceMembersName = "GetMicroServiceMembers"
getMicroServicePrimaryName = "GetMicroServicePrimary"
transferMicroServicePrimaryName = "TransferMicroServicePrimary"
getPDVersionName = "GetPDVersion"
resetTSName = "ResetTS"
resetBaseAllocIDName = "ResetBaseAllocID"
Expand Down
5 changes: 3 additions & 2 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,13 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
}

for _, ev := range wresp.Events {
if ev.Type == mvccpb.DELETE {
// ensure `{service}/primary/transfer` API will not meet this condition.
if ev.Type == mvccpb.DELETE && !ls.IsPrimary() {
log.Info("current leadership is deleted",
zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
return
}
// ONLY `/ms/primary/transfer` API update primary will meet this condition.
// ONLY `{service}/primary/transfer` API update primary will meet this condition.
if ev.Type == mvccpb.PUT && ls.IsPrimary() {
log.Info("current leadership is updated", zap.Int64("revision", wresp.Header.Revision),
zap.String("leader-key", ls.leaderKey), zap.ByteString("cur-value", ev.Kv.Value),
Expand Down
2 changes: 2 additions & 0 deletions pkg/election/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Lease struct {
expireTime atomic.Value
}

// NewLease creates a new Lease instance.
func NewLease(client *clientv3.Client, purpose string) *Lease {
return &Lease{
Purpose: purpose,
Expand Down Expand Up @@ -117,6 +118,7 @@ func (l *Lease) KeepAlive(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3)
defer log.Info("lease keep alive stopped", zap.String("purpose", l.Purpose))

var maxExpire time.Time
timer := time.NewTimer(l.leaseTimeout)
Expand Down
31 changes: 22 additions & 9 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand Down Expand Up @@ -82,8 +83,13 @@ func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistr
}

// TransferPrimary transfers the primary of the specified service.
func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimary string, keyspaceGroupID uint32) error {
log.Info("transfer primary", zap.String("service", serviceName), zap.String("from", oldPrimary), zap.String("to", newPrimary))
// keyspaceGroupID is optional, only used for TSO service.
func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName,
oldPrimaryAddr, newPrimary string, keyspaceGroupID uint32) error {
if lease == nil {
return errors.New("current lease is nil, please check leadership")
}
log.Info("try to transfer primary", zap.String("service", serviceName), zap.String("from", oldPrimaryAddr), zap.String("to", newPrimary))
entries, err := GetMSMembers(serviceName, client)
if err != nil {
return err
Expand All @@ -96,12 +102,13 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar

var primaryIDs []string
for _, member := range entries {
if (newPrimary == "" && member.ServiceAddr != oldPrimary) || (newPrimary != "" && member.Name == newPrimary) {
// TODO: judged by `addr` and `name` now, should unify them to `name` in the future.
if (newPrimary == "" && member.ServiceAddr != oldPrimaryAddr) || (newPrimary != "" && member.Name == newPrimary) {
primaryIDs = append(primaryIDs, member.ServiceAddr)
}
}
if len(primaryIDs) == 0 {
return errors.Errorf("no valid secondary to transfer primary, from %s to %s", oldPrimary, newPrimary)
return errors.Errorf("no valid secondary to transfer primary, from %s to %s", oldPrimaryAddr, newPrimary)
}

r := rand.New(rand.NewSource(time.Now().UnixNano()))
Expand All @@ -112,6 +119,17 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar
return errors.Errorf("failed to get cluster ID: %v", err)
}

// update expected primary flag
grantResp, err := client.Grant(client.Ctx(), utils.DefaultLeaderLease)
if err != nil {
return errors.Errorf("failed to grant lease for expected primary, err: %v", err)
}

// revoke current primary's lease to ensure keepalive goroutine of primary exits.
if err := lease.Close(); err != nil {
return errors.Errorf("failed to revoke current primary's lease: %v", err)
}

var primaryPath string
switch serviceName {
case utils.SchedulingServiceName:
Expand All @@ -120,11 +138,6 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar
tsoRootPath := endpoint.TSOSvcRootPath(clusterID)
primaryPath = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID)
}

grantResp, err := client.Grant(client.Ctx(), utils.DefaultLeaderLease)
if err != nil {
return errors.Errorf("failed to grant lease for expected primary, err: %v", err)
}
_, err = utils.MarkExpectedPrimaryFlag(client, primaryPath, primaryIDs[nextPrimaryID], grantResp.ID)
if err != nil {
return errors.Errorf("failed to mark expected primary flag for %s, err: %v", serviceName, err)
Expand Down
37 changes: 37 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/response"
Expand Down Expand Up @@ -119,6 +120,7 @@ func NewService(srv *scheserver.Service) *Service {
s.RegisterHotspotRouter()
s.RegisterRegionsRouter()
s.RegisterStoresRouter()
s.RegisterPrimaryRouter()
return s
}

Expand Down Expand Up @@ -225,6 +227,12 @@ func (s *Service) RegisterConfigRouter() {
regions.GET("/:id/labels", getRegionLabels)
}

// RegisterPrimaryRouter registers the router of the config handler.
func (s *Service) RegisterPrimaryRouter() {
router := s.root.Group("primary")
router.POST("transfer", transferPrimary)
}

// @Tags admin
// @Summary Change the log level.
// @Produce json
Expand Down Expand Up @@ -1477,3 +1485,32 @@ func getRegionByID(c *gin.Context) {
}
c.Data(http.StatusOK, "application/json", b)
}

// TransferPrimary transfers the primary member.
// @Tags primary
// @Summary Transfer the primary member of the specified service.
// @Produce json
// @Param service path string true "service name"
// @Param new_primary body string false "new primary name"
// @Success 200 string string
// @Router /primary/transfer [post]
func transferPrimary(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
var input map[string]string
if err := c.BindJSON(&input); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

newPrimary := ""
if v, ok := input["new_primary"]; ok {
newPrimary = v
}

if err := discovery.TransferPrimary(svr.GetClient(), svr.GetParticipant().GetExpectedPrimaryLease(),
mcsutils.SchedulingServiceName, svr.GetAddr(), newPrimary, 0); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, "success")
}
52 changes: 5 additions & 47 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
Expand All @@ -62,7 +61,6 @@ import (
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -253,7 +251,7 @@ func (s *Server) primaryElectionLoop() {
// To make sure the expected primary(if existed) and new primary are on the same server.
expectedPrimary := utils.GetExpectedPrimaryFlag(s.GetClient(), s.participant.GetLeaderPath())
// skip campaign the primary if the expected primary is not empty and not equal to the current memberValue.
// expected primary ONLY SET BY `/ms/primary/transfer` API.
// expected primary ONLY SET BY `{service}/primary/transfer` API.
if expectedPrimary != "" && !strings.Contains(s.participant.MemberValue(), expectedPrimary) {
log.Info("skip campaigning of scheduling primary and check later",
zap.String("server-name", s.Name()),
Expand Down Expand Up @@ -309,12 +307,13 @@ func (s *Server) campaignLeader() {
}()
// check expected primary and watch the primary.
exitPrimary := make(chan struct{})
expectedLease, revision, err := s.keepExpectedPrimaryAlive(ctx)
lease, err := utils.KeepExpectedPrimaryAlive(ctx, s.GetClient(), exitPrimary,
s.cfg.LeaderLease, s.participant.GetLeaderPath(), s.participant.MemberValue(), utils.SchedulingServiceName)
if err != nil {
log.Error("prepare primary watch error", errs.ZapError(err))
log.Error("prepare scheduling primary watch error", errs.ZapError(err))
return
}
go s.expectedPrimaryWatch(ctx, expectedLease, revision+1, exitPrimary)
s.participant.SetExpectedPrimaryLease(lease)
s.participant.EnableLeader()

member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1)
Expand All @@ -341,47 +340,6 @@ func (s *Server) campaignLeader() {
}
}

// keepExpectedPrimaryAlive keeps the expected primary alive.
// We use lease to keep `expected primary` healthy.
// ONLY reset by the following conditions:
// - changed by`/ms/primary/transfer` API.
// - server closed.
func (s *Server) keepExpectedPrimaryAlive(ctx context.Context) (*election.Leadership, int64, error) {
const purpose = "scheduling-primary-watch"
lease := election.NewLease(s.GetClient(), purpose)
if err := lease.Grant(s.cfg.LeaderLease); err != nil {
log.Error("grant lease for expected primary error", errs.ZapError(err))
return nil, 0, err
}
revision, err := utils.MarkExpectedPrimaryFlag(s.GetClient(), s.participant.GetLeaderPath(), s.participant.MemberValue(),
lease.ID.Load().(clientv3.LeaseID))
if err != nil {
log.Error("mark expected primary error", errs.ZapError(err))
return nil, 0, err
}
// Keep alive the current primary leadership to indicate that the server is still alive.
// Watch the expected primary path to check whether the expected primary has changed.
expectedPrimary := election.NewLeadership(s.GetClient(), utils.ExpectedPrimaryPath(s.participant.GetLeaderPath()), purpose)
expectedPrimary.SetLease(lease)
expectedPrimary.Keep(ctx)
return expectedPrimary, revision, nil
}

// expectedPrimaryWatch watches `/ms/primary/transfer` API whether changed the expected primary.
func (s *Server) expectedPrimaryWatch(ctx context.Context, expectedPrimary *election.Leadership, revision int64, exitPrimary chan struct{}) {
log.Info("scheduling primary start to watch the expected primary", zap.String("scheduling-primary", s.participant.MemberValue()))
expectedPrimary.SetPrimaryWatch(true)
expectedPrimary.Watch(ctx, revision)
expectedPrimary.Reset()
defer log.Info("scheduling primary exit the expected primary watch loop")
select {
case <-ctx.Done():
return
case exitPrimary <- struct{}{}:
return
}
}

// Close closes the server.
func (s *Server) Close() {
if !atomic.CompareAndSwapInt64(&s.isRunning, 1, 0) {
Expand Down
51 changes: 51 additions & 0 deletions pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import (
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -97,6 +99,7 @@ func NewService(srv *tsoserver.Service) *Service {
s.RegisterKeyspaceGroupRouter()
s.RegisterHealthRouter()
s.RegisterConfigRouter()
s.RegisterPrimaryRouter()
return s
}

Expand Down Expand Up @@ -125,6 +128,12 @@ func (s *Service) RegisterConfigRouter() {
router.GET("", getConfig)
}

// RegisterPrimaryRouter registers the router of the config handler.
func (s *Service) RegisterPrimaryRouter() {
router := s.root.Group("primary")
router.POST("transfer", transferPrimary)
}

func changeLogLevel(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
var level string
Expand Down Expand Up @@ -265,3 +274,45 @@ func getConfig(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
c.IndentedJSON(http.StatusOK, svr.GetConfig())
}

// TransferPrimary transfers the primary member of the specified service.
// @Tags primary
// @Summary Transfer the primary member of the specified service.
// @Produce json
// @Param service path string true "service name"
// @Param new_primary body string false "new primary name"
// @Param keyspace_group_id body string false "keyspace group id"
// @Success 200 string string
// @Router /primary/transfer [post]
func transferPrimary(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
var input map[string]string
if err := c.BindJSON(&input); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

newPrimary, keyspaceGroupID := "", utils.DefaultKeyspaceGroupID
if v, ok := input["new_primary"]; ok {
newPrimary = v
}

allocator, err := svr.GetTSOAllocatorManager(keyspaceGroupID)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}

globalAllocator, err := allocator.GetAllocator(tso.GlobalDCLocation)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}

if err := discovery.TransferPrimary(svr.GetClient(), globalAllocator.(*tso.GlobalTSOAllocator).GetExpectedPrimaryLease(),
utils.TSOServiceName, svr.GetAddr(), newPrimary, keyspaceGroupID); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, "success")
}
Loading

0 comments on commit ffb7b1b

Please sign in to comment.