diff --git a/pkg/schedule/operator_controller.go b/pkg/schedule/operator_controller.go index 692584cc14b..cf63ee03061 100644 --- a/pkg/schedule/operator_controller.go +++ b/pkg/schedule/operator_controller.go @@ -325,7 +325,19 @@ func (oc *OperatorController) AddOperator(ops ...*operator.Operator) bool { // but maybe user want to add operator when waiting queue is busy if oc.exceedStoreLimitLocked(ops...) || !oc.checkAddOperator(false, ops...) { for _, op := range ops { +<<<<<<< HEAD:pkg/schedule/operator_controller.go _ = op.Cancel() +======= + operatorCounter.WithLabelValues(op.Desc(), "exceed-limit").Inc() + _ = op.Cancel(ExceedStoreLimit) + oc.buryOperator(op) + } + return false + } + if pass, reason := oc.checkAddOperator(false, ops...); !pass { + for _, op := range ops { + _ = op.Cancel(reason) +>>>>>>> 9aba1a298 (pkg/schedule, grpc_service : Add error code if scatter_region grpc is failed (#6953)):pkg/schedule/operator/operator_controller.go oc.buryOperator(op) } return false diff --git a/server/grpc_service.go b/server/grpc_service.go index 40c36e35ef7..b1ae3150b99 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1216,8 +1216,19 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg if err != nil { return nil, err } - if op != nil { - rc.GetOperatorController().AddOperator(op) + + if op == nil { + return &pdpb.ScatterRegionResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, + "operator could not be allocated"), + }, nil + } + + if !rc.GetOperatorController().AddOperator(op) { + return &pdpb.ScatterRegionResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, + "operator cancelled because store limit exceeded"), + }, nil } return &pdpb.ScatterRegionResponse{ diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 5ea05bde40e..d5dff237b88 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -1308,25 +1308,29 @@ func (suite *clientTestSuite) TestUpdateServiceGCSafePoint() { } func (suite *clientTestSuite) TestScatterRegion() { - regionID := regionIDAllocator.alloc() - region := &metapb.Region{ - Id: regionID, - RegionEpoch: &metapb.RegionEpoch{ - ConfVer: 1, - Version: 1, - }, - Peers: peers, - StartKey: []byte("fff"), - EndKey: []byte("ggg"), - } - req := &pdpb.RegionHeartbeatRequest{ - Header: newHeader(suite.srv), - Region: region, - Leader: peers[0], + CreateRegion := func() uint64 { + regionID := regionIDAllocator.alloc() + region := &metapb.Region{ + Id: regionID, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + Peers: peers, + StartKey: []byte("fff"), + EndKey: []byte("ggg"), + } + req := &pdpb.RegionHeartbeatRequest{ + Header: newHeader(suite.srv), + Region: region, + Leader: peers[0], + } + err := suite.regionHeartbeat.Send(req) + suite.NoError(err) + return regionID } - err := suite.regionHeartbeat.Send(req) + var regionID = CreateRegion() regionsID := []uint64{regionID} - suite.NoError(err) // Test interface `ScatterRegions`. re := suite.Require() testutil.Eventually(re, func() bool { @@ -1348,6 +1352,9 @@ func (suite *clientTestSuite) TestScatterRegion() { // Test interface `ScatterRegion`. // TODO: Deprecate interface `ScatterRegion`. + // create a new region as scatter operation from previous test might be running + + regionID = CreateRegion() testutil.Eventually(re, func() bool { err := suite.client.ScatterRegion(context.Background(), regionID) if err != nil {