Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: sync store state to scheduling service #7047

Merged
merged 2 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 18 additions & 25 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,33 +74,26 @@
}
}

// OfflineStore offline a store
func OfflineStore(physicallyDestroyed bool) StoreCreateOption {
// SetStoreState sets the state for the store.
func SetStoreState(state metapb.StoreState, physicallyDestroyed ...bool) StoreCreateOption {
return func(store *StoreInfo) {
meta := typeutil.DeepClone(store.meta, StoreFactory)
meta.State = metapb.StoreState_Offline
meta.NodeState = metapb.NodeState_Removing
meta.PhysicallyDestroyed = physicallyDestroyed
store.meta = meta
}
}

// UpStore up a store
func UpStore() StoreCreateOption {
return func(store *StoreInfo) {
meta := typeutil.DeepClone(store.meta, StoreFactory)
meta.State = metapb.StoreState_Up
meta.NodeState = metapb.NodeState_Serving
store.meta = meta
}
}

// TombstoneStore set a store to tombstone.
func TombstoneStore() StoreCreateOption {
return func(store *StoreInfo) {
meta := typeutil.DeepClone(store.meta, StoreFactory)
meta.State = metapb.StoreState_Tombstone
meta.NodeState = metapb.NodeState_Removed
switch state {
case metapb.StoreState_Up:
meta.State = metapb.StoreState_Up
meta.NodeState = metapb.NodeState_Serving
case metapb.StoreState_Offline:
if len(physicallyDestroyed) != 0 {
meta.State = metapb.StoreState_Offline
meta.NodeState = metapb.NodeState_Removing
meta.PhysicallyDestroyed = physicallyDestroyed[0]
} else {
panic("physicallyDestroyed should be set when set store state to offline")

Check warning on line 91 in pkg/core/store_option.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/store_option.go#L91

Added line #L91 was not covered by tests
}
case metapb.StoreState_Tombstone:
meta.State = metapb.StoreState_Tombstone
meta.NodeState = metapb.NodeState_Removed
}
store.meta = meta
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestCloneStore(t *testing.T) {
break
}
store.Clone(
UpStore(),
SetStoreState(metapb.StoreState_Up),
SetLastHeartbeatTS(time.Now()),
)
}
Expand Down
117 changes: 117 additions & 0 deletions pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package meta

import (
"context"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)

// Watcher is used to watch the PD API server for any meta changes.
type Watcher struct {
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
clusterID uint64
// storePathPrefix is the path of the store in etcd:
// - Key: /pd/{cluster_id}/raft/s/
// - Value: meta store proto.
storePathPrefix string

etcdClient *clientv3.Client
basicCluster *core.BasicCluster
storeWatcher *etcdutil.LoopWatcher
}

// NewWatcher creates a new watcher to watch the meta change from PD API server.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
clusterID uint64,
basicCluster *core.BasicCluster,
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
w := &Watcher{
ctx: ctx,
cancel: cancel,
clusterID: clusterID,
storePathPrefix: endpoint.StorePathPrefix(clusterID),
etcdClient: etcdClient,
basicCluster: basicCluster,
}
err := w.initializeStoreWatcher()
if err != nil {
return nil, err

Check warning on line 66 in pkg/mcs/scheduling/server/meta/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/meta/watcher.go#L66

Added line #L66 was not covered by tests
}
return w, nil
}

func (w *Watcher) initializeStoreWatcher() error {
putFn := func(kv *mvccpb.KeyValue) error {
store := &metapb.Store{}
if err := proto.Unmarshal(kv.Value, store); err != nil {
log.Warn("failed to unmarshal store entry",
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to call ExtractStoreIDFromPath to get the store ID here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean to add it to the log?

return err

Check warning on line 77 in pkg/mcs/scheduling/server/meta/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/meta/watcher.go#L75-L77

Added lines #L75 - L77 were not covered by tests
}
origin := w.basicCluster.GetStore(store.GetId())
if origin == nil {
w.basicCluster.PutStore(core.NewStoreInfo(store))
return nil
}
w.basicCluster.PutStore(origin.Clone(core.SetStoreState(store.GetState(), store.GetPhysicallyDestroyed())))
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
storeID, err := endpoint.ExtractStoreIDFromPath(w.clusterID, key)
if err != nil {
return err

Check warning on line 91 in pkg/mcs/scheduling/server/meta/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/meta/watcher.go#L91

Added line #L91 was not covered by tests
}
origin := w.basicCluster.GetStore(storeID)
if origin != nil {
w.basicCluster.DeleteStore(origin)
}
return nil
}
postEventFn := func() error {
return nil
}
w.storeWatcher = etcdutil.NewLoopWatcher(
w.ctx, &w.wg,
w.etcdClient,
"scheduling-store-watcher", w.storePathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
w.storeWatcher.StartWatchLoop()
return w.storeWatcher.WaitLoad()
}

// Close closes the watcher.
func (w *Watcher) Close() {
w.cancel()
w.wg.Wait()
}
27 changes: 18 additions & 9 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/mcs/scheduling/server/meta"
"github.com/tikv/pd/pkg/mcs/scheduling/server/rule"
"github.com/tikv/pd/pkg/mcs/server"
"github.com/tikv/pd/pkg/mcs/utils"
Expand Down Expand Up @@ -77,6 +78,7 @@
cfg *config.Config
clusterID uint64
persistConfig *config.PersistConfig
basicCluster *core.BasicCluster

// for the primary election of scheduling
participant *member.Participant
Expand All @@ -98,6 +100,7 @@
// for watching the PD API server meta info updates that are related to the scheduling.
configWatcher *config.Watcher
ruleWatcher *rule.Watcher
metaWatcher *meta.Watcher
}

// Name returns the unique name for this server in the scheduling cluster.
Expand Down Expand Up @@ -279,6 +282,7 @@
s.GetCoordinator().Stop()
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()

Expand Down Expand Up @@ -319,6 +323,11 @@
return s.cluster
}

// GetBasicCluster returns the basic cluster.
func (s *Server) GetBasicCluster() *core.BasicCluster {
return s.basicCluster

Check warning on line 328 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L328

Added line #L328 was not covered by tests
}

// GetCoordinator returns the coordinator.
func (s *Server) GetCoordinator() *schedule.Coordinator {
return s.GetCluster().GetCoordinator()
Expand Down Expand Up @@ -365,15 +374,15 @@
s.participant = member.NewParticipant(s.GetClient())
s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
s.basicCluster = core.NewBasicCluster()
err = s.startWatcher()
if err != nil {
return err
}
s.storage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(s.GetClient(), endpoint.PDRootPath(s.clusterID)), nil)
basicCluster := core.NewBasicCluster()
s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, basicCluster)
s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
if err != nil {
return err
}
Expand Down Expand Up @@ -414,15 +423,15 @@
}

