diff --git a/client/client.go b/client/client.go index aafe4aba77f..8ba03161227 100644 --- a/client/client.go +++ b/client/client.go @@ -645,6 +645,7 @@ func (c *client) setup() error { return nil } +// Close closes the client. func (c *client) Close() { c.cancel() c.wg.Wait() @@ -802,6 +803,7 @@ func (c *client) UpdateOption(option DynamicOption, value any) error { return nil } +// GetAllMembers gets the members Info from PD. func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { start := time.Now() defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }() @@ -848,10 +850,12 @@ func (c *client) getRegionAPIClientAndContext(ctx context.Context, allowFollower return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower) } +// GetTSAsync implements the TSOClient interface. func (c *client) GetTSAsync(ctx context.Context) TSFuture { return c.GetLocalTSAsync(ctx, globalDCLocation) } +// GetLocalTSAsync implements the TSOClient interface. func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture { defer trace.StartRegion(ctx, "pdclient.GetLocalTSAsync").End() if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { @@ -902,16 +906,19 @@ func (c *client) dispatchTSORequestWithRetry(ctx context.Context, dcLocation str return req } +// GetTS implements the TSOClient interface. func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) { resp := c.GetTSAsync(ctx) return resp.Wait() } +// GetLocalTS implements the TSOClient interface. func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical int64, logical int64, err error) { resp := c.GetLocalTSAsync(ctx, dcLocation) return resp.Wait() } +// GetMinTS implements the TSOClient interface. func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) { // Handle compatibility issue in case of PD/API server doesn't support GetMinTS API. serviceMode := c.getServiceMode() @@ -975,6 +982,7 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region { return r } +// GetRegionFromMember implements the RPCClient interface. func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context())) @@ -1013,6 +1021,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs return handleRegionResponse(resp), nil } +// GetRegion implements the RPCClient interface. func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context())) @@ -1051,6 +1060,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt return handleRegionResponse(resp), nil } +// GetPrevRegion implements the RPCClient interface. func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context())) @@ -1089,6 +1099,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio return handleRegionResponse(resp), nil } +// GetRegionByID implements the RPCClient interface. func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context())) @@ -1127,6 +1138,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get return handleRegionResponse(resp), nil } +// ScanRegions implements the RPCClient interface. func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context())) @@ -1176,6 +1188,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, return handleRegionsResponse(resp), nil } +// BatchScanRegions implements the RPCClient interface. func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...GetRegionOption) ([]*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.BatchScanRegions", opentracing.ChildOf(span.Context())) @@ -1274,6 +1287,7 @@ func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region { return regions } +// GetStore implements the RPCClient interface. func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context())) @@ -1312,6 +1326,7 @@ func handleStoreResponse(resp *pdpb.GetStoreResponse) (*metapb.Store, error) { return store, nil } +// GetAllStores implements the RPCClient interface. func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*metapb.Store, error) { // Applies options options := &GetStoreOp{} @@ -1345,6 +1360,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m return resp.GetStores(), nil } +// UpdateGCSafePoint implements the RPCClient interface. func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.UpdateGCSafePoint", opentracing.ChildOf(span.Context())) @@ -1406,6 +1422,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, return resp.GetMinSafePoint(), nil } +// ScatterRegion implements the RPCClient interface. func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context())) @@ -1440,6 +1457,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g return nil } +// ScatterRegions implements the RPCClient interface. func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...RegionsOption) (*pdpb.ScatterRegionResponse, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.ScatterRegions", opentracing.ChildOf(span.Context())) @@ -1448,6 +1466,7 @@ func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts .. return c.scatterRegionsWithOptions(ctx, regionsID, opts...) } +// SplitAndScatterRegions implements the RPCClient interface. func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.SplitAndScatterRegions", opentracing.ChildOf(span.Context())) @@ -1476,6 +1495,7 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, return protoClient.SplitAndScatterRegions(ctx, req) } +// GetOperator implements the RPCClient interface. func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context())) @@ -1575,6 +1595,7 @@ func trimHTTPPrefix(str string) string { return str } +// LoadGlobalConfig implements the RPCClient interface. func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) { ctx, cancel := context.WithTimeout(ctx, c.option.timeout) defer cancel() @@ -1602,6 +1623,7 @@ func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPat return res, resp.GetRevision(), nil } +// StoreGlobalConfig implements the RPCClient interface. func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error { resArr := make([]*pdpb.GlobalConfigItem, len(items)) for i, it := range items { @@ -1620,6 +1642,7 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items return nil } +// WatchGlobalConfig implements the RPCClient interface. func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error) { // TODO: Add retry mechanism // register watch components there @@ -1671,6 +1694,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis return globalConfigWatcherCh, err } +// GetExternalTimestamp implements the RPCClient interface. func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { ctx, cancel := context.WithTimeout(ctx, c.option.timeout) defer cancel() @@ -1691,6 +1715,7 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { return resp.GetTimestamp(), nil } +// SetExternalTimestamp implements the RPCClient interface. func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) error { ctx, cancel := context.WithTimeout(ctx, c.option.timeout) defer cancel() diff --git a/client/meta_storage_client.go b/client/meta_storage_client.go index fe7e8a33e93..200491cf19a 100644 --- a/client/meta_storage_client.go +++ b/client/meta_storage_client.go @@ -104,6 +104,7 @@ func getPrefix(key []byte) []byte { return []byte{0} } +// Put implements the MetaStorageClient interface. func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (*meta_storagepb.PutResponse, error) { options := &Op{} for _, opt := range opts { @@ -139,6 +140,7 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) ( return resp, nil } +// Get implements the MetaStorageClient interface. func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_storagepb.GetResponse, error) { options := &Op{} for _, opt := range opts { @@ -177,6 +179,7 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s return resp, nil } +// Watch implements the MetaStorageClient interface. func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error) { eventCh := make(chan []*meta_storagepb.Event, 100) options := &Op{} diff --git a/client/mock_pd_service_discovery.go b/client/mock_pd_service_discovery.go index f1fabd0a1d2..16462b0b1e6 100644 --- a/client/mock_pd_service_discovery.go +++ b/client/mock_pd_service_discovery.go @@ -56,19 +56,46 @@ func (m *mockPDServiceDiscovery) GetAllServiceClients() []ServiceClient { return m.clients } -func (*mockPDServiceDiscovery) GetClusterID() uint64 { return 0 } -func (*mockPDServiceDiscovery) GetKeyspaceID() uint32 { return 0 } -func (*mockPDServiceDiscovery) GetKeyspaceGroupID() uint32 { return 0 } -func (*mockPDServiceDiscovery) GetServiceURLs() []string { return nil } +// GetClusterID implements the ServiceDiscovery interface. +func (*mockPDServiceDiscovery) GetClusterID() uint64 { return 0 } + +// GetKeyspaceID implements the ServiceDiscovery interface. +func (*mockPDServiceDiscovery) GetKeyspaceID() uint32 { return 0 } + +// GetKeyspaceGroupID implements the ServiceDiscovery interface. +func (*mockPDServiceDiscovery) GetKeyspaceGroupID() uint32 { return 0 } + +// GetServiceURLs implements the ServiceDiscovery interface. +func (*mockPDServiceDiscovery) GetServiceURLs() []string { return nil } + +// GetServingEndpointClientConn implements the ServiceDiscovery interface. func (*mockPDServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { return nil } -func (*mockPDServiceDiscovery) GetClientConns() *sync.Map { return nil } -func (*mockPDServiceDiscovery) GetServingURL() string { return "" } -func (*mockPDServiceDiscovery) GetBackupURLs() []string { return nil } -func (*mockPDServiceDiscovery) GetServiceClient() ServiceClient { return nil } + +// GetClientConns implements the ServiceDiscovery interface. +func (*mockPDServiceDiscovery) GetClientConns() *sync.Map { return nil } + +// GetServingURL implements the ServiceDiscovery interface. +func (*mockPDServiceDiscovery) GetServingURL() string { return "" } + +// GetBackupURLs implements the ServiceDiscovery interface. +func (*mockPDServiceDiscovery) GetBackupURLs() []string { return nil } + +// GetServiceClient implements the ServiceDiscovery interface. +func (*mockPDServiceDiscovery) GetServiceClient() ServiceClient { return nil } + +// GetOrCreateGRPCConn implements the ServiceDiscovery interface. func (*mockPDServiceDiscovery) GetOrCreateGRPCConn(string) (*grpc.ClientConn, error) { return nil, nil } -func (*mockPDServiceDiscovery) ScheduleCheckMemberChanged() {} -func (*mockPDServiceDiscovery) CheckMemberChanged() error { return nil } -func (*mockPDServiceDiscovery) AddServingURLSwitchedCallback(...func()) {} + +// ScheduleCheckMemberChanged implements the ServiceDiscovery interface. +func (*mockPDServiceDiscovery) ScheduleCheckMemberChanged() {} + +// CheckMemberChanged implements the ServiceDiscovery interface. +func (*mockPDServiceDiscovery) CheckMemberChanged() error { return nil } + +// AddServingURLSwitchedCallback implements the ServiceDiscovery interface. +func (*mockPDServiceDiscovery) AddServingURLSwitchedCallback(...func()) {} + +// AddServiceURLsSwitchedCallback implements the ServiceDiscovery interface. func (*mockPDServiceDiscovery) AddServiceURLsSwitchedCallback(...func()) {} diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index 9378ed278e0..e8f4c0d7707 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -482,6 +482,7 @@ func newPDServiceDiscovery( return pdsd } +// Init initializes the PD service discovery. func (c *pdServiceDiscovery) Init() error { if c.isInitialized { return nil diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 19adbd199b0..45182c07d3a 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -108,6 +108,7 @@ func (c *client) ListResourceGroups(ctx context.Context, ops ...GetResourceGroup return resp.GetGroups(), nil } +// GetResourceGroup implements the ResourceManagerClient interface. func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string, ops ...GetResourceGroupOption) (*rmpb.ResourceGroup, error) { cc, err := c.resourceManagerClient() if err != nil { @@ -133,10 +134,12 @@ func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string, return resp.GetGroup(), nil } +// AddResourceGroup implements the ResourceManagerClient interface. func (c *client) AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) { return c.putResourceGroup(ctx, metaGroup, add) } +// ModifyResourceGroup implements the ResourceManagerClient interface. func (c *client) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) { return c.putResourceGroup(ctx, metaGroup, modify) } @@ -167,6 +170,7 @@ func (c *client) putResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceG return resp.GetBody(), nil } +// DeleteResourceGroup implements the ResourceManagerClient interface. func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) { cc, err := c.resourceManagerClient() if err != nil { @@ -187,6 +191,7 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri return resp.GetBody(), nil } +// LoadResourceGroups implements the ResourceManagerClient interface. func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) { resp, err := c.Get(ctx, GroupSettingsPathPrefixBytes, WithPrefix()) if err != nil { @@ -206,6 +211,7 @@ func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, return groups, resp.Header.Revision, nil } +// AcquireTokenBuckets implements the ResourceManagerClient interface. func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) { req := &tokenRequest{ done: make(chan error, 1), @@ -214,7 +220,7 @@ func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBuc Request: request, } c.tokenDispatcher.tokenBatchController.tokenRequestCh <- req - grantedTokens, err := req.Wait() + grantedTokens, err := req.wait() if err != nil { return nil, err } @@ -229,7 +235,7 @@ type tokenRequest struct { TokenBuckets []*rmpb.TokenBucketResponse } -func (req *tokenRequest) Wait() (tokenBuckets []*rmpb.TokenBucketResponse, err error) { +func (req *tokenRequest) wait() (tokenBuckets []*rmpb.TokenBucketResponse, err error) { select { case err = <-req.done: err = errors.WithStack(err) diff --git a/client/tso_stream.go b/client/tso_stream.go index da9cab95ba0..76b6ae3c51c 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -116,6 +116,7 @@ type pdTSOStreamAdapter struct { stream pdpb.PD_TsoClient } +// Send implements the grpcTSOStreamAdapter interface. func (s pdTSOStreamAdapter) Send(clusterID uint64, _, _ uint32, dcLocation string, count int64) error { req := &pdpb.TsoRequest{ Header: &pdpb.RequestHeader{ @@ -127,6 +128,7 @@ func (s pdTSOStreamAdapter) Send(clusterID uint64, _, _ uint32, dcLocation strin return s.stream.Send(req) } +// Recv implements the grpcTSOStreamAdapter interface. func (s pdTSOStreamAdapter) Recv() (tsoRequestResult, error) { resp, err := s.stream.Recv() if err != nil { @@ -145,6 +147,7 @@ type tsoTSOStreamAdapter struct { stream tsopb.TSO_TsoClient } +// Send implements the grpcTSOStreamAdapter interface. func (s tsoTSOStreamAdapter) Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64) error { req := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ @@ -158,6 +161,7 @@ func (s tsoTSOStreamAdapter) Send(clusterID uint64, keyspaceID, keyspaceGroupID return s.stream.Send(req) } +// Recv implements the grpcTSOStreamAdapter interface. func (s tsoTSOStreamAdapter) Recv() (tsoRequestResult, error) { resp, err := s.stream.Recv() if err != nil { diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 39aa11927ca..ce1522d465f 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -79,6 +79,7 @@ type server struct { *scheserver.Server } +// GetCluster returns the cluster. func (s *server) GetCluster() sche.SchedulerCluster { return s.Server.GetCluster() } diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index 200ab388e30..17bf36cc92f 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -44,6 +44,7 @@ const ( // In order to avoid the patrolRegionScanLimit to be too big or too small, it will be limited to [128,8192]. // It takes about 10s to iterate 1,024,000 regions(with DefaultPatrolRegionInterval=10ms) where other steps are not considered. MinPatrolRegionScanLimit = 128 + // MaxPatrolScanRegionLimit is the max limit of regions to scan for a batch. MaxPatrolScanRegionLimit = 8192 patrolRegionPartition = 1024 ) diff --git a/pkg/schedule/prepare_checker.go b/pkg/schedule/prepare_checker.go index df7074b9073..187d68e3d9f 100644 --- a/pkg/schedule/prepare_checker.go +++ b/pkg/schedule/prepare_checker.go @@ -73,13 +73,14 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti return true } +// IsPrepared returns whether the coordinator is prepared. func (checker *prepareChecker) IsPrepared() bool { checker.RLock() defer checker.RUnlock() return checker.prepared } -// for test purpose +// SetPrepared is for test purpose func (checker *prepareChecker) SetPrepared() { checker.Lock() defer checker.Unlock() diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 05b429c825f..b434c7ad706 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -100,7 +100,7 @@ func (conf *balanceLeaderSchedulerConfig) validateLocked() bool { return conf.Batch >= 1 && conf.Batch <= 10 } -func (conf *balanceLeaderSchedulerConfig) Clone() *balanceLeaderSchedulerConfig { +func (conf *balanceLeaderSchedulerConfig) clone() *balanceLeaderSchedulerConfig { conf.RLock() defer conf.RUnlock() ranges := make([]core.KeyRange, len(conf.Ranges)) @@ -157,7 +157,7 @@ func (handler *balanceLeaderHandler) updateConfig(w http.ResponseWriter, r *http } func (handler *balanceLeaderHandler) listConfig(w http.ResponseWriter, _ *http.Request) { - conf := handler.config.Clone() + conf := handler.config.clone() handler.rd.JSON(w, http.StatusOK, conf) } @@ -190,6 +190,7 @@ func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceL return s } +// ServeHTTP implements the http.Handler interface. func (l *balanceLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { l.handler.ServeHTTP(w, r) } @@ -204,12 +205,14 @@ func WithBalanceLeaderName(name string) BalanceLeaderCreateOption { } } +// EncodeConfig implements the Scheduler interface. func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { l.conf.RLock() defer l.conf.RUnlock() return EncodeConfig(l.conf) } +// ReloadConfig implements the Scheduler interface. func (l *balanceLeaderScheduler) ReloadConfig() error { l.conf.Lock() defer l.conf.Unlock() @@ -229,6 +232,7 @@ func (l *balanceLeaderScheduler) ReloadConfig() error { return nil } +// IsScheduleAllowed implements the Scheduler interface. func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := l.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() if !allowed { @@ -329,6 +333,7 @@ func (cs *candidateStores) resortStoreWithPos(pos int) { } } +// Schedule implements the Scheduler interface. func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { basePlan := plan.NewBalanceSchedulerPlan() var collector *plan.Collector diff --git a/pkg/schedule/schedulers/balance_leader_test.go b/pkg/schedule/schedulers/balance_leader_test.go index 44ad1f2c0a5..eb1d8a539ce 100644 --- a/pkg/schedule/schedulers/balance_leader_test.go +++ b/pkg/schedule/schedulers/balance_leader_test.go @@ -30,7 +30,7 @@ func TestBalanceLeaderSchedulerConfigClone(t *testing.T) { Ranges: keyRanges1, Batch: 10, } - conf2 := conf.Clone() + conf2 := conf.clone() re.Equal(conf.Batch, conf2.Batch) re.Equal(conf.Ranges, conf2.Ranges) diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index 0415ad03618..599af6df637 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -59,7 +59,7 @@ type balanceWitnessSchedulerConfig struct { Batch int `json:"batch"` } -func (conf *balanceWitnessSchedulerConfig) Update(data []byte) (int, any) { +func (conf *balanceWitnessSchedulerConfig) update(data []byte) (int, any) { conf.Lock() defer conf.Unlock() @@ -97,7 +97,7 @@ func (conf *balanceWitnessSchedulerConfig) validateLocked() bool { return conf.Batch >= 1 && conf.Batch <= 10 } -func (conf *balanceWitnessSchedulerConfig) Clone() *balanceWitnessSchedulerConfig { +func (conf *balanceWitnessSchedulerConfig) clone() *balanceWitnessSchedulerConfig { conf.RLock() defer conf.RUnlock() ranges := make([]core.KeyRange, len(conf.Ranges)) @@ -149,12 +149,12 @@ func newBalanceWitnessHandler(conf *balanceWitnessSchedulerConfig) http.Handler func (handler *balanceWitnessHandler) updateConfig(w http.ResponseWriter, r *http.Request) { data, _ := io.ReadAll(r.Body) r.Body.Close() - httpCode, v := handler.config.Update(data) + httpCode, v := handler.config.update(data) handler.rd.JSON(w, httpCode, v) } func (handler *balanceWitnessHandler) listConfig(w http.ResponseWriter, _ *http.Request) { - conf := handler.config.Clone() + conf := handler.config.clone() handler.rd.JSON(w, http.StatusOK, conf) } diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index d43f540a489..d4e26cb1b68 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -72,7 +72,7 @@ func (conf *evictLeaderSchedulerConfig) getBatch() int { return conf.Batch } -func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig { +func (conf *evictLeaderSchedulerConfig) clone() *evictLeaderSchedulerConfig { conf.RLock() defer conf.RUnlock() storeIDWithRanges := make(map[uint64][]core.KeyRange) @@ -460,7 +460,7 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R } func (handler *evictLeaderHandler) listConfig(w http.ResponseWriter, _ *http.Request) { - conf := handler.config.Clone() + conf := handler.config.clone() handler.rd.JSON(w, http.StatusOK, conf) } diff --git a/pkg/schedule/schedulers/evict_leader_test.go b/pkg/schedule/schedulers/evict_leader_test.go index eb97be516d7..692dda63437 100644 --- a/pkg/schedule/schedulers/evict_leader_test.go +++ b/pkg/schedule/schedulers/evict_leader_test.go @@ -89,19 +89,19 @@ func TestConfigClone(t *testing.T) { re := require.New(t) emptyConf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange)} - con2 := emptyConf.Clone() + con2 := emptyConf.clone() re.Empty(con2.getKeyRangesByID(1)) con2.StoreIDWithRanges[1], _ = getKeyRanges([]string{"a", "b", "c", "d"}) - con3 := con2.Clone() + con3 := con2.clone() re.Equal(len(con3.getRanges(1)), len(con2.getRanges(1))) con3.StoreIDWithRanges[1][0].StartKey = []byte("aaa") - con4 := con3.Clone() + con4 := con3.clone() re.True(bytes.Equal(con4.StoreIDWithRanges[1][0].StartKey, con3.StoreIDWithRanges[1][0].StartKey)) con4.Batch = 10 - con5 := con4.Clone() + con5 := con4.clone() re.Equal(con5.getBatch(), con4.getBatch()) } diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index 767a2ee40a0..206791900c6 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -76,7 +76,7 @@ func initEvictSlowTrendSchedulerConfig(storage endpoint.ConfigStorage) *evictSlo } } -func (conf *evictSlowTrendSchedulerConfig) Clone() *evictSlowTrendSchedulerConfig { +func (conf *evictSlowTrendSchedulerConfig) clone() *evictSlowTrendSchedulerConfig { conf.RLock() defer conf.RUnlock() return &evictSlowTrendSchedulerConfig{ @@ -266,7 +266,7 @@ func (handler *evictSlowTrendHandler) updateConfig(w http.ResponseWriter, r *htt } func (handler *evictSlowTrendHandler) listConfig(w http.ResponseWriter, _ *http.Request) { - conf := handler.config.Clone() + conf := handler.config.clone() handler.rd.JSON(w, http.StatusOK, conf) } @@ -276,6 +276,7 @@ type evictSlowTrendScheduler struct { handler http.Handler } +// GetNextInterval implements the Scheduler interface. func (s *evictSlowTrendScheduler) GetNextInterval(time.Duration) time.Duration { var growthType intervalGrowthType // If it already found a slow node as candidate, the next interval should be shorter diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 38eadb10c41..15a520f95d0 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -90,7 +90,7 @@ func (conf *grantHotRegionSchedulerConfig) clone() *grantHotRegionSchedulerConfi } } -func (conf *grantHotRegionSchedulerConfig) Persist() error { +func (conf *grantHotRegionSchedulerConfig) persist() error { conf.RLock() defer conf.RUnlock() data, err := EncodeConfig(conf) @@ -215,7 +215,7 @@ func (handler *grantHotRegionHandler) updateConfig(w http.ResponseWriter, r *htt return } - if err = handler.config.Persist(); err != nil { + if err = handler.config.persist(); err != nil { handler.config.setStoreLeaderID(0) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -239,6 +239,7 @@ func newGrantHotRegionHandler(config *grantHotRegionSchedulerConfig) http.Handle return router } +// Schedule implements the Scheduler interface. func (s *grantHotRegionScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { grantHotRegionCounter.Inc() typ := s.randomType() diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 747a7ee6c0c..134eddda880 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -49,7 +49,7 @@ type grantLeaderSchedulerConfig struct { removeSchedulerCb func(name string) error } -func (conf *grantLeaderSchedulerConfig) BuildWithArgs(args []string) error { +func (conf *grantLeaderSchedulerConfig) buildWithArgs(args []string) error { if len(args) != 1 { return errs.ErrSchedulerConfig.FastGenByArgs("id") } @@ -285,7 +285,7 @@ func (handler *grantLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R args = append(args, handler.config.getRanges(id)...) } - err := handler.config.BuildWithArgs(args) + err := handler.config.buildWithArgs(args) if err != nil { handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go index 71bc9107fdb..686322961cb 100644 --- a/pkg/schedule/schedulers/shuffle_hot_region.go +++ b/pkg/schedule/schedulers/shuffle_hot_region.go @@ -46,7 +46,7 @@ type shuffleHotRegionSchedulerConfig struct { Limit uint64 `json:"limit"` } -func (conf *shuffleHotRegionSchedulerConfig) Clone() *shuffleHotRegionSchedulerConfig { +func (conf *shuffleHotRegionSchedulerConfig) clone() *shuffleHotRegionSchedulerConfig { conf.RLock() defer conf.RUnlock() return &shuffleHotRegionSchedulerConfig{ @@ -238,7 +238,7 @@ func (handler *shuffleHotRegionHandler) updateConfig(w http.ResponseWriter, r *h } func (handler *shuffleHotRegionHandler) listConfig(w http.ResponseWriter, _ *http.Request) { - conf := handler.config.Clone() + conf := handler.config.clone() handler.rd.JSON(w, http.StatusOK, conf) } diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index 92edea1c82a..7b238890107 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -163,6 +163,7 @@ func newSplitBucketScheduler(opController *operator.Controller, conf *splitBucke return ret } +// ReloadConfig implement Scheduler interface. func (s *splitBucketScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() diff --git a/pkg/unsaferecovery/unsafe_recovery_controller.go b/pkg/unsaferecovery/unsafe_recovery_controller.go index 89cd6e6393c..aa3cf192270 100644 --- a/pkg/unsaferecovery/unsafe_recovery_controller.go +++ b/pkg/unsaferecovery/unsafe_recovery_controller.go @@ -751,31 +751,31 @@ type regionItem struct { // Less returns true if the region start key is less than the other. func (r *regionItem) Less(other *regionItem) bool { - left := r.Region().GetStartKey() - right := other.Region().GetStartKey() + left := r.region().GetStartKey() + right := other.region().GetStartKey() return bytes.Compare(left, right) < 0 } -func (r *regionItem) Contains(key []byte) bool { - start, end := r.Region().GetStartKey(), r.Region().GetEndKey() +func (r *regionItem) contains(key []byte) bool { + start, end := r.region().GetStartKey(), r.region().GetEndKey() return bytes.Compare(key, start) >= 0 && (len(end) == 0 || bytes.Compare(key, end) < 0) } -func (r *regionItem) Region() *metapb.Region { +func (r *regionItem) region() *metapb.Region { return r.report.GetRegionState().GetRegion() } -func (r *regionItem) IsInitialized() bool { - return len(r.Region().Peers) != 0 +func (r *regionItem) isInitialized() bool { + return len(r.region().Peers) != 0 } -func (r *regionItem) IsEpochStale(other *regionItem) bool { - re := r.Region().GetRegionEpoch() - oe := other.Region().GetRegionEpoch() +func (r *regionItem) isEpochStale(other *regionItem) bool { + re := r.region().GetRegionEpoch() + oe := other.region().GetRegionEpoch() return re.GetVersion() < oe.GetVersion() || (re.GetVersion() == oe.GetVersion() && re.GetConfVer() < oe.GetConfVer()) } -func (r *regionItem) IsRaftStale(origin *regionItem, u *Controller) bool { +func (r *regionItem) isRaftStale(origin *regionItem, u *Controller) bool { cmps := []func(a, b *regionItem) int{ func(a, b *regionItem) int { return int(a.report.GetRaftState().GetHardState().GetTerm()) - int(b.report.GetRaftState().GetHardState().GetTerm()) @@ -800,7 +800,7 @@ func (r *regionItem) IsRaftStale(origin *regionItem, u *Controller) bool { return 1 } // better use voter rather than learner - for _, peer := range a.Region().GetPeers() { + for _, peer := range a.region().GetPeers() { if peer.StoreId == a.storeID { if peer.Role == metapb.PeerRole_DemotingVoter || peer.Role == metapb.PeerRole_Learner { return -1 @@ -857,11 +857,11 @@ func (t *regionTree) getOverlaps(item *regionItem) []*regionItem { result = item } - end := item.Region().GetEndKey() + end := item.region().GetEndKey() var overlaps []*regionItem t.tree.AscendGreaterOrEqual(result, func(i *regionItem) bool { over := i - if len(end) > 0 && bytes.Compare(end, over.Region().GetStartKey()) <= 0 { + if len(end) > 0 && bytes.Compare(end, over.region().GetStartKey()) <= 0 { return false } overlaps = append(overlaps, over) @@ -878,7 +878,7 @@ func (t *regionTree) find(item *regionItem) *regionItem { return false }) - if result == nil || !result.Contains(item.Region().GetStartKey()) { + if result == nil || !result.contains(item.region().GetStartKey()) { return nil } @@ -891,15 +891,15 @@ func (t *regionTree) find(item *regionItem) *regionItem { func (t *regionTree) insert(item *regionItem) (bool, error) { overlaps := t.getOverlaps(item) - if t.contains(item.Region().GetId()) { + if t.contains(item.region().GetId()) { // it's ensured by the `buildUpFromReports` that only insert the latest peer of one region. - return false, errors.Errorf("region %v shouldn't be updated twice", item.Region().GetId()) + return false, errors.Errorf("region %v shouldn't be updated twice", item.region().GetId()) } for _, newer := range overlaps { - log.Info("unsafe recovery found overlap regions", logutil.ZapRedactStringer("newer-region-meta", core.RegionToHexMeta(newer.Region())), logutil.ZapRedactStringer("older-region-meta", core.RegionToHexMeta(item.Region()))) + log.Info("unsafe recovery found overlap regions", logutil.ZapRedactStringer("newer-region-meta", core.RegionToHexMeta(newer.region())), logutil.ZapRedactStringer("older-region-meta", core.RegionToHexMeta(item.region()))) // it's ensured by the `buildUpFromReports` that peers are inserted in epoch descending order. - if newer.IsEpochStale(item) { + if newer.isEpochStale(item) { return false, errors.Errorf("region %v's epoch shouldn't be staler than old ones %v", item, newer) } } @@ -907,7 +907,7 @@ func (t *regionTree) insert(item *regionItem) (bool, error) { return false, nil } - t.regions[item.Region().GetId()] = item + t.regions[item.region().GetId()] = item t.tree.ReplaceOrInsert(item) return true, nil } @@ -925,7 +925,7 @@ func (u *Controller) buildUpFromReports() (*regionTree, map[uint64][]*regionItem for storeID, storeReport := range u.storeReports { for _, peerReport := range storeReport.PeerReports { item := ®ionItem{report: peerReport, storeID: storeID} - peersMap[item.Region().GetId()] = append(peersMap[item.Region().GetId()], item) + peersMap[item.region().GetId()] = append(peersMap[item.region().GetId()], item) } } @@ -934,11 +934,11 @@ func (u *Controller) buildUpFromReports() (*regionTree, map[uint64][]*regionItem for _, peers := range peersMap { var latest *regionItem for _, peer := range peers { - if latest == nil || latest.IsEpochStale(peer) { + if latest == nil || latest.isEpochStale(peer) { latest = peer } } - if !latest.IsInitialized() { + if !latest.isInitialized() { // ignore the uninitialized peer continue } @@ -947,7 +947,7 @@ func (u *Controller) buildUpFromReports() (*regionTree, map[uint64][]*regionItem // sort in descending order of epoch sort.SliceStable(newestPeerReports, func(i, j int) bool { - return newestPeerReports[j].IsEpochStale(newestPeerReports[i]) + return newestPeerReports[j].isEpochStale(newestPeerReports[i]) }) newestRegionTree := newRegionTree() @@ -963,7 +963,7 @@ func (u *Controller) buildUpFromReports() (*regionTree, map[uint64][]*regionItem func (u *Controller) selectLeader(peersMap map[uint64][]*regionItem, region *metapb.Region) *regionItem { var leader *regionItem for _, peer := range peersMap[region.GetId()] { - if leader == nil || leader.IsRaftStale(peer, u) { + if leader == nil || leader.isRaftStale(peer, u) { leader = peer } } @@ -978,7 +978,7 @@ func (u *Controller) generateTombstoneTiFlashLearnerPlan(newestRegionTree *regio var err error newestRegionTree.tree.Ascend(func(item *regionItem) bool { - region := item.Region() + region := item.region() if !u.canElectLeader(region, false) { leader := u.selectLeader(peersMap, region) if leader == nil { @@ -1019,7 +1019,7 @@ func (u *Controller) generateForceLeaderPlan(newestRegionTree *regionTree, peers // considering the Failed stores newestRegionTree.tree.Ascend(func(item *regionItem) bool { report := item.report - region := item.Region() + region := item.region() if !u.canElectLeader(region, false) { if hasForceLeader(region) { // already is a force leader, skip @@ -1050,7 +1050,7 @@ func (u *Controller) generateForceLeaderPlan(newestRegionTree *regionTree, peers } if u.autoDetect { // For auto detect, the failedStores is empty. So need to add the detected Failed store to the list - for _, peer := range u.getFailedPeers(leader.Region()) { + for _, peer := range u.getFailedPeers(leader.region()) { found := false for _, store := range storeRecoveryPlan.ForceLeader.FailedStores { if store == peer.StoreId { @@ -1064,7 +1064,7 @@ func (u *Controller) generateForceLeaderPlan(newestRegionTree *regionTree, peers } } storeRecoveryPlan.ForceLeader.EnterForceLeaders = append(storeRecoveryPlan.ForceLeader.EnterForceLeaders, region.GetId()) - u.recordAffectedRegion(leader.Region()) + u.recordAffectedRegion(leader.region()) hasPlan = true } return true @@ -1104,7 +1104,7 @@ func (u *Controller) generateDemoteFailedVoterPlan(newestRegionTree *regionTree, // Check the regions in newest Region Tree to see if it can still elect leader // considering the Failed stores newestRegionTree.tree.Ascend(func(item *regionItem) bool { - region := item.Region() + region := item.region() if !u.canElectLeader(region, false) { leader := findForceLeader(peersMap, region) if leader == nil { @@ -1115,10 +1115,10 @@ func (u *Controller) generateDemoteFailedVoterPlan(newestRegionTree *regionTree, storeRecoveryPlan.Demotes = append(storeRecoveryPlan.Demotes, &pdpb.DemoteFailedVoters{ RegionId: region.GetId(), - FailedVoters: u.getFailedPeers(leader.Region()), + FailedVoters: u.getFailedPeers(leader.region()), }, ) - u.recordAffectedRegion(leader.Region()) + u.recordAffectedRegion(leader.region()) hasPlan = true } return true @@ -1181,7 +1181,7 @@ func (u *Controller) generateCreateEmptyRegionPlan(newestRegionTree *regionTree, lastEnd := []byte("") var lastStoreID uint64 newestRegionTree.tree.Ascend(func(item *regionItem) bool { - region := item.Region() + region := item.region() storeID := item.storeID if !bytes.Equal(region.StartKey, lastEnd) { if u.cluster.GetStore(storeID).IsTiFlash() { @@ -1200,16 +1200,16 @@ func (u *Controller) generateCreateEmptyRegionPlan(newestRegionTree *regionTree, // paranoid check: shouldn't overlap with any of the peers for _, peers := range peersMap { for _, peer := range peers { - if !peer.IsInitialized() { + if !peer.isInitialized() { continue } - if (bytes.Compare(newRegion.StartKey, peer.Region().StartKey) <= 0 && - (len(newRegion.EndKey) == 0 || bytes.Compare(peer.Region().StartKey, newRegion.EndKey) < 0)) || - ((len(peer.Region().EndKey) == 0 || bytes.Compare(newRegion.StartKey, peer.Region().EndKey) < 0) && - (len(newRegion.EndKey) == 0 || (len(peer.Region().EndKey) != 0 && bytes.Compare(peer.Region().EndKey, newRegion.EndKey) <= 0))) { + if (bytes.Compare(newRegion.StartKey, peer.region().StartKey) <= 0 && + (len(newRegion.EndKey) == 0 || bytes.Compare(peer.region().StartKey, newRegion.EndKey) < 0)) || + ((len(peer.region().EndKey) == 0 || bytes.Compare(newRegion.StartKey, peer.region().EndKey) < 0) && + (len(newRegion.EndKey) == 0 || (len(peer.region().EndKey) != 0 && bytes.Compare(peer.region().EndKey, newRegion.EndKey) <= 0))) { err = errors.Errorf( "Find overlap peer %v with newly created empty region %v", - logutil.RedactStringer(core.RegionToHexMeta(peer.Region())), + logutil.RedactStringer(core.RegionToHexMeta(peer.region())), logutil.RedactStringer(core.RegionToHexMeta(newRegion)), ) return false diff --git a/pkg/unsaferecovery/unsafe_recovery_controller_test.go b/pkg/unsaferecovery/unsafe_recovery_controller_test.go index cce38285212..4eeed08077c 100644 --- a/pkg/unsaferecovery/unsafe_recovery_controller_test.go +++ b/pkg/unsaferecovery/unsafe_recovery_controller_test.go @@ -1934,7 +1934,7 @@ func TestSelectLeader(t *testing.T) { Id: 1, } leader := recoveryController.selectLeader(peersMap, region) - re.Equal(leader.Region().Id, c.leaderID, "case: %d", i) + re.Equal(leader.region().Id, c.leaderID, "case: %d", i) } } diff --git a/server/api/admin.go b/server/api/admin.go index 2184dc66aa6..dfaecbec755 100644 --- a/server/api/admin.go +++ b/server/api/admin.go @@ -62,7 +62,7 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request) } rc.RemoveRegionIfExist(regionID) if h.svr.IsServiceIndependent(utils.SchedulingServiceName) { - err = h.DeleteRegionCacheInSchedulingServer(regionID) + err = h.deleteRegionCacheInSchedulingServer(regionID) } msg := "The region is removed from server cache." h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) @@ -102,7 +102,7 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques // Remove region from cache. rc.RemoveRegionIfExist(regionID) if h.svr.IsServiceIndependent(utils.SchedulingServiceName) { - err = h.DeleteRegionCacheInSchedulingServer(regionID) + err = h.deleteRegionCacheInSchedulingServer(regionID) } msg := "The region is removed from server cache and region meta storage." h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) @@ -118,7 +118,7 @@ func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Reque rc := getCluster(r) rc.ResetRegionCache() if h.svr.IsServiceIndependent(utils.SchedulingServiceName) { - err = h.DeleteRegionCacheInSchedulingServer() + err = h.deleteRegionCacheInSchedulingServer() } msg := "All regions are removed from server cache." h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) @@ -148,7 +148,7 @@ func (h *adminHandler) SavePersistFile(w http.ResponseWriter, r *http.Request) { h.rd.Text(w, http.StatusOK, "") } -func (h *adminHandler) MarkSnapshotRecovering(w http.ResponseWriter, _ *http.Request) { +func (h *adminHandler) markSnapshotRecovering(w http.ResponseWriter, _ *http.Request) { if err := h.svr.MarkSnapshotRecovering(); err != nil { h.rd.Text(w, http.StatusInternalServerError, err.Error()) return @@ -156,7 +156,7 @@ func (h *adminHandler) MarkSnapshotRecovering(w http.ResponseWriter, _ *http.Req h.rd.Text(w, http.StatusOK, "") } -func (h *adminHandler) IsSnapshotRecovering(w http.ResponseWriter, r *http.Request) { +func (h *adminHandler) isSnapshotRecovering(w http.ResponseWriter, r *http.Request) { marked, err := h.svr.IsSnapshotRecovering(r.Context()) if err != nil { h.rd.Text(w, http.StatusInternalServerError, err.Error()) @@ -168,7 +168,7 @@ func (h *adminHandler) IsSnapshotRecovering(w http.ResponseWriter, r *http.Reque h.rd.JSON(w, http.StatusOK, &resStruct{Marked: marked}) } -func (h *adminHandler) UnmarkSnapshotRecovering(w http.ResponseWriter, r *http.Request) { +func (h *adminHandler) unmarkSnapshotRecovering(w http.ResponseWriter, r *http.Request) { if err := h.svr.UnmarkSnapshotRecovering(r.Context()); err != nil { h.rd.Text(w, http.StatusInternalServerError, err.Error()) return @@ -178,7 +178,7 @@ func (h *adminHandler) UnmarkSnapshotRecovering(w http.ResponseWriter, r *http.R // RecoverAllocID recover base alloc id // body should be in {"id": "123"} format -func (h *adminHandler) RecoverAllocID(w http.ResponseWriter, r *http.Request) { +func (h *adminHandler) recoverAllocID(w http.ResponseWriter, r *http.Request) { var input map[string]any if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { return @@ -215,7 +215,7 @@ func (h *adminHandler) RecoverAllocID(w http.ResponseWriter, r *http.Request) { h.rd.Text(w, http.StatusOK, "") } -func (h *adminHandler) DeleteRegionCacheInSchedulingServer(id ...uint64) error { +func (h *adminHandler) deleteRegionCacheInSchedulingServer(id ...uint64) error { addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), utils.SchedulingServiceName) if !ok { return errs.ErrNotFoundSchedulingAddr.FastGenByArgs() diff --git a/server/api/config.go b/server/api/config.go index d280439a988..9f568221d89 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -64,7 +64,7 @@ func (h *confHandler) GetConfig(w http.ResponseWriter, r *http.Request) { cfg := h.svr.GetConfig() if h.svr.IsServiceIndependent(utils.SchedulingServiceName) && r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" { - schedulingServerConfig, err := h.GetSchedulingServerConfig() + schedulingServerConfig, err := h.getSchedulingServerConfig() if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -338,7 +338,7 @@ func getConfigMap(cfg map[string]any, key []string, value any) map[string]any { func (h *confHandler) GetScheduleConfig(w http.ResponseWriter, r *http.Request) { if h.svr.IsServiceIndependent(utils.SchedulingServiceName) && r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" { - cfg, err := h.GetSchedulingServerConfig() + cfg, err := h.getSchedulingServerConfig() if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -412,7 +412,7 @@ func (h *confHandler) SetScheduleConfig(w http.ResponseWriter, r *http.Request) func (h *confHandler) GetReplicationConfig(w http.ResponseWriter, r *http.Request) { if h.svr.IsServiceIndependent(utils.SchedulingServiceName) && r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" { - cfg, err := h.GetSchedulingServerConfig() + cfg, err := h.getSchedulingServerConfig() if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -562,7 +562,7 @@ func (h *confHandler) GetPDServerConfig(w http.ResponseWriter, _ *http.Request) h.rd.JSON(w, http.StatusOK, h.svr.GetPDServerConfig()) } -func (h *confHandler) GetSchedulingServerConfig() (*config.Config, error) { +func (h *confHandler) getSchedulingServerConfig() (*config.Config, error) { addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), utils.SchedulingServiceName) if !ok { return nil, errs.ErrNotFoundSchedulingAddr.FastGenByArgs() diff --git a/server/api/diagnostic.go b/server/api/diagnostic.go index 1a05b0d83b8..23016519dee 100644 --- a/server/api/diagnostic.go +++ b/server/api/diagnostic.go @@ -36,7 +36,7 @@ func newDiagnosticHandler(svr *server.Server, rd *render.Render) *diagnosticHand } } -func (h *diagnosticHandler) GetDiagnosticResult(w http.ResponseWriter, r *http.Request) { +func (h *diagnosticHandler) getDiagnosticResult(w http.ResponseWriter, r *http.Request) { name := mux.Vars(r)["name"] result, err := h.handler.GetDiagnosticResult(name) if err != nil { diff --git a/server/api/metric.go b/server/api/metric.go index 6a9dc4a7cde..a457895bddf 100644 --- a/server/api/metric.go +++ b/server/api/metric.go @@ -28,11 +28,11 @@ type queryMetric struct { s *server.Server } -func newQueryMetric(s *server.Server) *queryMetric { +func newqueryMetric(s *server.Server) *queryMetric { return &queryMetric{s: s} } -func (h *queryMetric) QueryMetric(w http.ResponseWriter, r *http.Request) { +func (h *queryMetric) queryMetric(w http.ResponseWriter, r *http.Request) { metricAddr := h.s.GetConfig().PDServerCfg.MetricStorage if metricAddr == "" { http.Error(w, "metric storage doesn't set", http.StatusInternalServerError) diff --git a/server/api/middleware.go b/server/api/middleware.go index 6536935592f..010889f08ce 100644 --- a/server/api/middleware.go +++ b/server/api/middleware.go @@ -88,7 +88,7 @@ func newClusterMiddleware(s *server.Server) clusterMiddleware { } } -func (m clusterMiddleware) Middleware(h http.Handler) http.Handler { +func (m clusterMiddleware) middleware(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { rc := m.s.GetRaftCluster() if rc == nil { diff --git a/server/api/plugin_disable.go b/server/api/plugin_disable.go index 289a140a4d6..596cddac5d7 100644 --- a/server/api/plugin_disable.go +++ b/server/api/plugin_disable.go @@ -30,12 +30,12 @@ func newPluginHandler(*server.Handler, *render.Render) *pluginHandler { return &pluginHandler{} } -func (*pluginHandler) LoadPlugin(w http.ResponseWriter, _ *http.Request) { +func (*pluginHandler) loadPlugin(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNotImplemented) w.Write([]byte("load plugin is disabled, please `PLUGIN=1 $(MAKE) pd-server` first")) } -func (*pluginHandler) UnloadPlugin(w http.ResponseWriter, _ *http.Request) { +func (*pluginHandler) unloadPlugin(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNotImplemented) w.Write([]byte("unload plugin is disabled, please `PLUGIN=1 $(MAKE) pd-server` first")) } diff --git a/server/api/region.go b/server/api/region.go index c6bc3d9e699..ae25d659544 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -553,7 +553,7 @@ const ( // @Failure 400 {string} string "The input is invalid." // @Router /regions/writeflow [get] func (h *regionsHandler) GetTopWriteFlowRegions(w http.ResponseWriter, r *http.Request) { - h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }) + h.getTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }) } // @Tags region @@ -564,7 +564,7 @@ func (h *regionsHandler) GetTopWriteFlowRegions(w http.ResponseWriter, r *http.R // @Failure 400 {string} string "The input is invalid." // @Router /regions/writequery [get] func (h *regionsHandler) GetTopWriteQueryRegions(w http.ResponseWriter, r *http.Request) { - h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { + h.getTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetWriteQueryNum() < b.GetWriteQueryNum() }) } @@ -577,7 +577,7 @@ func (h *regionsHandler) GetTopWriteQueryRegions(w http.ResponseWriter, r *http. // @Failure 400 {string} string "The input is invalid." // @Router /regions/readflow [get] func (h *regionsHandler) GetTopReadFlowRegions(w http.ResponseWriter, r *http.Request) { - h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }) + h.getTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }) } // @Tags region @@ -588,7 +588,7 @@ func (h *regionsHandler) GetTopReadFlowRegions(w http.ResponseWriter, r *http.Re // @Failure 400 {string} string "The input is invalid." // @Router /regions/readquery [get] func (h *regionsHandler) GetTopReadQueryRegions(w http.ResponseWriter, r *http.Request) { - h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { + h.getTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetReadQueryNum() < b.GetReadQueryNum() }) } @@ -601,7 +601,7 @@ func (h *regionsHandler) GetTopReadQueryRegions(w http.ResponseWriter, r *http.R // @Failure 400 {string} string "The input is invalid." // @Router /regions/confver [get] func (h *regionsHandler) GetTopConfVerRegions(w http.ResponseWriter, r *http.Request) { - h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { + h.getTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetMeta().GetRegionEpoch().GetConfVer() < b.GetMeta().GetRegionEpoch().GetConfVer() }) } @@ -614,7 +614,7 @@ func (h *regionsHandler) GetTopConfVerRegions(w http.ResponseWriter, r *http.Req // @Failure 400 {string} string "The input is invalid." // @Router /regions/version [get] func (h *regionsHandler) GetTopVersionRegions(w http.ResponseWriter, r *http.Request) { - h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { + h.getTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetMeta().GetRegionEpoch().GetVersion() < b.GetMeta().GetRegionEpoch().GetVersion() }) } @@ -627,7 +627,7 @@ func (h *regionsHandler) GetTopVersionRegions(w http.ResponseWriter, r *http.Req // @Failure 400 {string} string "The input is invalid." // @Router /regions/size [get] func (h *regionsHandler) GetTopSizeRegions(w http.ResponseWriter, r *http.Request) { - h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { + h.getTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetApproximateSize() < b.GetApproximateSize() }) } @@ -640,7 +640,7 @@ func (h *regionsHandler) GetTopSizeRegions(w http.ResponseWriter, r *http.Reques // @Failure 400 {string} string "The input is invalid." // @Router /regions/keys [get] func (h *regionsHandler) GetTopKeysRegions(w http.ResponseWriter, r *http.Request) { - h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { + h.getTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetApproximateKeys() < b.GetApproximateKeys() }) } @@ -653,7 +653,7 @@ func (h *regionsHandler) GetTopKeysRegions(w http.ResponseWriter, r *http.Reques // @Failure 400 {string} string "The input is invalid." // @Router /regions/cpu [get] func (h *regionsHandler) GetTopCPURegions(w http.ResponseWriter, r *http.Request) { - h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { + h.getTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetCPUUsage() < b.GetCPUUsage() }) } @@ -740,7 +740,7 @@ func (h *regionsHandler) AccelerateRegionsScheduleInRanges(w http.ResponseWriter h.rd.Text(w, http.StatusOK, msgBuilder.String()) } -func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request, less func(a, b *core.RegionInfo) bool) { +func (h *regionsHandler) getTopNRegions(w http.ResponseWriter, r *http.Request, less func(a, b *core.RegionInfo) bool) { rc := getCluster(r) limit, err := h.AdjustLimit(r.URL.Query().Get("limit")) if err != nil { diff --git a/server/api/router.go b/server/api/router.go index 7aef165b267..0e129706b43 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -126,7 +126,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { apiRouter := rootRouter.PathPrefix(apiPrefix).Subrouter() clusterRouter := apiRouter.NewRoute().Subrouter() - clusterRouter.Use(newClusterMiddleware(svr).Middleware) + clusterRouter.Use(newClusterMiddleware(svr).middleware) escapeRouter := clusterRouter.NewRoute().Subrouter().UseEncodedPath() @@ -149,11 +149,11 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(apiRouter, "/schedulers/{name}", schedulerHandler.PauseOrResumeScheduler, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) diagnosticHandler := newDiagnosticHandler(svr, rd) - registerFunc(clusterRouter, "/schedulers/diagnostic/{name}", diagnosticHandler.GetDiagnosticResult, setMethods(http.MethodGet), setAuditBackend(prometheus)) + registerFunc(clusterRouter, "/schedulers/diagnostic/{name}", diagnosticHandler.getDiagnosticResult, setMethods(http.MethodGet), setAuditBackend(prometheus)) schedulerConfigHandler := newSchedulerConfigHandler(svr, rd) - registerPrefix(apiRouter, "/scheduler-config", "HandleSchedulerConfig", schedulerConfigHandler.HandleSchedulerConfig, setMethods(http.MethodPost, http.MethodDelete, http.MethodPut, http.MethodPatch), setAuditBackend(localLog, prometheus)) - registerPrefix(apiRouter, "/scheduler-config", "GetSchedulerConfig", schedulerConfigHandler.HandleSchedulerConfig, setMethods(http.MethodGet), setAuditBackend(prometheus)) + registerPrefix(apiRouter, "/scheduler-config", "HandleSchedulerConfig", schedulerConfigHandler.handleSchedulerConfig, setMethods(http.MethodPost, http.MethodDelete, http.MethodPut, http.MethodPatch), setAuditBackend(localLog, prometheus)) + registerPrefix(apiRouter, "/scheduler-config", "GetSchedulerConfig", schedulerConfigHandler.handleSchedulerConfig, setMethods(http.MethodGet), setAuditBackend(prometheus)) clusterHandler := newClusterHandler(svr, rd) registerFunc(apiRouter, "/cluster", clusterHandler.GetCluster, setMethods(http.MethodGet), setAuditBackend(prometheus)) @@ -177,7 +177,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { rulesHandler := newRulesHandler(svr, rd) ruleRouter := clusterRouter.NewRoute().Subrouter() - ruleRouter.Use(newRuleMiddleware(svr, rd).Middleware) + ruleRouter.Use(newRuleMiddleware(svr, rd).middleware) registerFunc(ruleRouter, "/config/rules", rulesHandler.GetAllRules, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(ruleRouter, "/config/rules", rulesHandler.SetAllRules, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(ruleRouter, "/config/rules/batch", rulesHandler.BatchRules, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) @@ -310,10 +310,10 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(clusterRouter, "/admin/storage/region/{id}", adminHandler.DeleteRegionStorage, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) registerFunc(clusterRouter, "/admin/cache/regions", adminHandler.DeleteAllRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/admin/persist-file/{file_name}", adminHandler.SavePersistFile, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) - registerFunc(apiRouter, "/admin/cluster/markers/snapshot-recovering", adminHandler.IsSnapshotRecovering, setMethods(http.MethodGet), setAuditBackend(localLog, prometheus)) - registerFunc(apiRouter, "/admin/cluster/markers/snapshot-recovering", adminHandler.MarkSnapshotRecovering, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) - registerFunc(apiRouter, "/admin/cluster/markers/snapshot-recovering", adminHandler.UnmarkSnapshotRecovering, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) - registerFunc(apiRouter, "/admin/base-alloc-id", adminHandler.RecoverAllocID, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) + registerFunc(apiRouter, "/admin/cluster/markers/snapshot-recovering", adminHandler.isSnapshotRecovering, setMethods(http.MethodGet), setAuditBackend(localLog, prometheus)) + registerFunc(apiRouter, "/admin/cluster/markers/snapshot-recovering", adminHandler.markSnapshotRecovering, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) + registerFunc(apiRouter, "/admin/cluster/markers/snapshot-recovering", adminHandler.unmarkSnapshotRecovering, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) + registerFunc(apiRouter, "/admin/base-alloc-id", adminHandler.recoverAllocID, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) serviceMiddlewareHandler := newServiceMiddlewareHandler(svr, rd) registerFunc(apiRouter, "/service-middleware/config", serviceMiddlewareHandler.GetServiceMiddlewareConfig, setMethods(http.MethodGet), setAuditBackend(prometheus)) @@ -327,16 +327,16 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(clusterRouter, "/replication_mode/status", replicationModeHandler.GetReplicationModeStatus, setAuditBackend(prometheus)) pluginHandler := newPluginHandler(handler, rd) - registerFunc(apiRouter, "/plugin", pluginHandler.LoadPlugin, setMethods(http.MethodPost), setAuditBackend(prometheus)) - registerFunc(apiRouter, "/plugin", pluginHandler.UnloadPlugin, setMethods(http.MethodDelete), setAuditBackend(prometheus)) + registerFunc(apiRouter, "/plugin", pluginHandler.loadPlugin, setMethods(http.MethodPost), setAuditBackend(prometheus)) + registerFunc(apiRouter, "/plugin", pluginHandler.unloadPlugin, setMethods(http.MethodDelete), setAuditBackend(prometheus)) healthHandler := newHealthHandler(svr, rd) registerFunc(apiRouter, "/health", healthHandler.GetHealthStatus, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(apiRouter, "/ping", healthHandler.Ping, setMethods(http.MethodGet), setAuditBackend(prometheus)) // metric query use to query metric data, the protocol is compatible with prometheus. - registerFunc(apiRouter, "/metric/query", newQueryMetric(svr).QueryMetric, setMethods(http.MethodGet, http.MethodPost), setAuditBackend(prometheus)) - registerFunc(apiRouter, "/metric/query_range", newQueryMetric(svr).QueryMetric, setMethods(http.MethodGet, http.MethodPost), setAuditBackend(prometheus)) + registerFunc(apiRouter, "/metric/query", newqueryMetric(svr).queryMetric, setMethods(http.MethodGet, http.MethodPost), setAuditBackend(prometheus)) + registerFunc(apiRouter, "/metric/query_range", newqueryMetric(svr).queryMetric, setMethods(http.MethodGet, http.MethodPost), setAuditBackend(prometheus)) pprofHandler := newPprofHandler(svr, rd) // profile API diff --git a/server/api/rule.go b/server/api/rule.go index bdb3db2016d..27a95a08269 100644 --- a/server/api/rule.go +++ b/server/api/rule.go @@ -57,7 +57,7 @@ func newRuleMiddleware(s *server.Server, rd *render.Render) ruleMiddleware { } } -func (m ruleMiddleware) Middleware(h http.Handler) http.Handler { +func (m ruleMiddleware) middleware(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { manager, err := m.GetRuleManager() if err == errs.ErrPlacementDisabled { diff --git a/server/api/scheduler.go b/server/api/scheduler.go index 306f67ae058..1d502013558 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -259,7 +259,7 @@ func newSchedulerConfigHandler(svr *server.Server, rd *render.Render) *scheduler } } -func (h *schedulerConfigHandler) HandleSchedulerConfig(w http.ResponseWriter, r *http.Request) { +func (h *schedulerConfigHandler) handleSchedulerConfig(w http.ResponseWriter, r *http.Request) { handler := h.svr.GetHandler() sh, err := handler.GetSchedulerConfigHandler() if err == nil && sh != nil { diff --git a/server/api/server_test.go b/server/api/server_test.go index 7ebee9bcdfe..af41905ad86 100644 --- a/server/api/server_test.go +++ b/server/api/server_test.go @@ -242,7 +242,7 @@ func (suite *serviceTestSuite) TestServiceLabels() { apiutil.NewAccessPath("/pd/api/v1/leader/resign", "")) re.Equal("", serviceLabel) - accessPaths = suite.svr.GetServiceLabels("QueryMetric") + accessPaths = suite.svr.GetServiceLabels("queryMetric") re.Len(accessPaths, 4) sort.Slice(accessPaths, func(i, j int) bool { if accessPaths[i].Path == accessPaths[j].Path { @@ -260,10 +260,10 @@ func (suite *serviceTestSuite) TestServiceLabels() { re.Equal(http.MethodPost, accessPaths[3].Method) serviceLabel = suite.svr.GetAPIAccessServiceLabel( apiutil.NewAccessPath("/pd/api/v1/metric/query", http.MethodPost)) - re.Equal("QueryMetric", serviceLabel) + re.Equal("queryMetric", serviceLabel) serviceLabel = suite.svr.GetAPIAccessServiceLabel( apiutil.NewAccessPath("/pd/api/v1/metric/query", http.MethodGet)) - re.Equal("QueryMetric", serviceLabel) + re.Equal("queryMetric", serviceLabel) } func (suite *adminTestSuite) TestCleanPath() { diff --git a/server/forward.go b/server/forward.go index 5c49b871020..c407e545f6f 100644 --- a/server/forward.go +++ b/server/forward.go @@ -332,7 +332,7 @@ func forwardReportBucketClientToServer(forwardStream pdpb.PD_ReportBucketsClient errCh <- errors.WithStack(err) return } - if err := server.Send(resp); err != nil { + if err := server.send(resp); err != nil { errCh <- errors.WithStack(err) return } diff --git a/server/grpc_service.go b/server/grpc_service.go index fa9156e884e..fee5e0e3355 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1024,7 +1024,7 @@ type bucketHeartbeatServer struct { closed int32 } -func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error { +func (b *bucketHeartbeatServer) send(bucket *pdpb.ReportBucketsResponse) error { if atomic.LoadInt32(&b.closed) == 1 { return status.Errorf(codes.Canceled, "stream is closed") } @@ -1047,7 +1047,7 @@ func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error { } } -func (b *bucketHeartbeatServer) Recv() (*pdpb.ReportBucketsRequest, error) { +func (b *bucketHeartbeatServer) recv() (*pdpb.ReportBucketsRequest, error) { if atomic.LoadInt32(&b.closed) == 1 { return nil, io.EOF } @@ -1083,7 +1083,7 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error { } } for { - request, err := server.Recv() + request, err := server.recv() failpoint.Inject("grpcClientClosed", func() { err = status.Error(codes.Canceled, "grpc client closed") request = nil @@ -1132,7 +1132,7 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error { resp := &pdpb.ReportBucketsResponse{ Header: s.notBootstrappedHeader(), } - err := server.Send(resp) + err := server.send(resp) return errors.WithStack(err) } if err := s.validateRequest(request.GetHeader()); err != nil { diff --git a/tests/cluster.go b/tests/cluster.go index c7368fe3c3a..2aa783590df 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -810,7 +810,7 @@ func (c *TestCluster) HandleReportBuckets(b *metapb.Buckets) error { // Join is used to add a new TestServer into the cluster. func (c *TestCluster) Join(ctx context.Context, opts ...ConfigOption) (*TestServer, error) { - conf, err := c.config.Join().Generate(opts...) + conf, err := c.config.join().Generate(opts...) if err != nil { return nil, err } @@ -824,7 +824,7 @@ func (c *TestCluster) Join(ctx context.Context, opts ...ConfigOption) (*TestServ // JoinAPIServer is used to add a new TestAPIServer into the cluster. func (c *TestCluster) JoinAPIServer(ctx context.Context, opts ...ConfigOption) (*TestServer, error) { - conf, err := c.config.Join().Generate(opts...) + conf, err := c.config.join().Generate(opts...) if err != nil { return nil, err } diff --git a/tests/config.go b/tests/config.go index a162a02009c..62dd5f08c1f 100644 --- a/tests/config.go +++ b/tests/config.go @@ -47,6 +47,7 @@ func newServerConfig(name string, cc *clusterConfig, join bool) *serverConfig { } } +// Generate generates a config for the server. func (c *serverConfig) Generate(opts ...ConfigOption) (*config.Config, error) { arguments := []string{ "--name=" + c.Name, @@ -57,9 +58,9 @@ func (c *serverConfig) Generate(opts ...ConfigOption) (*config.Config, error) { "--advertise-peer-urls=" + c.AdvertisePeerURLs, } if c.Join { - arguments = append(arguments, "--join="+c.ClusterConfig.GetJoinAddr()) + arguments = append(arguments, "--join="+c.ClusterConfig.getJoinAddr()) } else { - arguments = append(arguments, "--initial-cluster="+c.ClusterConfig.GetServerAddrs()) + arguments = append(arguments, "--initial-cluster="+c.ClusterConfig.getServerAddrs()) } flagSet := pflag.NewFlagSet("test", pflag.ContinueOnError) @@ -110,7 +111,7 @@ func newClusterConfig(n int) *clusterConfig { return &cc } -func (c *clusterConfig) Join() *serverConfig { +func (c *clusterConfig) join() *serverConfig { sc := newServerConfig(c.nextServerName(), c, true) c.JoinServers = append(c.JoinServers, sc) return sc @@ -120,7 +121,7 @@ func (c *clusterConfig) nextServerName() string { return fmt.Sprintf("pd%d", len(c.InitialServers)+len(c.JoinServers)+1) } -func (c *clusterConfig) GetServerAddrs() string { +func (c *clusterConfig) getServerAddrs() string { addrs := make([]string, 0, len(c.InitialServers)) for _, s := range c.InitialServers { addrs = append(addrs, fmt.Sprintf("%s=%s", s.Name, s.PeerURLs)) @@ -128,14 +129,16 @@ func (c *clusterConfig) GetServerAddrs() string { return strings.Join(addrs, ",") } -func (c *clusterConfig) GetJoinAddr() string { +func (c *clusterConfig) getJoinAddr() string { return c.InitialServers[0].PeerURLs } +// GetClientURL returns the client URL of the cluster. func (c *clusterConfig) GetClientURL() string { return c.InitialServers[0].ClientURLs } +// GetClientURLs returns all client URLs of the cluster. func (c *clusterConfig) GetClientURLs() []string { urls := make([]string, 0, len(c.InitialServers)) for _, svr := range c.InitialServers { diff --git a/tests/testutil.go b/tests/testutil.go index c895d206c05..2989ba7b010 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -50,6 +50,7 @@ import ( ) var ( + // TestDialClient is a http client for test. TestDialClient = &http.Client{ Transport: &http.Transport{ DisableKeepAlives: true, @@ -286,11 +287,15 @@ func MustReportBuckets(re *require.Assertions, cluster *TestCluster, regionID ui return buckets } +// SchedulerMode is used for test purpose. type SchedulerMode int const ( + // Both represents both PD mode and API mode. Both SchedulerMode = iota + // PDMode represents PD mode. PDMode + // APIMode represents API mode. APIMode )