Skip to content

Commit

Permalink
init rule manager
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Aug 29, 2023
1 parent 425a094 commit 9296f96
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
7 changes: 5 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ type Cluster struct {
const regionLabelGCInterval = time.Hour

// NewCluster creates a new cluster.
func NewCluster(ctx context.Context, cfg *config.Config, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams) (*Cluster, error) {
persistConfig := config.NewPersistConfig(cfg)
func NewCluster(ctx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams) (*Cluster, error) {
labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval)
if err != nil {
return nil, err
Expand All @@ -49,6 +48,10 @@ func NewCluster(ctx context.Context, cfg *config.Config, storage storage.Storage
storage: storage,
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels())
if err != nil {
return nil, err
}
return c, nil
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,11 +409,15 @@ func (s *Server) startServer() (err error) {
s.participant = member.NewParticipant(s.etcdClient)
s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
err = s.startWatcher()
if err != nil {
return err
}
s.storage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(s.etcdClient, endpoint.PDRootPath(s.clusterID)), nil)
basicCluster := core.NewBasicCluster()
s.hbStreams = hbstream.NewHeartbeatStreams(s.ctx, s.clusterID, basicCluster)
s.cluster, err = NewCluster(s.ctx, s.cfg, s.storage, basicCluster, s.hbStreams)
s.cluster, err = NewCluster(s.ctx, s.persistConfig, s.storage, basicCluster, s.hbStreams)
if err != nil {
return err
}
Expand All @@ -436,10 +440,6 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
err = s.startWatcher()
if err != nil {
return err
}

go s.GetCoordinator().RunUntilStop()
serverReadyChan := make(chan struct{})
Expand Down

0 comments on commit 9296f96

Please sign in to comment.