Skip to content

Commit

Permalink
mcs: watch scheduling service's primary address (tikv#7072)
Browse files Browse the repository at this point in the history
ref tikv#5839

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] committed Sep 13, 2023
1 parent 2f57a9f commit b8d9c6e
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 32 deletions.
33 changes: 15 additions & 18 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,7 @@ func (m *Participant) setLeader(member participant) {

// unsetLeader unsets the member's leader.
func (m *Participant) unsetLeader() {
var leader participant
switch m.serviceName {
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
leader := NewParticipantByService(m.serviceName)
m.leader.Store(leader)
m.lastLeaderUpdatedTime.Store(time.Now())
}
Expand Down Expand Up @@ -225,15 +217,7 @@ func (m *Participant) PreCheckLeader() error {

// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *Participant) getPersistentLeader() (participant, int64, error) {
var leader participant
switch m.serviceName {
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
leader := NewParticipantByService(m.serviceName)
ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -399,3 +383,16 @@ func (m *Participant) campaignCheck() bool {
func (m *Participant) SetCampaignChecker(checker leadershipCheckFunc) {
m.campaignChecker.Store(checker)
}

// NewParticipantByService creates a new participant by service name.
func NewParticipantByService(serviceName string) (p participant) {
switch serviceName {
case utils.TSOServiceName:
p = &tsopb.Participant{}
case utils.SchedulingServiceName:
p = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
p = &resource_manager.Participant{}
}
return p
}
6 changes: 6 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string {
return path.Join(electionPath, utils.PrimaryKey)
}

// SchedulingPrimaryPath returns the path of scheduling primary.
// Path: /ms/{cluster_id}/scheduling/primary
func SchedulingPrimaryPath(clusterID uint64) string {
return path.Join(SchedulingSvcRootPath(clusterID), utils.PrimaryKey)
}

// KeyspaceGroupsElectionPath returns the path of keyspace groups election.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
Expand Down
5 changes: 4 additions & 1 deletion server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/gorilla/mux"
tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
mcs "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/serverapi"
"github.com/tikv/pd/server"
Expand All @@ -39,7 +40,9 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr, group),
serverapi.NewRedirector(svr, serverapi.MicroserviceRedirectRule(
apiPrefix+"/api/v1"+"/admin/reset-ts", tsoapi.APIPathPrefix+"/admin/reset-ts", "tso")),
apiPrefix+"/api/v1"+"/admin/reset-ts",
tsoapi.APIPathPrefix+"/admin/reset-ts",
mcs.TSOServiceName)),
negroni.Wrap(r)),
)

Expand Down
36 changes: 25 additions & 11 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,11 @@ type Server struct {

auditBackends []audit.Backend

registry *registry.ServiceRegistry
mode string
servicePrimaryMap sync.Map /* Store as map[string]string */
tsoPrimaryWatcher *etcdutil.LoopWatcher
registry *registry.ServiceRegistry
mode string
servicePrimaryMap sync.Map /* Store as map[string]string */
tsoPrimaryWatcher *etcdutil.LoopWatcher
schedulingPrimaryWatcher *etcdutil.LoopWatcher
}

// HandlerBuilder builds a server HTTP handler.
Expand Down Expand Up @@ -617,7 +618,7 @@ func (s *Server) startServerLoop(ctx context.Context) {
go s.encryptionKeyManagerLoop()
if s.IsAPIServiceMode() {
s.initTSOPrimaryWatcher()
s.tsoPrimaryWatcher.StartWatchLoop()
s.initSchedulingPrimaryWatcher()
}
}

Expand Down Expand Up @@ -1962,16 +1963,28 @@ func (s *Server) initTSOPrimaryWatcher() {
serviceName := mcs.TSOServiceName
tsoRootPath := endpoint.TSOSvcRootPath(s.clusterID)
tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, mcs.DefaultKeyspaceGroupID)
s.tsoPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, tsoServicePrimaryKey)
s.tsoPrimaryWatcher.StartWatchLoop()
}

func (s *Server) initSchedulingPrimaryWatcher() {
serviceName := mcs.SchedulingServiceName
primaryKey := endpoint.SchedulingPrimaryPath(s.clusterID)
s.schedulingPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, primaryKey)
s.schedulingPrimaryWatcher.StartWatchLoop()
}

func (s *Server) initServicePrimaryWatcher(serviceName string, primaryKey string) *etcdutil.LoopWatcher {
putFn := func(kv *mvccpb.KeyValue) error {
primary := &tsopb.Participant{} // TODO: use Generics
primary := member.NewParticipantByService(serviceName)
if err := proto.Unmarshal(kv.Value, primary); err != nil {
return err
}
listenUrls := primary.GetListenUrls()
if len(listenUrls) > 0 {
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
log.Info("update tso primary", zap.String("primary", listenUrls[0]))
log.Info("update service primary", zap.String("service-name", serviceName), zap.String("primary", listenUrls[0]))
}
return nil
}
Expand All @@ -1981,16 +1994,17 @@ func (s *Server) initTSOPrimaryWatcher() {
if ok {
oldPrimary = v.(string)
}
log.Info("delete tso primary", zap.String("old-primary", oldPrimary))
log.Info("delete service primary", zap.String("service-name", serviceName), zap.String("old-primary", oldPrimary))
s.servicePrimaryMap.Delete(serviceName)
return nil
}
s.tsoPrimaryWatcher = etcdutil.NewLoopWatcher(
name := fmt.Sprintf("%s-primary-watcher", serviceName)
return etcdutil.NewLoopWatcher(
s.serverLoopCtx,
&s.serverLoopWg,
s.client,
"tso-primary-watcher",
tsoServicePrimaryKey,
name,
primaryKey,
putFn,
deleteFn,
func() error { return nil },
Expand Down
5 changes: 5 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,11 @@ func (s *TestServer) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.server.GetTSOAllocatorManager()
}

// GetServicePrimaryAddr returns the primary address of the service.
func (s *TestServer) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) {
return s.server.GetServicePrimaryAddr(ctx, serviceName)
}

// TestCluster is only for test.
type TestCluster struct {
config *clusterConfig
Expand Down
15 changes: 13 additions & 2 deletions tests/integrations/mcs/scheduling/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/suite"
mcs "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/tests"
"go.uber.org/goleak"
Expand Down Expand Up @@ -107,11 +108,21 @@ func (suite *serverTestSuite) TestPrimaryChange() {
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
primary := tc.GetPrimaryServer()
addr := primary.GetAddr()
oldPrimaryAddr := primary.GetAddr()
re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5)
testutil.Eventually(re, func() bool {
watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName)
return ok && oldPrimaryAddr == watchedAddr
})
// transfer leader
primary.Close()
tc.WaitForPrimaryServing(re)
primary = tc.GetPrimaryServer()
re.NotEqual(addr, primary.GetAddr())
newPrimaryAddr := primary.GetAddr()
re.NotEqual(oldPrimaryAddr, newPrimaryAddr)
re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5)
testutil.Eventually(re, func() bool {
watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName)
return ok && newPrimaryAddr == watchedAddr
})
}

0 comments on commit b8d9c6e

Please sign in to comment.