From 0433af619d67b1bb204136dce922306a8d913cc4 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 13 Sep 2023 19:45:31 +0800 Subject: [PATCH] *: move operator handler to schedule Signed-off-by: lhy1024 --- pkg/errs/error.go | 42 +++ pkg/schedule/handler/handler.go | 534 +++++++++++++++++++++++++++++++ server/api/hot_status.go | 5 +- server/api/label.go | 3 +- server/api/region.go | 7 +- server/api/rule.go | 2 +- server/api/store.go | 6 +- server/cluster/cluster.go | 4 +- server/grpc_service.go | 4 +- server/handler.go | 545 ++------------------------------ 10 files changed, 622 insertions(+), 530 deletions(-) create mode 100644 pkg/errs/error.go create mode 100644 pkg/schedule/handler/handler.go diff --git a/pkg/errs/error.go b/pkg/errs/error.go new file mode 100644 index 000000000000..5b3830a70247 --- /dev/null +++ b/pkg/errs/error.go @@ -0,0 +1,42 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package errs + +import "github.com/pingcap/errors" + +var ( + // ErrOperatorNotFound is error info for operator not found. + ErrOperatorNotFound = errors.New("operator not found") + // ErrAddOperator is error info for already have an operator when adding operator. + ErrAddOperator = errors.New("failed to add operator, maybe already have one") + // ErrRegionNotAdjacent is error info for region not adjacent. + ErrRegionNotAdjacent = errors.New("two regions are not adjacent") + // ErrRegionNotFound is error info for region not found. + ErrRegionNotFound = func(regionID uint64) error { + return errors.Errorf("region %v not found", regionID) + } + // ErrRegionAbnormalPeer is error info for region has abnormal peer. + ErrRegionAbnormalPeer = func(regionID uint64) error { + return errors.Errorf("region %v has abnormal peer", regionID) + } + // ErrStoreNotFoundByID is error info for store not found. + ErrStoreNotFoundByID = func(storeID uint64) error { + return errors.Errorf("store %v not found", storeID) + } + // ErrPluginNotFound is error info for plugin not found. + ErrPluginNotFound = func(pluginPath string) error { + return errors.Errorf("plugin is not found: %s", pluginPath) + } +) \ No newline at end of file diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go new file mode 100644 index 000000000000..6212e3bbd9da --- /dev/null +++ b/pkg/schedule/handler/handler.go @@ -0,0 +1,534 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handler + +import ( + "bytes" + "encoding/hex" + "strings" + "time" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/pkg/errors" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/schedule" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/filter" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/scatter" +) + +type Server interface { + GetCoordinator() (*schedule.Coordinator, error) + GetCluster() (sche.SharedCluster, error) +} + +type Handler struct { + Server +} + +// NewHandler creates a new handler. +func NewHandler(server Server) *Handler { + return &Handler{ + Server: server, + } +} + +// GetOperatorController returns OperatorController. +func (h *Handler) GetOperatorController() (*operator.Controller, error) { + co, err := h.GetCoordinator() + if err != nil { + return nil, err + } + if co == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + return co.GetOperatorController(), nil +} + +// GetRegionScatterer returns RegionScatterer. +func (h *Handler) GetRegionScatterer() (*scatter.RegionScatterer, error) { + co, err := h.GetCoordinator() + if err != nil { + return nil, err + } + if co == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + return co.GetRegionScatterer(), nil +} + +// GetOperator returns the region operator. +func (h *Handler) GetOperator(regionID uint64) (*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + + op := c.GetOperator(regionID) + if op == nil { + return nil, errs.ErrOperatorNotFound + } + + return op, nil +} + +// GetOperatorStatus returns the status of the region operator. +func (h *Handler) GetOperatorStatus(regionID uint64) (*operator.OpWithStatus, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + + op := c.GetOperatorStatus(regionID) + if op == nil { + return nil, errs.ErrOperatorNotFound + } + + return op, nil +} + +// RemoveOperator removes the region operator. +func (h *Handler) RemoveOperator(regionID uint64) error { + c, err := h.GetOperatorController() + if err != nil { + return err + } + + op := c.GetOperator(regionID) + if op == nil { + return errs.ErrOperatorNotFound + } + + _ = c.RemoveOperator(op, operator.AdminStop) + return nil +} + +// GetOperators returns the running operators. +func (h *Handler) GetOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperators(), nil +} + +// GetWaitingOperators returns the waiting operators. +func (h *Handler) GetWaitingOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetWaitingOperators(), nil +} + +// GetAdminOperators returns the running admin operators. +func (h *Handler) GetAdminOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperatorsOfKind(operator.OpAdmin), nil +} + +// GetLeaderOperators returns the running leader operators. +func (h *Handler) GetLeaderOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperatorsOfKind(operator.OpLeader), nil +} + +// GetRegionOperators returns the running region operators. +func (h *Handler) GetRegionOperators() ([]*operator.Operator, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperatorsOfKind(operator.OpRegion), nil +} + +// GetHistory returns finished operators' history since start. +func (h *Handler) GetHistory(start time.Time) ([]operator.OpHistory, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetHistory(start), nil +} + +// GetRecords returns finished operators since start. +func (h *Handler) GetRecords(from time.Time) ([]*operator.OpRecord, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + records := c.GetRecords(from) + if len(records) == 0 { + return nil, errs.ErrOperatorNotFound + } + return records, nil +} + +// AddTransferLeaderOperator adds an operator to transfer leader to the store. +func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) error { + c, err := h.GetCluster() + if err != nil { + return err + } + + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + newLeader := region.GetStoreVoter(storeID) + if newLeader == nil { + return errors.Errorf("region has no voter in store %v", storeID) + } + + op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, region.GetLeader().GetStoreId(), newLeader.GetStoreId(), []uint64{}, operator.OpAdmin) + if err != nil { + log.Debug("fail to create transfer leader operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddTransferRegionOperator adds an operator to transfer region to the stores. +func (h *Handler) AddTransferRegionOperator(regionID uint64, storeIDs map[uint64]placement.PeerRoleType) error { + c, err := h.GetCluster() + if err != nil { + return err + } + + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + if c.GetSharedConfig().IsPlacementRulesEnabled() { + // Cannot determine role without peer role when placement rules enabled. Not supported now. + for _, role := range storeIDs { + if len(role) == 0 { + return errors.New("transfer region without peer role is not supported when placement rules enabled") + } + } + } + for id := range storeIDs { + if err := checkStoreState(c, id); err != nil { + return err + } + } + + roles := make(map[uint64]placement.PeerRoleType) + for id, peerRole := range storeIDs { + if peerRole == "" { + peerRole = placement.Voter + } + roles[id] = peerRole + } + op, err := operator.CreateMoveRegionOperator("admin-move-region", c, region, operator.OpAdmin, roles) + if err != nil { + log.Debug("fail to create move region operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddTransferPeerOperator adds an operator to transfer peer. +func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreID uint64) error { + c, err := h.GetCluster() + if err != nil { + return err + } + + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + oldPeer := region.GetStorePeer(fromStoreID) + if oldPeer == nil { + return errors.Errorf("region has no peer in store %v", fromStoreID) + } + + if err := checkStoreState(c, toStoreID); err != nil { + return err + } + + newPeer := &metapb.Peer{StoreId: toStoreID, Role: oldPeer.GetRole(), IsWitness: oldPeer.GetIsWitness()} + op, err := operator.CreateMovePeerOperator("admin-move-peer", c, region, operator.OpAdmin, fromStoreID, newPeer) + if err != nil { + log.Debug("fail to create move peer operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// checkAdminAddPeerOperator checks adminAddPeer operator with given region ID and store ID. +func (h *Handler) checkAdminAddPeerOperator(regionID uint64, toStoreID uint64) (sche.SharedCluster, *core.RegionInfo, error) { + c, err := h.GetCluster() + if err != nil { + return nil, nil, err + } + + region := c.GetRegion(regionID) + if region == nil { + return nil, nil, errs.ErrRegionNotFound(regionID) + } + + if region.GetStorePeer(toStoreID) != nil { + return nil, nil, errors.Errorf("region already has peer in store %v", toStoreID) + } + + if err := checkStoreState(c, toStoreID); err != nil { + return nil, nil, err + } + + return c, region, nil +} + +// AddAddPeerOperator adds an operator to add peer. +func (h *Handler) AddAddPeerOperator(regionID uint64, toStoreID uint64) error { + c, region, err := h.checkAdminAddPeerOperator(regionID, toStoreID) + if err != nil { + return err + } + + newPeer := &metapb.Peer{StoreId: toStoreID} + op, err := operator.CreateAddPeerOperator("admin-add-peer", c, region, newPeer, operator.OpAdmin) + if err != nil { + log.Debug("fail to create add peer operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddAddLearnerOperator adds an operator to add learner. +func (h *Handler) AddAddLearnerOperator(regionID uint64, toStoreID uint64) error { + c, region, err := h.checkAdminAddPeerOperator(regionID, toStoreID) + if err != nil { + return err + } + + newPeer := &metapb.Peer{ + StoreId: toStoreID, + Role: metapb.PeerRole_Learner, + } + + op, err := operator.CreateAddPeerOperator("admin-add-learner", c, region, newPeer, operator.OpAdmin) + if err != nil { + log.Debug("fail to create add learner operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddRemovePeerOperator adds an operator to remove peer. +func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) error { + c, err := h.GetCluster() + if err != nil { + return err + } + + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + if region.GetStorePeer(fromStoreID) == nil { + return errors.Errorf("region has no peer in store %v", fromStoreID) + } + + op, err := operator.CreateRemovePeerOperator("admin-remove-peer", c, operator.OpAdmin, region, fromStoreID) + if err != nil { + log.Debug("fail to create move peer operator", errs.ZapError(err)) + return err + } + return h.addOperator(op) +} + +// AddMergeRegionOperator adds an operator to merge region. +func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error { + c, err := h.GetCluster() + if err != nil { + return err + } + + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + target := c.GetRegion(targetID) + if target == nil { + return errs.ErrRegionNotFound(targetID) + } + + if !filter.IsRegionHealthy(region) || !filter.IsRegionReplicated(c, region) { + return errs.ErrRegionAbnormalPeer(regionID) + } + + if !filter.IsRegionHealthy(target) || !filter.IsRegionReplicated(c, target) { + return errs.ErrRegionAbnormalPeer(targetID) + } + + // for the case first region (start key is nil) with the last region (end key is nil) but not adjacent + if (!bytes.Equal(region.GetStartKey(), target.GetEndKey()) || len(region.GetStartKey()) == 0) && + (!bytes.Equal(region.GetEndKey(), target.GetStartKey()) || len(region.GetEndKey()) == 0) { + return errs.ErrRegionNotAdjacent + } + + ops, err := operator.CreateMergeRegionOperator("admin-merge-region", c, region, target, operator.OpAdmin) + if err != nil { + log.Debug("fail to create merge region operator", errs.ZapError(err)) + return err + } + return h.addOperator(ops...) +} + +// AddSplitRegionOperator adds an operator to split a region. +func (h *Handler) AddSplitRegionOperator(regionID uint64, policyStr string, keys []string) error { + c, err := h.GetCluster() + if err != nil { + return err + } + + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + policy, ok := pdpb.CheckPolicy_value[strings.ToUpper(policyStr)] + if !ok { + return errors.Errorf("check policy %s is not supported", policyStr) + } + + var splitKeys [][]byte + if pdpb.CheckPolicy(policy) == pdpb.CheckPolicy_USEKEY { + for i := range keys { + k, err := hex.DecodeString(keys[i]) + if err != nil { + return errors.Errorf("split key %s is not in hex format", keys[i]) + } + splitKeys = append(splitKeys, k) + } + } + + op, err := operator.CreateSplitRegionOperator("admin-split-region", region, operator.OpAdmin, pdpb.CheckPolicy(policy), splitKeys) + if err != nil { + return err + } + + return h.addOperator(op) +} + +// AddScatterRegionOperator adds an operator to scatter a region. +func (h *Handler) AddScatterRegionOperator(regionID uint64, group string) error { + c, err := h.GetCluster() + if err != nil { + return err + } + region := c.GetRegion(regionID) + if region == nil { + return errs.ErrRegionNotFound(regionID) + } + + if c.IsRegionHot(region) { + return errors.Errorf("region %d is a hot region", regionID) + } + + s, err := h.GetRegionScatterer() + if err != nil { + return err + } + + op, err := s.Scatter(region, group, false) + if err != nil { + return err + } + + if op == nil { + return nil + } + return h.addOperator(op) +} + +// AddScatterRegionsOperators add operators to scatter regions and return the processed percentage and error +func (h *Handler) AddScatterRegionsOperators(regionIDs []uint64, startRawKey, endRawKey, group string, retryLimit int) (int, error) { + s, err := h.GetRegionScatterer() + if err != nil { + return 0, err + } + opsCount := 0 + var failures map[uint64]error + // If startKey and endKey are both defined, use them first. + if len(startRawKey) > 0 && len(endRawKey) > 0 { + startKey, err := hex.DecodeString(startRawKey) + if err != nil { + return 0, err + } + endKey, err := hex.DecodeString(endRawKey) + if err != nil { + return 0, err + } + opsCount, failures, err = s.ScatterRegionsByRange(startKey, endKey, group, retryLimit) + if err != nil { + return 0, err + } + } else { + opsCount, failures, err = s.ScatterRegionsByID(regionIDs, group, retryLimit, false) + if err != nil { + return 0, err + } + } + percentage := 100 + if len(failures) > 0 { + percentage = 100 - 100*len(failures)/(opsCount+len(failures)) + } + return percentage, nil +} + +func (h *Handler) addOperator(ops ...*operator.Operator) error { + oc, err := h.GetOperatorController() + if err != nil { + return err + } + + if ok := oc.AddOperator(ops...); !ok { + return errors.WithStack(errs.ErrAddOperator) + } + return nil +} + +func checkStoreState(c sche.SharedCluster, storeID uint64) error { + store := c.GetStore(storeID) + if store == nil { + return errs.ErrStoreNotFound.FastGenByArgs(storeID) + } + if store.IsRemoved() { + return errs.ErrStoreRemoved.FastGenByArgs(storeID) + } + if store.IsUnhealthy() { + return errs.ErrStoreUnhealthy.FastGenByArgs(storeID) + } + return nil +} diff --git a/server/api/hot_status.go b/server/api/hot_status.go index 4f64f1bebc50..a3be72a56ac3 100644 --- a/server/api/hot_status.go +++ b/server/api/hot_status.go @@ -22,6 +22,7 @@ import ( "strconv" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" @@ -117,7 +118,7 @@ func (h *hotStatusHandler) GetHotWriteRegions(w http.ResponseWriter, r *http.Req } store := rc.GetStore(id) if store == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrStoreNotFound(id).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrStoreNotFoundByID(id).Error()) return } ids = append(ids, id) @@ -153,7 +154,7 @@ func (h *hotStatusHandler) GetHotReadRegions(w http.ResponseWriter, r *http.Requ } store := rc.GetStore(id) if store == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrStoreNotFound(id).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrStoreNotFoundByID(id).Error()) return } ids = append(ids, id) diff --git a/server/api/label.go b/server/api/label.go index abaad02a4e34..115d93dfdd24 100644 --- a/server/api/label.go +++ b/server/api/label.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server" "github.com/unrolled/render" ) @@ -87,7 +88,7 @@ func (h *labelsHandler) GetStoresByLabel(w http.ResponseWriter, r *http.Request) storeID := s.GetId() store := rc.GetStore(storeID) if store == nil { - h.rd.JSON(w, http.StatusInternalServerError, server.ErrStoreNotFound(storeID).Error()) + h.rd.JSON(w, http.StatusInternalServerError, errs.ErrStoreNotFoundByID(storeID).Error()) return } diff --git a/server/api/region.go b/server/api/region.go index 1c21af53296f..eaa0a29d2d20 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/statistics" @@ -763,7 +764,7 @@ func (h *regionsHandler) GetRegionSiblings(w http.ResponseWriter, r *http.Reques } region := rc.GetRegion(uint64(id)) if region == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrRegionNotFound(uint64(id)).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrRegionNotFound(uint64(id)).Error()) return } @@ -1037,7 +1038,7 @@ func (h *regionsHandler) ScatterRegions(w http.ResponseWriter, r *http.Request) h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - opsCount, failures, err = rc.GetRegionScatter().ScatterRegionsByRange(startKey, endKey, group, retryLimit) + opsCount, failures, err = rc.GetRegionScatterer().ScatterRegionsByRange(startKey, endKey, group, retryLimit) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -1048,7 +1049,7 @@ func (h *regionsHandler) ScatterRegions(w http.ResponseWriter, r *http.Request) h.rd.JSON(w, http.StatusBadRequest, "regions_id is invalid") return } - opsCount, failures, err = rc.GetRegionScatter().ScatterRegionsByID(ids, group, retryLimit, false) + opsCount, failures, err = rc.GetRegionScatterer().ScatterRegionsByID(ids, group, retryLimit, false) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return diff --git a/server/api/rule.go b/server/api/rule.go index 33c63a8faa2c..585a0669b0d2 100644 --- a/server/api/rule.go +++ b/server/api/rule.go @@ -167,7 +167,7 @@ func (h *ruleHandler) preCheckForRegionAndRule(w http.ResponseWriter, r *http.Re } region := cluster.GetRegion(regionID) if region == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrRegionNotFound(regionID).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrRegionNotFound(regionID).Error()) return cluster, nil } return cluster, region diff --git a/server/api/store.go b/server/api/store.go index a3e8c4518a2c..65e67c1eff37 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -191,7 +191,7 @@ func (h *storeHandler) GetStore(w http.ResponseWriter, r *http.Request) { store := rc.GetStore(storeID) if store == nil { - h.rd.JSON(w, http.StatusNotFound, server.ErrStoreNotFound(storeID).Error()) + h.rd.JSON(w, http.StatusNotFound, errs.ErrStoreNotFoundByID(storeID).Error()) return } @@ -437,7 +437,7 @@ func (h *storeHandler) SetStoreLimit(w http.ResponseWriter, r *http.Request) { store := rc.GetStore(storeID) if store == nil { - h.rd.JSON(w, http.StatusInternalServerError, server.ErrStoreNotFound(storeID).Error()) + h.rd.JSON(w, http.StatusInternalServerError, errs.ErrStoreNotFoundByID(storeID).Error()) return } @@ -758,7 +758,7 @@ func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) { storeID := s.GetId() store := rc.GetStore(storeID) if store == nil { - h.rd.JSON(w, http.StatusInternalServerError, server.ErrStoreNotFound(storeID).Error()) + h.rd.JSON(w, http.StatusInternalServerError, errs.ErrStoreNotFoundByID(storeID).Error()) return } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index b16de73f84ef..53e92297710b 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -765,8 +765,8 @@ func (c *RaftCluster) SetPrepared() { c.coordinator.GetPrepareChecker().SetPrepared() } -// GetRegionScatter returns the region scatter. -func (c *RaftCluster) GetRegionScatter() *scatter.RegionScatterer { +// GetRegionScatterer returns the region scatter. +func (c *RaftCluster) GetRegionScatterer() *scatter.RegionScatterer { return c.coordinator.GetRegionScatterer() } diff --git a/server/grpc_service.go b/server/grpc_service.go index 973c45a622f7..d924e4ba8a95 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1704,7 +1704,7 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg region = core.NewRegionInfo(request.GetRegion(), request.GetLeader()) } - op, err := rc.GetRegionScatter().Scatter(region, request.GetGroup(), request.GetSkipStoreLimit()) + op, err := rc.GetRegionScatterer().Scatter(region, request.GetGroup(), request.GetSkipStoreLimit()) if err != nil { return nil, err } @@ -2106,7 +2106,7 @@ func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.S // scatterRegions add operators to scatter regions and return the processed percentage and error func scatterRegions(cluster *cluster.RaftCluster, regionsID []uint64, group string, retryLimit int, skipStoreLimit bool) (int, error) { - opsCount, failures, err := cluster.GetRegionScatter().ScatterRegionsByID(regionsID, group, retryLimit, skipStoreLimit) + opsCount, failures, err := cluster.GetRegionScatterer().ScatterRegionsByID(regionsID, group, retryLimit, skipStoreLimit) if err != nil { return 0, err } diff --git a/server/handler.go b/server/handler.go index adc1e8ecd318..97446f17354a 100644 --- a/server/handler.go +++ b/server/handler.go @@ -15,19 +15,15 @@ package server import ( - "bytes" - "encoding/hex" "encoding/json" "net/http" "net/url" "path" "strconv" - "strings" "time" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" @@ -35,9 +31,8 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule" sc "github.com/tikv/pd/pkg/schedule/config" - "github.com/tikv/pd/pkg/schedule/filter" - "github.com/tikv/pd/pkg/schedule/operator" - "github.com/tikv/pd/pkg/schedule/placement" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/buckets" @@ -50,36 +45,24 @@ import ( "go.uber.org/zap" ) -var ( - // SchedulerConfigHandlerPath is the api router path of the schedule config handler. - SchedulerConfigHandlerPath = "/api/v1/scheduler-config" - - // ErrOperatorNotFound is error info for operator not found. - ErrOperatorNotFound = errors.New("operator not found") - // ErrAddOperator is error info for already have an operator when adding operator. - ErrAddOperator = errors.New("failed to add operator, maybe already have one") - // ErrRegionNotAdjacent is error info for region not adjacent. - ErrRegionNotAdjacent = errors.New("two regions are not adjacent") - // ErrRegionNotFound is error info for region not found. - ErrRegionNotFound = func(regionID uint64) error { - return errors.Errorf("region %v not found", regionID) - } - // ErrRegionAbnormalPeer is error info for region has abnormal peer. - ErrRegionAbnormalPeer = func(regionID uint64) error { - return errors.Errorf("region %v has abnormal peer", regionID) - } - // ErrStoreNotFound is error info for store not found. - ErrStoreNotFound = func(storeID uint64) error { - return errors.Errorf("store %v not found", storeID) - } - // ErrPluginNotFound is error info for plugin not found. - ErrPluginNotFound = func(pluginPath string) error { - return errors.Errorf("plugin is not found: %s", pluginPath) - } -) +// SchedulerConfigHandlerPath is the api router path of the schedule config handler. +var SchedulerConfigHandlerPath = "/api/v1/scheduler-config" + +type server struct { + *Server +} + +func (s *server) GetCoordinator() (*schedule.Coordinator, error) { + return s.GetRaftCluster().GetCoordinator(), nil +} + +func (s *server) GetCluster() (sche.SharedCluster, error) { + return s.GetRaftCluster(), nil +} // Handler is a helper to export methods to handle API/RPC requests. type Handler struct { + *handler.Handler s *Server opt *config.PersistOptions pluginChMap map[string]chan string @@ -87,7 +70,16 @@ type Handler struct { } func newHandler(s *Server) *Handler { - return &Handler{s: s, opt: s.persistOptions, pluginChMap: make(map[string]chan string), pluginChMapLock: syncutil.RWMutex{}} + h := handler.NewHandler(&server{ + Server: s, + }) + return &Handler{ + Handler: h, + s: s, + opt: s.persistOptions, + pluginChMap: make(map[string]chan string), + pluginChMapLock: syncutil.RWMutex{}, + } } // GetRaftCluster returns RaftCluster. @@ -99,15 +91,6 @@ func (h *Handler) GetRaftCluster() (*cluster.RaftCluster, error) { return rc, nil } -// GetOperatorController returns OperatorController. -func (h *Handler) GetOperatorController() (*operator.Controller, error) { - rc := h.s.GetRaftCluster() - if rc == nil { - return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() - } - return rc.GetOperatorController(), nil -} - // IsSchedulerPaused returns whether scheduler is paused. func (h *Handler) IsSchedulerPaused(name string) (bool, error) { rc, err := h.GetRaftCluster() @@ -170,7 +153,7 @@ func (h *Handler) GetStores() ([]*core.StoreInfo, error) { storeID := s.GetId() store := rc.GetStore(storeID) if store == nil { - return nil, ErrStoreNotFound(storeID) + return nil, errs.ErrStoreNotFoundByID(storeID) } stores = append(stores, store) } @@ -407,119 +390,6 @@ func (h *Handler) AddGrantHotRegionScheduler(leaderID, peers string) error { return h.AddScheduler(schedulers.GrantHotRegionType, leaderID, peers) } -// GetOperator returns the region operator. -func (h *Handler) GetOperator(regionID uint64) (*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - - op := c.GetOperator(regionID) - if op == nil { - return nil, ErrOperatorNotFound - } - - return op, nil -} - -// GetOperatorStatus returns the status of the region operator. -func (h *Handler) GetOperatorStatus(regionID uint64) (*operator.OpWithStatus, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - - op := c.GetOperatorStatus(regionID) - if op == nil { - return nil, ErrOperatorNotFound - } - - return op, nil -} - -// RemoveOperator removes the region operator. -func (h *Handler) RemoveOperator(regionID uint64) error { - c, err := h.GetOperatorController() - if err != nil { - return err - } - - op := c.GetOperator(regionID) - if op == nil { - return ErrOperatorNotFound - } - - _ = c.RemoveOperator(op, operator.AdminStop) - return nil -} - -// GetOperators returns the running operators. -func (h *Handler) GetOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetOperators(), nil -} - -// GetWaitingOperators returns the waiting operators. -func (h *Handler) GetWaitingOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetWaitingOperators(), nil -} - -// GetAdminOperators returns the running admin operators. -func (h *Handler) GetAdminOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetOperatorsOfKind(operator.OpAdmin), nil -} - -// GetLeaderOperators returns the running leader operators. -func (h *Handler) GetLeaderOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetOperatorsOfKind(operator.OpLeader), nil -} - -// GetRegionOperators returns the running region operators. -func (h *Handler) GetRegionOperators() ([]*operator.Operator, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetOperatorsOfKind(operator.OpRegion), nil -} - -// GetHistory returns finished operators' history since start. -func (h *Handler) GetHistory(start time.Time) ([]operator.OpHistory, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - return c.GetHistory(start), nil -} - -// GetRecords returns finished operators since start. -func (h *Handler) GetRecords(from time.Time) ([]*operator.OpRecord, error) { - c, err := h.GetOperatorController() - if err != nil { - return nil, err - } - records := c.GetRecords(from) - if len(records) == 0 { - return nil, ErrOperatorNotFound - } - return records, nil -} - // SetAllStoresLimit is used to set limit of all stores. func (h *Handler) SetAllStoresLimit(ratePerMin float64, limitType storelimit.Type) error { c, err := h.GetRaftCluster() @@ -576,349 +446,6 @@ func (h *Handler) SetStoreLimit(storeID uint64, ratePerMin float64, limitType st return c.SetStoreLimit(storeID, limitType, ratePerMin) } -// AddTransferLeaderOperator adds an operator to transfer leader to the store. -func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - newLeader := region.GetStoreVoter(storeID) - if newLeader == nil { - return errors.Errorf("region has no voter in store %v", storeID) - } - - op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, region.GetLeader().GetStoreId(), newLeader.GetStoreId(), []uint64{}, operator.OpAdmin) - if err != nil { - log.Debug("fail to create transfer leader operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddTransferRegionOperator adds an operator to transfer region to the stores. -func (h *Handler) AddTransferRegionOperator(regionID uint64, storeIDs map[uint64]placement.PeerRoleType) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - if c.GetOpts().IsPlacementRulesEnabled() { - // Cannot determine role without peer role when placement rules enabled. Not supported now. - for _, role := range storeIDs { - if len(role) == 0 { - return errors.New("transfer region without peer role is not supported when placement rules enabled") - } - } - } - for id := range storeIDs { - if err := checkStoreState(c, id); err != nil { - return err - } - } - - roles := make(map[uint64]placement.PeerRoleType) - for id, peerRole := range storeIDs { - if peerRole == "" { - peerRole = placement.Voter - } - roles[id] = peerRole - } - op, err := operator.CreateMoveRegionOperator("admin-move-region", c, region, operator.OpAdmin, roles) - if err != nil { - log.Debug("fail to create move region operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddTransferPeerOperator adds an operator to transfer peer. -func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreID uint64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - oldPeer := region.GetStorePeer(fromStoreID) - if oldPeer == nil { - return errors.Errorf("region has no peer in store %v", fromStoreID) - } - - if err := checkStoreState(c, toStoreID); err != nil { - return err - } - - newPeer := &metapb.Peer{StoreId: toStoreID, Role: oldPeer.GetRole(), IsWitness: oldPeer.GetIsWitness()} - op, err := operator.CreateMovePeerOperator("admin-move-peer", c, region, operator.OpAdmin, fromStoreID, newPeer) - if err != nil { - log.Debug("fail to create move peer operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// checkAdminAddPeerOperator checks adminAddPeer operator with given region ID and store ID. -func (h *Handler) checkAdminAddPeerOperator(regionID uint64, toStoreID uint64) (*cluster.RaftCluster, *core.RegionInfo, error) { - c, err := h.GetRaftCluster() - if err != nil { - return nil, nil, err - } - - region := c.GetRegion(regionID) - if region == nil { - return nil, nil, ErrRegionNotFound(regionID) - } - - if region.GetStorePeer(toStoreID) != nil { - return nil, nil, errors.Errorf("region already has peer in store %v", toStoreID) - } - - if err := checkStoreState(c, toStoreID); err != nil { - return nil, nil, err - } - - return c, region, nil -} - -// AddAddPeerOperator adds an operator to add peer. -func (h *Handler) AddAddPeerOperator(regionID uint64, toStoreID uint64) error { - c, region, err := h.checkAdminAddPeerOperator(regionID, toStoreID) - if err != nil { - return err - } - - newPeer := &metapb.Peer{StoreId: toStoreID} - op, err := operator.CreateAddPeerOperator("admin-add-peer", c, region, newPeer, operator.OpAdmin) - if err != nil { - log.Debug("fail to create add peer operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddAddLearnerOperator adds an operator to add learner. -func (h *Handler) AddAddLearnerOperator(regionID uint64, toStoreID uint64) error { - c, region, err := h.checkAdminAddPeerOperator(regionID, toStoreID) - if err != nil { - return err - } - - newPeer := &metapb.Peer{ - StoreId: toStoreID, - Role: metapb.PeerRole_Learner, - } - - op, err := operator.CreateAddPeerOperator("admin-add-learner", c, region, newPeer, operator.OpAdmin) - if err != nil { - log.Debug("fail to create add learner operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddRemovePeerOperator adds an operator to remove peer. -func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - if region.GetStorePeer(fromStoreID) == nil { - return errors.Errorf("region has no peer in store %v", fromStoreID) - } - - op, err := operator.CreateRemovePeerOperator("admin-remove-peer", c, operator.OpAdmin, region, fromStoreID) - if err != nil { - log.Debug("fail to create move peer operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddMergeRegionOperator adds an operator to merge region. -func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - target := c.GetRegion(targetID) - if target == nil { - return ErrRegionNotFound(targetID) - } - - if !filter.IsRegionHealthy(region) || !filter.IsRegionReplicated(c, region) { - return ErrRegionAbnormalPeer(regionID) - } - - if !filter.IsRegionHealthy(target) || !filter.IsRegionReplicated(c, target) { - return ErrRegionAbnormalPeer(targetID) - } - - // for the case first region (start key is nil) with the last region (end key is nil) but not adjacent - if (!bytes.Equal(region.GetStartKey(), target.GetEndKey()) || len(region.GetStartKey()) == 0) && - (!bytes.Equal(region.GetEndKey(), target.GetStartKey()) || len(region.GetEndKey()) == 0) { - return ErrRegionNotAdjacent - } - - ops, err := operator.CreateMergeRegionOperator("admin-merge-region", c, region, target, operator.OpAdmin) - if err != nil { - log.Debug("fail to create merge region operator", errs.ZapError(err)) - return err - } - if ok := c.GetOperatorController().AddOperator(ops...); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddSplitRegionOperator adds an operator to split a region. -func (h *Handler) AddSplitRegionOperator(regionID uint64, policyStr string, keys []string) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - policy, ok := pdpb.CheckPolicy_value[strings.ToUpper(policyStr)] - if !ok { - return errors.Errorf("check policy %s is not supported", policyStr) - } - - var splitKeys [][]byte - if pdpb.CheckPolicy(policy) == pdpb.CheckPolicy_USEKEY { - for i := range keys { - k, err := hex.DecodeString(keys[i]) - if err != nil { - return errors.Errorf("split key %s is not in hex format", keys[i]) - } - splitKeys = append(splitKeys, k) - } - } - - op, err := operator.CreateSplitRegionOperator("admin-split-region", region, operator.OpAdmin, pdpb.CheckPolicy(policy), splitKeys) - if err != nil { - return err - } - - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddScatterRegionOperator adds an operator to scatter a region. -func (h *Handler) AddScatterRegionOperator(regionID uint64, group string) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - - region := c.GetRegion(regionID) - if region == nil { - return ErrRegionNotFound(regionID) - } - - if c.IsRegionHot(region) { - return errors.Errorf("region %d is a hot region", regionID) - } - - op, err := c.GetRegionScatter().Scatter(region, group, false) - if err != nil { - return err - } - - if op == nil { - return nil - } - if ok := c.GetOperatorController().AddOperator(op); !ok { - return errors.WithStack(ErrAddOperator) - } - return nil -} - -// AddScatterRegionsOperators add operators to scatter regions and return the processed percentage and error -func (h *Handler) AddScatterRegionsOperators(regionIDs []uint64, startRawKey, endRawKey, group string, retryLimit int) (int, error) { - c, err := h.GetRaftCluster() - if err != nil { - return 0, err - } - opsCount := 0 - var failures map[uint64]error - // If startKey and endKey are both defined, use them first. - if len(startRawKey) > 0 && len(endRawKey) > 0 { - startKey, err := hex.DecodeString(startRawKey) - if err != nil { - return 0, err - } - endKey, err := hex.DecodeString(endRawKey) - if err != nil { - return 0, err - } - opsCount, failures, err = c.GetRegionScatter().ScatterRegionsByRange(startKey, endKey, group, retryLimit) - if err != nil { - return 0, err - } - } else { - opsCount, failures, err = c.GetRegionScatter().ScatterRegionsByID(regionIDs, group, retryLimit, false) - if err != nil { - return 0, err - } - } - percentage := 100 - if len(failures) > 0 { - percentage = 100 - 100*len(failures)/(opsCount+len(failures)) - } - return percentage, nil -} - // GetRegionsByType gets the region with specified type. func (h *Handler) GetRegionsByType(typ statistics.RegionStatisticType) ([]*core.RegionInfo, error) { c := h.s.GetRaftCluster() @@ -1010,7 +537,7 @@ func (h *Handler) PluginUnload(pluginPath string) error { ch <- schedule.PluginUnload return nil } - return ErrPluginNotFound(pluginPath) + return errs.ErrPluginNotFound(pluginPath) } // GetAddr returns the server urls for clients. @@ -1101,20 +628,6 @@ func (h *Handler) GetHistoryHotRegionIter( return iter } -func checkStoreState(rc *cluster.RaftCluster, storeID uint64) error { - store := rc.GetStore(storeID) - if store == nil { - return errs.ErrStoreNotFound.FastGenByArgs(storeID) - } - if store.IsRemoved() { - return errs.ErrStoreRemoved.FastGenByArgs(storeID) - } - if store.IsUnhealthy() { - return errs.ErrStoreUnhealthy.FastGenByArgs(storeID) - } - return nil -} - // RedirectSchedulerUpdate update scheduler config. Export this func to help handle damaged store. func (h *Handler) redirectSchedulerUpdate(name string, storeID float64) error { input := make(map[string]interface{})