func (s *Server) startWatcher() (err error) {
s.configWatcher, err = config.NewWatcher(
s.Context(), s.GetClient(), s.clusterID, s.persistConfig,
)
s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster)
if err != nil {
return err

Check warning on line 428 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L428

Added line #L428 was not covered by tests
}
s.configWatcher, err = config.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.persistConfig)
if err != nil {
return err
}
s.ruleWatcher, err = rule.NewWatcher(
s.Context(), s.GetClient(), s.clusterID,
)
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID)
return err
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (mc *Cluster) GetRegionLabeler() *labeler.RegionLabeler {
func (mc *Cluster) SetStoreUp(storeID uint64) {
store := mc.GetStore(storeID)
newStore := store.Clone(
core.UpStore(),
core.SetStoreState(metapb.StoreState_Up),
core.SetLastHeartbeatTS(time.Now()),
)
mc.PutStore(newStore)
Expand All @@ -241,7 +241,7 @@ func (mc *Cluster) SetStoreUp(storeID uint64) {
func (mc *Cluster) SetStoreDisconnect(storeID uint64) {
store := mc.GetStore(storeID)
newStore := store.Clone(
core.UpStore(),
core.SetStoreState(metapb.StoreState_Up),
core.SetLastHeartbeatTS(time.Now().Add(-time.Second*30)),
)
mc.PutStore(newStore)
Expand All @@ -251,7 +251,7 @@ func (mc *Cluster) SetStoreDisconnect(storeID uint64) {
func (mc *Cluster) SetStoreDown(storeID uint64) {
store := mc.GetStore(storeID)
newStore := store.Clone(
core.UpStore(),
core.SetStoreState(metapb.StoreState_Up),
core.SetLastHeartbeatTS(typeutil.ZeroTime),
)
mc.PutStore(newStore)
Expand All @@ -260,7 +260,7 @@ func (mc *Cluster) SetStoreDown(storeID uint64) {
// SetStoreOffline sets store state to be offline.
func (mc *Cluster) SetStoreOffline(storeID uint64) {
store := mc.GetStore(storeID)
newStore := store.Clone(core.OfflineStore(false))
newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline, false))
mc.PutStore(newStore)
}

Expand All @@ -287,7 +287,7 @@ func (mc *Cluster) BuryStore(storeID uint64, forceBury bool) error {
}
}

newStore := store.Clone(core.TombstoneStore())
newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone))
mc.PutStore(newStore)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/statistics/region_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestRegionStatistics(t *testing.T) {
{Peer: peers[1], DownSeconds: 3608},
}

store3 := stores[3].Clone(core.OfflineStore(false))
store3 := stores[3].Clone(core.SetStoreState(metapb.StoreState_Offline, false))
stores[3] = store3
r1 := &metapb.Region{Id: 1, Peers: peers, StartKey: []byte("aa"), EndKey: []byte("bb")}
r2 := &metapb.Region{Id: 2, Peers: peers[0:2], StartKey: []byte("cc"), EndKey: []byte("dd")}
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestRegionStatistics(t *testing.T) {
re.Empty(regionStats.stats[LearnerPeer])
re.Empty(regionStats.stats[OfflinePeer])

store3 = stores[3].Clone(core.UpStore())
store3 = stores[3].Clone(core.SetStoreState(metapb.StoreState_Up))
stores[3] = store3
regionStats.Observe(region1, stores)
re.Empty(regionStats.stats[OfflinePeer])
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/store_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestStoreStatistics(t *testing.T) {
stores = append(stores, s)
}

store3 := stores[3].Clone(core.OfflineStore(false))
store3 := stores[3].Clone(core.SetStoreState(metapb.StoreState_Offline, false))
stores[3] = store3
store4 := stores[4].Clone(core.SetLastHeartbeatTS(stores[4].GetLastHeartbeatTS().Add(-time.Hour)))
stores[4] = store4
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestFilterUnhealtyStore(t *testing.T) {
re.Len(stats.GetStoresLoads(), 5)

cluster.PutStore(cluster.GetStore(1).Clone(core.SetLastHeartbeatTS(time.Now().Add(-24 * time.Hour))))
cluster.PutStore(cluster.GetStore(2).Clone(core.TombstoneStore()))
cluster.PutStore(cluster.GetStore(2).Clone(core.SetStoreState(metapb.StoreState_Tombstone)))
cluster.DeleteStore(cluster.GetStore(3))

stats.FilterUnhealthyStore(cluster)
Expand Down
11 changes: 11 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ func StorePath(storeID uint64) string {
return path.Join(clusterPath, "s", fmt.Sprintf("%020d", storeID))
}

// StorePathPrefix returns the store meta info key path prefix.
func StorePathPrefix(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), clusterPath, "s") + "/"
}

// ExtractStoreIDFromPath extracts the store ID from the given path.
func ExtractStoreIDFromPath(clusterID uint64, path string) (uint64, error) {
idStr := strings.TrimLeft(strings.TrimPrefix(path, StorePathPrefix(clusterID)), "0")
return strconv.ParseUint(idStr, 10, 64)
}

func storeLeaderWeightPath(storeID uint64) string {
return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "leader")
}
Expand Down
Loading
Loading