Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_control: support dynamically changing the controller config #7042

Merged
merged 11 commits into from
Sep 18, 2023
1 change: 1 addition & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/tikv/pd/client
go 1.21

require (
github.com/BurntSushi/toml v0.3.1
github.com/cloudfoundry/gosigar v1.3.6
github.com/gogo/protobuf v1.3.2
github.com/opentracing/opentracing-go v1.2.0
Expand Down
1 change: 1 addition & 0 deletions client/go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
44 changes: 22 additions & 22 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (
// According to the resource control Grafana panel and Prometheus sampling period, the period should be a factor of 15.
defaultTargetPeriod = 5 * time.Second
// defaultMaxWaitDuration is the max duration to wait for the token before returning an error.
defaultMaxWaitDuration = time.Second
defaultMaxWaitDuration = 30 * time.Second
)

const (
Expand All @@ -73,14 +73,17 @@ const (

// Because the resource manager has not been deployed in microservice mode,
// do not enable this function.
defaultDegradedModeWaitDuration = "0s"
defaultDegradedModeWaitDuration = 0
defaultAvgBatchProportion = 0.7
)

// Config is the configuration of the resource manager controller, which includes the options needed by the client.
type Config struct {
// DegradedModeWaitDuration controls whether the resource control client enables degraded mode when the server is disconnected.
DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`
DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`

// LTBMaxWaitDuration is the maximum wait duration for the local token bucket.
LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`

// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
Expand All @@ -90,7 +93,8 @@ type Config struct {
// DefaultConfig returns the default resource manager controller configuration.
func DefaultConfig() *Config {
return &Config{
DegradedModeWaitDuration: defaultDegradedModeWaitDuration,
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
RequestUnit: DefaultRequestUnitConfig(),
}
}
Expand Down Expand Up @@ -143,8 +147,10 @@ type RUConfig struct {
WriteBytesCost RequestUnit
CPUMsCost RequestUnit
// The CPU statistics need to distinguish between different environments.
isSingleGroupByKeyspace bool
maxWaitDuration time.Duration
isSingleGroupByKeyspace bool

// some config for client
LTBMaxWaitDuration time.Duration
DegradedModeWaitDuration time.Duration
}

Expand All @@ -157,21 +163,15 @@ func DefaultRUConfig() *RUConfig {

// GenerateRUConfig generates the configuration by the given request unit configuration.
func GenerateRUConfig(config *Config) *RUConfig {
cfg := &RUConfig{
ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost),
ReadPerBatchBaseCost: RequestUnit(config.RequestUnit.ReadPerBatchBaseCost),
ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte),
WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost),
WritePerBatchBaseCost: RequestUnit(config.RequestUnit.WritePerBatchBaseCost),
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
maxWaitDuration: defaultMaxWaitDuration,
}
duration, err := time.ParseDuration(config.DegradedModeWaitDuration)
if err != nil {
cfg.DegradedModeWaitDuration, _ = time.ParseDuration(defaultDegradedModeWaitDuration)
} else {
cfg.DegradedModeWaitDuration = duration
return &RUConfig{
ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost),
ReadPerBatchBaseCost: RequestUnit(config.RequestUnit.ReadPerBatchBaseCost),
ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte),
WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost),
WritePerBatchBaseCost: RequestUnit(config.RequestUnit.WritePerBatchBaseCost),
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration,
DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration,
}
return cfg
}
123 changes: 91 additions & 32 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]pd.GlobalConfigItem, int64, error)
LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)

// meta storage client
Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error)
Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error)
}

// ResourceControlCreateOption create a ResourceGroupsController with the optional settings.
Expand All @@ -89,7 +91,7 @@
// WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets.
func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption {
return func(controller *ResourceGroupsController) {
controller.ruConfig.maxWaitDuration = d
controller.ruConfig.LTBMaxWaitDuration = d
}
}

Expand Down Expand Up @@ -122,6 +124,11 @@
// Currently, we don't do multiple `AcquireTokenBuckets` at the same time, so there are no concurrency problems with `currentRequests`.
currentRequests []*rmpb.TokenBucketRequest
}

opts []ResourceControlCreateOption

// safeRuConfig caches the RU config so that it can be read concurrently.
safeRuConfig atomic.Pointer[RUConfig]
}

// NewResourceGroupController returns a new ResourceGroupsController which implements ResourceGroupKVInterceptor.
Expand All @@ -139,7 +146,7 @@
if requestUnitConfig != nil {
config.RequestUnit = *requestUnitConfig
}
log.Info("load resource controller config", zap.Reflect("config", config))

ruConfig := GenerateRUConfig(config)
controller := &ResourceGroupsController{
clientUniqueID: clientUniqueID,
Expand All @@ -148,34 +155,37 @@
lowTokenNotifyChan: make(chan struct{}, 1),
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen),
opts: opts,
}
for _, opt := range opts {
opt(controller)
}
log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig))
controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)}
controller.safeRuConfig.Store(controller.ruConfig)
return controller, nil
}

func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Config, error) {
items, _, err := provider.LoadGlobalConfig(ctx, nil, controllerConfigPath)
resp, err := provider.Get(ctx, []byte(controllerConfigPath))
if err != nil {
return nil, err
}
if len(items) == 0 {
if len(resp.Kvs) == 0 {
log.Warn("[resource group controller] server does not save config, load config failed")
return DefaultConfig(), nil
}
config := &Config{}
err = json.Unmarshal(items[0].PayLoad, config)
err = json.Unmarshal(resp.Kvs[0].GetValue(), config)
if err != nil {
return nil, err
}
return config, nil
}

// GetConfig returns the config of controller. It's only used for test.
// GetConfig returns the config of controller.
func (c *ResourceGroupsController) GetConfig() *RUConfig {
return c.ruConfig
return c.safeRuConfig.Load()
}

// Source List
Expand Down Expand Up @@ -213,22 +223,63 @@
stateUpdateTicker = time.NewTicker(time.Millisecond * 100)
})

_, revision, err := c.provider.LoadResourceGroups(ctx)
_, metaRevision, err := c.provider.LoadResourceGroups(ctx)
if err != nil {
log.Warn("load resource group revision failed", zap.Error(err))

Check warning on line 228 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L228

Added line #L228 was not covered by tests
}
resp, err := c.provider.Get(ctx, []byte(controllerConfigPath))
if err != nil {
log.Warn("load resource group revision failed", zap.Error(err))
}
var watchChannel chan []*meta_storagepb.Event
cfgRevision := resp.GetHeader().GetRevision()
var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event
if !c.ruConfig.isSingleGroupByKeyspace {
watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))

Check warning on line 239 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L239

Added line #L239 was not covered by tests
}
}
watchRetryTimer := time.NewTimer(watchRetryInterval)
if err == nil || c.ruConfig.isSingleGroupByKeyspace {
watchRetryTimer.Stop()

watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))

Check warning on line 245 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L245

Added line #L245 was not covered by tests
}
watchRetryTimer := time.NewTimer(watchRetryInterval)
defer watchRetryTimer.Stop()

for {
select {
/* tickers */
case <-cleanupTicker.C:
c.cleanUpResourceGroup()
case <-stateUpdateTicker.C:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
}
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)

Check warning on line 266 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L265-L266

Added lines #L265 - L266 were not covered by tests
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})

Check warning on line 269 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L268-L269

Added lines #L268 - L269 were not covered by tests
}
}
if watchConfigChannel == nil {
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)

Check warning on line 276 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L273-L276

Added lines #L273 - L276 were not covered by tests
}
}

case <-emergencyTokenAcquisitionTicker.C:
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
/* channels */
case <-c.loopCtx.Done():
resourceGroupStatusGauge.Reset()
return
Expand All @@ -242,14 +293,6 @@
c.handleTokenBucketResponse(resp)
}
c.run.currentRequests = nil
case <-cleanupTicker.C:
c.cleanUpResourceGroup()
case <-stateUpdateTicker.C:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
}
case <-c.lowTokenNotifyChan:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
Expand All @@ -259,24 +302,22 @@
if c.run.inDegradedMode {
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
}
case <-emergencyTokenAcquisitionTicker.C:
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
case resp, ok := <-watchChannel:
case resp, ok := <-watchMetaChannel:
failpoint.Inject("disableWatch", func() {
if c.ruConfig.isSingleGroupByKeyspace {
panic("disableWatch")
}
})
if !ok {
watchChannel = nil
watchMetaChannel = nil
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
continue
}
for _, item := range resp {
revision = item.Kv.ModRevision
metaRevision = item.Kv.ModRevision
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
Expand All @@ -293,14 +334,32 @@
}
}
}
case <-watchRetryTimer.C:
watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
if err != nil {
case resp, ok := <-watchConfigChannel:
if !ok {
watchConfigChannel = nil
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
continue
}
for _, item := range resp {
cfgRevision = item.Kv.ModRevision
config := &Config{}
if err := json.Unmarshal(item.Kv.Value, config); err != nil {
continue

Check warning on line 350 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L350

Added line #L350 was not covered by tests
}
c.ruConfig = GenerateRUConfig(config)

// Stay compatible with serverless
for _, opt := range c.opts {
opt(c)
}
copyCfg := *c.ruConfig
c.safeRuConfig.Store(&copyCfg)
log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig))
}

case gc := <-c.tokenBucketUpdateChan:
now := gc.run.now
go gc.handleTokenBucketUpdateEvent(c.loopCtx, now)
Expand Down Expand Up @@ -1127,7 +1186,7 @@
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))

Check warning on line 1189 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L1189

Added line #L1189 was not covered by tests
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
Expand All @@ -1137,7 +1196,7 @@
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
Expand Down
Loading
Loading