Merge remote-tracking branch 'upstream/master' into fix_coordinator
HuSharp committed Sep 13, 2023
2 parents 68a1107 + b8d9c6e commit c67ed32
Showing 28 changed files with 437 additions and 222 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -33,7 +33,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/common v0.26.0
github.com/sasha-s/go-deadlock v0.2.0
4 changes: 2 additions & 2 deletions go.sum
@@ -446,8 +446,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 h1:K9lZMYuDuAiR5kOjFESwJ8KfSb4ui5zX6vZGbUp58uk=
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM=
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 h1:oyrCfNlAWmLlUfEr+7YTSBo29SP/J1N8hnxBt5yUABo=
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
8 changes: 4 additions & 4 deletions metrics/grafana/pd.json
@@ -7094,14 +7094,14 @@
"expr": "-sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-leader-scheduler\"}[1m])) by (source)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
"legendFormat": "store-{{source}}",
"refId": "A"
},
{
"expr": "sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-leader-scheduler\"}[1m])) by (target)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
"legendFormat": "store-{{target}}",
"refId": "B"
}
],
@@ -7198,14 +7198,14 @@
"expr": "-sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-region-scheduler\"}[1m])) by (source)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
"legendFormat": "store-{{source}}",
"refId": "A"
},
{
"expr": "sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-region-scheduler\"}[1m])) by (target)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
"legendFormat": "store-{{target}}",
"refId": "B"
}
],
2 changes: 1 addition & 1 deletion pkg/basicserver/basic_server.go
@@ -44,5 +44,5 @@ type Server interface {
// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.
IsServing() bool
// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
AddServiceReadyCallback(callbacks ...func(context.Context))
AddServiceReadyCallback(callbacks ...func(context.Context) error)
}
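
This one-line interface change is the thread running through most of the commit: service-ready callbacks can now report failure, so the caller can abort a leadership campaign instead of serving with a half-initialized service. Below is a minimal sketch of that pattern; apart from AddServiceReadyCallback, the names and types are illustrative and not taken from this diff.

```go
package main

import (
	"context"
	"errors"
	"log"
)

// readyCallback mirrors the new signature above: a ready callback may now fail.
type readyCallback func(context.Context) error

type server struct {
	primaryCallbacks []readyCallback
}

// AddServiceReadyCallback matches the updated interface method.
func (s *server) AddServiceReadyCallback(callbacks ...readyCallback) {
	s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
}

// runReadyCallbacks stops at the first failure, the way campaignLeader now
// returns early instead of enabling leadership.
func (s *server) runReadyCallbacks(ctx context.Context) error {
	for _, cb := range s.primaryCallbacks {
		if err := cb(ctx); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	s := &server{}
	s.AddServiceReadyCallback(func(ctx context.Context) error {
		return errors.New("cluster failed to start") // hypothetical failure
	})
	if err := s.runReadyCallbacks(context.Background()); err != nil {
		log.Println("not ready to serve:", err)
	}
}
```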
3 changes: 2 additions & 1 deletion pkg/mcs/resourcemanager/server/manager.go
@@ -101,7 +101,7 @@ func (m *Manager) GetBasicServer() bs.Server {
}

// Init initializes the resource group manager.
func (m *Manager) Init(ctx context.Context) {
func (m *Manager) Init(ctx context.Context) error {
// Todo: If we can modify following configs in the future, we should reload these configs.
// Store the controller config into the storage.
m.storage.SaveControllerConfig(m.controllerConfig)
@@ -156,6 +156,7 @@ func (m *Manager) Init(ctx context.Context) {
m.persistLoop(ctx)
}()
log.Info("resource group manager finishes initialization")
return nil
}

// AddResourceGroup puts a resource group.
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/server.go
@@ -71,7 +71,7 @@ type Server struct {
service *Service

// primaryCallbacks will be called after the server becomes leader.
primaryCallbacks []func(context.Context)
primaryCallbacks []func(context.Context) error

serviceRegister *discovery.ServiceRegister
}
@@ -232,7 +232,7 @@ func (s *Server) IsClosed() bool {
}

// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
}

18 changes: 13 additions & 5 deletions pkg/mcs/scheduling/server/cluster.go
@@ -143,19 +143,27 @@ func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConf

// AllocID allocates a new ID.
func (c *Cluster) AllocID() (uint64, error) {
cli := c.apiServerLeader.Load().(pdpb.PDClient)
if cli == nil {
c.checkMembershipCh <- struct{}{}
return 0, errors.New("API server leader is not found")
client, err := c.getAPIServerLeaderClient()
if err != nil {
return 0, err
}
resp, err := cli.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}})
resp, err := client.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}})
if err != nil {
c.checkMembershipCh <- struct{}{}
return 0, err
}
return resp.GetId(), nil
}

func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) {
cli := c.apiServerLeader.Load()
if cli == nil {
c.checkMembershipCh <- struct{}{}
return nil, errors.New("API server leader is not found")
}
return cli.(pdpb.PDClient), nil
}

// SwitchAPIServerLeader switches the API server leader.
func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
old := c.apiServerLeader.Load()
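
One likely motivation for the AllocID refactor: the old code asserted the stored value to pdpb.PDClient before checking it, and the single-return form of a type assertion panics on a nil interface value, so when nothing had been stored yet the "API server leader is not found" branch could never run. A self-contained sketch of that failure mode and the load-then-check fix, using a stand-in client type rather than the PD ones:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// client stands in for pdpb.PDClient in this sketch.
type client interface {
	AllocID() (uint64, error)
}

var leader atomic.Value // holds a client once one has been elected

// getLeader checks for nil before the type assertion. The pre-fix code did
// the assertion first (Load().(client)), which panics when Load returns nil.
func getLeader() (client, error) {
	v := leader.Load()
	if v == nil {
		return nil, errors.New("API server leader is not found")
	}
	return v.(client), nil
}

func main() {
	if _, err := getLeader(); err != nil {
		fmt.Println(err) // prints the error instead of panicking
	}
}
```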
13 changes: 13 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
@@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/mcs/utils"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
@@ -239,6 +240,18 @@ func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) {
o.schedule.Store(cfg)
}

// AdjustScheduleCfg adjusts the schedule config.
func (o *PersistConfig) AdjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) {
// In case we add new default schedulers.
for _, ps := range sc.DefaultSchedulers {
if slice.NoneOf(scheduleCfg.Schedulers, func(i int) bool {
return scheduleCfg.Schedulers[i].Type == ps.Type
}) {
scheduleCfg.Schedulers = append(scheduleCfg.Schedulers, ps)
}
}
}

// GetReplicationConfig returns replication configurations.
func (o *PersistConfig) GetReplicationConfig() *sc.ReplicationConfig {
return o.replication.Load().(*sc.ReplicationConfig)
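
AdjustScheduleCfg back-fills default schedulers that a schedule config persisted by an older version may lack, so newly added defaults are not silently dropped when the watcher loads the stored config. A minimal sketch of the same idea without the slice helper; the scheduler names below are examples, not the actual default list.

```go
package main

import "fmt"

type schedulerConfig struct{ Type string }

// defaultSchedulers stands in for sc.DefaultSchedulers; the names are examples.
var defaultSchedulers = []schedulerConfig{
	{Type: "balance-region"},
	{Type: "balance-leader"},
	{Type: "hot-region"},
}

// adjust appends any default scheduler that a persisted config is missing,
// the same idea AdjustScheduleCfg expresses with slice.NoneOf.
func adjust(schedulers []schedulerConfig) []schedulerConfig {
	for _, ps := range defaultSchedulers {
		present := false
		for _, s := range schedulers {
			if s.Type == ps.Type {
				present = true
				break
			}
		}
		if !present {
			schedulers = append(schedulers, ps)
		}
	}
	return schedulers
}

func main() {
	old := []schedulerConfig{{Type: "balance-region"}} // persisted by an older version
	fmt.Println(adjust(old))                           // the missing defaults are filled back in
}
```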
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/config/watcher.go
@@ -94,6 +94,7 @@ func (cw *Watcher) initializeConfigWatcher() error {
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
cw.AdjustScheduleCfg(&cfg.Schedule)
cw.SetClusterVersion(&cfg.ClusterVersion)
cw.SetScheduleConfig(&cfg.Schedule)
cw.SetReplicationConfig(&cfg.Replication)
66 changes: 45 additions & 21 deletions pkg/mcs/scheduling/server/server.go
@@ -87,7 +87,8 @@ type Server struct {
checkMembershipCh chan struct{}

// primaryCallbacks will be called after the server becomes leader.
primaryCallbacks []func(context.Context)
primaryCallbacks []func(context.Context) error
primaryExitCallbacks []func()

// for service registry
serviceID *discovery.ServiceRegistryEntry
@@ -164,6 +165,9 @@ func (s *Server) updateAPIServerMemberLoop() {
case <-ticker.C:
case <-s.checkMembershipCh:
}
if !s.IsServing() {
continue
}
members, err := s.GetClient().MemberList(ctx)
if err != nil {
log.Warn("failed to list members", errs.ZapError(err))
@@ -247,9 +251,16 @@ func (s *Server) campaignLeader() {

log.Info("triggering the primary callback functions")
for _, cb := range s.primaryCallbacks {
cb(ctx)
if err := cb(ctx); err != nil {
log.Error("failed to trigger the primary callback functions", errs.ZapError(err))
return
}
}

defer func() {
for _, cb := range s.primaryExitCallbacks {
cb()
}
}()
s.participant.EnableLeader()
log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))

@@ -283,10 +294,6 @@ func (s *Server) Close() {
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.GetListener().Close()
s.GetCoordinator().Stop()
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()

@@ -313,10 +320,15 @@ func (s *Server) IsClosed() bool {
}

// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
}

// AddServiceExitCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) AddServiceExitCallback(callbacks ...func()) {
s.primaryExitCallbacks = append(s.primaryExitCallbacks, callbacks...)
}

// GetTLSConfig gets the security config.
func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
@@ -381,20 +393,10 @@ func (s *Server) startServer() (err error) {
ListenUrls: []string{s.cfg.AdvertiseListenAddr},
}
s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")
s.basicCluster = core.NewBasicCluster()
err = s.startWatcher()
if err != nil {
return err
}
s.storage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(s.GetClient(), endpoint.PDRootPath(s.clusterID)), nil)
s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
if err != nil {
return err
}

s.service = &Service{Server: s}
s.AddServiceReadyCallback(s.startCluster)
s.AddServiceExitCallback(s.stopCluster)
if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil {
return err
}
@@ -406,7 +408,6 @@
go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.GetListener())
s.checkMembershipCh <- struct{}{}
<-serverReadyChan
go s.GetCoordinator().RunUntilStop()

// Run callbacks
log.Info("triggering the start callback functions")
@@ -429,6 +430,29 @@
return nil
}

func (s *Server) startCluster(context.Context) error {
s.basicCluster = core.NewBasicCluster()
err := s.startWatcher()
if err != nil {
return err
}
s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
if err != nil {
return err
}
go s.GetCoordinator().RunUntilStop()
return nil
}

func (s *Server) stopCluster() {
s.GetCoordinator().Stop()
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
}

func (s *Server) startWatcher() (err error) {
s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster)
if err != nil {
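
The larger change in this file moves cluster construction out of startServer and into a primary-ready callback (startCluster), paired with an exit callback (stopCluster), so the watchers and the coordinator run only while this instance is primary. A simplified sketch of that pairing, not the PD types: ready callbacks abort the campaign on error, and exit callbacks are deferred until leadership ends.

```go
package main

import (
	"context"
	"fmt"
)

// lifecycle sketches the pairing introduced above: work that should only run
// while this instance is primary lives in a ready callback, and its teardown
// lives in a matching exit callback.
type lifecycle struct {
	ready []func(context.Context) error
	exit  []func()
}

func (l *lifecycle) campaign(ctx context.Context) {
	for _, cb := range l.ready {
		if err := cb(ctx); err != nil {
			fmt.Println("abort campaign:", err) // mirrors the early return in campaignLeader
			return
		}
	}
	// Exit callbacks are deferred, so they run once leadership ends.
	defer func() {
		for _, cb := range l.exit {
			cb()
		}
	}()
	fmt.Println("serving as primary")
	// ... hold leadership until it is lost ...
}

func main() {
	l := &lifecycle{}
	l.ready = append(l.ready, func(context.Context) error { fmt.Println("start cluster"); return nil })
	l.exit = append(l.exit, func() { fmt.Println("stop cluster") })
	l.campaign(context.Background())
}
```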
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
@@ -236,7 +236,7 @@ func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {

// AddServiceReadyCallback implements basicserver.
// It adds callbacks when it's ready for providing tso service.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
// Do nothing here. The primary of each keyspace group assigned to this host
// will respond to the requests accordingly.
}
33 changes: 15 additions & 18 deletions pkg/member/participant.go
@@ -166,15 +166,7 @@ func (m *Participant) setLeader(member participant) {

// unsetLeader unsets the member's leader.
func (m *Participant) unsetLeader() {
var leader participant
switch m.serviceName {
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
leader := NewParticipantByService(m.serviceName)
m.leader.Store(leader)
m.lastLeaderUpdatedTime.Store(time.Now())
}
@@ -225,15 +217,7 @@ func (m *Participant) PreCheckLeader() error {

// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *Participant) getPersistentLeader() (participant, int64, error) {
var leader participant
switch m.serviceName {
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
leader := NewParticipantByService(m.serviceName)
ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
if err != nil {
return nil, 0, err
@@ -399,3 +383,16 @@ func (m *Participant) campaignCheck() bool {
func (m *Participant) SetCampaignChecker(checker leadershipCheckFunc) {
m.campaignChecker.Store(checker)
}

// NewParticipantByService creates a new participant by service name.
func NewParticipantByService(serviceName string) (p participant) {
switch serviceName {
case utils.TSOServiceName:
p = &tsopb.Participant{}
case utils.SchedulingServiceName:
p = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
p = &resource_manager.Participant{}
}
return p
}
6 changes: 6 additions & 0 deletions pkg/storage/endpoint/key_path.go
@@ -325,6 +325,12 @@ func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string {
return path.Join(electionPath, utils.PrimaryKey)
}

// SchedulingPrimaryPath returns the path of scheduling primary.
// Path: /ms/{cluster_id}/scheduling/primary
func SchedulingPrimaryPath(clusterID uint64) string {
return path.Join(SchedulingSvcRootPath(clusterID), utils.PrimaryKey)
}

// KeyspaceGroupsElectionPath returns the path of keyspace groups election.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
4 changes: 2 additions & 2 deletions server/api/min_resolved_ts.go
@@ -53,7 +53,7 @@ type minResolvedTS struct {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /min-resolved-ts/{store_id} [get]
func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *http.Request) {
c := h.svr.GetRaftCluster()
c := getCluster(r)
idStr := mux.Vars(r)["store_id"]
storeID, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
@@ -84,7 +84,7 @@ func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *h
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /min-resolved-ts [get]
func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.Request) {
c := h.svr.GetRaftCluster()
c := getCluster(r)
scopeMinResolvedTS := c.GetMinResolvedTS()
persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval

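
Both handlers now obtain the cluster via getCluster(r) rather than h.svr.GetRaftCluster(). The helper's body is not part of this diff; assuming it reads a cluster that middleware has placed in the request context, which is the usual shape for this kind of helper, the switch looks roughly like the sketch below. The raftCluster and clusterKey types are stand-ins, not PD's.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

type raftCluster struct{ name string }

type clusterKey struct{}

// withCluster is a sketch of middleware-style injection: the cluster is
// resolved once per request and stored in the request context, so every
// handler reads the same, current instance.
func withCluster(rc *raftCluster, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := context.WithValue(r.Context(), clusterKey{}, rc)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

// getCluster here is a hypothetical reimplementation for the sketch only.
func getCluster(r *http.Request) *raftCluster {
	rc, _ := r.Context().Value(clusterKey{}).(*raftCluster)
	return rc
}

func main() {
	h := withCluster(&raftCluster{name: "pd"}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, getCluster(r).name)
	}))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/min-resolved-ts", nil))
	fmt.Print(rec.Body.String()) // pd
}
```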