
Commit

Merge branch 'master' into split-scheduler-test
rleungx authored Sep 11, 2023
2 parents a8c474c + 68df4b6 commit 61bc0ba
Showing 15 changed files with 199 additions and 62 deletions.
2 changes: 1 addition & 1 deletion pkg/basicserver/basic_server.go
@@ -44,5 +44,5 @@ type Server interface {
     // IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.
     IsServing() bool
     // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
-    AddServiceReadyCallback(callbacks ...func(context.Context))
+    AddServiceReadyCallback(callbacks ...func(context.Context) error)
 }
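
The signature change above is the thread running through most of this commit: service-ready callbacks can now fail, and the failure is reported to whoever triggers them instead of being swallowed inside the callback. Below is a minimal sketch of the new contract; initResourceGroups is a made-up callback, not PD code, and the real counterparts are Manager.Init and startCluster further down.

package main

import (
    "context"
    "errors"
    "fmt"
)

// initResourceGroups is a hypothetical ready callback in the new shape: it can
// fail, and the failure is visible to the caller that drives the callbacks.
func initResourceGroups(ctx context.Context) error {
    return errors.New("failed to load resource groups from storage") // simulated failure
}

func main() {
    var callbacks []func(context.Context) error // the new callback type
    callbacks = append(callbacks, initResourceGroups)

    for _, cb := range callbacks {
        if err := cb(context.Background()); err != nil {
            // With the old func(context.Context) shape this error could only be
            // logged inside the callback; now the caller can abort the promotion,
            // which is what campaignLeader does later in this commit.
            fmt.Println("abort becoming primary:", err)
            return
        }
    }
    fmt.Println("primary is ready to serve")
}
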
3 changes: 2 additions & 1 deletion pkg/mcs/resourcemanager/server/manager.go
@@ -101,7 +101,7 @@ func (m *Manager) GetBasicServer() bs.Server {
 }
 
 // Init initializes the resource group manager.
-func (m *Manager) Init(ctx context.Context) {
+func (m *Manager) Init(ctx context.Context) error {
     // Todo: If we can modify following configs in the future, we should reload these configs.
     // Store the controller config into the storage.
     m.storage.SaveControllerConfig(m.controllerConfig)
@@ -156,6 +156,7 @@ func (m *Manager) Init(ctx context.Context) {
         m.persistLoop(ctx)
     }()
     log.Info("resource group manager finishes initialization")
+    return nil
 }
 
 // AddResourceGroup puts a resource group.
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/server.go
@@ -71,7 +71,7 @@ type Server struct {
     service *Service
 
     // primaryCallbacks will be called after the server becomes leader.
-    primaryCallbacks []func(context.Context)
+    primaryCallbacks []func(context.Context) error
 
     serviceRegister *discovery.ServiceRegister
 }
@@ -232,7 +232,7 @@ func (s *Server) IsClosed() bool {
 }
 
 // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
-func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
+func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
     s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
 }

18 changes: 13 additions & 5 deletions pkg/mcs/scheduling/server/cluster.go
@@ -143,19 +143,27 @@ func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConf
 
 // AllocID allocates a new ID.
 func (c *Cluster) AllocID() (uint64, error) {
-    cli := c.apiServerLeader.Load().(pdpb.PDClient)
-    if cli == nil {
-        c.checkMembershipCh <- struct{}{}
-        return 0, errors.New("API server leader is not found")
+    client, err := c.getAPIServerLeaderClient()
+    if err != nil {
+        return 0, err
     }
-    resp, err := cli.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}})
+    resp, err := client.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}})
     if err != nil {
         c.checkMembershipCh <- struct{}{}
         return 0, err
     }
     return resp.GetId(), nil
 }
 
+func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) {
+    cli := c.apiServerLeader.Load()
+    if cli == nil {
+        c.checkMembershipCh <- struct{}{}
+        return nil, errors.New("API server leader is not found")
+    }
+    return cli.(pdpb.PDClient), nil
+}
+
 // SwitchAPIServerLeader switches the API server leader.
 func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
     old := c.apiServerLeader.Load()
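
One thing the new getAPIServerLeaderClient helper fixes in passing: the old AllocID asserted the result of apiServerLeader.Load() to pdpb.PDClient before checking it for nil, and a plain (non comma-ok) type assertion on a nil interface value panics. As written, calling AllocID before any API server leader had been stored would therefore panic instead of returning the "API server leader is not found" error; the helper loads first, checks, and only then asserts. A standalone sketch of the two patterns, using a toy greeter interface in place of pdpb.PDClient:

package main

import (
    "fmt"
    "sync/atomic"
)

// greeter stands in for pdpb.PDClient; atomic.Value stores it as an interface.
type greeter interface{ Greet() string }

type english struct{}

func (english) Greet() string { return "hello" }

func main() {
    var leader atomic.Value

    // Unsafe pattern (old AllocID): leader.Load().(greeter) would panic here,
    // because nothing has been stored yet and Load() returns nil.

    // Safe pattern (new helper): load, check for nil, then assert.
    if v := leader.Load(); v != nil {
        fmt.Println(v.(greeter).Greet())
    } else {
        fmt.Println("leader not found yet")
    }

    leader.Store(english{})
    if v := leader.Load(); v != nil {
        fmt.Println(v.(greeter).Greet()) // prints "hello"
    }
}
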
13 changes: 13 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
@@ -34,6 +34,7 @@ import (
     "github.com/tikv/pd/pkg/core/storelimit"
     "github.com/tikv/pd/pkg/mcs/utils"
     sc "github.com/tikv/pd/pkg/schedule/config"
+    "github.com/tikv/pd/pkg/slice"
     "github.com/tikv/pd/pkg/storage/endpoint"
     "github.com/tikv/pd/pkg/utils/configutil"
     "github.com/tikv/pd/pkg/utils/grpcutil"
@@ -239,6 +240,18 @@ func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) {
     o.schedule.Store(cfg)
 }
 
+// AdjustScheduleCfg adjusts the schedule config.
+func (o *PersistConfig) AdjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) {
+    // In case we add new default schedulers.
+    for _, ps := range sc.DefaultSchedulers {
+        if slice.NoneOf(scheduleCfg.Schedulers, func(i int) bool {
+            return scheduleCfg.Schedulers[i].Type == ps.Type
+        }) {
+            scheduleCfg.Schedulers = append(scheduleCfg.Schedulers, ps)
+        }
+    }
+}
+
 // GetReplicationConfig returns replication configurations.
 func (o *PersistConfig) GetReplicationConfig() *sc.ReplicationConfig {
     return o.replication.Load().(*sc.ReplicationConfig)
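
AdjustScheduleCfg backfills defaults: if a newer binary ships default schedulers that an older persisted config does not mention, they are appended, while schedulers already present (and their Args) are left untouched. The watcher change below calls it before the config is stored, presumably so stale persisted configs still pick up newly added defaults. A self-contained sketch of the same idea; the generic noneOf helper and the schedulerConfig type are illustrative, while the pkg/slice.NoneOf call used above takes an index-based predicate instead:

package main

import "fmt"

type schedulerConfig struct {
    Type string
    Args []string
}

// noneOf reports whether no element of s satisfies p.
func noneOf[T any](s []T, p func(T) bool) bool {
    for _, v := range s {
        if p(v) {
            return false
        }
    }
    return true
}

// adjust mirrors AdjustScheduleCfg: append any default scheduler whose Type is
// missing from cfg, keep everything that is already there.
func adjust(cfg, defaults []schedulerConfig) []schedulerConfig {
    for _, d := range defaults {
        if noneOf(cfg, func(c schedulerConfig) bool { return c.Type == d.Type }) {
            cfg = append(cfg, d)
        }
    }
    return cfg
}

func main() {
    persisted := []schedulerConfig{{Type: "balance-region"}}
    defaults := []schedulerConfig{{Type: "balance-region"}, {Type: "balance-leader"}}
    fmt.Println(adjust(persisted, defaults)) // [{balance-region []} {balance-leader []}]
}
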
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/config/watcher.go
@@ -94,6 +94,7 @@ func (cw *Watcher) initializeConfigWatcher() error {
                 zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
             return err
         }
+        cw.AdjustScheduleCfg(&cfg.Schedule)
         cw.SetClusterVersion(&cfg.ClusterVersion)
         cw.SetScheduleConfig(&cfg.Schedule)
         cw.SetReplicationConfig(&cfg.Replication)
66 changes: 45 additions & 21 deletions pkg/mcs/scheduling/server/server.go
@@ -87,7 +87,8 @@ type Server struct {
     checkMembershipCh chan struct{}
 
     // primaryCallbacks will be called after the server becomes leader.
-    primaryCallbacks []func(context.Context)
+    primaryCallbacks     []func(context.Context) error
+    primaryExitCallbacks []func()
 
     // for service registry
     serviceID *discovery.ServiceRegistryEntry
@@ -164,6 +165,9 @@ func (s *Server) updateAPIServerMemberLoop() {
         case <-ticker.C:
         case <-s.checkMembershipCh:
         }
+        if !s.IsServing() {
+            continue
+        }
         members, err := s.GetClient().MemberList(ctx)
         if err != nil {
             log.Warn("failed to list members", errs.ZapError(err))
@@ -247,9 +251,16 @@ func (s *Server) campaignLeader() {
 
     log.Info("triggering the primary callback functions")
     for _, cb := range s.primaryCallbacks {
-        cb(ctx)
+        if err := cb(ctx); err != nil {
+            log.Error("failed to trigger the primary callback functions", errs.ZapError(err))
+            return
+        }
     }
-
+    defer func() {
+        for _, cb := range s.primaryExitCallbacks {
+            cb()
+        }
+    }()
     s.participant.EnableLeader()
     log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))

@@ -283,10 +294,6 @@ func (s *Server) Close() {
     utils.StopHTTPServer(s)
     utils.StopGRPCServer(s)
     s.GetListener().Close()
-    s.GetCoordinator().Stop()
-    s.ruleWatcher.Close()
-    s.configWatcher.Close()
-    s.metaWatcher.Close()
     s.serverLoopCancel()
     s.serverLoopWg.Wait()
 
@@ -313,10 +320,15 @@ func (s *Server) IsClosed() bool {
 }
 
 // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
-func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
+func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
     s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
 }
 
+// AddServiceExitCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
+func (s *Server) AddServiceExitCallback(callbacks ...func()) {
+    s.primaryExitCallbacks = append(s.primaryExitCallbacks, callbacks...)
+}
+
 // GetTLSConfig gets the security config.
 func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
     return &s.cfg.Security.TLSConfig
@@ -381,20 +393,10 @@ func (s *Server) startServer() (err error) {
         ListenUrls: []string{s.cfg.AdvertiseListenAddr},
     }
     s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")
-    s.basicCluster = core.NewBasicCluster()
-    err = s.startWatcher()
-    if err != nil {
-        return err
-    }
-    s.storage = endpoint.NewStorageEndpoint(
-        kv.NewEtcdKVBase(s.GetClient(), endpoint.PDRootPath(s.clusterID)), nil)
-    s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
-    s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
-    if err != nil {
-        return err
-    }
 
     s.service = &Service{Server: s}
+    s.AddServiceReadyCallback(s.startCluster)
+    s.AddServiceExitCallback(s.stopCluster)
     if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil {
         return err
     }
@@ -406,7 +408,6 @@
     go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.GetListener())
     s.checkMembershipCh <- struct{}{}
     <-serverReadyChan
-    go s.GetCoordinator().RunUntilStop()
 
     // Run callbacks
     log.Info("triggering the start callback functions")
@@ -429,6 +430,29 @@
     return nil
 }
 
+func (s *Server) startCluster(context.Context) error {
+    s.basicCluster = core.NewBasicCluster()
+    err := s.startWatcher()
+    if err != nil {
+        return err
+    }
+    s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
+    s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
+    s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
+    if err != nil {
+        return err
+    }
+    go s.GetCoordinator().RunUntilStop()
+    return nil
+}
+
+func (s *Server) stopCluster() {
+    s.GetCoordinator().Stop()
+    s.ruleWatcher.Close()
+    s.configWatcher.Close()
+    s.metaWatcher.Close()
+}
+
 func (s *Server) startWatcher() (err error) {
     s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster)
     if err != nil {
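
With these changes the scheduling server no longer assembles its cluster machinery in startServer: the basic cluster, watchers, heartbeat streams, and coordinator are created by startCluster when the server wins the primary election and torn down by stopCluster when it steps down, through the ready/exit callback pair registered above; the storage endpoint also switches from the etcd-backed KV to kv.NewMemoryKV(). A rough sketch of the wiring with a toy miniServer; the names and bodies here are stand-ins, not the real implementation:

package main

import (
    "context"
    "fmt"
)

// miniServer sketches only the lifecycle wiring: the cluster is built by a
// ready callback and torn down by an exit callback, once per leadership term,
// rather than once per process.
type miniServer struct {
    ready []func(context.Context) error
    exit  []func()
    term  int
}

func (s *miniServer) AddServiceReadyCallback(cbs ...func(context.Context) error) {
    s.ready = append(s.ready, cbs...)
}

func (s *miniServer) AddServiceExitCallback(cbs ...func()) {
    s.exit = append(s.exit, cbs...)
}

func (s *miniServer) startCluster(context.Context) error {
    s.term++
    fmt.Printf("term %d: watchers, heartbeat streams and coordinator started\n", s.term)
    return nil
}

func (s *miniServer) stopCluster() {
    fmt.Printf("term %d: coordinator stopped, watchers closed\n", s.term)
}

func main() {
    s := &miniServer{}
    s.AddServiceReadyCallback(s.startCluster) // as registered in startServer
    s.AddServiceExitCallback(s.stopCluster)

    for i := 0; i < 2; i++ { // simulate two leadership terms
        for _, cb := range s.ready {
            _ = cb(context.Background())
        }
        for _, cb := range s.exit {
            cb()
        }
    }
}
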
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
@@ -236,7 +236,7 @@ func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
 
 // AddServiceReadyCallback implements basicserver.
 // It adds callbacks when it's ready for providing tso service.
-func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
+func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
     // Do nothing here. The primary of each keyspace group assigned to this host
     // will respond to the requests accordingly.
 }
4 changes: 2 additions & 2 deletions server/api/min_resolved_ts.go
@@ -53,7 +53,7 @@ type minResolvedTS struct {
 // @Failure 500 {string} string "PD server failed to proceed the request."
 // @Router /min-resolved-ts/{store_id} [get]
 func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *http.Request) {
-    c := h.svr.GetRaftCluster()
+    c := getCluster(r)
     idStr := mux.Vars(r)["store_id"]
     storeID, err := strconv.ParseUint(idStr, 10, 64)
     if err != nil {
@@ -84,7 +84,7 @@ func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *h
 // @Failure 500 {string} string "PD server failed to proceed the request."
 // @Router /min-resolved-ts [get]
 func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.Request) {
-    c := h.svr.GetRaftCluster()
+    c := getCluster(r)
     scopeMinResolvedTS := c.GetMinResolvedTS()
     persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval

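Both handlers above now take the cluster from the request instead of asking the server directly. getCluster is not shown in this diff; presumably it returns the RaftCluster that the API middleware stashed in the request context, so a request that reaches the handler already has a usable cluster. A sketch of that pattern with hypothetical types (raftCluster, clusterKey, and withCluster are illustrations, not the server/api implementation):

package main

import (
    "context"
    "fmt"
    "net/http"
    "net/http/httptest"
)

type raftCluster struct{ name string } // toy stand-in for the real cluster

type clusterKey struct{}

// withCluster mimics the middleware idea: resolve the cluster once per request,
// reject the request if it is unavailable, and stash it in the context.
func withCluster(rc *raftCluster, next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if rc == nil {
            http.Error(w, "cluster is not bootstrapped", http.StatusInternalServerError)
            return
        }
        next.ServeHTTP(w, r.WithContext(context.WithValue(r.Context(), clusterKey{}, rc)))
    })
}

// getCluster here is a hypothetical reimplementation of the helper used above.
func getCluster(r *http.Request) *raftCluster {
    return r.Context().Value(clusterKey{}).(*raftCluster)
}

func main() {
    h := withCluster(&raftCluster{name: "pd"}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintln(w, "cluster:", getCluster(r).name)
    }))
    rec := httptest.NewRecorder()
    h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/min-resolved-ts", nil))
    fmt.Print(rec.Body.String()) // cluster: pd
}
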
4 changes: 3 additions & 1 deletion server/grpc_service.go
@@ -1600,7 +1600,6 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB
     if rc == nil {
         return &pdpb.ReportBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil
     }
-
     _, err := rc.HandleBatchReportSplit(request)
     if err != nil {
         return &pdpb.ReportBatchSplitResponse{
@@ -2089,6 +2088,9 @@ func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.S
         return rsp.(*pdpb.SplitAndScatterRegionsResponse), err
     }
     rc := s.GetRaftCluster()
+    if rc == nil {
+        return &pdpb.SplitAndScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil
+    }
     splitFinishedPercentage, newRegionIDs := rc.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit()))
     scatterFinishedPercentage, err := scatterRegions(rc, newRegionIDs, request.GetGroup(), int(request.GetRetryLimit()), false)
     if err != nil {
14 changes: 10 additions & 4 deletions server/handler.go
@@ -961,14 +961,20 @@ func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, _
 
 // SetStoreLimitScene sets the limit values for different scenes
 func (h *Handler) SetStoreLimitScene(scene *storelimit.Scene, limitType storelimit.Type) {
-    cluster := h.s.GetRaftCluster()
-    cluster.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
+    rc := h.s.GetRaftCluster()
+    if rc == nil {
+        return
+    }
+    rc.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
 }
 
 // GetStoreLimitScene returns the limit values for different scenes
 func (h *Handler) GetStoreLimitScene(limitType storelimit.Type) *storelimit.Scene {
-    cluster := h.s.GetRaftCluster()
-    return cluster.GetStoreLimiter().StoreLimitScene(limitType)
+    rc := h.s.GetRaftCluster()
+    if rc == nil {
+        return nil
+    }
+    return rc.GetStoreLimiter().StoreLimitScene(limitType)
 }
 
 // GetProgressByID returns the progress details for a given store ID.
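
Both handlers previously called GetStoreLimiter on whatever GetRaftCluster returned; when the cluster is not running that is a nil pointer, and although Go will happily invoke a method on a nil receiver, the method panics as soon as it touches the receiver's fields. The added guards return early instead. A small illustration with toy types (raftCluster and storeLimiter here are not the real PD types):

package main

import "fmt"

type storeLimiter struct{ scenes map[string]int }

type raftCluster struct{ limiter *storeLimiter }

// GetStoreLimiter dereferences a field, so calling it through a nil
// *raftCluster panics with a nil pointer dereference.
func (c *raftCluster) GetStoreLimiter() *storeLimiter { return c.limiter }

// getRaftCluster returns nil when the cluster is not bootstrapped or already
// stopped, which is exactly the case the new guards handle.
func getRaftCluster(running bool) *raftCluster {
    if !running {
        return nil
    }
    return &raftCluster{limiter: &storeLimiter{scenes: map[string]int{}}}
}

func main() {
    rc := getRaftCluster(false)
    if rc == nil { // guarded, as in the patched handlers
        fmt.Println("cluster not running, nothing to do")
        return
    }
    fmt.Println(rc.GetStoreLimiter()) // unreachable in this demo
}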