From 2892b4606ec0c6d0efdb7bfeb6fc096dd1d8d683 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Wed, 26 Jul 2023 18:21:04 +0800
Subject: [PATCH] rule_checker: can replace unhealthPeer with orphanPeer
 (#6831) (#6843)

close tikv/pd#6559

add logic to try to replace an unhealthy peer with an orphan peer

Signed-off-by: ti-chi-bot
Signed-off-by: nolouch
Co-authored-by: ShuNing
Co-authored-by: nolouch
---
 Makefile                                     |   2 +-
 server/api/operator_test.go                  |  28 ++--
 server/schedule/checker/rule_checker.go      |  53 +++++++-
 server/schedule/checker/rule_checker_test.go | 127 ++++++++++++++++++-
 server/schedule/operator/builder.go          |   5 +-
 server/schedule/operator/create_operator.go  |  21 ++-
 server/schedule/operator/operator.go         |   2 +-
 server/schedule/placement/fit.go             |   3 +
 8 files changed, 221 insertions(+), 20 deletions(-)

diff --git a/Makefile b/Makefile
index f6a1c9a809a..99d6d680a7a 100644
--- a/Makefile
+++ b/Makefile
@@ -148,7 +148,7 @@ static: install-tools
 	@ echo "gofmt ..."
 	@ gofmt -s -l -d $(PACKAGE_DIRECTORIES) 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
 	@ echo "golangci-lint ..."
-	@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES)
+	@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES) --allow-parallel-runners
 	@ echo "revive ..."
 	@ revive -formatter friendly -config revive.toml $(PACKAGES)

diff --git a/server/api/operator_test.go b/server/api/operator_test.go
index 4b30f9f5e16..973f679f098 100644
--- a/server/api/operator_test.go
+++ b/server/api/operator_test.go
@@ -217,7 +217,13 @@ func (suite *transferRegionOperatorTestSuite) TestTransferRegionWithPlacementRul
 	regionURL := fmt.Sprintf("%s/operators/%d", suite.urlPrefix, region.GetId())
 	operator := mustReadURL(re, regionURL)
 	suite.Contains(operator, "operator not found")
-
+	convertStepsToStr := func(steps []string) string {
+		stepStrs := make([]string, len(steps))
+		for i := range steps {
+			stepStrs[i] = fmt.Sprintf("%d:{%s}", i, steps[i])
+		}
+		return strings.Join(stepStrs, ", ")
+	}
 	testCases := []struct {
 		name                string
 		placementRuleEnable bool
@@ -231,25 +237,25 @@ func (suite *transferRegionOperatorTestSuite) TestTransferRegionWithPlacementRul
 			placementRuleEnable: false,
 			input:               []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3]}`),
 			expectedError:       nil,
-			expectSteps: strings.Join([]string{
+			expectSteps: convertStepsToStr([]string{
 				pdoperator.AddLearner{ToStore: 3, PeerID: 1}.String(),
 				pdoperator.PromoteLearner{ToStore: 3, PeerID: 1}.String(),
 				pdoperator.TransferLeader{FromStore: 1, ToStore: 2}.String(),
 				pdoperator.RemovePeer{FromStore: 1, PeerID: 1}.String(),
-			}, ", "),
+			}),
 		},
 		{
 			name:                "placement rule disable with peer role",
 			placementRuleEnable: false,
 			input:               []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3], "peer_roles":["follower", "leader"]}`),
 			expectedError:       nil,
