Commit 2d9f1ea: use memory storage

Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Sep 8, 2023
1 parent 74ead5c commit 2d9f1ea
Showing 10 changed files with 90 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pkg/basicserver/basic_server.go
@@ -44,5 +44,5 @@ type Server interface {
// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.
IsServing() bool
// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
AddServiceReadyCallback(callbacks ...func(context.Context))
AddServiceReadyCallback(callbacks ...func(context.Context) error)
}
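
The signature change above turns the primary-ready callbacks from fire-and-forget functions into functions that can fail. A minimal, self-contained sketch of the new contract follows; the server struct and wiring here are simplified stand-ins rather than the actual PD types, and it only mirrors how campaignLeader later in this commit aborts the primary transition when a callback returns an error.

package main

import (
	"context"
	"errors"
	"fmt"
)

// server is a simplified stand-in for a PD microservice server.
type server struct {
	primaryCallbacks []func(context.Context) error
}

// AddServiceReadyCallback mirrors the updated interface: callbacks may now fail.
func (s *server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
	s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
}

// becomePrimary mimics how campaignLeader now stops the transition on the first error.
func (s *server) becomePrimary(ctx context.Context) error {
	for _, cb := range s.primaryCallbacks {
		if err := cb(ctx); err != nil {
			return fmt.Errorf("primary callback failed: %w", err)
		}
	}
	return nil
}

func main() {
	s := &server{}
	s.AddServiceReadyCallback(func(context.Context) error { return nil })
	s.AddServiceReadyCallback(func(context.Context) error { return errors.New("storage not ready") })
	fmt.Println(s.becomePrimary(context.Background())) // primary callback failed: storage not ready
}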
3 changes: 2 additions & 1 deletion pkg/mcs/resourcemanager/server/manager.go
@@ -101,7 +101,7 @@ func (m *Manager) GetBasicServer() bs.Server {
}

// Init initializes the resource group manager.
func (m *Manager) Init(ctx context.Context) {
func (m *Manager) Init(ctx context.Context) error {
// Todo: If we can modify following configs in the future, we should reload these configs.
// Store the controller config into the storage.
m.storage.SaveControllerConfig(m.controllerConfig)
@@ -156,6 +156,7 @@ func (m *Manager) Init(ctx context.Context) {
m.persistLoop(ctx)
}()
log.Info("resource group manager finishes initialization")
return nil
}

// AddResourceGroup puts a resource group.
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/server.go
@@ -71,7 +71,7 @@ type Server struct {
service *Service

// primaryCallbacks will be called after the server becomes leader.
primaryCallbacks []func(context.Context)
primaryCallbacks []func(context.Context) error

serviceRegister *discovery.ServiceRegister
}
@@ -232,7 +232,7 @@ func (s *Server) IsClosed() bool {
}

// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
}

4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
@@ -143,12 +143,12 @@ func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConf

// AllocID allocates a new ID.
func (c *Cluster) AllocID() (uint64, error) {
cli := c.apiServerLeader.Load().(pdpb.PDClient)
cli := c.apiServerLeader.Load()
if cli == nil {
c.checkMembershipCh <- struct{}{}
return 0, errors.New("API server leader is not found")
}
resp, err := cli.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}})
resp, err := cli.(pdpb.PDClient).AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}})
if err != nil {
c.checkMembershipCh <- struct{}{}
return 0, err
13 changes: 13 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
@@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/mcs/utils"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
@@ -239,6 +240,18 @@ func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) {
o.schedule.Store(cfg)
}

// AdjustScheduleCfg adjusts the schedule config.
func (o *PersistConfig) AdjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) {
// In case we add new default schedulers.
for _, ps := range sc.DefaultSchedulers {
if slice.NoneOf(scheduleCfg.Schedulers, func(i int) bool {
return scheduleCfg.Schedulers[i].Type == ps.Type
}) {
scheduleCfg.Schedulers = append(scheduleCfg.Schedulers, ps)
}
}
}
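
For reference, a standalone sketch of what the backfill above does, under the assumption that slice.NoneOf reports whether no element satisfies the predicate; noneOf, SchedulerConfig, and the default list below are simplified stand-ins rather than the real pkg/slice helper and schedule config types.

package main

import "fmt"

// SchedulerConfig is a simplified stand-in for the scheduler entry in the schedule config.
type SchedulerConfig struct{ Type string }

// noneOf reports whether no index in [0, n) satisfies pred; it stands in for pkg/slice.NoneOf.
func noneOf(n int, pred func(i int) bool) bool {
	for i := 0; i < n; i++ {
		if pred(i) {
			return false
		}
	}
	return true
}

// defaultSchedulers is an illustrative default list, not the exact PD set.
var defaultSchedulers = []SchedulerConfig{
	{Type: "balance-region"},
	{Type: "balance-leader"},
	{Type: "hot-region"},
}

// adjust appends every default scheduler missing from a persisted config, so a
// config written by an older version still picks up newly added defaults.
func adjust(schedulers []SchedulerConfig) []SchedulerConfig {
	for _, ps := range defaultSchedulers {
		if noneOf(len(schedulers), func(i int) bool { return schedulers[i].Type == ps.Type }) {
			schedulers = append(schedulers, ps)
		}
	}
	return schedulers
}

func main() {
	fmt.Println(adjust([]SchedulerConfig{{Type: "balance-leader"}}))
	// [{balance-leader} {balance-region} {hot-region}]
}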

// GetReplicationConfig returns replication configurations.
func (o *PersistConfig) GetReplicationConfig() *sc.ReplicationConfig {
return o.replication.Load().(*sc.ReplicationConfig)
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/config/watcher.go
@@ -94,6 +94,7 @@ func (cw *Watcher) initializeConfigWatcher() error {
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
cw.AdjustScheduleCfg(&cfg.Schedule)
cw.SetClusterVersion(&cfg.ClusterVersion)
cw.SetScheduleConfig(&cfg.Schedule)
cw.SetReplicationConfig(&cfg.Replication)
66 changes: 45 additions & 21 deletions pkg/mcs/scheduling/server/server.go
@@ -87,7 +87,8 @@ type Server struct {
checkMembershipCh chan struct{}

// primaryCallbacks will be called after the server becomes leader.
primaryCallbacks []func(context.Context)
primaryCallbacks []func(context.Context) error
primaryExitCallbacks []func()

// for service registry
serviceID *discovery.ServiceRegistryEntry
@@ -163,6 +164,9 @@ func (s *Server) updateAPIServerMemberLoop() {
case <-ticker.C:
case <-s.checkMembershipCh:
}
if !s.IsServing() {
continue
}
members, err := s.GetClient().MemberList(ctx)
if err != nil {
log.Warn("failed to list members", errs.ZapError(err))
@@ -243,9 +247,16 @@ func (s *Server) campaignLeader() {

log.Info("triggering the primary callback functions")
for _, cb := range s.primaryCallbacks {
cb(ctx)
if err := cb(ctx); err != nil {
log.Error("failed to trigger the primary callback functions", errs.ZapError(err))
return
}
}

defer func() {
for _, cb := range s.primaryExitCallbacks {
cb()
}
}()
s.participant.EnableLeader()
log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))

@@ -279,10 +290,6 @@ func (s *Server) Close() {
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.GetListener().Close()
s.GetCoordinator().Stop()
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()

@@ -309,10 +316,15 @@ func (s *Server) IsClosed() bool {
}

// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
}

// AddServiceExitCallback adds callbacks when the server is no longer the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) AddServiceExitCallback(callbacks ...func()) {
s.primaryExitCallbacks = append(s.primaryExitCallbacks, callbacks...)
}

// GetTLSConfig gets the security config.
func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
@@ -377,20 +389,10 @@ func (s *Server) startServer() (err error) {
ListenUrls: []string{s.cfg.AdvertiseListenAddr},
}
s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")
s.basicCluster = core.NewBasicCluster()
err = s.startWatcher()
if err != nil {
return err
}
s.storage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(s.GetClient(), endpoint.PDRootPath(s.clusterID)), nil)
s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
if err != nil {
return err
}

s.service = &Service{Server: s}
s.AddServiceReadyCallback(s.startCluster)
s.AddServiceExitCallback(s.stopCluster)
if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil {
return err
}
@@ -402,7 +404,6 @@ func (s *Server) startServer() (err error) {
go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.GetListener())
s.checkMembershipCh <- struct{}{}
<-serverReadyChan
go s.GetCoordinator().RunUntilStop()

// Run callbacks
log.Info("triggering the start callback functions")
@@ -425,6 +426,29 @@ func (s *Server) startServer() (err error) {
return nil
}

func (s *Server) startCluster(context.Context) error {
s.basicCluster = core.NewBasicCluster()
err := s.startWatcher()
if err != nil {
return err
}
s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
if err != nil {
return err
}
go s.GetCoordinator().RunUntilStop()
return nil
}

func (s *Server) stopCluster() {
s.GetCoordinator().Stop()
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
}
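
The two functions above capture the core of this commit: the scheduling cluster state is now created in a primary-ready callback, backed by kv.NewMemoryKV() instead of an etcd-backed storage endpoint, and released again in a primary-exit callback. A toy sketch of that lifecycle follows, with a hypothetical in-memory KV standing in for PD's kv.NewMemoryKV and a simplified server type in place of the real scheduling server.

package main

import (
	"context"
	"fmt"
	"sync"
)

// memoryKV is a hypothetical in-memory KV standing in for kv.NewMemoryKV().
type memoryKV struct {
	mu   sync.RWMutex
	data map[string]string
}

func newMemoryKV() *memoryKV { return &memoryKV{data: make(map[string]string)} }

func (kv *memoryKV) Save(key, value string) {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	kv.data[key] = value
}

func (kv *memoryKV) Load(key string) string {
	kv.mu.RLock()
	defer kv.mu.RUnlock()
	return kv.data[key]
}

// schedulingServer is a simplified stand-in for the scheduling microservice server.
type schedulingServer struct {
	storage *memoryKV
}

// startCluster mirrors the new flow: primary-only state is built when leadership is won.
func (s *schedulingServer) startCluster(context.Context) error {
	s.storage = newMemoryKV()
	s.storage.Save("bootstrapped", "true")
	return nil
}

// stopCluster releases the primary-only state when leadership is lost.
func (s *schedulingServer) stopCluster() {
	s.storage = nil
}

func main() {
	s := &schedulingServer{}
	if err := s.startCluster(context.Background()); err != nil {
		panic(err)
	}
	fmt.Println("bootstrapped:", s.storage.Load("bootstrapped"))
	s.stopCluster()
}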

func (s *Server) startWatcher() (err error) {
s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
Expand Up @@ -236,7 +236,7 @@ func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {

// AddServiceReadyCallback implements basicserver.
// It adds callbacks when it's ready for providing tso service.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
// Do nothing here. The primary of each keyspace group assigned to this host
// will respond to the requests accordingly.
}
4 changes: 2 additions & 2 deletions server/server.go
Expand Up @@ -190,7 +190,7 @@ type Server struct {
// startCallbacks will be called after the server is started.
startCallbacks []func()
// leaderCallbacks will be called after the server becomes leader.
leaderCallbacks []func(context.Context)
leaderCallbacks []func(context.Context) error
// closeCallbacks will be called before the server is closed.
closeCallbacks []func()

Expand Down Expand Up @@ -1547,7 +1547,7 @@ func (s *Server) IsServing() bool {
}

// AddServiceReadyCallback adds callbacks when the server becomes the leader if there is embedded etcd, or the primary otherwise.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
s.leaderCallbacks = append(s.leaderCallbacks, callbacks...)
}

22 changes: 21 additions & 1 deletion tests/integrations/mcs/scheduling/server_test.go
@@ -67,13 +67,16 @@ func (suite *serverTestSuite) TearDownSuite() {

func (suite *serverTestSuite) TestAllocID() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`))
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
time.Sleep(200 * time.Millisecond)
id, err := tc.GetPrimaryServer().GetCluster().AllocID()
re.NoError(err)
re.NotEqual(uint64(0), id)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember"))
}

func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() {
@@ -83,15 +86,32 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() {
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
time.Sleep(200 * time.Millisecond)
cluster := tc.GetPrimaryServer().GetCluster()
id, err := cluster.AllocID()
re.NoError(err)
re.NotEqual(uint64(0), id)
suite.cluster.ResignLeader()
suite.cluster.WaitLeader()
time.Sleep(time.Second)
time.Sleep(200 * time.Millisecond)
id1, err := cluster.AllocID()
re.NoError(err)
re.Greater(id1, id)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember"))
}

func (suite *serverTestSuite) TestPrimaryChange() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
primary := tc.GetPrimaryServer()
addr := primary.GetAddr()
re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5)
primary.Close()
tc.WaitForPrimaryServing(re)
primary = tc.GetPrimaryServer()
re.NotEqual(addr, primary.GetAddr())
re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5)
}
