diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index 2095bc606018..5cd34e43105e 100644 --- a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -18,6 +18,7 @@ import ( "time" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/tikv/pd/pkg/typeutil" ) var ( @@ -51,7 +52,7 @@ const ( // According to the resource control Grafana panel and Prometheus sampling period, the period should be the factor of 15. defaultTargetPeriod = 5 * time.Second // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. - defaultMaxWaitDuration = time.Second + defaultMaxWaitDuration = 30 * time.Second ) const ( @@ -82,6 +83,9 @@ type Config struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + // LTBMaxWaitDuration is the max wait time duration for local token bucket. + LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` + // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` @@ -91,6 +95,7 @@ type Config struct { func DefaultConfig() *Config { return &Config{ DegradedModeWaitDuration: defaultDegradedModeWaitDuration, + LTBMaxWaitDuration: defaultMaxWaitDuration, RequestUnit: DefaultRequestUnitConfig(), } } @@ -143,8 +148,10 @@ type RUConfig struct { WriteBytesCost RequestUnit CPUMsCost RequestUnit // The CPU statistics need to distinguish between different environments. - isSingleGroupByKeyspace bool - maxWaitDuration time.Duration + isSingleGroupByKeyspace bool + + // some config for client + LTBMaxWaitDuration time.Duration DegradedModeWaitDuration time.Duration } @@ -165,7 +172,7 @@ func GenerateRUConfig(config *Config) *RUConfig { WritePerBatchBaseCost: RequestUnit(config.RequestUnit.WritePerBatchBaseCost), WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte), CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost), - maxWaitDuration: defaultMaxWaitDuration, + LTBMaxWaitDuration: config.LTBMaxWaitDuration, } duration, err := time.ParseDuration(config.DegradedModeWaitDuration) if err != nil { diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index c79bfec1e565..8597d73cb134 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -89,7 +89,7 @@ func EnableSingleGroupByKeyspace() ResourceControlCreateOption { // WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets. func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption { return func(controller *ResourceGroupsController) { - controller.ruConfig.maxWaitDuration = d + controller.ruConfig.LTBMaxWaitDuration = d } } @@ -122,6 +122,8 @@ type ResourceGroupsController struct { // Currently, we don't do multiple `AcquireTokenBuckets`` at the same time, so there are no concurrency problems with `currentRequests`. currentRequests []*rmpb.TokenBucketRequest } + + opts []ResourceControlCreateOption } // NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor @@ -148,6 +150,7 @@ func NewResourceGroupController( lowTokenNotifyChan: make(chan struct{}, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), + opts: opts, } for _, opt := range opts { opt(controller) @@ -213,22 +216,61 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { stateUpdateTicker = time.NewTicker(time.Millisecond * 100) }) - _, revision, err := c.provider.LoadResourceGroups(ctx) + _, metaRevision, err := c.provider.LoadResourceGroups(ctx) + if err != nil { + log.Warn("load resource group revision failed", zap.Error(err)) + } + _, cfgRevision, err := c.provider.LoadGlobalConfig(ctx, nil, controllerConfigPath) if err != nil { log.Warn("load resource group revision failed", zap.Error(err)) } - var watchChannel chan []*meta_storagepb.Event + var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event if !c.ruConfig.isSingleGroupByKeyspace { - watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix()) + watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group meta failed", zap.Error(err)) + } } - watchRetryTimer := time.NewTimer(watchRetryInterval) - if err == nil || c.ruConfig.isSingleGroupByKeyspace { - watchRetryTimer.Stop() + watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group config failed", zap.Error(err)) } + watchRetryTimer := time.NewTimer(watchRetryInterval) defer watchRetryTimer.Stop() for { select { + /* tickers */ + case <-cleanupTicker.C: + c.cleanUpResourceGroup() + case <-stateUpdateTicker.C: + c.executeOnAllGroups((*groupCostController).updateRunState) + c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) + if len(c.run.currentRequests) == 0 { + c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */) + } + case <-watchRetryTimer.C: + if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil { + watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group meta failed", zap.Error(err)) + watchRetryTimer.Reset(watchRetryInterval) + failpoint.Inject("watchStreamError", func() { + watchRetryTimer.Reset(20 * time.Millisecond) + }) + } + } + if watchConfigChannel == nil { + watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group config failed", zap.Error(err)) + watchRetryTimer.Reset(watchRetryInterval) + } + } + + case <-emergencyTokenAcquisitionTicker.C: + c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) + /* channels */ case <-c.loopCtx.Done(): resourceGroupStatusGauge.Reset() return @@ -242,14 +284,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.handleTokenBucketResponse(resp) } c.run.currentRequests = nil - case <-cleanupTicker.C: - c.cleanUpResourceGroup() - case <-stateUpdateTicker.C: - c.executeOnAllGroups((*groupCostController).updateRunState) - c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) - if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */) - } case <-c.lowTokenNotifyChan: c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) @@ -259,16 +293,14 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if c.run.inDegradedMode { c.executeOnAllGroups((*groupCostController).applyDegradedMode) } - case <-emergencyTokenAcquisitionTicker.C: - c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) - case resp, ok := <-watchChannel: + case resp, ok := <-watchMetaChannel: failpoint.Inject("disableWatch", func() { if c.ruConfig.isSingleGroupByKeyspace { panic("disableWatch") } }) if !ok { - watchChannel = nil + watchMetaChannel = nil watchRetryTimer.Reset(watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) @@ -276,7 +308,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { continue } for _, item := range resp { - revision = item.Kv.ModRevision + metaRevision = item.Kv.ModRevision group := &rmpb.ResourceGroup{} if err := proto.Unmarshal(item.Kv.Value, group); err != nil { continue @@ -293,14 +325,32 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } } } - case <-watchRetryTimer.C: - watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix()) - if err != nil { + case resp, ok := <-watchConfigChannel: + if !ok { + watchConfigChannel = nil watchRetryTimer.Reset(watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) }) + continue } + for _, item := range resp { + cfgRevision = item.Kv.ModRevision + if !strings.HasPrefix(string(item.Kv.Key), controllerConfigPath) { + continue + } + config := &Config{} + if err := json.Unmarshal(item.Kv.Value, config); err != nil { + continue + } + c.ruConfig = GenerateRUConfig(config) + // Stay compatible with serverless + for _, opt := range c.opts { + opt(c) + } + log.Info("load resource controller config after config changed", zap.Reflect("config", config)) + } + case gc := <-c.tokenBucketUpdateChan: now := gc.run.now go gc.handleTokenBucketUpdateEvent(c.loopCtx, now) @@ -1127,7 +1177,7 @@ func (gc *groupCostController) onRequestWait( res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) for typ, counter := range gc.run.resourceTokens { if v := getRawResourceValueFromConsumption(delta, typ); v > 0 { - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v)) + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } if d, err = WaitReservations(ctx, now, res); err == nil { @@ -1137,7 +1187,7 @@ func (gc *groupCostController) onRequestWait( res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) for typ, counter := range gc.run.requestUnitTokens { if v := getRUValueFromConsumption(delta, typ); v > 0 { - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v)) + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } if d, err = WaitReservations(ctx, now, res); err == nil { diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 61919a2ccb2f..04fcc44c95ab 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -34,6 +34,7 @@ const ( add actionType = 0 modify actionType = 1 groupSettingsPathPrefix = "resource_group/settings" + controllerPathPrefix = "resource_manager/controller" // errNotPrimary is returned when the requested server is not primary. errNotPrimary = "not primary" // errNotLeader is returned when the requested server is not pd leader. @@ -43,6 +44,9 @@ const ( // GroupSettingsPathPrefixBytes is used to watch or get resource groups. var GroupSettingsPathPrefixBytes = []byte(groupSettingsPathPrefix) +// ControllerPathPrefixBytes is used to watch or get controller config. +var ControllerPathPrefixBytes = []byte(controllerPathPrefix) + // ResourceManagerClient manages resource group info and token request. type ResourceManagerClient interface { ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) diff --git a/pkg/mcs/resourcemanager/server/apis/v1/api.go b/pkg/mcs/resourcemanager/server/apis/v1/api.go index 411933e55c34..b769685b54f2 100644 --- a/pkg/mcs/resourcemanager/server/apis/v1/api.go +++ b/pkg/mcs/resourcemanager/server/apis/v1/api.go @@ -16,7 +16,9 @@ package apis import ( "errors" + "fmt" "net/http" + "reflect" "sync" "github.com/gin-contrib/cors" @@ -29,6 +31,7 @@ import ( "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" + "github.com/tikv/pd/pkg/utils/reflectutil" ) // APIPathPrefix is the prefix of the API path. @@ -97,6 +100,8 @@ func (s *Service) RegisterRouter() { configEndpoint.GET("/group/:name", s.getResourceGroup) configEndpoint.GET("/groups", s.getResourceGroupList) configEndpoint.DELETE("/group/:name", s.deleteResourceGroup) + configEndpoint.GET("/controller", s.getControllerConfig) + configEndpoint.POST("/controller", s.setControllerConfig) } func (s *Service) handler() http.Handler { @@ -191,3 +196,43 @@ func (s *Service) deleteResourceGroup(c *gin.Context) { } c.String(http.StatusOK, "Success!") } + +// GetControllerConfig +// +// @Tags ResourceManager +// @Summary Get the resource controller config. +// @Success 200 {string} json format of rmserver.ControllerConfig +// @Failure 400 {string} error +// @Router /config/controller [GET] +func (s *Service) getControllerConfig(c *gin.Context) { + config := s.manager.GetControllerConfig() + c.IndentedJSON(http.StatusOK, config) +} + +// SetControllerConfig +// +// @Tags ResourceManager +// @Summary Set the resource controller config. +// @Param config body object true "json params, rmserver.ControllerConfig" +// @Success 200 {string} string "Success!" +// @Failure 400 {string} error +// @Router /config/controller [POST] +func (s *Service) setControllerConfig(c *gin.Context) { + conf := make(map[string]interface{}) + if err := c.ShouldBindJSON(&conf); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + for k, v := range conf { + key := reflectutil.FindJSONFullTagByChildTag(reflect.TypeOf(rmserver.ControllerConfig{}), k) + if key == "" { + c.String(http.StatusBadRequest, fmt.Sprintf("config item %s not found", k)) + return + } + if err := s.manager.UpdateControllerConfigItem(key, v); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + } + c.String(http.StatusOK, "Success!") +} diff --git a/pkg/mcs/resourcemanager/server/config.go b/pkg/mcs/resourcemanager/server/config.go index 51fbe3884580..c41cae81b743 100644 --- a/pkg/mcs/resourcemanager/server/config.go +++ b/pkg/mcs/resourcemanager/server/config.go @@ -94,6 +94,9 @@ type ControllerConfig struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. DegradedModeWaitDuration typeutil.Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + // LTBMaxWaitDuration is the max wait time duration for local token bucket. + LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` + // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` diff --git a/pkg/mcs/resourcemanager/server/config_test.go b/pkg/mcs/resourcemanager/server/config_test.go index c0cac4da9c0c..dd8dd2d2814f 100644 --- a/pkg/mcs/resourcemanager/server/config_test.go +++ b/pkg/mcs/resourcemanager/server/config_test.go @@ -27,6 +27,7 @@ func TestControllerConfig(t *testing.T) { re := require.New(t) cfgData := ` [controller] +ltb-max-wait-duration = "60s" degraded-mode-wait-duration = "2s" [controller.request-unit] read-base-cost = 1.0 @@ -42,6 +43,7 @@ read-cpu-ms-cost = 5.0 re.NoError(err) re.Equal(cfg.Controller.DegradedModeWaitDuration.Duration, time.Second*2) + re.Equal(cfg.Controller.LTBMaxWaitDuration.Duration, time.Second*60) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.CPUMsCost-5), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteCostPerByte-4), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteBaseCost-3), 1e-7) diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index a9e53f347fa0..320e124a9420 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -19,6 +19,7 @@ import ( "encoding/json" "math" "sort" + "strings" "sync" "time" @@ -26,10 +27,12 @@ import ( "github.com/pingcap/failpoint" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" + "github.com/pkg/errors" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/jsonutil" "github.com/tikv/pd/pkg/utils/logutil" "go.uber.org/zap" ) @@ -102,8 +105,14 @@ func (m *Manager) GetBasicServer() bs.Server { // Init initializes the resource group manager. func (m *Manager) Init(ctx context.Context) { - // Todo: If we can modify following configs in the future, we should reload these configs. - // Store the controller config into the storage. + v, err := m.storage.LoadControllerConfig() + if err != nil { + log.Error("resource controller config load failed, fallback to default config", zap.Error(err), zap.String("v", v)) + } else if err = json.Unmarshal([]byte(v), &m.controllerConfig); err != nil { + log.Error("un-marshall controller config failed", zap.Error(err), zap.String("v", v)) + } + + // re-save the config to make sure the config has been persisted. m.storage.SaveControllerConfig(m.controllerConfig) // Load resource group meta info from storage. m.groups = make(map[string]*ResourceGroup) @@ -158,6 +167,47 @@ func (m *Manager) Init(ctx context.Context) { log.Info("resource group manager finishes initialization") } +// UpdateControllerConfigItem updates the controller config item. +func (m *Manager) UpdateControllerConfigItem(key string, value interface{}) error { + kp := strings.Split(key, ".") + if len(kp) == 0 { + return errors.Errorf("invalid key %s", key) + } + m.Lock() + var config interface{} + switch kp[0] { + case "request-unit": + config = &m.controllerConfig.RequestUnit + default: + config = m.controllerConfig + } + updated, found, err := jsonutil.AddKeyValue(config, kp[len(kp)-1], value) + if err != nil { + m.Unlock() + return err + } + + if !found { + m.Unlock() + return errors.Errorf("config item %s not found", key) + } + m.Unlock() + if updated { + if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil { + log.Error("save controller config failed", zap.Error(err)) + } + log.Info("updated controller config item", zap.String("key", key), zap.Any("value", value)) + } + return nil +} + +// GetControllerConfig returns the controller config. +func (m *Manager) GetControllerConfig() *ControllerConfig { + m.RLock() + defer m.RUnlock() + return m.controllerConfig +} + // AddResourceGroup puts a resource group. // NOTE: AddResourceGroup should also be idempotent because tidb depends // on this retry mechanism. diff --git a/pkg/storage/endpoint/resource_group.go b/pkg/storage/endpoint/resource_group.go index f1b3feb36aab..150ea77a1c7f 100644 --- a/pkg/storage/endpoint/resource_group.go +++ b/pkg/storage/endpoint/resource_group.go @@ -27,6 +27,7 @@ type ResourceGroupStorage interface { SaveResourceGroupStates(name string, obj interface{}) error DeleteResourceGroupStates(name string) error SaveControllerConfig(config interface{}) error + LoadControllerConfig() (string, error) } var _ ResourceGroupStorage = (*StorageEndpoint)(nil) @@ -65,3 +66,8 @@ func (se *StorageEndpoint) LoadResourceGroupStates(f func(k, v string)) error { func (se *StorageEndpoint) SaveControllerConfig(config interface{}) error { return se.saveJSON(controllerConfigPath, config) } + +// LoadControllerConfig loads the resource controller config from storage. +func (se *StorageEndpoint) LoadControllerConfig() (string, error) { + return se.Load(controllerConfigPath) +} diff --git a/pkg/utils/reflectutil/tag.go b/pkg/utils/reflectutil/tag.go index 3671519f96b0..a383b398deef 100644 --- a/pkg/utils/reflectutil/tag.go +++ b/pkg/utils/reflectutil/tag.go @@ -17,6 +17,9 @@ package reflectutil import ( "reflect" "strings" + + "github.com/pingcap/log" + "go.uber.org/zap" ) // FindJSONFullTagByChildTag is used to find field by child json tag recursively and return the full tag of field. @@ -46,8 +49,10 @@ func FindJSONFullTagByChildTag(t reflect.Type, tag string) string { // FindSameFieldByJSON is used to check whether there is same field between `m` and `v` func FindSameFieldByJSON(v interface{}, m map[string]interface{}) bool { t := reflect.TypeOf(v).Elem() + log.Info("debug", zap.String("kind", t.Kind().String()), zap.Int("num", t.NumField())) for i := 0; i < t.NumField(); i++ { jsonTag := t.Field(i).Tag.Get("json") + log.Info("debug", zap.String("kind", t.Kind().String()), zap.String("tag", jsonTag)) if i := strings.Index(jsonTag, ","); i != -1 { // trim 'foobar,string' to 'foobar' jsonTag = jsonTag[:i] }