-			expectSteps: strings.Join([]string{
+			expectSteps: convertStepsToStr([]string{
 				pdoperator.AddLearner{ToStore: 3, PeerID: 2}.String(),
 				pdoperator.PromoteLearner{ToStore: 3, PeerID: 2}.String(),
 				pdoperator.TransferLeader{FromStore: 1, ToStore: 2}.String(),
 				pdoperator.RemovePeer{FromStore: 1, PeerID: 2}.String(),
 				pdoperator.TransferLeader{FromStore: 2, ToStore: 3}.String(),
-			}, ", "),
+			}),
 		},
 		{
 			name:                "default placement rule without peer role",
@@ -262,13 +268,13 @@ func (suite *transferRegionOperatorTestSuite) TestTransferRegionWithPlacementRul
 			name:                "default placement rule with peer role",
 			placementRuleEnable: true,
 			input:               []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3], "peer_roles":["follower", "leader"]}`),
-			expectSteps: strings.Join([]string{
+			expectSteps: convertStepsToStr([]string{
 				pdoperator.AddLearner{ToStore: 3, PeerID: 3}.String(),
 				pdoperator.PromoteLearner{ToStore: 3, PeerID: 3}.String(),
 				pdoperator.TransferLeader{FromStore: 1, ToStore: 2}.String(),
 				pdoperator.RemovePeer{FromStore: 1, PeerID: 1}.String(),
 				pdoperator.TransferLeader{FromStore: 2, ToStore: 3}.String(),
-			}, ", "),
+			}),
 		},
 		{
 			name: "default placement rule with invalid input",
@@ -323,12 +329,12 @@ func (suite *transferRegionOperatorTestSuite) TestTransferRegionWithPlacementRul
 			},
 			input:         []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3], "peer_roles":["follower", "leader"]}`),
 			expectedError: nil,
