diff --git a/pkg/core/store_option.go b/pkg/core/store_option.go index a1e15fd056d..8a2aa1ef089 100644 --- a/pkg/core/store_option.go +++ b/pkg/core/store_option.go @@ -74,33 +74,26 @@ func SetStoreDeployPath(deployPath string) StoreCreateOption { } } -// OfflineStore offline a store -func OfflineStore(physicallyDestroyed bool) StoreCreateOption { +// SetStoreState sets the state for the store. +func SetStoreState(state metapb.StoreState, physicallyDestroyed ...bool) StoreCreateOption { return func(store *StoreInfo) { meta := typeutil.DeepClone(store.meta, StoreFactory) - meta.State = metapb.StoreState_Offline - meta.NodeState = metapb.NodeState_Removing - meta.PhysicallyDestroyed = physicallyDestroyed - store.meta = meta - } -} - -// UpStore up a store -func UpStore() StoreCreateOption { - return func(store *StoreInfo) { - meta := typeutil.DeepClone(store.meta, StoreFactory) - meta.State = metapb.StoreState_Up - meta.NodeState = metapb.NodeState_Serving - store.meta = meta - } -} - -// TombstoneStore set a store to tombstone. -func TombstoneStore() StoreCreateOption { - return func(store *StoreInfo) { - meta := typeutil.DeepClone(store.meta, StoreFactory) - meta.State = metapb.StoreState_Tombstone - meta.NodeState = metapb.NodeState_Removed + switch state { + case metapb.StoreState_Up: + meta.State = metapb.StoreState_Up + meta.NodeState = metapb.NodeState_Serving + case metapb.StoreState_Offline: + if len(physicallyDestroyed) != 0 { + meta.State = metapb.StoreState_Offline + meta.NodeState = metapb.NodeState_Removing + meta.PhysicallyDestroyed = physicallyDestroyed[0] + } else { + panic("physicallyDestroyed should be set when set store state to offline") + } + case metapb.StoreState_Tombstone: + meta.State = metapb.StoreState_Tombstone + meta.NodeState = metapb.NodeState_Removed + } store.meta = meta } } diff --git a/pkg/core/store_test.go b/pkg/core/store_test.go index be0fd0f9418..67618a63ea9 100644 --- a/pkg/core/store_test.go +++ b/pkg/core/store_test.go @@ -84,7 +84,7 @@ func TestCloneStore(t *testing.T) { break } store.Clone( - UpStore(), + SetStoreState(metapb.StoreState_Up), SetLastHeartbeatTS(time.Now()), ) } diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go new file mode 100644 index 00000000000..1e58bbd845f --- /dev/null +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -0,0 +1,117 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package meta + +import ( + "context" + "sync" + + "github.com/gogo/protobuf/proto" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/etcdutil" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/mvcc/mvccpb" + "go.uber.org/zap" +) + +// Watcher is used to watch the PD API server for any meta changes. 
+type Watcher struct { + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + clusterID uint64 + // storePathPrefix is the path of the store in etcd: + // - Key: /pd/{cluster_id}/raft/s/ + // - Value: meta store proto. + storePathPrefix string + + etcdClient *clientv3.Client + basicCluster *core.BasicCluster + storeWatcher *etcdutil.LoopWatcher +} + +// NewWatcher creates a new watcher to watch the meta change from PD API server. +func NewWatcher( + ctx context.Context, + etcdClient *clientv3.Client, + clusterID uint64, + basicCluster *core.BasicCluster, +) (*Watcher, error) { + ctx, cancel := context.WithCancel(ctx) + w := &Watcher{ + ctx: ctx, + cancel: cancel, + clusterID: clusterID, + storePathPrefix: endpoint.StorePathPrefix(clusterID), + etcdClient: etcdClient, + basicCluster: basicCluster, + } + err := w.initializeStoreWatcher() + if err != nil { + return nil, err + } + return w, nil +} + +func (w *Watcher) initializeStoreWatcher() error { + putFn := func(kv *mvccpb.KeyValue) error { + store := &metapb.Store{} + if err := proto.Unmarshal(kv.Value, store); err != nil { + log.Warn("failed to unmarshal store entry", + zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) + return err + } + origin := w.basicCluster.GetStore(store.GetId()) + if origin == nil { + w.basicCluster.PutStore(core.NewStoreInfo(store)) + return nil + } + w.basicCluster.PutStore(origin.Clone(core.SetStoreState(store.GetState(), store.GetPhysicallyDestroyed()))) + return nil + } + deleteFn := func(kv *mvccpb.KeyValue) error { + key := string(kv.Key) + storeID, err := endpoint.ExtractStoreIDFromPath(w.clusterID, key) + if err != nil { + return err + } + origin := w.basicCluster.GetStore(storeID) + if origin != nil { + w.basicCluster.DeleteStore(origin) + } + return nil + } + postEventFn := func() error { + return nil + } + w.storeWatcher = etcdutil.NewLoopWatcher( + w.ctx, &w.wg, + w.etcdClient, + "scheduling-store-watcher", w.storePathPrefix, + putFn, deleteFn, postEventFn, + clientv3.WithPrefix(), + ) + w.storeWatcher.StartWatchLoop() + return w.storeWatcher.WaitLoad() +} + +// Close closes the watcher. +func (w *Watcher) Close() { + w.cancel() + w.wg.Wait() +} diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index d488e4fe4e3..8cb477f7a24 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -39,6 +39,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" + "github.com/tikv/pd/pkg/mcs/scheduling/server/meta" "github.com/tikv/pd/pkg/mcs/scheduling/server/rule" "github.com/tikv/pd/pkg/mcs/server" "github.com/tikv/pd/pkg/mcs/utils" @@ -77,6 +78,7 @@ type Server struct { cfg *config.Config clusterID uint64 persistConfig *config.PersistConfig + basicCluster *core.BasicCluster // for the primary election of scheduling participant *member.Participant @@ -98,6 +100,7 @@ type Server struct { // for watching the PD API server meta info updates that are related to the scheduling. configWatcher *config.Watcher ruleWatcher *rule.Watcher + metaWatcher *meta.Watcher } // Name returns the unique name for this server in the scheduling cluster. 
@@ -279,6 +282,7 @@ func (s *Server) Close() { s.GetCoordinator().Stop() s.ruleWatcher.Close() s.configWatcher.Close() + s.metaWatcher.Close() s.serverLoopCancel() s.serverLoopWg.Wait() @@ -319,6 +323,11 @@ func (s *Server) GetCluster() *Cluster { return s.cluster } +// GetBasicCluster returns the basic cluster. +func (s *Server) GetBasicCluster() *core.BasicCluster { + return s.basicCluster +} + // GetCoordinator returns the coordinator. func (s *Server) GetCoordinator() *schedule.Coordinator { return s.GetCluster().GetCoordinator() @@ -365,15 +374,15 @@ func (s *Server) startServer() (err error) { s.participant = member.NewParticipant(s.GetClient()) s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)), utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr) + s.basicCluster = core.NewBasicCluster() err = s.startWatcher() if err != nil { return err } s.storage = endpoint.NewStorageEndpoint( kv.NewEtcdKVBase(s.GetClient(), endpoint.PDRootPath(s.clusterID)), nil) - basicCluster := core.NewBasicCluster() - s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, basicCluster) - s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh) + s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster) + s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh) if err != nil { return err } @@ -414,15 +423,15 @@ func (s *Server) startServer() (err error) { } func (s *Server) startWatcher() (err error) { - s.configWatcher, err = config.NewWatcher( - s.Context(), s.GetClient(), s.clusterID, s.persistConfig, - ) + s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster) + if err != nil { + return err + } + s.configWatcher, err = config.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.persistConfig) if err != nil { return err } - s.ruleWatcher, err = rule.NewWatcher( - s.Context(), s.GetClient(), s.clusterID, - ) + s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID) return err } diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index ce392d26a39..cf25bb89c3c 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -231,7 +231,7 @@ func (mc *Cluster) GetRegionLabeler() *labeler.RegionLabeler { func (mc *Cluster) SetStoreUp(storeID uint64) { store := mc.GetStore(storeID) newStore := store.Clone( - core.UpStore(), + core.SetStoreState(metapb.StoreState_Up), core.SetLastHeartbeatTS(time.Now()), ) mc.PutStore(newStore) @@ -241,7 +241,7 @@ func (mc *Cluster) SetStoreUp(storeID uint64) { func (mc *Cluster) SetStoreDisconnect(storeID uint64) { store := mc.GetStore(storeID) newStore := store.Clone( - core.UpStore(), + core.SetStoreState(metapb.StoreState_Up), core.SetLastHeartbeatTS(time.Now().Add(-time.Second*30)), ) mc.PutStore(newStore) @@ -251,7 +251,7 @@ func (mc *Cluster) SetStoreDisconnect(storeID uint64) { func (mc *Cluster) SetStoreDown(storeID uint64) { store := mc.GetStore(storeID) newStore := store.Clone( - core.UpStore(), + core.SetStoreState(metapb.StoreState_Up), core.SetLastHeartbeatTS(typeutil.ZeroTime), ) mc.PutStore(newStore) @@ -260,7 +260,7 @@ func (mc *Cluster) SetStoreDown(storeID uint64) { // SetStoreOffline sets store state to be offline. 
func (mc *Cluster) SetStoreOffline(storeID uint64) { store := mc.GetStore(storeID) - newStore := store.Clone(core.OfflineStore(false)) + newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline, false)) mc.PutStore(newStore) } @@ -287,7 +287,7 @@ func (mc *Cluster) BuryStore(storeID uint64, forceBury bool) error { } } - newStore := store.Clone(core.TombstoneStore()) + newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone)) mc.PutStore(newStore) return nil } diff --git a/pkg/statistics/region_collection_test.go b/pkg/statistics/region_collection_test.go index bc15c648598..78cbd840934 100644 --- a/pkg/statistics/region_collection_test.go +++ b/pkg/statistics/region_collection_test.go @@ -59,7 +59,7 @@ func TestRegionStatistics(t *testing.T) { {Peer: peers[1], DownSeconds: 3608}, } - store3 := stores[3].Clone(core.OfflineStore(false)) + store3 := stores[3].Clone(core.SetStoreState(metapb.StoreState_Offline, false)) stores[3] = store3 r1 := &metapb.Region{Id: 1, Peers: peers, StartKey: []byte("aa"), EndKey: []byte("bb")} r2 := &metapb.Region{Id: 2, Peers: peers[0:2], StartKey: []byte("cc"), EndKey: []byte("dd")} @@ -109,7 +109,7 @@ func TestRegionStatistics(t *testing.T) { re.Empty(regionStats.stats[LearnerPeer]) re.Empty(regionStats.stats[OfflinePeer]) - store3 = stores[3].Clone(core.UpStore()) + store3 = stores[3].Clone(core.SetStoreState(metapb.StoreState_Up)) stores[3] = store3 regionStats.Observe(region1, stores) re.Empty(regionStats.stats[OfflinePeer]) diff --git a/pkg/statistics/store_collection_test.go b/pkg/statistics/store_collection_test.go index 74ef12c54a9..054e55a9fda 100644 --- a/pkg/statistics/store_collection_test.go +++ b/pkg/statistics/store_collection_test.go @@ -55,7 +55,7 @@ func TestStoreStatistics(t *testing.T) { stores = append(stores, s) } - store3 := stores[3].Clone(core.OfflineStore(false)) + store3 := stores[3].Clone(core.SetStoreState(metapb.StoreState_Offline, false)) stores[3] = store3 store4 := stores[4].Clone(core.SetLastHeartbeatTS(stores[4].GetLastHeartbeatTS().Add(-time.Hour))) stores[4] = store4 diff --git a/pkg/statistics/store_test.go b/pkg/statistics/store_test.go index 9f5d9a1cc42..a0e7140a882 100644 --- a/pkg/statistics/store_test.go +++ b/pkg/statistics/store_test.go @@ -35,7 +35,7 @@ func TestFilterUnhealtyStore(t *testing.T) { re.Len(stats.GetStoresLoads(), 5) cluster.PutStore(cluster.GetStore(1).Clone(core.SetLastHeartbeatTS(time.Now().Add(-24 * time.Hour)))) - cluster.PutStore(cluster.GetStore(2).Clone(core.TombstoneStore())) + cluster.PutStore(cluster.GetStore(2).Clone(core.SetStoreState(metapb.StoreState_Tombstone))) cluster.DeleteStore(cluster.GetStore(3)) stats.FilterUnhealthyStore(cluster) diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 872d994a270..85af79203a4 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -121,6 +121,17 @@ func StorePath(storeID uint64) string { return path.Join(clusterPath, "s", fmt.Sprintf("%020d", storeID)) } +// StorePathPrefix returns the store meta info key path prefix. +func StorePathPrefix(clusterID uint64) string { + return path.Join(PDRootPath(clusterID), clusterPath, "s") + "/" +} + +// ExtractStoreIDFromPath extracts the store ID from the given path. 
+func ExtractStoreIDFromPath(clusterID uint64, path string) (uint64, error) { + idStr := strings.TrimLeft(strings.TrimPrefix(path, StorePathPrefix(clusterID)), "0") + return strconv.ParseUint(idStr, 10, 64) +} + func storeLeaderWeightPath(storeID uint64) string { return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "leader") } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index dc07b260d87..d48d9df7afd 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -332,13 +332,13 @@ func (c *RaftCluster) Start(s Server) error { } c.initSchedulers() } else { - c.wg.Add(3) + c.wg.Add(2) go c.runCoordinator() go c.runStatsBackgroundJobs() - go c.runMetricsCollectionJob() } - c.wg.Add(7) + c.wg.Add(8) + go c.runMetricsCollectionJob() go c.runNodeStateCheckJob() go c.syncRegions() go c.runReplicationMode() @@ -610,6 +610,7 @@ func (c *RaftCluster) runMetricsCollectionJob() { ticker := time.NewTicker(metricsCollectionJobInterval) failpoint.Inject("highFrequencyClusterJobs", func() { + ticker.Stop() ticker = time.NewTicker(time.Microsecond) }) @@ -634,6 +635,7 @@ func (c *RaftCluster) runNodeStateCheckJob() { ticker := time.NewTicker(nodeStateCheckJobInterval) failpoint.Inject("highFrequencyClusterJobs", func() { + ticker.Stop() ticker = time.NewTicker(2 * time.Second) }) defer ticker.Stop() @@ -1486,27 +1488,22 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro if store == nil { return errs.ErrStoreNotFound.FastGenByArgs(storeID) } - // Remove an offline store should be OK, nothing to do. if store.IsRemoving() && store.IsPhysicallyDestroyed() == physicallyDestroyed { return nil } - if store.IsRemoved() { return errs.ErrStoreRemoved.FastGenByArgs(storeID) } - if store.IsPhysicallyDestroyed() { return errs.ErrStoreDestroyed.FastGenByArgs(storeID) } - if (store.IsPreparing() || store.IsServing()) && !physicallyDestroyed { if err := c.checkReplicaBeforeOfflineStore(storeID); err != nil { return err } } - - newStore := store.Clone(core.OfflineStore(physicallyDestroyed)) + newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline, physicallyDestroyed)) log.Warn("store has been offline", zap.Uint64("store-id", storeID), zap.String("store-address", newStore.GetAddress()), @@ -1615,7 +1612,7 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error { } } - newStore := store.Clone(core.TombstoneStore()) + newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone)) log.Warn("store has been Tombstone", zap.Uint64("store-id", storeID), zap.String("store-address", newStore.GetAddress()), @@ -1721,7 +1718,7 @@ func (c *RaftCluster) UpStore(storeID uint64) error { return nil } - options := []core.StoreCreateOption{core.UpStore()} + options := []core.StoreCreateOption{core.SetStoreState(metapb.StoreState_Up)} // get the previous store limit recorded in memory limiter, exist := c.prevStoreLimit[storeID] if exist { @@ -1768,7 +1765,7 @@ func (c *RaftCluster) ReadyToServe(storeID uint64) error { return errs.ErrStoreServing.FastGenByArgs(storeID) } - newStore := store.Clone(core.UpStore()) + newStore := store.Clone(core.SetStoreState(metapb.StoreState_Up)) log.Info("store has changed to serving", zap.Uint64("store-id", storeID), zap.String("store-address", newStore.GetAddress())) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index e5bf862174b..605fd222502 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -225,7 +225,7 @@ func 
TestFilterUnhealthyStore(t *testing.T) { Available: 50, RegionCount: 1, } - newStore := store.Clone(core.TombstoneStore()) + newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone)) re.NoError(cluster.putStoreLocked(newStore)) re.NoError(cluster.HandleStoreHeartbeat(req, resp)) re.Nil(cluster.hotStat.GetRollingStoreStats(store.GetID())) @@ -2351,7 +2351,7 @@ func (c *testCluster) addLeaderStore(storeID uint64, leaderCount int) error { func (c *testCluster) setStoreDown(storeID uint64) error { store := c.GetStore(storeID) newStore := store.Clone( - core.UpStore(), + core.SetStoreState(metapb.StoreState_Up), core.SetLastHeartbeatTS(typeutil.ZeroTime), ) c.Lock() @@ -2361,7 +2361,7 @@ func (c *testCluster) setStoreDown(storeID uint64) error { func (c *testCluster) setStoreOffline(storeID uint64) error { store := c.GetStore(storeID) - newStore := store.Clone(core.OfflineStore(false)) + newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline, false)) c.Lock() defer c.Unlock() return c.putStoreLocked(newStore) diff --git a/tests/integrations/mcs/scheduling/meta_test.go b/tests/integrations/mcs/scheduling/meta_test.go new file mode 100644 index 00000000000..74497e0b552 --- /dev/null +++ b/tests/integrations/mcs/scheduling/meta_test.go @@ -0,0 +1,102 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduling + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/mcs/scheduling/server/meta" + "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/tests" +) + +type metaTestSuite struct { + suite.Suite + + ctx context.Context + cancel context.CancelFunc + + // The PD cluster. + cluster *tests.TestCluster + // pdLeaderServer is the leader server of the PD cluster. + pdLeaderServer *tests.TestServer +} + +func TestMeta(t *testing.T) { + suite.Run(t, &metaTestSuite{}) +} + +func (suite *metaTestSuite) SetupSuite() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) + var err error + suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) + re.NoError(err) + err = suite.cluster.RunInitialServers() + re.NoError(err) + leaderName := suite.cluster.WaitLeader() + suite.pdLeaderServer = suite.cluster.GetServer(leaderName) + re.NoError(suite.pdLeaderServer.BootstrapCluster()) +} + +func (suite *metaTestSuite) TearDownSuite() { + suite.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) + suite.cancel() + suite.cluster.Destroy() +} + +func (suite *metaTestSuite) TestStoreWatch() { + re := suite.Require() + + cluster := core.NewBasicCluster() + // Create a meta watcher. 
+ _, err := meta.NewWatcher( + suite.ctx, + suite.pdLeaderServer.GetEtcdClient(), + suite.cluster.GetCluster().GetId(), + cluster, + ) + re.NoError(err) + for i := uint64(1); i <= 4; i++ { + suite.pdLeaderServer.GetServer().GetRaftCluster().PutStore( + &metapb.Store{Id: i, Address: fmt.Sprintf("mock-%d", i), State: metapb.StoreState_Up, NodeState: metapb.NodeState_Serving, LastHeartbeat: time.Now().UnixNano()}, + ) + } + + suite.pdLeaderServer.GetRaftCluster().RemoveStore(2, false) + testutil.Eventually(re, func() bool { + s := cluster.GetStore(2) + if s == nil { + return false + } + return s.GetState() == metapb.StoreState_Offline + }) + re.Len(cluster.GetStores(), 4) + testutil.Eventually(re, func() bool { + return cluster.GetStore(2).GetState() == metapb.StoreState_Tombstone + }) + re.NoError(suite.pdLeaderServer.GetRaftCluster().RemoveTombStoneRecords()) + testutil.Eventually(re, func() bool { + return cluster.GetStore(2) == nil + }) +} diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 87acdf897fd..f22a754b8bf 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -351,11 +351,11 @@ func getTestDeployPath(storeID uint64) string { func resetStoreState(re *require.Assertions, rc *cluster.RaftCluster, storeID uint64, state metapb.StoreState) { store := rc.GetStore(storeID) re.NotNil(store) - newStore := store.Clone(core.OfflineStore(false)) + newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline, false)) if state == metapb.StoreState_Up { - newStore = newStore.Clone(core.UpStore()) + newStore = newStore.Clone(core.SetStoreState(metapb.StoreState_Up)) } else if state == metapb.StoreState_Tombstone { - newStore = newStore.Clone(core.TombstoneStore()) + newStore = newStore.Clone(core.SetStoreState(metapb.StoreState_Tombstone)) } rc.GetBasicCluster().PutStore(newStore)
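
A minimal usage sketch of the consolidated option introduced above (not part of the diff): it assumes the `github.com/tikv/pd/pkg/core` and kvproto `metapb` packages from this change are importable, and mirrors the call-site rewrites where `UpStore()`, `OfflineStore(bool)`, and `TombstoneStore()` collapse into the single variadic `SetStoreState` option.

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/pkg/core"
)

func main() {
	store := core.NewStoreInfo(&metapb.Store{Id: 1, Address: "mock-1"})

	// Was: store.Clone(core.UpStore())
	up := store.Clone(core.SetStoreState(metapb.StoreState_Up))

	// Was: store.Clone(core.OfflineStore(false)); the trailing bool is the
	// physically-destroyed flag and must be supplied for the Offline state,
	// otherwise the option panics.
	offline := up.Clone(core.SetStoreState(metapb.StoreState_Offline, false))

	// Was: store.Clone(core.TombstoneStore())
	tombstone := offline.Clone(core.SetStoreState(metapb.StoreState_Tombstone))

	fmt.Println(up.GetState(), offline.GetState(), tombstone.GetState())
}
```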