From b95dddaa7e9decaad7c24becb401c67cea83fd3a Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Tue, 10 Sep 2024 15:58:32 +0800 Subject: [PATCH] *: move key_path.go (#8596) ref tikv/pd#4399 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/keyspace/keyspace.go | 5 +- pkg/keyspace/tso_keyspace_group.go | 5 +- pkg/mcs/resourcemanager/server/server.go | 4 +- pkg/mcs/scheduling/server/config/watcher.go | 6 +- pkg/mcs/scheduling/server/meta/watcher.go | 6 +- pkg/mcs/scheduling/server/rule/watcher.go | 9 +- .../scheduling/server/rule/watcher_test.go | 11 +- pkg/mcs/scheduling/server/server.go | 3 +- pkg/mcs/tso/server/server.go | 6 +- pkg/mcs/utils/expected_primary.go | 8 +- pkg/storage/endpoint/config.go | 17 +-- pkg/storage/endpoint/external_timestamp.go | 5 +- pkg/storage/endpoint/gc_safe_point.go | 21 ++-- pkg/storage/endpoint/key_path_test.go | 29 ----- pkg/storage/endpoint/keyspace.go | 13 +- pkg/storage/endpoint/meta.go | 33 ++--- pkg/storage/endpoint/min_resolved_ts.go | 5 +- pkg/storage/endpoint/replication_status.go | 5 +- pkg/storage/endpoint/resource_group.go | 17 +-- pkg/storage/endpoint/rule.go | 21 ++-- pkg/storage/endpoint/safepoint_v2.go | 23 ++-- pkg/storage/endpoint/service_middleware.go | 9 +- pkg/storage/endpoint/tso.go | 3 +- pkg/storage/endpoint/tso_keyspace_group.go | 11 +- pkg/storage/keyspace_test.go | 12 +- pkg/storage/region_storage.go | 5 +- pkg/storage/storage_gc_test.go | 3 +- pkg/storage/storage_test.go | 7 +- pkg/storage/storage_tso_test.go | 14 +-- pkg/tso/global_allocator.go | 4 +- pkg/tso/keyspace_group_manager.go | 15 +-- pkg/tso/keyspace_group_manager_test.go | 13 +- pkg/tso/local_allocator.go | 4 +- pkg/tso/tso.go | 3 +- pkg/tso/util_test.go | 6 +- .../endpoint => utils/keypath}/key_path.go | 115 +++++++++++------- pkg/utils/keypath/key_path_test.go | 43 +++++++ server/cluster/cluster.go | 4 +- server/gc_service.go | 5 +- server/keyspace_service.go | 4 +- server/server.go | 26 ++-- tests/integrations/client/gc_client_test.go | 8 +- .../mcs/tso/keyspace_group_manager_test.go | 7 +- tests/integrations/mcs/tso/server_test.go | 3 +- tools/pd-backup/pdbackup/backup.go | 4 +- tools/pd-backup/pdbackup/backup_test.go | 4 +- 46 files changed, 323 insertions(+), 261 deletions(-) delete mode 100644 pkg/storage/endpoint/key_path_test.go rename pkg/{storage/endpoint => utils/keypath}/key_path.go (77%) create mode 100644 pkg/utils/keypath/key_path_test.go diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 08835e24baf..a338e929eb6 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" @@ -230,8 +231,8 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac err = manager.splitKeyspaceRegion(newID, waitRegionSplit) if err != nil { err2 := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error { - idPath := endpoint.KeyspaceIDPath(request.Name) - metaPath := endpoint.KeyspaceMetaPath(newID) + idPath := keypath.KeyspaceIDPath(request.Name) + metaPath := keypath.KeyspaceMetaPath(newID) e := txn.Remove(idPath) if e != nil { return e diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index f1ed6002a8c..a09098b3a47 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -34,6 +34,7 @@ import ( "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -1153,8 +1154,8 @@ func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) { return "", ErrKeyspaceGroupNotExists(id) } - rootPath := endpoint.TSOSvcRootPath(m.clusterID) - primaryPath := endpoint.KeyspaceGroupPrimaryPath(rootPath, id) + rootPath := keypath.TSOSvcRootPath(m.clusterID) + primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, id) leader := &tsopb.Participant{} ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, primaryPath, leader) if err != nil { diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index c8ccfab0333..ccac4d5486a 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -39,9 +39,9 @@ import ( "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/member" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/memberutil" "github.com/tikv/pd/pkg/utils/metricutil" @@ -310,7 +310,7 @@ func (s *Server) startServer() (err error) { Id: uniqueID, // id is unique among all participants ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()}, } - s.participant.InitInfo(p, endpoint.ResourceManagerSvcRootPath(s.clusterID), constant.PrimaryKey, "primary election") + s.participant.InitInfo(p, keypath.ResourceManagerSvcRootPath(s.clusterID), constant.PrimaryKey, "primary election") s.service = &Service{ ctx: s.Context(), diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index 46201ba3cd3..04dad1fb3f5 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -27,8 +27,8 @@ import ( sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/storage" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -85,9 +85,9 @@ func NewWatcher( cw := &Watcher{ ctx: ctx, cancel: cancel, - configPath: endpoint.ConfigPath(clusterID), + configPath: keypath.ConfigPath(clusterID), ttlConfigPrefix: sc.TTLConfigPrefix, - schedulerConfigPathPrefix: endpoint.SchedulerConfigPathPrefix(clusterID), + schedulerConfigPathPrefix: keypath.SchedulerConfigPathPrefix(clusterID), etcdClient: etcdClient, PersistConfig: persistConfig, storage: storage, diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 42e9cf054ec..40f8c9a4894 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -24,8 +24,8 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/statistics" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -59,7 +59,7 @@ func NewWatcher( ctx: ctx, cancel: cancel, clusterID: clusterID, - storePathPrefix: endpoint.StorePathPrefix(clusterID), + storePathPrefix: keypath.StorePathPrefix(clusterID), etcdClient: etcdClient, basicCluster: basicCluster, } @@ -95,7 +95,7 @@ func (w *Watcher) initializeStoreWatcher() error { } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) - storeID, err := endpoint.ExtractStoreIDFromPath(w.clusterID, key) + storeID, err := keypath.ExtractStoreIDFromPath(w.clusterID, key) if err != nil { return err } diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 118da18376c..49a9dcea85a 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -26,6 +26,7 @@ import ( "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -85,10 +86,10 @@ func NewWatcher( rw := &Watcher{ ctx: ctx, cancel: cancel, - rulesPathPrefix: endpoint.RulesPathPrefix(clusterID), - ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID), - ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID), - regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID), + rulesPathPrefix: keypath.RulesPathPrefix(clusterID), + ruleCommonPathPrefix: keypath.RuleCommonPathPrefix(clusterID), + ruleGroupPathPrefix: keypath.RuleGroupPathPrefix(clusterID), + regionLabelPathPrefix: keypath.RegionLabelPathPrefix(clusterID), etcdClient: etcdClient, ruleStorage: ruleStorage, checkerController: checkerController, diff --git a/pkg/mcs/scheduling/server/rule/watcher_test.go b/pkg/mcs/scheduling/server/rule/watcher_test.go index 11b721b4e9e..bf9ea0afc54 100644 --- a/pkg/mcs/scheduling/server/rule/watcher_test.go +++ b/pkg/mcs/scheduling/server/rule/watcher_test.go @@ -28,6 +28,7 @@ import ( "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" ) @@ -64,10 +65,10 @@ func runWatcherLoadLabelRule(ctx context.Context, re *require.Assertions, client rw := &Watcher{ ctx: ctx, cancel: cancel, - rulesPathPrefix: endpoint.RulesPathPrefix(clusterID), - ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID), - ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID), - regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID), + rulesPathPrefix: keypath.RulesPathPrefix(clusterID), + ruleCommonPathPrefix: keypath.RuleCommonPathPrefix(clusterID), + ruleGroupPathPrefix: keypath.RuleGroupPathPrefix(clusterID), + regionLabelPathPrefix: keypath.RegionLabelPathPrefix(clusterID), etcdClient: client, ruleStorage: storage, regionLabeler: labelerManager, @@ -99,7 +100,7 @@ func prepare(t require.TestingT) (context.Context, *clientv3.Client, func()) { } value, err := json.Marshal(rule) re.NoError(err) - key := endpoint.RegionLabelPathPrefix(clusterID) + "/" + rule.ID + key := keypath.RegionLabelPathPrefix(clusterID) + "/" + rule.ID _, err = clientv3.NewKV(client).Put(ctx, key, string(value)) re.NoError(err) } diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index e3a6d9bd648..c4f424aedcb 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -57,6 +57,7 @@ import ( "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/memberutil" "github.com/tikv/pd/pkg/utils/metricutil" @@ -458,7 +459,7 @@ func (s *Server) startServer() (err error) { Id: uniqueID, // id is unique among all participants ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()}, } - s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), constant.PrimaryKey, "primary election") + s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(s.clusterID), constant.PrimaryKey, "primary election") s.service = &Service{Server: s} s.AddServiceReadyCallback(s.startCluster) diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 0134a65dfd0..002ac3db91b 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -40,11 +40,11 @@ import ( "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/member" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/systimemon" "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/metricutil" "github.com/tikv/pd/pkg/utils/tsoutil" @@ -363,8 +363,8 @@ func (s *Server) startServer() (err error) { // Initialize the TSO service. s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context()) - legacySvcRootPath := endpoint.LegacyRootPath(s.clusterID) - tsoSvcRootPath := endpoint.TSOSvcRootPath(s.clusterID) + legacySvcRootPath := keypath.LegacyRootPath(s.clusterID) + tsoSvcRootPath := keypath.TSOSvcRootPath(s.clusterID) s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( s.serverLoopCtx, s.serviceID, s.GetClient(), s.GetHTTPClient(), s.cfg.AdvertiseListenAddr, s.clusterID, legacySvcRootPath, tsoSvcRootPath, s.cfg) diff --git a/pkg/mcs/utils/expected_primary.go b/pkg/mcs/utils/expected_primary.go index 102bb8d785c..bb6d3b0fc37 100644 --- a/pkg/mcs/utils/expected_primary.go +++ b/pkg/mcs/utils/expected_primary.go @@ -26,9 +26,9 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/utils/constant" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -173,10 +173,10 @@ func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName var primaryPath string switch serviceName { case constant.SchedulingServiceName: - primaryPath = endpoint.SchedulingPrimaryPath(clusterID) + primaryPath = keypath.SchedulingPrimaryPath(clusterID) case constant.TSOServiceName: - tsoRootPath := endpoint.TSOSvcRootPath(clusterID) - primaryPath = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID) + tsoRootPath := keypath.TSOSvcRootPath(clusterID) + primaryPath = keypath.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID) } _, err = markExpectedPrimaryFlag(client, primaryPath, primaryIDs[nextPrimaryID], grantResp.ID) if err != nil { diff --git a/pkg/storage/endpoint/config.go b/pkg/storage/endpoint/config.go index 71ae8cb5248..d297562f275 100644 --- a/pkg/storage/endpoint/config.go +++ b/pkg/storage/endpoint/config.go @@ -19,6 +19,7 @@ import ( "strings" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/keypath" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -36,9 +37,9 @@ type ConfigStorage interface { var _ ConfigStorage = (*StorageEndpoint)(nil) -// LoadConfig loads config from configPath then unmarshal it to cfg. +// LoadConfig loads config from keypath.Config then unmarshal it to cfg. func (se *StorageEndpoint) LoadConfig(cfg any) (bool, error) { - value, err := se.Load(configPath) + value, err := se.Load(keypath.Config) if err != nil || value == "" { return false, err } @@ -49,14 +50,14 @@ func (se *StorageEndpoint) LoadConfig(cfg any) (bool, error) { return true, nil } -// SaveConfig stores marshallable cfg to the configPath. +// SaveConfig stores marshallable cfg to the keypath.Config. func (se *StorageEndpoint) SaveConfig(cfg any) error { - return se.saveJSON(configPath, cfg) + return se.saveJSON(keypath.Config, cfg) } // LoadAllSchedulerConfigs loads all schedulers' config. func (se *StorageEndpoint) LoadAllSchedulerConfigs() ([]string, []string, error) { - prefix := customSchedulerConfigPath + "/" + prefix := keypath.CustomSchedulerConfigPath + "/" keys, values, err := se.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), MinKVRangeLimit) for i, key := range keys { keys[i] = strings.TrimPrefix(key, prefix) @@ -66,15 +67,15 @@ func (se *StorageEndpoint) LoadAllSchedulerConfigs() ([]string, []string, error) // LoadSchedulerConfig loads the config of the given scheduler. func (se *StorageEndpoint) LoadSchedulerConfig(schedulerName string) (string, error) { - return se.Load(schedulerConfigPath(schedulerName)) + return se.Load(keypath.SchedulerConfigPath(schedulerName)) } // SaveSchedulerConfig saves the config of the given scheduler. func (se *StorageEndpoint) SaveSchedulerConfig(schedulerName string, data []byte) error { - return se.Save(schedulerConfigPath(schedulerName), string(data)) + return se.Save(keypath.SchedulerConfigPath(schedulerName), string(data)) } // RemoveSchedulerConfig removes the config of the given scheduler. func (se *StorageEndpoint) RemoveSchedulerConfig(schedulerName string) error { - return se.Remove(schedulerConfigPath(schedulerName)) + return se.Remove(keypath.SchedulerConfigPath(schedulerName)) } diff --git a/pkg/storage/endpoint/external_timestamp.go b/pkg/storage/endpoint/external_timestamp.go index bc5377d19f4..55f47620042 100644 --- a/pkg/storage/endpoint/external_timestamp.go +++ b/pkg/storage/endpoint/external_timestamp.go @@ -18,6 +18,7 @@ import ( "strconv" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/keypath" ) // ExternalTimestamp is the external timestamp. @@ -36,7 +37,7 @@ var _ ExternalTSStorage = (*StorageEndpoint)(nil) // LoadExternalTS loads the external timestamp from storage. func (se *StorageEndpoint) LoadExternalTS() (uint64, error) { - value, err := se.Load(ExternalTimestampPath()) + value, err := se.Load(keypath.ExternalTimestampPath()) if err != nil || value == "" { return 0, err } @@ -50,5 +51,5 @@ func (se *StorageEndpoint) LoadExternalTS() (uint64, error) { // SaveExternalTS saves the external timestamp. func (se *StorageEndpoint) SaveExternalTS(timestamp uint64) error { value := strconv.FormatUint(timestamp, 16) - return se.Save(ExternalTimestampPath(), value) + return se.Save(keypath.ExternalTimestampPath(), value) } diff --git a/pkg/storage/endpoint/gc_safe_point.go b/pkg/storage/endpoint/gc_safe_point.go index 7b0b0bf86a7..8a6c5c5f789 100644 --- a/pkg/storage/endpoint/gc_safe_point.go +++ b/pkg/storage/endpoint/gc_safe_point.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/keypath" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -49,7 +50,7 @@ var _ GCSafePointStorage = (*StorageEndpoint)(nil) // LoadGCSafePoint loads current GC safe point from storage. func (se *StorageEndpoint) LoadGCSafePoint() (uint64, error) { - value, err := se.Load(gcSafePointPath()) + value, err := se.Load(keypath.GCSafePointPath()) if err != nil || value == "" { return 0, err } @@ -63,12 +64,12 @@ func (se *StorageEndpoint) LoadGCSafePoint() (uint64, error) { // SaveGCSafePoint saves new GC safe point to storage. func (se *StorageEndpoint) SaveGCSafePoint(safePoint uint64) error { value := strconv.FormatUint(safePoint, 16) - return se.Save(gcSafePointPath(), value) + return se.Save(keypath.GCSafePointPath(), value) } // LoadMinServiceGCSafePoint returns the minimum safepoint across all services func (se *StorageEndpoint) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, error) { - prefix := GCSafePointServicePrefixPath() + prefix := keypath.GCSafePointServicePrefixPath() prefixEnd := clientv3.GetPrefixRangeEnd(prefix) keys, values, err := se.LoadRange(prefix, prefixEnd, 0) if err != nil { @@ -88,7 +89,7 @@ func (se *StorageEndpoint) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSaf if err := json.Unmarshal([]byte(values[i]), ssp); err != nil { return nil, err } - if ssp.ServiceID == GCWorkerServiceSafePointID { + if ssp.ServiceID == keypath.GCWorkerServiceSafePointID { hasGCWorker = true // If gc_worker's expire time is incorrectly set, fix it. if ssp.ExpiredAt != math.MaxInt64 { @@ -128,7 +129,7 @@ func (se *StorageEndpoint) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSaf func (se *StorageEndpoint) initServiceGCSafePointForGCWorker(initialValue uint64) (*ServiceSafePoint, error) { ssp := &ServiceSafePoint{ - ServiceID: GCWorkerServiceSafePointID, + ServiceID: keypath.GCWorkerServiceSafePointID, SafePoint: initialValue, ExpiredAt: math.MaxInt64, } @@ -140,7 +141,7 @@ func (se *StorageEndpoint) initServiceGCSafePointForGCWorker(initialValue uint64 // LoadAllServiceGCSafePoints returns all services GC safepoints func (se *StorageEndpoint) LoadAllServiceGCSafePoints() ([]*ServiceSafePoint, error) { - prefix := GCSafePointServicePrefixPath() + prefix := keypath.GCSafePointServicePrefixPath() prefixEnd := clientv3.GetPrefixRangeEnd(prefix) keys, values, err := se.LoadRange(prefix, prefixEnd, 0) if err != nil { @@ -168,18 +169,18 @@ func (se *StorageEndpoint) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error { return errors.New("service id of service safepoint cannot be empty") } - if ssp.ServiceID == GCWorkerServiceSafePointID && ssp.ExpiredAt != math.MaxInt64 { + if ssp.ServiceID == keypath.GCWorkerServiceSafePointID && ssp.ExpiredAt != math.MaxInt64 { return errors.New("TTL of gc_worker's service safe point must be infinity") } - return se.saveJSON(gcSafePointServicePath(ssp.ServiceID), ssp) + return se.saveJSON(keypath.GCSafePointServicePath(ssp.ServiceID), ssp) } // RemoveServiceGCSafePoint removes a GC safepoint for the service func (se *StorageEndpoint) RemoveServiceGCSafePoint(serviceID string) error { - if serviceID == GCWorkerServiceSafePointID { + if serviceID == keypath.GCWorkerServiceSafePointID { return errors.New("cannot remove service safe point of gc_worker") } - key := gcSafePointServicePath(serviceID) + key := keypath.GCSafePointServicePath(serviceID) return se.Remove(key) } diff --git a/pkg/storage/endpoint/key_path_test.go b/pkg/storage/endpoint/key_path_test.go deleted file mode 100644 index d6ef584105a..00000000000 --- a/pkg/storage/endpoint/key_path_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package endpoint - -import ( - "fmt" - "math/rand" - "path" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestRegionPath(t *testing.T) { - re := require.New(t) - f := func(id uint64) string { - return path.Join(regionPathPrefix, fmt.Sprintf("%020d", id)) - } - rand.New(rand.NewSource(time.Now().Unix())) - for i := 0; i < 1000; i++ { - id := rand.Uint64() - re.Equal(f(id), RegionPath(id)) - } -} - -func BenchmarkRegionPath(b *testing.B) { - for i := 0; i < b.N; i++ { - _ = RegionPath(uint64(i)) - } -} diff --git a/pkg/storage/endpoint/keyspace.go b/pkg/storage/endpoint/keyspace.go index b892250c463..77240541951 100644 --- a/pkg/storage/endpoint/keyspace.go +++ b/pkg/storage/endpoint/keyspace.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/keypath" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -49,7 +50,7 @@ var _ KeyspaceStorage = (*StorageEndpoint)(nil) // SaveKeyspaceMeta adds a save keyspace meta operation to target transaction. func (*StorageEndpoint) SaveKeyspaceMeta(txn kv.Txn, meta *keyspacepb.KeyspaceMeta) error { - metaPath := KeyspaceMetaPath(meta.GetId()) + metaPath := keypath.KeyspaceMetaPath(meta.GetId()) metaVal, err := proto.Marshal(meta) if err != nil { return errs.ErrProtoMarshal.Wrap(err).GenWithStackByCause() @@ -60,7 +61,7 @@ func (*StorageEndpoint) SaveKeyspaceMeta(txn kv.Txn, meta *keyspacepb.KeyspaceMe // LoadKeyspaceMeta load and return keyspace meta specified by id. // If keyspace does not exist or error occurs, returned meta will be nil. func (*StorageEndpoint) LoadKeyspaceMeta(txn kv.Txn, id uint32) (*keyspacepb.KeyspaceMeta, error) { - metaPath := KeyspaceMetaPath(id) + metaPath := keypath.KeyspaceMetaPath(id) metaVal, err := txn.Load(metaPath) if err != nil || metaVal == "" { return nil, err @@ -75,7 +76,7 @@ func (*StorageEndpoint) LoadKeyspaceMeta(txn kv.Txn, id uint32) (*keyspacepb.Key // SaveKeyspaceID saves keyspace ID to the path specified by keyspace name. func (*StorageEndpoint) SaveKeyspaceID(txn kv.Txn, id uint32, name string) error { - idPath := KeyspaceIDPath(name) + idPath := keypath.KeyspaceIDPath(name) idVal := strconv.FormatUint(uint64(id), SpaceIDBase) return txn.Save(idPath, idVal) } @@ -84,7 +85,7 @@ func (*StorageEndpoint) SaveKeyspaceID(txn kv.Txn, id uint32, name string) error // An additional boolean is returned to indicate whether target id exists, // it returns false if target id not found, or if error occurred. func (*StorageEndpoint) LoadKeyspaceID(txn kv.Txn, name string) (bool, uint32, error) { - idPath := KeyspaceIDPath(name) + idPath := keypath.KeyspaceIDPath(name) idVal, err := txn.Load(idPath) // Failed to load the keyspaceID if loading operation errored, or if keyspace does not exist. if err != nil || idVal == "" { @@ -100,8 +101,8 @@ func (*StorageEndpoint) LoadKeyspaceID(txn kv.Txn, name string) (bool, uint32, e // LoadRangeKeyspace loads keyspaces starting at startID. // limit specifies the limit of loaded keyspaces. func (*StorageEndpoint) LoadRangeKeyspace(txn kv.Txn, startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error) { - startKey := KeyspaceMetaPath(startID) - endKey := clientv3.GetPrefixRangeEnd(KeyspaceMetaPrefix()) + startKey := keypath.KeyspaceMetaPath(startID) + endKey := clientv3.GetPrefixRangeEnd(keypath.KeyspaceMetaPrefix()) keys, values, err := txn.LoadRange(startKey, endKey, limit) if err != nil { return nil, err diff --git a/pkg/storage/endpoint/meta.go b/pkg/storage/endpoint/meta.go index 33482da512f..176188be3f3 100644 --- a/pkg/storage/endpoint/meta.go +++ b/pkg/storage/endpoint/meta.go @@ -26,6 +26,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/keypath" ) // MetaStorage defines the storage operations on the PD cluster meta info. @@ -62,41 +63,41 @@ const ( // LoadMeta loads cluster meta from the storage. This method will only // be used by the PD server, so we should only implement it for the etcd storage. func (se *StorageEndpoint) LoadMeta(meta *metapb.Cluster) (bool, error) { - return se.loadProto(clusterPath, meta) + return se.loadProto(keypath.ClusterPath, meta) } // SaveMeta save cluster meta to the storage. This method will only // be used by the PD server, so we should only implement it for the etcd storage. func (se *StorageEndpoint) SaveMeta(meta *metapb.Cluster) error { - return se.saveProto(clusterPath, meta) + return se.saveProto(keypath.ClusterPath, meta) } // LoadStoreMeta loads one store from storage. func (se *StorageEndpoint) LoadStoreMeta(storeID uint64, store *metapb.Store) (bool, error) { - return se.loadProto(StorePath(storeID), store) + return se.loadProto(keypath.StorePath(storeID), store) } // SaveStoreMeta saves one store to storage. func (se *StorageEndpoint) SaveStoreMeta(store *metapb.Store) error { - return se.saveProto(StorePath(store.GetId()), store) + return se.saveProto(keypath.StorePath(store.GetId()), store) } // SaveStoreWeight saves a store's leader and region weight to storage. func (se *StorageEndpoint) SaveStoreWeight(storeID uint64, leader, region float64) error { leaderValue := strconv.FormatFloat(leader, 'f', -1, 64) - if err := se.Save(storeLeaderWeightPath(storeID), leaderValue); err != nil { + if err := se.Save(keypath.StoreLeaderWeightPath(storeID), leaderValue); err != nil { return err } regionValue := strconv.FormatFloat(region, 'f', -1, 64) - return se.Save(storeRegionWeightPath(storeID), regionValue) + return se.Save(keypath.StoreRegionWeightPath(storeID), regionValue) } // LoadStores loads all stores from storage to StoresInfo. func (se *StorageEndpoint) LoadStores(f func(store *core.StoreInfo)) error { nextID := uint64(0) - endKey := StorePath(math.MaxUint64) + endKey := keypath.StorePath(math.MaxUint64) for { - key := StorePath(nextID) + key := keypath.StorePath(nextID) _, res, err := se.LoadRange(key, endKey, MinKVRangeLimit) if err != nil { return err @@ -112,11 +113,11 @@ func (se *StorageEndpoint) LoadStores(f func(store *core.StoreInfo)) error { if store.State == metapb.StoreState_Tombstone { store.NodeState = metapb.NodeState_Removed } - leaderWeight, err := se.loadFloatWithDefaultValue(storeLeaderWeightPath(store.GetId()), 1.0) + leaderWeight, err := se.loadFloatWithDefaultValue(keypath.StoreLeaderWeightPath(store.GetId()), 1.0) if err != nil { return err } - regionWeight, err := se.loadFloatWithDefaultValue(storeRegionWeightPath(store.GetId()), 1.0) + regionWeight, err := se.loadFloatWithDefaultValue(keypath.StoreRegionWeightPath(store.GetId()), 1.0) if err != nil { return err } @@ -148,12 +149,12 @@ func (se *StorageEndpoint) loadFloatWithDefaultValue(path string, def float64) ( // DeleteStoreMeta deletes one store from storage. func (se *StorageEndpoint) DeleteStoreMeta(store *metapb.Store) error { - return se.Remove(StorePath(store.GetId())) + return se.Remove(keypath.StorePath(store.GetId())) } // LoadRegion loads one region from the backend storage. func (se *StorageEndpoint) LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error) { - value, err := se.Load(RegionPath(regionID)) + value, err := se.Load(keypath.RegionPath(regionID)) if err != nil || value == "" { return false, err } @@ -168,7 +169,7 @@ func (se *StorageEndpoint) LoadRegion(regionID uint64, region *metapb.Region) (o // LoadRegions loads all regions from storage to RegionsInfo. func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error { nextID := uint64(0) - endKey := RegionPath(math.MaxUint64) + endKey := keypath.RegionPath(math.MaxUint64) // Since the region key may be very long, using a larger rangeLimit will cause // the message packet to exceed the grpc message size limit (4MB). Here we use @@ -179,7 +180,7 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core. rangeLimit = 1 time.Sleep(time.Second) }) - startKey := RegionPath(nextID) + startKey := keypath.RegionPath(nextID) _, res, err := se.LoadRange(startKey, endKey, rangeLimit) if err != nil { if rangeLimit /= 2; rangeLimit >= MinKVRangeLimit { @@ -227,12 +228,12 @@ func (se *StorageEndpoint) SaveRegion(region *metapb.Region) error { if err != nil { return errs.ErrProtoMarshal.Wrap(err).GenWithStackByArgs() } - return se.Save(RegionPath(region.GetId()), string(value)) + return se.Save(keypath.RegionPath(region.GetId()), string(value)) } // DeleteRegion deletes one region from storage. func (se *StorageEndpoint) DeleteRegion(region *metapb.Region) error { - return se.Remove(RegionPath(region.GetId())) + return se.Remove(keypath.RegionPath(region.GetId())) } // Flush flushes the pending data to the underlying storage backend. diff --git a/pkg/storage/endpoint/min_resolved_ts.go b/pkg/storage/endpoint/min_resolved_ts.go index a8dd5c48538..5dae1afbaaa 100644 --- a/pkg/storage/endpoint/min_resolved_ts.go +++ b/pkg/storage/endpoint/min_resolved_ts.go @@ -18,6 +18,7 @@ import ( "strconv" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/keypath" ) // MinResolvedTSPoint is the min resolved ts for a store @@ -36,7 +37,7 @@ var _ MinResolvedTSStorage = (*StorageEndpoint)(nil) // LoadMinResolvedTS loads the min resolved ts from storage. func (se *StorageEndpoint) LoadMinResolvedTS() (uint64, error) { - value, err := se.Load(MinResolvedTSPath()) + value, err := se.Load(keypath.MinResolvedTSPath()) if err != nil || value == "" { return 0, err } @@ -50,5 +51,5 @@ func (se *StorageEndpoint) LoadMinResolvedTS() (uint64, error) { // SaveMinResolvedTS saves the min resolved ts. func (se *StorageEndpoint) SaveMinResolvedTS(minResolvedTS uint64) error { value := strconv.FormatUint(minResolvedTS, 16) - return se.Save(MinResolvedTSPath(), value) + return se.Save(keypath.MinResolvedTSPath(), value) } diff --git a/pkg/storage/endpoint/replication_status.go b/pkg/storage/endpoint/replication_status.go index 3cfaaefb9a4..f8ffcbc5bda 100644 --- a/pkg/storage/endpoint/replication_status.go +++ b/pkg/storage/endpoint/replication_status.go @@ -18,6 +18,7 @@ import ( "encoding/json" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/keypath" ) // ReplicationStatusStorage defines the storage operations on the replication status. @@ -30,7 +31,7 @@ var _ ReplicationStatusStorage = (*StorageEndpoint)(nil) // LoadReplicationStatus loads replication status by mode. func (se *StorageEndpoint) LoadReplicationStatus(mode string, status any) (bool, error) { - v, err := se.Load(replicationModePath(mode)) + v, err := se.Load(keypath.ReplicationModePath(mode)) if err != nil || v == "" { return false, err } @@ -43,5 +44,5 @@ func (se *StorageEndpoint) LoadReplicationStatus(mode string, status any) (bool, // SaveReplicationStatus stores replication status by mode. func (se *StorageEndpoint) SaveReplicationStatus(mode string, status any) error { - return se.saveJSON(replicationModePath(mode), status) + return se.saveJSON(keypath.ReplicationModePath(mode), status) } diff --git a/pkg/storage/endpoint/resource_group.go b/pkg/storage/endpoint/resource_group.go index e777ea635c6..733e5ba2c9a 100644 --- a/pkg/storage/endpoint/resource_group.go +++ b/pkg/storage/endpoint/resource_group.go @@ -16,6 +16,7 @@ package endpoint import ( "github.com/gogo/protobuf/proto" + "github.com/tikv/pd/pkg/utils/keypath" ) // ResourceGroupStorage defines the storage operations on the resource group. @@ -34,40 +35,40 @@ var _ ResourceGroupStorage = (*StorageEndpoint)(nil) // SaveResourceGroupSetting stores a resource group to storage. func (se *StorageEndpoint) SaveResourceGroupSetting(name string, msg proto.Message) error { - return se.saveProto(resourceGroupSettingKeyPath(name), msg) + return se.saveProto(keypath.ResourceGroupSettingKeyPath(name), msg) } // DeleteResourceGroupSetting removes a resource group from storage. func (se *StorageEndpoint) DeleteResourceGroupSetting(name string) error { - return se.Remove(resourceGroupSettingKeyPath(name)) + return se.Remove(keypath.ResourceGroupSettingKeyPath(name)) } // LoadResourceGroupSettings loads all resource groups from storage. func (se *StorageEndpoint) LoadResourceGroupSettings(f func(k, v string)) error { - return se.loadRangeByPrefix(resourceGroupSettingsPath+"/", f) + return se.loadRangeByPrefix(keypath.ResourceGroupSettingsPath+"/", f) } // SaveResourceGroupStates stores a resource group to storage. func (se *StorageEndpoint) SaveResourceGroupStates(name string, obj any) error { - return se.saveJSON(resourceGroupStateKeyPath(name), obj) + return se.saveJSON(keypath.ResourceGroupStateKeyPath(name), obj) } // DeleteResourceGroupStates removes a resource group from storage. func (se *StorageEndpoint) DeleteResourceGroupStates(name string) error { - return se.Remove(resourceGroupStateKeyPath(name)) + return se.Remove(keypath.ResourceGroupStateKeyPath(name)) } // LoadResourceGroupStates loads all resource groups from storage. func (se *StorageEndpoint) LoadResourceGroupStates(f func(k, v string)) error { - return se.loadRangeByPrefix(resourceGroupStatesPath+"/", f) + return se.loadRangeByPrefix(keypath.ResourceGroupStatesPath+"/", f) } // SaveControllerConfig stores the resource controller config to storage. func (se *StorageEndpoint) SaveControllerConfig(config any) error { - return se.saveJSON(controllerConfigPath, config) + return se.saveJSON(keypath.ControllerConfigPath, config) } // LoadControllerConfig loads the resource controller config from storage. func (se *StorageEndpoint) LoadControllerConfig() (string, error) { - return se.Load(controllerConfigPath) + return se.Load(keypath.ControllerConfigPath) } diff --git a/pkg/storage/endpoint/rule.go b/pkg/storage/endpoint/rule.go index 84ad6ee1352..cee8dd7c1c3 100644 --- a/pkg/storage/endpoint/rule.go +++ b/pkg/storage/endpoint/rule.go @@ -18,6 +18,7 @@ import ( "context" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/keypath" ) // RuleStorage defines the storage operations on the rule. @@ -45,50 +46,50 @@ var _ RuleStorage = (*StorageEndpoint)(nil) // SaveRule stores a rule cfg to the rulesPath. func (*StorageEndpoint) SaveRule(txn kv.Txn, ruleKey string, rule any) error { - return saveJSONInTxn(txn, ruleKeyPath(ruleKey), rule) + return saveJSONInTxn(txn, keypath.RuleKeyPath(ruleKey), rule) } // DeleteRule removes a rule from storage. func (*StorageEndpoint) DeleteRule(txn kv.Txn, ruleKey string) error { - return txn.Remove(ruleKeyPath(ruleKey)) + return txn.Remove(keypath.RuleKeyPath(ruleKey)) } // LoadRuleGroups loads all rule groups from storage. func (se *StorageEndpoint) LoadRuleGroups(f func(k, v string)) error { - return se.loadRangeByPrefix(ruleGroupPath+"/", f) + return se.loadRangeByPrefix(keypath.RuleGroupPath+"/", f) } // SaveRuleGroup stores a rule group config to storage. func (*StorageEndpoint) SaveRuleGroup(txn kv.Txn, groupID string, group any) error { - return saveJSONInTxn(txn, ruleGroupIDPath(groupID), group) + return saveJSONInTxn(txn, keypath.RuleGroupIDPath(groupID), group) } // DeleteRuleGroup removes a rule group from storage. func (*StorageEndpoint) DeleteRuleGroup(txn kv.Txn, groupID string) error { - return txn.Remove(ruleGroupIDPath(groupID)) + return txn.Remove(keypath.RuleGroupIDPath(groupID)) } // LoadRegionRules loads region rules from storage. func (se *StorageEndpoint) LoadRegionRules(f func(k, v string)) error { - return se.loadRangeByPrefix(regionLabelPath+"/", f) + return se.loadRangeByPrefix(keypath.RegionLabelPath+"/", f) } // SaveRegionRule saves a region rule to the storage. func (*StorageEndpoint) SaveRegionRule(txn kv.Txn, ruleKey string, rule any) error { - return saveJSONInTxn(txn, regionLabelKeyPath(ruleKey), rule) + return saveJSONInTxn(txn, keypath.RegionLabelKeyPath(ruleKey), rule) } // DeleteRegionRule removes a region rule from storage. func (*StorageEndpoint) DeleteRegionRule(txn kv.Txn, ruleKey string) error { - return txn.Remove(regionLabelKeyPath(ruleKey)) + return txn.Remove(keypath.RegionLabelKeyPath(ruleKey)) } // LoadRule load a placement rule from storage. func (se *StorageEndpoint) LoadRule(ruleKey string) (string, error) { - return se.Load(ruleKeyPath(ruleKey)) + return se.Load(keypath.RuleKeyPath(ruleKey)) } // LoadRules loads placement rules from storage. func (se *StorageEndpoint) LoadRules(f func(k, v string)) error { - return se.loadRangeByPrefix(rulesPath+"/", f) + return se.loadRangeByPrefix(keypath.RulesPath+"/", f) } diff --git a/pkg/storage/endpoint/safepoint_v2.go b/pkg/storage/endpoint/safepoint_v2.go index 2072f9c12ec..f16855e86d6 100644 --- a/pkg/storage/endpoint/safepoint_v2.go +++ b/pkg/storage/endpoint/safepoint_v2.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/keypath" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -58,7 +59,7 @@ var _ SafePointV2Storage = (*StorageEndpoint)(nil) // LoadGCSafePointV2 loads gc safe point for the given keyspace. func (se *StorageEndpoint) LoadGCSafePointV2(keyspaceID uint32) (*GCSafePointV2, error) { - key := GCSafePointV2Path(keyspaceID) + key := keypath.GCSafePointV2Path(keyspaceID) value, err := se.Load(key) if err != nil { return nil, err @@ -79,12 +80,12 @@ func (se *StorageEndpoint) LoadGCSafePointV2(keyspaceID uint32) (*GCSafePointV2, // SaveGCSafePointV2 saves gc safe point for the given keyspace. func (se *StorageEndpoint) SaveGCSafePointV2(gcSafePoint *GCSafePointV2) error { - return se.saveJSON(GCSafePointV2Path(gcSafePoint.KeyspaceID), gcSafePoint) + return se.saveJSON(keypath.GCSafePointV2Path(gcSafePoint.KeyspaceID), gcSafePoint) } // LoadAllGCSafePoints returns gc safe point for all keyspaces func (se *StorageEndpoint) LoadAllGCSafePoints() ([]*GCSafePointV2, error) { - prefix := GCSafePointV2Prefix() + prefix := keypath.GCSafePointV2Prefix() prefixEnd := clientv3.GetPrefixRangeEnd(prefix) _, values, err := se.LoadRange(prefix, prefixEnd, 0) if err != nil { @@ -105,7 +106,7 @@ func (se *StorageEndpoint) LoadAllGCSafePoints() ([]*GCSafePointV2, error) { // If no service safe point exist for the given key space or all the service safe points just expired, return nil. // This also attempt to remove expired service safe point. func (se *StorageEndpoint) LoadMinServiceSafePointV2(keyspaceID uint32, now time.Time) (*ServiceSafePointV2, error) { - prefix := ServiceSafePointV2Prefix(keyspaceID) + prefix := keypath.ServiceSafePointV2Prefix(keyspaceID) prefixEnd := clientv3.GetPrefixRangeEnd(prefix) keys, values, err := se.LoadRange(prefix, prefixEnd, 0) if err != nil { @@ -122,7 +123,7 @@ func (se *StorageEndpoint) LoadMinServiceSafePointV2(keyspaceID uint32, now time if err = json.Unmarshal([]byte(values[i]), serviceSafePoint); err != nil { return nil, err } - if serviceSafePoint.ServiceID == GCWorkerServiceSafePointID { + if serviceSafePoint.ServiceID == keypath.GCWorkerServiceSafePointID { hasGCWorker = true // If gc_worker's expire time is incorrectly set, fix it. if serviceSafePoint.ExpiredAt != math.MaxInt64 { @@ -158,7 +159,7 @@ func (se *StorageEndpoint) LoadMinServiceSafePointV2(keyspaceID uint32, now time // LoadServiceSafePointV2 returns ServiceSafePointV2 for given keyspaceID and serviceID. func (se *StorageEndpoint) LoadServiceSafePointV2(keyspaceID uint32, serviceID string) (*ServiceSafePointV2, error) { - key := ServiceSafePointV2Path(keyspaceID, serviceID) + key := keypath.ServiceSafePointV2Path(keyspaceID, serviceID) value, err := se.Load(key) if err != nil { return nil, err @@ -177,7 +178,7 @@ func (se *StorageEndpoint) LoadServiceSafePointV2(keyspaceID uint32, serviceID s func (se *StorageEndpoint) initServiceSafePointV2ForGCWorker(keyspaceID uint32, initialValue uint64) (*ServiceSafePointV2, error) { ssp := &ServiceSafePointV2{ KeyspaceID: keyspaceID, - ServiceID: GCWorkerServiceSafePointID, + ServiceID: keypath.GCWorkerServiceSafePointID, SafePoint: initialValue, ExpiredAt: math.MaxInt64, } @@ -193,19 +194,19 @@ func (se *StorageEndpoint) SaveServiceSafePointV2(serviceSafePoint *ServiceSafeP return errors.New("service id of service safepoint cannot be empty") } - if serviceSafePoint.ServiceID == GCWorkerServiceSafePointID && serviceSafePoint.ExpiredAt != math.MaxInt64 { + if serviceSafePoint.ServiceID == keypath.GCWorkerServiceSafePointID && serviceSafePoint.ExpiredAt != math.MaxInt64 { return errors.New("TTL of gc_worker's service safe point must be infinity") } - key := ServiceSafePointV2Path(serviceSafePoint.KeyspaceID, serviceSafePoint.ServiceID) + key := keypath.ServiceSafePointV2Path(serviceSafePoint.KeyspaceID, serviceSafePoint.ServiceID) return se.saveJSON(key, serviceSafePoint) } // RemoveServiceSafePointV2 removes a service safe point. func (se *StorageEndpoint) RemoveServiceSafePointV2(keyspaceID uint32, serviceID string) error { - if serviceID == GCWorkerServiceSafePointID { + if serviceID == keypath.GCWorkerServiceSafePointID { return errors.New("cannot remove service safe point of gc_worker") } - key := ServiceSafePointV2Path(keyspaceID, serviceID) + key := keypath.ServiceSafePointV2Path(keyspaceID, serviceID) return se.Remove(key) } diff --git a/pkg/storage/endpoint/service_middleware.go b/pkg/storage/endpoint/service_middleware.go index 23095900755..3859dab4d62 100644 --- a/pkg/storage/endpoint/service_middleware.go +++ b/pkg/storage/endpoint/service_middleware.go @@ -18,6 +18,7 @@ import ( "encoding/json" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/keypath" ) // ServiceMiddlewareStorage defines the storage operations on the service middleware. @@ -28,9 +29,9 @@ type ServiceMiddlewareStorage interface { var _ ServiceMiddlewareStorage = (*StorageEndpoint)(nil) -// LoadServiceMiddlewareConfig loads service middleware config from serviceMiddlewarePath then unmarshal it to cfg. +// LoadServiceMiddlewareConfig loads service middleware config from keypath.KeyspaceGroupLocalTSPath then unmarshal it to cfg. func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) { - value, err := se.Load(serviceMiddlewarePath) + value, err := se.Load(keypath.ServiceMiddlewarePath) if err != nil || value == "" { return false, err } @@ -41,7 +42,7 @@ func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) { return true, nil } -// SaveServiceMiddlewareConfig stores marshallable cfg to the serviceMiddlewarePath. +// SaveServiceMiddlewareConfig stores marshallable cfg to the keypath.KeyspaceGroupLocalTSPath. func (se *StorageEndpoint) SaveServiceMiddlewareConfig(cfg any) error { - return se.saveJSON(serviceMiddlewarePath, cfg) + return se.saveJSON(keypath.ServiceMiddlewarePath, cfg) } diff --git a/pkg/storage/endpoint/tso.go b/pkg/storage/endpoint/tso.go index 30aceb88fa8..a656f6d2945 100644 --- a/pkg/storage/endpoint/tso.go +++ b/pkg/storage/endpoint/tso.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/typeutil" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -52,7 +53,7 @@ func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) { maxTSWindow := typeutil.ZeroTime for i, key := range keys { key := strings.TrimSpace(key) - if !strings.HasSuffix(key, TimestampKey) { + if !strings.HasSuffix(key, keypath.TimestampKey) { continue } tsWindow, err := typeutil.ParseTimestamp([]byte(values[i])) diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go index 9c7b12fff5f..b45d1f1da37 100644 --- a/pkg/storage/endpoint/tso_keyspace_group.go +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -20,6 +20,7 @@ import ( "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/typeutil" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -173,7 +174,7 @@ var _ KeyspaceGroupStorage = (*StorageEndpoint)(nil) // LoadKeyspaceGroup loads the keyspace group by ID. func (*StorageEndpoint) LoadKeyspaceGroup(txn kv.Txn, id uint32) (*KeyspaceGroup, error) { - value, err := txn.Load(KeyspaceGroupIDPath(id)) + value, err := txn.Load(keypath.KeyspaceGroupIDPath(id)) if err != nil || value == "" { return nil, err } @@ -186,19 +187,19 @@ func (*StorageEndpoint) LoadKeyspaceGroup(txn kv.Txn, id uint32) (*KeyspaceGroup // SaveKeyspaceGroup saves the keyspace group. func (*StorageEndpoint) SaveKeyspaceGroup(txn kv.Txn, kg *KeyspaceGroup) error { - return saveJSONInTxn(txn, KeyspaceGroupIDPath(kg.ID), kg) + return saveJSONInTxn(txn, keypath.KeyspaceGroupIDPath(kg.ID), kg) } // DeleteKeyspaceGroup deletes the keyspace group. func (*StorageEndpoint) DeleteKeyspaceGroup(txn kv.Txn, id uint32) error { - return txn.Remove(KeyspaceGroupIDPath(id)) + return txn.Remove(keypath.KeyspaceGroupIDPath(id)) } // LoadKeyspaceGroups loads keyspace groups from the start ID with limit. // If limit is 0, it will load all keyspace groups from the start ID. func (se *StorageEndpoint) LoadKeyspaceGroups(startID uint32, limit int) ([]*KeyspaceGroup, error) { - prefix := KeyspaceGroupIDPath(startID) - prefixEnd := clientv3.GetPrefixRangeEnd(KeyspaceGroupIDPrefix()) + prefix := keypath.KeyspaceGroupIDPath(startID) + prefixEnd := clientv3.GetPrefixRangeEnd(keypath.KeyspaceGroupIDPrefix()) keys, values, err := se.LoadRange(prefix, prefixEnd, limit) if err != nil { return nil, err diff --git a/pkg/storage/keyspace_test.go b/pkg/storage/keyspace_test.go index 1236d1b34bd..a6ce94e4711 100644 --- a/pkg/storage/keyspace_test.go +++ b/pkg/storage/keyspace_test.go @@ -21,8 +21,8 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/keypath" ) func TestSaveLoadKeyspace(t *testing.T) { @@ -145,9 +145,9 @@ func makeTestKeyspaces() []*keyspacepb.KeyspaceMeta { // TestEncodeSpaceID test spaceID encoding. func TestEncodeSpaceID(t *testing.T) { re := require.New(t) - re.Equal("keyspaces/meta/00000000", endpoint.KeyspaceMetaPath(0)) - re.Equal("keyspaces/meta/16777215", endpoint.KeyspaceMetaPath(1<<24-1)) - re.Equal("keyspaces/meta/00000100", endpoint.KeyspaceMetaPath(100)) - re.Equal("keyspaces/meta/00000011", endpoint.KeyspaceMetaPath(11)) - re.Equal("keyspaces/meta/00000010", endpoint.KeyspaceMetaPath(10)) + re.Equal("keyspaces/meta/00000000", keypath.KeyspaceMetaPath(0)) + re.Equal("keyspaces/meta/16777215", keypath.KeyspaceMetaPath(1<<24-1)) + re.Equal("keyspaces/meta/00000100", keypath.KeyspaceMetaPath(100)) + re.Equal("keyspaces/meta/00000011", keypath.KeyspaceMetaPath(11)) + re.Equal("keyspaces/meta/00000010", keypath.KeyspaceMetaPath(10)) } diff --git a/pkg/storage/region_storage.go b/pkg/storage/region_storage.go index 11bc6a7cc21..5a581ec5155 100644 --- a/pkg/storage/region_storage.go +++ b/pkg/storage/region_storage.go @@ -24,6 +24,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/keypath" ) // RegionStorage is a storage for the PD region meta information based on LevelDB, @@ -60,12 +61,12 @@ func (s *RegionStorage) SaveRegion(region *metapb.Region) error { if err != nil { return errs.ErrProtoMarshal.Wrap(err).GenWithStackByCause() } - return s.backend.SaveIntoBatch(endpoint.RegionPath(region.GetId()), value) + return s.backend.SaveIntoBatch(keypath.RegionPath(region.GetId()), value) } // DeleteRegion implements the `endpoint.RegionStorage` interface. func (s *RegionStorage) DeleteRegion(region *metapb.Region) error { - return s.backend.Remove((endpoint.RegionPath(region.GetId()))) + return s.backend.Remove((keypath.RegionPath(region.GetId()))) } // Flush implements the `endpoint.RegionStorage` interface. diff --git a/pkg/storage/storage_gc_test.go b/pkg/storage/storage_gc_test.go index 77f7c7dbf65..b18fcc12afc 100644 --- a/pkg/storage/storage_gc_test.go +++ b/pkg/storage/storage_gc_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/keypath" ) func testGCSafePoints() []*endpoint.GCSafePointV2 { @@ -93,7 +94,7 @@ func TestLoadMinServiceSafePoint(t *testing.T) { // gc_worker service safepoint will not be removed. ssp, err := storage.LoadMinServiceSafePointV2(testKeyspaceID, currentTime.Add(5000*time.Second)) re.NoError(err) - re.Equal(endpoint.GCWorkerServiceSafePointID, ssp.ServiceID) + re.Equal(keypath.GCWorkerServiceSafePointID, ssp.ServiceID) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/storage/endpoint/removeExpiredKeys")) } diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index d5f27e34714..bd9a587e239 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -30,6 +30,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/keypath" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -37,8 +38,8 @@ func TestBasic(t *testing.T) { re := require.New(t) storage := NewStorageWithMemoryBackend() - re.Equal("raft/s/00000000000000000123", endpoint.StorePath(123)) - re.Equal("raft/r/00000000000000000123", endpoint.RegionPath(123)) + re.Equal("raft/s/00000000000000000123", keypath.StorePath(123)) + re.Equal("raft/r/00000000000000000123", keypath.RegionPath(123)) meta := &metapb.Cluster{Id: 123} ok, err := storage.LoadMeta(meta) @@ -157,7 +158,7 @@ func TestSaveServiceGCSafePoint(t *testing.T) { re.NoError(storage.SaveServiceGCSafePoint(ssp)) } - prefix := endpoint.GCSafePointServicePrefixPath() + prefix := keypath.GCSafePointServicePrefixPath() prefixEnd := clientv3.GetPrefixRangeEnd(prefix) keys, values, err := storage.LoadRange(prefix, prefixEnd, len(serviceSafePoints)) re.NoError(err) diff --git a/pkg/storage/storage_tso_test.go b/pkg/storage/storage_tso_test.go index 9718565d78f..15a17e6a41d 100644 --- a/pkg/storage/storage_tso_test.go +++ b/pkg/storage/storage_tso_test.go @@ -21,8 +21,8 @@ import ( "time" "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" ) func TestSaveLoadTimestamp(t *testing.T) { @@ -30,7 +30,7 @@ func TestSaveLoadTimestamp(t *testing.T) { storage, clean := newTestStorage(t) defer clean() expectedTS := time.Now().Round(0) - err := storage.SaveTimestamp(endpoint.TimestampKey, expectedTS) + err := storage.SaveTimestamp(keypath.TimestampKey, expectedTS) re.NoError(err) ts, err := storage.LoadTimestamp("") re.NoError(err) @@ -44,13 +44,13 @@ func TestGlobalLocalTimestamp(t *testing.T) { ltaKey := "lta" dc1LocationKey, dc2LocationKey := "dc1", "dc2" localTS1 := time.Now().Round(0) - l1 := path.Join(ltaKey, dc1LocationKey, endpoint.TimestampKey) - l2 := path.Join(ltaKey, dc2LocationKey, endpoint.TimestampKey) + l1 := path.Join(ltaKey, dc1LocationKey, keypath.TimestampKey) + l2 := path.Join(ltaKey, dc2LocationKey, keypath.TimestampKey) err := storage.SaveTimestamp(l1, localTS1) re.NoError(err) globalTS := time.Now().Round(0) - err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS) + err = storage.SaveTimestamp(keypath.TimestampKey, globalTS) re.NoError(err) localTS2 := time.Now().Round(0) err = storage.SaveTimestamp(l2, localTS2) @@ -70,11 +70,11 @@ func TestTimestampTxn(t *testing.T) { storage, clean := newTestStorage(t) defer clean() globalTS1 := time.Now().Round(0) - err := storage.SaveTimestamp(endpoint.TimestampKey, globalTS1) + err := storage.SaveTimestamp(keypath.TimestampKey, globalTS1) re.NoError(err) globalTS2 := globalTS1.Add(-time.Millisecond).Round(0) - err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS2) + err = storage.SaveTimestamp(keypath.TimestampKey, globalTS2) re.Error(err) ts, err := storage.LoadTimestamp("") diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 38511ee2913..53a6f65a25d 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -34,7 +34,7 @@ import ( "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/slice" - "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -121,7 +121,7 @@ func newGlobalTimestampOracle(am *AllocatorManager) *timestampOracle { oracle := ×tampOracle{ client: am.member.GetLeadership().GetClient(), keyspaceGroupID: am.kgID, - tsPath: endpoint.KeyspaceGroupGlobalTSPath(am.kgID), + tsPath: keypath.KeyspaceGroupGlobalTSPath(am.kgID), storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 3a574ea8333..7c1d3426ce9 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -41,6 +41,7 @@ import ( "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/memberutil" "github.com/tikv/pd/pkg/utils/syncutil" @@ -431,7 +432,7 @@ func NewKeyspaceGroupManager( kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil) kgm.tsoSvcStorage = endpoint.NewStorageEndpoint( kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil) - kgm.compiledKGMembershipIDRegexp = endpoint.GetCompiledKeyspaceGroupIDRegexp() + kgm.compiledKGMembershipIDRegexp = keypath.GetCompiledKeyspaceGroupIDRegexp() kgm.state.initialize() return kgm } @@ -531,7 +532,7 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { // Value: endpoint.KeyspaceGroup func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { rootPath := kgm.legacySvcRootPath - startKey := rootPath + "/" + endpoint.KeyspaceGroupIDPrefix() + startKey := rootPath + "/" + keypath.KeyspaceGroupIDPrefix() defaultKGConfigured := false putFn := func(kv *mvccpb.KeyValue) error { @@ -757,7 +758,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro Id: uniqueID, // id is unique among all participants ListenUrls: []string{kgm.cfg.GetAdvertiseListenAddr()}, } - participant.InitInfo(p, endpoint.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID), constant.PrimaryKey, "keyspace group primary election") + participant.InitInfo(p, keypath.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID), constant.PrimaryKey, "keyspace group primary election") // If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group // is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot // be broken until the entire split process is completed. @@ -1356,7 +1357,7 @@ mergeLoop: // Check if the keyspace group primaries in the merge map are all gone. if len(mergeMap) != 0 { for id := range mergeMap { - leaderPath := endpoint.KeyspaceGroupPrimaryPath(kgm.tsoSvcRootPath, id) + leaderPath := keypath.KeyspaceGroupPrimaryPath(kgm.tsoSvcRootPath, id) val, err := kgm.tsoSvcStorage.Load(leaderPath) if err != nil { log.Error("failed to check if the keyspace group primary in the merge list has gone", @@ -1386,7 +1387,7 @@ mergeLoop: // calculate the newly merged TSO to make sure it is greater than the original ones. var mergedTS time.Time for _, id := range mergeList { - ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(id)) + ts, err := kgm.tsoSvcStorage.LoadTimestamp(keypath.KeyspaceGroupGlobalTSPath(id)) if err != nil { log.Error("failed to load the keyspace group TSO", zap.String("member", kgm.tsoServiceID.ServiceAddr), @@ -1541,8 +1542,8 @@ func (kgm *KeyspaceGroupManager) deletedGroupCleaner() { // Clean up the remaining TSO keys. // TODO: support the Local TSO Allocator clean up. err := kgm.tsoSvcStorage.DeleteTimestamp( - endpoint.TimestampPath( - endpoint.KeyspaceGroupGlobalTSPath(groupID), + keypath.TimestampPath( + keypath.KeyspaceGroupGlobalTSPath(groupID), ), ) if err != nil { diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 95462e3b3d3..13a50084eac 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -35,6 +35,7 @@ import ( "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" @@ -112,7 +113,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() { suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr})}) // Check if the TSO key is created. testutil.Eventually(re, func() bool { - ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(1)) + ts, err := mgr.tsoSvcStorage.LoadTimestamp(keypath.KeyspaceGroupGlobalTSPath(1)) re.NoError(err) return ts != typeutil.ZeroTime }) @@ -120,7 +121,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() { suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupDeleteEvent(1)}) // Check if the TSO key is deleted. testutil.Eventually(re, func() bool { - ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(1)) + ts, err := mgr.tsoSvcStorage.LoadTimestamp(keypath.KeyspaceGroupGlobalTSPath(1)) re.NoError(err) return ts == typeutil.ZeroTime }) @@ -139,7 +140,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() { re.NotContains(mgr.deletedGroups, constant.DefaultKeyspaceGroupID) mgr.RUnlock() // Default keyspace group TSO key should NOT be deleted. - ts, err := mgr.legacySvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(constant.DefaultKeyspaceGroupID)) + ts, err := mgr.legacySvcStorage.LoadTimestamp(keypath.KeyspaceGroupGlobalTSPath(constant.DefaultKeyspaceGroupID)) re.NoError(err) re.NotEmpty(ts) @@ -816,7 +817,7 @@ func putKeyspaceGroupToEtcd( ctx context.Context, etcdClient *clientv3.Client, rootPath string, group *endpoint.KeyspaceGroup, ) error { - key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(group.ID)}, "/") + key := strings.Join([]string{rootPath, keypath.KeyspaceGroupIDPath(group.ID)}, "/") value, err := json.Marshal(group) if err != nil { return err @@ -834,7 +835,7 @@ func deleteKeyspaceGroupInEtcd( ctx context.Context, etcdClient *clientv3.Client, rootPath string, id uint32, ) error { - key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(id)}, "/") + key := strings.Join([]string{rootPath, keypath.KeyspaceGroupIDPath(id)}, "/") if _, err := etcdClient.Delete(ctx, key); err != nil { return err @@ -863,7 +864,7 @@ func addKeyspaceGroupAssignment( Keyspaces: keyspaces, } - key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(groupID)}, "/") + key := strings.Join([]string{rootPath, keypath.KeyspaceGroupIDPath(groupID)}, "/") value, err := json.Marshal(group) if err != nil { return err diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index 3bbaa661dd6..040b5891d12 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -26,7 +26,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -72,7 +72,7 @@ func newLocalTimestampOracle(am *AllocatorManager, leadership *election.Leadersh oracle := ×tampOracle{ client: leadership.GetClient(), keyspaceGroupID: am.kgID, - tsPath: endpoint.KeyspaceGroupLocalTSPath(localTSOAllocatorEtcdPrefix, am.kgID, dcLocation), + tsPath: keypath.KeyspaceGroupLocalTSPath(localTSOAllocatorEtcdPrefix, am.kgID, dcLocation), storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index d04710ac378..427f6771461 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -27,6 +27,7 @@ import ( "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/tsoutil" @@ -152,7 +153,7 @@ func (t *timestampOracle) calibrateLogical(rawLogical int64, suffixBits int) int // GetTimestampPath returns the timestamp path in etcd. func (t *timestampOracle) GetTimestampPath() string { - return endpoint.TimestampPath(t.tsPath) + return keypath.TimestampPath(t.tsPath) } // SyncTimestamp is used to synchronize the timestamp. diff --git a/pkg/tso/util_test.go b/pkg/tso/util_test.go index 8e4971797d5..df8a7854e11 100644 --- a/pkg/tso/util_test.go +++ b/pkg/tso/util_test.go @@ -18,13 +18,13 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/keypath" ) func TestExtractKeyspaceGroupIDFromKeyspaceGroupMembershipPath(t *testing.T) { re := require.New(t) - compiledRegexp := endpoint.GetCompiledKeyspaceGroupIDRegexp() + compiledRegexp := keypath.GetCompiledKeyspaceGroupIDRegexp() rightCases := []struct { path string @@ -74,7 +74,7 @@ func TestExtractKeyspaceGroupIDFromKeyspaceGroupMembershipPath(t *testing.T) { func TestExtractKeyspaceGroupIDFromKeyspaceGroupPrimaryPath(t *testing.T) { re := require.New(t) - compiledRegexp := endpoint.GetCompiledNonDefaultIDRegexp(uint64(111)) + compiledRegexp := keypath.GetCompiledNonDefaultIDRegexp(uint64(111)) rightCases := []struct { path string diff --git a/pkg/storage/endpoint/key_path.go b/pkg/utils/keypath/key_path.go similarity index 77% rename from pkg/storage/endpoint/key_path.go rename to pkg/utils/keypath/key_path.go index 8cc8f172e84..7e1355c7b56 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/utils/keypath/key_path.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package endpoint +package keypath import ( "fmt" @@ -25,18 +25,25 @@ import ( ) const ( - pdRootPath = "/pd" - clusterPath = "raft" - configPath = "config" - serviceMiddlewarePath = "service_middleware" - schedulePath = "schedule" - gcPath = "gc" - ruleCommonPath = "rule" - rulesPath = "rules" - ruleGroupPath = "rule_group" - regionLabelPath = "region_label" - replicationPath = "replication_mode" - customSchedulerConfigPath = "scheduler_config" + pdRootPath = "/pd" + // ClusterPath is the path to save the cluster meta information. + ClusterPath = "raft" + // Config is the path to save the PD config. + Config = "config" + // ServiceMiddlewarePath is the path to save the service middleware config. + ServiceMiddlewarePath = "service_middleware" + schedulePath = "schedule" + gcPath = "gc" + ruleCommonPath = "rule" + // RulesPath is the path to save the placement rules. + RulesPath = "rules" + // RuleGroupPath is the path to save the placement rule groups. + RuleGroupPath = "rule_group" + // RegionLabelPath is the path to save the region label. + RegionLabelPath = "region_label" + replicationPath = "replication_mode" + // CustomSchedulerConfigPath is the path to save the scheduler config. + CustomSchedulerConfigPath = "scheduler_config" // GCWorkerServiceSafePointID is the service id of GC worker. GCWorkerServiceSafePointID = "gc_worker" minResolvedTS = "min_resolved_ts" @@ -51,9 +58,12 @@ const ( serviceSafePointInfix = "service_safe_point" regionPathPrefix = "raft/r" // resource group storage endpoint has prefix `resource_group` - resourceGroupSettingsPath = "settings" - resourceGroupStatesPath = "states" - controllerConfigPath = "controller" + // ResourceGroupSettingsPath is the path to save the resource group settings. + ResourceGroupSettingsPath = "settings" + // ResourceGroupStatesPath is the path to save the resource group states. + ResourceGroupStatesPath = "states" + // ControllerConfigPath is the path to save the controller config. + ControllerConfigPath = "controller" // tso storage endpoint has prefix `tso` tsoServiceKey = constant.TSOServiceName globalTSOAllocatorEtcdPrefix = "gta" @@ -78,29 +88,29 @@ func AppendToRootPath(rootPath string, key string) string { return path.Join(rootPath, key) } -// ClusterRootPath appends the `clusterPath` to the rootPath. +// ClusterRootPath appends the `ClusterPath` to the rootPath. func ClusterRootPath(rootPath string) string { - return AppendToRootPath(rootPath, clusterPath) + return AppendToRootPath(rootPath, ClusterPath) } // ClusterBootstrapTimeKey returns the path to save the cluster bootstrap timestamp. func ClusterBootstrapTimeKey() string { - return path.Join(clusterPath, "status", "raft_bootstrap_time") + return path.Join(ClusterPath, "status", "raft_bootstrap_time") } // ConfigPath returns the path to save the PD config. func ConfigPath(clusterID uint64) string { - return path.Join(PDRootPath(clusterID), configPath) + return path.Join(PDRootPath(clusterID), Config) } // SchedulerConfigPathPrefix returns the path prefix to save the scheduler config. func SchedulerConfigPathPrefix(clusterID uint64) string { - return path.Join(PDRootPath(clusterID), customSchedulerConfigPath) + return path.Join(PDRootPath(clusterID), CustomSchedulerConfigPath) } // RulesPathPrefix returns the path prefix to save the placement rules. func RulesPathPrefix(clusterID uint64) string { - return path.Join(PDRootPath(clusterID), rulesPath) + return path.Join(PDRootPath(clusterID), RulesPath) } // RuleCommonPathPrefix returns the path prefix to save the placement rule common config. @@ -110,26 +120,27 @@ func RuleCommonPathPrefix(clusterID uint64) string { // RuleGroupPathPrefix returns the path prefix to save the placement rule groups. func RuleGroupPathPrefix(clusterID uint64) string { - return path.Join(PDRootPath(clusterID), ruleGroupPath) + return path.Join(PDRootPath(clusterID), RuleGroupPath) } // RegionLabelPathPrefix returns the path prefix to save the region label. func RegionLabelPathPrefix(clusterID uint64) string { - return path.Join(PDRootPath(clusterID), regionLabelPath) + return path.Join(PDRootPath(clusterID), RegionLabelPath) } -func schedulerConfigPath(schedulerName string) string { - return path.Join(customSchedulerConfigPath, schedulerName) +// SchedulerConfigPath returns the path to save the scheduler config. +func SchedulerConfigPath(schedulerName string) string { + return path.Join(CustomSchedulerConfigPath, schedulerName) } // StorePath returns the store meta info key path with the given store ID. func StorePath(storeID uint64) string { - return path.Join(clusterPath, "s", fmt.Sprintf("%020d", storeID)) + return path.Join(ClusterPath, "s", fmt.Sprintf("%020d", storeID)) } // StorePathPrefix returns the store meta info key path prefix. func StorePathPrefix(clusterID uint64) string { - return path.Join(PDRootPath(clusterID), clusterPath, "s") + "/" + return path.Join(PDRootPath(clusterID), ClusterPath, "s") + "/" } // ExtractStoreIDFromPath extracts the store ID from the given path. @@ -138,11 +149,13 @@ func ExtractStoreIDFromPath(clusterID uint64, path string) (uint64, error) { return strconv.ParseUint(idStr, 10, 64) } -func storeLeaderWeightPath(storeID uint64) string { +// StoreLeaderWeightPath returns the store leader weight key path with the given store ID. +func StoreLeaderWeightPath(storeID uint64) string { return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "leader") } -func storeRegionWeightPath(storeID uint64) string { +// StoreRegionWeightPath returns the store region weight key path with the given store ID. +func StoreRegionWeightPath(storeID uint64) string { return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "region") } @@ -170,51 +183,59 @@ func RegionPath(regionID uint64) string { return buf.String() } -func resourceGroupSettingKeyPath(groupName string) string { - return path.Join(resourceGroupSettingsPath, groupName) +// ResourceGroupSettingKeyPath returns the path to save the resource group settings. +func ResourceGroupSettingKeyPath(groupName string) string { + return path.Join(ResourceGroupSettingsPath, groupName) } -func resourceGroupStateKeyPath(groupName string) string { - return path.Join(resourceGroupStatesPath, groupName) +// ResourceGroupStateKeyPath returns the path to save the resource group states. +func ResourceGroupStateKeyPath(groupName string) string { + return path.Join(ResourceGroupStatesPath, groupName) } -func ruleKeyPath(ruleKey string) string { - return path.Join(rulesPath, ruleKey) +// RuleKeyPath returns the path to save the placement rule with the given rule key. +func RuleKeyPath(ruleKey string) string { + return path.Join(RulesPath, ruleKey) } -func ruleGroupIDPath(groupID string) string { - return path.Join(ruleGroupPath, groupID) +// RuleGroupIDPath returns the path to save the placement rule group with the given group ID. +func RuleGroupIDPath(groupID string) string { + return path.Join(RuleGroupPath, groupID) } -func regionLabelKeyPath(ruleKey string) string { - return path.Join(regionLabelPath, ruleKey) +// RegionLabelKeyPath returns the path to save the region label with the given rule key. +func RegionLabelKeyPath(ruleKey string) string { + return path.Join(RegionLabelPath, ruleKey) } -func replicationModePath(mode string) string { +// ReplicationModePath returns the path to save the replication mode with the given mode. +func ReplicationModePath(mode string) string { return path.Join(replicationPath, mode) } -func gcSafePointPath() string { +// GCSafePointPath returns the GC safe point key path. +func GCSafePointPath() string { return path.Join(gcPath, "safe_point") } // GCSafePointServicePrefixPath returns the GC safe point service key path prefix. func GCSafePointServicePrefixPath() string { - return path.Join(gcSafePointPath(), "service") + "/" + return path.Join(GCSafePointPath(), "service") + "/" } -func gcSafePointServicePath(serviceID string) string { - return path.Join(gcSafePointPath(), "service", serviceID) +// GCSafePointServicePath returns the GC safe point service key path with the given service ID. +func GCSafePointServicePath(serviceID string) string { + return path.Join(GCSafePointPath(), "service", serviceID) } // MinResolvedTSPath returns the min resolved ts path. func MinResolvedTSPath() string { - return path.Join(clusterPath, minResolvedTS) + return path.Join(ClusterPath, minResolvedTS) } // ExternalTimestampPath returns the external timestamp path. func ExternalTimestampPath() string { - return path.Join(clusterPath, externalTimeStamp) + return path.Join(ClusterPath, externalTimeStamp) } // GCSafePointV2Path is the storage path of gc safe point v2. diff --git a/pkg/utils/keypath/key_path_test.go b/pkg/utils/keypath/key_path_test.go new file mode 100644 index 00000000000..5dc230d43bb --- /dev/null +++ b/pkg/utils/keypath/key_path_test.go @@ -0,0 +1,43 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keypath + +import ( + "fmt" + "math/rand" + "path" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestRegionPath(t *testing.T) { + re := require.New(t) + f := func(id uint64) string { + return path.Join(regionPathPrefix, fmt.Sprintf("%020d", id)) + } + rand.New(rand.NewSource(time.Now().Unix())) + for i := 0; i < 1000; i++ { + id := rand.Uint64() + re.Equal(f(id), RegionPath(id)) + } +} + +func BenchmarkRegionPath(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = RegionPath(uint64(i)) + } +} diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index ef6d45203d8..f1630d433d1 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -55,10 +55,10 @@ import ( "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/syncer" "github.com/tikv/pd/pkg/unsaferecovery" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/netutil" "github.com/tikv/pd/pkg/utils/syncutil" @@ -265,7 +265,7 @@ func (c *RaftCluster) isInitialized() bool { // value of time.Time when there is error or the cluster is not bootstrapped yet. func (c *RaftCluster) loadBootstrapTime() (time.Time, error) { var t time.Time - data, err := c.storage.Load(endpoint.ClusterBootstrapTimeKey()) + data, err := c.storage.Load(keypath.ClusterBootstrapTimeKey()) if err != nil { return t, err } diff --git a/server/gc_service.go b/server/gc_service.go index d31e047b1ea..d88dc8488d6 100644 --- a/server/gc_service.go +++ b/server/gc_service.go @@ -27,6 +27,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/tsoutil" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -148,7 +149,7 @@ func (s *GrpcServer) WatchGCSafePointV2(request *pdpb.WatchGCSafePointV2Request, // - If required revision < CompactRevision, we need to reload all configs to avoid losing data. // - If required revision >= CompactRevision, just keep watching. // Use WithPrevKV() to get the previous key-value pair when get Delete Event. - watchChan := s.client.Watch(ctx, path.Join(s.rootPath, endpoint.GCSafePointV2Prefix()), clientv3.WithRev(revision), clientv3.WithPrefix()) + watchChan := s.client.Watch(ctx, path.Join(s.rootPath, keypath.GCSafePointV2Prefix()), clientv3.WithRev(revision), clientv3.WithPrefix()) for { select { case <-ctx.Done(): @@ -203,7 +204,7 @@ func (s *GrpcServer) GetAllGCSafePointV2(ctx context.Context, request *pdpb.GetA return rsp.(*pdpb.GetAllGCSafePointV2Response), err } - startkey := endpoint.GCSafePointV2Prefix() + startkey := keypath.GCSafePointV2Prefix() endkey := clientv3.GetPrefixRangeEnd(startkey) _, values, revision, err := s.loadRangeFromEtcd(startkey, endkey) diff --git a/server/keyspace_service.go b/server/keyspace_service.go index afc5b41b1b6..8a0b3a7b1f0 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -23,8 +23,8 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/pkg/keyspace" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -74,7 +74,7 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques } ctx, cancel := context.WithCancel(s.Context()) defer cancel() - startKey := path.Join(s.rootPath, endpoint.KeyspaceMetaPrefix()) + "/" + startKey := path.Join(s.rootPath, keypath.KeyspaceMetaPrefix()) + "/" keyspaces := make([]*keyspacepb.KeyspaceMeta, 0) putFn := func(kv *mvccpb.KeyValue) error { diff --git a/server/server.go b/server/server.go index 205d220180f..c79f51d8153 100644 --- a/server/server.go +++ b/server/server.go @@ -64,7 +64,6 @@ import ( "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/storage" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/syncer" "github.com/tikv/pd/pkg/systimemon" @@ -73,6 +72,7 @@ import ( "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/jsonutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/tsoutil" @@ -439,7 +439,7 @@ func (s *Server) startServer(ctx context.Context) error { metadataGauge.WithLabelValues(fmt.Sprintf("cluster%d", clusterID)).Set(0) bs.ServerInfoGauge.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) - s.rootPath = endpoint.PDRootPath(clusterID) + s.rootPath = keypath.PDRootPath(clusterID) s.member.InitMemberInfo(s.cfg.AdvertiseClientUrls, s.cfg.AdvertisePeerUrls, s.Name(), s.rootPath) if err := s.member.SetMemberDeployPath(s.member.ID()); err != nil { return err @@ -494,7 +494,7 @@ func (s *Server) startServer(ctx context.Context) error { keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{ Client: s.client, RootPath: s.rootPath, - AllocPath: endpoint.KeyspaceIDAlloc(), + AllocPath: keypath.KeyspaceIDAlloc(), Label: keyspace.AllocLabel, Member: s.member.MemberValue(), Step: keyspace.AllocStep, @@ -729,7 +729,7 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe if err != nil { return nil, errors.WithStack(err) } - clusterRootPath := endpoint.ClusterRootPath(s.rootPath) + clusterRootPath := keypath.ClusterRootPath(s.rootPath) var ops []clientv3.Op ops = append(ops, clientv3.OpPut(clusterRootPath, string(clusterValue))) @@ -737,7 +737,7 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe // Set bootstrap time // Because we will write the cluster meta into etcd directly, // so we need to handle the root key path manually here. - bootstrapKey := endpoint.AppendToRootPath(s.rootPath, endpoint.ClusterBootstrapTimeKey()) + bootstrapKey := keypath.AppendToRootPath(s.rootPath, keypath.ClusterBootstrapTimeKey()) nano := time.Now().UnixNano() timeData := typeutil.Uint64ToBytes(uint64(nano)) @@ -745,7 +745,7 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe // Set store meta storeMeta := req.GetStore() - storePath := endpoint.AppendToRootPath(s.rootPath, endpoint.StorePath(storeMeta.GetId())) + storePath := keypath.AppendToRootPath(s.rootPath, keypath.StorePath(storeMeta.GetId())) storeValue, err := storeMeta.Marshal() if err != nil { return nil, errors.WithStack(err) @@ -758,7 +758,7 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe } // Set region meta with region id. - regionPath := endpoint.AppendToRootPath(s.rootPath, endpoint.RegionPath(req.GetRegion().GetId())) + regionPath := keypath.AppendToRootPath(s.rootPath, keypath.RegionPath(req.GetRegion().GetId())) ops = append(ops, clientv3.OpPut(regionPath, string(regionValue))) // TODO: we must figure out a better way to handle bootstrap failed, maybe intervene manually. @@ -1962,7 +1962,7 @@ func (s *Server) IsTTLConfigExist(key string) bool { // and is deleted after BR EBS restore is done. func (s *Server) MarkSnapshotRecovering() error { log.Info("mark snapshot recovering") - markPath := endpoint.AppendToRootPath(s.rootPath, recoveringMarkPath) + markPath := keypath.AppendToRootPath(s.rootPath, recoveringMarkPath) // the value doesn't matter, set to a static string _, err := kv.NewSlowLogTxn(s.client). If(clientv3.Compare(clientv3.CreateRevision(markPath), "=", 0)). @@ -1974,7 +1974,7 @@ func (s *Server) MarkSnapshotRecovering() error { // IsSnapshotRecovering check whether recovering-mark marked func (s *Server) IsSnapshotRecovering(ctx context.Context) (bool, error) { - markPath := endpoint.AppendToRootPath(s.rootPath, recoveringMarkPath) + markPath := keypath.AppendToRootPath(s.rootPath, recoveringMarkPath) resp, err := s.client.Get(ctx, markPath) if err != nil { return false, err @@ -1985,7 +1985,7 @@ func (s *Server) IsSnapshotRecovering(ctx context.Context) (bool, error) { // UnmarkSnapshotRecovering unmark recovering mark func (s *Server) UnmarkSnapshotRecovering(ctx context.Context) error { log.Info("unmark snapshot recovering") - markPath := endpoint.AppendToRootPath(s.rootPath, recoveringMarkPath) + markPath := keypath.AppendToRootPath(s.rootPath, recoveringMarkPath) _, err := s.client.Delete(ctx, markPath) // if other client already unmarked, return success too return err @@ -2019,15 +2019,15 @@ func (s *Server) SetServicePrimaryAddr(serviceName, addr string) { func (s *Server) initTSOPrimaryWatcher() { serviceName := constant.TSOServiceName - tsoRootPath := endpoint.TSOSvcRootPath(s.ClusterID()) - tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, constant.DefaultKeyspaceGroupID) + tsoRootPath := keypath.TSOSvcRootPath(s.ClusterID()) + tsoServicePrimaryKey := keypath.KeyspaceGroupPrimaryPath(tsoRootPath, constant.DefaultKeyspaceGroupID) s.tsoPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, tsoServicePrimaryKey) s.tsoPrimaryWatcher.StartWatchLoop() } func (s *Server) initSchedulingPrimaryWatcher() { serviceName := constant.SchedulingServiceName - primaryKey := endpoint.SchedulingPrimaryPath(s.ClusterID()) + primaryKey := keypath.SchedulingPrimaryPath(s.ClusterID()) s.schedulingPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, primaryKey) s.schedulingPrimaryWatcher.StartWatchLoop() } diff --git a/tests/integrations/client/gc_client_test.go b/tests/integrations/client/gc_client_test.go index 8494b8edbcf..7f96e59d9aa 100644 --- a/tests/integrations/client/gc_client_test.go +++ b/tests/integrations/client/gc_client_test.go @@ -26,8 +26,8 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/assertutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" clientv3 "go.etcd.io/etcd/client/v3" @@ -80,7 +80,7 @@ func (suite *gcClientTestSuite) SetupSuite() { suite.client, err = pd.NewClientWithContext(suite.server.Context(), []string{addr}, pd.SecurityOption{}) re.NoError(err) rootPath := path.Join("/pd", strconv.FormatUint(suite.server.ClusterID(), 10)) - suite.gcSafePointV2Prefix = path.Join(rootPath, endpoint.GCSafePointV2Prefix()) + suite.gcSafePointV2Prefix = path.Join(rootPath, keypath.GCSafePointV2Prefix()) // Enable the fail-point to skip checking keyspace validity. re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/gc/checkKeyspace", "return(true)")) } @@ -205,7 +205,7 @@ func (suite *gcClientTestSuite) mustLoadSafePoint(re *require.Assertions, keyspa // mustDeleteSafePoint deletes the gc safe point of the given keyspace id. func (suite *gcClientTestSuite) mustDeleteSafePoint(re *require.Assertions, keyspaceID uint32) { - safePointPath := path.Join(suite.gcSafePointV2Prefix, endpoint.EncodeKeyspaceID(keyspaceID)) + safePointPath := path.Join(suite.gcSafePointV2Prefix, keypath.EncodeKeyspaceID(keyspaceID)) log.Info("test etcd path", zap.Any("path", safePointPath)) // TODO: Delete _, err := suite.server.GetClient().Delete(suite.server.Context(), safePointPath) re.NoError(err) @@ -213,7 +213,7 @@ func (suite *gcClientTestSuite) mustDeleteSafePoint(re *require.Assertions, keys // mustGetRevision gets the revision of the given keyspace's gc safe point. func (suite *gcClientTestSuite) mustGetRevision(re *require.Assertions, keyspaceID uint32) int64 { - safePointPath := path.Join(suite.gcSafePointV2Prefix, endpoint.EncodeKeyspaceID(keyspaceID)) + safePointPath := path.Join(suite.gcSafePointV2Prefix, keypath.EncodeKeyspaceID(keyspaceID)) res, err := suite.server.GetClient().Get(suite.server.Context(), safePointPath) re.NoError(err) return res.Header.GetRevision() diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index cdd4dc106b9..09d8011c6c8 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -37,6 +37,7 @@ import ( "github.com/tikv/pd/pkg/storage/endpoint" tsopkg "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/server/apiv2/handlers" @@ -217,9 +218,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe // for loading/saving timestamp from/to etcd and the right primary path // for primary election. clusterID := suite.pdLeaderServer.GetClusterID() - rootPath := endpoint.TSOSvcRootPath(clusterID) - primaryPath := endpoint.KeyspaceGroupPrimaryPath(rootPath, param.keyspaceGroupID) - timestampPath := endpoint.FullTimestampPath(clusterID, param.keyspaceGroupID) + rootPath := keypath.TSOSvcRootPath(clusterID) + primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, param.keyspaceGroupID) + timestampPath := keypath.FullTimestampPath(clusterID, param.keyspaceGroupID) re.Equal(timestampPath, am.GetTimestampPath(tsopkg.GlobalDCLocation)) re.Equal(primaryPath, am.GetMember().GetLeaderPath()) diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 7ab92026197..43b0405923c 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -37,6 +37,7 @@ import ( "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" @@ -202,7 +203,7 @@ func getEtcdTimestampKeyNum(re *require.Assertions, client *clientv3.Client) int var count int for _, kv := range resp.Kvs { key := strings.TrimSpace(string(kv.Key)) - if !strings.HasSuffix(key, endpoint.TimestampKey) { + if !strings.HasSuffix(key, keypath.TimestampKey) { continue } count++ diff --git a/tools/pd-backup/pdbackup/backup.go b/tools/pd-backup/pdbackup/backup.go index 7eea1fb4b0b..d02e651b879 100644 --- a/tools/pd-backup/pdbackup/backup.go +++ b/tools/pd-backup/pdbackup/backup.go @@ -25,8 +25,8 @@ import ( "path" "strconv" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server/config" clientv3 "go.etcd.io/etcd/client/v3" @@ -75,7 +75,7 @@ func GetBackupInfo(client *clientv3.Client, pdAddr string) (*BackupInfo, error) backInfo.AllocIDMax = allocIDMax - resp, err = etcdutil.EtcdKVGet(client, endpoint.TimestampPath(rootPath)) + resp, err = etcdutil.EtcdKVGet(client, keypath.TimestampPath(rootPath)) if err != nil { return nil, err } diff --git a/tools/pd-backup/pdbackup/backup_test.go b/tools/pd-backup/pdbackup/backup_test.go index f11cfef8e42..8124763551b 100644 --- a/tools/pd-backup/pdbackup/backup_test.go +++ b/tools/pd-backup/pdbackup/backup_test.go @@ -17,8 +17,8 @@ import ( "github.com/stretchr/testify/suite" sc "github.com/tikv/pd/pkg/schedule/config" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server/config" @@ -114,7 +114,7 @@ func (s *backupTestSuite) BeforeTest(string, string) { rootPath = path.Join(pdRootPath, strconv.FormatUint(clusterID, 10)) allocTimestampMaxBytes = typeutil.Uint64ToBytes(allocTimestampMax) ) - _, err = s.etcdClient.Put(ctx, endpoint.TimestampPath(rootPath), string(allocTimestampMaxBytes)) + _, err = s.etcdClient.Put(ctx, keypath.TimestampPath(rootPath), string(allocTimestampMaxBytes)) re.NoError(err) var (