-			expectSteps: strings.Join([]string{
+			expectSteps: convertStepsToStr([]string{
 				pdoperator.AddLearner{ToStore: 3, PeerID: 5}.String(),
 				pdoperator.PromoteLearner{ToStore: 3, PeerID: 5}.String(),
 				pdoperator.TransferLeader{FromStore: 1, ToStore: 3}.String(),
 				pdoperator.RemovePeer{FromStore: 1, PeerID: 1}.String(),
-			}, ", "),
+			}),
 		},
 		{
 			name: "customized placement rule with valid peer role2",
@@ -363,12 +369,12 @@ func (suite *transferRegionOperatorTestSuite) TestTransferRegionWithPlacementRul
 			},
 			input:         []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3], "peer_roles":["leader", "follower"]}`),
 			expectedError: nil,
-			expectSteps: strings.Join([]string{
+			expectSteps: convertStepsToStr([]string{
 				pdoperator.AddLearner{ToStore: 3, PeerID: 6}.String(),
 				pdoperator.PromoteLearner{ToStore: 3, PeerID: 6}.String(),
 				pdoperator.TransferLeader{FromStore: 1, ToStore: 2}.String(),
 				pdoperator.RemovePeer{FromStore: 1, PeerID: 1}.String(),
-			}, ", "),
+			}),
 		},
 	}
 	for _, testCase := range testCases {
diff --git a/server/schedule/checker/rule_checker.go b/server/schedule/checker/rule_checker.go
index 501de81e46d..af3f28839ba 100644
--- a/server/schedule/checker/rule_checker.go
+++ b/server/schedule/checker/rule_checker.go
@@ -390,14 +390,15 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
 	if len(fit.OrphanPeers) == 0 {
 		return nil, nil
 	}
+	var pinDownPeer *metapb.Peer
 	isUnhealthyPeer := func(id uint64) bool {
-		for _, pendingPeer := range region.GetPendingPeers() {
-			if pendingPeer.GetId() == id {
+		for _, downPeer := range region.GetDownPeers() {
+			if downPeer.Peer.GetId() == id {
 				return true
 			}
 		}
-		for _, downPeer := range region.GetDownPeers() {
-			if downPeer.Peer.GetId() == id {
+		for _, pendingPeer := range region.GetPendingPeers() {
+			if pendingPeer.GetId() == id {
 				return true
 			}
 		}
@@ -414,16 +415,56 @@
 loopFits:
 		}
 		for _, p := range rf.Peers {
 			if isUnhealthyPeer(p.GetId()) {
+				// make sure it is a down peer.
+				if region.GetDownPeer(p.GetId()) != nil {
+					pinDownPeer = p
+				}
 				hasUnhealthyFit = true
 				break loopFits
 			}
 		}
 	}
+	// If hasUnhealthyFit is false, it is safe to delete the OrphanPeer.
 	if !hasUnhealthyFit {
 		checkerCounter.WithLabelValues("rule_checker", "remove-orphan-peer").Inc()
 		return operator.CreateRemovePeerOperator("remove-orphan-peer", c.cluster, 0, region, fit.OrphanPeers[0].StoreId)
 	}
+
+	// try to use orphan peers to replace unhealthy down peers.
+	for _, orphanPeer := range fit.OrphanPeers {
+		if pinDownPeer != nil {
+			// make sure the orphan peer is healthy.
+			if isUnhealthyPeer(orphanPeer.GetId()) {
+				continue
+			}
+			// witness peers are not considered in this path.
+			if pinDownPeer.GetIsWitness() || orphanPeer.GetIsWitness() {
+				continue
+			}
+			// the down peer's store should be down.
+			if !c.isStoreDownTimeHitMaxDownTime(pinDownPeer.GetStoreId()) {
+				continue
+			}
+			// check whether the down peer can be replaced with the orphan peer.
+			dstStore := c.cluster.GetStore(orphanPeer.GetStoreId())
+			if fit.Replace(pinDownPeer.GetStoreId(), dstStore) {
+				destRole := pinDownPeer.GetRole()
+				orphanPeerRole := orphanPeer.GetRole()
+				checkerCounter.WithLabelValues("rule_checker", "replace-orphan-peer").Inc()
+				switch {
+				case orphanPeerRole == metapb.PeerRole_Learner && destRole == metapb.PeerRole_Voter:
+					return operator.CreatePromoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer)
+				case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Learner:
+					return operator.CreateDemoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer)
+				default:
+					// destRole should never be the same as orphanPeerRole; if the roles matched, the fit with the orphan peer would already be no worse than the current one.
+					// destRole is never leader, so that case is not considered here.
+				}
+			}
+		}
+	}
+
 	// If hasUnhealthyFit is true, try to remove unhealthy orphan peers only if number of OrphanPeers is >= 2.
 	// Ref https://github.com/tikv/pd/issues/4045
 	if len(fit.OrphanPeers) >= 2 {
@@ -462,6 +503,10 @@ func (c *RuleChecker) isDownPeer(region *core.RegionInfo, peer *metapb.Peer) boo
 
 func (c *RuleChecker) isStoreDownTimeHitMaxDownTime(storeID uint64) bool {
 	store := c.cluster.GetStore(storeID)
+	if store == nil {
+		log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
+		return false
+	}
 	return store.DownTime() >= c.cluster.GetOpts().GetMaxStoreDownTime()
 }
 
diff --git a/server/schedule/checker/rule_checker_test.go b/server/schedule/checker/rule_checker_test.go
index f1fe4babbf1..4230cef537a 100644
--- a/server/schedule/checker/rule_checker_test.go
+++ b/server/schedule/checker/rule_checker_test.go
@@ -361,7 +361,6 @@ func (suite *ruleCheckerTestSuite) TestFixRuleWitness() {
 	op := suite.rc.Check(suite.cluster.GetRegion(1))
 	suite.NotNil(op)
 	suite.Equal("add-rule-peer", op.Desc())
-	fmt.Println(op)
 	suite.Equal(uint64(3), op.Step(0).(operator.AddLearner).ToStore)
 	suite.True(op.Step(0).(operator.AddLearner).IsWitness)
 }
@@ -685,6 +684,132 @@ func (suite *ruleCheckerTestSuite) TestPriorityFixOrphanPeer() {
 	suite.Equal("remove-orphan-peer", op.Desc())
 }
 
+func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole1() {
+	suite.cluster.SetEnableUseJointConsensus(true)
+	suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
+	suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
+	suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
+	suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
+	suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4})
+	r1 := suite.cluster.GetRegion(1)
+	suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()
+
+	// set peer 3 to pending and down
+	r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}))
+	r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{
+		{
+			Peer:        r1.GetStorePeer(3),
+			DownSeconds: 30000,
+		},
+	}))
+	suite.cluster.PutRegion(r1)
+
+	op := suite.rc.Check(suite.cluster.GetRegion(1))
+	suite.Equal(uint64(3), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore)
+	suite.Equal(uint64(4), op.Step(0).(operator.ChangePeerV2Enter).PromoteLearners[0].ToStore)
+	suite.Equal(uint64(3), op.Step(1).(operator.ChangePeerV2Leave).DemoteVoters[0].ToStore)
+	suite.Equal(uint64(4), op.Step(1).(operator.ChangePeerV2Leave).PromoteLearners[0].ToStore)
+	suite.Equal("replace-down-peer-with-orphan-peer", op.Desc())
+
+	// set peer 3 to pending only
+	r1 = r1.Clone(core.WithDownPeers(nil))
+	suite.cluster.PutRegion(r1)
+	op = suite.rc.Check(suite.cluster.GetRegion(1))
+	suite.Nil(op)
+}
+
+func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole2() {
+	suite.cluster.SetEnableUseJointConsensus(true)
+	suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
+	suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
+	suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
+	suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
+	suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
+	suite.cluster.AddLeaderRegion(1, 1, 2, 3, 4, 5)
+	r1 := suite.cluster.GetRegion(1)
+
+	// set peer 3 to pending and down, make peer 3 a learner, and mark store 3 as down
+	suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()
+	r1 = r1.Clone(core.WithLearners([]*metapb.Peer{r1.GetPeer(3)}))
+	r1 = r1.Clone(
+		core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}),
+		core.WithDownPeers([]*pdpb.PeerStats{
+			{
+				Peer:        r1.GetStorePeer(3),
+				DownSeconds: 30000,
+			},
+		}),
+	)
+	suite.cluster.PutRegion(r1)
+
+	// default and test groups => 3 voters + 1 learner
+	err := suite.ruleManager.SetRule(&placement.Rule{
+		GroupID: "test",
+		ID:      "10",
+		Role:    placement.Learner,
+		Count:   1,
+	})
+	suite.NoError(err)
+
+	op := suite.rc.Check(suite.cluster.GetRegion(1))
+	suite.Equal(uint64(5), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore)
+	suite.Equal(uint64(3), op.Step(1).(operator.RemovePeer).FromStore)
+	suite.Equal("replace-down-peer-with-orphan-peer", op.Desc())
+}
+
+func (suite *ruleCheckerTestSuite) TestPriorityFitHealthPeersAndTiFlash() {
+	suite.cluster.SetEnableUseJointConsensus(true)
+	suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
+	suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
+	suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
+	suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4", "engine": "tiflash"})
+	suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4})
+	rule := &placement.Rule{
+		GroupID: "pd",
+		ID:      "test",
+		Role:    placement.Voter,
+		Count:   3,
+	}
+	rule2 := &placement.Rule{
+		GroupID: "pd",
+		ID:      "test2",
+		Role:    placement.Learner,
+		Count:   1,
+		LabelConstraints: []placement.LabelConstraint{
+			{
+				Key:    "engine",
+				Op:     placement.In,
+				Values: []string{"tiflash"},
+			},
+		},
+	}
+	suite.ruleManager.SetRule(rule)
+	suite.ruleManager.SetRule(rule2)
+	suite.ruleManager.DeleteRule("pd", "default")
+
+	r1 := suite.cluster.GetRegion(1)
+	// set peer 3 to pending and down
+	r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}))
+	r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{
+		{
+			Peer:        r1.GetStorePeer(3),
+			DownSeconds: 30000,
+		},
+	}))
+	suite.cluster.PutRegion(r1)
+	suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()
+
+	op := suite.rc.Check(suite.cluster.GetRegion(1))
+	// should not promote the tiflash peer
+	suite.Nil(op)
+
+	// scale out a node; now the down peer can be replaced
+	suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
+	op = suite.rc.Check(suite.cluster.GetRegion(1))
+	suite.NotNil(op)
+	suite.Equal("replace-rule-down-peer", op.Desc())
+}
+
 func (suite *ruleCheckerTestSuite) TestIssue3293() {
 	suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
 	suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host1"})
diff --git a/server/schedule/operator/builder.go b/server/schedule/operator/builder.go
index 4dfb98324e5..93c4048e79d 100644
--- a/server/schedule/operator/builder.go
+++ b/server/schedule/operator/builder.go
@@ -403,7 +403,6 @@ func (b *Builder) Build(kind OpKind) (*Operator, error) {
 	if brief, b.err = b.prepareBuild(); b.err != nil {
 		return nil, b.err
 	}
-
 	if b.useJointConsensus {
 		kind, b.err = b.buildStepsWithJointConsensus(kind)
 	} else {
@@ -549,6 +548,10 @@ func (b *Builder) brief() string {
 		return fmt.Sprintf("%s: store %s to %s", op, b.toRemove, b.toAdd)
 	case len(b.toAdd) > 0:
 		return fmt.Sprintf("add peer: store %s", b.toAdd)
+	case len(b.toRemove) > 0 && len(b.toPromote) > 0:
+		return fmt.Sprintf("promote peer: store %s, rm peer: store %s", b.toPromote, b.toRemove)
+	case len(b.toRemove) > 0 && len(b.toDemote) > 0:
+		return fmt.Sprintf("demote peer: store %s, rm peer: store %s", b.toDemote, b.toRemove)
 	case len(b.toRemove) > 0:
 		return fmt.Sprintf("rm peer: store %s", b.toRemove)
 	case len(b.toPromote) > 0:
diff --git a/server/schedule/operator/create_operator.go b/server/schedule/operator/create_operator.go
index 206d839ab28..71c37f99c24 100644
--- a/server/schedule/operator/create_operator.go
+++ b/server/schedule/operator/create_operator.go
@@ -50,6 +50,25 @@ func CreatePromoteLearnerOperator(desc string, ci ClusterInformer, region *core.
 		Build(0)
 }
 
+// CreatePromoteLearnerOperatorAndRemovePeer creates an operator that promotes a learner and removes a peer.
+func CreatePromoteLearnerOperatorAndRemovePeer(desc string, ci ClusterInformer, region *core.RegionInfo, toPromote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
+	return NewBuilder(desc, ci, region).
+		PromoteLearner(toPromote.GetStoreId()).
+		RemovePeer(toRemove.GetStoreId()).
+		Build(0)
+}
+
+// CreateDemoteLearnerOperatorAndRemovePeer creates an operator that demotes a learner and removes a peer.
+func CreateDemoteLearnerOperatorAndRemovePeer(desc string, ci ClusterInformer, region *core.RegionInfo, toDemote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
+	if !ci.GetOpts().IsUseJointConsensus() {
+		return nil, errors.Errorf("cannot build demote learner operator due to disabling using joint state")
+	}
+	return NewBuilder(desc, ci, region).
+		DemoteVoter(toDemote.GetStoreId()).
+		RemovePeer(toRemove.GetStoreId()).
+		Build(0)
+}
+
 // CreateRemovePeerOperator creates an operator that removes a peer from region.
 func CreateRemovePeerOperator(desc string, ci ClusterInformer, kind OpKind, region *core.RegionInfo, storeID uint64) (*Operator, error) {
 	return NewBuilder(desc, ci, region).
@@ -238,7 +257,7 @@ func CreateLeaveJointStateOperator(desc string, ci ClusterInformer, origin *core
 	b := NewBuilder(desc, ci, origin, SkipOriginJointStateCheck, SkipPlacementRulesCheck)
 
 	if b.err == nil && !core.IsInJointState(origin.GetPeers()...) {
-		b.err = errors.Errorf("cannot build leave joint state operator for region which is not in joint state")
+		b.err = errors.Errorf("cannot build leave joint state operator due to disabling using joint state")
 	}
 
 	if b.err != nil {
diff --git a/server/schedule/operator/operator.go b/server/schedule/operator/operator.go
index 3fae9d86eea..b38f4555d6c 100644
--- a/server/schedule/operator/operator.go
+++ b/server/schedule/operator/operator.go
@@ -87,7 +87,7 @@ func (o *Operator) Sync(other *Operator) {
 func (o *Operator) String() string {
 	stepStrs := make([]string, len(o.steps))
 	for i := range o.steps {
-		stepStrs[i] = o.steps[i].String()
+		stepStrs[i] = fmt.Sprintf("%d:{%s}", i, o.steps[i].String())
 	}
 	s := fmt.Sprintf("%s {%s} (kind:%s, region:%v(%v, %v), createAt:%s, startAt:%s, currentStep:%v, size:%d, steps:[%s],timeout:[%s])",
 		o.desc, o.brief, o.kind, o.regionID, o.regionEpoch.GetVersion(), o.regionEpoch.GetConfVer(), o.GetCreateTime(),
diff --git a/server/schedule/placement/fit.go b/server/schedule/placement/fit.go
index 82af3c17d11..454715cdc8e 100644
--- a/server/schedule/placement/fit.go
+++ b/server/schedule/placement/fit.go
@@ -56,6 +56,9 @@ func (f *RegionFit) IsCached() bool {
 
 // Replace return true if the replacement store is fit all constraints and isolation score is not less than the origin.
 func (f *RegionFit) Replace(srcStoreID uint64, dstStore *core.StoreInfo) bool {
+	if dstStore == nil {
+		return false
+	}
 	fit := f.getRuleFitByStoreID(srcStoreID)
 	// check the target store is fit all constraints.
 	if fit == nil {