Skip to content

Commit

Permalink
remove api mode
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Nov 6, 2024
1 parent b27f021 commit f63a4a1
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 109 deletions.
7 changes: 1 addition & 6 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,7 @@ func start(cmd *cobra.Command, args []string, services ...string) {
// Flushing any buffered log entries
defer log.Sync()
memory.InitMemoryHook()
if len(services) != 0 {
versioninfo.Log(server.APIServiceMode)
} else {
versioninfo.Log(server.PDMode)
}

versioninfo.Log("PD")
for _, msg := range cfg.WarningMsgs {
log.Warn(msg)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
}

func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, string) {
if !h.s.IsAPIServiceMode() {
if !h.s.IsServiceIndependent(constant.TSOServiceName) && !h.s.IsServiceIndependent(constant.SchedulingServiceName) {
return false, ""
}
if len(h.microserviceRedirectRules) == 0 {
Expand Down Expand Up @@ -143,7 +143,8 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when trying to match redirect rules",
zap.String("path", r.URL.Path))
zap.String("path", r.URL.Path), zap.String("addr", addr),
zap.String("target", rule.targetServiceName))
return true, ""
}
// If the URL contains escaped characters, use RawPath instead of Path
Expand Down
9 changes: 3 additions & 6 deletions server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,9 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) {
if members.GetHeader().GetError() != nil {
return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String()))
}
dclocationDistribution := make(map[string][]uint64)
if !svr.IsAPIServiceMode() {
dclocationDistribution, err = svr.GetTSOAllocatorManager().GetClusterDCLocationsFromEtcd()
if err != nil {
return nil, errors.WithStack(err)
}
dclocationDistribution, err := svr.GetTSOAllocatorManager().GetClusterDCLocationsFromEtcd()
if err != nil {
return nil, errors.WithStack(err)
}
for _, m := range members.GetMembers() {
var e error
Expand Down
10 changes: 0 additions & 10 deletions server/apiv2/handlers/micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ func RegisterMicroService(r *gin.RouterGroup) {
// @Router /ms/members/{service} [get]
func GetMembers(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsAPIServiceMode() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service")
return
}

if service := c.Param("service"); len(service) > 0 {
entries, err := discovery.GetMSMembers(service, svr.GetClient())
if err != nil {
Expand All @@ -64,11 +59,6 @@ func GetMembers(c *gin.Context) {
// @Router /ms/primary/{service} [get]
func GetPrimary(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsAPIServiceMode() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service")
return
}

if service := c.Param("service"); len(service) > 0 {
addr, _ := svr.GetServicePrimaryAddr(c.Request.Context(), service)
c.IndentedJSON(http.StatusOK, addr)
Expand Down
82 changes: 37 additions & 45 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type Server interface {
GetMembers() ([]*pdpb.Member, error)
ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
GetKeyspaceGroupManager() *keyspace.GroupManager
IsAPIServiceMode() bool
IsKeyspaceEnabled() bool
GetSafePointV2Manager() *gc.SafePointV2Manager
}

Expand All @@ -153,12 +153,12 @@ type RaftCluster struct {
etcdClient *clientv3.Client
httpClient *http.Client

running bool
isAPIServiceMode bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS uint64
externalTS uint64
running bool
isKeyspaceEnabled bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS uint64
externalTS uint64

// Keep the previous store limit settings when removing a store.
prevStoreLimit map[uint64]map[storelimit.Type]float64
Expand Down Expand Up @@ -323,7 +323,7 @@ func (c *RaftCluster) Start(s Server) error {
log.Warn("raft cluster has already been started")
return nil
}
c.isAPIServiceMode = s.IsAPIServiceMode()
c.isKeyspaceEnabled = s.IsKeyspaceEnabled()
err := c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
return err
Expand Down Expand Up @@ -359,7 +359,7 @@ func (c *RaftCluster) Start(s Server) error {
log.Error("load external timestamp meets error", zap.Error(err))
}

if c.isAPIServiceMode {
if c.isKeyspaceEnabled {
// bootstrap keyspace group manager after starting other parts successfully.
// This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster.
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
Expand Down Expand Up @@ -387,51 +387,43 @@ func (c *RaftCluster) Start(s Server) error {
}

func (c *RaftCluster) checkSchedulingService() {
if c.isAPIServiceMode {
servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName)
if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
c.UnsetServiceIndependent(constant.SchedulingServiceName)
} else {
if c.stopSchedulingJobs() || c.coordinator == nil {
c.initCoordinator(c.ctx, c, c.hbstreams)
}
if !c.IsServiceIndependent(constant.SchedulingServiceName) {
c.SetServiceIndependent(constant.SchedulingServiceName)
}
}
} else {
servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName)
if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
c.UnsetServiceIndependent(constant.SchedulingServiceName)
} else {
if c.stopSchedulingJobs() || c.coordinator == nil {
c.initCoordinator(c.ctx, c, c.hbstreams)
}
if !c.IsServiceIndependent(constant.SchedulingServiceName) {
c.SetServiceIndependent(constant.SchedulingServiceName)
}
}
}

// checkTSOService checks the TSO service.
func (c *RaftCluster) checkTSOService() {
if c.isAPIServiceMode {
if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() {
servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
if err := c.startTSOJobsIfNeeded(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
if c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by PD")
c.UnsetServiceIndependent(constant.TSOServiceName)
}
} else {
if err := c.stopTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}
if !c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by TSO server")
c.SetServiceIndependent(constant.TSOServiceName)
}
if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() {
servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
if err := c.startTSOJobsIfNeeded(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
if c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by PD")
c.UnsetServiceIndependent(constant.TSOServiceName)
}
} else {
if err := c.stopTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}
if !c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by TSO server")
c.SetServiceIndependent(constant.TSOServiceName)
}
}
return
}

if err := c.startTSOJobsIfNeeded(); err != nil {
Expand Down
68 changes: 29 additions & 39 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,8 @@ const (

recoveringMarkPath = "cluster/markers/snapshot-recovering"

// PDMode represents that server is in PD mode.
PDMode = "PD"
// APIServiceMode represents that server is in API service mode.
APIServiceMode = "API Service"
// PD is the default service name.
PD = "PD"

// maxRetryTimesGetServicePrimary is the max retry times for getting primary addr.
// Note: it need to be less than client.defaultPDTimeout
Expand Down Expand Up @@ -227,7 +225,7 @@ type Server struct {
auditBackends []audit.Backend

registry *registry.ServiceRegistry
mode string
isKeyspaceEnabled bool
servicePrimaryMap sync.Map /* Store as map[string]string */
tsoPrimaryWatcher *etcdutil.LoopWatcher
schedulingPrimaryWatcher *etcdutil.LoopWatcher
Expand All @@ -241,13 +239,13 @@ type HandlerBuilder func(context.Context, *Server) (http.Handler, apiutil.APISer

// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, cfg *config.Config, services []string, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
var mode string
var isKeyspaceEnabled bool
if len(services) != 0 {
mode = APIServiceMode
isKeyspaceEnabled = true
} else {
mode = PDMode
isKeyspaceEnabled = false
}
log.Info(fmt.Sprintf("%s config", mode), zap.Reflect("config", cfg))
log.Info("PD config", zap.Reflect("config", cfg))
serviceMiddlewareCfg := config.NewServiceMiddlewareConfig()

s := &Server{
Expand All @@ -259,7 +257,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le
ctx: ctx,
startTimestamp: time.Now().Unix(),
DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename),
mode: mode,
isKeyspaceEnabled: isKeyspaceEnabled,
tsoClientPool: struct {
syncutil.RWMutex
clients map[string]tsopb.TSO_TsoClient
Expand Down Expand Up @@ -494,7 +492,7 @@ func (s *Server) startServer(ctx context.Context) error {
Member: s.member.MemberValue(),
Step: keyspace.AllocStep,
})
if s.IsAPIServiceMode() {
if s.IsKeyspaceEnabled() {
s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client)
}
s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager)
Expand Down Expand Up @@ -545,7 +543,7 @@ func (s *Server) Close() {
s.cgMonitor.StopMonitor()

s.stopServerLoop()
if s.IsAPIServiceMode() {
if s.IsKeyspaceEnabled() {
s.keyspaceGroupManager.Close()
}

Expand Down Expand Up @@ -656,10 +654,8 @@ func (s *Server) startServerLoop(ctx context.Context) {
go s.etcdLeaderLoop()
go s.serverMetricsLoop()
go s.encryptionKeyManagerLoop()
if s.IsAPIServiceMode() {
s.initTSOPrimaryWatcher()
s.initSchedulingPrimaryWatcher()
}
s.initTSOPrimaryWatcher()
s.initSchedulingPrimaryWatcher()
}

func (s *Server) stopServerLoop() {
Expand Down Expand Up @@ -803,9 +799,9 @@ func (s *Server) stopRaftCluster() {
s.cluster.Stop()
}

// IsAPIServiceMode return whether the server is in API service mode.
func (s *Server) IsAPIServiceMode() bool {
return s.mode == APIServiceMode
// IsKeyspaceEnabled return whether the keyspace is enabled.
func (s *Server) IsKeyspaceEnabled() bool {
return s.isKeyspaceEnabled
}

// GetAddr returns the server urls for clients.
Expand Down Expand Up @@ -1410,10 +1406,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster {

// IsServiceIndependent returns whether the service is independent.
func (s *Server) IsServiceIndependent(name string) bool {
if s.mode == APIServiceMode && !s.IsClosed() {
if name == constant.TSOServiceName && !s.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() {
return true
}
if !s.IsClosed() {
return s.cluster.IsServiceIndependent(name)
}
return false
Expand Down Expand Up @@ -1615,7 +1608,7 @@ func (s *Server) leaderLoop() {

for {
if s.IsClosed() {
log.Info(fmt.Sprintf("server is closed, return %s leader loop", s.mode))
log.Info("server is closed, return PD leader loop")
return
}

Expand All @@ -1637,10 +1630,8 @@ func (s *Server) leaderLoop() {
log.Error("reload config failed", errs.ZapError(err))
continue
}
if !s.IsAPIServiceMode() {
// Check the cluster dc-location after the PD leader is elected
go s.tsoAllocatorManager.ClusterDCLocationChecker()
}
// Check the cluster dc-location after the PD leader is elected
go s.tsoAllocatorManager.ClusterDCLocationChecker()
syncer := s.cluster.GetRegionSyncer()
if s.persistOptions.IsUseRegionStorage() {
syncer.StartSyncWithLeader(leader.GetListenUrls()[0])
Expand Down Expand Up @@ -1688,13 +1679,13 @@ func (s *Server) leaderLoop() {
}

func (s *Server) campaignLeader() {
log.Info(fmt.Sprintf("start to campaign %s leader", s.mode), zap.String("campaign-leader-name", s.Name()))
log.Info("start to campaign PD leader", zap.String("campaign-leader-name", s.Name()))
if err := s.member.CampaignLeader(s.ctx, s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info(fmt.Sprintf("campaign %s leader meets error due to txn conflict, another PD/API server may campaign successfully", s.mode),
log.Info("campaign %PD leader meets error due to txn conflict, another PD/API server may campaign successfully",
zap.String("campaign-leader-name", s.Name()))
} else {
log.Error(fmt.Sprintf("campaign %s leader meets error due to etcd error", s.mode),
log.Error("campaign PD leader meets error due to etcd error",
zap.String("campaign-leader-name", s.Name()),
errs.ZapError(err))
}
Expand All @@ -1714,7 +1705,7 @@ func (s *Server) campaignLeader() {

// maintain the PD leadership, after this, TSO can be service.
s.member.KeepLeader(ctx)
log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name()))
log.Info("campaign PD leader ok", zap.String("campaign-leader-name", s.Name()))

if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", errs.ZapError(err))
Expand Down Expand Up @@ -1751,21 +1742,20 @@ func (s *Server) campaignLeader() {
}
// EnableLeader to accept the remaining service, such as GetStore, GetRegion.
s.member.EnableLeader()
member.ServiceMemberGauge.WithLabelValues(s.mode).Set(1)
if !s.IsAPIServiceMode() {
// Check the cluster dc-location after the PD leader is elected.
go s.tsoAllocatorManager.ClusterDCLocationChecker()
}
member.ServiceMemberGauge.WithLabelValues(PD).Set(1)
// Check the cluster dc-location after the PD leader is elected.
go s.tsoAllocatorManager.ClusterDCLocationChecker()

defer resetLeaderOnce.Do(func() {
// as soon as cancel the leadership keepalive, then other member have chance
// to be new leader.
cancel()
s.member.ResetLeader()
member.ServiceMemberGauge.WithLabelValues(s.mode).Set(0)
member.ServiceMemberGauge.WithLabelValues(PD).Set(0)
})

CheckPDVersionWithClusterVersion(s.persistOptions)
log.Info(fmt.Sprintf("%s leader is ready to serve", s.mode), zap.String("leader-name", s.Name()))
log.Info("PD leader is ready to serve", zap.String("leader-name", s.Name()))

leaderTicker := time.NewTicker(constant.LeaderTickInterval)
defer leaderTicker.Stop()
Expand Down
2 changes: 1 addition & 1 deletion server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestAPIService(t *testing.T) {
err = svr.Run()
re.NoError(err)
MustWaitLeader(re, []*Server{svr})
re.True(svr.IsAPIServiceMode())
re.True(svr.IsKeyspaceEnabled())
}

func TestIsPathInDirectory(t *testing.T) {
Expand Down

0 comments on commit f63a4a1

Please sign in to comment.