diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 2e4b8311583..a94c9e3032b 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -32,8 +32,7 @@ type Cluster struct { const regionLabelGCInterval = time.Hour // NewCluster creates a new cluster. -func NewCluster(ctx context.Context, cfg *config.Config, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams) (*Cluster, error) { - persistConfig := config.NewPersistConfig(cfg) +func NewCluster(ctx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams) (*Cluster, error) { labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval) if err != nil { return nil, err @@ -49,6 +48,10 @@ func NewCluster(ctx context.Context, cfg *config.Config, storage storage.Storage storage: storage, } c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) + err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels()) + if err != nil { + return nil, err + } return c, nil } diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 3591974d439..c84aee3e654 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -409,11 +409,15 @@ func (s *Server) startServer() (err error) { s.participant = member.NewParticipant(s.etcdClient) s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)), utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr) + err = s.startWatcher() + if err != nil { + return err + } s.storage = endpoint.NewStorageEndpoint( kv.NewEtcdKVBase(s.etcdClient, endpoint.PDRootPath(s.clusterID)), nil) basicCluster := core.NewBasicCluster() s.hbStreams = hbstream.NewHeartbeatStreams(s.ctx, s.clusterID, basicCluster) - s.cluster, err = NewCluster(s.ctx, s.cfg, s.storage, basicCluster, s.hbStreams) + s.cluster, err = NewCluster(s.ctx, s.persistConfig, s.storage, basicCluster, s.hbStreams) if err != nil { return err } @@ -436,10 +440,6 @@ func (s *Server) startServer() (err error) { if err != nil { return err } - err = s.startWatcher() - if err != nil { - return err - } go s.GetCoordinator().RunUntilStop() serverReadyChan := make(chan struct{})