From 620237127dad4a6c18078cd63c7316d00cb47253 Mon Sep 17 00:00:00 2001 From: nolouch Date: Thu, 4 Jul 2024 15:20:23 +0800 Subject: [PATCH] fix Signed-off-by: nolouch --- client/resource_group/controller/config.go | 26 ++++++++------ .../resource_group/controller/controller.go | 35 +++++++++++++++---- pkg/mcs/resourcemanager/server/config.go | 9 +---- pkg/mcs/resourcemanager/server/config_test.go | 5 --- .../resourcemanager/resource_manager_test.go | 11 +++--- 5 files changed, 49 insertions(+), 37 deletions(-) diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index f100596e1704..81ee761d62e1 100644 --- a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -53,14 +53,16 @@ const ( // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. defaultMaxWaitDuration = 30 * time.Second <<<<<<< HEAD -======= // defaultLTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. defaultLTBTokenRPCMaxDelay = 1 * time.Second // defaultWaitRetryTimes is the times to retry when waiting for the token. defaultWaitRetryTimes = 20 +======= + // defaultWaitRetryTimes is the times to retry when waiting for the token. + defaultWaitRetryTimes = 10 +>>>>>>> b9240a065... resource_group: add retry configurations (#8041) // defaultWaitRetryInterval is the interval to retry when waiting for the token. defaultWaitRetryInterval = 50 * time.Millisecond ->>>>>>> 6b25787af (resource_control: allow configuration of the maximum retry time for the local bucket (#8352)) ) const ( @@ -110,11 +112,16 @@ type BaseConfig struct { LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` <<<<<<< HEAD -======= // LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. LTBTokenRPCMaxDelay Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"` +======= + // WaitRetryInterval is the interval to retry when waiting for the token. + WaitRetryInterval Duration `toml:"wait-retry-interval" json:"wait-retry-interval"` + + // WaitRetryTimes is the times to retry when waiting for the token. + WaitRetryTimes int `toml:"wait-retry-times" json:"wait-retry-times"` +>>>>>>> b9240a065... resource_group: add retry configurations (#8041) ->>>>>>> 6b25787af (resource_control: allow configuration of the maximum retry time for the local bucket (#8352)) // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` @@ -147,12 +154,6 @@ func (c *Config) Adjust() { // DefaultConfig returns the default resource manager controller configuration. func DefaultConfig() *Config { return &Config{ -<<<<<<< HEAD - DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), - LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration), - RequestUnit: DefaultRequestUnitConfig(), - EnableControllerTraceLog: false, -======= BaseConfig: BaseConfig{ DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), RequestUnit: DefaultRequestUnitConfig(), @@ -166,7 +167,6 @@ func DefaultConfig() *Config { WaitRetryTimes: defaultWaitRetryTimes, }, }, ->>>>>>> 6b25787af (resource_control: allow configuration of the maximum retry time for the local bucket (#8352)) } } @@ -222,6 +222,8 @@ type RUConfig struct { // some config for client LTBMaxWaitDuration time.Duration + WaitRetryInterval time.Duration + WaitRetryTimes int DegradedModeWaitDuration time.Duration } @@ -243,6 +245,8 @@ func GenerateRUConfig(config *Config) *RUConfig { WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte), CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost), LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration, + WaitRetryInterval: config.WaitRetryInterval.Duration, + WaitRetryTimes: config.WaitRetryTimes, DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration, } } diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index e6a0a7ea500e..0549469776ba 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -38,6 +38,7 @@ import ( ) const ( +<<<<<<< HEAD controllerConfigPath = "resource_group/controller" maxRetry = 10 retryInterval = 50 * time.Millisecond @@ -46,6 +47,14 @@ const ( trickleReserveDuration = 1250 * time.Millisecond slowNotifyFilterDuration = 10 * time.Millisecond watchRetryInterval = 30 * time.Second +======= + controllerConfigPath = "resource_group/controller" + maxNotificationChanLen = 200 + needTokensAmplification = 1.1 + trickleReserveDuration = 1250 * time.Millisecond + + watchRetryInterval = 30 * time.Second +>>>>>>> b9240a065... resource_group: add retry configurations (#8041) ) type selectType int @@ -104,6 +113,20 @@ func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption { } } +// WithWaitRetryInterval is the option to set the retry interval when waiting for the token. +func WithWaitRetryInterval(d time.Duration) ResourceControlCreateOption { + return func(controller *ResourceGroupsController) { + controller.ruConfig.WaitRetryInterval = d + } +} + +// WithWaitRetryTimes is the option to set the times to retry when waiting for the token. +func WithWaitRetryTimes(times int) ResourceControlCreateOption { + return func(controller *ResourceGroupsController) { + controller.ruConfig.WaitRetryTimes = times + } +} + var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil) // ResourceGroupsController implements ResourceGroupKVInterceptor. @@ -189,9 +212,9 @@ func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Con return config, nil } <<<<<<< HEAD - config := &Config{} ======= ->>>>>>> 6b25787af (resource_control: allow configuration of the maximum retry time for the local bucket (#8352)) + config := DefaultConfig() +>>>>>>> b9240a065... resource_group: add retry configurations (#8041) err = json.Unmarshal(kvs[0].GetValue(), config) if err != nil { return nil, err @@ -371,7 +394,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } for _, item := range resp { cfgRevision = item.Kv.ModRevision - config := &Config{} + config := DefaultConfig() if err := json.Unmarshal(item.Kv.Value, config); err != nil { continue } @@ -1233,7 +1256,7 @@ func (gc *groupCostController) onRequestWait( var i int var d time.Duration retryLoop: - for i = 0; i < maxRetry; i++ { + for i = 0; i < gc.mainCfg.WaitRetryTimes; i++ { switch gc.mode { case rmpb.GroupMode_RawMode: res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) @@ -1257,8 +1280,8 @@ func (gc *groupCostController) onRequestWait( } } gc.metrics.requestRetryCounter.Inc() - time.Sleep(retryInterval) - waitDuration += retryInterval + time.Sleep(gc.mainCfg.WaitRetryInterval) + waitDuration += gc.mainCfg.WaitRetryInterval } if err != nil { if errs.ErrClientResourceGroupThrottled.Equal(err) { diff --git a/pkg/mcs/resourcemanager/server/config.go b/pkg/mcs/resourcemanager/server/config.go index 70041324014d..112750292801 100644 --- a/pkg/mcs/resourcemanager/server/config.go +++ b/pkg/mcs/resourcemanager/server/config.go @@ -117,12 +117,6 @@ func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData) { if rmc == nil { return } -<<<<<<< HEAD - rmc.RequestUnit.Adjust() - - configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration) - configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration) -======= rmc.RequestUnit.Adjust(meta.Child("request-unit")) if !meta.IsDefined("degraded-mode-wait-duration") { configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration) @@ -133,7 +127,6 @@ func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData) { if !meta.IsDefined("ltb-token-rpc-max-delay") { configutil.AdjustDuration(&rmc.LTBTokenRPCMaxDelay, defaultLTBTokenRPCMaxDelay) } ->>>>>>> 6b25787af (resource_control: allow configuration of the maximum retry time for the local bucket (#8352)) failpoint.Inject("enableDegradedMode", func() { configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, time.Second) }) @@ -163,7 +156,7 @@ type RequestUnitConfig struct { } // Adjust adjusts the configuration and initializes it with the default value if necessary. -func (ruc *RequestUnitConfig) Adjust() { +func (ruc *RequestUnitConfig) Adjust(_ *configutil.ConfigMetaData) { if ruc == nil { return } diff --git a/pkg/mcs/resourcemanager/server/config_test.go b/pkg/mcs/resourcemanager/server/config_test.go index a50b0a9d3337..47d1ed36515b 100644 --- a/pkg/mcs/resourcemanager/server/config_test.go +++ b/pkg/mcs/resourcemanager/server/config_test.go @@ -43,14 +43,9 @@ read-cpu-ms-cost = 5.0 err = cfg.Adjust(&meta, false) re.NoError(err) -<<<<<<< HEAD - re.Equal(cfg.Controller.DegradedModeWaitDuration.Duration, time.Second*2) - re.Equal(cfg.Controller.LTBMaxWaitDuration.Duration, time.Second*60) -======= re.Equal(2*time.Second, cfg.Controller.DegradedModeWaitDuration.Duration) re.Equal(60*time.Second, cfg.Controller.LTBMaxWaitDuration.Duration) re.Equal(500*time.Millisecond, cfg.Controller.LTBTokenRPCMaxDelay.Duration) ->>>>>>> 6b25787af (resource_control: allow configuration of the maximum retry time for the local bucket (#8352)) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.CPUMsCost-5), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteCostPerByte-4), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteBaseCost-3), 1e-7) diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 2f791ea1a686..8ee41df1453b 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -33,6 +33,7 @@ import ( "github.com/tikv/pd/client/resource_group/controller" "github.com/tikv/pd/pkg/mcs/resourcemanager/server" "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/tests" "go.uber.org/goleak" @@ -1365,10 +1366,6 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh tokenRPCMaxDelay := 2 * time.Second readBaseCost := 1.5 defaultCfg := controller.DefaultConfig() -<<<<<<< HEAD - // failpoint enableDegradedMode will setup and set it be 1s. - defaultCfg.DegradedModeWaitDuration.Duration = time.Second -======= expectCfg := server.ControllerConfig{ // failpoint enableDegradedMode will setup and set it be 1s. DegradedModeWaitDuration: typeutil.NewDuration(time.Second), @@ -1377,13 +1374,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh RequestUnit: server.RequestUnitConfig(defaultCfg.RequestUnit), EnableControllerTraceLog: defaultCfg.EnableControllerTraceLog, } ->>>>>>> 6b25787af (resource_control: allow configuration of the maximum retry time for the local bucket (#8352)) expectRUCfg := controller.GenerateRUConfig(defaultCfg) + expectRUCfg.DegradedModeWaitDuration = time.Second // initial config verification respString := sendRequest("GET", getAddr()+configURL, nil) - defaultString, err := json.Marshal(defaultCfg) + expectStr, err := json.Marshal(expectCfg) re.NoError(err) - re.JSONEq(string(respString), string(defaultString)) + re.JSONEq(string(respString), string(expectStr)) re.EqualValues(expectRUCfg, c1.GetConfig()) testCases := []struct {