Skip to content

Commit

Permalink
refactor ms server
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Aug 30, 2023
1 parent 9a574ed commit 50ea886
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 406 deletions.
2 changes: 1 addition & 1 deletion pkg/basicserver/basic_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

// Server defines the common basic behaviors of a server
type Server interface {
// Name returns the unique Name for this server in the cluster.
// Name returns the unique name for this server in the cluster.
Name() string
// GetAddr returns the address of the server.
GetAddr() string
Expand Down
154 changes: 23 additions & 131 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@ package server

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"path"
Expand All @@ -36,8 +33,10 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/spf13/cobra"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand All @@ -47,60 +46,40 @@ import (
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"google.golang.org/grpc"
)

var _ bs.Server = (*Server)(nil)

// Server is the resource manager server, and it implements bs.Server.
type Server struct {
*server.BaseServer
diagnosticspb.DiagnosticsServer
// Server state. 0 is not running, 1 is running.
isRunning int64
// Server start timestamp
startTimestamp int64

ctx context.Context
serverLoopCtx context.Context
serverLoopCancel func()
serverLoopWg sync.WaitGroup

cfg *Config
clusterID uint64
name string
listenURL *url.URL

// for the primary election of resource manager
participant *member.Participant
etcdClient *clientv3.Client
httpClient *http.Client

secure bool
muxListener net.Listener
grpcServer *grpc.Server
httpServer *http.Server
service *Service

// Store as map[string]*grpc.ClientConn
clientConns sync.Map
service *Service

// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
// primaryCallbacks will be called after the server becomes leader.
primaryCallbacks []func(context.Context)

serviceRegister *discovery.ServiceRegister
}

// Name returns the unique etcd name for this server in etcd cluster.
// Name returns the unique name for this server in the resource manager cluster.
func (s *Server) Name() string {
return s.name
}

// Context returns the context.
func (s *Server) Context() context.Context {
return s.ctx
return s.cfg.Name
}

// GetAddr returns the server address.
Expand All @@ -126,7 +105,7 @@ func (s *Server) Run() (err error) {
}

func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context())
s.serverLoopWg.Add(1)
go s.primaryElectionLoop()
}
Expand Down Expand Up @@ -221,18 +200,18 @@ func (s *Server) Close() {
s.serviceRegister.Deregister()
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.muxListener.Close()
s.GetListener().Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()

if s.etcdClient != nil {
if err := s.etcdClient.Close(); err != nil {
if s.GetClient() != nil {
if err := s.GetClient().Close(); err != nil {
log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err))
}
}

if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
if s.GetHTTPClient() != nil {
s.GetHTTPClient().CloseIdleConnections()
}

log.Info("resource manager server is closed")
Expand All @@ -243,21 +222,6 @@ func (s *Server) GetControllerConfig() *ControllerConfig {
return &s.cfg.Controller
}

// GetClient returns builtin etcd client.
func (s *Server) GetClient() *clientv3.Client {
return s.etcdClient
}

// GetHTTPClient returns builtin http client.
func (s *Server) GetHTTPClient() *http.Client {
return s.httpClient
}

// AddStartCallback adds a callback in the startServer phase.
func (s *Server) AddStartCallback(callbacks ...func()) {
s.startCallbacks = append(s.startCallbacks, callbacks...)
}

// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) IsServing() bool {
return !s.IsClosed() && s.participant.IsLeader()
Expand All @@ -268,11 +232,6 @@ func (s *Server) IsClosed() bool {
return s != nil && atomic.LoadInt64(&s.isRunning) == 0
}

// IsSecure checks if the server enable TLS.
func (s *Server) IsSecure() bool {
return s.secure
}

// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
Expand All @@ -283,11 +242,6 @@ func (s *Server) GetBackendEndpoints() string {
return s.cfg.BackendEndpoints
}

// GetClientConns returns the client connections.
func (s *Server) GetClientConns() *sync.Map {
return &s.clientConns
}

// ServerLoopWgDone decreases the server loop wait group.
func (s *Server) ServerLoopWgDone() {
s.serverLoopWg.Done()
Expand All @@ -298,76 +252,28 @@ func (s *Server) ServerLoopWgAdd(n int) {
s.serverLoopWg.Add(n)
}

// GetHTTPServer returns the http server.
func (s *Server) GetHTTPServer() *http.Server {
return s.httpServer
}

// SetHTTPServer sets the http server.
func (s *Server) SetHTTPServer(httpServer *http.Server) {
s.httpServer = httpServer
}

// SetUpRestHandler sets up the REST handler.
func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) {
return SetUpRestHandler(s.service)
}

// GetGRPCServer returns the grpc server.
func (s *Server) GetGRPCServer() *grpc.Server {
return s.grpcServer
}

// SetGRPCServer sets the grpc server.
func (s *Server) SetGRPCServer(grpcServer *grpc.Server) {
s.grpcServer = grpcServer
}

// RegisterGRPCService registers the grpc service.
func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) {
s.service.RegisterGRPCService(grpcServer)
}

