From 5c8d3f3b6fd0794d60c14be27dd989518d27d227 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 10 Sep 2024 16:37:14 +0800 Subject: [PATCH] *: fix sync isolation level to default placement rule (#7122) (#7125) close tikv/pd#7121 Signed-off-by: ti-chi-bot Signed-off-by: Ryan Leung Co-authored-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mock/mockcluster/mockcluster.go | 2 +- server/api/operator_test.go | 4 +- server/cluster/cluster.go | 2 +- server/cluster/cluster_test.go | 10 ++--- server/config/persist_options.go | 7 ++++ server/schedule/placement/rule_manager.go | 3 +- .../schedule/placement/rule_manager_test.go | 6 +-- server/server.go | 11 ++--- server/statistics/region_collection_test.go | 2 +- tests/pdctl/config/config_test.go | 40 +++++++++++++++++-- 10 files changed, 65 insertions(+), 22 deletions(-) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index a285e4a8cbf..b2f5bbae66b 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -188,7 +188,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { func (mc *Cluster) initRuleManager() { if mc.RuleManager == nil { mc.RuleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), mc, mc.GetOpts()) - mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels) + mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel) } } diff --git a/server/api/operator_test.go b/server/api/operator_test.go index 86d99c5e726..9f5d3167476 100644 --- a/server/api/operator_test.go +++ b/server/api/operator_test.go @@ -364,7 +364,9 @@ func (s *testTransferRegionOperatorSuite) TestTransferRegionWithPlacementRule(c if tc.placementRuleEnable { err := s.svr.GetRaftCluster().GetRuleManager().Initialize( s.svr.GetRaftCluster().GetOpts().GetMaxReplicas(), - s.svr.GetRaftCluster().GetOpts().GetLocationLabels()) + s.svr.GetRaftCluster().GetOpts().GetLocationLabels(), + s.svr.GetRaftCluster().GetOpts().GetIsolationLevel(), + ) c.Assert(err, IsNil) } if len(tc.rules) > 0 { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index a97d4408536..082d5851a22 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -253,7 +253,7 @@ func (c *RaftCluster) Start(s Server) error { c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts()) if c.opt.IsPlacementRulesEnabled() { - err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels()) + err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel()) if err != nil { return err } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index cf13b32e46f..728ff399972 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -222,7 +222,7 @@ func (s *testClusterInfoSuite) TestSetOfflineStore(c *C) { cluster.coordinator = newCoordinator(s.ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -393,7 +393,7 @@ func (s *testClusterInfoSuite) TestUpStore(c *C) { cluster.coordinator = newCoordinator(s.ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -488,7 +488,7 @@ func (s *testClusterInfoSuite) TestDeleteStoreUpdatesClusterVersion(c *C) { cluster.coordinator = newCoordinator(s.ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -1118,7 +1118,7 @@ func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) { cluster.coordinator = newCoordinator(s.ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -1705,7 +1705,7 @@ func newTestRaftCluster( rc.InitCluster(id, opt, s, basicCluster) rc.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), rc, opt) if opt.IsPlacementRulesEnabled() { - err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } diff --git a/server/config/persist_options.go b/server/config/persist_options.go index fe7203722c2..fc7835e6ae1 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -260,6 +260,13 @@ func (o *PersistOptions) SetSplitMergeInterval(splitMergeInterval time.Duration) o.SetScheduleConfig(v) } +// SetMaxStoreDownTime to set the max store down time. It's only used to test. +func (o *PersistOptions) SetMaxStoreDownTime(time time.Duration) { + v := o.GetScheduleConfig().Clone() + v.MaxStoreDownTime = typeutil.NewDuration(time) + o.SetScheduleConfig(v) +} + // SetMaxMergeRegionSize sets the max merge region size. func (o *PersistOptions) SetMaxMergeRegionSize(maxMergeRegionSize uint64) { v := o.GetScheduleConfig().Clone() diff --git a/server/schedule/placement/rule_manager.go b/server/schedule/placement/rule_manager.go index 04b2d96d9f1..ac5585fdbe0 100644 --- a/server/schedule/placement/rule_manager.go +++ b/server/schedule/placement/rule_manager.go @@ -63,7 +63,7 @@ func NewRuleManager(storage endpoint.RuleStorage, storeSetInformer core.StoreSet // Initialize loads rules from storage. If Placement Rules feature is never enabled, it creates default rule that is // compatible with previous configuration. -func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error { +func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolationLevel string) error { m.Lock() defer m.Unlock() if m.initialized { @@ -84,6 +84,7 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error Role: Voter, Count: maxReplica, LocationLabels: locationLabels, + IsolationLevel: isolationLevel, } if err := m.storage.SaveRule(defaultRule.StoreKey(), defaultRule); err != nil { return err diff --git a/server/schedule/placement/rule_manager_test.go b/server/schedule/placement/rule_manager_test.go index ae750fe5f9b..7e19942e488 100644 --- a/server/schedule/placement/rule_manager_test.go +++ b/server/schedule/placement/rule_manager_test.go @@ -36,7 +36,7 @@ func (s *testManagerSuite) SetUpTest(c *C) { s.store = storage.NewStorageWithMemoryBackend() var err error s.manager = NewRuleManager(s.store, nil, nil) - err = s.manager.Initialize(3, []string{"zone", "rack", "host"}) + err = s.manager.Initialize(3, []string{"zone", "rack", "host"}, "") c.Assert(err, IsNil) } @@ -113,7 +113,7 @@ func (s *testManagerSuite) TestSaveLoad(c *C) { } m2 := NewRuleManager(s.store, nil, nil) - err := m2.Initialize(3, []string{"no", "labels"}) + err := m2.Initialize(3, []string{"no", "labels"}, "") c.Assert(err, IsNil) c.Assert(m2.GetAllRules(), HasLen, 3) c.Assert(m2.GetRule("pd", "default").String(), Equals, rules[0].String()) @@ -128,7 +128,7 @@ func (s *testManagerSuite) TestSetAfterGet(c *C) { s.manager.SetRule(rule) m2 := NewRuleManager(s.store, nil, nil) - err := m2.Initialize(100, []string{}) + err := m2.Initialize(100, []string{}, "") c.Assert(err, IsNil) rule = m2.GetRule("pd", "default") c.Assert(rule.Count, Equals, 1) diff --git a/server/server.go b/server/server.go index 3463b3c6e4a..43bdc458494 100644 --- a/server/server.go +++ b/server/server.go @@ -897,7 +897,7 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { } if cfg.EnablePlacementRules { // initialize rule manager. - if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil { + if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel); err != nil { return err } } else { @@ -920,19 +920,19 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { defaultRule := rc.GetRuleManager().GetRule("pd", "default") CheckInDefaultRule := func() error { - // replication config won't work when placement rule is enabled and exceeds one default rule + // replication config won't work when placement rule is enabled and exceeds one default rule if !(defaultRule != nil && len(defaultRule.StartKey) == 0 && len(defaultRule.EndKey) == 0) { - return errors.New("cannot update MaxReplicas or LocationLabels when placement rules feature is enabled and not only default rule exists, please update rule instead") + return errors.New("cannot update MaxReplicas, LocationLabels or IsolationLevel when placement rules feature is enabled and not only default rule exists, please update rule instead") } - if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.StringsEqual(defaultRule.LocationLabels, []string(old.LocationLabels))) { + if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.StringsEqual(defaultRule.LocationLabels, []string(old.LocationLabels)) && defaultRule.IsolationLevel == old.IsolationLevel) { return errors.New("cannot to update replication config, the default rules do not consistent with replication config, please update rule instead") } return nil } - if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.StringsEqual(cfg.LocationLabels, old.LocationLabels)) { + if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.StringsEqual(cfg.LocationLabels, old.LocationLabels) && cfg.IsolationLevel == old.IsolationLevel) { if err := CheckInDefaultRule(); err != nil { return err } @@ -943,6 +943,7 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { if rule != nil { rule.Count = int(cfg.MaxReplicas) rule.LocationLabels = cfg.LocationLabels + rule.IsolationLevel = cfg.IsolationLevel rc := s.GetRaftCluster() if rc == nil { return errs.ErrNotBootstrapped.GenWithStackByArgs() diff --git a/server/statistics/region_collection_test.go b/server/statistics/region_collection_test.go index eb100e958fd..f9d1193d700 100644 --- a/server/statistics/region_collection_test.go +++ b/server/statistics/region_collection_test.go @@ -42,7 +42,7 @@ func (t *testRegionStatisticsSuite) SetUpTest(c *C) { t.store = storage.NewStorageWithMemoryBackend() var err error t.manager = placement.NewRuleManager(t.store, nil, nil) - err = t.manager.Initialize(3, []string{"zone", "rack", "host"}) + err = t.manager.Initialize(3, []string{"zone", "rack", "host"}, "") c.Assert(err, IsNil) } diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 297cc538606..b15bfd5e7d9 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -637,7 +637,7 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { c.Assert(replicationCfg.MaxReplicas, Equals, expect) } - checkLocaltionLabels := func(expect int) { + checkLocationLabels := func(expect int) { args := []string{"-u", pdAddr, "config", "show", "replication"} output, err := pdctl.ExecuteCommand(cmd, args...) c.Assert(err, IsNil) @@ -646,6 +646,15 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { c.Assert(replicationCfg.LocationLabels, HasLen, expect) } + checkIsolationLevel := func(expect string) { + args := []string{"-u", pdAddr, "config", "show", "replication"} + output, err := pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, IsNil) + replicationCfg := config.ReplicationConfig{} + c.Assert(json.Unmarshal(output, &replicationCfg), IsNil) + c.Assert(replicationCfg.IsolationLevel, Equals, expect) + } + checkRuleCount := func(expect int) { args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--group", "pd", "--id", "default"} output, err := pdctl.ExecuteCommand(cmd, args...) @@ -664,6 +673,15 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { c.Assert(rule.LocationLabels, HasLen, expect) } + checkRuleIsolationLevel := func(expect string) { + args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--group", "pd", "--id", "default"} + output, err := pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, IsNil) + rule := placement.Rule{} + c.Assert(json.Unmarshal(output, &rule), IsNil) + c.Assert(rule.IsolationLevel, Equals, expect) + } + // update successfully when placement rules is not enabled. output, err := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "max-replicas", "2") c.Assert(err, IsNil) @@ -672,8 +690,13 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "location-labels", "zone,host") c.Assert(err, IsNil) c.Assert(strings.Contains(string(output), "Success!"), IsTrue) - checkLocaltionLabels(2) + output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "isolation-level", "zone") + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "Success!"), IsTrue) + checkLocationLabels(2) checkRuleLocationLabels(2) + checkIsolationLevel("zone") + checkRuleIsolationLevel("zone") // update successfully when only one default rule exists. output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "placement-rules", "enable") @@ -686,11 +709,18 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { checkMaxReplicas(3) checkRuleCount(3) + // We need to change isolation first because we will validate + // if the location label contains the isolation level when setting location labels. + output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "isolation-level", "host") + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "Success!"), IsTrue) output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "location-labels", "host") c.Assert(err, IsNil) c.Assert(strings.Contains(string(output), "Success!"), IsTrue) - checkLocaltionLabels(1) + checkLocationLabels(1) checkRuleLocationLabels(1) + checkIsolationLevel("host") + checkRuleIsolationLevel("host") // update unsuccessfully when many rule exists. f, _ := os.CreateTemp("/tmp", "pd_tests") @@ -720,8 +750,10 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { c.Assert(err, IsNil) checkMaxReplicas(4) checkRuleCount(4) - checkLocaltionLabels(1) + checkLocationLabels(1) checkRuleLocationLabels(1) + checkIsolationLevel("host") + checkRuleIsolationLevel("host") } func (s *configTestSuite) TestPDServerConfig(c *C) {