From f63a4a1fd44e60dba889ec67cf672851878c600d Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 6 Nov 2024 17:29:19 +0800 Subject: [PATCH] remove api mode Signed-off-by: Ryan Leung --- cmd/pd-server/main.go | 7 +- pkg/utils/apiutil/serverapi/middleware.go | 5 +- server/api/member.go | 9 +-- server/apiv2/handlers/micro_service.go | 10 --- server/cluster/cluster.go | 82 ++++++++++------------- server/server.go | 68 ++++++++----------- server/server_test.go | 2 +- 7 files changed, 74 insertions(+), 109 deletions(-) diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index 443a50c6811c..fb2538e71dc8 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -240,12 +240,7 @@ func start(cmd *cobra.Command, args []string, services ...string) { // Flushing any buffered log entries defer log.Sync() memory.InitMemoryHook() - if len(services) != 0 { - versioninfo.Log(server.APIServiceMode) - } else { - versioninfo.Log(server.PDMode) - } - + versioninfo.Log("PD") for _, msg := range cfg.WarningMsgs { log.Warn(msg) } diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index d6fc98082d6d..57108299e13d 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -114,7 +114,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, } func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, string) { - if !h.s.IsAPIServiceMode() { + if !h.s.IsServiceIndependent(constant.TSOServiceName) && !h.s.IsServiceIndependent(constant.SchedulingServiceName) { return false, "" } if len(h.microserviceRedirectRules) == 0 { @@ -143,7 +143,8 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName) if !ok || addr == "" { log.Warn("failed to get the service primary addr when trying to match redirect rules", - zap.String("path", r.URL.Path)) + zap.String("path", r.URL.Path), zap.String("addr", addr), + zap.String("target", rule.targetServiceName)) return true, "" } // If the URL contains escaped characters, use RawPath instead of Path diff --git a/server/api/member.go b/server/api/member.go index 02cae4bca1e0..dc81f668d174 100644 --- a/server/api/member.go +++ b/server/api/member.go @@ -71,12 +71,9 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) { if members.GetHeader().GetError() != nil { return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String())) } - dclocationDistribution := make(map[string][]uint64) - if !svr.IsAPIServiceMode() { - dclocationDistribution, err = svr.GetTSOAllocatorManager().GetClusterDCLocationsFromEtcd() - if err != nil { - return nil, errors.WithStack(err) - } + dclocationDistribution, err := svr.GetTSOAllocatorManager().GetClusterDCLocationsFromEtcd() + if err != nil { + return nil, errors.WithStack(err) } for _, m := range members.GetMembers() { var e error diff --git a/server/apiv2/handlers/micro_service.go b/server/apiv2/handlers/micro_service.go index fd44665530f0..6da4ae37e74f 100644 --- a/server/apiv2/handlers/micro_service.go +++ b/server/apiv2/handlers/micro_service.go @@ -38,11 +38,6 @@ func RegisterMicroService(r *gin.RouterGroup) { // @Router /ms/members/{service} [get] func GetMembers(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) - if !svr.IsAPIServiceMode() { - c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service") - return - } - if service := c.Param("service"); len(service) > 0 { entries, err := discovery.GetMSMembers(service, svr.GetClient()) if err != nil { @@ -64,11 +59,6 @@ func GetMembers(c *gin.Context) { // @Router /ms/primary/{service} [get] func GetPrimary(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) - if !svr.IsAPIServiceMode() { - c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service") - return - } - if service := c.Param("service"); len(service) > 0 { addr, _ := svr.GetServicePrimaryAddr(c.Request.Context(), service) c.IndentedJSON(http.StatusOK, addr) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 7eb82c99b257..4bbe6ec2c18a 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -128,7 +128,7 @@ type Server interface { GetMembers() ([]*pdpb.Member, error) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error GetKeyspaceGroupManager() *keyspace.GroupManager - IsAPIServiceMode() bool + IsKeyspaceEnabled() bool GetSafePointV2Manager() *gc.SafePointV2Manager } @@ -153,12 +153,12 @@ type RaftCluster struct { etcdClient *clientv3.Client httpClient *http.Client - running bool - isAPIServiceMode bool - meta *metapb.Cluster - storage storage.Storage - minResolvedTS uint64 - externalTS uint64 + running bool + isKeyspaceEnabled bool + meta *metapb.Cluster + storage storage.Storage + minResolvedTS uint64 + externalTS uint64 // Keep the previous store limit settings when removing a store. prevStoreLimit map[uint64]map[storelimit.Type]float64 @@ -323,7 +323,7 @@ func (c *RaftCluster) Start(s Server) error { log.Warn("raft cluster has already been started") return nil } - c.isAPIServiceMode = s.IsAPIServiceMode() + c.isKeyspaceEnabled = s.IsKeyspaceEnabled() err := c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager()) if err != nil { return err @@ -359,7 +359,7 @@ func (c *RaftCluster) Start(s Server) error { log.Error("load external timestamp meets error", zap.Error(err)) } - if c.isAPIServiceMode { + if c.isKeyspaceEnabled { // bootstrap keyspace group manager after starting other parts successfully. // This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster. err = c.keyspaceGroupManager.Bootstrap(c.ctx) @@ -387,51 +387,43 @@ func (c *RaftCluster) Start(s Server) error { } func (c *RaftCluster) checkSchedulingService() { - if c.isAPIServiceMode { - servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName) - if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) { - c.startSchedulingJobs(c, c.hbstreams) - c.UnsetServiceIndependent(constant.SchedulingServiceName) - } else { - if c.stopSchedulingJobs() || c.coordinator == nil { - c.initCoordinator(c.ctx, c, c.hbstreams) - } - if !c.IsServiceIndependent(constant.SchedulingServiceName) { - c.SetServiceIndependent(constant.SchedulingServiceName) - } - } - } else { + servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName) + if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) { c.startSchedulingJobs(c, c.hbstreams) c.UnsetServiceIndependent(constant.SchedulingServiceName) + } else { + if c.stopSchedulingJobs() || c.coordinator == nil { + c.initCoordinator(c.ctx, c, c.hbstreams) + } + if !c.IsServiceIndependent(constant.SchedulingServiceName) { + c.SetServiceIndependent(constant.SchedulingServiceName) + } } } // checkTSOService checks the TSO service. func (c *RaftCluster) checkTSOService() { - if c.isAPIServiceMode { - if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { - servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName) - if err != nil || len(servers) == 0 { - if err := c.startTSOJobsIfNeeded(); err != nil { - log.Error("failed to start TSO jobs", errs.ZapError(err)) - return - } - if c.IsServiceIndependent(constant.TSOServiceName) { - log.Info("TSO is provided by PD") - c.UnsetServiceIndependent(constant.TSOServiceName) - } - } else { - if err := c.stopTSOJobsIfNeeded(); err != nil { - log.Error("failed to stop TSO jobs", errs.ZapError(err)) - return - } - if !c.IsServiceIndependent(constant.TSOServiceName) { - log.Info("TSO is provided by TSO server") - c.SetServiceIndependent(constant.TSOServiceName) - } + if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { + servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName) + if err != nil || len(servers) == 0 { + if err := c.startTSOJobsIfNeeded(); err != nil { + log.Error("failed to start TSO jobs", errs.ZapError(err)) + return + } + if c.IsServiceIndependent(constant.TSOServiceName) { + log.Info("TSO is provided by PD") + c.UnsetServiceIndependent(constant.TSOServiceName) + } + } else { + if err := c.stopTSOJobsIfNeeded(); err != nil { + log.Error("failed to stop TSO jobs", errs.ZapError(err)) + return + } + if !c.IsServiceIndependent(constant.TSOServiceName) { + log.Info("TSO is provided by TSO server") + c.SetServiceIndependent(constant.TSOServiceName) } } - return } if err := c.startTSOJobsIfNeeded(); err != nil { diff --git a/server/server.go b/server/server.go index 029c85694c36..6214dd25c21c 100644 --- a/server/server.go +++ b/server/server.go @@ -99,10 +99,8 @@ const ( recoveringMarkPath = "cluster/markers/snapshot-recovering" - // PDMode represents that server is in PD mode. - PDMode = "PD" - // APIServiceMode represents that server is in API service mode. - APIServiceMode = "API Service" + // PD is the default service name. + PD = "PD" // maxRetryTimesGetServicePrimary is the max retry times for getting primary addr. // Note: it need to be less than client.defaultPDTimeout @@ -227,7 +225,7 @@ type Server struct { auditBackends []audit.Backend registry *registry.ServiceRegistry - mode string + isKeyspaceEnabled bool servicePrimaryMap sync.Map /* Store as map[string]string */ tsoPrimaryWatcher *etcdutil.LoopWatcher schedulingPrimaryWatcher *etcdutil.LoopWatcher @@ -241,13 +239,13 @@ type HandlerBuilder func(context.Context, *Server) (http.Handler, apiutil.APISer // CreateServer creates the UNINITIALIZED pd server with given configuration. func CreateServer(ctx context.Context, cfg *config.Config, services []string, legacyServiceBuilders ...HandlerBuilder) (*Server, error) { - var mode string + var isKeyspaceEnabled bool if len(services) != 0 { - mode = APIServiceMode + isKeyspaceEnabled = true } else { - mode = PDMode + isKeyspaceEnabled = false } - log.Info(fmt.Sprintf("%s config", mode), zap.Reflect("config", cfg)) + log.Info("PD config", zap.Reflect("config", cfg)) serviceMiddlewareCfg := config.NewServiceMiddlewareConfig() s := &Server{ @@ -259,7 +257,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le ctx: ctx, startTimestamp: time.Now().Unix(), DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename), - mode: mode, + isKeyspaceEnabled: isKeyspaceEnabled, tsoClientPool: struct { syncutil.RWMutex clients map[string]tsopb.TSO_TsoClient @@ -494,7 +492,7 @@ func (s *Server) startServer(ctx context.Context) error { Member: s.member.MemberValue(), Step: keyspace.AllocStep, }) - if s.IsAPIServiceMode() { + if s.IsKeyspaceEnabled() { s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client) } s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager) @@ -545,7 +543,7 @@ func (s *Server) Close() { s.cgMonitor.StopMonitor() s.stopServerLoop() - if s.IsAPIServiceMode() { + if s.IsKeyspaceEnabled() { s.keyspaceGroupManager.Close() } @@ -656,10 +654,8 @@ func (s *Server) startServerLoop(ctx context.Context) { go s.etcdLeaderLoop() go s.serverMetricsLoop() go s.encryptionKeyManagerLoop() - if s.IsAPIServiceMode() { - s.initTSOPrimaryWatcher() - s.initSchedulingPrimaryWatcher() - } + s.initTSOPrimaryWatcher() + s.initSchedulingPrimaryWatcher() } func (s *Server) stopServerLoop() { @@ -803,9 +799,9 @@ func (s *Server) stopRaftCluster() { s.cluster.Stop() } -// IsAPIServiceMode return whether the server is in API service mode. -func (s *Server) IsAPIServiceMode() bool { - return s.mode == APIServiceMode +// IsKeyspaceEnabled return whether the keyspace is enabled. +func (s *Server) IsKeyspaceEnabled() bool { + return s.isKeyspaceEnabled } // GetAddr returns the server urls for clients. @@ -1410,10 +1406,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster { // IsServiceIndependent returns whether the service is independent. func (s *Server) IsServiceIndependent(name string) bool { - if s.mode == APIServiceMode && !s.IsClosed() { - if name == constant.TSOServiceName && !s.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { - return true - } + if !s.IsClosed() { return s.cluster.IsServiceIndependent(name) } return false @@ -1615,7 +1608,7 @@ func (s *Server) leaderLoop() { for { if s.IsClosed() { - log.Info(fmt.Sprintf("server is closed, return %s leader loop", s.mode)) + log.Info("server is closed, return PD leader loop") return } @@ -1637,10 +1630,8 @@ func (s *Server) leaderLoop() { log.Error("reload config failed", errs.ZapError(err)) continue } - if !s.IsAPIServiceMode() { - // Check the cluster dc-location after the PD leader is elected - go s.tsoAllocatorManager.ClusterDCLocationChecker() - } + // Check the cluster dc-location after the PD leader is elected + go s.tsoAllocatorManager.ClusterDCLocationChecker() syncer := s.cluster.GetRegionSyncer() if s.persistOptions.IsUseRegionStorage() { syncer.StartSyncWithLeader(leader.GetListenUrls()[0]) @@ -1688,13 +1679,13 @@ func (s *Server) leaderLoop() { } func (s *Server) campaignLeader() { - log.Info(fmt.Sprintf("start to campaign %s leader", s.mode), zap.String("campaign-leader-name", s.Name())) + log.Info("start to campaign PD leader", zap.String("campaign-leader-name", s.Name())) if err := s.member.CampaignLeader(s.ctx, s.cfg.LeaderLease); err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { - log.Info(fmt.Sprintf("campaign %s leader meets error due to txn conflict, another PD/API server may campaign successfully", s.mode), + log.Info("campaign %PD leader meets error due to txn conflict, another PD/API server may campaign successfully", zap.String("campaign-leader-name", s.Name())) } else { - log.Error(fmt.Sprintf("campaign %s leader meets error due to etcd error", s.mode), + log.Error("campaign PD leader meets error due to etcd error", zap.String("campaign-leader-name", s.Name()), errs.ZapError(err)) } @@ -1714,7 +1705,7 @@ func (s *Server) campaignLeader() { // maintain the PD leadership, after this, TSO can be service. s.member.KeepLeader(ctx) - log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name())) + log.Info("campaign PD leader ok", zap.String("campaign-leader-name", s.Name())) if err := s.reloadConfigFromKV(); err != nil { log.Error("failed to reload configuration", errs.ZapError(err)) @@ -1751,21 +1742,20 @@ func (s *Server) campaignLeader() { } // EnableLeader to accept the remaining service, such as GetStore, GetRegion. s.member.EnableLeader() - member.ServiceMemberGauge.WithLabelValues(s.mode).Set(1) - if !s.IsAPIServiceMode() { - // Check the cluster dc-location after the PD leader is elected. - go s.tsoAllocatorManager.ClusterDCLocationChecker() - } + member.ServiceMemberGauge.WithLabelValues(PD).Set(1) + // Check the cluster dc-location after the PD leader is elected. + go s.tsoAllocatorManager.ClusterDCLocationChecker() + defer resetLeaderOnce.Do(func() { // as soon as cancel the leadership keepalive, then other member have chance // to be new leader. cancel() s.member.ResetLeader() - member.ServiceMemberGauge.WithLabelValues(s.mode).Set(0) + member.ServiceMemberGauge.WithLabelValues(PD).Set(0) }) CheckPDVersionWithClusterVersion(s.persistOptions) - log.Info(fmt.Sprintf("%s leader is ready to serve", s.mode), zap.String("leader-name", s.Name())) + log.Info("PD leader is ready to serve", zap.String("leader-name", s.Name())) leaderTicker := time.NewTicker(constant.LeaderTickInterval) defer leaderTicker.Stop() diff --git a/server/server_test.go b/server/server_test.go index 7dd91b9f61f5..393e60076900 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -271,7 +271,7 @@ func TestAPIService(t *testing.T) { err = svr.Run() re.NoError(err) MustWaitLeader(re, []*Server{svr}) - re.True(svr.IsAPIServiceMode()) + re.True(svr.IsKeyspaceEnabled()) } func TestIsPathInDirectory(t *testing.T) {