// SetETCDClient sets the etcd client.
func (s *Server) SetETCDClient(etcdClient *clientv3.Client) {
s.etcdClient = etcdClient
}

// SetHTTPClient sets the http client.
func (s *Server) SetHTTPClient(httpClient *http.Client) {
s.httpClient = httpClient
}

// GetTLSConfig gets the security config.
func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
}

// GetDelegateClient returns grpc client connection talking to the forwarded host
func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) {
client, ok := s.clientConns.Load(forwardedHost)
if !ok {
tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
if err != nil {
return nil, err
}
cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig)
if err != nil {
return nil, err
}
client = cc
s.clientConns.Store(forwardedHost, cc)
}
return client.(*grpc.ClientConn), nil
}

// GetLeaderListenUrls gets service endpoints from the leader in election group.
func (s *Server) GetLeaderListenUrls() []string {
return s.participant.GetLeaderListenUrls()
}

func (s *Server) startServer() (err error) {
if s.clusterID, err = utils.InitClusterID(s.ctx, s.etcdClient); err != nil {
if s.clusterID, err = utils.InitClusterID(s.Context(), s.GetClient()); err != nil {
return err
}
log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID))
Expand All @@ -379,42 +285,29 @@ func (s *Server) startServer() (err error) {
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
resourceManagerPrimaryPrefix := endpoint.ResourceManagerSvcRootPath(s.clusterID)
s.participant = member.NewParticipant(s.etcdClient)
s.participant = member.NewParticipant(s.GetClient())
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
s.listenURL, err = url.Parse(s.cfg.ListenAddr)
if err != nil {
return err
}

s.service = &Service{
ctx: s.ctx,
ctx: s.Context(),
manager: NewManager[*Server](s),
}

tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return err
}
if tlsConfig != nil {
s.secure = true
s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig)
} else {
s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host)
}
if err != nil {
if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil {
return err
}

serverReadyChan := make(chan struct{})
defer close(serverReadyChan)
s.serverLoopWg.Add(1)
go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.muxListener)
go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.GetListener())
<-serverReadyChan
s.startServerLoop()

// Run callbacks
log.Info("triggering the start callback functions")
for _, cb := range s.startCallbacks {
for _, cb := range s.GetStartCallbacks() {
cb()
}

Expand All @@ -424,7 +317,7 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10),
s.serviceRegister = discovery.NewServiceRegister(s.Context(), s.GetClient(), strconv.FormatUint(s.clusterID, 10),
utils.ResourceManagerServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds)
if err := s.serviceRegister.Register(); err != nil {
log.Error("failed to register the service", zap.String("service-name", utils.ResourceManagerServiceName), errs.ZapError(err))
Expand All @@ -437,10 +330,9 @@ func (s *Server) startServer() (err error) {
// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *Config) *Server {
svr := &Server{
BaseServer: server.NewBaseServer(ctx),
DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename),
startTimestamp: time.Now().Unix(),
cfg: cfg,
ctx: ctx,
}
return svr
}
Expand Down
Loading

0 comments on commit 50ea886

Please sign in to comment.