diff --git a/client/go.mod b/client/go.mod index 9eb066d0fcc..099bcf86296 100644 --- a/client/go.mod +++ b/client/go.mod @@ -3,6 +3,7 @@ module github.com/tikv/pd/client go 1.21 require ( + github.com/BurntSushi/toml v0.3.1 github.com/cloudfoundry/gosigar v1.3.6 github.com/gogo/protobuf v1.3.2 github.com/opentracing/opentracing-go v1.2.0 diff --git a/client/go.sum b/client/go.sum index 33ba3254d53..9261ab4a999 100644 --- a/client/go.sum +++ b/client/go.sum @@ -1,4 +1,5 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index 2095bc60601..16a2525cd0d 100644 --- a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -51,7 +51,7 @@ const ( // According to the resource control Grafana panel and Prometheus sampling period, the period should be the factor of 15. defaultTargetPeriod = 5 * time.Second // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. - defaultMaxWaitDuration = time.Second + defaultMaxWaitDuration = 30 * time.Second ) const ( @@ -73,14 +73,17 @@ const ( // Because the resource manager has not been deployed in microservice mode, // do not enable this function. - defaultDegradedModeWaitDuration = "0s" + defaultDegradedModeWaitDuration = 0 defaultAvgBatchProportion = 0.7 ) // Config is the configuration of the resource manager controller which includes some option for client needed. type Config struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. - DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + + // LTBMaxWaitDuration is the max wait time duration for local token bucket. + LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. @@ -90,7 +93,8 @@ type Config struct { // DefaultConfig returns the default resource manager controller configuration. func DefaultConfig() *Config { return &Config{ - DegradedModeWaitDuration: defaultDegradedModeWaitDuration, + DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), + LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration), RequestUnit: DefaultRequestUnitConfig(), } } @@ -143,8 +147,10 @@ type RUConfig struct { WriteBytesCost RequestUnit CPUMsCost RequestUnit // The CPU statistics need to distinguish between different environments. - isSingleGroupByKeyspace bool - maxWaitDuration time.Duration + isSingleGroupByKeyspace bool + + // some config for client + LTBMaxWaitDuration time.Duration DegradedModeWaitDuration time.Duration } @@ -157,21 +163,15 @@ func DefaultRUConfig() *RUConfig { // GenerateRUConfig generates the configuration by the given request unit configuration. 
func GenerateRUConfig(config *Config) *RUConfig { - cfg := &RUConfig{ - ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost), - ReadPerBatchBaseCost: RequestUnit(config.RequestUnit.ReadPerBatchBaseCost), - ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte), - WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost), - WritePerBatchBaseCost: RequestUnit(config.RequestUnit.WritePerBatchBaseCost), - WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte), - CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost), - maxWaitDuration: defaultMaxWaitDuration, - } - duration, err := time.ParseDuration(config.DegradedModeWaitDuration) - if err != nil { - cfg.DegradedModeWaitDuration, _ = time.ParseDuration(defaultDegradedModeWaitDuration) - } else { - cfg.DegradedModeWaitDuration = duration + return &RUConfig{ + ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost), + ReadPerBatchBaseCost: RequestUnit(config.RequestUnit.ReadPerBatchBaseCost), + ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte), + WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost), + WritePerBatchBaseCost: RequestUnit(config.RequestUnit.WritePerBatchBaseCost), + WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte), + CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost), + LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration, + DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration, } - return cfg } diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index c79bfec1e56..528369df229 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -71,9 +71,11 @@ type ResourceGroupProvider interface { ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) - LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]pd.GlobalConfigItem, int64, error) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) + + // meta storage client Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) + Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) } // ResourceControlCreateOption create a ResourceGroupsController with the optional settings. @@ -89,7 +91,7 @@ func EnableSingleGroupByKeyspace() ResourceControlCreateOption { // WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets. func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption { return func(controller *ResourceGroupsController) { - controller.ruConfig.maxWaitDuration = d + controller.ruConfig.LTBMaxWaitDuration = d } } @@ -122,6 +124,11 @@ type ResourceGroupsController struct { // Currently, we don't do multiple `AcquireTokenBuckets`` at the same time, so there are no concurrency problems with `currentRequests`. currentRequests []*rmpb.TokenBucketRequest } + + opts []ResourceControlCreateOption + + // a cache for ru config and make concurrency safe. 
+ safeRuConfig atomic.Pointer[RUConfig] } // NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor @@ -139,7 +146,7 @@ func NewResourceGroupController( if requestUnitConfig != nil { config.RequestUnit = *requestUnitConfig } - log.Info("load resource controller config", zap.Reflect("config", config)) + ruConfig := GenerateRUConfig(config) controller := &ResourceGroupsController{ clientUniqueID: clientUniqueID, @@ -148,34 +155,37 @@ func NewResourceGroupController( lowTokenNotifyChan: make(chan struct{}, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), + opts: opts, } for _, opt := range opts { opt(controller) } + log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig)) controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)} + controller.safeRuConfig.Store(controller.ruConfig) return controller, nil } func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Config, error) { - items, _, err := provider.LoadGlobalConfig(ctx, nil, controllerConfigPath) + resp, err := provider.Get(ctx, []byte(controllerConfigPath)) if err != nil { return nil, err } - if len(items) == 0 { + if len(resp.Kvs) == 0 { log.Warn("[resource group controller] server does not save config, load config failed") return DefaultConfig(), nil } config := &Config{} - err = json.Unmarshal(items[0].PayLoad, config) + err = json.Unmarshal(resp.Kvs[0].GetValue(), config) if err != nil { return nil, err } return config, nil } -// GetConfig returns the config of controller. It's only used for test. +// GetConfig returns the config of controller. 
func (c *ResourceGroupsController) GetConfig() *RUConfig { - return c.ruConfig + return c.safeRuConfig.Load() } // Source List @@ -213,22 +223,63 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { stateUpdateTicker = time.NewTicker(time.Millisecond * 100) }) - _, revision, err := c.provider.LoadResourceGroups(ctx) + _, metaRevision, err := c.provider.LoadResourceGroups(ctx) + if err != nil { + log.Warn("load resource group revision failed", zap.Error(err)) + } + resp, err := c.provider.Get(ctx, []byte(controllerConfigPath)) if err != nil { log.Warn("load resource group revision failed", zap.Error(err)) } - var watchChannel chan []*meta_storagepb.Event + cfgRevision := resp.GetHeader().GetRevision() + var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event if !c.ruConfig.isSingleGroupByKeyspace { - watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix()) + watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group meta failed", zap.Error(err)) + } } - watchRetryTimer := time.NewTimer(watchRetryInterval) - if err == nil || c.ruConfig.isSingleGroupByKeyspace { - watchRetryTimer.Stop() + + watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group config failed", zap.Error(err)) } + watchRetryTimer := time.NewTimer(watchRetryInterval) defer watchRetryTimer.Stop() for { select { + /* tickers */ + case <-cleanupTicker.C: + c.cleanUpResourceGroup() + case <-stateUpdateTicker.C: + c.executeOnAllGroups((*groupCostController).updateRunState) + c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) + if len(c.run.currentRequests) == 0 { + c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */) + } + case <-watchRetryTimer.C: + if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil { + watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group meta failed", zap.Error(err)) + watchRetryTimer.Reset(watchRetryInterval) + failpoint.Inject("watchStreamError", func() { + watchRetryTimer.Reset(20 * time.Millisecond) + }) + } + } + if watchConfigChannel == nil { + watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group config failed", zap.Error(err)) + watchRetryTimer.Reset(watchRetryInterval) + } + } + + case <-emergencyTokenAcquisitionTicker.C: + c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) + /* channels */ case <-c.loopCtx.Done(): resourceGroupStatusGauge.Reset() return @@ -242,14 +293,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.handleTokenBucketResponse(resp) } c.run.currentRequests = nil - case <-cleanupTicker.C: - c.cleanUpResourceGroup() - case <-stateUpdateTicker.C: - c.executeOnAllGroups((*groupCostController).updateRunState) - c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) - if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */) - } 
case <-c.lowTokenNotifyChan: c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) @@ -259,16 +302,14 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if c.run.inDegradedMode { c.executeOnAllGroups((*groupCostController).applyDegradedMode) } - case <-emergencyTokenAcquisitionTicker.C: - c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) - case resp, ok := <-watchChannel: + case resp, ok := <-watchMetaChannel: failpoint.Inject("disableWatch", func() { if c.ruConfig.isSingleGroupByKeyspace { panic("disableWatch") } }) if !ok { - watchChannel = nil + watchMetaChannel = nil watchRetryTimer.Reset(watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) @@ -276,7 +317,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { continue } for _, item := range resp { - revision = item.Kv.ModRevision + metaRevision = item.Kv.ModRevision group := &rmpb.ResourceGroup{} if err := proto.Unmarshal(item.Kv.Value, group); err != nil { continue @@ -293,14 +334,32 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } } } - case <-watchRetryTimer.C: - watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix()) - if err != nil { + case resp, ok := <-watchConfigChannel: + if !ok { + watchConfigChannel = nil watchRetryTimer.Reset(watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) }) + continue + } + for _, item := range resp { + cfgRevision = item.Kv.ModRevision + config := &Config{} + if err := json.Unmarshal(item.Kv.Value, config); err != nil { + continue + } + c.ruConfig = GenerateRUConfig(config) + + // Stay compatible with serverless + for _, opt := range c.opts { + opt(c) + } + copyCfg := *c.ruConfig + c.safeRuConfig.Store(©Cfg) + log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig)) } + case gc := <-c.tokenBucketUpdateChan: now := gc.run.now go gc.handleTokenBucketUpdateEvent(c.loopCtx, now) @@ -1127,7 +1186,7 @@ func (gc *groupCostController) onRequestWait( res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) for typ, counter := range gc.run.resourceTokens { if v := getRawResourceValueFromConsumption(delta, typ); v > 0 { - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v)) + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } if d, err = WaitReservations(ctx, now, res); err == nil { @@ -1137,7 +1196,7 @@ func (gc *groupCostController) onRequestWait( res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) for typ, counter := range gc.run.requestUnitTokens { if v := getRUValueFromConsumption(delta, typ); v > 0 { - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v)) + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } if d, err = WaitReservations(ctx, now, res); err == nil { diff --git a/client/resource_group/controller/util.go b/client/resource_group/controller/util.go new file mode 100644 index 00000000000..e3450e0ae0d --- /dev/null +++ b/client/resource_group/controller/util.go @@ -0,0 +1,68 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package controller
+
+import (
+	"fmt"
+	"strconv"
+	"time"
+
+	"github.com/pingcap/errors"
+)
+
+// Duration is a wrapper of time.Duration for TOML and JSON.
+type Duration struct {
+	time.Duration
+}
+
+// NewDuration creates a Duration from time.Duration.
+func NewDuration(duration time.Duration) Duration {
+	return Duration{Duration: duration}
+}
+
+// MarshalJSON returns the duration as a JSON string.
+func (d *Duration) MarshalJSON() ([]byte, error) {
+	return []byte(fmt.Sprintf(`"%s"`, d.String())), nil
+}
+
+// UnmarshalJSON parses a JSON string into the duration.
+func (d *Duration) UnmarshalJSON(text []byte) error {
+	s, err := strconv.Unquote(string(text))
+	if err != nil {
+		return errors.WithStack(err)
+	}
+	duration, err := time.ParseDuration(s)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+	d.Duration = duration
+	return nil
+}
+
+// UnmarshalText parses a TOML string into the duration.
+func (d *Duration) UnmarshalText(text []byte) error {
+	var err error
+	d.Duration, err = time.ParseDuration(string(text))
+	return errors.WithStack(err)
+}
+
+// MarshalText returns the duration as a TOML string.
+func (d Duration) MarshalText() ([]byte, error) {
+	return []byte(d.String()), nil
+}
diff --git a/client/resource_group/controller/util_test.go b/client/resource_group/controller/util_test.go
new file mode 100644
index 00000000000..b542e6713dc
--- /dev/null
+++ b/client/resource_group/controller/util_test.go
@@ -0,0 +1,51 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
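The util_test.go cases that continue below verify the TOML and JSON round-trip of this wrapper. As a further hedged illustration (not part of the patch), the sketch below marshals the client's default controller config; the import path is an assumption derived from the github.com/tikv/pd/client module layout, and the duration fields should come out as strings such as "0s" and "30s".

package main

import (
	"encoding/json"
	"fmt"

	// Assumed import path, inferred from the "github.com/tikv/pd/client" module layout.
	"github.com/tikv/pd/client/resource_group/controller"
)

func main() {
	cfg := controller.DefaultConfig()
	// The Duration fields marshal through Duration.MarshalJSON, so the output is
	// human-readable, e.g. {"degraded-mode-wait-duration":"0s","ltb-max-wait-duration":"30s",...}.
	out, err := json.Marshal(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}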
+ +package controller + +import ( + "encoding/json" + "testing" + + "github.com/BurntSushi/toml" + "github.com/stretchr/testify/require" +) + +type example struct { + Interval Duration `json:"interval" toml:"interval"` +} + +func TestDurationJSON(t *testing.T) { + t.Parallel() + re := require.New(t) + example := &example{} + + text := []byte(`{"interval":"1h1m1s"}`) + re.NoError(json.Unmarshal(text, example)) + re.Equal(float64(60*60+60+1), example.Interval.Seconds()) + + b, err := json.Marshal(example) + re.NoError(err) + re.Equal(string(text), string(b)) +} + +func TestDurationTOML(t *testing.T) { + t.Parallel() + re := require.New(t) + example := &example{} + + text := []byte(`interval = "1h1m1s"`) + re.Nil(toml.Unmarshal(text, example)) + re.Equal(float64(60*60+60+1), example.Interval.Seconds()) +} diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 61919a2ccb2..68b2de66ae2 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -31,9 +31,10 @@ import ( type actionType int const ( - add actionType = 0 - modify actionType = 1 - groupSettingsPathPrefix = "resource_group/settings" + add actionType = 0 + modify actionType = 1 + groupSettingsPathPrefix = "resource_group/settings" + controllerConfigPathPrefix = "resource_group/controller" // errNotPrimary is returned when the requested server is not primary. errNotPrimary = "not primary" // errNotLeader is returned when the requested server is not pd leader. @@ -43,6 +44,9 @@ const ( // GroupSettingsPathPrefixBytes is used to watch or get resource groups. var GroupSettingsPathPrefixBytes = []byte(groupSettingsPathPrefix) +// ControllerConfigPathPrefixBytes is used to watch or get controller config. +var ControllerConfigPathPrefixBytes = []byte(controllerConfigPathPrefix) + // ResourceManagerClient manages resource group info and token request. type ResourceManagerClient interface { ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) diff --git a/errors.toml b/errors.toml index a0040195e5d..6766da79572 100644 --- a/errors.toml +++ b/errors.toml @@ -516,6 +516,36 @@ error = ''' TCP socks error ''' +["PD:operator:ErrAddOperator"] +error = ''' +failed to add operator, maybe already have one +''' + +["PD:operator:ErrOperatorNotFound"] +error = ''' +operator not found +''' + +["PD:operator:ErrPluginNotFound"] +error = ''' +plugin is not found: %s +''' + +["PD:operator:ErrRegionAbnormalPeer"] +error = ''' +region %v has abnormal peer +''' + +["PD:operator:ErrRegionNotAdjacent"] +error = ''' +two regions are not adjacent +''' + +["PD:operator:ErrRegionNotFound"] +error = ''' +region %v not found +''' + ["PD:os:ErrOSOpen"] error = ''' open error diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index a077751f561..9eedb144f95 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -92,6 +92,30 @@ var ( ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO")) ) +// operator errors +var ( + // ErrOperatorNotFound is error info for operator not found. + ErrOperatorNotFound = errors.Normalize("operator not found", errors.RFCCodeText("PD:operator:ErrOperatorNotFound")) + // ErrAddOperator is error info for already have an operator when adding operator. + ErrAddOperator = errors.Normalize("failed to add operator, maybe already have one", errors.RFCCodeText("PD:operator:ErrAddOperator")) +) + +// region errors +var ( + // ErrRegionNotAdjacent is error info for region not adjacent. 
+ ErrRegionNotAdjacent = errors.Normalize("two regions are not adjacent", errors.RFCCodeText("PD:operator:ErrRegionNotAdjacent")) + // ErrRegionNotFound is error info for region not found. + ErrRegionNotFound = errors.Normalize("region %v not found", errors.RFCCodeText("PD:operator:ErrRegionNotFound")) + // ErrRegionAbnormalPeer is error info for region has abnormal peer. + ErrRegionAbnormalPeer = errors.Normalize("region %v has abnormal peer", errors.RFCCodeText("PD:operator:ErrRegionAbnormalPeer")) +) + +// plugin errors +var ( + // ErrPluginNotFound is error info for plugin not found. + ErrPluginNotFound = errors.Normalize("plugin is not found: %s", errors.RFCCodeText("PD:operator:ErrPluginNotFound")) +) + // schedule errors var ( ErrUnexpectedOperatorStatus = errors.Normalize("operator with unexpected status", errors.RFCCodeText("PD:schedule:ErrUnexpectedOperatorStatus")) diff --git a/pkg/mcs/resourcemanager/server/apis/v1/api.go b/pkg/mcs/resourcemanager/server/apis/v1/api.go index 7c5e3e010dc..970880788d4 100644 --- a/pkg/mcs/resourcemanager/server/apis/v1/api.go +++ b/pkg/mcs/resourcemanager/server/apis/v1/api.go @@ -16,7 +16,9 @@ package apis import ( "errors" + "fmt" "net/http" + "reflect" "sync" "github.com/gin-contrib/cors" @@ -29,6 +31,7 @@ import ( "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" + "github.com/tikv/pd/pkg/utils/reflectutil" ) // APIPathPrefix is the prefix of the API path. @@ -97,6 +100,8 @@ func (s *Service) RegisterRouter() { configEndpoint.GET("/group/:name", s.getResourceGroup) configEndpoint.GET("/groups", s.getResourceGroupList) configEndpoint.DELETE("/group/:name", s.deleteResourceGroup) + configEndpoint.GET("/controller", s.getControllerConfig) + configEndpoint.POST("/controller", s.setControllerConfig) } func (s *Service) handler() http.Handler { @@ -191,3 +196,43 @@ func (s *Service) deleteResourceGroup(c *gin.Context) { } c.String(http.StatusOK, "Success!") } + +// GetControllerConfig +// +// @Tags ResourceManager +// @Summary Get the resource controller config. +// @Success 200 {string} json format of rmserver.ControllerConfig +// @Failure 400 {string} error +// @Router /config/controller [GET] +func (s *Service) getControllerConfig(c *gin.Context) { + config := s.manager.GetControllerConfig() + c.IndentedJSON(http.StatusOK, config) +} + +// SetControllerConfig +// +// @Tags ResourceManager +// @Summary Set the resource controller config. +// @Param config body object true "json params, rmserver.ControllerConfig" +// @Success 200 {string} string "Success!" 
+// @Failure 400 {string} error +// @Router /config/controller [POST] +func (s *Service) setControllerConfig(c *gin.Context) { + conf := make(map[string]interface{}) + if err := c.ShouldBindJSON(&conf); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + for k, v := range conf { + key := reflectutil.FindJSONFullTagByChildTag(reflect.TypeOf(rmserver.ControllerConfig{}), k) + if key == "" { + c.String(http.StatusBadRequest, fmt.Sprintf("config item %s not found", k)) + return + } + if err := s.manager.UpdateControllerConfigItem(key, v); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + } + c.String(http.StatusOK, "Success!") +} diff --git a/pkg/mcs/resourcemanager/server/config.go b/pkg/mcs/resourcemanager/server/config.go index 51fbe388458..3f64b2987fd 100644 --- a/pkg/mcs/resourcemanager/server/config.go +++ b/pkg/mcs/resourcemanager/server/config.go @@ -57,6 +57,8 @@ const ( // Because the resource manager has not been deployed in microservice mode, // do not enable this function. defaultDegradedModeWaitDuration = time.Second * 0 + // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. + defaultMaxWaitDuration = 30 * time.Second ) // Config is the configuration for the resource manager. @@ -94,6 +96,9 @@ type ControllerConfig struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. DegradedModeWaitDuration typeutil.Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + // LTBMaxWaitDuration is the max wait time duration for local token bucket. + LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` + // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. 
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` @@ -107,6 +112,7 @@ func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData) { rmc.RequestUnit.Adjust() configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration) + configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration) failpoint.Inject("enableDegradedMode", func() { configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, time.Second) }) diff --git a/pkg/mcs/resourcemanager/server/config_test.go b/pkg/mcs/resourcemanager/server/config_test.go index c0cac4da9c0..dd8dd2d2814 100644 --- a/pkg/mcs/resourcemanager/server/config_test.go +++ b/pkg/mcs/resourcemanager/server/config_test.go @@ -27,6 +27,7 @@ func TestControllerConfig(t *testing.T) { re := require.New(t) cfgData := ` [controller] +ltb-max-wait-duration = "60s" degraded-mode-wait-duration = "2s" [controller.request-unit] read-base-cost = 1.0 @@ -42,6 +43,7 @@ read-cpu-ms-cost = 5.0 re.NoError(err) re.Equal(cfg.Controller.DegradedModeWaitDuration.Duration, time.Second*2) + re.Equal(cfg.Controller.LTBMaxWaitDuration.Duration, time.Second*60) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.CPUMsCost-5), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteCostPerByte-4), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteBaseCost-3), 1e-7) diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index 6d1b872575b..21866ee1156 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -19,10 +19,12 @@ import ( "encoding/json" "math" "sort" + "strings" "sync" "time" "github.com/gogo/protobuf/proto" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" @@ -30,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/jsonutil" "github.com/tikv/pd/pkg/utils/logutil" "go.uber.org/zap" ) @@ -102,32 +105,46 @@ func (m *Manager) GetBasicServer() bs.Server { // Init initializes the resource group manager. func (m *Manager) Init(ctx context.Context) error { - // Todo: If we can modify following configs in the future, we should reload these configs. - // Store the controller config into the storage. - m.storage.SaveControllerConfig(m.controllerConfig) + v, err := m.storage.LoadControllerConfig() + if err != nil { + log.Error("resource controller config load failed", zap.Error(err), zap.String("v", v)) + return err + } + if err = json.Unmarshal([]byte(v), &m.controllerConfig); err != nil { + log.Error("un-marshall controller config failed, fallback to default", zap.Error(err), zap.String("v", v)) + } + + // re-save the config to make sure the config has been persisted. + if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil { + return err + } // Load resource group meta info from storage. 
m.groups = make(map[string]*ResourceGroup) handler := func(k, v string) { group := &rmpb.ResourceGroup{} if err := proto.Unmarshal([]byte(v), group); err != nil { - log.Error("err", zap.Error(err), zap.String("k", k), zap.String("v", v)) + log.Error("failed to parse the resource group", zap.Error(err), zap.String("k", k), zap.String("v", v)) panic(err) } m.groups[group.Name] = FromProtoResourceGroup(group) } - m.storage.LoadResourceGroupSettings(handler) + if err := m.storage.LoadResourceGroupSettings(handler); err != nil { + return err + } // Load resource group states from storage. tokenHandler := func(k, v string) { tokens := &GroupStates{} if err := json.Unmarshal([]byte(v), tokens); err != nil { - log.Error("err", zap.Error(err), zap.String("k", k), zap.String("v", v)) + log.Error("failed to parse the resource group state", zap.Error(err), zap.String("k", k), zap.String("v", v)) panic(err) } if group, ok := m.groups[k]; ok { group.SetStatesIntoResourceGroup(tokens) } } - m.storage.LoadResourceGroupStates(tokenHandler) + if err := m.storage.LoadResourceGroupStates(tokenHandler); err != nil { + return err + } // Add default group if it's not inited. if _, ok := m.groups[reservedDefaultGroupName]; !ok { @@ -159,6 +176,47 @@ func (m *Manager) Init(ctx context.Context) error { return nil } +// UpdateControllerConfigItem updates the controller config item. +func (m *Manager) UpdateControllerConfigItem(key string, value interface{}) error { + kp := strings.Split(key, ".") + if len(kp) == 0 { + return errors.Errorf("invalid key %s", key) + } + m.Lock() + var config interface{} + switch kp[0] { + case "request-unit": + config = &m.controllerConfig.RequestUnit + default: + config = m.controllerConfig + } + updated, found, err := jsonutil.AddKeyValue(config, kp[len(kp)-1], value) + if err != nil { + m.Unlock() + return err + } + + if !found { + m.Unlock() + return errors.Errorf("config item %s not found", key) + } + m.Unlock() + if updated { + if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil { + log.Error("save controller config failed", zap.Error(err)) + } + log.Info("updated controller config item", zap.String("key", key), zap.Any("value", value)) + } + return nil +} + +// GetControllerConfig returns the controller config. +func (m *Manager) GetControllerConfig() *ControllerConfig { + m.RLock() + defer m.RUnlock() + return m.controllerConfig +} + // AddResourceGroup puts a resource group. // NOTE: AddResourceGroup should also be idempotent because tidb depends // on this retry mechanism. diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go new file mode 100644 index 00000000000..d48941726d0 --- /dev/null +++ b/pkg/schedule/handler/handler.go @@ -0,0 +1,500 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
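With the config/controller endpoint and Manager.UpdateControllerConfigItem introduced above, the controller config can be changed at runtime, and the change reaches clients through the watch on resource_group/controller. A minimal sketch of such an update, outside the patch and under two assumptions: the API prefix /resource-manager/api/v1 and the server address are placeholders that are not defined in this diff.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Placeholder address and assumed API prefix; only the relative route
	// "config/controller" is registered in this patch.
	url := "http://127.0.0.1:2379/resource-manager/api/v1/config/controller"
	// The key matches the `json:"ltb-max-wait-duration"` tag on ControllerConfig.
	body := bytes.NewBufferString(`{"ltb-max-wait-duration": "15s"}`)
	resp, err := http.Post(url, "application/json", body)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	msg, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(msg)) // expect a 200 status with body "Success!"
}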
+ +package handler + +import ( + "bytes" + "encoding/hex" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/schedule" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/filter" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/scatter" +) + +// Server is the interface for handler about schedule. +// TODO: remove it after GetCluster is unified between PD server and Scheduling server. +type Server interface { + GetCoordinator() *schedule.Coordinator + GetCluster() sche.SharedCluster +} + +// Handler is a handler to handle http request about schedule. +type Handler struct { + Server +} + +// NewHandler creates a new handler. +func NewHandler(server Server) *Handler { + return &Handler{ + Server: server, + } +} + +// GetOperatorController returns OperatorController. +func (h *Handler) GetOperatorController() (*operator.Controller, error) { + co := h.GetCoordinator() + if co == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + return co.GetOperatorController(), nil +} + +// GetRegionScatterer returns RegionScatterer. +func (h *Handler) GetRegionScatterer() (*scatter.RegionScatterer, error) { + co := h.GetCoordinator() + if co == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + return co.GetRegionScatterer(), nil +} + +// GetOperator returns the region operator. +func (h *Handler) GetOperator(regionID uint64) (*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + + op := c.GetOperator(regionID) + if op == nil { + return nil, errs.ErrOperatorNotFound + } + + return op, nil +} + +// GetOperatorStatus returns the status of the region operator. +func (h *Handler) GetOperatorStatus(regionID uint64) (*operator.OpWithStatus, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + + op := c.GetOperatorStatus(regionID) + if op == nil { + return nil, errs.ErrOperatorNotFound + } + + return op, nil +} + +// RemoveOperator removes the region operator. +func (h *Handler) RemoveOperator(regionID uint64) error { + c, err := h.GetOperatorController() + if err != nil { + return err + } + + op := c.GetOperator(regionID) + if op == nil { + return errs.ErrOperatorNotFound + } + + _ = c.RemoveOperator(op, operator.AdminStop) + return nil +} + +// GetOperators returns the running operators. +func (h *Handler) GetOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperators(), nil +} + +// GetWaitingOperators returns the waiting operators. +func (h *Handler) GetWaitingOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetWaitingOperators(), nil +} + +// GetAdminOperators returns the running admin operators. +func (h *Handler) GetAdminOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperatorsOfKind(operator.OpAdmin), nil +} + +// GetLeaderOperators returns the running leader operators. 
+func (h *Handler) GetLeaderOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperatorsOfKind(operator.OpLeader), nil +} + +// GetRegionOperators returns the running region operators. +func (h *Handler) GetRegionOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperatorsOfKind(operator.OpRegion), nil +} + +// GetHistory returns finished operators' history since start. +func (h *Handler) GetHistory(start time.Time) ([]operator.OpHistory, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetHistory(start), nil +} + +// GetRecords returns finished operators since start. +func (h *Handler) GetRecords(from time.Time) ([]*operator.OpRecord, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + records := c.GetRecords(from) + if len(records) == 0 { + return nil, errs.ErrOperatorNotFound + } + return records, nil +} + +// AddTransferLeaderOperator adds an operator to transfer leader to the store. +func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) error { + c := h.GetCluster() + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound.FastGenByArgs(regionID) + } + + newLeader := region.GetStoreVoter(storeID) + if newLeader == nil { + return errors.Errorf("region has no voter in store %v", storeID) + } + + op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, region.GetLeader().GetStoreId(), newLeader.GetStoreId(), []uint64{}, operator.OpAdmin) + if err != nil { + log.Debug("fail to create transfer leader operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddTransferRegionOperator adds an operator to transfer region to the stores. +func (h *Handler) AddTransferRegionOperator(regionID uint64, storeIDs map[uint64]placement.PeerRoleType) error { + c := h.GetCluster() + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound.FastGenByArgs(regionID) + } + + if c.GetSharedConfig().IsPlacementRulesEnabled() { + // Cannot determine role without peer role when placement rules enabled. Not supported now. + for _, role := range storeIDs { + if len(role) == 0 { + return errors.New("transfer region without peer role is not supported when placement rules enabled") + } + } + } + for id := range storeIDs { + if err := checkStoreState(c, id); err != nil { + return err + } + } + + roles := make(map[uint64]placement.PeerRoleType) + for id, peerRole := range storeIDs { + if peerRole == "" { + peerRole = placement.Voter + } + roles[id] = peerRole + } + op, err := operator.CreateMoveRegionOperator("admin-move-region", c, region, operator.OpAdmin, roles) + if err != nil { + log.Debug("fail to create move region operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddTransferPeerOperator adds an operator to transfer peer. 
+func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreID uint64) error { + c := h.GetCluster() + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound.FastGenByArgs(regionID) + } + + oldPeer := region.GetStorePeer(fromStoreID) + if oldPeer == nil { + return errors.Errorf("region has no peer in store %v", fromStoreID) + } + + if err := checkStoreState(c, toStoreID); err != nil { + return err + } + + newPeer := &metapb.Peer{StoreId: toStoreID, Role: oldPeer.GetRole(), IsWitness: oldPeer.GetIsWitness()} + op, err := operator.CreateMovePeerOperator("admin-move-peer", c, region, operator.OpAdmin, fromStoreID, newPeer) + if err != nil { + log.Debug("fail to create move peer operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// checkAdminAddPeerOperator checks adminAddPeer operator with given region ID and store ID. +func (h *Handler) checkAdminAddPeerOperator(regionID uint64, toStoreID uint64) (sche.SharedCluster, *core.RegionInfo, error) { + c := h.GetCluster() + region := c.GetRegion(regionID) + if region == nil { + return nil, nil, errs.ErrRegionNotFound.FastGenByArgs(regionID) + } + + if region.GetStorePeer(toStoreID) != nil { + return nil, nil, errors.Errorf("region already has peer in store %v", toStoreID) + } + + if err := checkStoreState(c, toStoreID); err != nil { + return nil, nil, err + } + + return c, region, nil +} + +// AddAddPeerOperator adds an operator to add peer. +func (h *Handler) AddAddPeerOperator(regionID uint64, toStoreID uint64) error { + c, region, err := h.checkAdminAddPeerOperator(regionID, toStoreID) + if err != nil { + return err + } + + newPeer := &metapb.Peer{StoreId: toStoreID} + op, err := operator.CreateAddPeerOperator("admin-add-peer", c, region, newPeer, operator.OpAdmin) + if err != nil { + log.Debug("fail to create add peer operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddAddLearnerOperator adds an operator to add learner. +func (h *Handler) AddAddLearnerOperator(regionID uint64, toStoreID uint64) error { + c, region, err := h.checkAdminAddPeerOperator(regionID, toStoreID) + if err != nil { + return err + } + + newPeer := &metapb.Peer{ + StoreId: toStoreID, + Role: metapb.PeerRole_Learner, + } + + op, err := operator.CreateAddPeerOperator("admin-add-learner", c, region, newPeer, operator.OpAdmin) + if err != nil { + log.Debug("fail to create add learner operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddRemovePeerOperator adds an operator to remove peer. +func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) error { + c := h.GetCluster() + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound.FastGenByArgs(regionID) + } + + if region.GetStorePeer(fromStoreID) == nil { + return errors.Errorf("region has no peer in store %v", fromStoreID) + } + + op, err := operator.CreateRemovePeerOperator("admin-remove-peer", c, operator.OpAdmin, region, fromStoreID) + if err != nil { + log.Debug("fail to create move peer operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddMergeRegionOperator adds an operator to merge region. 
+func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error { + c := h.GetCluster() + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound.FastGenByArgs(regionID) + } + + target := c.GetRegion(targetID) + if target == nil { + return errs.ErrRegionNotFound.FastGenByArgs(targetID) + } + + if !filter.IsRegionHealthy(region) || !filter.IsRegionReplicated(c, region) { + return errs.ErrRegionAbnormalPeer.FastGenByArgs(regionID) + } + + if !filter.IsRegionHealthy(target) || !filter.IsRegionReplicated(c, target) { + return errs.ErrRegionAbnormalPeer.FastGenByArgs(targetID) + } + + // for the case first region (start key is nil) with the last region (end key is nil) but not adjacent + if (!bytes.Equal(region.GetStartKey(), target.GetEndKey()) || len(region.GetStartKey()) == 0) && + (!bytes.Equal(region.GetEndKey(), target.GetStartKey()) || len(region.GetEndKey()) == 0) { + return errs.ErrRegionNotAdjacent + } + + ops, err := operator.CreateMergeRegionOperator("admin-merge-region", c, region, target, operator.OpAdmin) + if err != nil { + log.Debug("fail to create merge region operator", errs.ZapError(err)) + return err + } + return h.addOperator(ops...) +} + +// AddSplitRegionOperator adds an operator to split a region. +func (h *Handler) AddSplitRegionOperator(regionID uint64, policyStr string, keys []string) error { + c := h.GetCluster() + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound.FastGenByArgs(regionID) + } + + policy, ok := pdpb.CheckPolicy_value[strings.ToUpper(policyStr)] + if !ok { + return errors.Errorf("check policy %s is not supported", policyStr) + } + + var splitKeys [][]byte + if pdpb.CheckPolicy(policy) == pdpb.CheckPolicy_USEKEY { + for i := range keys { + k, err := hex.DecodeString(keys[i]) + if err != nil { + return errors.Errorf("split key %s is not in hex format", keys[i]) + } + splitKeys = append(splitKeys, k) + } + } + + op, err := operator.CreateSplitRegionOperator("admin-split-region", region, operator.OpAdmin, pdpb.CheckPolicy(policy), splitKeys) + if err != nil { + return err + } + + return h.addOperator(op) +} + +// AddScatterRegionOperator adds an operator to scatter a region. +func (h *Handler) AddScatterRegionOperator(regionID uint64, group string) error { + c := h.GetCluster() + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound.FastGenByArgs(regionID) + } + + if c.IsRegionHot(region) { + return errors.Errorf("region %d is a hot region", regionID) + } + + s, err := h.GetRegionScatterer() + if err != nil { + return err + } + + op, err := s.Scatter(region, group, false) + if err != nil { + return err + } + + if op == nil { + return nil + } + return h.addOperator(op) +} + +// AddScatterRegionsOperators add operators to scatter regions and return the processed percentage and error +func (h *Handler) AddScatterRegionsOperators(regionIDs []uint64, startRawKey, endRawKey, group string, retryLimit int) (int, error) { + s, err := h.GetRegionScatterer() + if err != nil { + return 0, err + } + opsCount := 0 + var failures map[uint64]error + // If startKey and endKey are both defined, use them first. 
+ if len(startRawKey) > 0 && len(endRawKey) > 0 { + startKey, err := hex.DecodeString(startRawKey) + if err != nil { + return 0, err + } + endKey, err := hex.DecodeString(endRawKey) + if err != nil { + return 0, err + } + opsCount, failures, err = s.ScatterRegionsByRange(startKey, endKey, group, retryLimit) + if err != nil { + return 0, err + } + } else { + opsCount, failures, err = s.ScatterRegionsByID(regionIDs, group, retryLimit, false) + if err != nil { + return 0, err + } + } + percentage := 100 + if len(failures) > 0 { + percentage = 100 - 100*len(failures)/(opsCount+len(failures)) + } + return percentage, nil +} + +func (h *Handler) addOperator(ops ...*operator.Operator) error { + oc, err := h.GetOperatorController() + if err != nil { + return err + } + + if ok := oc.AddOperator(ops...); !ok { + return errors.WithStack(errs.ErrAddOperator) + } + return nil +} + +func checkStoreState(c sche.SharedCluster, storeID uint64) error { + store := c.GetStore(storeID) + if store == nil { + return errs.ErrStoreNotFound.FastGenByArgs(storeID) + } + if store.IsRemoved() { + return errs.ErrStoreRemoved.FastGenByArgs(storeID) + } + if store.IsUnhealthy() { + return errs.ErrStoreUnhealthy.FastGenByArgs(storeID) + } + return nil +} diff --git a/pkg/storage/endpoint/resource_group.go b/pkg/storage/endpoint/resource_group.go index f1b3feb36aa..150ea77a1c7 100644 --- a/pkg/storage/endpoint/resource_group.go +++ b/pkg/storage/endpoint/resource_group.go @@ -27,6 +27,7 @@ type ResourceGroupStorage interface { SaveResourceGroupStates(name string, obj interface{}) error DeleteResourceGroupStates(name string) error SaveControllerConfig(config interface{}) error + LoadControllerConfig() (string, error) } var _ ResourceGroupStorage = (*StorageEndpoint)(nil) @@ -65,3 +66,8 @@ func (se *StorageEndpoint) LoadResourceGroupStates(f func(k, v string)) error { func (se *StorageEndpoint) SaveControllerConfig(config interface{}) error { return se.saveJSON(controllerConfigPath, config) } + +// LoadControllerConfig loads the resource controller config from storage. 
+func (se *StorageEndpoint) LoadControllerConfig() (string, error) { + return se.Load(controllerConfigPath) +} diff --git a/server/api/hot_status.go b/server/api/hot_status.go index 4f64f1bebc5..7779591de1f 100644 --- a/server/api/hot_status.go +++ b/server/api/hot_status.go @@ -22,6 +22,7 @@ import ( "strconv" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" @@ -117,7 +118,7 @@ func (h *hotStatusHandler) GetHotWriteRegions(w http.ResponseWriter, r *http.Req } store := rc.GetStore(id) if store == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrStoreNotFound(id).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrStoreNotFound.FastGenByArgs(id).Error()) return } ids = append(ids, id) @@ -153,7 +154,7 @@ func (h *hotStatusHandler) GetHotReadRegions(w http.ResponseWriter, r *http.Requ } store := rc.GetStore(id) if store == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrStoreNotFound(id).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrStoreNotFound.FastGenByArgs(id).Error()) return } ids = append(ids, id) diff --git a/server/api/label.go b/server/api/label.go index abaad02a4e3..b7f279d86cc 100644 --- a/server/api/label.go +++ b/server/api/label.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server" "github.com/unrolled/render" ) @@ -87,7 +88,7 @@ func (h *labelsHandler) GetStoresByLabel(w http.ResponseWriter, r *http.Request) storeID := s.GetId() store := rc.GetStore(storeID) if store == nil { - h.rd.JSON(w, http.StatusInternalServerError, server.ErrStoreNotFound(storeID).Error()) + h.rd.JSON(w, http.StatusInternalServerError, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error()) return } diff --git a/server/api/region.go b/server/api/region.go index 1c21af53296..42b430974c4 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/statistics" @@ -763,7 +764,7 @@ func (h *regionsHandler) GetRegionSiblings(w http.ResponseWriter, r *http.Reques } region := rc.GetRegion(uint64(id)) if region == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrRegionNotFound(uint64(id)).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs(uint64(id)).Error()) return } @@ -1037,7 +1038,7 @@ func (h *regionsHandler) ScatterRegions(w http.ResponseWriter, r *http.Request) h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - opsCount, failures, err = rc.GetRegionScatter().ScatterRegionsByRange(startKey, endKey, group, retryLimit) + opsCount, failures, err = rc.GetRegionScatterer().ScatterRegionsByRange(startKey, endKey, group, retryLimit) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -1048,7 +1049,7 @@ func (h *regionsHandler) ScatterRegions(w http.ResponseWriter, r *http.Request) h.rd.JSON(w, http.StatusBadRequest, "regions_id is invalid") return } - opsCount, failures, err = rc.GetRegionScatter().ScatterRegionsByID(ids, group, retryLimit, false) + opsCount, failures, err = rc.GetRegionScatterer().ScatterRegionsByID(ids, group, retryLimit, false) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return diff 
--git a/server/api/rule.go b/server/api/rule.go index 33c63a8faa2..b3a720ece41 100644 --- a/server/api/rule.go +++ b/server/api/rule.go @@ -167,7 +167,7 @@ func (h *ruleHandler) preCheckForRegionAndRule(w http.ResponseWriter, r *http.Re } region := cluster.GetRegion(regionID) if region == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrRegionNotFound(regionID).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs(regionID).Error()) return cluster, nil } return cluster, region diff --git a/server/api/store.go b/server/api/store.go index a3e8c4518a2..7c820a3befa 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -191,7 +191,7 @@ func (h *storeHandler) GetStore(w http.ResponseWriter, r *http.Request) { store := rc.GetStore(storeID) if store == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrStoreNotFound(storeID).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error()) return } @@ -437,7 +437,7 @@ func (h *storeHandler) SetStoreLimit(w http.ResponseWriter, r *http.Request) { store := rc.GetStore(storeID) if store == nil { - h.rd.JSON(w, http.StatusInternalServerError, server.ErrStoreNotFound(storeID).Error()) + h.rd.JSON(w, http.StatusInternalServerError, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error()) return } @@ -758,7 +758,7 @@ func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) { storeID := s.GetId() store := rc.GetStore(storeID) if store == nil { - h.rd.JSON(w, http.StatusInternalServerError, server.ErrStoreNotFound(storeID).Error()) + h.rd.JSON(w, http.StatusInternalServerError, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error()) return } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 29a8709bdac..94761c330b6 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -436,7 +436,7 @@ func (c *RaftCluster) runStoreConfigSync() { for { synced, switchRaftV2Config = c.syncStoreConfig(stores) if switchRaftV2Config { - if err := c.opt.Persist(c.GetStorage()); err != nil { + if err := c.opt.SwitchRaftV2(c.GetStorage()); err != nil { log.Warn("store config persisted failed", zap.Error(err)) } } @@ -759,8 +759,8 @@ func (c *RaftCluster) SetPrepared() { c.coordinator.GetPrepareChecker().SetPrepared() } -// GetRegionScatter returns the region scatter. -func (c *RaftCluster) GetRegionScatter() *scatter.RegionScatterer { +// GetRegionScatterer returns the region scatter. 
+func (c *RaftCluster) GetRegionScatterer() *scatter.RegionScatterer { return c.coordinator.GetRegionScatterer() } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index ea8d27b155f..5679fd6128d 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1469,6 +1469,10 @@ func TestStoreConfigSync(t *testing.T) { err = opt.Reload(tc.GetStorage()) re.NoError(err) re.Equal(tc.GetOpts().(*config.PersistOptions).GetStoreConfig(), opt.GetStoreConfig()) + + re.Equal("v1", opt.GetScheduleConfig().StoreLimitVersion) + re.NoError(opt.SwitchRaftV2(tc.GetStorage())) + re.Equal("v2", opt.GetScheduleConfig().StoreLimitVersion) } func TestUpdateStorePendingPeerCount(t *testing.T) { diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 1ea0b79424f..3f1c4d4a24e 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -762,6 +762,12 @@ type persistedConfig struct { StoreConfig sc.StoreConfig `json:"store"` } +// SwitchRaftV2 update some config if tikv raft engine switch into partition raft v2 +func (o *PersistOptions) SwitchRaftV2(storage endpoint.ConfigStorage) error { + o.GetScheduleConfig().StoreLimitVersion = "v2" + return o.Persist(storage) +} + // Persist saves the configuration to the storage. func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error { cfg := &persistedConfig{ diff --git a/server/grpc_service.go b/server/grpc_service.go index 55b265e32a5..5a483b71818 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1755,7 +1755,7 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg region = core.NewRegionInfo(request.GetRegion(), request.GetLeader()) } - op, err := rc.GetRegionScatter().Scatter(region, request.GetGroup(), request.GetSkipStoreLimit()) + op, err := rc.GetRegionScatterer().Scatter(region, request.GetGroup(), request.GetSkipStoreLimit()) if err != nil { return nil, err } @@ -2152,7 +2152,7 @@ func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.S // scatterRegions add operators to scatter regions and return the processed percentage and error func scatterRegions(cluster *cluster.RaftCluster, regionsID []uint64, group string, retryLimit int, skipStoreLimit bool) (int, error) { - opsCount, failures, err := cluster.GetRegionScatter().ScatterRegionsByID(regionsID, group, retryLimit, skipStoreLimit) + opsCount, failures, err := cluster.GetRegionScatterer().ScatterRegionsByID(regionsID, group, retryLimit, skipStoreLimit) if err != nil { return 0, err } diff --git a/server/handler.go b/server/handler.go index 29373dc286e..ecc337b7193 100644 --- a/server/handler.go +++ b/server/handler.go @@ -15,19 +15,15 @@ package server import ( - "bytes" - "encoding/hex" "encoding/json" "net/http" "net/url" "path" "strconv" - "strings" "time" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" @@ -35,9 +31,8 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule" sc "github.com/tikv/pd/pkg/schedule/config" - "github.com/tikv/pd/pkg/schedule/filter" - "github.com/tikv/pd/pkg/schedule/operator" - "github.com/tikv/pd/pkg/schedule/placement" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/statistics" 
"github.com/tikv/pd/pkg/statistics/buckets" @@ -50,36 +45,24 @@ import ( "go.uber.org/zap" ) -var ( - // SchedulerConfigHandlerPath is the api router path of the schedule config handler. - SchedulerConfigHandlerPath = "/api/v1/scheduler-config" - - // ErrOperatorNotFound is error info for operator not found. - ErrOperatorNotFound = errors.New("operator not found") - // ErrAddOperator is error info for already have an operator when adding operator. - ErrAddOperator = errors.New("failed to add operator, maybe already have one") - // ErrRegionNotAdjacent is error info for region not adjacent. - ErrRegionNotAdjacent = errors.New("two regions are not adjacent") - // ErrRegionNotFound is error info for region not found. - ErrRegionNotFound = func(regionID uint64) error { - return errors.Errorf("region %v not found", regionID) - } - // ErrRegionAbnormalPeer is error info for region has abnormal peer. - ErrRegionAbnormalPeer = func(regionID uint64) error { - return errors.Errorf("region %v has abnormal peer", regionID) - } - // ErrStoreNotFound is error info for store not found. - ErrStoreNotFound = func(storeID uint64) error { - return errors.Errorf("store %v not found", storeID) - } - // ErrPluginNotFound is error info for plugin not found. - ErrPluginNotFound = func(pluginPath string) error { - return errors.Errorf("plugin is not found: %s", pluginPath) - } -) +// SchedulerConfigHandlerPath is the api router path of the schedule config handler. +var SchedulerConfigHandlerPath = "/api/v1/scheduler-config" + +type server struct { + *Server +} + +func (s *server) GetCoordinator() *schedule.Coordinator { + return s.GetRaftCluster().GetCoordinator() +} + +func (s *server) GetCluster() sche.SharedCluster { + return s.GetRaftCluster() +} // Handler is a helper to export methods to handle API/RPC requests. type Handler struct { + *handler.Handler s *Server opt *config.PersistOptions pluginChMap map[string]chan string @@ -87,7 +70,16 @@ type Handler struct { } func newHandler(s *Server) *Handler { - return &Handler{s: s, opt: s.persistOptions, pluginChMap: make(map[string]chan string), pluginChMapLock: syncutil.RWMutex{}} + h := handler.NewHandler(&server{ + Server: s, + }) + return &Handler{ + Handler: h, + s: s, + opt: s.persistOptions, + pluginChMap: make(map[string]chan string), + pluginChMapLock: syncutil.RWMutex{}, + } } // GetRaftCluster returns RaftCluster. @@ -99,15 +91,6 @@ func (h *Handler) GetRaftCluster() (*cluster.RaftCluster, error) { return rc, nil } -// GetOperatorController returns OperatorController. -func (h *Handler) GetOperatorController() (*operator.Controller, error) { - rc := h.s.GetRaftCluster() - if rc == nil { - return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() - } - return rc.GetOperatorController(), nil -} - // IsSchedulerPaused returns whether scheduler is paused. func (h *Handler) IsSchedulerPaused(name string) (bool, error) { rc, err := h.GetRaftCluster() @@ -170,7 +153,7 @@ func (h *Handler) GetStores() ([]*core.StoreInfo, error) { storeID := s.GetId() store := rc.GetStore(storeID) if store == nil { - return nil, ErrStoreNotFound(storeID) + return nil, errs.ErrStoreNotFound.FastGenByArgs(storeID) } stores = append(stores, store) } @@ -412,119 +395,6 @@ func (h *Handler) AddGrantHotRegionScheduler(leaderID, peers string) error { return h.AddScheduler(schedulers.GrantHotRegionType, leaderID, peers) } -// GetOperator returns the region operator. 
-func (h *Handler) GetOperator(regionID uint64) (*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - - op := c.GetOperator(regionID) - if op == nil { - return nil, ErrOperatorNotFound - } - - return op, nil -} - -// GetOperatorStatus returns the status of the region operator. -func (h *Handler) GetOperatorStatus(regionID uint64) (*operator.OpWithStatus, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - - op := c.GetOperatorStatus(regionID) - if op == nil { - return nil, ErrOperatorNotFound - } - - return op, nil -} - -// RemoveOperator removes the region operator. -func (h *Handler) RemoveOperator(regionID uint64) error { - c, err := h.GetOperatorController() - if err != nil { - return err - } - - op := c.GetOperator(regionID) - if op == nil { - return ErrOperatorNotFound - } - - _ = c.RemoveOperator(op, operator.AdminStop) - return nil -} - -// GetOperators returns the running operators. -func (h *Handler) GetOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetOperators(), nil -} - -// GetWaitingOperators returns the waiting operators. -func (h *Handler) GetWaitingOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetWaitingOperators(), nil -} - -// GetAdminOperators returns the running admin operators. -func (h *Handler) GetAdminOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetOperatorsOfKind(operator.OpAdmin), nil -} - -// GetLeaderOperators returns the running leader operators. -func (h *Handler) GetLeaderOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetOperatorsOfKind(operator.OpLeader), nil -} - -// GetRegionOperators returns the running region operators. -func (h *Handler) GetRegionOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetOperatorsOfKind(operator.OpRegion), nil -} - -// GetHistory returns finished operators' history since start. -func (h *Handler) GetHistory(start time.Time) ([]operator.OpHistory, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetHistory(start), nil -} - -// GetRecords returns finished operators since start. -func (h *Handler) GetRecords(from time.Time) ([]*operator.OpRecord, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - records := c.GetRecords(from) - if len(records) == 0 { - return nil, ErrOperatorNotFound - } - return records, nil -} - // SetAllStoresLimit is used to set limit of all stores. func (h *Handler) SetAllStoresLimit(ratePerMin float64, limitType storelimit.Type) error { c, err := h.GetRaftCluster() @@ -581,349 +451,6 @@ func (h *Handler) SetStoreLimit(storeID uint64, ratePerMin float64, limitType st return c.SetStoreLimit(storeID, limitType, ratePerMin) } -// AddTransferLeaderOperator adds an operator to transfer leader to the store. 
-func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - newLeader := region.GetStoreVoter(storeID) - if newLeader == nil { - return errors.Errorf("region has no voter in store %v", storeID) - } - - op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, region.GetLeader().GetStoreId(), newLeader.GetStoreId(), []uint64{}, operator.OpAdmin) - if err != nil { - log.Debug("fail to create transfer leader operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddTransferRegionOperator adds an operator to transfer region to the stores. -func (h *Handler) AddTransferRegionOperator(regionID uint64, storeIDs map[uint64]placement.PeerRoleType) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - if c.GetOpts().IsPlacementRulesEnabled() { - // Cannot determine role without peer role when placement rules enabled. Not supported now. - for _, role := range storeIDs { - if len(role) == 0 { - return errors.New("transfer region without peer role is not supported when placement rules enabled") - } - } - } - for id := range storeIDs { - if err := checkStoreState(c, id); err != nil { - return err - } - } - - roles := make(map[uint64]placement.PeerRoleType) - for id, peerRole := range storeIDs { - if peerRole == "" { - peerRole = placement.Voter - } - roles[id] = peerRole - } - op, err := operator.CreateMoveRegionOperator("admin-move-region", c, region, operator.OpAdmin, roles) - if err != nil { - log.Debug("fail to create move region operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddTransferPeerOperator adds an operator to transfer peer. -func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreID uint64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - oldPeer := region.GetStorePeer(fromStoreID) - if oldPeer == nil { - return errors.Errorf("region has no peer in store %v", fromStoreID) - } - - if err := checkStoreState(c, toStoreID); err != nil { - return err - } - - newPeer := &metapb.Peer{StoreId: toStoreID, Role: oldPeer.GetRole(), IsWitness: oldPeer.GetIsWitness()} - op, err := operator.CreateMovePeerOperator("admin-move-peer", c, region, operator.OpAdmin, fromStoreID, newPeer) - if err != nil { - log.Debug("fail to create move peer operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// checkAdminAddPeerOperator checks adminAddPeer operator with given region ID and store ID. 
-func (h *Handler) checkAdminAddPeerOperator(regionID uint64, toStoreID uint64) (*cluster.RaftCluster, *core.RegionInfo, error) { - c, err := h.GetRaftCluster() - if err != nil { - return nil, nil, err - } - - region := c.GetRegion(regionID) - if region == nil { - return nil, nil, ErrRegionNotFound(regionID) - } - - if region.GetStorePeer(toStoreID) != nil { - return nil, nil, errors.Errorf("region already has peer in store %v", toStoreID) - } - - if err := checkStoreState(c, toStoreID); err != nil { - return nil, nil, err - } - - return c, region, nil -} - -// AddAddPeerOperator adds an operator to add peer. -func (h *Handler) AddAddPeerOperator(regionID uint64, toStoreID uint64) error { - c, region, err := h.checkAdminAddPeerOperator(regionID, toStoreID) - if err != nil { - return err - } - - newPeer := &metapb.Peer{StoreId: toStoreID} - op, err := operator.CreateAddPeerOperator("admin-add-peer", c, region, newPeer, operator.OpAdmin) - if err != nil { - log.Debug("fail to create add peer operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddAddLearnerOperator adds an operator to add learner. -func (h *Handler) AddAddLearnerOperator(regionID uint64, toStoreID uint64) error { - c, region, err := h.checkAdminAddPeerOperator(regionID, toStoreID) - if err != nil { - return err - } - - newPeer := &metapb.Peer{ - StoreId: toStoreID, - Role: metapb.PeerRole_Learner, - } - - op, err := operator.CreateAddPeerOperator("admin-add-learner", c, region, newPeer, operator.OpAdmin) - if err != nil { - log.Debug("fail to create add learner operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddRemovePeerOperator adds an operator to remove peer. -func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - if region.GetStorePeer(fromStoreID) == nil { - return errors.Errorf("region has no peer in store %v", fromStoreID) - } - - op, err := operator.CreateRemovePeerOperator("admin-remove-peer", c, operator.OpAdmin, region, fromStoreID) - if err != nil { - log.Debug("fail to create move peer operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddMergeRegionOperator adds an operator to merge region. 
-func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - target := c.GetRegion(targetID) - if target == nil { - return ErrRegionNotFound(targetID) - } - - if !filter.IsRegionHealthy(region) || !filter.IsRegionReplicated(c, region) { - return ErrRegionAbnormalPeer(regionID) - } - - if !filter.IsRegionHealthy(target) || !filter.IsRegionReplicated(c, target) { - return ErrRegionAbnormalPeer(targetID) - } - - // for the case first region (start key is nil) with the last region (end key is nil) but not adjacent - if (!bytes.Equal(region.GetStartKey(), target.GetEndKey()) || len(region.GetStartKey()) == 0) && - (!bytes.Equal(region.GetEndKey(), target.GetStartKey()) || len(region.GetEndKey()) == 0) { - return ErrRegionNotAdjacent - } - - ops, err := operator.CreateMergeRegionOperator("admin-merge-region", c, region, target, operator.OpAdmin) - if err != nil { - log.Debug("fail to create merge region operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(ops...); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddSplitRegionOperator adds an operator to split a region. -func (h *Handler) AddSplitRegionOperator(regionID uint64, policyStr string, keys []string) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - policy, ok := pdpb.CheckPolicy_value[strings.ToUpper(policyStr)] - if !ok { - return errors.Errorf("check policy %s is not supported", policyStr) - } - - var splitKeys [][]byte - if pdpb.CheckPolicy(policy) == pdpb.CheckPolicy_USEKEY { - for i := range keys { - k, err := hex.DecodeString(keys[i]) - if err != nil { - return errors.Errorf("split key %s is not in hex format", keys[i]) - } - splitKeys = append(splitKeys, k) - } - } - - op, err := operator.CreateSplitRegionOperator("admin-split-region", region, operator.OpAdmin, pdpb.CheckPolicy(policy), splitKeys) - if err != nil { - return err - } - - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddScatterRegionOperator adds an operator to scatter a region. -func (h *Handler) AddScatterRegionOperator(regionID uint64, group string) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - if c.IsRegionHot(region) { - return errors.Errorf("region %d is a hot region", regionID) - } - - op, err := c.GetRegionScatter().Scatter(region, group, false) - if err != nil { - return err - } - - if op == nil { - return nil - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddScatterRegionsOperators add operators to scatter regions and return the processed percentage and error -func (h *Handler) AddScatterRegionsOperators(regionIDs []uint64, startRawKey, endRawKey, group string, retryLimit int) (int, error) { - c, err := h.GetRaftCluster() - if err != nil { - return 0, err - } - opsCount := 0 - var failures map[uint64]error - // If startKey and endKey are both defined, use them first. 
- if len(startRawKey) > 0 && len(endRawKey) > 0 { - startKey, err := hex.DecodeString(startRawKey) - if err != nil { - return 0, err - } - endKey, err := hex.DecodeString(endRawKey) - if err != nil { - return 0, err - } - opsCount, failures, err = c.GetRegionScatter().ScatterRegionsByRange(startKey, endKey, group, retryLimit) - if err != nil { - return 0, err - } - } else { - opsCount, failures, err = c.GetRegionScatter().ScatterRegionsByID(regionIDs, group, retryLimit, false) - if err != nil { - return 0, err - } - } - percentage := 100 - if len(failures) > 0 { - percentage = 100 - 100*len(failures)/(opsCount+len(failures)) - } - return percentage, nil -} - // GetRegionsByType gets the region with specified type. func (h *Handler) GetRegionsByType(typ statistics.RegionStatisticType) ([]*core.RegionInfo, error) { c := h.s.GetRaftCluster() @@ -1015,7 +542,7 @@ func (h *Handler) PluginUnload(pluginPath string) error { ch <- schedule.PluginUnload return nil } - return ErrPluginNotFound(pluginPath) + return errs.ErrPluginNotFound.FastGenByArgs(pluginPath) } // GetAddr returns the server urls for clients. @@ -1106,20 +633,6 @@ func (h *Handler) GetHistoryHotRegionIter( return iter } -func checkStoreState(rc *cluster.RaftCluster, storeID uint64) error { - store := rc.GetStore(storeID) - if store == nil { - return errs.ErrStoreNotFound.FastGenByArgs(storeID) - } - if store.IsRemoved() { - return errs.ErrStoreRemoved.FastGenByArgs(storeID) - } - if store.IsUnhealthy() { - return errs.ErrStoreUnhealthy.FastGenByArgs(storeID) - } - return nil -} - // RedirectSchedulerUpdate update scheduler config. Export this func to help handle damaged store. func (h *Handler) redirectSchedulerUpdate(name string, storeID float64) error { input := make(map[string]interface{}) diff --git a/server/server.go b/server/server.go index 7c19d8ff7c5..9e72477368d 100644 --- a/server/server.go +++ b/server/server.go @@ -1709,7 +1709,10 @@ func (s *Server) campaignLeader() { log.Info("triggering the leader callback functions") for _, cb := range s.leaderCallbacks { - cb(ctx) + if err := cb(ctx); err != nil { + log.Error("failed to execute leader callback function", errs.ZapError(err)) + return + } } // Try to create raft cluster. 
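Most of the handler churn above is mechanical: the ad-hoc error constructors that used to live in server/handler.go (ErrRegionNotFound, ErrStoreNotFound, ErrPluginNotFound, ...) are dropped in favor of the shared normalized errors in pkg/errs, raised with FastGenByArgs, which is the same call shape the api/rule.go and api/store.go hunks switch to. Below is a minimal standalone sketch of that pattern, not the actual PD definitions: the RFC code string and the store ID are illustrative only, and the real error values live in github.com/tikv/pd/pkg/errs.

```go
package main

import (
	"fmt"

	"github.com/pingcap/errors"
)

// ErrStoreNotFound sketches the normalized-error style that replaces the old
// closure helpers such as ErrStoreNotFound(storeID). The RFC code text here is
// made up for illustration; PD defines its own codes in pkg/errs.
var ErrStoreNotFound = errors.Normalize(
	"store %v not found",
	errors.RFCCodeText("PD:core:ErrStoreNotFound"),
)

func main() {
	// FastGenByArgs fills the message with the given arguments without
	// capturing a stack trace, keeping the HTTP/gRPC error paths cheap.
	err := ErrStoreNotFound.FastGenByArgs(uint64(3))
	fmt.Println(err) // prints the formatted "store 3 not found" message with its code
}
```

One practical effect of the switch is that a missing store or region now surfaces a stable, coded error instead of a one-off formatted string, which is easier to match on the client side.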
diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 926484cea1e..546339bee0f 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "io" + "math/rand" "net/http" "strconv" "strings" @@ -1265,3 +1266,106 @@ func (suite *resourceManagerClientTestSuite) TestSkipConsumptionForBackgroundJob c.Stop() } + +func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigChanged() { + re := suite.Require() + cli := suite.client + for _, group := range suite.initGroups { + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + } + c1, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + re.NoError(err) + c1.Start(suite.ctx) + // with client option + c2, err := controller.NewResourceGroupController(suite.ctx, 2, cli, nil, controller.WithMaxWaitDuration(time.Hour)) + re.NoError(err) + c2.Start(suite.ctx) + // helper function for sending HTTP requests and checking responses + sendRequest := func(method, url string, body io.Reader) []byte { + req, err := http.NewRequest(method, url, body) + re.NoError(err) + resp, err := http.DefaultClient.Do(req) + re.NoError(err) + defer resp.Body.Close() + bytes, err := io.ReadAll(resp.Body) + re.NoError(err) + if resp.StatusCode != http.StatusOK { + re.Fail(string(bytes)) + } + return bytes + } + + getAddr := func() string { + server := suite.cluster.GetServer(suite.cluster.GetLeader()) + if rand.Intn(100)%2 == 1 { + server = suite.cluster.GetServer(suite.cluster.GetFollower()) + } + return server.GetAddr() + } + + configURL := "/resource-manager/api/v1/config/controller" + waitDuration := 10 * time.Second + readBaseCost := 1.5 + defaultCfg := controller.DefaultConfig() + // failpoint enableDegradedMode will setup and set it be 1s. + defaultCfg.DegradedModeWaitDuration.Duration = time.Second + expectRUCfg := controller.GenerateRUConfig(defaultCfg) + // initial config verification + respString := sendRequest("GET", getAddr()+configURL, nil) + defaultString, err := json.Marshal(defaultCfg) + re.NoError(err) + re.JSONEq(string(respString), string(defaultString)) + re.EqualValues(expectRUCfg, c1.GetConfig()) + + testCases := []struct { + configJSON string + value interface{} + expected func(ruConfig *controller.RUConfig) + }{ + { + configJSON: fmt.Sprintf(`{"degraded-mode-wait-duration": "%v"}`, waitDuration), + value: waitDuration, + expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = waitDuration }, + }, + { + configJSON: fmt.Sprintf(`{"ltb-max-wait-duration": "%v"}`, waitDuration), + value: waitDuration, + expected: func(ruConfig *controller.RUConfig) { ruConfig.LTBMaxWaitDuration = waitDuration }, + }, + { + configJSON: fmt.Sprintf(`{"read-base-cost": %v}`, readBaseCost), + value: readBaseCost, + expected: func(ruConfig *controller.RUConfig) { ruConfig.ReadBaseCost = controller.RequestUnit(readBaseCost) }, + }, + { + configJSON: fmt.Sprintf(`{"write-base-cost": %v}`, readBaseCost*2), + value: readBaseCost * 2, + expected: func(ruConfig *controller.RUConfig) { ruConfig.WriteBaseCost = controller.RequestUnit(readBaseCost * 2) }, + }, + { + // reset the degraded-mode-wait-duration to default in test. 
+ configJSON: fmt.Sprintf(`{"degraded-mode-wait-duration": "%v"}`, time.Second), + value: time.Second, + expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = time.Second }, + }, + } + // change properties one by one and verify each time + for _, t := range testCases { + sendRequest("POST", getAddr()+configURL, strings.NewReader(t.configJSON)) + time.Sleep(500 * time.Millisecond) + t.expected(expectRUCfg) + re.EqualValues(expectRUCfg, c1.GetConfig()) + + expectRUCfg2 := *expectRUCfg + // always apply the client option + expectRUCfg2.LTBMaxWaitDuration = time.Hour + re.EqualValues(&expectRUCfg2, c2.GetConfig()) + } + // restart c1 + c1.Stop() + c1, err = controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + re.NoError(err) + re.EqualValues(expectRUCfg, c1.GetConfig()) +}