diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index b413e243c2b..6ad37045000 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -119,6 +119,7 @@ func (cw *Watcher) initializeConfigWatcher() error { zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) return err } + log.Info("update scheduling config", zap.Reflect("new", cfg)) cw.AdjustScheduleCfg(&cfg.Schedule) cw.SetClusterVersion(&cfg.ClusterVersion) cw.SetScheduleConfig(&cfg.Schedule) @@ -146,6 +147,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error { prefixToTrim := cw.schedulerConfigPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { name := strings.TrimPrefix(string(kv.Key), prefixToTrim) + log.Info("update scheduler config", zap.String("name", string(kv.Value))) err := cw.storage.SaveSchedulerConfig(name, kv.Value) if err != nil { log.Warn("failed to save scheduler config", @@ -161,6 +163,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error { return nil } deleteFn := func(kv *mvccpb.KeyValue) error { + log.Info("remove scheduler config", zap.String("key", string(kv.Key))) return cw.storage.RemoveSchedulerConfig( strings.TrimPrefix(string(kv.Key), prefixToTrim), ) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index cf0e1cd8ba1..dc5735eb540 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -19,10 +19,12 @@ import ( "strings" "sync" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" + "go.uber.org/zap" ) // ruleStorage is an in-memory storage for Placement Rules, @@ -163,12 +165,14 @@ func (rw *Watcher) initializeRuleWatcher() error { putFn := func(kv *mvccpb.KeyValue) error { // Since the PD API server will validate the rule before saving it to etcd, // so we could directly save the string rule in JSON to the storage here. + log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) return rw.ruleStore.SaveRule( strings.TrimPrefix(string(kv.Key), prefixToTrim), string(kv.Value), ) } deleteFn := func(kv *mvccpb.KeyValue) error { + log.Info("delete placement rule", zap.String("key", string(kv.Key))) return rw.ruleStore.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim)) } postEventFn := func() error { @@ -188,12 +192,14 @@ func (rw *Watcher) initializeRuleWatcher() error { func (rw *Watcher) initializeGroupWatcher() error { prefixToTrim := rw.ruleGroupPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { + log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) return rw.ruleStore.SaveRuleGroup( strings.TrimPrefix(string(kv.Key), prefixToTrim), string(kv.Value), ) } deleteFn := func(kv *mvccpb.KeyValue) error { + log.Info("delete placement rule group", zap.String("key", string(kv.Key))) return rw.ruleStore.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim)) } postEventFn := func() error { @@ -213,12 +219,14 @@ func (rw *Watcher) initializeGroupWatcher() error { func (rw *Watcher) initializeRegionLabelWatcher() error { prefixToTrim := rw.regionLabelPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { + log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) return rw.ruleStore.SaveRegionRule( strings.TrimPrefix(string(kv.Key), prefixToTrim), string(kv.Value), ) } deleteFn := func(kv *mvccpb.KeyValue) error { + log.Info("delete region label rule", zap.String("key", string(kv.Key))) return rw.ruleStore.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim)) } postEventFn := func() error { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 30ab50f0c9f..33ed5711ac5 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -427,15 +427,15 @@ func (c *RaftCluster) runStoreConfigSync() { defer c.wg.Done() var ( - synced, switchRaftV2Config bool - stores = c.GetStores() + synced, switchRaftV2Config, needPersist bool + stores = c.GetStores() ) // Start the ticker with a second-level timer to accelerate // the bootstrap stage. ticker := time.NewTicker(time.Minute) defer ticker.Stop() for { - synced, switchRaftV2Config = c.syncStoreConfig(stores) + synced, switchRaftV2Config, needPersist = c.syncStoreConfig(stores) if switchRaftV2Config { if err := c.opt.SwitchRaftV2(c.GetStorage()); err != nil { log.Warn("store config persisted failed", zap.Error(err)) @@ -444,8 +444,11 @@ func (c *RaftCluster) runStoreConfigSync() { // Update the stores if the synchronization is not completed. if !synced { stores = c.GetStores() - } else if err := c.opt.Persist(c.storage); err != nil { - log.Warn("store config persisted failed", zap.Error(err)) + } + if needPersist { + if err := c.opt.Persist(c.storage); err != nil { + log.Warn("store config persisted failed", zap.Error(err)) + } } select { case <-c.ctx.Done(): @@ -459,7 +462,8 @@ func (c *RaftCluster) runStoreConfigSync() { // syncStoreConfig syncs the store config from TiKV. // - `synced` is true if sync config from one tikv. // - `switchRaftV2` is true if the config of tikv engine is change to raft-kv2. -func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool) { +func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool, needPersist bool) { + var err error for index := 0; index < len(stores); index++ { select { case <-c.ctx.Done(): @@ -479,7 +483,7 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw } // it will try next store if the current store is failed. address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress()) - switchRaftV2, err := c.observeStoreConfig(c.ctx, address) + switchRaftV2, needPersist, err = c.observeStoreConfig(c.ctx, address) if err != nil { // delete the store if it is failed and retry next store. stores = append(stores[:index], stores[index+1:]...) @@ -492,34 +496,35 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw } storeSyncConfigEvent.WithLabelValues(address, "succ").Inc() - return true, switchRaftV2 + return true, switchRaftV2, needPersist } - return false, false + return false, false, needPersist } // observeStoreConfig is used to observe the store config changes and // return whether if the new config changes the engine to raft-kv2. -func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (bool, error) { +func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (switchRaftV2 bool, needPersist bool, err error) { cfg, err := c.fetchStoreConfigFromTiKV(ctx, address) if err != nil { - return false, err + return false, false, err } oldCfg := c.opt.GetStoreConfig() if cfg == nil || oldCfg.Equal(cfg) { - return false, nil + return false, false, nil } log.Info("sync the store config successful", zap.String("store-address", address), zap.String("store-config", cfg.String()), zap.String("old-config", oldCfg.String())) - return c.updateStoreConfig(oldCfg, cfg) + return c.updateStoreConfig(oldCfg, cfg), true, nil } // updateStoreConfig updates the store config. This is extracted for testing. -func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (bool, error) { +func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (switchRaftV2 bool) { cfg.Adjust() c.opt.SetStoreConfig(cfg) - return oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2, nil + switchRaftV2 = oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2 + return } // fetchStoreConfigFromTiKV tries to fetch the config from the TiKV store URL. diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 33236c5d40c..31f6bb357c3 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1428,8 +1428,10 @@ func TestSyncConfigContext(t *testing.T) { // trip schema header now := time.Now() stores[0].GetMeta().StatusAddress = server.URL[7:] - synced, _ := tc.syncStoreConfig(tc.GetStores()) + synced, switchRaftV2, needPersist := tc.syncStoreConfig(tc.GetStores()) re.False(synced) + re.False(switchRaftV2) + re.False(needPersist) re.Less(time.Since(now), clientTimeout*2) } @@ -1450,15 +1452,17 @@ func TestStoreConfigSync(t *testing.T) { re.Equal(uint64(144), tc.GetStoreConfig().GetRegionMaxSize()) re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/mockFetchStoreConfigFromTiKV", `return("10MiB")`)) // switchRaftV2 will be true. - synced, switchRaftV2 := tc.syncStoreConfig(tc.GetStores()) + synced, switchRaftV2, needPersist := tc.syncStoreConfig(tc.GetStores()) re.True(synced) re.True(switchRaftV2) + re.True(needPersist) re.EqualValues(512, tc.opt.GetMaxMovableHotPeerSize()) re.Equal(uint64(10), tc.GetStoreConfig().GetRegionMaxSize()) // switchRaftV2 will be false this time. - synced, switchRaftV2 = tc.syncStoreConfig(tc.GetStores()) + synced, switchRaftV2, needPersist = tc.syncStoreConfig(tc.GetStores()) re.True(synced) re.False(switchRaftV2) + re.False(needPersist) re.Equal(uint64(10), tc.GetStoreConfig().GetRegionMaxSize()) re.NoError(opt.Persist(tc.GetStorage())) re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/mockFetchStoreConfigFromTiKV"))