diff --git a/client/Makefile b/client/Makefile index d2ed116fa79..b51f6805328 100644 --- a/client/Makefile +++ b/client/Makefile @@ -31,9 +31,10 @@ install-tools: cd .. && $(MAKE) install-tools static: install-tools + @ echo "gofmt ..." @ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }' @ echo "golangci-lint ..." - @ golangci-lint run -c ../.golangci.yml --verbose ./... --allow-parallel-runners + @ golangci-lint run -c ../.golangci.yml --verbose ./... @ echo "revive ..." @ revive -formatter friendly -config ../revive.toml ./... diff --git a/client/errs/errno.go b/client/errs/errno.go index 708265ef0dd..ec9374141e9 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -33,24 +33,27 @@ const ( // Note: keep the same as the ones defined on the server side, because the client side checks if an error message // contains this string to judge whether the leader is changed. NotServedErr = "is not served" - // RetryTimeoutErr indicates the request is timeout. + // RetryTimeoutErr indicates the server is busy. 
RetryTimeoutErr = "retry timeout" ) // client errors var ( - ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient")) - ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream")) - ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed")) - ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout")) - ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO")) - ErrClientGetLeader = errors.Normalize("get leader from %v error", errors.RFCCodeText("PD:client:ErrClientGetLeader")) - ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember")) - ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo")) - ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember")) - ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal")) - ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse")) - ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint")) + ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient")) + ErrClientGetMetaStorageClient = errors.Normalize("failed to get meta storage client", errors.RFCCodeText("PD:client:ErrClientGetMetaStorageClient")) + ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", 
errors.RFCCodeText("PD:client:ErrClientCreateTSOStream")) + ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed")) + ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout")) + ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO")) + ErrClientGetLeader = errors.Normalize("get leader from %v error", errors.RFCCodeText("PD:client:ErrClientGetLeader")) + ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember")) + ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo")) + ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember")) + ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal")) + ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse")) + ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint")) + ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID")) + ErrClientWatchGCSafePointV2Stream = errors.Normalize("watch gc safe point v2 stream failed, %s", errors.RFCCodeText("PD:client:ErrClientWatchGCSafePointV2Stream")) ) // grpcutil errors diff --git a/client/go.mod b/client/go.mod index 04508ed8e75..1e0d7ff93f4 100644 --- a/client/go.mod +++ b/client/go.mod @@ -3,6 +3,7 @@ module github.com/tikv/pd/client go 1.20 require ( + github.com/BurntSushi/toml v0.3.1 github.com/elastic/gosigar v0.14.2 github.com/gogo/protobuf v1.3.2 
github.com/opentracing/opentracing-go v1.2.0 diff --git a/client/go.sum b/client/go.sum index 86b99315ecc..8b35ed71bd8 100644 --- a/client/go.sum +++ b/client/go.sum @@ -1,4 +1,5 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/client/meta_storage_client.go b/client/meta_storage_client.go index 4b2e56a4e8b..b203fb914d3 100644 --- a/client/meta_storage_client.go +++ b/client/meta_storage_client.go @@ -20,11 +20,11 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/meta_storagepb" - "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" - "go.uber.org/zap" ) // MetaStorageClient is the interface for meta storage client. 
@@ -125,7 +125,12 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) ( PrevKv: options.prevKv, } ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - resp, err := c.metaStorageClient().Put(ctx, req) + cli := c.metaStorageClient() + if cli == nil { + cancel() + return nil, errs.ErrClientGetMetaStorageClient + } + resp, err := cli.Put(ctx, req) cancel() if err = c.respForMetaStorageErr(cmdFailedDurationPut, start, err, resp.GetHeader()); err != nil { @@ -158,7 +163,12 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s Revision: options.revision, } ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - resp, err := c.metaStorageClient().Get(ctx, req) + cli := c.metaStorageClient() + if cli == nil { + cancel() + return nil, errs.ErrClientGetMetaStorageClient + } + resp, err := cli.Get(ctx, req) cancel() if err = c.respForMetaStorageErr(cmdFailedDurationGet, start, err, resp.GetHeader()); err != nil { @@ -177,7 +187,11 @@ func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan options.rangeEnd = getPrefix(key) } - res, err := c.metaStorageClient().Watch(ctx, &meta_storagepb.WatchRequest{ + cli := c.metaStorageClient() + if cli == nil { + return nil, errs.ErrClientGetMetaStorageClient + } + res, err := cli.Watch(ctx, &meta_storagepb.WatchRequest{ Key: key, RangeEnd: options.rangeEnd, StartRevision: options.revision, @@ -190,13 +204,12 @@ func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan go func() { defer func() { close(eventCh) - if r := recover(); r != nil { - log.Error("[pd] panic in client `Watch`", zap.Any("error", r)) - return - } }() for { resp, err := res.Recv() + failpoint.Inject("watchStreamError", func() { + err = errors.Errorf("fake error") + }) if err != nil { return } diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index 9c9ff74baa5..e3241df3d83 100644 --- 
a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -52,7 +52,7 @@ const ( // According to the resource control Grafana panel and Prometheus sampling period, the period should be the factor of 15. defaultTargetPeriod = 5 * time.Second // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. - defaultMaxWaitDuration = time.Second + defaultMaxWaitDuration = 30 * time.Second ) const ( @@ -67,23 +67,27 @@ const ( // Because the resource manager has not been deployed in microservice mode, // do not enable this function. - defaultDegradedModeWaitDuration = "0s" + defaultDegradedModeWaitDuration = 0 ) -// ControllerConfig is the configuration of the resource manager controller which includes some option for client needed. -type ControllerConfig struct { +// Config is the configuration of the resource manager controller which includes some option for client needed. +type Config struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. - DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + + // LTBMaxWaitDuration is the max wait time duration for local token bucket. + LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` } -// DefaultControllerConfig returns the default resource manager controller configuration. -func DefaultControllerConfig() *ControllerConfig { - return &ControllerConfig{ - DegradedModeWaitDuration: defaultDegradedModeWaitDuration, +// DefaultConfig returns the default resource manager controller configuration. 
+func DefaultConfig() *Config { + return &Config{ + DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), + LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration), RequestUnit: DefaultRequestUnitConfig(), } } @@ -117,10 +121,10 @@ func DefaultRequestUnitConfig() RequestUnitConfig { } } -// Config is the configuration of the resource units, which gives the read/write request +// RUConfig is the configuration of the resource units, which gives the read/write request // units or request resource cost standards. It should be calculated by a given `RequestUnitConfig` // or `RequestResourceConfig`. -type Config struct { +type RUConfig struct { // RU model config ReadBaseCost RequestUnit ReadBytesCost RequestUnit @@ -128,33 +132,29 @@ type Config struct { WriteBytesCost RequestUnit CPUMsCost RequestUnit // The CPU statistics need to distinguish between different environments. - isSingleGroupByKeyspace bool - maxWaitDuration time.Duration + isSingleGroupByKeyspace bool + + // some config for client + LTBMaxWaitDuration time.Duration DegradedModeWaitDuration time.Duration } -// DefaultConfig returns the default configuration. -func DefaultConfig() *Config { - return GenerateConfig( - DefaultControllerConfig(), +// DefaultRUConfig returns the default configuration. +func DefaultRUConfig() *RUConfig { + return GenerateRUConfig( + DefaultConfig(), ) } -// GenerateConfig generates the configuration by the given request unit configuration. 
-func GenerateConfig(config *ControllerConfig) *Config { - cfg := &Config{ - ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost), - ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte), - WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost), - WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte), - CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost), - maxWaitDuration: defaultMaxWaitDuration, - } - duration, err := time.ParseDuration(config.DegradedModeWaitDuration) - if err != nil { - cfg.DegradedModeWaitDuration, _ = time.ParseDuration(defaultDegradedModeWaitDuration) - } else { - cfg.DegradedModeWaitDuration = duration +// GenerateRUConfig generates the configuration by the given request unit configuration. +func GenerateRUConfig(config *Config) *RUConfig { + return &RUConfig{ + ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost), + ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte), + WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost), + WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte), + CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost), + LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration, + DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration, } - return cfg } diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 6a08756e85d..a9dfd610403 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -22,8 +22,10 @@ import ( "sync/atomic" "time" + "github.com/gogo/protobuf/proto" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/meta_storagepb" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" @@ -39,6 +41,8 @@ const ( maxNotificationChanLen = 200 needTokensAmplification = 1.1 trickleReserveDuration = 1250 * time.Millisecond 
+ + watchRetryInterval = 30 * time.Second ) type selectType int @@ -58,13 +62,16 @@ type ResourceGroupKVInterceptor interface { // ResourceGroupProvider provides some api to interact with resource manager server。 type ResourceGroupProvider interface { - ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error) AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) - LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]pd.GlobalConfigItem, int64, error) + LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) + + // meta storage client + Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) + Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) } // ResourceControlCreateOption create a ResourceGroupsController with the optional settings. @@ -73,25 +80,25 @@ type ResourceControlCreateOption func(controller *ResourceGroupsController) // EnableSingleGroupByKeyspace is the option to enable single group by keyspace feature. func EnableSingleGroupByKeyspace() ResourceControlCreateOption { return func(controller *ResourceGroupsController) { - controller.config.isSingleGroupByKeyspace = true + controller.ruConfig.isSingleGroupByKeyspace = true } } // WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets. 
func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption { return func(controller *ResourceGroupsController) { - controller.config.maxWaitDuration = d + controller.ruConfig.LTBMaxWaitDuration = d } } var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil) -// ResourceGroupsController impls ResourceGroupKVInterceptor. +// ResourceGroupsController implements ResourceGroupKVInterceptor. type ResourceGroupsController struct { clientUniqueID uint64 provider ResourceGroupProvider groupsController sync.Map - config *Config + ruConfig *RUConfig loopCtx context.Context loopCancel func() @@ -113,6 +120,11 @@ type ResourceGroupsController struct { // Currently, we don't do multiple `AcquireTokenBuckets`` at the same time, so there are no concurrency problems with `currentRequests`. currentRequests []*rmpb.TokenBucketRequest } + + opts []ResourceControlCreateOption + + // a cache for ru config and make concurrency safe. + safeRuConfig atomic.Pointer[RUConfig] } // NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor @@ -123,49 +135,53 @@ func NewResourceGroupController( requestUnitConfig *RequestUnitConfig, opts ...ResourceControlCreateOption, ) (*ResourceGroupsController, error) { - controllerConfig, err := loadServerConfig(ctx, provider) + config, err := loadServerConfig(ctx, provider) if err != nil { return nil, err } if requestUnitConfig != nil { - controllerConfig.RequestUnit = *requestUnitConfig + config.RequestUnit = *requestUnitConfig } - config := GenerateConfig(controllerConfig) + + ruConfig := GenerateRUConfig(config) controller := &ResourceGroupsController{ clientUniqueID: clientUniqueID, provider: provider, - config: config, + ruConfig: ruConfig, lowTokenNotifyChan: make(chan struct{}, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), + opts: opts, } for _, opt := range opts { opt(controller) } 
- controller.calculators = []ResourceCalculator{newKVCalculator(controller.config), newSQLCalculator(controller.config)} + log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig)) + controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)} + controller.safeRuConfig.Store(controller.ruConfig) return controller, nil } -func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*ControllerConfig, error) { - items, _, err := provider.LoadGlobalConfig(ctx, nil, controllerConfigPath) +func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Config, error) { + resp, err := provider.Get(ctx, []byte(controllerConfigPath)) if err != nil { return nil, err } - if len(items) == 0 { + if len(resp.Kvs) == 0 { log.Warn("[resource group controller] server does not save config, load config failed") - return DefaultControllerConfig(), nil + return DefaultConfig(), nil } - controllerConfig := &ControllerConfig{} - err = json.Unmarshal(items[0].PayLoad, controllerConfig) + config := &Config{} + err = json.Unmarshal(resp.Kvs[0].GetValue(), config) if err != nil { return nil, err } - return controllerConfig, nil + return config, nil } -// GetConfig returns the config of controller. It's only used for test. -func (c *ResourceGroupsController) GetConfig() *Config { - return c.config +// GetConfig returns the config of controller. 
+func (c *ResourceGroupsController) GetConfig() *RUConfig { + return c.safeRuConfig.Load() } // Source List @@ -178,8 +194,8 @@ const ( func (c *ResourceGroupsController) Start(ctx context.Context) { c.loopCtx, c.loopCancel = context.WithCancel(ctx) go func() { - if c.config.DegradedModeWaitDuration > 0 { - c.run.responseDeadline = time.NewTimer(c.config.DegradedModeWaitDuration) + if c.ruConfig.DegradedModeWaitDuration > 0 { + c.run.responseDeadline = time.NewTimer(c.ruConfig.DegradedModeWaitDuration) c.run.responseDeadline.Stop() defer c.run.responseDeadline.Stop() } @@ -203,8 +219,65 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { stateUpdateTicker = time.NewTicker(time.Millisecond * 100) }) + _, metaRevision, err := c.provider.LoadResourceGroups(ctx) + if err != nil { + log.Warn("load resource group revision failed", zap.Error(err)) + } + resp, err := c.provider.Get(ctx, []byte(controllerConfigPath)) + if err != nil { + log.Warn("load resource group revision failed", zap.Error(err)) + } + cfgRevision := resp.GetHeader().GetRevision() + var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event + if !c.ruConfig.isSingleGroupByKeyspace { + // Use WithPrevKV() to get the previous key-value pair when get Delete Event. 
+ watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV()) + if err != nil { + log.Warn("watch resource group meta failed", zap.Error(err)) + } + } + + watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group config failed", zap.Error(err)) + } + watchRetryTimer := time.NewTimer(watchRetryInterval) + defer watchRetryTimer.Stop() + for { select { + /* tickers */ + case <-cleanupTicker.C: + c.cleanUpResourceGroup() + case <-stateUpdateTicker.C: + c.executeOnAllGroups((*groupCostController).updateRunState) + c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) + if len(c.run.currentRequests) == 0 { + c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */) + } + case <-watchRetryTimer.C: + if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil { + // Use WithPrevKV() to get the previous key-value pair when get Delete Event. 
+ watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV()) + if err != nil { + log.Warn("watch resource group meta failed", zap.Error(err)) + watchRetryTimer.Reset(watchRetryInterval) + failpoint.Inject("watchStreamError", func() { + watchRetryTimer.Reset(20 * time.Millisecond) + }) + } + } + if watchConfigChannel == nil { + watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group config failed", zap.Error(err)) + watchRetryTimer.Reset(watchRetryInterval) + } + } + + case <-emergencyTokenAcquisitionTicker.C: + c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) + /* channels */ case <-c.loopCtx.Done(): resourceGroupStatusGauge.Reset() return @@ -218,16 +291,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.handleTokenBucketResponse(resp) } c.run.currentRequests = nil - case <-cleanupTicker.C: - if err := c.cleanUpResourceGroup(c.loopCtx); err != nil { - log.Error("[resource group controller] clean up resource groups failed", zap.Error(err)) - } - case <-stateUpdateTicker.C: - c.executeOnAllGroups((*groupCostController).updateRunState) - c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) - if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */) - } case <-c.lowTokenNotifyChan: c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) @@ -237,8 +300,73 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if c.run.inDegradedMode { c.executeOnAllGroups((*groupCostController).applyDegradedMode) } - case <-emergencyTokenAcquisitionTicker.C: - 
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) + case resp, ok := <-watchMetaChannel: + failpoint.Inject("disableWatch", func() { + if c.ruConfig.isSingleGroupByKeyspace { + panic("disableWatch") + } + }) + if !ok { + watchMetaChannel = nil + watchRetryTimer.Reset(watchRetryInterval) + failpoint.Inject("watchStreamError", func() { + watchRetryTimer.Reset(20 * time.Millisecond) + }) + continue + } + for _, item := range resp { + metaRevision = item.Kv.ModRevision + group := &rmpb.ResourceGroup{} + switch item.Type { + case meta_storagepb.Event_PUT: + if err = proto.Unmarshal(item.Kv.Value, group); err != nil { + continue + } + if item, ok := c.groupsController.Load(group.Name); ok { + gc := item.(*groupCostController) + gc.modifyMeta(group) + } + case meta_storagepb.Event_DELETE: + if item.PrevKv != nil { + if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil { + continue + } + if _, ok := c.groupsController.LoadAndDelete(group.Name); ok { + resourceGroupStatusGauge.DeleteLabelValues(group.Name) + } + } else { + // Prev-kv is compacted means there must have been a delete event before this event, + // which means that this is just a duplicated event, so we can just ignore it. 
+ log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value))) + } + } + } + case resp, ok := <-watchConfigChannel: + if !ok { + watchConfigChannel = nil + watchRetryTimer.Reset(watchRetryInterval) + failpoint.Inject("watchStreamError", func() { + watchRetryTimer.Reset(20 * time.Millisecond) + }) + continue + } + for _, item := range resp { + cfgRevision = item.Kv.ModRevision + config := &Config{} + if err := json.Unmarshal(item.Kv.Value, config); err != nil { + continue + } + c.ruConfig = GenerateRUConfig(config) + + // Stay compatible with serverless + for _, opt := range c.opts { + opt(c) + } + copyCfg := *c.ruConfig + c.safeRuConfig.Store(&copyCfg) + log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig)) + } + case gc := <-c.tokenBucketUpdateChan: now := gc.run.now go gc.handleTokenBucketUpdateEvent(c.loopCtx, now) @@ -268,13 +396,16 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name if err != nil { return nil, err } + if group == nil { + return nil, errors.Errorf("%s does not exists", name) + } // Check again to prevent initializing the same resource group concurrently. if tmp, ok := c.groupsController.Load(name); ok { gc := tmp.(*groupCostController) return gc, nil } // Initialize the resource group controller.
- gc, err := newGroupCostController(group, c.config, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + gc, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) if err != nil { return nil, err } @@ -289,23 +420,9 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name return tmp.(*groupCostController), nil } -func (c *ResourceGroupsController) cleanUpResourceGroup(ctx context.Context) error { - groups, err := c.provider.ListResourceGroups(ctx) - if err != nil { - return errs.ErrClientListResourceGroup.FastGenByArgs(err.Error()) - } - latestGroups := make(map[string]struct{}) - for _, group := range groups { - latestGroups[group.GetName()] = struct{}{} - } +func (c *ResourceGroupsController) cleanUpResourceGroup() { c.groupsController.Range(func(key, value any) bool { resourceGroupName := key.(string) - if _, ok := latestGroups[resourceGroupName]; !ok { - c.groupsController.Delete(key) - resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName) - return true - } - gc := value.(*groupCostController) // Check for stale resource groups, which will be deleted when consumption is continuously unchanged. 
gc.mu.Lock() @@ -323,7 +440,6 @@ func (c *ResourceGroupsController) cleanUpResourceGroup(ctx context.Context) err } return true }) - return nil } func (c *ResourceGroupsController) executeOnAllGroups(f func(controller *groupCostController)) { @@ -379,8 +495,8 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, TargetRequestPeriodMs: uint64(defaultTargetPeriod / time.Millisecond), ClientUniqueId: c.clientUniqueID, } - if c.config.DegradedModeWaitDuration > 0 && c.responseDeadlineCh == nil { - c.run.responseDeadline.Reset(c.config.DegradedModeWaitDuration) + if c.ruConfig.DegradedModeWaitDuration > 0 && c.responseDeadlineCh == nil { + c.run.responseDeadline.Reset(c.ruConfig.DegradedModeWaitDuration) c.responseDeadlineCh = c.run.responseDeadline.C } go func() { @@ -420,18 +536,32 @@ func (c *ResourceGroupsController) OnResponse( ) (*rmpb.Consumption, error) { tmp, ok := c.groupsController.Load(resourceGroupName) if !ok { - log.Warn("[resource group controller] resource group name does not exist", zap.String("resourceGroupName", resourceGroupName)) + log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName)) return &rmpb.Consumption{}, nil } return tmp.(*groupCostController).onResponse(req, resp) } -type groupCostController struct { - *rmpb.ResourceGroup - mainCfg *Config - calculators []ResourceCalculator - mode rmpb.GroupMode +// GetResourceGroup returns the meta setting of the given resource group name. +func (c *ResourceGroupsController) GetResourceGroup(resourceGroupName string) (*rmpb.ResourceGroup, error) { + gc, err := c.tryGetResourceGroup(c.loopCtx, resourceGroupName) + if err != nil { + return nil, err + } + return gc.getMeta(), nil +} +type groupCostController struct { + // invariant attributes + name string + mode rmpb.GroupMode + mainCfg *RUConfig + // meta info + meta *rmpb.ResourceGroup + metaLock sync.RWMutex + + // following fields are used for token limiter. 
+ calculators []ResourceCalculator handleRespFunc func(*rmpb.TokenBucketResponse) successfulRequestDuration prometheus.Observer @@ -514,7 +644,7 @@ type tokenCounter struct { func newGroupCostController( group *rmpb.ResourceGroup, - mainCfg *Config, + mainCfg *RUConfig, lowRUNotifyChan chan struct{}, tokenBucketUpdateChan chan *groupCostController, ) (*groupCostController, error) { @@ -527,8 +657,10 @@ func newGroupCostController( return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type") } gc := &groupCostController{ - ResourceGroup: group, + meta: group, + name: group.Name, mainCfg: mainCfg, + mode: group.GetMode(), successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name), failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name), requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name), @@ -537,7 +669,6 @@ func newGroupCostController( newKVCalculator(mainCfg), newSQLCalculator(mainCfg), }, - mode: group.GetMode(), tokenBucketUpdateChan: tokenBucketUpdateChan, lowRUNotifyChan: lowRUNotifyChan, burstable: &atomic.Bool{}, @@ -582,19 +713,19 @@ func (gc *groupCostController) initRunState() { return cfg } + gc.metaLock.RLock() + defer gc.metaLock.RUnlock() switch gc.mode { case rmpb.GroupMode_RUMode: gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter) for typ := range requestUnitLimitTypeList { - tb := getRUTokenBucketSetting(gc.ResourceGroup, typ) - cfg := cfgFunc(tb) - limiter := NewLimiterWithCfg(now, cfg, gc.lowRUNotifyChan) + limiter := NewLimiterWithCfg(now, cfgFunc(getRUTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan) counter := &tokenCounter{ limiter: limiter, avgRUPerSec: 0, avgLastTime: now, getTokenBucketFunc: func() *rmpb.TokenBucket { - return getRUTokenBucketSetting(gc.ResourceGroup, typ) + return getRUTokenBucketSetting(gc.meta, typ) }, } gc.run.requestUnitTokens[typ] = counter @@ -602,15 +733,13 @@ func (gc *groupCostController) 
initRunState() { case rmpb.GroupMode_RawMode: gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter) for typ := range requestResourceLimitTypeList { - tb := getRawResourceTokenBucketSetting(gc.ResourceGroup, typ) - cfg := cfgFunc(tb) - limiter := NewLimiterWithCfg(now, cfg, gc.lowRUNotifyChan) + limiter := NewLimiterWithCfg(now, cfgFunc(getRawResourceTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan) counter := &tokenCounter{ limiter: limiter, avgRUPerSec: 0, avgLastTime: now, getTokenBucketFunc: func() *rmpb.TokenBucket { - return getRawResourceTokenBucketSetting(gc.ResourceGroup, typ) + return getRawResourceTokenBucketSetting(gc.meta, typ) }, } gc.run.resourceTokens[typ] = counter @@ -637,7 +766,7 @@ func (gc *groupCostController) updateRunState() { } *gc.run.consumption = *gc.mu.consumption gc.mu.Unlock() - log.Debug("[resource group controller] update run state", zap.Any("request unit consumption", gc.run.consumption)) + log.Debug("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption)) gc.run.now = newTime } @@ -718,7 +847,7 @@ func (gc *groupCostController) updateAvgRaWResourcePerSec() { if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) { continue } - log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec)) + log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec)) } gc.burstable.Store(isBurstable) } @@ -732,7 +861,7 @@ func (gc *groupCostController) updateAvgRUPerSec() { if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) { continue } - log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.Name), 
zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec)) + log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec)) } gc.burstable.Store(isBurstable) } @@ -771,7 +900,7 @@ func (gc *groupCostController) shouldReportConsumption() bool { if timeSinceLastRequest >= extendedReportingPeriodFactor*defaultTargetPeriod { return true } - switch gc.Mode { + switch gc.mode { case rmpb.GroupMode_RUMode: for typ := range requestUnitLimitTypeList { if getRUValueFromConsumption(gc.run.consumption, typ)-getRUValueFromConsumption(gc.run.lastRequestConsumption, typ) >= consumptionsReportingThreshold { @@ -837,7 +966,7 @@ func (gc *groupCostController) applyBasicConfigForRUTokenCounters() { cfg.NewRate = 99999999 }) counter.limiter.Reconfigure(gc.run.now, cfg, resetLowProcess()) - log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("resource group", gc.Name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)])) + log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("resource-group", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)])) } } @@ -918,7 +1047,7 @@ func initCounterNotify(counter *tokenCounter) { func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType) *rmpb.TokenBucketRequest { req := &rmpb.TokenBucketRequest{ - ResourceGroupName: gc.ResourceGroup.GetName(), + ResourceGroupName: gc.name, } // collect request resource selected := gc.run.requestInProgress @@ -982,6 +1111,18 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType return req } +func (gc *groupCostController) getMeta() *rmpb.ResourceGroup { + gc.metaLock.Lock() + defer gc.metaLock.Unlock() + return gc.meta +} + +func (gc *groupCostController) modifyMeta(newMeta 
*rmpb.ResourceGroup) { + gc.metaLock.Lock() + defer gc.metaLock.Unlock() + gc.meta = newMeta +} + func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 { // `needTokensAmplification` is used to properly amplify a need. The reason is that in the current implementation, // the token returned from the server determines the average consumption speed. @@ -1018,7 +1159,7 @@ func (gc *groupCostController) onRequestWait( res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) for typ, counter := range gc.run.resourceTokens { if v := getRawResourceValueFromConsumption(delta, typ); v > 0 { - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v)) + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } if d, err = WaitReservations(ctx, now, res); err == nil { @@ -1028,7 +1169,7 @@ res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) for typ, counter := range gc.run.requestUnitTokens { if v := getRUValueFromConsumption(delta, typ); v > 0 { - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v)) + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } if d, err = WaitReservations(ctx, now, res); err == nil { @@ -1109,11 +1250,14 @@ func (gc *groupCostController) onResponse( return delta, nil } -// CheckResourceGroupExist checks if groupsController map {rg.name -> resource group controller} -// contains name. Used for test only. -func (c *ResourceGroupsController) CheckResourceGroupExist(name string) bool { - _, ok := c.groupsController.Load(name) - return ok +// GetActiveResourceGroup is used to get the active resource group. +// This is used for test only.
+func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup { + tmp, ok := c.groupsController.Load(resourceGroupName) + if !ok { + return nil + } + return tmp.(*groupCostController).getMeta() } // This is used for test only. diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index d3b2a29c211..165d501ddb1 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -42,7 +42,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController } ch1 := make(chan struct{}) ch2 := make(chan *groupCostController) - gc, err := newGroupCostController(group, DefaultConfig(), ch1, ch2) + gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2) re.NoError(err) return gc } diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index 3e4926829e2..e0d7bca2b55 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -285,7 +285,7 @@ type tokenBucketReconfigureArgs struct { NotifyThreshold float64 } -// LimiterOption is used to modify the Limiter during construction. +// LimiterOption configures Limiter. type LimiterOption func(*Limiter) func resetLowProcess() func(*Limiter) { @@ -387,7 +387,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur return r } -// ResetRemainingNotifyTimes resets remainingNotifyTimes. +// ResetRemainingNotifyTimes resets the remaining notify times to 3. 
func (lim *Limiter) ResetRemainingNotifyTimes() { lim.mu.Lock() defer lim.mu.Unlock() diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index 711ed9f5956..d5755c3160a 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -64,13 +64,13 @@ type ResourceCalculator interface { // KVCalculator is used to calculate the KV-side consumption. type KVCalculator struct { - *Config + *RUConfig } var _ ResourceCalculator = (*KVCalculator)(nil) -func newKVCalculator(cfg *Config) *KVCalculator { - return &KVCalculator{Config: cfg} +func newKVCalculator(cfg *RUConfig) *KVCalculator { + return &KVCalculator{RUConfig: cfg} } // Trickle ... @@ -140,13 +140,13 @@ func (kc *KVCalculator) payBackWriteCost(consumption *rmpb.Consumption, req Requ // SQLCalculator is used to calculate the SQL-side consumption. type SQLCalculator struct { - *Config + *RUConfig } var _ ResourceCalculator = (*SQLCalculator)(nil) -func newSQLCalculator(cfg *Config) *SQLCalculator { - return &SQLCalculator{Config: cfg} +func newSQLCalculator(cfg *RUConfig) *SQLCalculator { + return &SQLCalculator{RUConfig: cfg} } // Trickle update sql layer CPU consumption. diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go index 9866a518997..4df8c9bba0d 100644 --- a/client/resource_group/controller/testutil.go +++ b/client/resource_group/controller/testutil.go @@ -51,6 +51,11 @@ func (tri *TestRequestInfo) StoreID() uint64 { return tri.storeID } +// ReplicaNumber implements the RequestInfo interface. +func (tri *TestRequestInfo) ReplicaNumber() int64 { + return 1 +} + // TestResponseInfo is used to test the response info interface. 
type TestResponseInfo struct { readBytes uint64 diff --git a/client/resource_group/controller/util.go b/client/resource_group/controller/util.go new file mode 100644 index 00000000000..e3450e0ae0d --- /dev/null +++ b/client/resource_group/controller/util.go @@ -0,0 +1,68 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "fmt" + "strconv" + "time" + + "github.com/pingcap/errors" +) + +// Duration is a wrapper of time.Duration for TOML and JSON. +type Duration struct { + time.Duration +} + +// NewDuration creates a Duration from time.Duration. +func NewDuration(duration time.Duration) Duration { + return Duration{Duration: duration} +} + +// MarshalJSON returns the duration as a JSON string. +func (d *Duration) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, d.String())), nil +} + +// UnmarshalJSON parses a JSON string into the duration. +func (d *Duration) UnmarshalJSON(text []byte) error { + s, err := strconv.Unquote(string(text)) + if err != nil { + return errors.WithStack(err) + } + duration, err := time.ParseDuration(s) + if err != nil { + return errors.WithStack(err) + } + d.Duration = duration + return nil +} + +// UnmarshalText parses a TOML string into the duration. 
+func (d *Duration) UnmarshalText(text []byte) error { + var err error + d.Duration, err = time.ParseDuration(string(text)) + return errors.WithStack(err) +} + +// MarshalText returns the duration as a TOML string. +func (d Duration) MarshalText() ([]byte, error) { + return []byte(d.String()), nil +} diff --git a/client/resource_group/controller/util_test.go b/client/resource_group/controller/util_test.go new file mode 100644 index 00000000000..b542e6713dc --- /dev/null +++ b/client/resource_group/controller/util_test.go @@ -0,0 +1,51 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package controller + +import ( + "encoding/json" + "testing" + + "github.com/BurntSushi/toml" + "github.com/stretchr/testify/require" +) + +type example struct { + Interval Duration `json:"interval" toml:"interval"` +} + +func TestDurationJSON(t *testing.T) { + t.Parallel() + re := require.New(t) + example := &example{} + + text := []byte(`{"interval":"1h1m1s"}`) + re.NoError(json.Unmarshal(text, example)) + re.Equal(float64(60*60+60+1), example.Interval.Seconds()) + + b, err := json.Marshal(example) + re.NoError(err) + re.Equal(string(text), string(b)) +} + +func TestDurationTOML(t *testing.T) { + t.Parallel() + re := require.New(t) + example := &example{} + + text := []byte(`interval = "1h1m1s"`) + re.Nil(toml.Unmarshal(text, example)) + re.Equal(float64(60*60+60+1), example.Interval.Seconds()) +} diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 90cfc977acf..fbf474cabc3 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -21,7 +21,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/meta_storagepb" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" @@ -31,15 +31,22 @@ import ( type actionType int const ( - add actionType = 0 - modify actionType = 1 - groupSettingsPathPrefix = "resource_group/settings" + add actionType = 0 + modify actionType = 1 + groupSettingsPathPrefix = "resource_group/settings" + controllerConfigPathPrefix = "resource_group/controller" // errNotPrimary is returned when the requested server is not primary. errNotPrimary = "not primary" // errNotLeader is returned when the requested server is not pd leader. errNotLeader = "not leader" ) +// GroupSettingsPathPrefixBytes is used to watch or get resource groups. 
+var GroupSettingsPathPrefixBytes = []byte(groupSettingsPathPrefix) + +// ControllerConfigPathPrefixBytes is used to watch or get controller config. +var ControllerConfigPathPrefixBytes = []byte(controllerConfigPathPrefix) + // ResourceManagerClient manages resource group info and token request. type ResourceManagerClient interface { ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) @@ -47,8 +54,9 @@ type ResourceManagerClient interface { AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) - WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) + LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) + Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error) } // resourceManagerClient gets the ResourceManager client of current PD leader. @@ -160,50 +168,23 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri return resp.GetBody(), nil } -// WatchResourceGroup [just for TEST] watches resource groups changes. -// It returns a stream of slices of resource groups. -// The first message in stream contains all current resource groups, -// all subsequent messages contains new events[PUT/DELETE] for all resource groups. 
-func (c *client) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) { - configChan, err := c.WatchGlobalConfig(ctx, groupSettingsPathPrefix, revision) +func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) { + resp, err := c.Get(ctx, GroupSettingsPathPrefixBytes, WithPrefix()) if err != nil { - return nil, err + return nil, 0, err } - resourceGroupWatcherChan := make(chan []*rmpb.ResourceGroup) - go func() { - defer func() { - close(resourceGroupWatcherChan) - if r := recover(); r != nil { - log.Error("[pd] panic in ResourceManagerClient `WatchResourceGroups`", zap.Any("error", r)) - return - } - }() - for { - select { - case <-ctx.Done(): - return - case res, ok := <-configChan: - if !ok { - return - } - groups := make([]*rmpb.ResourceGroup, 0, len(res)) - for _, item := range res { - switch item.EventType { - case pdpb.EventType_PUT: - group := &rmpb.ResourceGroup{} - if err := proto.Unmarshal([]byte(item.Value), group); err != nil { - return - } - groups = append(groups, group) - case pdpb.EventType_DELETE: - continue - } - } - resourceGroupWatcherChan <- groups - } + if resp.Header.Error != nil { + return nil, resp.Header.Revision, errors.Errorf(resp.Header.Error.Message) + } + groups := make([]*rmpb.ResourceGroup, 0, len(resp.Kvs)) + for _, item := range resp.Kvs { + group := &rmpb.ResourceGroup{} + if err := proto.Unmarshal(item.Value, group); err != nil { + continue } - }() - return resourceGroupWatcherChan, err + groups = append(groups, group) + } + return groups, resp.Header.Revision, nil } func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) { diff --git a/pkg/basicserver/basic_server.go b/pkg/basicserver/basic_server.go index 3a21da0f9d9..98a34d3b8b2 100644 --- a/pkg/basicserver/basic_server.go +++ b/pkg/basicserver/basic_server.go @@ -44,5 +44,5 @@ type Server interface { // IsServing returns 
whether the server is the leader, if there is embedded etcd, or the primary otherwise. IsServing() bool // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. - AddServiceReadyCallback(callbacks ...func(context.Context)) + AddServiceReadyCallback(callbacks ...func(context.Context) error) } diff --git a/pkg/mcs/meta_storage/server/grpc_service.go b/pkg/mcs/meta_storage/server/grpc_service.go index 9d27e817e29..e9d35fbf14b 100644 --- a/pkg/mcs/meta_storage/server/grpc_service.go +++ b/pkg/mcs/meta_storage/server/grpc_service.go @@ -126,7 +126,14 @@ func (s *Service) Watch(req *meta_storagepb.WatchRequest, server meta_storagepb. events := make([]*meta_storagepb.Event, 0, len(res.Events)) for _, e := range res.Events { - event := &meta_storagepb.Event{Kv: &meta_storagepb.KeyValue{Key: e.Kv.Key, Value: e.Kv.Value}, Type: meta_storagepb.Event_EventType(e.Type)} + event := &meta_storagepb.Event{Kv: &meta_storagepb.KeyValue{ + Key: e.Kv.Key, + Value: e.Kv.Value, + ModRevision: e.Kv.ModRevision, + CreateRevision: e.Kv.CreateRevision, + Version: e.Kv.Version, + Lease: e.Kv.Lease, + }, Type: meta_storagepb.Event_EventType(e.Type)} if e.PrevKv != nil { event.PrevKv = &meta_storagepb.KeyValue{Key: e.PrevKv.Key, Value: e.PrevKv.Value} } diff --git a/pkg/mcs/resource_manager/server/apis/v1/api.go b/pkg/mcs/resource_manager/server/apis/v1/api.go index c6c0fc8e9e1..7e7d59c1f29 100644 --- a/pkg/mcs/resource_manager/server/apis/v1/api.go +++ b/pkg/mcs/resource_manager/server/apis/v1/api.go @@ -16,7 +16,9 @@ package apis import ( "errors" + "fmt" "net/http" + "reflect" "sync" "github.com/gin-contrib/cors" @@ -29,6 +31,7 @@ import ( "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" + "github.com/tikv/pd/pkg/utils/reflectutil" ) // APIPathPrefix is the prefix of the API path. 
@@ -96,6 +99,8 @@ func (s *Service) RegisterRouter() { configEndpoint.GET("/group/:name", s.getResourceGroup) configEndpoint.GET("/groups", s.getResourceGroupList) configEndpoint.DELETE("/group/:name", s.deleteResourceGroup) + configEndpoint.GET("/controller", s.getControllerConfig) + configEndpoint.POST("/controller", s.setControllerConfig) } func (s *Service) handler() http.Handler { @@ -190,3 +195,43 @@ func (s *Service) deleteResourceGroup(c *gin.Context) { } c.JSON(http.StatusOK, "Success!") } + +// GetControllerConfig +// +// @Tags ResourceManager +// @Summary Get the resource controller config. +// @Success 200 {string} json format of rmserver.ControllerConfig +// @Failure 400 {string} error +// @Router /config/controller [GET] +func (s *Service) getControllerConfig(c *gin.Context) { + config := s.manager.GetControllerConfig() + c.IndentedJSON(http.StatusOK, config) +} + +// SetControllerConfig +// +// @Tags ResourceManager +// @Summary Set the resource controller config. +// @Param config body object true "json params, rmserver.ControllerConfig" +// @Success 200 {string} string "Success!" 
+// @Failure 400 {string} error +// @Router /config/controller [POST] +func (s *Service) setControllerConfig(c *gin.Context) { + conf := make(map[string]interface{}) + if err := c.ShouldBindJSON(&conf); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + for k, v := range conf { + key := reflectutil.FindJSONFullTagByChildTag(reflect.TypeOf(rmserver.ControllerConfig{}), k) + if key == "" { + c.String(http.StatusBadRequest, fmt.Sprintf("config item %s not found", k)) + return + } + if err := s.manager.UpdateControllerConfigItem(key, v); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + } + c.String(http.StatusOK, "Success!") +} diff --git a/pkg/mcs/resource_manager/server/config.go b/pkg/mcs/resource_manager/server/config.go index 9f7ee63231b..d5e9c09602d 100644 --- a/pkg/mcs/resource_manager/server/config.go +++ b/pkg/mcs/resource_manager/server/config.go @@ -51,6 +51,8 @@ const ( // Because the resource manager has not been deployed in microservice mode, // do not enable this function. defaultDegradedModeWaitDuration = time.Second * 0 + // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. + defaultMaxWaitDuration = 30 * time.Second ) // Config is the configuration for the resource manager. @@ -85,6 +87,9 @@ type ControllerConfig struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. DegradedModeWaitDuration typeutil.Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + // LTBMaxWaitDuration is the max wait time duration for local token bucket. + LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` + // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. 
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` @@ -98,6 +103,7 @@ func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData) { rmc.RequestUnit.Adjust() configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration) + configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration) failpoint.Inject("enableDegradedMode", func() { configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, time.Second) }) diff --git a/pkg/mcs/resource_manager/server/config_test.go b/pkg/mcs/resource_manager/server/config_test.go index c0cac4da9c0..dd8dd2d2814 100644 --- a/pkg/mcs/resource_manager/server/config_test.go +++ b/pkg/mcs/resource_manager/server/config_test.go @@ -27,6 +27,7 @@ func TestControllerConfig(t *testing.T) { re := require.New(t) cfgData := ` [controller] +ltb-max-wait-duration = "60s" degraded-mode-wait-duration = "2s" [controller.request-unit] read-base-cost = 1.0 @@ -42,6 +43,7 @@ read-cpu-ms-cost = 5.0 re.NoError(err) re.Equal(cfg.Controller.DegradedModeWaitDuration.Duration, time.Second*2) + re.Equal(cfg.Controller.LTBMaxWaitDuration.Duration, time.Second*60) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.CPUMsCost-5), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteCostPerByte-4), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteBaseCost-3), 1e-7) diff --git a/pkg/mcs/resource_manager/server/manager.go b/pkg/mcs/resource_manager/server/manager.go index 7bb3148b75b..fb9f88857aa 100644 --- a/pkg/mcs/resource_manager/server/manager.go +++ b/pkg/mcs/resource_manager/server/manager.go @@ -19,10 +19,12 @@ import ( "encoding/json" "math" "sort" + "strings" "sync" "time" "github.com/gogo/protobuf/proto" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" @@ -30,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" 
"github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/jsonutil" "github.com/tikv/pd/pkg/utils/logutil" "go.uber.org/zap" ) @@ -98,33 +101,47 @@ func (m *Manager) GetBasicServer() bs.Server { } // Init initializes the resource group manager. -func (m *Manager) Init(ctx context.Context) { - // Todo: If we can modify following configs in the future, we should reload these configs. - // Store the controller config into the storage. - m.storage.SaveControllerConfig(m.controllerConfig) +func (m *Manager) Init(ctx context.Context) error { + v, err := m.storage.LoadControllerConfig() + if err != nil { + log.Error("resource controller config load failed", zap.Error(err), zap.String("v", v)) + return err + } + if err = json.Unmarshal([]byte(v), &m.controllerConfig); err != nil { + log.Error("un-marshall controller config failed, fallback to default", zap.Error(err), zap.String("v", v)) + } + + // re-save the config to make sure the config has been persisted. + if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil { + return err + } // Load resource group meta info from storage. m.groups = make(map[string]*ResourceGroup) handler := func(k, v string) { group := &rmpb.ResourceGroup{} if err := proto.Unmarshal([]byte(v), group); err != nil { - log.Error("err", zap.Error(err), zap.String("k", k), zap.String("v", v)) + log.Error("failed to parse the resource group", zap.Error(err), zap.String("k", k), zap.String("v", v)) panic(err) } m.groups[group.Name] = FromProtoResourceGroup(group) } - m.storage.LoadResourceGroupSettings(handler) + if err := m.storage.LoadResourceGroupSettings(handler); err != nil { + return err + } // Load resource group states from storage. 
tokenHandler := func(k, v string) { tokens := &GroupStates{} if err := json.Unmarshal([]byte(v), tokens); err != nil { - log.Error("err", zap.Error(err), zap.String("k", k), zap.String("v", v)) + log.Error("failed to parse the resource group state", zap.Error(err), zap.String("k", k), zap.String("v", v)) panic(err) } if group, ok := m.groups[k]; ok { group.SetStatesIntoResourceGroup(tokens) } } - m.storage.LoadResourceGroupStates(tokenHandler) + if err := m.storage.LoadResourceGroupStates(tokenHandler); err != nil { + return err + } // Add default group if it's not inited. if _, ok := m.groups[reservedDefaultGroupName]; !ok { @@ -153,6 +170,48 @@ func (m *Manager) Init(ctx context.Context) { m.persistLoop(ctx) }() log.Info("resource group manager finishes initialization") + return nil +} + +// UpdateControllerConfigItem updates the controller config item. +func (m *Manager) UpdateControllerConfigItem(key string, value interface{}) error { + kp := strings.Split(key, ".") + if len(kp) == 0 { + return errors.Errorf("invalid key %s", key) + } + m.Lock() + var config interface{} + switch kp[0] { + case "request-unit": + config = &m.controllerConfig.RequestUnit + default: + config = m.controllerConfig + } + updated, found, err := jsonutil.AddKeyValue(config, kp[len(kp)-1], value) + if err != nil { + m.Unlock() + return err + } + + if !found { + m.Unlock() + return errors.Errorf("config item %s not found", key) + } + m.Unlock() + if updated { + if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil { + log.Error("save controller config failed", zap.Error(err)) + } + log.Info("updated controller config item", zap.String("key", key), zap.Any("value", value)) + } + return nil +} + +// GetControllerConfig returns the controller config. +func (m *Manager) GetControllerConfig() *ControllerConfig { + m.RLock() + defer m.RUnlock() + return m.controllerConfig } // AddResourceGroup puts a resource group. 
diff --git a/pkg/mcs/resource_manager/server/server.go b/pkg/mcs/resource_manager/server/server.go index a2ce35edf56..17e86a3c02b 100644 --- a/pkg/mcs/resource_manager/server/server.go +++ b/pkg/mcs/resource_manager/server/server.go @@ -77,7 +77,7 @@ type Server struct { // startCallbacks will be called after the server is started. startCallbacks []func() // primaryCallbacks will be called after the server becomes leader. - primaryCallbacks []func(context.Context) + primaryCallbacks []func(context.Context) error serviceRegister *discovery.ServiceRegister } @@ -251,7 +251,7 @@ func (s *Server) IsClosed() bool { } // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 54e6266df5d..2a3172d6592 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -237,7 +237,7 @@ func (s *Server) GetLeaderListenUrls() []string { // AddServiceReadyCallback implements basicserver. // It adds callbacks when it's ready for providing tso service. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { // Do nothing here. The primary of each keyspace group assigned to this host // will respond to the requests accordingly. 
} diff --git a/pkg/storage/endpoint/resource_group.go b/pkg/storage/endpoint/resource_group.go index f1b3feb36aa..150ea77a1c7 100644 --- a/pkg/storage/endpoint/resource_group.go +++ b/pkg/storage/endpoint/resource_group.go @@ -27,6 +27,7 @@ type ResourceGroupStorage interface { SaveResourceGroupStates(name string, obj interface{}) error DeleteResourceGroupStates(name string) error SaveControllerConfig(config interface{}) error + LoadControllerConfig() (string, error) } var _ ResourceGroupStorage = (*StorageEndpoint)(nil) @@ -65,3 +66,8 @@ func (se *StorageEndpoint) LoadResourceGroupStates(f func(k, v string)) error { func (se *StorageEndpoint) SaveControllerConfig(config interface{}) error { return se.saveJSON(controllerConfigPath, config) } + +// LoadControllerConfig loads the resource controller config from storage. +func (se *StorageEndpoint) LoadControllerConfig() (string, error) { + return se.Load(controllerConfigPath) +} diff --git a/server/grpc_service.go b/server/grpc_service.go index 47eb7a96a3b..1b47cd5ed30 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1964,7 +1964,7 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve } else { // Prev-kv is compacted means there must have been a delete event before this event, // which means that this is just a duplicated event, so we can just ignore it. - log.Info("previous key-value pair has been compacted", zap.String("previous key", string(e.Kv.Key))) + log.Info("previous key-value pair has been compacted", zap.String("required-key", string(e.Kv.Key))) } } } diff --git a/server/server.go b/server/server.go index fa4f329dead..1ae433d56ba 100644 --- a/server/server.go +++ b/server/server.go @@ -189,7 +189,7 @@ type Server struct { // startCallbacks will be called after the server is started. startCallbacks []func() // leaderCallbacks will be called after the server becomes leader. 
- leaderCallbacks []func(context.Context) + leaderCallbacks []func(context.Context) error // closeCallbacks will be called before the server is closed. closeCallbacks []func() @@ -1436,7 +1436,7 @@ func (s *Server) IsServing() bool { } // AddServiceReadyCallback adds callbacks when the server becomes the leader if there is embedded etcd, or the primary otherwise. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { s.leaderCallbacks = append(s.leaderCallbacks, callbacks...) } @@ -1582,7 +1582,10 @@ func (s *Server) campaignLeader() { log.Info("triggering the leader callback functions") for _, cb := range s.leaderCallbacks { - cb(ctx) + if err := cb(ctx); err != nil { + log.Error("failed to execute leader callback function", errs.ZapError(err)) + return + } } // Try to create raft cluster. diff --git a/tests/integrations/client/Makefile b/tests/integrations/client/Makefile index 38d01fdc185..3fa24a11240 100644 --- a/tests/integrations/client/Makefile +++ b/tests/integrations/client/Makefile @@ -18,9 +18,10 @@ PATH := $(GO_TOOLS_BIN_PATH):$(PATH) SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) static: install-tools + @ echo "gofmt ..." @ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }' @ echo "golangci-lint ..." - @ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... --allow-parallel-runners + @ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... @ echo "revive ..." @ revive -formatter friendly -config $(ROOT_PATH)/revive.toml ./... 
diff --git a/tests/integrations/mcs/Makefile b/tests/integrations/mcs/Makefile index a446e0a4b61..146fd1a0933 100644 --- a/tests/integrations/mcs/Makefile +++ b/tests/integrations/mcs/Makefile @@ -18,9 +18,10 @@ PATH := $(GO_TOOLS_BIN_PATH):$(PATH) SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) static: install-tools + @ echo "gofmt ..." @ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }' @ echo "golangci-lint ..." - @ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... --allow-parallel-runners + @ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... @ echo "revive ..." @ revive -formatter friendly -config $(ROOT_PATH)/revive.toml ./... diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index 99c97cd4d9d..17d1a3090c4 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "io" + "math/rand" "net/http" "strconv" "strings" @@ -178,78 +179,161 @@ func (suite *resourceManagerClientTestSuite) resignAndWaitLeader() { func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { re := suite.Require() cli := suite.client + groupNamePrefix := "watch_test" group := &rmpb.ResourceGroup{ - Name: "test", + Name: groupNamePrefix, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ Settings: &rmpb.TokenLimitSettings{ FillRate: 10000, }, - Tokens: 100000, }, }, } - // Mock get revision by listing - for i := 0; i < 3; i++ { - group.Name += strconv.Itoa(i) - resp, err := cli.AddResourceGroup(suite.ctx, group) - group.Name = "test" - re.NoError(err) - re.Contains(resp, "Success!") - } - lresp, err := cli.ListResourceGroups(suite.ctx) - re.NoError(err) - re.Equal(len(lresp), 4) - // Start watcher - watchChan, err := 
suite.client.WatchResourceGroup(suite.ctx, int64(0)) - suite.NoError(err) + controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + controller.Start(suite.ctx) + defer controller.Stop() + // Mock add resource groups - for i := 3; i < 9; i++ { - group.Name = "test" + strconv.Itoa(i) + var meta *rmpb.ResourceGroup + groupsNum := 10 + for i := 0; i < groupsNum; i++ { + group.Name = groupNamePrefix + strconv.Itoa(i) resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") + + // Make sure the resource group active + meta, err = controller.GetResourceGroup(group.Name) + re.NotNil(meta) + re.NoError(err) + meta = controller.GetActiveResourceGroup(group.Name) + re.NotNil(meta) } // Mock modify resource groups - modifySettings := func(gs *rmpb.ResourceGroup) { + modifySettings := func(gs *rmpb.ResourceGroup, fillRate uint64) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ Settings: &rmpb.TokenLimitSettings{ - FillRate: 20000, + FillRate: fillRate, }, }, } } - for i := 0; i < 9; i++ { - group.Name = "test" + strconv.Itoa(i) - modifySettings(group) + for i := 0; i < groupsNum; i++ { + group.Name = groupNamePrefix + strconv.Itoa(i) + modifySettings(group, 20000) resp, err := cli.ModifyResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") } + for i := 0; i < groupsNum; i++ { + testutil.Eventually(re, func() bool { + name := groupNamePrefix + strconv.Itoa(i) + meta = controller.GetActiveResourceGroup(name) + if meta != nil { + return meta.RUSettings.RU.Settings.FillRate == uint64(20000) + } + return false + }, testutil.WithTickInterval(50*time.Millisecond)) + } + + // Mock reset watch stream + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/watchStreamError", "return(true)")) + group.Name = groupNamePrefix + strconv.Itoa(groupsNum) + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, 
"Success!") + // Make sure the resource group active + meta, err = controller.GetResourceGroup(group.Name) + re.NotNil(meta) + re.NoError(err) + modifySettings(group, 30000) + resp, err = cli.ModifyResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + testutil.Eventually(re, func() bool { + meta = controller.GetActiveResourceGroup(group.Name) + return meta.RUSettings.RU.Settings.FillRate == uint64(30000) + }, testutil.WithTickInterval(100*time.Millisecond)) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/watchStreamError")) + // Mock delete resource groups suite.cleanupResourceGroups() - // Check watch result - i := 0 - for { - select { - case <-time.After(time.Second): - return - case res := <-watchChan: - if i < 6 { - for _, r := range res { - suite.Equal(uint64(10000), r.RUSettings.RU.Settings.FillRate) - i++ - } - } else { // after modify - for _, r := range res { - suite.Equal(uint64(20000), r.RUSettings.RU.Settings.FillRate) - i++ - } - } + for i := 0; i < groupsNum; i++ { + testutil.Eventually(re, func() bool { + name := groupNamePrefix + strconv.Itoa(i) + meta = controller.GetActiveResourceGroup(name) + return meta == nil + }, testutil.WithTickInterval(50*time.Millisecond)) + } +} + +func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace() { + re := suite.Require() + cli := suite.client + + // We need to disable watch stream for `isSingleGroupByKeyspace`. + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/disableWatch", "return(true)")) + defer func() { + re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/disableWatch")) + }() + // Distinguish the controller with and without enabling `isSingleGroupByKeyspace`. 
+ controllerKeySpace, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, controller.EnableSingleGroupByKeyspace()) + controller, _ := controller.NewResourceGroupController(suite.ctx, 2, cli, nil) + controller.Start(suite.ctx) + controllerKeySpace.Start(suite.ctx) + defer controllerKeySpace.Stop() + defer controller.Stop() + + // Mock add resource group. + group := &rmpb.ResourceGroup{ + Name: "keyspace_test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + }, + Tokens: 100000, + }, + }, + } + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + + tcs := tokenConsumptionPerSecond{rruTokensAtATime: 100} + controller.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest()) + meta := controller.GetActiveResourceGroup(group.Name) + re.Equal(meta.RUSettings.RU, group.RUSettings.RU) + + controllerKeySpace.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest()) + metaKeySpace := controllerKeySpace.GetActiveResourceGroup(group.Name) + re.Equal(metaKeySpace.RUSettings.RU, group.RUSettings.RU) + + // Mock modify resource groups + modifySettings := func(gs *rmpb.ResourceGroup, fillRate uint64) { + gs.RUSettings = &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: fillRate, + }, + }, } } + modifySettings(group, 20000) + resp, err = cli.ModifyResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + + testutil.Eventually(re, func() bool { + meta = controller.GetActiveResourceGroup(group.Name) + return meta.RUSettings.RU.Settings.FillRate == uint64(20000) + }, testutil.WithTickInterval(100*time.Millisecond)) + metaKeySpace = controllerKeySpace.GetActiveResourceGroup(group.Name) + re.Equal(metaKeySpace.RUSettings.RU.Settings.FillRate, uint64(10000)) } const buffDuration = time.Millisecond * 300 @@ -289,11 +373,21 @@ 
func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { - resp, err := cli.AddResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") + rg := &rmpb.ResourceGroup{ + Name: "controller_test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + }, + Tokens: 100000, + }, + }, } + resp, err := cli.AddResourceGroup(suite.ctx, rg) + re.NoError(err) + re.Contains(resp, "Success!") cfg := &controller.RequestUnitConfig{ ReadBaseCost: 1, @@ -305,6 +399,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace()) controller.Start(suite.ctx) + defer controller.Stop() testCases := []struct { resourceGroupName string @@ -312,7 +407,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { len int }{ { - resourceGroupName: suite.initGroups[0].Name, + resourceGroupName: rg.Name, len: 8, tcs: []tokenConsumptionPerSecond{ {rruTokensAtATime: 50, wruTokensAtATime: 20, times: 100, waitDuration: 0}, @@ -363,11 +458,10 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)")) tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, _, err := controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, wreq) + _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) re.Error(err) time.Sleep(time.Millisecond * 200) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) - controller.Stop() } // TestSwitchBurst is used to 
test https://github.com/tikv/pd/issues/6209 @@ -411,6 +505,7 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { } controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace()) controller.Start(suite.ctx) + defer controller.Stop() resourceGroupName := suite.initGroups[1].Name tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 2, times: 100, waitDuration: 0} for j := 0; j < tcs.times; j++ { @@ -499,14 +594,28 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { re.Less(duration, 100*time.Millisecond) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/acceleratedReportingPeriod")) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend")) - controller.Stop() } func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { + groupNames := []string{"penalty_test1", "penalty_test2"} + // Mock add 2 resource groups. 
+ group := &rmpb.ResourceGroup{ + Name: groupNames[0], + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + }, + Tokens: 100000, + }, + }, + } + for _, name := range groupNames { + group.Name = name resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") @@ -521,8 +630,9 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { } c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace()) c.Start(suite.ctx) + defer c.Stop() - resourceGroupName := suite.initGroups[1].Name + resourceGroupName := groupNames[0] // init req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */) resp := controller.NewTestResponseInfo(0, time.Duration(30), true) @@ -581,7 +691,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { re.NoError(err) // from different group, should be zero - resourceGroupName = suite.initGroups[2].Name + resourceGroupName = groupNames[1] req4 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */) resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true) _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) @@ -595,8 +705,7 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { re := suite.Require() cli := suite.client - groups := make([]*rmpb.ResourceGroup, 0) - groups = append(groups, suite.initGroups...) 
+ groups := suite.initGroups for _, group := range groups { resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) @@ -608,7 +717,6 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { } re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/resource_manager/server/fastPersist", `return(true)`)) suite.resignAndWaitLeader() - groups = append(groups, &rmpb.ResourceGroup{Name: "test3"}) for i := 0; i < 3; i++ { for _, group := range groups { requests := make([]*rmpb.RequestUnitItem, 0) @@ -668,8 +776,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { expectMarshal string modifySettings func(*rmpb.ResourceGroup) }{ - {"test1", rmpb.GroupMode_RUMode, true, true, - `{"name":"test1","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":10000},"state":{"initialized":false}}},"priority":0}`, + {"CRUD_test1", rmpb.GroupMode_RUMode, true, true, + `{"name":"CRUD_test1","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":10000},"state":{"initialized":false}}},"priority":0}`, func(gs *rmpb.ResourceGroup) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -681,8 +789,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { }, }, - {"test2", rmpb.GroupMode_RUMode, true, true, - `{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":20000},"state":{"initialized":false}}},"priority":0}`, + {"CRUD_test2", rmpb.GroupMode_RUMode, true, true, + `{"name":"CRUD_test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":20000},"state":{"initialized":false}}},"priority":0}`, func(gs *rmpb.ResourceGroup) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -693,8 +801,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { } }, }, - {"test2", rmpb.GroupMode_RUMode, false, true, - 
`{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":30000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0}`, + {"CRUD_test2", rmpb.GroupMode_RUMode, false, true, + `{"name":"CRUD_test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":30000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0}`, func(gs *rmpb.ResourceGroup) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -909,15 +1017,10 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientFailover() cli := suite.client group := &rmpb.ResourceGroup{ - Name: "test3", + Name: "failover_test", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ - RU: &rmpb.TokenBucket{ - Settings: &rmpb.TokenLimitSettings{ - FillRate: 10000, - }, - Tokens: 100000, - }, + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, }, } addResp, err := cli.AddResourceGroup(suite.ctx, group) @@ -946,8 +1049,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo re := suite.Require() cli := suite.client + groupName := "mode_test" group := &rmpb.ResourceGroup{ - Name: "modetest", + Name: groupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -974,6 +1078,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/degradedModeRU", "return(true)")) controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) controller.Start(suite.ctx) + defer controller.Stop() tc := tokenConsumptionPerSecond{ rruTokensAtATime: 0, wruTokensAtATime: 10000, @@ -982,15 +1087,15 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo rruTokensAtATime: 0, wruTokensAtATime: 2, } - controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, groupName, 
tc.makeWriteRequest()) time.Sleep(time.Second * 2) beginTime := time.Now() // This is used to make sure resource group in lowRU. for i := 0; i < 100; i++ { - controller.OnRequestWait(suite.ctx, "modetest", tc2.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, groupName, tc2.makeWriteRequest()) } for i := 0; i < 100; i++ { - controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, groupName, tc.makeWriteRequest()) } endTime := time.Now() // we can not check `inDegradedMode` because of data race. @@ -1008,7 +1113,7 @@ func (suite *resourceManagerClientTestSuite) TestLoadRequestUnitConfig() { re.NoError(err) config := ctr.GetConfig() re.NotNil(config) - expectedConfig := controller.DefaultConfig() + expectedConfig := controller.DefaultRUConfig() re.Equal(expectedConfig.ReadBaseCost, config.ReadBaseCost) re.Equal(expectedConfig.ReadBytesCost, config.ReadBytesCost) re.Equal(expectedConfig.WriteBaseCost, config.WriteBaseCost) @@ -1026,9 +1131,9 @@ func (suite *resourceManagerClientTestSuite) TestLoadRequestUnitConfig() { re.NoError(err) config = ctr.GetConfig() re.NotNil(config) - controllerConfig := controller.DefaultControllerConfig() + controllerConfig := controller.DefaultConfig() controllerConfig.RequestUnit = *ruConfig - expectedConfig = controller.GenerateConfig(controllerConfig) + expectedConfig = controller.GenerateRUConfig(controllerConfig) re.Equal(expectedConfig.ReadBaseCost, config.ReadBaseCost) re.Equal(expectedConfig.ReadBytesCost, config.ReadBytesCost) re.Equal(expectedConfig.WriteBaseCost, config.WriteBaseCost) @@ -1042,19 +1147,22 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { - resp, err := cli.AddResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") + // Mock add resource group. 
+ group := &rmpb.ResourceGroup{ + Name: "stale_test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, + }, } + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") - ruConfig := &controller.RequestUnitConfig{ - ReadBaseCost: 1, - ReadCostPerByte: 1, - } re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/fastCleanup", `return(true)`)) - controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, ruConfig) + controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) controller.Start(suite.ctx) + defer controller.Stop() testConfig := struct { tcs tokenConsumptionPerSecond @@ -1069,14 +1177,125 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { rreq := testConfig.tcs.makeReadRequest() rres := testConfig.tcs.makeReadResponse() for j := 0; j < testConfig.times; j++ { - controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, rreq) - controller.OnResponse(suite.initGroups[0].Name, rreq, rres) + controller.OnRequestWait(suite.ctx, group.Name, rreq) + controller.OnResponse(group.Name, rreq, rres) time.Sleep(100 * time.Microsecond) } time.Sleep(1 * time.Second) - re.False(controller.CheckResourceGroupExist(suite.initGroups[0].Name)) + re.Nil(controller.GetActiveResourceGroup(group.Name)) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/fastCleanup")) - controller.Stop() +} + +func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigChanged() { + re := suite.Require() + cli := suite.client + // Mock add resource group. 
+ group := &rmpb.ResourceGroup{ + Name: "config_change_test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, + }, + } + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + + c1, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + re.NoError(err) + c1.Start(suite.ctx) + // with client option + c2, err := controller.NewResourceGroupController(suite.ctx, 2, cli, nil, controller.WithMaxWaitDuration(time.Hour)) + re.NoError(err) + c2.Start(suite.ctx) + // helper function for sending HTTP requests and checking responses + sendRequest := func(method, url string, body io.Reader) []byte { + req, err := http.NewRequest(method, url, body) + re.NoError(err) + resp, err := http.DefaultClient.Do(req) + re.NoError(err) + defer resp.Body.Close() + bytes, err := io.ReadAll(resp.Body) + re.NoError(err) + if resp.StatusCode != http.StatusOK { + re.Fail(string(bytes)) + } + return bytes + } + + getAddr := func() string { + server := suite.cluster.GetServer(suite.cluster.GetLeader()) + if rand.Intn(100)%2 == 1 { + server = suite.cluster.GetServer(suite.cluster.GetFollower()) + } + return server.GetAddr() + } + + configURL := "/resource-manager/api/v1/config/controller" + waitDuration := 10 * time.Second + readBaseCost := 1.5 + defaultCfg := controller.DefaultConfig() + // failpoint enableDegradedMode will setup and set it be 1s. 
+ defaultCfg.DegradedModeWaitDuration.Duration = time.Second + expectRUCfg := controller.GenerateRUConfig(defaultCfg) + // initial config verification + respString := sendRequest("GET", getAddr()+configURL, nil) + defaultString, err := json.Marshal(defaultCfg) + re.NoError(err) + re.JSONEq(string(respString), string(defaultString)) + re.EqualValues(expectRUCfg, c1.GetConfig()) + + testCases := []struct { + configJSON string + value interface{} + expected func(ruConfig *controller.RUConfig) + }{ + { + configJSON: fmt.Sprintf(`{"degraded-mode-wait-duration": "%v"}`, waitDuration), + value: waitDuration, + expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = waitDuration }, + }, + { + configJSON: fmt.Sprintf(`{"ltb-max-wait-duration": "%v"}`, waitDuration), + value: waitDuration, + expected: func(ruConfig *controller.RUConfig) { ruConfig.LTBMaxWaitDuration = waitDuration }, + }, + { + configJSON: fmt.Sprintf(`{"read-base-cost": %v}`, readBaseCost), + value: readBaseCost, + expected: func(ruConfig *controller.RUConfig) { ruConfig.ReadBaseCost = controller.RequestUnit(readBaseCost) }, + }, + { + configJSON: fmt.Sprintf(`{"write-base-cost": %v}`, readBaseCost*2), + value: readBaseCost * 2, + expected: func(ruConfig *controller.RUConfig) { ruConfig.WriteBaseCost = controller.RequestUnit(readBaseCost * 2) }, + }, + { + // reset the degraded-mode-wait-duration to default in test. 
+ configJSON: fmt.Sprintf(`{"degraded-mode-wait-duration": "%v"}`, time.Second), + value: time.Second, + expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = time.Second }, + }, + } + // change properties one by one and verify each time + for _, t := range testCases { + sendRequest("POST", getAddr()+configURL, strings.NewReader(t.configJSON)) + time.Sleep(500 * time.Millisecond) + t.expected(expectRUCfg) + re.EqualValues(expectRUCfg, c1.GetConfig()) + + expectRUCfg2 := *expectRUCfg + // always apply the client option + expectRUCfg2.LTBMaxWaitDuration = time.Hour + re.EqualValues(&expectRUCfg2, c2.GetConfig()) + } + // restart c1 + c1.Stop() + c1, err = controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + re.NoError(err) + re.EqualValues(expectRUCfg, c1.GetConfig()) + c1.Stop() + c2.Stop() } diff --git a/tests/integrations/tso/Makefile b/tests/integrations/tso/Makefile index c7aba3c0af2..e3c8c14a7ae 100644 --- a/tests/integrations/tso/Makefile +++ b/tests/integrations/tso/Makefile @@ -18,9 +18,10 @@ PATH := $(GO_TOOLS_BIN_PATH):$(PATH) SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) static: install-tools + @ echo "gofmt ..." @ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }' @ echo "golangci-lint ..." - @ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... --allow-parallel-runners + @ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... @ echo "revive ..." @ revive -formatter friendly -config $(ROOT_PATH)/revive.toml ./... 
diff --git a/tools/pd-tso-bench/go.sum b/tools/pd-tso-bench/go.sum index 5b615224d68..52b83bd79ee 100644 --- a/tools/pd-tso-bench/go.sum +++ b/tools/pd-tso-bench/go.sum @@ -694,6 +694,7 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= +github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=