Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into export-config
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed Sep 6, 2023
2 parents 423b583 + a83322f commit 6729238
Show file tree
Hide file tree
Showing 33 changed files with 420 additions and 134 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,11 @@ error = '''
sync max ts failed, %s
'''

["PD:tso:ErrUpdateTimestamp"]
error = '''
update timestamp failed, %s
'''

["PD:typeutil:ErrBytesToUint64"]
error = '''
invalid data, must 8 bytes, but %d
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30 h1:EvqKcDT7ceGLW0mXqM8Cp5Z8DfgQRnwj2YTnlCLj2QI=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 h1:Gn8rf2Mb3QDifUQHdtcopqKclc9L11hjhZFYBE65lcw=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
4 changes: 4 additions & 0 deletions pd.code-workspace
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
{
"name": "pd-tso-bench",
"path": "tools/pd-tso-bench"
},
{
"name": "pd-api-bench",
"path": "tools/pd-api-bench"
}
],
"settings": {}
Expand Down
43 changes: 18 additions & 25 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,33 +74,26 @@ func SetStoreDeployPath(deployPath string) StoreCreateOption {
}
}

// OfflineStore offline a store
func OfflineStore(physicallyDestroyed bool) StoreCreateOption {
// SetStoreState sets the state for the store.
func SetStoreState(state metapb.StoreState, physicallyDestroyed ...bool) StoreCreateOption {
return func(store *StoreInfo) {
meta := typeutil.DeepClone(store.meta, StoreFactory)
meta.State = metapb.StoreState_Offline
meta.NodeState = metapb.NodeState_Removing
meta.PhysicallyDestroyed = physicallyDestroyed
store.meta = meta
}
}

// UpStore up a store
func UpStore() StoreCreateOption {
return func(store *StoreInfo) {
meta := typeutil.DeepClone(store.meta, StoreFactory)
meta.State = metapb.StoreState_Up
meta.NodeState = metapb.NodeState_Serving
store.meta = meta
}
}

// TombstoneStore set a store to tombstone.
func TombstoneStore() StoreCreateOption {
return func(store *StoreInfo) {
meta := typeutil.DeepClone(store.meta, StoreFactory)
meta.State = metapb.StoreState_Tombstone
meta.NodeState = metapb.NodeState_Removed
switch state {
case metapb.StoreState_Up:
meta.State = metapb.StoreState_Up
meta.NodeState = metapb.NodeState_Serving
case metapb.StoreState_Offline:
if len(physicallyDestroyed) != 0 {
meta.State = metapb.StoreState_Offline
meta.NodeState = metapb.NodeState_Removing
meta.PhysicallyDestroyed = physicallyDestroyed[0]
} else {
panic("physicallyDestroyed should be set when set store state to offline")
}
case metapb.StoreState_Tombstone:
meta.State = metapb.StoreState_Tombstone
meta.NodeState = metapb.NodeState_Removed
}
store.meta = meta
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestCloneStore(t *testing.T) {
break
}
store.Clone(
UpStore(),
SetStoreState(metapb.StoreState_Up),
SetLastHeartbeatTS(time.Now()),
)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrUpdateTimestamp = errors.Normalize("update timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrUpdateTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid"))
Expand Down
14 changes: 8 additions & 6 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ package server

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"path"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -30,6 +28,7 @@ import (
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -284,10 +283,13 @@ func (s *Server) startServer() (err error) {
uniqueName := s.cfg.ListenAddr
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
resourceManagerPrimaryPrefix := endpoint.ResourceManagerSvcRootPath(s.clusterID)
s.participant = member.NewParticipant(s.GetClient())
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
s.participant = member.NewParticipant(s.GetClient(), utils.ResourceManagerServiceName)
p := &resource_manager.Participant{
Name: uniqueName,
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.AdvertiseListenAddr},
}
s.participant.InitInfo(p, endpoint.ResourceManagerSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")

s.service = &Service{
ctx: s.Context(),
Expand Down
117 changes: 117 additions & 0 deletions pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package meta

import (
"context"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)

// Watcher is used to watch the PD API server for any meta changes.
type Watcher struct {
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
clusterID uint64
// storePathPrefix is the path of the store in etcd:
// - Key: /pd/{cluster_id}/raft/s/
// - Value: meta store proto.
storePathPrefix string

etcdClient *clientv3.Client
basicCluster *core.BasicCluster
storeWatcher *etcdutil.LoopWatcher
}

// NewWatcher creates a new watcher to watch the meta change from PD API server.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
clusterID uint64,
basicCluster *core.BasicCluster,
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
w := &Watcher{
ctx: ctx,
cancel: cancel,
clusterID: clusterID,
storePathPrefix: endpoint.StorePathPrefix(clusterID),
etcdClient: etcdClient,
basicCluster: basicCluster,
}
err := w.initializeStoreWatcher()
if err != nil {
return nil, err
}
return w, nil
}

func (w *Watcher) initializeStoreWatcher() error {
putFn := func(kv *mvccpb.KeyValue) error {
store := &metapb.Store{}
if err := proto.Unmarshal(kv.Value, store); err != nil {
log.Warn("failed to unmarshal store entry",
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
origin := w.basicCluster.GetStore(store.GetId())
if origin == nil {
w.basicCluster.PutStore(core.NewStoreInfo(store))
return nil
}
w.basicCluster.PutStore(origin.Clone(core.SetStoreState(store.GetState(), store.GetPhysicallyDestroyed())))
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
storeID, err := endpoint.ExtractStoreIDFromPath(w.clusterID, key)
if err != nil {
return err
}
origin := w.basicCluster.GetStore(storeID)
if origin != nil {
w.basicCluster.DeleteStore(origin)
}
return nil
}
postEventFn := func() error {
return nil
}
w.storeWatcher = etcdutil.NewLoopWatcher(
w.ctx, &w.wg,
w.etcdClient,
"scheduling-store-watcher", w.storePathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
w.storeWatcher.StartWatchLoop()
return w.storeWatcher.WaitLoad()
}

// Close closes the watcher.
func (w *Watcher) Close() {
w.cancel()
w.wg.Wait()
}
40 changes: 26 additions & 14 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"net/http"
"os"
"os/signal"
"path"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/spf13/cobra"
Expand All @@ -39,6 +39,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/mcs/scheduling/server/meta"
"github.com/tikv/pd/pkg/mcs/scheduling/server/rule"
"github.com/tikv/pd/pkg/mcs/server"
"github.com/tikv/pd/pkg/mcs/utils"
Expand Down Expand Up @@ -77,6 +78,7 @@ type Server struct {
cfg *config.Config
clusterID uint64
persistConfig *config.PersistConfig
basicCluster *core.BasicCluster

// for the primary election of scheduling
participant *member.Participant
Expand All @@ -98,6 +100,7 @@ type Server struct {
// for watching the PD API server meta info updates that are related to the scheduling.
configWatcher *config.Watcher
ruleWatcher *rule.Watcher
metaWatcher *meta.Watcher
}

// Name returns the unique name for this server in the scheduling cluster.
Expand Down Expand Up @@ -279,6 +282,7 @@ func (s *Server) Close() {
s.GetCoordinator().Stop()
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()

Expand Down Expand Up @@ -319,6 +323,11 @@ func (s *Server) GetCluster() *Cluster {
return s.cluster
}

// GetBasicCluster returns the basic cluster.
func (s *Server) GetBasicCluster() *core.BasicCluster {
return s.basicCluster
}

// GetCoordinator returns the coordinator.
func (s *Server) GetCoordinator() *schedule.Coordinator {
return s.GetCluster().GetCoordinator()
Expand Down Expand Up @@ -361,19 +370,22 @@ func (s *Server) startServer() (err error) {
uniqueName := s.cfg.ListenAddr
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
schedulingPrimaryPrefix := endpoint.SchedulingSvcRootPath(s.clusterID)
s.participant = member.NewParticipant(s.GetClient())
s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
s.participant = member.NewParticipant(s.GetClient(), utils.SchedulingServiceName)
p := &schedulingpb.Participant{
Name: uniqueName,
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.AdvertiseListenAddr},
}
s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")
s.basicCluster = core.NewBasicCluster()
err = s.startWatcher()
if err != nil {
return err
}
s.storage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(s.GetClient(), endpoint.PDRootPath(s.clusterID)), nil)
basicCluster := core.NewBasicCluster()
s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, basicCluster)
s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
if err != nil {
return err
}
Expand Down Expand Up @@ -414,15 +426,15 @@ func (s *Server) startServer() (err error) {
}

func (s *Server) startWatcher() (err error) {
s.configWatcher, err = config.NewWatcher(
s.Context(), s.GetClient(), s.clusterID, s.persistConfig,
)
s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster)
if err != nil {
return err
}
s.configWatcher, err = config.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.persistConfig)
if err != nil {
return err
}
s.ruleWatcher, err = rule.NewWatcher(
s.Context(), s.GetClient(), s.clusterID,
)
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID)
return err
}

Expand Down
Loading

0 comments on commit 6729238

Please sign in to comment.