support alloc ID for scheduling server
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Aug 24, 2023
1 parent ebceb83 commit 8229a7e
Showing 8 changed files with 382 additions and 26 deletions.
56 changes: 41 additions & 15 deletions pkg/mcs/scheduling/server/cluster.go
@@ -2,8 +2,11 @@ package server

import (
"context"
"errors"
"sync/atomic"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
sc "github.com/tikv/pd/pkg/schedule/config"
@@ -18,17 +21,20 @@ import (
// Cluster is used to manage all information for scheduling purpose.
type Cluster struct {
*core.BasicCluster
ruleManager *placement.RuleManager
labelerManager *labeler.RegionLabeler
persistConfig *config.PersistConfig
hotStat *statistics.HotStat
storage storage.Storage
ruleManager *placement.RuleManager
labelerManager *labeler.RegionLabeler
persistConfig *config.PersistConfig
hotStat *statistics.HotStat
storage storage.Storage
checkMembershipCh chan struct{}
apiServerLeader atomic.Value
clusterID uint64
}

const regionLabelGCInterval = time.Hour

// NewCluster creates a new cluster.
func NewCluster(ctx context.Context, storage storage.Storage, cfg *config.Config) (*Cluster, error) {
func NewCluster(ctx context.Context, cfg *config.Config, storage storage.Storage, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) {
basicCluster := core.NewBasicCluster()
persistConfig := config.NewPersistConfig(cfg)
labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval)
@@ -37,12 +43,14 @@ func NewCluster(ctx context.Context, storage storage.Storage, cfg *config.Config
}

return &Cluster{
BasicCluster: basicCluster,
ruleManager: placement.NewRuleManager(storage, basicCluster, persistConfig),
labelerManager: labelerManager,
persistConfig: persistConfig,
hotStat: statistics.NewHotStat(ctx),
storage: storage,
BasicCluster: basicCluster,
ruleManager: placement.NewRuleManager(storage, basicCluster, persistConfig),
labelerManager: labelerManager,
persistConfig: persistConfig,
hotStat: statistics.NewHotStat(ctx),
storage: storage,
clusterID: clusterID,
checkMembershipCh: checkMembershipCh,
}, nil
}

@@ -117,11 +125,29 @@ func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return c.per
// GetStoreConfig returns the store config.
func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConfig }

// AllocID allocates a new ID.
func (c *Cluster) AllocID() (uint64, error) {
cli := c.apiServerLeader.Load().(pdpb.PDClient)
if cli == nil {
c.checkMembershipCh <- struct{}{}
return 0, errors.New("API server leader is not found")
}
resp, err := cli.AllocID(context.Background(), &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}})
if err != nil {
c.checkMembershipCh <- struct{}{}
return 0, err
}
return resp.GetId(), nil
}

// SwitchAPIServerLeader switches the API server leader.
func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
old := c.apiServerLeader.Load()
return c.apiServerLeader.CompareAndSwap(old, new)
}

// TODO: implement the following methods

// UpdateRegionsLabelLevelStats updates the status of the region label level by types.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
}

// AllocID allocates a new ID.
func (c *Cluster) AllocID() (uint64, error) { return 0, nil }
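In short, the new AllocID no longer returns a stub value: it loads the cached API server leader client, forwards a pdpb.AllocID request carrying the cluster ID, and nudges checkMembershipCh whenever the call cannot be made so the member-update loop refreshes the leader. Below is a minimal, self-contained sketch of that forwarding pattern, not the PD implementation: IDClient, idAllocator, notifyMembershipCheck, and fakeLeader are hypothetical stand-ins for pdpb.PDClient and the Cluster fields in the diff, and the sketch uses a comma-ok assertion so an undiscovered leader surfaces as an error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

// IDClient abstracts the allocation RPC exposed by the API server leader
// (pdpb.PDClient.AllocID in PD itself).
type IDClient interface {
	AllocID(ctx context.Context) (uint64, error)
}

// idAllocator mirrors the relevant Cluster fields from the diff.
type idAllocator struct {
	leader            atomic.Value // caches the current leader client
	checkMembershipCh chan struct{}
}

// notifyMembershipCheck nudges the member-update loop; the non-blocking send
// relies on the channel being buffered.
func (a *idAllocator) notifyMembershipCheck() {
	select {
	case a.checkMembershipCh <- struct{}{}:
	default:
	}
}

// AllocID forwards the request to the cached leader. The comma-ok assertion
// turns an undiscovered leader into an error instead of a panic.
func (a *idAllocator) AllocID(ctx context.Context) (uint64, error) {
	cli, ok := a.leader.Load().(IDClient)
	if !ok || cli == nil {
		a.notifyMembershipCheck()
		return 0, errors.New("API server leader is not found")
	}
	id, err := cli.AllocID(ctx)
	if err != nil {
		a.notifyMembershipCheck()
		return 0, err
	}
	return id, nil
}

// fakeLeader hands out monotonically increasing IDs for the demo.
type fakeLeader struct{ next uint64 }

func (f *fakeLeader) AllocID(context.Context) (uint64, error) {
	f.next++
	return f.next, nil
}

func main() {
	a := &idAllocator{checkMembershipCh: make(chan struct{}, 1)}
	if _, err := a.AllocID(context.Background()); err != nil {
		fmt.Println("before leader discovery:", err) // expected failure path
	}
	a.leader.Store(IDClient(&fakeLeader{}))
	id, _ := a.AllocID(context.Background())
	fmt.Println("allocated id:", id)
}
```

Keeping checkMembershipCh buffered with capacity one, as CreateServer does further down in the diff, lets a failed call request a refresh without blocking the allocation path.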
5 changes: 5 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
@@ -164,6 +164,11 @@ func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
}
}

// GetListenAddr returns the ListenAddr.
func (c *Config) GetListenAddr() string {
return c.ListenAddr
}

// GetTLSConfig returns the TLS config.
func (c *Config) GetTLSConfig() *grpcutil.TLSConfig {
return &c.Security.TLSConfig
71 changes: 65 additions & 6 deletions pkg/mcs/scheduling/server/server.go
@@ -34,6 +34,7 @@ import (
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/soheilhy/cmux"
@@ -60,6 +61,8 @@ import (
"google.golang.org/grpc"
)

const memberUpdateInterval = time.Minute

// Server is the scheduling server, and it implements bs.Server.
type Server struct {
diagnosticspb.DiagnosticsServer
@@ -87,7 +90,8 @@ type Server struct {
httpClient *http.Client

// Store as map[string]*grpc.ClientConn
clientConns sync.Map
clientConns sync.Map
checkMembershipCh chan struct{}

// for the primary election of scheduling
participant *member.Participant
@@ -165,8 +169,53 @@ func (s *Server) Run() error {

func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(1)
s.serverLoopWg.Add(2)
go s.primaryElectionLoop()
go s.updateMemberLoop()
}

func (s *Server) updateMemberLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
ticker := time.NewTicker(memberUpdateInterval)
failpoint.Inject("fastUpdateMember", func() {
ticker.Stop()
ticker = time.NewTicker(100 * time.Millisecond)
})
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("server is closed, exit update member loop")
return
case <-ticker.C:
case <-s.checkMembershipCh:
}
members, err := s.etcdClient.MemberList(ctx)
if err != nil {
log.Warn("failed to list members", errs.ZapError(err))
}
for _, ep := range members.Members {
status, err := s.etcdClient.Status(ctx, ep.ClientURLs[0])
if err != nil {
log.Info("failed to get status of member", zap.String("member-id", fmt.Sprintf("%x", ep.ID)), zap.String("endpoint", ep.ClientURLs[0]), errs.ZapError(err))
continue
}
if status.Leader == ep.ID {
cc, err := s.GetDelegateClient(ctx, ep.ClientURLs[0])
if err != nil {
log.Info("failed to get delegate client", errs.ZapError(err))
}
if s.cluster.SwitchAPIServerLeader(pdpb.NewPDClient(cc)) {
log.Info("switch leader", zap.String("leader-id", fmt.Sprintf("%x", ep.ID)), zap.String("endpoint", ep.ClientURLs[0]))
break
}
}
}
}
}

func (s *Server) primaryElectionLoop() {
@@ -176,7 +225,7 @@ func (s *Server) primaryElectionLoop() {
for {
select {
case <-s.serverLoopCtx.Done():
log.Info("server is closed, exit resource manager primary election loop")
log.Info("server is closed, exit primary election loop")
return
default:
}
@@ -260,6 +309,8 @@ func (s *Server) Close() {
s.stopHTTPServer()
s.stopGRPCServer()
s.muxListener.Close()
s.ruleWatcher.Close()
s.configWatcher.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()

@@ -333,6 +384,11 @@ func (s *Server) GetCoordinator() *schedule.Coordinator {
return s.coordinator
}

// GetCluster returns the cluster.
func (s *Server) GetCluster() *Cluster {
return s.cluster
}

func (s *Server) initClient() error {
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
@@ -495,7 +551,7 @@ func (s *Server) startServer() (err error) {
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
s.storage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(s.etcdClient, endpoint.PDRootPath(s.clusterID)), nil)
s.cluster, err = NewCluster(s.ctx, s.storage, s.cfg)
s.cluster, err = NewCluster(s.ctx, s.cfg, s.storage, s.clusterID, s.checkMembershipCh)
if err != nil {
return err
}
@@ -524,13 +580,15 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
go s.coordinator.RunUntilStop()

serverReadyChan := make(chan struct{})
defer close(serverReadyChan)
s.serverLoopWg.Add(1)
go s.startGRPCAndHTTPServers(serverReadyChan, s.muxListener)
<-serverReadyChan
s.checkMembershipCh <- struct{}{}
s.startServerLoop()
<-serverReadyChan
go s.coordinator.RunUntilStop()

// Run callbacks
log.Info("triggering the start callback functions")
@@ -574,6 +632,7 @@ func CreateServer(ctx context.Context, cfg *config.Config) *Server {
cfg: cfg,
persistConfig: config.NewPersistConfig(cfg),
ctx: ctx,
checkMembershipCh: make(chan struct{}, 1),
}
return svr
}
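The updateMemberLoop added above wakes on two sources: a one-minute ticker (shortened by the fastUpdateMember failpoint in tests) and the checkMembershipCh channel, so a failed forwarded request can trigger an immediate leader refresh instead of waiting for the next tick. Below is a stripped-down, self-contained sketch of that select pattern; updateMemberLoop here is only an illustration, and refreshLeader is a hypothetical placeholder for the etcd MemberList/Status calls and SwitchAPIServerLeader logic in the diff.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// updateMemberLoop wakes on a ticker or on an explicit nudge and then
// refreshes the cached API server leader.
func updateMemberLoop(ctx context.Context, check <-chan struct{}, interval time.Duration, refreshLeader func() error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("server is closed, exit update member loop")
			return
		case <-ticker.C: // periodic refresh
		case <-check: // immediate nudge, e.g. after a failed forwarded call
		}
		if err := refreshLeader(); err != nil {
			// Skip this round and retry on the next wake-up.
			fmt.Println("failed to refresh leader:", err)
			continue
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	check := make(chan struct{}, 1) // buffered so senders never block
	check <- struct{}{}             // force an immediate first refresh
	updateMemberLoop(ctx, check, 100*time.Millisecond, func() error {
		fmt.Println("listing members and switching to the current leader")
		return nil
	})
}
```

Skipping the refresh on an error and retrying on the next wake-up keeps this kind of loop resilient to transient etcd failures.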
97 changes: 97 additions & 0 deletions tests/integrations/mcs/scheduling/server_test.go
@@ -0,0 +1,97 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduling

import (
"context"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/tests"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, testutil.LeakOptions...)
}

type serverTestSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
cluster *tests.TestCluster
pdLeader *tests.TestServer
backendEndpoints string
}

func TestServerTestSuite(t *testing.T) {
suite.Run(t, new(serverTestSuite))
}

func (suite *serverTestSuite) SetupSuite() {
var err error
re := suite.Require()

suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 3)
re.NoError(err)

err = suite.cluster.RunInitialServers()
re.NoError(err)

leaderName := suite.cluster.WaitLeader()
suite.pdLeader = suite.cluster.GetServer(leaderName)
suite.backendEndpoints = suite.pdLeader.GetAddr()
suite.NoError(suite.pdLeader.BootstrapCluster())
}

func (suite *serverTestSuite) TearDownSuite() {
suite.cluster.Destroy()
suite.cancel()
}

func (suite *serverTestSuite) TestAllocID() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
id, err := tc.GetPrimaryServer().GetCluster().AllocID()
re.NoError(err)
re.NotEqual(uint64(0), id)
}

func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`))
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
cluster := tc.GetPrimaryServer().GetCluster()
id, err := cluster.AllocID()
re.NoError(err)
re.NotEqual(uint64(0), id)
suite.cluster.ResignLeader()
suite.cluster.WaitLeader()
time.Sleep(time.Second)
id1, err := cluster.AllocID()
re.NoError(err)
re.Greater(id1, id)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember"))
}
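TestAllocIDAfterLeaderChange sleeps a fixed second after resigning the PD leader so the fastUpdateMember ticker (100 ms) has time to re-discover the new leader. Where timing is less predictable, the same condition can be checked by polling until allocation succeeds; the sketch below is a generic, self-contained illustration of that retry idea, with alloc standing in for cluster.AllocID (in the real suite, a helper such as testutil.Eventually would typically play this role).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForAlloc polls alloc until it returns an ID greater than prev or the
// timeout elapses. alloc is a hypothetical stand-in for cluster.AllocID.
func waitForAlloc(alloc func() (uint64, error), prev uint64, timeout, step time.Duration) (uint64, error) {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if id, err := alloc(); err == nil && id > prev {
			return id, nil
		}
		time.Sleep(step)
	}
	return 0, errors.New("timed out waiting for a fresh ID from the new leader")
}

func main() {
	// Fake allocator that fails at first (leader not yet re-discovered after a
	// resign) and then recovers, mimicking the window the test waits through.
	calls := 0
	alloc := func() (uint64, error) {
		calls++
		if calls < 3 {
			return 0, errors.New("API server leader is not found")
		}
		return uint64(100 + calls), nil
	}
	id, err := waitForAlloc(alloc, 100, time.Second, 50*time.Millisecond)
	fmt.Println(id, err)
}
```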
2 changes: 1 addition & 1 deletion tests/integrations/mcs/tso/server_test.go
@@ -136,7 +136,7 @@ func (suite *tsoServerTestSuite) TestParticipantStartWithAdvertiseListenAddr() {
re.NoError(err)

// Setup the logger.
err = tests.InitLogger(cfg)
err = tests.InitTSOLogger(cfg)
re.NoError(err)

s, cleanup, err := tests.NewTSOTestServer(suite.ctx, cfg)