diff --git a/client/client.go b/client/client.go index 8c8299daeab..aafe4aba77f 100644 --- a/client/client.go +++ b/client/client.go @@ -214,8 +214,9 @@ func WithSkipStoreLimit() RegionsOption { // GetRegionOp represents available options when getting regions. type GetRegionOp struct { - needBuckets bool - allowFollowerHandle bool + needBuckets bool + allowFollowerHandle bool + outputMustContainAllKeyRange bool } // GetRegionOption configures GetRegionOp. @@ -231,6 +232,11 @@ func WithAllowFollowerHandle() GetRegionOption { return func(op *GetRegionOp) { op.allowFollowerHandle = true } } +// WithOutputMustContainAllKeyRange means the output must contain all key ranges. +func WithOutputMustContainAllKeyRange() GetRegionOption { + return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true } +} + var ( // errUnmatchedClusterID is returned when found a PD with a different cluster ID. errUnmatchedClusterID = errors.New("[pd] unmatched cluster id") @@ -1193,10 +1199,11 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit pbRanges = append(pbRanges, &pdpb.KeyRange{StartKey: r.StartKey, EndKey: r.EndKey}) } req := &pdpb.BatchScanRegionsRequest{ - Header: c.requestHeader(), - NeedBuckets: options.needBuckets, - Ranges: pbRanges, - Limit: int32(limit), + Header: c.requestHeader(), + NeedBuckets: options.needBuckets, + Ranges: pbRanges, + Limit: int32(limit), + ContainAllKeyRange: options.outputMustContainAllKeyRange, } serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.allowFollowerHandle && c.option.getEnableFollowerHandle()) if serviceClient == nil { diff --git a/client/go.mod b/client/go.mod index 7c782695539..8dc706a4540 100644 --- a/client/go.mod +++ b/client/go.mod @@ -10,7 +10,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 + github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.8.2 diff --git a/client/go.sum b/client/go.sum index 8f85f5ce7ed..20c154c30dc 100644 --- a/client/go.sum +++ b/client/go.sum @@ -46,8 +46,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 h1:V9XS3FQ/P6u+kFaoSyY5DBswIA774BMpIOLDBMrpxKc= +github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= 
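Note for readers of this diff: a minimal caller-side sketch of the new WithOutputMustContainAllKeyRange option introduced in client/client.go above. It is not part of the change; the import alias, the pd.Client value, the key range, and the limit are illustrative assumptions, and it presumes the exported Client interface surfaces BatchScanRegions with the signature shown in this patch.

    // Hypothetical usage sketch; cli, ctx, and the range [a, z) are placeholders.
    package main

    import (
        "context"

        pd "github.com/tikv/pd/client"
    )

    // scanWholeRange asks PD for every region covering [a, z). With the new option,
    // a hole between the returned regions surfaces as an error instead of being
    // silently skipped (the previous best-effort behavior, which remains the default).
    func scanWholeRange(ctx context.Context, cli pd.Client) ([]*pd.Region, error) {
        ranges := []pd.KeyRange{{StartKey: []byte("a"), EndKey: []byte("z")}}
        return cli.BatchScanRegions(ctx, ranges, 128, pd.WithOutputMustContainAllKeyRange())
    }

Without the option, BatchScanRegions keeps its earlier tolerant behavior and may return regions that do not fully cover the requested ranges, as exercised by the needContainAllRanges = false cases in the region tests below.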
diff --git a/client/metrics.go b/client/metrics.go index f3c47d7e787..a11362669b3 100644 --- a/client/metrics.go +++ b/client/metrics.go @@ -159,11 +159,13 @@ var ( cmdFailedDurationUpdateServiceGCSafePoint prometheus.Observer cmdFailedDurationLoadKeyspace prometheus.Observer cmdFailedDurationUpdateKeyspaceState prometheus.Observer - requestDurationTSO prometheus.Observer cmdFailedDurationGet prometheus.Observer cmdFailedDurationPut prometheus.Observer cmdFailedDurationUpdateGCSafePointV2 prometheus.Observer cmdFailedDurationUpdateServiceSafePointV2 prometheus.Observer + + requestDurationTSO prometheus.Observer + requestFailedDurationTSO prometheus.Observer ) func initCmdDurations() { @@ -207,11 +209,13 @@ func initCmdDurations() { cmdFailedDurationUpdateServiceGCSafePoint = cmdFailedDuration.WithLabelValues("update_service_gc_safe_point") cmdFailedDurationLoadKeyspace = cmdFailedDuration.WithLabelValues("load_keyspace") cmdFailedDurationUpdateKeyspaceState = cmdFailedDuration.WithLabelValues("update_keyspace_state") - requestDurationTSO = requestDuration.WithLabelValues("tso") cmdFailedDurationGet = cmdFailedDuration.WithLabelValues("get") cmdFailedDurationPut = cmdFailedDuration.WithLabelValues("put") cmdFailedDurationUpdateGCSafePointV2 = cmdFailedDuration.WithLabelValues("update_gc_safe_point_v2") cmdFailedDurationUpdateServiceSafePointV2 = cmdFailedDuration.WithLabelValues("update_service_safe_point_v2") + + requestDurationTSO = requestDuration.WithLabelValues("tso") + requestFailedDurationTSO = requestDuration.WithLabelValues("tso-failed") } func registerMetrics() { diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 8d57c46e855..cc18817d9c5 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -357,33 +357,38 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if err = proto.Unmarshal(item.Kv.Value, group); err != nil { continue } - if gc, ok := c.loadGroupController(group.Name); ok { + name := group.GetName() + gc, ok := c.loadGroupController(name) + if !ok { + continue + } + if !gc.tombstone.Load() { gc.modifyMeta(group) - // If the resource group is marked as tombstone before, set it as active again. - if swapped := gc.tombstone.CompareAndSwap(true, false); swapped { - resourceGroupStatusGauge.WithLabelValues(group.Name, gc.name).Set(1) - log.Info("[resource group controller] mark resource group as active", zap.String("name", group.Name)) - } + continue + } + // If the resource group is marked as tombstone before, re-create the resource group controller. + newGC, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + if err != nil { + log.Warn("[resource group controller] re-create resource group cost controller for tombstone failed", + zap.String("name", name), zap.Error(err)) + continue + } + if c.groupsController.CompareAndSwap(name, gc, newGC) { + log.Info("[resource group controller] re-create resource group cost controller for tombstone", + zap.String("name", name)) } case meta_storagepb.Event_DELETE: - if item.PrevKv != nil { - if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil { - continue - } - // Do not delete the resource group immediately, just mark it as tombstone. - // For the requests that are still in progress, fallback to the default resource group. 
- if gc, ok := c.loadGroupController(group.Name); ok { - gc.tombstone.Store(true) - resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name) - resourceGroupStatusGauge.WithLabelValues(group.Name, defaultResourceGroupName).Set(1) - log.Info("[resource group controller] mark resource group as tombstone", zap.String("name", group.Name)) - } - } else { - // Prev-kv is compacted means there must have been a delete event before this event, - // which means that this is just a duplicated event, so we can just ignore it. + // Prev-kv is compacted means there must have been a delete event before this event, + // which means that this is just a duplicated event, so we can just ignore it. + if item.PrevKv == nil { log.Info("[resource group controller] previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value))) + continue + } + if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil { + continue } + c.tombstoneGroupCostController(group.GetName()) } } case resp, ok := <-watchConfigChannel: @@ -446,15 +451,23 @@ func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *g return tmp.(*groupCostController), loaded } -// tryGetResourceGroup will try to get the resource group controller from local cache first, -// if the local cache misses, it will then call gRPC to fetch the resource group info from server. -func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string) (*groupCostController, error) { +// NewResourceGroupNotExistErr returns a new error that indicates the resource group does not exist. +// It's exported for testing. +func NewResourceGroupNotExistErr(name string) error { + return errors.Errorf("%s does not exist", name) +} + +// tryGetResourceGroupController will try to get the resource group controller from local cache first. +// If the local cache misses, it will then call gRPC to fetch the resource group info from the remote server. +// If `useTombstone` is true, it will return the resource group controller even if it is marked as tombstone. +func (c *ResourceGroupsController) tryGetResourceGroupController( + ctx context.Context, name string, useTombstone bool, +) (*groupCostController, error) { // Get from the local cache first. gc, ok := c.loadGroupController(name) if ok { - // If the resource group is marked as tombstone, fallback to the default resource group. - if gc.tombstone.Load() && name != defaultResourceGroupName { - return c.tryGetResourceGroup(ctx, defaultResourceGroupName) + if !useTombstone && gc.tombstone.Load() { + return nil, NewResourceGroupNotExistErr(name) } return gc, nil } @@ -464,7 +477,7 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name return nil, err } if group == nil { - return nil, errors.Errorf("%s does not exists", name) + return nil, NewResourceGroupNotExistErr(name) } // Check again to prevent initializing the same resource group concurrently. if gc, ok = c.loadGroupController(name); ok { @@ -476,14 +489,51 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name return nil, err } // Check again to prevent initializing the same resource group concurrently. 
- gc, loaded := c.loadOrStoreGroupController(group.Name, gc) + gc, loaded := c.loadOrStoreGroupController(name, gc) if !loaded { resourceGroupStatusGauge.WithLabelValues(name, group.Name).Set(1) - log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName())) + log.Info("[resource group controller] create resource group cost controller", zap.String("name", name)) } return gc, nil } +// Do not delete the resource group immediately to prevent from interrupting the ongoing request, +// mark it as tombstone and create a default resource group controller for it. +func (c *ResourceGroupsController) tombstoneGroupCostController(name string) { + _, ok := c.loadGroupController(name) + if !ok { + return + } + // The default resource group controller should never be deleted. + if name == defaultResourceGroupName { + return + } + // Try to get the default group meta first. + defaultGC, err := c.tryGetResourceGroupController(c.loopCtx, defaultResourceGroupName, false) + if err != nil || defaultGC == nil { + log.Warn("[resource group controller] get default resource group meta for tombstone failed", + zap.String("name", name), zap.Error(err)) + // Directly delete the resource group controller if the default group is not available. + c.groupsController.Delete(name) + return + } + // Create a default resource group controller for the tombstone resource group independently. + gc, err := newGroupCostController(defaultGC.getMeta(), c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + if err != nil { + log.Warn("[resource group controller] create default resource group cost controller for tombstone failed", + zap.String("name", name), zap.Error(err)) + // Directly delete the resource group controller if the default group controller cannot be created. + c.groupsController.Delete(name) + return + } + gc.tombstone.Store(true) + c.groupsController.Store(name, gc) + // Its metrics will be deleted in the cleanup process. + resourceGroupStatusGauge.WithLabelValues(name, name).Set(2) + log.Info("[resource group controller] default resource group controller cost created for tombstone", + zap.String("name", name)) +} + func (c *ResourceGroupsController) cleanUpResourceGroup() { c.groupsController.Range(func(key, value any) bool { resourceGroupName := key.(string) @@ -496,7 +546,6 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() { if gc.inactive || gc.tombstone.Load() { c.groupsController.Delete(resourceGroupName) resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName) - resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, defaultResourceGroupName) return true } gc.inactive = true @@ -589,7 +638,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, func (c *ResourceGroupsController) OnRequestWait( ctx context.Context, resourceGroupName string, info RequestInfo, ) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) { - gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) + gc, err := c.tryGetResourceGroupController(ctx, resourceGroupName, true) if err != nil { return nil, nil, time.Duration(0), 0, err } @@ -605,17 +654,13 @@ func (c *ResourceGroupsController) OnResponse( log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName)) return &rmpb.Consumption{}, nil } - // If the resource group is marked as tombstone, fallback to the default resource group. 
- if gc.tombstone.Load() && resourceGroupName != defaultResourceGroupName { - return c.OnResponse(defaultResourceGroupName, req, resp) - } return gc.onResponse(req, resp) } // IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it. func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool { - gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) + gc, err := c.tryGetResourceGroupController(ctx, resourceGroupName, false) if err != nil { return false } @@ -626,7 +671,7 @@ func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context, func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, bg *rmpb.BackgroundSettings, requestResource string) bool { // fallback to default resource group. if bg == nil { - gc, err := c.tryGetResourceGroup(ctx, defaultResourceGroupName) + gc, err := c.tryGetResourceGroupController(ctx, defaultResourceGroupName, false) if err != nil { return false } @@ -646,7 +691,7 @@ func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, // GetResourceGroup returns the meta setting of the given resource group name. func (c *ResourceGroupsController) GetResourceGroup(resourceGroupName string) (*rmpb.ResourceGroup, error) { - gc, err := c.tryGetResourceGroup(c.loopCtx, resourceGroupName) + gc, err := c.tryGetResourceGroupController(c.loopCtx, resourceGroupName, false) if err != nil { return nil, err } diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index e198effb2d8..821364c292f 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -211,11 +211,11 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultResourceGroup, nil) mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil) - c1, err := controller.tryGetResourceGroup(ctx, defaultResourceGroupName) + c1, err := controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false) re.NoError(err) re.Equal(defaultResourceGroup, c1.meta) - c2, err := controller.tryGetResourceGroup(ctx, "test-group") + c2, err := controller.tryGetResourceGroupController(ctx, "test-group", false) re.NoError(err) re.Equal(testResourceGroup, c2.meta) @@ -271,7 +271,7 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { } } -func TestGetController(t *testing.T) { +func TestTryGetController(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -286,39 +286,51 @@ func TestGetController(t *testing.T) { mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil) mockProvider.On("GetResourceGroup", mock.Anything, "test-group-non-existent", mock.Anything).Return((*rmpb.ResourceGroup)(nil), nil) - c, err := controller.GetResourceGroup("test-group-non-existent") + gc, err := controller.tryGetResourceGroupController(ctx, "test-group-non-existent", false) re.Error(err) - re.Nil(c) - c, err = controller.GetResourceGroup(defaultResourceGroupName) + re.Nil(gc) + gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false) re.NoError(err) - re.Equal(defaultResourceGroup, c) - c, err = 
controller.GetResourceGroup("test-group") + re.Equal(defaultResourceGroup, gc.getMeta()) + gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false) re.NoError(err) - re.Equal(testResourceGroup, c) - _, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{}) + re.Equal(testResourceGroup, gc.getMeta()) + requestInfo, responseInfo := NewTestRequestInfo(true, 1, 1), NewTestResponseInfo(1, time.Millisecond, true) + _, _, _, _, err = controller.OnRequestWait(ctx, "test-group", requestInfo) re.NoError(err) - _, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{}) + consumption, err := controller.OnResponse("test-group", requestInfo, responseInfo) re.NoError(err) + re.NotEmpty(consumption) // Mark the tombstone manually to test the fallback case. - gc, err := controller.tryGetResourceGroup(ctx, "test-group") + gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false) re.NoError(err) - gc.tombstone.Store(true) - c, err = controller.GetResourceGroup("test-group") + re.NotNil(gc) + controller.tombstoneGroupCostController("test-group") + gc, err = controller.tryGetResourceGroupController(ctx, "test-group", false) + re.Error(err) + re.Nil(gc) + gc, err = controller.tryGetResourceGroupController(ctx, "test-group", true) + re.NoError(err) + re.Equal(defaultResourceGroup, gc.getMeta()) + _, _, _, _, err = controller.OnRequestWait(ctx, "test-group", requestInfo) re.NoError(err) - re.Equal(defaultResourceGroup, c) - _, _, _, _, err = controller.OnRequestWait(ctx, "test-group", &TestRequestInfo{}) + consumption, err = controller.OnResponse("test-group", requestInfo, responseInfo) re.NoError(err) - _, err = controller.OnResponse("test-group", &TestRequestInfo{}, &TestResponseInfo{}) + re.NotEmpty(consumption) + // Test the default group protection. + gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false) re.NoError(err) - // Mark the default group tombstone manually to test the fallback case. 
- gc, err = controller.tryGetResourceGroup(ctx, defaultResourceGroupName) + re.Equal(defaultResourceGroup, gc.getMeta()) + controller.tombstoneGroupCostController(defaultResourceGroupName) + gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false) re.NoError(err) - gc.tombstone.Store(true) - c, err = controller.GetResourceGroup(defaultResourceGroupName) + re.Equal(defaultResourceGroup, gc.getMeta()) + gc, err = controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, true) re.NoError(err) - re.Equal(defaultResourceGroup, c) - _, _, _, _, err = controller.OnRequestWait(ctx, defaultResourceGroupName, &TestRequestInfo{}) + re.Equal(defaultResourceGroup, gc.getMeta()) + _, _, _, _, err = controller.OnRequestWait(ctx, defaultResourceGroupName, requestInfo) re.NoError(err) - _, err = controller.OnResponse(defaultResourceGroupName, &TestRequestInfo{}, &TestResponseInfo{}) + consumption, err = controller.OnResponse(defaultResourceGroupName, requestInfo, responseInfo) re.NoError(err) + re.NotEmpty(consumption) } diff --git a/client/tso_stream.go b/client/tso_stream.go index dd5b9422aae..9c4d78dfe18 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -141,7 +141,9 @@ func (s *pdTSOStream) processRequests( } tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds()) resp, err := s.stream.Recv() + duration := time.Since(start).Seconds() if err != nil { + requestFailedDurationTSO.Observe(duration) if err == io.EOF { err = errs.ErrClientTSOStreamClosed } else { @@ -149,7 +151,7 @@ func (s *pdTSOStream) processRequests( } return } - requestDurationTSO.Observe(time.Since(start).Seconds()) + requestDurationTSO.Observe(duration) tsoBatchSize.Observe(float64(count)) if resp.GetCount() != uint32(count) { @@ -197,7 +199,9 @@ func (s *tsoTSOStream) processRequests( } tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds()) resp, err := s.stream.Recv() + duration := time.Since(start).Seconds() if err != nil { + requestFailedDurationTSO.Observe(duration) if err == io.EOF { err = errs.ErrClientTSOStreamClosed } else { @@ -205,7 +209,7 @@ func (s *tsoTSOStream) processRequests( } return } - requestDurationTSO.Observe(time.Since(start).Seconds()) + requestDurationTSO.Observe(duration) tsoBatchSize.Observe(float64(count)) if resp.GetCount() != uint32(count) { diff --git a/go.mod b/go.mod index 1ef14f416e8..aaf0ed1a435 100644 --- a/go.mod +++ b/go.mod @@ -34,10 +34,10 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 + github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 - github.com/pingcap/tidb-dashboard v0.0.0-20240612100141-91f6c281e441 + github.com/pingcap/tidb-dashboard v0.0.0-20240718034516-e6e78c7c120b github.com/prometheus/client_golang v1.19.0 github.com/prometheus/common v0.51.1 github.com/sasha-s/go-deadlock v0.2.0 @@ -173,7 +173,7 @@ require ( go.etcd.io/bbolt v1.3.9 // indirect go.uber.org/dig v1.9.0 // indirect go.uber.org/fx v1.12.0 // indirect - go.uber.org/multierr v1.11.0 // indirect + go.uber.org/multierr v1.11.0 golang.org/x/arch v0.3.0 // indirect golang.org/x/crypto v0.21.0 // indirect golang.org/x/image v0.10.0 // indirect diff --git a/go.sum b/go.sum index 659cd116e9c..480f99af33e 100644 --- a/go.sum +++ 
b/go.sum @@ -371,15 +371,15 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 h1:V9XS3FQ/P6u+kFaoSyY5DBswIA774BMpIOLDBMrpxKc= +github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20240612100141-91f6c281e441 h1:01flLztcoWBeT5pe69Q8LAB2Hty0s9Rqc3RvHU4AQK8= -github.com/pingcap/tidb-dashboard v0.0.0-20240612100141-91f6c281e441/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= +github.com/pingcap/tidb-dashboard v0.0.0-20240718034516-e6e78c7c120b h1:MKgJ9yCQxD5ewLERuoiiD9XVOHuqZ2WRZnB20yMiKyo= +github.com/pingcap/tidb-dashboard v0.0.0-20240718034516-e6e78c7c120b/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/pkg/core/basic_cluster.go b/pkg/core/basic_cluster.go index ea78c4ccf9c..f0b23bd6434 100644 --- a/pkg/core/basic_cluster.go +++ b/pkg/core/basic_cluster.go @@ -14,6 +14,8 @@ package core +import "bytes" + // BasicCluster provides basic data member and interface for a tikv cluster. type BasicCluster struct { *StoresInfo @@ -97,7 +99,29 @@ type RegionSetInformer interface { GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) ScanRegions(startKey, endKey []byte, limit int) []*RegionInfo GetRegionByKey(regionKey []byte) *RegionInfo - BatchScanRegions(keyRanges *KeyRanges, limit int) []*RegionInfo + BatchScanRegions(keyRanges *KeyRanges, opts ...BatchScanRegionsOptionFunc) ([]*RegionInfo, error) +} + +type batchScanRegionsOptions struct { + limit int + outputMustContainAllKeyRange bool +} + +// BatchScanRegionsOptionFunc is the option function for BatchScanRegions. +type BatchScanRegionsOptionFunc func(*batchScanRegionsOptions) + +// WithLimit is an option for batchScanRegionsOptions. +func WithLimit(limit int) BatchScanRegionsOptionFunc { + return func(opt *batchScanRegionsOptions) { + opt.limit = limit + } +} + +// WithOutputMustContainAllKeyRange is an option for batchScanRegionsOptions. 
+func WithOutputMustContainAllKeyRange() BatchScanRegionsOptionFunc { + return func(opt *batchScanRegionsOptions) { + opt.outputMustContainAllKeyRange = true + } } // StoreSetInformer provides access to a shared informer of stores. @@ -136,7 +160,7 @@ func NewKeyRange(startKey, endKey string) KeyRange { } } -// KeyRanges is a slice of KeyRange. +// KeyRanges is a slice of monotonically increasing KeyRange. type KeyRanges struct { krs []*KeyRange } @@ -163,3 +187,30 @@ func (rs *KeyRanges) Ranges() []*KeyRange { } return rs.krs } + +// Merge merges the continuous KeyRanges. +func (rs *KeyRanges) Merge() { + if len(rs.krs) == 0 { + return + } + merged := make([]*KeyRange, 0, len(rs.krs)) + start := rs.krs[0].StartKey + end := rs.krs[0].EndKey + for _, kr := range rs.krs[1:] { + if bytes.Equal(end, kr.StartKey) { + end = kr.EndKey + } else { + merged = append(merged, &KeyRange{ + StartKey: start, + EndKey: end, + }) + start = kr.StartKey + end = kr.EndKey + } + } + merged = append(merged, &KeyRange{ + StartKey: start, + EndKey: end, + }) + rs.krs = merged +} diff --git a/pkg/core/basic_cluster_test.go b/pkg/core/basic_cluster_test.go new file mode 100644 index 00000000000..3d74dd49eea --- /dev/null +++ b/pkg/core/basic_cluster_test.go @@ -0,0 +1,93 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package core + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMergeKeyRanges(t *testing.T) { + re := require.New(t) + + testCases := []struct { + name string + input []*KeyRange + expect []*KeyRange + }{ + { + name: "empty", + input: []*KeyRange{}, + expect: []*KeyRange{}, + }, + { + name: "single", + input: []*KeyRange{ + {StartKey: []byte("a"), EndKey: []byte("b")}, + }, + expect: []*KeyRange{ + {StartKey: []byte("a"), EndKey: []byte("b")}, + }, + }, + { + name: "non-overlapping", + input: []*KeyRange{ + {StartKey: []byte("a"), EndKey: []byte("b")}, + {StartKey: []byte("c"), EndKey: []byte("d")}, + }, + expect: []*KeyRange{ + {StartKey: []byte("a"), EndKey: []byte("b")}, + {StartKey: []byte("c"), EndKey: []byte("d")}, + }, + }, + { + name: "continuous", + input: []*KeyRange{ + {StartKey: []byte("a"), EndKey: []byte("b")}, + {StartKey: []byte("b"), EndKey: []byte("c")}, + }, + expect: []*KeyRange{ + {StartKey: []byte("a"), EndKey: []byte("c")}, + }, + }, + { + name: "boundless 1", + input: []*KeyRange{ + {StartKey: nil, EndKey: []byte("b")}, + {StartKey: []byte("b"), EndKey: []byte("c")}, + }, + expect: []*KeyRange{ + {StartKey: nil, EndKey: []byte("c")}, + }, + }, + { + name: "boundless 2", + input: []*KeyRange{ + {StartKey: []byte("a"), EndKey: []byte("b")}, + {StartKey: []byte("b"), EndKey: nil}, + }, + expect: []*KeyRange{ + {StartKey: []byte("a"), EndKey: nil}, + }, + }, + } + + for _, tc := range testCases { + rs := &KeyRanges{krs: tc.input} + rs.Merge() + re.Equal(tc.expect, rs.Ranges(), tc.name) + } +} diff --git a/pkg/core/region.go b/pkg/core/region.go index f0c78f443bd..eb8b89aecff 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -1826,37 +1826,91 @@ func (r *RegionsInfo) ScanRegions(startKey, endKey []byte, limit int) []*RegionI // BatchScanRegions scans regions in given key pairs, returns at most `limit` regions. // limit <= 0 means no limit. // The given key pairs should be non-overlapping. -func (r *RegionsInfo) BatchScanRegions(keyRanges *KeyRanges, limit int) []*RegionInfo { - r.t.RLock() - defer r.t.RUnlock() - +func (r *RegionsInfo) BatchScanRegions(keyRanges *KeyRanges, opts ...BatchScanRegionsOptionFunc) ([]*RegionInfo, error) { + keyRanges.Merge() krs := keyRanges.Ranges() res := make([]*RegionInfo, 0, len(krs)) - var lastRegion *RegionInfo + + scanOptions := &batchScanRegionsOptions{} + for _, opt := range opts { + opt(scanOptions) + } + + r.t.RLock() + defer r.t.RUnlock() for _, keyRange := range krs { - if limit > 0 && len(res) >= limit { - return res + if scanOptions.limit > 0 && len(res) >= scanOptions.limit { + res = res[:scanOptions.limit] + return res, nil } - if lastRegion != nil { - if lastRegion.Contains(keyRange.EndKey) { - continue - } else if lastRegion.Contains(keyRange.StartKey) { - keyRange.StartKey = lastRegion.GetEndKey() - } + + regions, err := scanRegion(r.tree, keyRange, scanOptions.limit, scanOptions.outputMustContainAllKeyRange) + if err != nil { + return nil, err } - r.tree.scanRange(keyRange.StartKey, func(region *RegionInfo) bool { - if len(keyRange.EndKey) > 0 && bytes.Compare(region.GetStartKey(), keyRange.EndKey) >= 0 { - return false - } - if limit > 0 && len(res) >= limit { + if len(res) > 0 && len(regions) > 0 && res[len(res)-1].meta.Id == regions[0].meta.Id { + // skip the region that has been scanned + regions = regions[1:] + } + res = append(res, regions...) 
+ } + return res, nil +} + +func scanRegion(regionTree *regionTree, keyRange *KeyRange, limit int, outputMustContainAllKeyRange bool) ([]*RegionInfo, error) { + var ( + res []*RegionInfo + lastRegion = &RegionInfo{ + meta: &metapb.Region{EndKey: keyRange.StartKey}, + } + exceedLimit = func() bool { return limit > 0 && len(res) >= limit } + err error + ) + regionTree.scanRange(keyRange.StartKey, func(region *RegionInfo) bool { + if len(keyRange.EndKey) > 0 && len(region.GetStartKey()) > 0 && + bytes.Compare(region.GetStartKey(), keyRange.EndKey) >= 0 { + return false + } + if exceedLimit() { + return false + } + if len(lastRegion.GetEndKey()) > 0 && len(region.GetStartKey()) > 0 && + bytes.Compare(region.GetStartKey(), lastRegion.GetEndKey()) > 0 { + err = errs.ErrRegionNotAdjacent.FastGen( + "key range[%x, %x) found a hole region between region[%x, %x) and region[%x, %x)", + keyRange.StartKey, keyRange.EndKey, + lastRegion.GetStartKey(), lastRegion.GetEndKey(), + region.GetStartKey(), region.GetEndKey()) + log.Warn("scan regions failed", zap.Bool("outputMustContainAllKeyRange", + outputMustContainAllKeyRange), zap.Error(err)) + if outputMustContainAllKeyRange { return false } - lastRegion = region - res = append(res, region) - return true - }) + } + + lastRegion = region + res = append(res, region) + return true + }) + if outputMustContainAllKeyRange && err != nil { + return nil, err } - return res + + if !(exceedLimit()) && len(keyRange.EndKey) > 0 && len(lastRegion.GetEndKey()) > 0 && + bytes.Compare(lastRegion.GetEndKey(), keyRange.EndKey) < 0 { + err = errs.ErrRegionNotAdjacent.FastGen( + "key range[%x, %x) found a hole region in the last, the last scanned region is [%x, %x), [%x, %x) is missing", + keyRange.StartKey, keyRange.EndKey, + lastRegion.GetStartKey(), lastRegion.GetEndKey(), + lastRegion.GetEndKey(), keyRange.EndKey) + log.Warn("scan regions failed", zap.Bool("outputMustContainAllKeyRange", + outputMustContainAllKeyRange), zap.Error(err)) + if outputMustContainAllKeyRange { + return nil, err + } + } + + return res, nil } // ScanRegionWithIterator scans from the first region containing or behind start key, diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 816bba4efae..845944780e4 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/mock/mockid" ) @@ -1141,3 +1142,62 @@ func TestCntRefAfterResetRegionCache(t *testing.T) { regions.CheckAndPutRegion(region) re.Equal(int32(2), region.GetRef()) } + +func TestScanRegion(t *testing.T) { + var ( + re = require.New(t) + tree = newRegionTree() + needContainAllRanges = true + regions []*RegionInfo + err error + ) + scanError := func(startKey, endKey []byte, limit int) { + regions, err = scanRegion(tree, &KeyRange{StartKey: startKey, EndKey: endKey}, limit, needContainAllRanges) + re.Error(err) + } + scanNoError := func(startKey, endKey []byte, limit int) []*RegionInfo { + regions, err = scanRegion(tree, &KeyRange{StartKey: startKey, EndKey: endKey}, limit, needContainAllRanges) + re.NoError(err) + return regions + } + // region1 + // [a, b) + updateNewItem(tree, NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))) + re.Len(scanNoError([]byte("a"), []byte("b"), 0), 1) + scanError([]byte("a"), []byte("c"), 0) + re.Len(scanNoError([]byte("a"), []byte("c"), 1), 1) + + // region1 | 
region2 + // [a, b) | [b, c) + updateNewItem(tree, NewTestRegionInfo(2, 1, []byte("b"), []byte("c"))) + re.Len(scanNoError([]byte("a"), []byte("c"), 0), 2) + re.Len(scanNoError([]byte("a"), []byte("c"), 1), 1) + + // region1 | region2 | region3 + // [a, b) | [b, c) | [d, f) + updateNewItem(tree, NewTestRegionInfo(3, 1, []byte("d"), []byte("f"))) + scanError([]byte("a"), []byte("e"), 0) + scanError([]byte("c"), []byte("e"), 0) + + // region1 | region2 | region3 | region4 + // [a, b) | [b, c) | [d, f) | [f, i) + updateNewItem(tree, NewTestRegionInfo(4, 1, []byte("f"), []byte("i"))) + scanError([]byte("c"), []byte("g"), 0) + re.Len(scanNoError([]byte("g"), []byte("h"), 0), 1) + re.Equal(uint64(4), regions[0].GetID()) + // test error type + scanError([]byte(string('a'-1)), []byte("g"), 0) + re.True(errs.ErrRegionNotAdjacent.Equal(err)) + + // region1 | region2 | region3 | region4 | region5 | region6 + // [a, b) | [b, c) | [d, f) | [f, i) | [j, k) | [l, +∞)] + updateNewItem(tree, NewTestRegionInfo(6, 1, []byte("l"), nil)) + // test boundless + re.Len(scanNoError([]byte("m"), nil, 0), 1) + + // ********** needContainAllRanges = false ********** + // Tests that previously reported errors will no longer report errors. + needContainAllRanges = false + re.Len(scanNoError([]byte("a"), []byte("e"), 0), 3) + re.Len(scanNoError([]byte("c"), []byte("e"), 0), 1) +} diff --git a/pkg/response/region.go b/pkg/response/region.go index 153294c2861..6db7f135ad8 100644 --- a/pkg/response/region.go +++ b/pkg/response/region.go @@ -115,17 +115,18 @@ type RegionInfo struct { RegionEpoch *metapb.RegionEpoch `json:"epoch,omitempty"` Peers []MetaPeer `json:"peers,omitempty"` - Leader MetaPeer `json:"leader,omitempty"` - DownPeers []PDPeerStats `json:"down_peers,omitempty"` - PendingPeers []MetaPeer `json:"pending_peers,omitempty"` - CPUUsage uint64 `json:"cpu_usage"` - WrittenBytes uint64 `json:"written_bytes"` - ReadBytes uint64 `json:"read_bytes"` - WrittenKeys uint64 `json:"written_keys"` - ReadKeys uint64 `json:"read_keys"` - ApproximateSize int64 `json:"approximate_size"` - ApproximateKeys int64 `json:"approximate_keys"` - Buckets []string `json:"buckets,omitempty"` + Leader MetaPeer `json:"leader,omitempty"` + DownPeers []PDPeerStats `json:"down_peers,omitempty"` + PendingPeers []MetaPeer `json:"pending_peers,omitempty"` + CPUUsage uint64 `json:"cpu_usage"` + WrittenBytes uint64 `json:"written_bytes"` + ReadBytes uint64 `json:"read_bytes"` + WrittenKeys uint64 `json:"written_keys"` + ReadKeys uint64 `json:"read_keys"` + ApproximateSize int64 `json:"approximate_size"` + ApproximateKeys int64 `json:"approximate_keys"` + ApproximateKvSize int64 `json:"approximate_kv_size"` + Buckets []string `json:"buckets,omitempty"` ReplicationStatus *ReplicationStatus `json:"replication_status,omitempty"` } @@ -173,6 +174,7 @@ func InitRegion(r *core.RegionInfo, s *RegionInfo) *RegionInfo { s.ReadKeys = r.GetKeysRead() s.ApproximateSize = r.GetApproximateSize() s.ApproximateKeys = r.GetApproximateKeys() + s.ApproximateKvSize = r.GetApproximateKvSize() s.ReplicationStatus = fromPBReplicationStatus(r.GetReplicationStatus()) s.Buckets = nil diff --git a/pkg/schedule/checker/merge_checker.go b/pkg/schedule/checker/merge_checker.go index d7a28ad0ff8..1a7548a1084 100644 --- a/pkg/schedule/checker/merge_checker.go +++ b/pkg/schedule/checker/merge_checker.go @@ -46,6 +46,8 @@ const ( mergeOptionValueDeny = "deny" ) +var gcInterval = time.Minute + // MergeChecker ensures region to merge with adjacent region when size is small type 
MergeChecker struct { PauseController @@ -57,7 +59,7 @@ type MergeChecker struct { // NewMergeChecker creates a merge checker. func NewMergeChecker(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *MergeChecker { - splitCache := cache.NewIDTTL(ctx, time.Minute, conf.GetSplitMergeInterval()) + splitCache := cache.NewIDTTL(ctx, gcInterval, conf.GetSplitMergeInterval()) return &MergeChecker{ cluster: cluster, conf: conf, @@ -88,13 +90,16 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { return nil } + // update the split cache. + // It must be called before the following merge checker logic. + m.splitCache.UpdateTTL(m.conf.GetSplitMergeInterval()) + expireTime := m.startTime.Add(m.conf.GetSplitMergeInterval()) if time.Now().Before(expireTime) { mergeCheckerRecentlyStartCounter.Inc() return nil } - m.splitCache.UpdateTTL(m.conf.GetSplitMergeInterval()) if m.splitCache.Exists(region.GetID()) { mergeCheckerRecentlySplitCounter.Inc() return nil diff --git a/pkg/schedule/checker/merge_checker_test.go b/pkg/schedule/checker/merge_checker_test.go index 06e8d468de3..61b8cd579df 100644 --- a/pkg/schedule/checker/merge_checker_test.go +++ b/pkg/schedule/checker/merge_checker_test.go @@ -56,6 +56,7 @@ func TestMergeCheckerTestSuite(t *testing.T) { func (suite *mergeCheckerTestSuite) SetupTest() { cfg := mockconfig.NewTestOptions() + gcInterval = 100 * time.Millisecond suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.cluster = mockcluster.NewCluster(suite.ctx, cfg) suite.cluster.SetMaxMergeRegionSize(2) @@ -84,6 +85,7 @@ func (suite *mergeCheckerTestSuite) SetupTest() { } func (suite *mergeCheckerTestSuite) TearDownTest() { + gcInterval = time.Minute suite.cancel() } @@ -234,6 +236,7 @@ func (suite *mergeCheckerTestSuite) TestBasic() { ops = suite.mc.Check(suite.regions[3]) re.Nil(ops) + // issue #4616 suite.cluster.SetSplitMergeInterval(500 * time.Millisecond) ops = suite.mc.Check(suite.regions[2]) re.Nil(ops) @@ -245,6 +248,19 @@ func (suite *mergeCheckerTestSuite) TestBasic() { re.NotNil(ops) ops = suite.mc.Check(suite.regions[3]) re.NotNil(ops) + + // issue #8405 + suite.mc.startTime = time.Now() + suite.cluster.SetSplitMergeInterval(time.Second) + suite.cluster.SetSplitMergeInterval(time.Hour) + suite.mc.RecordRegionSplit([]uint64{suite.regions[2].GetID()}) + suite.cluster.SetSplitMergeInterval(time.Second) + suite.mc.Check(suite.regions[2]) // trigger the config update + time.Sleep(time.Second) // wait for the cache to gc + ops = suite.mc.Check(suite.regions[2]) + re.NotNil(ops) + ops = suite.mc.Check(suite.regions[3]) + re.NotNil(ops) } func (suite *mergeCheckerTestSuite) TestMatchPeers() { diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index c4b9cd6ab5e..80755fbdbe5 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -55,17 +55,6 @@ const ( transferOut = "transfer-out" ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. 
- balanceLeaderScheduleCounter = schedulerCounter.WithLabelValues(BalanceLeaderName, "schedule") - balanceLeaderNoLeaderRegionCounter = schedulerCounter.WithLabelValues(BalanceLeaderName, "no-leader-region") - balanceLeaderRegionHotCounter = schedulerCounter.WithLabelValues(BalanceLeaderName, "region-hot") - balanceLeaderNoTargetStoreCounter = schedulerCounter.WithLabelValues(BalanceLeaderName, "no-target-store") - balanceLeaderNoFollowerRegionCounter = schedulerCounter.WithLabelValues(BalanceLeaderName, "no-follower-region") - balanceLeaderSkipCounter = schedulerCounter.WithLabelValues(BalanceLeaderName, "skip") - balanceLeaderNewOpCounter = schedulerCounter.WithLabelValues(BalanceLeaderName, "new-operator") -) - type balanceLeaderSchedulerConfig struct { syncutil.RWMutex storage endpoint.ConfigStorage diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go index bfc1a236481..488b7635b77 100644 --- a/pkg/schedule/schedulers/balance_region.go +++ b/pkg/schedule/schedulers/balance_region.go @@ -36,18 +36,6 @@ const ( BalanceRegionType = "balance-region" ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. - balanceRegionScheduleCounter = schedulerCounter.WithLabelValues(BalanceRegionName, "schedule") - balanceRegionNoRegionCounter = schedulerCounter.WithLabelValues(BalanceRegionName, "no-region") - balanceRegionHotCounter = schedulerCounter.WithLabelValues(BalanceRegionName, "region-hot") - balanceRegionNoLeaderCounter = schedulerCounter.WithLabelValues(BalanceRegionName, "no-leader") - balanceRegionNewOpCounter = schedulerCounter.WithLabelValues(BalanceRegionName, "new-operator") - balanceRegionSkipCounter = schedulerCounter.WithLabelValues(BalanceRegionName, "skip") - balanceRegionCreateOpFailCounter = schedulerCounter.WithLabelValues(BalanceRegionName, "create-operator-fail") - balanceRegionNoReplacementCounter = schedulerCounter.WithLabelValues(BalanceRegionName, "no-replacement") -) - type balanceRegionSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 8f56643f384..2adcfbe7e48 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -47,19 +47,12 @@ const ( lastStoreDeleteInfo = "The last store has been deleted" ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. 
- evictLeaderCounter = schedulerCounter.WithLabelValues(EvictLeaderName, "schedule") - evictLeaderNoLeaderCounter = schedulerCounter.WithLabelValues(EvictLeaderName, "no-leader") - evictLeaderPickUnhealthyCounter = schedulerCounter.WithLabelValues(EvictLeaderName, "pick-unhealthy-region") - evictLeaderNoTargetStoreCounter = schedulerCounter.WithLabelValues(EvictLeaderName, "no-target-store") - evictLeaderNewOperatorCounter = schedulerCounter.WithLabelValues(EvictLeaderName, "new-operator") -) - type evictLeaderSchedulerConfig struct { syncutil.RWMutex storage endpoint.ConfigStorage StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` + // Batch is used to generate multiple operators by one scheduling + Batch int `json:"batch"` cluster *core.BasicCluster removeSchedulerCb func(string) error } @@ -74,23 +67,10 @@ func (conf *evictLeaderSchedulerConfig) getStores() []uint64 { return stores } -func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error { - if len(args) != 1 { - return errs.ErrSchedulerConfig.FastGenByArgs("id") - } - - id, err := strconv.ParseUint(args[0], 10, 64) - if err != nil { - return errs.ErrStrconvParseUint.Wrap(err) - } - ranges, err := getKeyRanges(args[1:]) - if err != nil { - return err - } - conf.Lock() - defer conf.Unlock() - conf.StoreIDWithRanges[id] = ranges - return nil +func (conf *evictLeaderSchedulerConfig) getBatch() int { + conf.RLock() + defer conf.RUnlock() + return conf.Batch } func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig { @@ -102,13 +82,12 @@ func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig { } return &evictLeaderSchedulerConfig{ StoreIDWithRanges: storeIDWithRanges, + Batch: conf.Batch, } } -func (conf *evictLeaderSchedulerConfig) Persist() error { +func (conf *evictLeaderSchedulerConfig) persistLocked() error { name := conf.getSchedulerName() - conf.RLock() - defer conf.RUnlock() data, err := EncodeConfig(conf) failpoint.Inject("persistFail", func() { err = errors.New("fail to persist") @@ -134,29 +113,29 @@ func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string { return res } -func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last bool) { - conf.Lock() - defer conf.Unlock() +func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, error) { _, exists := conf.StoreIDWithRanges[id] - succ, last = false, false if exists { delete(conf.StoreIDWithRanges, id) conf.cluster.ResumeLeaderTransfer(id) - succ = true - last = len(conf.StoreIDWithRanges) == 0 + return len(conf.StoreIDWithRanges) == 0, nil } - return succ, last + return false, errs.ErrScheduleConfigNotExist.FastGenByArgs() } -func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) { - conf.Lock() - defer conf.Unlock() +func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) { if err := conf.cluster.PauseLeaderTransfer(id); err != nil { log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err)) } conf.StoreIDWithRanges[id] = keyRange } +func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) { + conf.Lock() + defer conf.Unlock() + conf.resetStoreLocked(id, keyRange) +} + func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange { conf.RLock() defer conf.RUnlock() @@ -166,6 +145,108 @@ func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRa return nil } +func (conf 
*evictLeaderSchedulerConfig) encodeConfig() ([]byte, error) { + conf.RLock() + defer conf.RUnlock() + return EncodeConfig(conf) +} + +func (conf *evictLeaderSchedulerConfig) reloadConfig(name string) error { + conf.Lock() + defer conf.Unlock() + cfgData, err := conf.storage.LoadSchedulerConfig(name) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &evictLeaderSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) + conf.StoreIDWithRanges = newCfg.StoreIDWithRanges + conf.Batch = newCfg.Batch + return nil +} + +func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.SchedulerCluster) error { + conf.RLock() + defer conf.RUnlock() + var res error + for id := range conf.StoreIDWithRanges { + if err := cluster.PauseLeaderTransfer(id); err != nil { + res = err + } + } + return res +} + +func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.SchedulerCluster) { + conf.RLock() + defer conf.RUnlock() + for id := range conf.StoreIDWithRanges { + cluster.ResumeLeaderTransfer(id) + } +} + +func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id uint64) (bool, error) { + conf.RLock() + defer conf.RUnlock() + if _, exist := conf.StoreIDWithRanges[id]; !exist { + if err := conf.cluster.PauseLeaderTransfer(id); err != nil { + return exist, err + } + } + return true, nil +} + +func (conf *evictLeaderSchedulerConfig) update(id uint64, newRanges []core.KeyRange, batch int) error { + conf.Lock() + defer conf.Unlock() + if id != 0 { + conf.StoreIDWithRanges[id] = newRanges + } + conf.Batch = batch + err := conf.persistLocked() + if err != nil && id != 0 { + _, _ = conf.removeStoreLocked(id) + } + return err +} + +func (conf *evictLeaderSchedulerConfig) delete(id uint64) (any, error) { + conf.Lock() + var resp any + last, err := conf.removeStoreLocked(id) + if err != nil { + conf.Unlock() + return resp, err + } + + keyRanges := conf.StoreIDWithRanges[id] + err = conf.persistLocked() + if err != nil { + conf.resetStoreLocked(id, keyRanges) + conf.Unlock() + return resp, err + } + if !last { + conf.Unlock() + return resp, nil + } + conf.Unlock() + if err := conf.removeSchedulerCb(EvictLeaderName); err != nil { + if !errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) { + conf.resetStore(id, keyRanges) + } + return resp, err + } + resp = lastStoreDeleteInfo + return resp, nil +} + type evictLeaderScheduler struct { *BaseScheduler conf *evictLeaderSchedulerConfig @@ -202,48 +283,19 @@ func (*evictLeaderScheduler) GetType() string { } func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { - s.conf.RLock() - defer s.conf.RUnlock() - return EncodeConfig(s.conf) + return s.conf.encodeConfig() } func (s *evictLeaderScheduler) ReloadConfig() error { - s.conf.Lock() - defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } - newCfg := &evictLeaderSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { - return err - } - pauseAndResumeLeaderTransfer(s.conf.cluster, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) - s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges - return nil + return s.conf.reloadConfig(s.GetName()) } func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { - s.conf.RLock() - defer 
s.conf.RUnlock() - var res error - for id := range s.conf.StoreIDWithRanges { - if err := cluster.PauseLeaderTransfer(id); err != nil { - res = err - } - } - return res + return s.conf.pauseLeaderTransfer(cluster) } func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { - s.conf.RLock() - defer s.conf.RUnlock() - for id := range s.conf.StoreIDWithRanges { - cluster.ResumeLeaderTransfer(id) - } + s.conf.resumeLeaderTransfer(cluster) } func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { @@ -256,7 +308,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { evictLeaderCounter.Inc() - return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil + return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf), nil } func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator { @@ -277,10 +329,12 @@ func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) [ type evictLeaderStoresConf interface { getStores() []uint64 getKeyRangesByID(id uint64) []core.KeyRange + getBatch() int } -func scheduleEvictLeaderBatch(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator { +func scheduleEvictLeaderBatch(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { var ops []*operator.Operator + batchSize := conf.getBatch() for i := 0; i < batchSize; i++ { once := scheduleEvictLeaderOnce(name, typ, cluster, conf) // no more regions @@ -363,39 +417,50 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { return } - var args []string - var exists bool - var id uint64 - idFloat, ok := input["store_id"].(float64) - if ok { + var ( + exist bool + err error + id uint64 + newRanges []core.KeyRange + ) + idFloat, inputHasStoreID := input["store_id"].(float64) + if inputHasStoreID { id = (uint64)(idFloat) - handler.config.RLock() - if _, exists = handler.config.StoreIDWithRanges[id]; !exists { - if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil { - handler.config.RUnlock() - handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } + exist, err = handler.config.pauseLeaderTransferIfStoreNotExist(id) + if err != nil { + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return } - handler.config.RUnlock() - args = append(args, strconv.FormatUint(id, 10)) + } + + batch := handler.config.getBatch() + batchFloat, ok := input["batch"].(float64) + if ok { + if batchFloat < 1 || batchFloat > 10 { + handler.rd.JSON(w, http.StatusBadRequest, "batch is invalid, it should be in [1, 10]") + return + } + batch = (int)(batchFloat) } ranges, ok := (input["ranges"]).([]string) if ok { - args = append(args, ranges...) - } else if exists { - args = append(args, handler.config.getRanges(id)...) 
+ if !inputHasStoreID { + handler.rd.JSON(w, http.StatusInternalServerError, errs.ErrSchedulerConfig.FastGenByArgs("id")) + return + } + } else if exist { + ranges = handler.config.getRanges(id) } - err := handler.config.BuildWithArgs(args) + newRanges, err = getKeyRanges(ranges) if err != nil { - handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - err = handler.config.Persist() + + err = handler.config.update(id, newRanges, batch) if err != nil { - handler.config.removeStore(id) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } @@ -415,33 +480,17 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R return } - var resp any - keyRanges := handler.config.getKeyRangesByID(id) - succ, last := handler.config.removeStore(id) - if succ { - err = handler.config.Persist() - if err != nil { - handler.config.resetStore(id, keyRanges) + resp, err := handler.config.delete(id) + if err != nil { + if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) || errors.ErrorEqual(err, errs.ErrScheduleConfigNotExist.FastGenByArgs()) { + handler.rd.JSON(w, http.StatusNotFound, err.Error()) + } else { handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - if last { - if err := handler.config.removeSchedulerCb(EvictLeaderName); err != nil { - if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) { - handler.rd.JSON(w, http.StatusNotFound, err.Error()) - } else { - handler.config.resetStore(id, keyRanges) - handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - } - return - } - resp = lastStoreDeleteInfo } - handler.rd.JSON(w, http.StatusOK, resp) return } - handler.rd.JSON(w, http.StatusNotFound, errs.ErrScheduleConfigNotExist.FastGenByArgs().Error()) + handler.rd.JSON(w, http.StatusOK, resp) } func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler { diff --git a/pkg/schedule/schedulers/evict_leader_test.go b/pkg/schedule/schedulers/evict_leader_test.go index a91b1c3c937..63f7cde3b15 100644 --- a/pkg/schedule/schedulers/evict_leader_test.go +++ b/pkg/schedule/schedulers/evict_leader_test.go @@ -89,18 +89,43 @@ func TestConfigClone(t *testing.T) { emptyConf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange)} con2 := emptyConf.Clone() - re.Empty(emptyConf.getKeyRangesByID(1)) - re.NoError(con2.BuildWithArgs([]string{"1"})) - re.NotEmpty(con2.getKeyRangesByID(1)) - re.Empty(emptyConf.getKeyRangesByID(1)) + re.Empty(con2.getKeyRangesByID(1)) + con2.StoreIDWithRanges[1], _ = getKeyRanges([]string{"a", "b", "c", "d"}) con3 := con2.Clone() - con3.StoreIDWithRanges[1], _ = getKeyRanges([]string{"a", "b", "c", "d"}) - re.Empty(emptyConf.getKeyRangesByID(1)) - re.NotEqual(len(con3.getRanges(1)), len(con2.getRanges(1))) + re.Equal(len(con3.getRanges(1)), len(con2.getRanges(1))) + con3.StoreIDWithRanges[1][0].StartKey = []byte("aaa") con4 := con3.Clone() re.True(bytes.Equal(con4.StoreIDWithRanges[1][0].StartKey, con3.StoreIDWithRanges[1][0].StartKey)) - con4.StoreIDWithRanges[1][0].StartKey = []byte("aaa") - re.False(bytes.Equal(con4.StoreIDWithRanges[1][0].StartKey, con3.StoreIDWithRanges[1][0].StartKey)) + + con4.Batch = 10 + con5 := con4.Clone() + re.Equal(con5.getBatch(), con4.getBatch()) +} + +func TestBatchEvict(t *testing.T) { + re := require.New(t) + cancel, _, tc, oc := prepareSchedulersTest() + defer cancel() + + // Add stores 1, 2, 3 + tc.AddLeaderStore(1, 0) + 
tc.AddLeaderStore(2, 0) + tc.AddLeaderStore(3, 0) + // the random might be the same, so we add 1000 regions to make sure the batch is full + for i := 1; i <= 1000; i++ { + tc.AddLeaderRegion(uint64(i), 1, 2, 3) + } + tc.AddLeaderRegion(6, 2, 1, 3) + tc.AddLeaderRegion(7, 3, 1, 2) + + sl, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil }) + re.NoError(err) + re.True(sl.IsScheduleAllowed(tc)) + ops, _ := sl.Schedule(tc, false) + re.Len(ops, 3) + sl.(*evictLeaderScheduler).conf.Batch = 5 + ops, _ = sl.Schedule(tc, false) + re.Len(ops, 5) } diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index 9b13e292c87..c9f10fa610f 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -43,9 +43,6 @@ const ( slowStoreRecoverThreshold = 1 ) -// WithLabelValues is a heavy operation, define variable to avoid call it every time. -var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule") - type evictSlowStoreSchedulerConfig struct { syncutil.RWMutex cluster *core.BasicCluster @@ -99,6 +96,10 @@ func (conf *evictSlowStoreSchedulerConfig) getKeyRangesByID(id uint64) []core.Ke return []core.KeyRange{core.NewKeyRange("", "")} } +func (*evictSlowStoreSchedulerConfig) getBatch() int { + return EvictLeaderBatchSize +} + func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 { if len(conf.getStores()) == 0 { return 0 @@ -266,7 +267,7 @@ func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.SchedulerClust } func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator { - return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) + return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf) } func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index da3dbc24e95..dc2266b5540 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -110,6 +110,10 @@ func (conf *evictSlowTrendSchedulerConfig) getKeyRangesByID(id uint64) []core.Ke return []core.KeyRange{core.NewKeyRange("", "")} } +func (*evictSlowTrendSchedulerConfig) getBatch() int { + return EvictLeaderBatchSize +} + func (conf *evictSlowTrendSchedulerConfig) hasEvictedStores() bool { conf.RLock() defer conf.RUnlock() @@ -370,7 +374,7 @@ func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.SchedulerClus return nil } storeSlowTrendEvictedStatusGauge.WithLabelValues(store.GetAddress(), strconv.FormatUint(store.GetID(), 10)).Set(1) - return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) + return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf) } func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 4ab82be4cbe..a19a4e1bf4b 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -47,12 +47,6 @@ const ( GrantHotRegionType = "grant-hot-region" ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. 
- grantHotRegionCounter = schedulerCounter.WithLabelValues(GrantHotRegionName, "schedule") - grantHotRegionSkipCounter = schedulerCounter.WithLabelValues(GrantHotRegionName, "skip") -) - type grantHotRegionSchedulerConfig struct { syncutil.RWMutex storage endpoint.ConfigStorage diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 4752ef3e61d..21900fac85d 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -42,13 +42,6 @@ const ( GrantLeaderType = "grant-leader" ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. - grantLeaderCounter = schedulerCounter.WithLabelValues(GrantLeaderName, "schedule") - grantLeaderNoFollowerCounter = schedulerCounter.WithLabelValues(GrantLeaderName, "no-follower") - grantLeaderNewOperatorCounter = schedulerCounter.WithLabelValues(GrantLeaderName, "new-operator") -) - type grantLeaderSchedulerConfig struct { syncutil.RWMutex storage endpoint.ConfigStorage diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index d20473fb010..d7e83fd4fb2 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -71,44 +71,6 @@ var ( statisticsInterval = time.Second ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. - hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule") - hotSchedulerSkipCounter = schedulerCounter.WithLabelValues(HotRegionName, "skip") - hotSchedulerSearchRevertRegionsCounter = schedulerCounter.WithLabelValues(HotRegionName, "search_revert_regions") - hotSchedulerNotSameEngineCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_same_engine") - hotSchedulerNoRegionCounter = schedulerCounter.WithLabelValues(HotRegionName, "no_region") - hotSchedulerUnhealthyReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "unhealthy_replica") - hotSchedulerAbnormalReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "abnormal_replica") - hotSchedulerCreateOperatorFailedCounter = schedulerCounter.WithLabelValues(HotRegionName, "create_operator_failed") - hotSchedulerNewOperatorCounter = schedulerCounter.WithLabelValues(HotRegionName, "new_operator") - hotSchedulerSnapshotSenderLimitCounter = schedulerCounter.WithLabelValues(HotRegionName, "snapshot_sender_limit") - - // counter related with the split region - hotSchedulerNotFoundSplitKeysCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys") - hotSchedulerRegionBucketsNotHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot") - hotSchedulerOnlyOneBucketsHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "only_one_buckets_hot") - hotSchedulerHotBucketNotValidCounter = schedulerCounter.WithLabelValues(HotRegionName, "hot_buckets_not_valid") - hotSchedulerRegionBucketsSingleHotSpotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_single_hot_spot") - hotSchedulerSplitSuccessCounter = schedulerCounter.WithLabelValues(HotRegionName, "split_success") - hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer") - hotSchedulerRegionTooHotNeedSplitCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_is_too_hot_need_split") - - hotSchedulerMoveLeaderCounter = schedulerCounter.WithLabelValues(HotRegionName, moveLeader.String()) - 
hotSchedulerMovePeerCounter = schedulerCounter.WithLabelValues(HotRegionName, movePeer.String()) - hotSchedulerTransferLeaderCounter = schedulerCounter.WithLabelValues(HotRegionName, transferLeader.String()) - - readSkipAllDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "read-skip-all-dim-uniform-store") - writeSkipAllDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "write-skip-all-dim-uniform-store") - readSkipByteDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "read-skip-byte-uniform-store") - writeSkipByteDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "write-skip-byte-uniform-store") - readSkipKeyDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "read-skip-key-uniform-store") - writeSkipKeyDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "write-skip-key-uniform-store") - readSkipQueryDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "read-skip-query-uniform-store") - writeSkipQueryDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "write-skip-query-uniform-store") - pendingOpFailsStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "pending-op-fails") -) - type baseHotScheduler struct { *BaseScheduler // stLoadInfos contain store statistics information by resource type. diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 6bca686404d..777c8b3d625 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -137,6 +137,7 @@ func schedulersRegister() { return err } conf.StoreIDWithRanges[id] = ranges + conf.Batch = EvictLeaderBatchSize return nil } }) diff --git a/pkg/schedule/schedulers/label.go b/pkg/schedule/schedulers/label.go index 24875e3e26a..6b7a98f8d02 100644 --- a/pkg/schedule/schedulers/label.go +++ b/pkg/schedule/schedulers/label.go @@ -34,15 +34,6 @@ const ( LabelType = "label" ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. 
-	labelCounter            = schedulerCounter.WithLabelValues(LabelName, "schedule")
-	labelNewOperatorCounter = schedulerCounter.WithLabelValues(LabelName, "new-operator")
-	labelNoTargetCounter    = schedulerCounter.WithLabelValues(LabelName, "no-target")
-	labelSkipCounter        = schedulerCounter.WithLabelValues(LabelName, "skip")
-	labelNoRegionCounter    = schedulerCounter.WithLabelValues(LabelName, "no-region")
-)
-
 type labelSchedulerConfig struct {
 	Name   string          `json:"name"`
 	Ranges []core.KeyRange `json:"ranges"`
diff --git a/pkg/schedule/schedulers/metrics.go b/pkg/schedule/schedulers/metrics.go
index 34e4606a7ce..f8bd2b4d686 100644
--- a/pkg/schedule/schedulers/metrics.go
+++ b/pkg/schedule/schedulers/metrics.go
@@ -14,7 +14,10 @@
 package schedulers
 
-import "github.com/prometheus/client_golang/prometheus"
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	types "github.com/tikv/pd/pkg/schedule/type"
+)
 
 var (
 	schedulerStatusGauge = prometheus.NewGaugeVec(
@@ -161,3 +164,180 @@ func init() {
 	prometheus.MustRegister(storeSlowTrendMiscGauge)
 	prometheus.MustRegister(HotPendingSum)
 }
+
+func balanceLeaderCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.BalanceLeaderScheduler.String(), event)
+}
+
+func balanceRegionCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.BalanceRegionScheduler.String(), event)
+}
+
+func evictLeaderCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.EvictLeaderScheduler.String(), event)
+}
+
+func grantHotRegionCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.GrantHotRegionScheduler.String(), event)
+}
+
+func grantLeaderCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.GrantLeaderScheduler.String(), event)
+}
+
+func hotRegionCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.HotRegionScheduler.String(), event)
+}
+
+func labelCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.LabelScheduler.String(), event)
+}
+
+func randomMergeCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.RandomMergeScheduler.String(), event)
+}
+
+func scatterRangeCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.ScatterRangeScheduler.String(), event)
+}
+
+func shuffleHotRegionCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.ShuffleHotRegionScheduler.String(), event)
+}
+
+func shuffleLeaderCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.ShuffleLeaderScheduler.String(), event)
+}
+
+func shuffleRegionCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.ShuffleRegionScheduler.String(), event)
+}
+
+func splitBucketCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.SplitBucketScheduler.String(), event)
+}
+
+func transferWitnessLeaderCounterWithEvent(event string) prometheus.Counter {
+	return schedulerCounter.WithLabelValues(types.TransferWitnessLeaderScheduler.String(), event)
+}
+
+// WithLabelValues is a heavy operation, define variable to avoid calling it every time.
+var ( + balanceLeaderScheduleCounter = balanceLeaderCounterWithEvent("schedule") + balanceLeaderNoLeaderRegionCounter = balanceLeaderCounterWithEvent("no-leader-region") + balanceLeaderRegionHotCounter = balanceLeaderCounterWithEvent("region-hot") + balanceLeaderNoTargetStoreCounter = balanceLeaderCounterWithEvent("no-target-store") + balanceLeaderNoFollowerRegionCounter = balanceLeaderCounterWithEvent("no-follower-region") + balanceLeaderSkipCounter = balanceLeaderCounterWithEvent("skip") + balanceLeaderNewOpCounter = balanceLeaderCounterWithEvent("new-operator") + + balanceRegionScheduleCounter = balanceRegionCounterWithEvent("schedule") + balanceRegionNoRegionCounter = balanceRegionCounterWithEvent("no-region") + balanceRegionHotCounter = balanceRegionCounterWithEvent("region-hot") + balanceRegionNoLeaderCounter = balanceRegionCounterWithEvent("no-leader") + balanceRegionNewOpCounter = balanceRegionCounterWithEvent("new-operator") + balanceRegionSkipCounter = balanceRegionCounterWithEvent("skip") + balanceRegionCreateOpFailCounter = balanceRegionCounterWithEvent("create-operator-fail") + balanceRegionNoReplacementCounter = balanceRegionCounterWithEvent("no-replacement") + + evictLeaderCounter = evictLeaderCounterWithEvent("schedule") + evictLeaderNoLeaderCounter = evictLeaderCounterWithEvent("no-leader") + evictLeaderPickUnhealthyCounter = evictLeaderCounterWithEvent("pick-unhealthy-region") + evictLeaderNoTargetStoreCounter = evictLeaderCounterWithEvent("no-target-store") + evictLeaderNewOperatorCounter = evictLeaderCounterWithEvent("new-operator") + + evictSlowStoreCounter = schedulerCounter.WithLabelValues(types.EvictSlowStoreScheduler.String(), "schedule") + + grantHotRegionCounter = grantHotRegionCounterWithEvent("schedule") + grantHotRegionSkipCounter = grantHotRegionCounterWithEvent("skip") + + grantLeaderCounter = grantLeaderCounterWithEvent("schedule") + grantLeaderNoFollowerCounter = grantLeaderCounterWithEvent("no-follower") + grantLeaderNewOperatorCounter = grantLeaderCounterWithEvent("new-operator") + + // counter related with the hot region + hotSchedulerCounter = hotRegionCounterWithEvent("schedule") + hotSchedulerSkipCounter = hotRegionCounterWithEvent("skip") + hotSchedulerSearchRevertRegionsCounter = hotRegionCounterWithEvent("search_revert_regions") + hotSchedulerNotSameEngineCounter = hotRegionCounterWithEvent("not_same_engine") + hotSchedulerNoRegionCounter = hotRegionCounterWithEvent("no_region") + hotSchedulerUnhealthyReplicaCounter = hotRegionCounterWithEvent("unhealthy_replica") + hotSchedulerAbnormalReplicaCounter = hotRegionCounterWithEvent("abnormal_replica") + hotSchedulerCreateOperatorFailedCounter = hotRegionCounterWithEvent("create_operator_failed") + hotSchedulerNewOperatorCounter = hotRegionCounterWithEvent("new_operator") + hotSchedulerSnapshotSenderLimitCounter = hotRegionCounterWithEvent("snapshot_sender_limit") + // hot region counter related with the split region + hotSchedulerNotFoundSplitKeysCounter = hotRegionCounterWithEvent("not_found_split_keys") + hotSchedulerRegionBucketsNotHotCounter = hotRegionCounterWithEvent("region_buckets_not_hot") + hotSchedulerOnlyOneBucketsHotCounter = hotRegionCounterWithEvent("only_one_buckets_hot") + hotSchedulerHotBucketNotValidCounter = hotRegionCounterWithEvent("hot_buckets_not_valid") + hotSchedulerRegionBucketsSingleHotSpotCounter = hotRegionCounterWithEvent("region_buckets_single_hot_spot") + hotSchedulerSplitSuccessCounter = hotRegionCounterWithEvent("split_success") + 
hotSchedulerNeedSplitBeforeScheduleCounter = hotRegionCounterWithEvent("need_split_before_move_peer") + hotSchedulerRegionTooHotNeedSplitCounter = hotRegionCounterWithEvent("region_is_too_hot_need_split") + // hot region counter related with the move peer + hotSchedulerMoveLeaderCounter = hotRegionCounterWithEvent(moveLeader.String()) + hotSchedulerMovePeerCounter = hotRegionCounterWithEvent(movePeer.String()) + hotSchedulerTransferLeaderCounter = hotRegionCounterWithEvent(transferLeader.String()) + // hot region counter related with reading and writing + readSkipAllDimUniformStoreCounter = hotRegionCounterWithEvent("read-skip-all-dim-uniform-store") + writeSkipAllDimUniformStoreCounter = hotRegionCounterWithEvent("write-skip-all-dim-uniform-store") + readSkipByteDimUniformStoreCounter = hotRegionCounterWithEvent("read-skip-byte-uniform-store") + writeSkipByteDimUniformStoreCounter = hotRegionCounterWithEvent("write-skip-byte-uniform-store") + readSkipKeyDimUniformStoreCounter = hotRegionCounterWithEvent("read-skip-key-uniform-store") + writeSkipKeyDimUniformStoreCounter = hotRegionCounterWithEvent("write-skip-key-uniform-store") + readSkipQueryDimUniformStoreCounter = hotRegionCounterWithEvent("read-skip-query-uniform-store") + writeSkipQueryDimUniformStoreCounter = hotRegionCounterWithEvent("write-skip-query-uniform-store") + pendingOpFailsStoreCounter = hotRegionCounterWithEvent("pending-op-fails") + + labelCounter = labelCounterWithEvent("schedule") + labelNewOperatorCounter = labelCounterWithEvent("new-operator") + labelNoTargetCounter = labelCounterWithEvent("no-target") + labelSkipCounter = labelCounterWithEvent("skip") + labelNoRegionCounter = labelCounterWithEvent("no-region") + + randomMergeCounter = randomMergeCounterWithEvent("schedule") + randomMergeNewOperatorCounter = randomMergeCounterWithEvent("new-operator") + randomMergeNoSourceStoreCounter = randomMergeCounterWithEvent("no-source-store") + randomMergeNoRegionCounter = randomMergeCounterWithEvent("no-region") + randomMergeNoTargetStoreCounter = randomMergeCounterWithEvent("no-target-store") + randomMergeNotAllowedCounter = randomMergeCounterWithEvent("not-allowed") + + scatterRangeCounter = scatterRangeCounterWithEvent("schedule") + scatterRangeNewOperatorCounter = scatterRangeCounterWithEvent("new-operator") + scatterRangeNewLeaderOperatorCounter = scatterRangeCounterWithEvent("new-leader-operator") + scatterRangeNewRegionOperatorCounter = scatterRangeCounterWithEvent("new-region-operator") + scatterRangeNoNeedBalanceRegionCounter = scatterRangeCounterWithEvent("no-need-balance-region") + scatterRangeNoNeedBalanceLeaderCounter = scatterRangeCounterWithEvent("no-need-balance-leader") + + shuffleHotRegionCounter = shuffleHotRegionCounterWithEvent("schedule") + shuffleHotRegionNewOperatorCounter = shuffleHotRegionCounterWithEvent("new-operator") + shuffleHotRegionSkipCounter = shuffleHotRegionCounterWithEvent("skip") + + shuffleLeaderCounter = shuffleLeaderCounterWithEvent("schedule") + shuffleLeaderNewOperatorCounter = shuffleLeaderCounterWithEvent("new-operator") + shuffleLeaderNoTargetStoreCounter = shuffleLeaderCounterWithEvent("no-target-store") + shuffleLeaderNoFollowerCounter = shuffleLeaderCounterWithEvent("no-follower") + + shuffleRegionCounter = shuffleRegionCounterWithEvent("schedule") + shuffleRegionNewOperatorCounter = shuffleRegionCounterWithEvent("new-operator") + shuffleRegionNoRegionCounter = shuffleRegionCounterWithEvent("no-region") + shuffleRegionNoNewPeerCounter = 
shuffleRegionCounterWithEvent("no-new-peer") + shuffleRegionCreateOperatorFailCounter = shuffleRegionCounterWithEvent("create-operator-fail") + shuffleRegionNoSourceStoreCounter = shuffleRegionCounterWithEvent("no-source-store") + + splitBucketDisableCounter = splitBucketCounterWithEvent("bucket-disable") + splitBuckerSplitLimitCounter = splitBucketCounterWithEvent("split-limit") + splitBucketScheduleCounter = splitBucketCounterWithEvent("schedule") + splitBucketNoRegionCounter = splitBucketCounterWithEvent("no-region") + splitBucketRegionTooSmallCounter = splitBucketCounterWithEvent("region-too-small") + splitBucketOperatorExistCounter = splitBucketCounterWithEvent("operator-exist") + splitBucketKeyRangeNotMatchCounter = splitBucketCounterWithEvent("key-range-not-match") + splitBucketNoSplitKeysCounter = splitBucketCounterWithEvent("no-split-keys") + splitBucketCreateOperatorFailCounter = splitBucketCounterWithEvent("create-operator-fail") + splitBucketNewOperatorCounter = splitBucketCounterWithEvent("new-operator") + + transferWitnessLeaderCounter = transferWitnessLeaderCounterWithEvent("schedule") + transferWitnessLeaderNewOperatorCounter = transferWitnessLeaderCounterWithEvent("new-operator") + transferWitnessLeaderNoTargetStoreCounter = transferWitnessLeaderCounterWithEvent("no-target-store") +) diff --git a/pkg/schedule/schedulers/random_merge.go b/pkg/schedule/schedulers/random_merge.go index 7fec0bd9530..ff96afe03eb 100644 --- a/pkg/schedule/schedulers/random_merge.go +++ b/pkg/schedule/schedulers/random_merge.go @@ -35,16 +35,6 @@ const ( RandomMergeType = "random-merge" ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. - randomMergeCounter = schedulerCounter.WithLabelValues(RandomMergeName, "schedule") - randomMergeNewOperatorCounter = schedulerCounter.WithLabelValues(RandomMergeName, "new-operator") - randomMergeNoSourceStoreCounter = schedulerCounter.WithLabelValues(RandomMergeName, "no-source-store") - randomMergeNoRegionCounter = schedulerCounter.WithLabelValues(RandomMergeName, "no-region") - randomMergeNoTargetStoreCounter = schedulerCounter.WithLabelValues(RandomMergeName, "no-target-store") - randomMergeNotAllowedCounter = schedulerCounter.WithLabelValues(RandomMergeName, "not-allowed") -) - type randomMergeSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index ebee66dc207..17c67a154ab 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ b/pkg/schedule/schedulers/scatter_range.go @@ -38,16 +38,6 @@ const ( ScatterRangeName = "scatter-range" ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. 
- scatterRangeCounter = schedulerCounter.WithLabelValues(ScatterRangeName, "schedule") - scatterRangeNewOperatorCounter = schedulerCounter.WithLabelValues(ScatterRangeName, "new-operator") - scatterRangeNewLeaderOperatorCounter = schedulerCounter.WithLabelValues(ScatterRangeName, "new-leader-operator") - scatterRangeNewRegionOperatorCounter = schedulerCounter.WithLabelValues(ScatterRangeName, "new-region-operator") - scatterRangeNoNeedBalanceRegionCounter = schedulerCounter.WithLabelValues(ScatterRangeName, "no-need-balance-region") - scatterRangeNoNeedBalanceLeaderCounter = schedulerCounter.WithLabelValues(ScatterRangeName, "no-need-balance-leader") -) - type scatterRangeSchedulerConfig struct { syncutil.RWMutex storage endpoint.ConfigStorage diff --git a/pkg/schedule/schedulers/scheduler_test.go b/pkg/schedule/schedulers/scheduler_test.go index 5a603515942..48040841c76 100644 --- a/pkg/schedule/schedulers/scheduler_test.go +++ b/pkg/schedule/schedulers/scheduler_test.go @@ -148,8 +148,8 @@ func TestRemoveRejectLeader(t *testing.T) { el, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil }) re.NoError(err) tc.DeleteStore(tc.GetStore(1)) - succ, _ := el.(*evictLeaderScheduler).conf.removeStore(1) - re.True(succ) + _, err = el.(*evictLeaderScheduler).conf.removeStoreLocked(1) + re.NoError(err) } func TestShuffleHotRegionScheduleBalance(t *testing.T) { diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go index 4b5b5fd68bf..f4b566c56a4 100644 --- a/pkg/schedule/schedulers/shuffle_hot_region.go +++ b/pkg/schedule/schedulers/shuffle_hot_region.go @@ -41,13 +41,6 @@ const ( ShuffleHotRegionType = "shuffle-hot-region" ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. - shuffleHotRegionCounter = schedulerCounter.WithLabelValues(ShuffleHotRegionName, "schedule") - shuffleHotRegionNewOperatorCounter = schedulerCounter.WithLabelValues(ShuffleHotRegionName, "new-operator") - shuffleHotRegionSkipCounter = schedulerCounter.WithLabelValues(ShuffleHotRegionName, "skip") -) - type shuffleHotRegionSchedulerConfig struct { syncutil.RWMutex storage endpoint.ConfigStorage diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go index 5b3dfd9fd20..17b5fae6448 100644 --- a/pkg/schedule/schedulers/shuffle_leader.go +++ b/pkg/schedule/schedulers/shuffle_leader.go @@ -32,14 +32,6 @@ const ( ShuffleLeaderType = "shuffle-leader" ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. 
- shuffleLeaderCounter = schedulerCounter.WithLabelValues(ShuffleLeaderName, "schedule") - shuffleLeaderNewOperatorCounter = schedulerCounter.WithLabelValues(ShuffleLeaderName, "new-operator") - shuffleLeaderNoTargetStoreCounter = schedulerCounter.WithLabelValues(ShuffleLeaderName, "no-target-store") - shuffleLeaderNoFollowerCounter = schedulerCounter.WithLabelValues(ShuffleLeaderName, "no-follower") -) - type shuffleLeaderSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go index b1a100384ae..57f6c618962 100644 --- a/pkg/schedule/schedulers/shuffle_region.go +++ b/pkg/schedule/schedulers/shuffle_region.go @@ -33,16 +33,6 @@ const ( ShuffleRegionType = "shuffle-region" ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. - shuffleRegionCounter = schedulerCounter.WithLabelValues(ShuffleRegionName, "schedule") - shuffleRegionNewOperatorCounter = schedulerCounter.WithLabelValues(ShuffleRegionName, "new-operator") - shuffleRegionNoRegionCounter = schedulerCounter.WithLabelValues(ShuffleRegionName, "no-region") - shuffleRegionNoNewPeerCounter = schedulerCounter.WithLabelValues(ShuffleRegionName, "no-new-peer") - shuffleRegionCreateOperatorFailCounter = schedulerCounter.WithLabelValues(ShuffleRegionName, "create-operator-fail") - shuffleRegionNoSourceStoreCounter = schedulerCounter.WithLabelValues(ShuffleRegionName, "no-source-store") -) - type shuffleRegionScheduler struct { *BaseScheduler conf *shuffleRegionSchedulerConfig diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index 9b049bf6ba1..4516dfe4433 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -45,20 +45,6 @@ const ( defaultSplitLimit = 10 ) -var ( - // WithLabelValues is a heavy operation, define variable to avoid call it every time. 
-	splitBucketDisableCounter            = schedulerCounter.WithLabelValues(SplitBucketName, "bucket-disable")
-	splitBuckerSplitLimitCounter         = schedulerCounter.WithLabelValues(SplitBucketName, "split-limit")
-	splitBucketScheduleCounter           = schedulerCounter.WithLabelValues(SplitBucketName, "schedule")
-	splitBucketNoRegionCounter           = schedulerCounter.WithLabelValues(SplitBucketName, "no-region")
-	splitBucketRegionTooSmallCounter     = schedulerCounter.WithLabelValues(SplitBucketName, "region-too-small")
-	splitBucketOperatorExistCounter      = schedulerCounter.WithLabelValues(SplitBucketName, "operator-exist")
-	splitBucketKeyRangeNotMatchCounter   = schedulerCounter.WithLabelValues(SplitBucketName, "key-range-not-match")
-	splitBucketNoSplitKeysCounter        = schedulerCounter.WithLabelValues(SplitBucketName, "no-split-keys")
-	splitBucketCreateOperatorFailCounter = schedulerCounter.WithLabelValues(SplitBucketName, "create-operator-fail")
-	splitBucketNewOperatorCounter        = schedulerCounter.WithLabelValues(SplitBucketName, "new-operator")
-)
-
 func initSplitBucketConfig() *splitBucketSchedulerConfig {
 	return &splitBucketSchedulerConfig{
 		Degree: defaultHotDegree,
diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go
index 9ba78985d13..2050194b9ae 100644
--- a/pkg/schedule/schedulers/transfer_witness_leader.go
+++ b/pkg/schedule/schedulers/transfer_witness_leader.go
@@ -40,13 +40,6 @@ const (
 	transferWitnessLeaderRecvMaxRegionSize = 10000
 )
 
-var (
-	// WithLabelValues is a heavy operation, define variable to avoid call it every time.
-	transferWitnessLeaderCounter              = schedulerCounter.WithLabelValues(TransferWitnessLeaderName, "schedule")
-	transferWitnessLeaderNewOperatorCounter   = schedulerCounter.WithLabelValues(TransferWitnessLeaderName, "new-operator")
-	transferWitnessLeaderNoTargetStoreCounter = schedulerCounter.WithLabelValues(TransferWitnessLeaderName, "no-target-store")
-)
-
 type transferWitnessLeaderScheduler struct {
 	*BaseScheduler
 	regions chan *core.RegionInfo
diff --git a/pkg/schedule/type/type.go b/pkg/schedule/type/type.go
index d872bf0408c..65b2f0f682d 100644
--- a/pkg/schedule/type/type.go
+++ b/pkg/schedule/type/type.go
@@ -33,4 +33,40 @@ const (
 	RuleChecker CheckerSchedulerType = "rule-checker"
 	// SplitChecker is the name for split checker.
 	SplitChecker CheckerSchedulerType = "split-checker"
+
+	// BalanceLeaderScheduler is balance leader scheduler name.
+	BalanceLeaderScheduler CheckerSchedulerType = "balance-leader-scheduler"
+	// BalanceRegionScheduler is balance region scheduler name.
+	BalanceRegionScheduler CheckerSchedulerType = "balance-region-scheduler"
+	// BalanceWitnessScheduler is balance witness scheduler name.
+	BalanceWitnessScheduler CheckerSchedulerType = "balance-witness-scheduler"
+	// EvictLeaderScheduler is evict leader scheduler name.
+	EvictLeaderScheduler CheckerSchedulerType = "evict-leader-scheduler"
+	// EvictSlowStoreScheduler is evict slow store scheduler name.
+	EvictSlowStoreScheduler CheckerSchedulerType = "evict-slow-store-scheduler"
+	// EvictSlowTrendScheduler is evict leader by slow trend scheduler name.
+	EvictSlowTrendScheduler CheckerSchedulerType = "evict-slow-trend-scheduler"
+	// GrantLeaderScheduler is grant leader scheduler name.
+	GrantLeaderScheduler CheckerSchedulerType = "grant-leader-scheduler"
+	// GrantHotRegionScheduler is grant hot region scheduler name.
+	GrantHotRegionScheduler CheckerSchedulerType = "grant-hot-region-scheduler"
+	// HotRegionScheduler is balance hot region scheduler name.
+ HotRegionScheduler CheckerSchedulerType = "balance-hot-region-scheduler" + // RandomMergeScheduler is random merge scheduler name. + RandomMergeScheduler CheckerSchedulerType = "random-merge-scheduler" + // ScatterRangeScheduler is scatter range scheduler name. + // TODO: update to `scatter-range-scheduler` + ScatterRangeScheduler CheckerSchedulerType = "scatter-range" + // ShuffleHotRegionScheduler is shuffle hot region scheduler name. + ShuffleHotRegionScheduler CheckerSchedulerType = "shuffle-hot-region-scheduler" + // ShuffleLeaderScheduler is shuffle leader scheduler name. + ShuffleLeaderScheduler CheckerSchedulerType = "shuffle-leader-scheduler" + // ShuffleRegionScheduler is shuffle region scheduler name. + ShuffleRegionScheduler CheckerSchedulerType = "shuffle-region-scheduler" + // SplitBucketScheduler is the split bucket name. + SplitBucketScheduler CheckerSchedulerType = "split-bucket-scheduler" + // TransferWitnessLeaderScheduler is transfer witness leader scheduler name. + TransferWitnessLeaderScheduler CheckerSchedulerType = "transfer-witness-leader-scheduler" + // LabelScheduler is label scheduler name. + LabelScheduler CheckerSchedulerType = "label-scheduler" ) diff --git a/scripts/dashboard-version b/scripts/dashboard-version index 08a22137df5..42ff906db4e 100644 --- a/scripts/dashboard-version +++ b/scripts/dashboard-version @@ -1,3 +1,3 @@ # This file is updated by running scripts/update-dashboard.sh # Don't edit it manullay -8.2.0-91f6c281 +8.3.0-e6e78c7c diff --git a/server/grpc_service.go b/server/grpc_service.go index d3f58dfe1ab..7b18be47fde 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -48,6 +48,7 @@ import ( "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/server/cluster" "go.etcd.io/etcd/clientv3" + "go.uber.org/multierr" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -1680,7 +1681,22 @@ func (s *GrpcServer) BatchScanRegions(ctx context.Context, request *pdpb.BatchSc } keyRanges.Append(reqRange.StartKey, reqRange.EndKey) } - res := rc.BatchScanRegions(keyRanges, int(limit)) + + scanOptions := []core.BatchScanRegionsOptionFunc{core.WithLimit(int(limit))} + if request.ContainAllKeyRange { + scanOptions = append(scanOptions, core.WithOutputMustContainAllKeyRange()) + } + res, err := rc.BatchScanRegions(keyRanges, scanOptions...) 
+ if err != nil { + if errs.ErrRegionNotAdjacent.Equal(multierr.Errors(err)[0]) { + return &pdpb.BatchScanRegionsResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_REGIONS_NOT_CONTAIN_ALL_KEY_RANGE, err.Error()), + }, nil + } + return &pdpb.BatchScanRegionsResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } regions := make([]*pdpb.Region, 0, len(res)) for _, r := range res { leader := r.GetLeader() diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index ba4d16a8234..4138b775d7c 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -2005,9 +2005,13 @@ func waitLeaderChange(re *require.Assertions, cluster *tests.TestCluster, old st } func (suite *clientTestSuite) TestBatchScanRegions() { - re := suite.Require() - regionLen := 10 - regions := make([]*metapb.Region, 0, regionLen) + var ( + re = suite.Require() + ctx = context.Background() + regionLen = 10 + regions = make([]*metapb.Region, 0, regionLen) + ) + for i := 0; i < regionLen; i++ { regionID := regionIDAllocator.alloc() r := &metapb.Region{ @@ -2032,7 +2036,7 @@ func (suite *clientTestSuite) TestBatchScanRegions() { // Wait for region heartbeats. testutil.Eventually(re, func() bool { - scanRegions, err := suite.client.BatchScanRegions(context.Background(), []pd.KeyRange{{StartKey: []byte{0}, EndKey: nil}}, 10) + scanRegions, err := suite.client.BatchScanRegions(ctx, []pd.KeyRange{{StartKey: []byte{0}, EndKey: nil}}, 10) return err == nil && len(scanRegions) == 10 }) @@ -2053,39 +2057,45 @@ func (suite *clientTestSuite) TestBatchScanRegions() { suite.srv.GetRaftCluster().HandleRegionHeartbeat(region6) t := suite.T() + var outputMustContainAllKeyRangeOptions []bool check := func(ranges []pd.KeyRange, limit int, expect []*metapb.Region) { for _, bucket := range []bool{false, true} { - var opts []pd.GetRegionOption - if bucket { - opts = append(opts, pd.WithBuckets()) - } - scanRegions, err := suite.client.BatchScanRegions(context.Background(), ranges, limit, opts...) - re.NoError(err) - re.Len(scanRegions, len(expect)) - t.Log("scanRegions", scanRegions) - t.Log("expect", expect) - for i := range expect { - re.Equal(expect[i], scanRegions[i].Meta) - - if scanRegions[i].Meta.GetId() == region3.GetID() { - re.Equal(&metapb.Peer{}, scanRegions[i].Leader) - } else { - re.Equal(expect[i].Peers[0], scanRegions[i].Leader) + for _, outputMustContainAllKeyRange := range outputMustContainAllKeyRangeOptions { + var opts []pd.GetRegionOption + if bucket { + opts = append(opts, pd.WithBuckets()) } - - if scanRegions[i].Meta.GetId() == region4.GetID() { - re.Equal([]*metapb.Peer{expect[i].Peers[1]}, scanRegions[i].DownPeers) + if outputMustContainAllKeyRange { + opts = append(opts, pd.WithOutputMustContainAllKeyRange()) } + scanRegions, err := suite.client.BatchScanRegions(ctx, ranges, limit, opts...) 
+ re.NoError(err) + t.Log("scanRegions", scanRegions) + t.Log("expect", expect) + re.Len(scanRegions, len(expect)) + for i := range expect { + re.Equal(expect[i], scanRegions[i].Meta) + + if scanRegions[i].Meta.GetId() == region3.GetID() { + re.Equal(&metapb.Peer{}, scanRegions[i].Leader) + } else { + re.Equal(expect[i].Peers[0], scanRegions[i].Leader) + } - if scanRegions[i].Meta.GetId() == region5.GetID() { - re.Equal([]*metapb.Peer{expect[i].Peers[1], expect[i].Peers[2]}, scanRegions[i].PendingPeers) - } + if scanRegions[i].Meta.GetId() == region4.GetID() { + re.Equal([]*metapb.Peer{expect[i].Peers[1]}, scanRegions[i].DownPeers) + } - if scanRegions[i].Meta.GetId() == region6.GetID() { - if !bucket { - re.Nil(scanRegions[i].Buckets) - } else { - re.Equal(scanRegions[i].Buckets, region6.GetBuckets()) + if scanRegions[i].Meta.GetId() == region5.GetID() { + re.Equal([]*metapb.Peer{expect[i].Peers[1], expect[i].Peers[2]}, scanRegions[i].PendingPeers) + } + + if scanRegions[i].Meta.GetId() == region6.GetID() { + if !bucket { + re.Nil(scanRegions[i].Buckets) + } else { + re.Equal(scanRegions[i].Buckets, region6.GetBuckets()) + } } } } @@ -2093,6 +2103,7 @@ func (suite *clientTestSuite) TestBatchScanRegions() { } // valid ranges + outputMustContainAllKeyRangeOptions = []bool{false, true} check([]pd.KeyRange{{StartKey: []byte{0}, EndKey: nil}}, 10, regions) check([]pd.KeyRange{{StartKey: []byte{1}, EndKey: nil}}, 5, regions[1:6]) check([]pd.KeyRange{ @@ -2109,6 +2120,8 @@ func (suite *clientTestSuite) TestBatchScanRegions() { {StartKey: []byte{6}, EndKey: []byte{7}}, {StartKey: []byte{8}, EndKey: []byte{9}}, }, 3, []*metapb.Region{regions[0], regions[2], regions[4]}) + + outputMustContainAllKeyRangeOptions = []bool{false} check([]pd.KeyRange{ {StartKey: []byte{0}, EndKey: []byte{0, 1}}, // non-continuous ranges in a region {StartKey: []byte{0, 2}, EndKey: []byte{0, 3}}, @@ -2116,14 +2129,56 @@ func (suite *clientTestSuite) TestBatchScanRegions() { {StartKey: []byte{0, 5}, EndKey: []byte{0, 6}}, {StartKey: []byte{0, 7}, EndKey: []byte{3}}, {StartKey: []byte{4}, EndKey: []byte{5}}, - }, 2, []*metapb.Region{regions[0], regions[1]}) + }, 10, []*metapb.Region{regions[0], regions[1], regions[2], regions[4]}) + outputMustContainAllKeyRangeOptions = []bool{false} + check([]pd.KeyRange{ + {StartKey: []byte{9}, EndKey: []byte{10, 1}}, + }, 10, []*metapb.Region{regions[9]}) // invalid ranges - _, err := suite.client.BatchScanRegions(context.Background(), []pd.KeyRange{{StartKey: []byte{1}, EndKey: []byte{0}}}, 10) - re.Error(err, "invalid key range, start key > end key") - _, err = suite.client.BatchScanRegions(context.Background(), []pd.KeyRange{ + _, err := suite.client.BatchScanRegions( + ctx, + []pd.KeyRange{{StartKey: []byte{1}, EndKey: []byte{0}}}, + 10, + pd.WithOutputMustContainAllKeyRange(), + ) + re.ErrorContains(err, "invalid key range, start key > end key") + _, err = suite.client.BatchScanRegions(ctx, []pd.KeyRange{ {StartKey: []byte{0}, EndKey: []byte{2}}, {StartKey: []byte{1}, EndKey: []byte{3}}, }, 10) - re.Error(err, "invalid key range, ranges overlapped") + re.ErrorContains(err, "invalid key range, ranges overlapped") + _, err = suite.client.BatchScanRegions( + ctx, + []pd.KeyRange{{StartKey: []byte{9}, EndKey: []byte{10, 1}}}, + 10, + pd.WithOutputMustContainAllKeyRange(), + ) + re.ErrorContains(err, "found a hole region in the last") + req := &pdpb.RegionHeartbeatRequest{ + Header: newHeader(suite.srv), + Region: &metapb.Region{ + Id: 100, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 
1, + Version: 1, + }, + StartKey: []byte{100}, + EndKey: []byte{101}, + Peers: peers, + }, + Leader: peers[0], + } + re.NoError(suite.regionHeartbeat.Send(req)) + + // Wait for region heartbeats. + testutil.Eventually(re, func() bool { + _, err = suite.client.BatchScanRegions( + ctx, + []pd.KeyRange{{StartKey: []byte{9}, EndKey: []byte{101}}}, + 10, + pd.WithOutputMustContainAllKeyRange(), + ) + return err != nil && strings.Contains(err.Error(), "found a hole region between") + }) } diff --git a/tests/integrations/go.mod b/tests/integrations/go.mod index 8a570d52458..9076d4ed256 100644 --- a/tests/integrations/go.mod +++ b/tests/integrations/go.mod @@ -14,7 +14,7 @@ require ( github.com/go-sql-driver/mysql v1.7.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 + github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.19.0 github.com/prometheus/client_model v0.6.0 @@ -125,7 +125,7 @@ require ( github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20240612100141-91f6c281e441 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20240718034516-e6e78c7c120b // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/go.sum b/tests/integrations/go.sum index c88919f6571..be23ca15174 100644 --- a/tests/integrations/go.sum +++ b/tests/integrations/go.sum @@ -368,15 +368,15 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 h1:V9XS3FQ/P6u+kFaoSyY5DBswIA774BMpIOLDBMrpxKc= +github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20240612100141-91f6c281e441 h1:01flLztcoWBeT5pe69Q8LAB2Hty0s9Rqc3RvHU4AQK8= -github.com/pingcap/tidb-dashboard v0.0.0-20240612100141-91f6c281e441/go.mod 
h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= +github.com/pingcap/tidb-dashboard v0.0.0-20240718034516-e6e78c7c120b h1:MKgJ9yCQxD5ewLERuoiiD9XVOHuqZ2WRZnB20yMiKyo= +github.com/pingcap/tidb-dashboard v0.0.0-20240718034516-e6e78c7c120b/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 635cb17b822..10b1a0b4520 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -403,9 +403,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { CPUMsCost: 1, } - controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) - controller.Start(suite.ctx) - defer controller.Stop() + rgsController, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) + rgsController.Start(suite.ctx) + defer rgsController.Stop() testCases := []struct { resourceGroupName string @@ -445,13 +445,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { rres := cas.tcs[i].makeReadResponse() wres := cas.tcs[i].makeWriteResponse() startTime := time.Now() - _, _, _, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq) + _, _, _, _, err := rgsController.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq) re.NoError(err) - _, _, _, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq) + _, _, _, _, err = rgsController.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq) re.NoError(err) sum += time.Since(startTime) - controller.OnResponse(cas.resourceGroupName, rreq, rres) - controller.OnResponse(cas.resourceGroupName, wreq, wres) + rgsController.OnResponse(cas.resourceGroupName, rreq, rres) + rgsController.OnResponse(cas.resourceGroupName, wreq, wres) time.Sleep(1000 * time.Microsecond) } re.LessOrEqual(sum, buffDuration+cas.tcs[i].waitDuration) @@ -464,11 +464,11 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)")) tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, _, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) + _, _, _, _, err = rgsController.OnRequestWait(suite.ctx, rg.Name, wreq) re.Error(err) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) - group, err := controller.GetResourceGroup(rg.Name) + group, err := rgsController.GetResourceGroup(rg.Name) re.NoError(err) re.Equal(rg, group) // Delete the resource group and make sure it is tombstone. @@ -476,19 +476,21 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re.NoError(err) re.Contains(resp, "Success!") // Make sure the resource group is watched by the controller and marked as tombstone. 
+ expectedErr := controller.NewResourceGroupNotExistErr(rg.Name) testutil.Eventually(re, func() bool { - gc, err := controller.GetResourceGroup(rg.Name) - re.NoError(err) - return gc.GetName() == "default" + gc, err := rgsController.GetResourceGroup(rg.Name) + return err.Error() == expectedErr.Error() && gc == nil }, testutil.WithTickInterval(50*time.Millisecond)) // Add the resource group again. resp, err = cli.AddResourceGroup(suite.ctx, rg) re.NoError(err) re.Contains(resp, "Success!") - // Make sure the resource group can be set to active again. + // Make sure the resource group can be get by the controller again. testutil.Eventually(re, func() bool { - gc, err := controller.GetResourceGroup(rg.Name) - re.NoError(err) + gc, err := rgsController.GetResourceGroup(rg.Name) + if err != nil { + re.EqualError(err, expectedErr.Error()) + } return gc.GetName() == rg.Name }, testutil.WithTickInterval(50*time.Millisecond)) } diff --git a/tests/integrations/mcs/resourcemanager/server_test.go b/tests/integrations/mcs/resourcemanager/server_test.go index eaeef99e9d6..5ab8745f7be 100644 --- a/tests/integrations/mcs/resourcemanager/server_test.go +++ b/tests/integrations/mcs/resourcemanager/server_test.go @@ -25,6 +25,7 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/stretchr/testify/require" "github.com/tikv/pd/client/grpcutil" + bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/tests" @@ -49,6 +50,7 @@ func TestResourceManagerServer(t *testing.T) { s, cleanup := tests.StartSingleResourceManagerTestServer(ctx, re, leader.GetAddr(), tempurl.Alloc()) addr := s.GetAddr() defer cleanup() + tests.WaitForPrimaryServing(re, map[string]bs.Server{addr: s}) // Test registered GRPC Service cc, err := grpcutil.GetClientConn(ctx, addr, nil) diff --git a/tools/go.mod b/tools/go.mod index f424f12458e..ed6813f0bbd 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -22,7 +22,7 @@ require ( github.com/mattn/go-shellwords v1.0.12 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 + github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.0 @@ -127,7 +127,7 @@ require ( github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20240612100141-91f6c281e441 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20240718034516-e6e78c7c120b // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect diff --git a/tools/go.sum b/tools/go.sum index c2656b3e656..e9d47a8b02b 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -365,15 +365,15 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod 
h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 h1:V9XS3FQ/P6u+kFaoSyY5DBswIA774BMpIOLDBMrpxKc= +github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20240612100141-91f6c281e441 h1:01flLztcoWBeT5pe69Q8LAB2Hty0s9Rqc3RvHU4AQK8= -github.com/pingcap/tidb-dashboard v0.0.0-20240612100141-91f6c281e441/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= +github.com/pingcap/tidb-dashboard v0.0.0-20240718034516-e6e78c7c120b h1:MKgJ9yCQxD5ewLERuoiiD9XVOHuqZ2WRZnB20yMiKyo= +github.com/pingcap/tidb-dashboard v0.0.0-20240718034516-e6e78c7c120b/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index c1db24cc176..4c85bb64037 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -592,6 +592,10 @@ func newConfigEvictLeaderCommand() *cobra.Command { Use: "delete-store ", Short: "delete a store from evict leader list", Run: func(cmd *cobra.Command, args []string) { deleteStoreFromSchedulerConfig(cmd, c.Name(), args) }, + }, &cobra.Command{ + Use: "set ", + Short: "set the config item", + Run: func(cmd *cobra.Command, args []string) { postSchedulerConfigCommandFunc(cmd, c.Name(), args) }, }) return c } diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index 3f58175b5fa..3a6e29f3586 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -140,7 +140,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *pdTests.TestCluster) { testutil.Eventually(re, func() bool { configInfo := make(map[string]any) mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo) - return reflect.DeepEqual(expectedConfig, configInfo) + return reflect.DeepEqual(expectedConfig["store-id-ranges"], configInfo["store-id-ranges"]) }) } @@ -530,6 +530,27 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust return !strings.Contains(echo, "shuffle-hot-region-scheduler") }) + // test evict leader scheduler + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil) + re.Contains(echo, 
"Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return strings.Contains(echo, "evict-leader-scheduler") + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler", "set", "batch", "5"}, nil) + re.Contains(echo, "Success!") + conf = make(map[string]any) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, &conf) + return conf["batch"] == 5. + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return !strings.Contains(echo, "evict-leader-scheduler") + }) + // test balance leader config conf = make(map[string]any) conf1 := make(map[string]any) diff --git a/tools/pd-ut/ut.go b/tools/pd-ut/ut.go index de62151fcb3..dcf0c17c686 100644 --- a/tools/pd-ut/ut.go +++ b/tools/pd-ut/ut.go @@ -673,6 +673,8 @@ func failureCases(input []JUnitTestCase) int { func (*numa) testCommand(pkg string, fn string) *exec.Cmd { args := make([]string, 0, 10) + // let the test run in the verbose mode. + args = append(args, "-test.v") exe := "./" + testFileName(pkg) if coverProfile != "" { fileName := strings.ReplaceAll(pkg, "/", "_") + "." + fn