Skip to content

Commit

Permalink
enable server/*.go execpt server/api/*
Browse files Browse the repository at this point in the history
Signed-off-by: okJiang <[email protected]>
  • Loading branch information
okJiang committed Jun 21, 2024
1 parent 2caf2f6 commit cf68b06
Show file tree
Hide file tree
Showing 22 changed files with 110 additions and 60 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,6 @@ issues:
- path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump)
linters:
- errcheck
- path: (server/api/.*\.go|server/.*\.go|pkg/schedule/schedulers/.*\.go)
- path: (server/api/.*\.go|pkg/schedule/schedulers/.*\.go)
linters:
- errcheck
12 changes: 8 additions & 4 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,12 @@ func start(cmd *cobra.Command, args []string, services ...string) {
schedulers.Register()
cfg := config.NewConfig()
flagSet := cmd.Flags()
flagSet.Parse(args)
err := cfg.Parse(flagSet)
err := flagSet.Parse(args)
if err != nil {
cmd.Println(err)
return
}
err = cfg.Parse(flagSet)
defer logutil.LogPanic()

if err != nil {
Expand Down Expand Up @@ -231,7 +235,7 @@ func start(cmd *cobra.Command, args []string, services ...string) {
log.Fatal("initialize logger error", errs.ZapError(err))
}
// Flushing any buffered log entries
defer log.Sync()
defer log.Sync() //nolint:errcheck
memory.InitMemoryHook()
if len(services) != 0 {
versioninfo.Log(server.APIServiceMode)
Expand Down Expand Up @@ -295,6 +299,6 @@ func start(cmd *cobra.Command, args []string, services ...string) {
}

func exit(code int) {
log.Sync()
log.Sync() // nolint:errcheck
os.Exit(code)
}
6 changes: 3 additions & 3 deletions pkg/mcs/metastorage/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type dummyRestService struct{}

func (dummyRestService) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNotImplemented)
w.Write([]byte("not implemented"))
w.Write([]byte("not implemented")) // nolint:errcheck
}

// Service is the gRPC service for meta storage.
Expand All @@ -71,9 +71,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) {
}

// RegisterRESTHandler registers the service to REST server.
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) {
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error {
handler, group := SetUpRestHandler(s)
apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
}

func (s *Service) checkServing() error {
Expand Down
16 changes: 11 additions & 5 deletions pkg/mcs/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type ServiceBuilder func(bs.Server) RegistrableService
// RegistrableService is the interface that should wraps the RegisterService method.
type RegistrableService interface {
RegisterGRPCService(g *grpc.Server)
RegisterRESTHandler(userDefineHandlers map[string]http.Handler)
RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error
}

// ServiceRegistry is a map that stores all registered grpc services.
Expand Down Expand Up @@ -82,14 +82,20 @@ func (r *ServiceRegistry) InstallAllRESTHandler(srv bs.Server, h map[string]http
for name, builder := range r.builders {
serviceName := createServiceName(prefix, name)
if l, ok := r.services[serviceName]; ok {
l.RegisterRESTHandler(h)
log.Info("restful API service already registered", zap.String("prefix", prefix), zap.String("service-name", name))
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register REST handler failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
} else {
log.Info("restful API service already registered", zap.String("prefix", prefix), zap.String("service-name", name))
}
continue
}
l := builder(srv)
r.services[serviceName] = l
l.RegisterRESTHandler(h)
log.Info("restful API service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register REST handler failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
} else {
log.Info("restful API service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/mcs/resourcemanager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ func (c *Config) Adjust(meta *toml.MetaData) error {
}

c.adjustLog(configMetaData.Child("log"))
c.Security.Encryption.Adjust()
if err := c.Security.Encryption.Adjust(); err != nil {
return err
}

c.Controller.Adjust(configMetaData.Child("controller"))
configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease)
Expand Down
10 changes: 6 additions & 4 deletions pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type dummyRestService struct{}

func (dummyRestService) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNotImplemented)
w.Write([]byte("not implemented"))
w.Write([]byte("not implemented")) // nolint:errcheck
}

// Service is the gRPC service for resource manager.
Expand All @@ -76,9 +76,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) {
}

// RegisterRESTHandler registers the service to REST server.
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) {
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error {
handler, group := SetUpRestHandler(s)
apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
}

// GetManager returns the resource manager.
Expand Down Expand Up @@ -228,6 +228,8 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
log.Debug("finish token request from", zap.String("resource-group", resourceGroupName))
resps.Responses = append(resps.Responses, resp)
}
stream.Send(resps)
if err := stream.Send(resps); err != nil {
return errors.WithStack(err)
}
}
}
4 changes: 3 additions & 1 deletion pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,9 @@ func (m *Manager) persistResourceGroupRunningState() {
m.RLock()
group, ok := m.groups[keys[idx]]
if ok {
group.persistStates(m.storage)
if err := group.persistStates(m.storage); err != nil {
log.Error("persist resource group state failed", zap.Error(err))
}
}
m.RUnlock()
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ func (s *Server) campaignLeader() {

log.Info("triggering the primary callback functions")
for _, cb := range s.primaryCallbacks {
cb(ctx)
if err := cb(ctx); err != nil {
log.Error("failed to trigger the primary callback function", errs.ZapError(err))
}
}

s.participant.EnableLeader()
Expand Down Expand Up @@ -213,7 +215,9 @@ func (s *Server) Close() {
}

log.Info("closing resource manager server ...")
s.serviceRegister.Deregister()
if err := s.serviceRegister.Deregister(); err != nil {
log.Error("failed to deregister the service", errs.ZapError(err))
}
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.GetListener().Close()
Expand Down Expand Up @@ -362,10 +366,14 @@ func CreateServer(ctx context.Context, cfg *Config) *Server {

// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
func CreateServerWrapper(cmd *cobra.Command, args []string) {
cmd.Flags().Parse(args)
err := cmd.Flags().Parse(args)
if err != nil {
cmd.Println(err)
return
}
cfg := NewConfig()
flagSet := cmd.Flags()
err := cfg.Parse(flagSet)
err = cfg.Parse(flagSet)
defer logutil.LogPanic()

if err != nil {
Expand All @@ -389,7 +397,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
log.Fatal("initialize logger error", errs.ZapError(err))
}
// Flushing any buffered log entries
defer log.Sync()
defer log.Sync() // nolint:errcheck

versioninfo.Log(serviceName)
log.Info("resource manager config", zap.Reflect("config", cfg))
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*S
re.NoError(err)
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
// Flushing any buffered log entries
defer log.Sync()
defer log.Sync() // nolint:errcheck

s := CreateServer(ctx, cfg)
if err = s.Run(); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ func (c *Config) adjust(meta *toml.MetaData) error {
}

c.adjustLog(configMetaData.Child("log"))
c.Security.Encryption.Adjust()
if err := c.Security.Encryption.Adjust(); err != nil {
return err
}

configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease)

Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type dummyRestService struct{}

func (dummyRestService) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNotImplemented)
w.Write([]byte("not implemented"))
w.Write([]byte("not implemented")) // nolint:errcheck
}

// ConfigProvider is used to get scheduling config from the given
Expand Down Expand Up @@ -336,9 +336,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) {
}

// RegisterRESTHandler registers the service to REST server.
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) {
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error {
handler, group := SetUpRestHandler(s)
apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
}

func (s *Service) errorHeader(err *schedulingpb.Error) *schedulingpb.ResponseHeader {
Expand Down
14 changes: 10 additions & 4 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,9 @@ func (s *Server) Close() {
}

log.Info("closing scheduling server ...")
s.serviceRegister.Deregister()
if err := s.serviceRegister.Deregister(); err != nil {
log.Error("failed to deregister the service", errs.ZapError(err))
}
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.GetListener().Close()
Expand Down Expand Up @@ -563,10 +565,14 @@ func CreateServer(ctx context.Context, cfg *config.Config) *Server {
// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
func CreateServerWrapper(cmd *cobra.Command, args []string) {
schedulers.Register()
cmd.Flags().Parse(args)
err := cmd.Flags().Parse(args)
if err != nil {
cmd.Println(err)
return
}
cfg := config.NewConfig()
flagSet := cmd.Flags()
err := cfg.Parse(flagSet)
err = cfg.Parse(flagSet)
defer logutil.LogPanic()

if err != nil {
Expand All @@ -590,7 +596,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
log.Fatal("initialize logger error", errs.ZapError(err))
}
// Flushing any buffered log entries
defer log.Sync()
defer log.Sync() // nolint:errcheck

versioninfo.Log(serviceName)
log.Info("scheduling service config", zap.Reflect("config", cfg))
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewTestServer(ctx context.Context, re *require.Assertions, cfg *config.Conf
re.NoError(err)
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
// Flushing any buffered log entries
defer log.Sync()
defer log.Sync() // nolint:errcheck

s := CreateServer(ctx, cfg)
if err = s.Run(); err != nil {
Expand Down
4 changes: 1 addition & 3 deletions pkg/mcs/tso/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,7 @@ func (c *Config) Adjust(meta *toml.MetaData) error {
}

c.adjustLog(configMetaData.Child("log"))
c.Security.Encryption.Adjust()

return nil
return c.Security.Encryption.Adjust()
}

func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type dummyRestService struct{}

func (dummyRestService) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNotImplemented)
w.Write([]byte("not implemented"))
w.Write([]byte("not implemented")) // nolint:errcheck
}

// ConfigProvider is used to get tso config from the given
Expand Down Expand Up @@ -79,9 +79,9 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) {
}

// RegisterRESTHandler registers the service to REST server.
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) {
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error {
handler, group := SetUpRestHandler(s)
apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
return apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
}

// Tso returns a stream of timestamps
Expand Down
14 changes: 10 additions & 4 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ func (s *Server) Close() {
log.Info("closing tso server ...")
// close tso service loops in the keyspace group manager
s.keyspaceGroupManager.Close()
s.serviceRegister.Deregister()
if err := s.serviceRegister.Deregister(); err != nil {
log.Error("failed to deregister the service", errs.ZapError(err))
}
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.GetListener().Close()
Expand Down Expand Up @@ -435,10 +437,14 @@ func CreateServer(ctx context.Context, cfg *Config) *Server {

// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
func CreateServerWrapper(cmd *cobra.Command, args []string) {
cmd.Flags().Parse(args)
err := cmd.Flags().Parse(args)
if err != nil {
cmd.Println(err)
return
}
cfg := NewConfig()
flagSet := cmd.Flags()
err := cfg.Parse(flagSet)
err = cfg.Parse(flagSet)
defer logutil.LogPanic()

if err != nil {
Expand All @@ -462,7 +468,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
log.Fatal("initialize logger error", errs.ZapError(err))
}
// Flushing any buffered log entries
defer log.Sync()
defer log.Sync() // nolint:errcheck

versioninfo.Log(serviceName)
log.Info("TSO service config", zap.Reflect("config", cfg))
Expand Down
11 changes: 6 additions & 5 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2172,7 +2172,9 @@ func (c *RaftCluster) runMinResolvedTSJob() {
interval = c.opt.GetMinResolvedTSPersistenceInterval()
if interval != 0 {
if current, needPersist := c.CheckAndUpdateMinResolvedTS(); needPersist {
c.storage.SaveMinResolvedTS(current)
if err := c.storage.SaveMinResolvedTS(current); err != nil {
log.Error("persist min resolved ts meet error", errs.ZapError(err))
}
}
} else {
// If interval in config is zero, it means not to persist resolved ts and check config with this interval
Expand Down Expand Up @@ -2249,8 +2251,7 @@ func (c *RaftCluster) SetExternalTS(timestamp uint64) error {
c.Lock()
defer c.Unlock()
c.externalTS = timestamp
c.storage.SaveExternalTS(timestamp)
return nil
return c.storage.SaveExternalTS(timestamp)
}

// SetStoreLimit sets a store limit for a given type and rate.
Expand Down Expand Up @@ -2286,8 +2287,8 @@ func (c *RaftCluster) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64)
}

// SetAllStoresLimitTTL sets all store limit for a given type and rate with ttl.
func (c *RaftCluster) SetAllStoresLimitTTL(typ storelimit.Type, ratePerMin float64, ttl time.Duration) {
c.opt.SetAllStoresLimitTTL(c.ctx, c.etcdClient, typ, ratePerMin, ttl)
func (c *RaftCluster) SetAllStoresLimitTTL(typ storelimit.Type, ratePerMin float64, ttl time.Duration) error {
return c.opt.SetAllStoresLimitTTL(c.ctx, c.etcdClient, typ, ratePerMin, ttl)
}

// GetClusterVersion returns the current cluster version.
Expand Down
Loading

0 comments on commit cf68b06

Please sign in to comment.