
Commit

sync store state to scheduling service
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Sep 5, 2023
1 parent 7d50755 commit 0f9fbae
Showing 13 changed files with 255 additions and 59 deletions.
43 changes: 18 additions & 25 deletions pkg/core/store_option.go
@@ -74,33 +74,26 @@ func SetStoreDeployPath(deployPath string) StoreCreateOption {
     }
 }
 
-// OfflineStore offline a store
-func OfflineStore(physicallyDestroyed bool) StoreCreateOption {
+// SetStoreState sets the state for the store.
+func SetStoreState(state metapb.StoreState, physicallyDestroyed ...bool) StoreCreateOption {
     return func(store *StoreInfo) {
         meta := typeutil.DeepClone(store.meta, StoreFactory)
-        meta.State = metapb.StoreState_Offline
-        meta.NodeState = metapb.NodeState_Removing
-        meta.PhysicallyDestroyed = physicallyDestroyed
-        store.meta = meta
-    }
-}
-
-// UpStore up a store
-func UpStore() StoreCreateOption {
-    return func(store *StoreInfo) {
-        meta := typeutil.DeepClone(store.meta, StoreFactory)
-        meta.State = metapb.StoreState_Up
-        meta.NodeState = metapb.NodeState_Serving
-        store.meta = meta
-    }
-}
-
-// TombstoneStore set a store to tombstone.
-func TombstoneStore() StoreCreateOption {
-    return func(store *StoreInfo) {
-        meta := typeutil.DeepClone(store.meta, StoreFactory)
-        meta.State = metapb.StoreState_Tombstone
-        meta.NodeState = metapb.NodeState_Removed
+        switch state {
+        case metapb.StoreState_Up:
+            meta.State = metapb.StoreState_Up
+            meta.NodeState = metapb.NodeState_Serving
+        case metapb.StoreState_Offline:
+            if len(physicallyDestroyed) != 0 {
+                meta.State = metapb.StoreState_Offline
+                meta.NodeState = metapb.NodeState_Removing
+                meta.PhysicallyDestroyed = physicallyDestroyed[0]
+            } else {
+                panic("physicallyDestroyed should be set when setting the store state to offline")
+            }
+        case metapb.StoreState_Tombstone:
+            meta.State = metapb.StoreState_Tombstone
+            meta.NodeState = metapb.NodeState_Removed
+        }
         store.meta = meta
     }
 }
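
As a quick sketch of how call sites change (`store` here is a hypothetical *core.StoreInfo; the real call sites appear in the hunks below), the single option now covers all three transitions, with Offline requiring the variadic flag:

    // Sketch only: store is any existing *core.StoreInfo.
    up := store.Clone(core.SetStoreState(metapb.StoreState_Up))
    // Offline must state whether the store is physically destroyed;
    // SetStoreState panics if the flag is omitted.
    offline := store.Clone(core.SetStoreState(metapb.StoreState_Offline, false))
    tombstone := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone))
    _, _, _ = up, offline, tombstone
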
2 changes: 1 addition & 1 deletion pkg/core/store_test.go
@@ -84,7 +84,7 @@ func TestCloneStore(t *testing.T) {
             break
         }
         store.Clone(
-            UpStore(),
+            SetStoreState(metapb.StoreState_Up),
             SetLastHeartbeatTS(time.Now()),
         )
     }
101 changes: 101 additions & 0 deletions pkg/mcs/scheduling/server/meta/watcher.go
@@ -0,0 +1,101 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package meta

import (
    "context"
    "sync"

    "github.com/gogo/protobuf/proto"
    "github.com/pingcap/kvproto/pkg/metapb"
    "github.com/pingcap/log"
    "github.com/tikv/pd/pkg/core"
    "github.com/tikv/pd/pkg/storage/endpoint"
    "github.com/tikv/pd/pkg/utils/etcdutil"
    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/mvcc/mvccpb"
    "go.uber.org/zap"
)

// Watcher is used to watch the PD API server for any meta changes.
type Watcher struct {
    wg     sync.WaitGroup
    ctx    context.Context
    cancel context.CancelFunc

    // storePathPrefix is the path of the store in etcd:
    //  - Key: /pd/{cluster_id}/raft/s/
    //  - Value: meta store proto.
    storePathPrefix string

    etcdClient   *clientv3.Client
    basicCluster *core.BasicCluster
    storeWatcher *etcdutil.LoopWatcher
}

// NewWatcher creates a new watcher to watch the meta change from the PD API server.
func NewWatcher(
    ctx context.Context,
    etcdClient *clientv3.Client,
    clusterID uint64,
    basicCluster *core.BasicCluster,
) (*Watcher, error) {
    ctx, cancel := context.WithCancel(ctx)
    w := &Watcher{
        ctx:             ctx,
        cancel:          cancel,
        storePathPrefix: endpoint.StorePathPrefix(clusterID),
        etcdClient:      etcdClient,
        basicCluster:    basicCluster,
    }
    err := w.initializeStoreWatcher()
    if err != nil {
        return nil, err
    }
    return w, nil
}

func (w *Watcher) initializeStoreWatcher() error {
    putFn := func(kv *mvccpb.KeyValue) error {
        store := &metapb.Store{}
        if err := proto.Unmarshal(kv.Value, store); err != nil {
            log.Warn("failed to unmarshal store entry",
                zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
            return err
        }
        origin := w.basicCluster.GetStore(store.GetId())
        if origin == nil {
            w.basicCluster.PutStore(core.NewStoreInfo(store))
            return nil
        }
        w.basicCluster.PutStore(origin.Clone(core.SetStoreState(store.GetState(), store.GetPhysicallyDestroyed())))
        return nil
    }
    deleteFn := func(kv *mvccpb.KeyValue) error {
        return nil
    }
    postEventFn := func() error {
        return nil
    }
    w.storeWatcher = etcdutil.NewLoopWatcher(
        w.ctx, &w.wg,
        w.etcdClient,
        "scheduling-store-watcher", w.storePathPrefix,
        putFn, deleteFn, postEventFn,
        clientv3.WithPrefix(),
    )
    w.storeWatcher.StartWatchLoop()
    return w.storeWatcher.WaitLoad()
}
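
For orientation, a minimal sketch of the write this watcher consumes — the raw etcd client `cli`, the literal key, and the store values are illustrative assumptions, not part of this commit:

    // The API server persists each store's meta proto under the watched prefix.
    // Key layout follows StorePath: /pd/{cluster_id}/raft/s/{20-digit store ID}.
    store := &metapb.Store{Id: 1, Address: "127.0.0.1:20160", State: metapb.StoreState_Up}
    value, _ := proto.Marshal(store) // error handling elided in this sketch
    _, err := cli.Put(context.Background(), "/pd/1/raft/s/00000000000000000001", string(value))
    _ = err
    // putFn then unmarshals the value and applies it to basicCluster via
    // core.NewStoreInfo or core.SetStoreState.
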
26 changes: 17 additions & 9 deletions pkg/mcs/scheduling/server/server.go
@@ -39,6 +39,7 @@ import (
     "github.com/tikv/pd/pkg/errs"
     "github.com/tikv/pd/pkg/mcs/discovery"
     "github.com/tikv/pd/pkg/mcs/scheduling/server/config"
+    "github.com/tikv/pd/pkg/mcs/scheduling/server/meta"
     "github.com/tikv/pd/pkg/mcs/scheduling/server/rule"
     "github.com/tikv/pd/pkg/mcs/server"
     "github.com/tikv/pd/pkg/mcs/utils"
@@ -76,6 +77,7 @@ type Server struct {
     cfg           *config.Config
     clusterID     uint64
     persistConfig *config.PersistConfig
+    basicCluster  *core.BasicCluster
 
     // for the primary election of scheduling
     participant *member.Participant
@@ -97,6 +99,7 @@ type Server struct {
     // for watching the PD API server meta info updates that are related to the scheduling.
     configWatcher *config.Watcher
     ruleWatcher   *rule.Watcher
+    metaWatcher   *meta.Watcher
 }
 
 // Name returns the unique name for this server in the scheduling cluster.
@@ -317,6 +320,11 @@ func (s *Server) GetCluster() *Cluster {
     return s.cluster
 }
 
+// GetBasicCluster returns the basic cluster.
+func (s *Server) GetBasicCluster() *core.BasicCluster {
+    return s.basicCluster
+}
+
 // GetCoordinator returns the coordinator.
 func (s *Server) GetCoordinator() *schedule.Coordinator {
     return s.GetCluster().GetCoordinator()
@@ -363,15 +371,15 @@ func (s *Server) startServer() (err error) {
     s.participant = member.NewParticipant(s.GetClient())
     s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)),
         utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
+    s.basicCluster = core.NewBasicCluster()
     err = s.startWatcher()
     if err != nil {
         return err
     }
     s.storage = endpoint.NewStorageEndpoint(
         kv.NewEtcdKVBase(s.GetClient(), endpoint.PDRootPath(s.clusterID)), nil)
-    basicCluster := core.NewBasicCluster()
-    s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, basicCluster)
-    s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
+    s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
+    s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
     if err != nil {
         return err
     }
@@ -412,15 +420,15 @@ func (s *Server) startServer() (err error) {
 }
 
 func (s *Server) startWatcher() (err error) {
-    s.configWatcher, err = config.NewWatcher(
-        s.Context(), s.GetClient(), s.clusterID, s.persistConfig,
-    )
+    s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster)
     if err != nil {
         return err
     }
-    s.ruleWatcher, err = rule.NewWatcher(
-        s.Context(), s.GetClient(), s.clusterID,
-    )
+    s.configWatcher, err = config.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.persistConfig)
+    if err != nil {
+        return err
+    }
+    s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID)
     return err
 }
 
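
The ordering above matters: s.basicCluster is constructed before startWatcher so that meta.NewWatcher can fill it during WaitLoad, and the same instance is then shared with the heartbeat streams and the cluster. Condensed, as a sketch with error handling and unrelated steps elided:

    s.basicCluster = core.NewBasicCluster() // shared store cache
    _ = s.startWatcher()                    // meta watcher syncs store state into basicCluster
    s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
    s.cluster, _ = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster,
        s.hbStreams, s.clusterID, s.checkMembershipCh)
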
10 changes: 5 additions & 5 deletions pkg/mock/mockcluster/mockcluster.go
@@ -231,7 +231,7 @@ func (mc *Cluster) GetRegionLabeler() *labeler.RegionLabeler {
 func (mc *Cluster) SetStoreUp(storeID uint64) {
     store := mc.GetStore(storeID)
     newStore := store.Clone(
-        core.UpStore(),
+        core.SetStoreState(metapb.StoreState_Up),
         core.SetLastHeartbeatTS(time.Now()),
     )
     mc.PutStore(newStore)
@@ -241,7 +241,7 @@ func (mc *Cluster) SetStoreUp(storeID uint64) {
 func (mc *Cluster) SetStoreDisconnect(storeID uint64) {
     store := mc.GetStore(storeID)
     newStore := store.Clone(
-        core.UpStore(),
+        core.SetStoreState(metapb.StoreState_Up),
         core.SetLastHeartbeatTS(time.Now().Add(-time.Second*30)),
     )
     mc.PutStore(newStore)
@@ -251,7 +251,7 @@ func (mc *Cluster) SetStoreDisconnect(storeID uint64) {
 func (mc *Cluster) SetStoreDown(storeID uint64) {
     store := mc.GetStore(storeID)
     newStore := store.Clone(
-        core.UpStore(),
+        core.SetStoreState(metapb.StoreState_Up),
         core.SetLastHeartbeatTS(typeutil.ZeroTime),
     )
     mc.PutStore(newStore)
@@ -260,7 +260,7 @@ func (mc *Cluster) SetStoreDown(storeID uint64) {
 // SetStoreOffline sets store state to be offline.
 func (mc *Cluster) SetStoreOffline(storeID uint64) {
     store := mc.GetStore(storeID)
-    newStore := store.Clone(core.OfflineStore(false))
+    newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline, false))
     mc.PutStore(newStore)
 }
 
@@ -287,7 +287,7 @@ func (mc *Cluster) BuryStore(storeID uint64, forceBury bool) error {
         }
     }
 
-    newStore := store.Clone(core.TombstoneStore())
+    newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone))
     mc.PutStore(newStore)
     return nil
 }
4 changes: 2 additions & 2 deletions pkg/statistics/region_collection_test.go
@@ -59,7 +59,7 @@ func TestRegionStatistics(t *testing.T) {
         {Peer: peers[1], DownSeconds: 3608},
     }
 
-    store3 := stores[3].Clone(core.OfflineStore(false))
+    store3 := stores[3].Clone(core.SetStoreState(metapb.StoreState_Offline, false))
     stores[3] = store3
     r1 := &metapb.Region{Id: 1, Peers: peers, StartKey: []byte("aa"), EndKey: []byte("bb")}
     r2 := &metapb.Region{Id: 2, Peers: peers[0:2], StartKey: []byte("cc"), EndKey: []byte("dd")}
@@ -109,7 +109,7 @@ func TestRegionStatistics(t *testing.T) {
     re.Empty(regionStats.stats[LearnerPeer])
     re.Empty(regionStats.stats[OfflinePeer])
 
-    store3 = stores[3].Clone(core.UpStore())
+    store3 = stores[3].Clone(core.SetStoreState(metapb.StoreState_Up))
     stores[3] = store3
     regionStats.Observe(region1, stores)
     re.Empty(regionStats.stats[OfflinePeer])
2 changes: 1 addition & 1 deletion pkg/statistics/store_collection_test.go
@@ -55,7 +55,7 @@ func TestStoreStatistics(t *testing.T) {
         stores = append(stores, s)
     }
 
-    store3 := stores[3].Clone(core.OfflineStore(false))
+    store3 := stores[3].Clone(core.SetStoreState(metapb.StoreState_Offline, false))
     stores[3] = store3
     store4 := stores[4].Clone(core.SetLastHeartbeatTS(stores[4].GetLastHeartbeatTS().Add(-time.Hour)))
     stores[4] = store4
2 changes: 1 addition & 1 deletion pkg/statistics/store_test.go
@@ -35,7 +35,7 @@ func TestFilterUnhealtyStore(t *testing.T) {
     re.Len(stats.GetStoresLoads(), 5)
 
     cluster.PutStore(cluster.GetStore(1).Clone(core.SetLastHeartbeatTS(time.Now().Add(-24 * time.Hour))))
-    cluster.PutStore(cluster.GetStore(2).Clone(core.TombstoneStore()))
+    cluster.PutStore(cluster.GetStore(2).Clone(core.SetStoreState(metapb.StoreState_Tombstone)))
     cluster.DeleteStore(cluster.GetStore(3))
 
     stats.FilterUnhealthyStore(cluster)
5 changes: 5 additions & 0 deletions pkg/storage/endpoint/key_path.go
@@ -121,6 +121,11 @@ func StorePath(storeID uint64) string {
     return path.Join(clusterPath, "s", fmt.Sprintf("%020d", storeID))
 }
 
+// StorePathPrefix returns the store meta info key path prefix.
+func StorePathPrefix(clusterID uint64) string {
+    return path.Join(PDRootPath(clusterID), clusterPath, "s") + "/"
+}
+
 func storeLeaderWeightPath(storeID uint64) string {
     return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "leader")
 }
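
A sketch of how the prefix composes — the cluster ID is hypothetical, and the "raft" segment is assumed from the watcher comment earlier in this commit rather than spelled out in this hunk:

    prefix := endpoint.StorePathPrefix(1) // assumed to yield "/pd/1/raft/s/"
    // StorePath pads store IDs to 20 digits, so store 7 would live at
    //   "/pd/1/raft/s/00000000000000000007"
    _ = prefix
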
13 changes: 4 additions & 9 deletions server/cluster/cluster.go
@@ -1456,27 +1456,22 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) error {
     if store == nil {
         return errs.ErrStoreNotFound.FastGenByArgs(storeID)
     }
-
     // Remove an offline store should be OK, nothing to do.
     if store.IsRemoving() && store.IsPhysicallyDestroyed() == physicallyDestroyed {
         return nil
     }
-
     if store.IsRemoved() {
         return errs.ErrStoreRemoved.FastGenByArgs(storeID)
     }
-
     if store.IsPhysicallyDestroyed() {
         return errs.ErrStoreDestroyed.FastGenByArgs(storeID)
     }
-
     if (store.IsPreparing() || store.IsServing()) && !physicallyDestroyed {
         if err := c.checkReplicaBeforeOfflineStore(storeID); err != nil {
             return err
         }
     }
-
-    newStore := store.Clone(core.OfflineStore(physicallyDestroyed))
+    newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline, physicallyDestroyed))
     log.Warn("store has been offline",
         zap.Uint64("store-id", storeID),
         zap.String("store-address", newStore.GetAddress()),
@@ -1585,7 +1580,7 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error {
         }
     }
 
-    newStore := store.Clone(core.TombstoneStore())
+    newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone))
     log.Warn("store has been Tombstone",
         zap.Uint64("store-id", storeID),
         zap.String("store-address", newStore.GetAddress()),
@@ -1689,7 +1684,7 @@ func (c *RaftCluster) UpStore(storeID uint64) error {
         return nil
     }
 
-    options := []core.StoreCreateOption{core.UpStore()}
+    options := []core.StoreCreateOption{core.SetStoreState(metapb.StoreState_Up)}
     // get the previous store limit recorded in memory
     limiter, exist := c.prevStoreLimit[storeID]
     if exist {
@@ -1736,7 +1731,7 @@ func (c *RaftCluster) ReadyToServe(storeID uint64) error {
         return errs.ErrStoreServing.FastGenByArgs(storeID)
     }
 
-    newStore := store.Clone(core.UpStore())
+    newStore := store.Clone(core.SetStoreState(metapb.StoreState_Up))
     log.Info("store has changed to serving",
         zap.Uint64("store-id", storeID),
         zap.String("store-address", newStore.GetAddress()))
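
Taken together, these call sites cover the store lifecycle with the one option; a condensed sketch of the entry points (identifiers from this file, shown independently rather than as a required sequence):

    _ = c.RemoveStore(storeID, false) // clones the store with SetStoreState(Offline, false)
    _ = c.BuryStore(storeID, false)   // clones the store with SetStoreState(Tombstone)
    _ = c.UpStore(storeID)            // clones the store with SetStoreState(Up)
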