Skip to content

Commit

Permalink
*: move keyspace group primary path code to key_path.go
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jul 5, 2023
1 parent cf5549e commit b895327
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 77 deletions.
9 changes: 5 additions & 4 deletions pkg/mcs/discovery/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@ package discovery
import (
"strconv"
"strings"

"github.com/tikv/pd/pkg/mcs/utils"
)

const (
registryPrefix = "/ms"
registryKey = "registry"
registryKey = "registry"
)

// RegistryPath returns the full path to store microservice addresses.
func RegistryPath(clusterID, serviceName, serviceAddr string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey, serviceAddr}, "/")
return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey, serviceAddr}, "/")
}

// ServicePath returns the path to store microservice addresses.
func ServicePath(clusterID, serviceName string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/")
return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey}, "/")
}

// TSOPath returns the path to store TSO addresses.
Expand Down
5 changes: 3 additions & 2 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
Expand Down Expand Up @@ -366,10 +367,10 @@ func (s *Server) startServer() (err error) {
uniqueName := s.cfg.ListenAddr
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
resourceManagerPrimaryPrefix := fmt.Sprintf("/ms/%d/resource_manager", s.clusterID)
resourceManagerPrimaryPrefix := endpoint.ResourceManagerSvcRootPath(s.clusterID)
s.participant = member.NewParticipant(s.etcdClient)
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)),
"primary", "keyspace group primary election", s.cfg.AdvertiseListenAddr)
utils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", s.cfg.AdvertiseListenAddr)

s.service = &Service{
ctx: s.ctx,
Expand Down
11 changes: 3 additions & 8 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/tikv/pd/pkg/mcs/discovery"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/systimemon"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/etcdutil"
Expand All @@ -63,12 +64,6 @@ import (
)

const (
// pdRootPath is the old path for storing the tso related root path.
pdRootPath = "/pd"
msServiceRootPath = "/ms"
// tsoSvcRootPathFormat defines the root path for all etcd paths used for different purposes.
// format: "/ms/{cluster_id}/tso".
tsoSvcRootPathFormat = msServiceRootPath + "/%d/" + mcsutils.TSOServiceName
// maxRetryTimesWaitAPIService is the max retry times for initializing the cluster ID.
maxRetryTimesWaitAPIService = 360
// retryIntervalWaitAPIService is the interval to retry.
Expand Down Expand Up @@ -535,8 +530,8 @@ func (s *Server) startServer() (err error) {

// Initialize the TSO service.
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
legacySvcRootPath := path.Join(mcsutils.PDRootPath, strconv.FormatUint(s.clusterID, 10))
tsoSvcRootPath := endpoint.TSOSvcRootPath(s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr,
Expand Down
8 changes: 6 additions & 2 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ const (
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeyspaceGroupID = uint32(0)

// MicroserviceKey is the key of microservice.
MicroserviceKey = "ms"
// MicroserviceRootPath is the root path of microservice in etcd.
MicroserviceRootPath = "/ms"
// PDRootPath is the old path for storing the tso related root path.
PDRootPath = "/pd"
// APIServiceName is the name of api server.
APIServiceName = "api"
// TSOServiceName is the name of tso server.
Expand All @@ -59,6 +61,8 @@ const (
ResourceManagerServiceName = "resource_manager"
// KeyspaceGroupsKey is the path component of keyspace groups.
KeyspaceGroupsKey = "keyspace_groups"
// KeyspaceGroupsPrimaryKey is the path component of primary for keyspace groups.
KeyspaceGroupsPrimaryKey = "primary"

// MaxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso
// is the sharding unit, i.e., by the definition here, the max count of the shards
Expand Down
47 changes: 43 additions & 4 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ const (
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"

tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupMembershipKey = "membership"
tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupsMembershipKey = "membership"
keyspaceGroupsElectionKey = "election"

// we use uint64 to represent ID, the max length of uint64 is 20.
keyLen = 20
Expand Down Expand Up @@ -228,13 +229,13 @@ func EncodeKeyspaceID(spaceID uint32) string {
// KeyspaceGroupIDPrefix returns the prefix of keyspace group id.
// Path: tso/keyspace_groups/membership
func KeyspaceGroupIDPrefix() string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey)
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey)
}

// KeyspaceGroupIDPath returns the path to keyspace id from the given name.
// Path: tso/keyspace_groups/membership/{id}
func KeyspaceGroupIDPath(id uint32) string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id))
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey, encodeKeyspaceGroupID(id))
}

// GetCompiledKeyspaceGroupIDRegexp returns the compiled regular expression for matching keyspace group id.
Expand All @@ -243,6 +244,44 @@ func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp {
return regexp.MustCompile(pattern)
}

// ResourceManagerSvcRootPath returns the root path of resource manager service.
func ResourceManagerSvcRootPath(clusterID uint64) string {
return svcRootPath(clusterID, utils.ResourceManagerServiceName)
}

// TSOSvcRootPath returns the root path of tso service.
func TSOSvcRootPath(clusterID uint64) string {
return svcRootPath(clusterID, utils.TSOServiceName)
}

func svcRootPath(clusterID uint64, svcName string) string {
c := strconv.FormatUint(clusterID, 10)
return path.Join(utils.MicroserviceRootPath, c, svcName)
}

// KeyspaceGroupPrimaryPath returns the path of keyspace group primary.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string {
electionPath := KeyspaceGroupsElectionPath(rootPath, keyspaceGroupID)
return path.Join(electionPath, utils.KeyspaceGroupsPrimaryKey)
}

// KeyspaceGroupsElectionPath returns the path of keyspace groups election.
func KeyspaceGroupsElectionPath(rootPath string, keyspaceGroupID uint32) string {
if keyspaceGroupID == utils.DefaultKeyspaceGroupID {
return path.Join(rootPath, "00000")
}
return path.Join(rootPath, utils.KeyspaceGroupsKey, keyspaceGroupsElectionKey, fmt.Sprintf("%05d", keyspaceGroupID))
}

// GetCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id.
func GetCompiledNonDefaultIDRegexp(clusterID uint64) *regexp.Regexp {
rootPath := TSOSvcRootPath(clusterID)
pattern := strings.Join([]string{rootPath, utils.KeyspaceGroupsKey, keyspaceGroupsElectionKey, `(\d{5})`, utils.KeyspaceGroupsPrimaryKey + `$`}, "/")
return regexp.MustCompile(pattern)
}

// encodeKeyspaceGroupID from uint32 to string.
func encodeKeyspaceGroupID(groupID uint32) string {
return fmt.Sprintf("%05d", groupID)
Expand Down
41 changes: 3 additions & 38 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"math"
"net/http"
"path"
"regexp"
"sort"
"strings"
Expand Down Expand Up @@ -51,9 +50,6 @@ import (
)

const (
keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election"
// primaryKey is the key for keyspace group primary election.
primaryKey = "primary"
// mergingCheckInterval is the interval for merging check to see if the keyspace groups
// merging process could be moved forward.
mergingCheckInterval = 5 * time.Second
Expand Down Expand Up @@ -231,32 +227,6 @@ func (s *state) getNextPrimaryToReset(
return nil, nil, 0, groupID
}

// kgPrimaryPathBuilder builds the path for keyspace group primary election.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
type kgPrimaryPathBuilder struct {
// rootPath is "/ms/{cluster_id}/tso".
rootPath string
// defaultKeyspaceGroupIDPath is "/ms/{cluster_id}/tso/00000".
defaultKeyspaceGroupIDPath string
}

// getKeyspaceGroupIDPath returns the keyspace group primary ID path.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
func (p *kgPrimaryPathBuilder) getKeyspaceGroupIDPath(keyspaceGroupID uint32) string {
if keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID {
return p.defaultKeyspaceGroupIDPath
}
return path.Join(p.rootPath, keyspaceGroupsElectionPath, fmt.Sprintf("%05d", keyspaceGroupID))
}

// getCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id.
func (p *kgPrimaryPathBuilder) getCompiledNonDefaultIDRegexp() *regexp.Regexp {
pattern := strings.Join([]string{p.rootPath, keyspaceGroupsElectionPath, `(\d{5})`, primaryKey + `$`}, "/")
return regexp.MustCompile(pattern)
}

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
// The replicas campaign for the leaders which provide the tso service for the corresponding
// keyspace groups.
Expand Down Expand Up @@ -330,7 +300,6 @@ type KeyspaceGroupManager struct {
// mergeCheckerCancelMap is the cancel function map for the merge checker of each keyspace group.
mergeCheckerCancelMap sync.Map // GroupID -> context.CancelFunc

primaryPathBuilder *kgPrimaryPathBuilder
primaryPriorityCheckInterval time.Duration

// tsoNodes is the registered tso servers.
Expand Down Expand Up @@ -381,10 +350,6 @@ func NewKeyspaceGroupManager(
kgm.tsoSvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil)
kgm.compiledKGMembershipIDRegexp = endpoint.GetCompiledKeyspaceGroupIDRegexp()
kgm.primaryPathBuilder = &kgPrimaryPathBuilder{
rootPath: kgm.tsoSvcRootPath,
defaultKeyspaceGroupIDPath: path.Join(kgm.tsoSvcRootPath, "00000"),
}
kgm.state.initialize()
return kgm
}
Expand Down Expand Up @@ -692,8 +657,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
// Initialize the participant info to join the primary election.
participant := member.NewParticipant(kgm.etcdClient)
participant.InitInfo(
uniqueName, uniqueID, kgm.primaryPathBuilder.getKeyspaceGroupIDPath(group.ID),
primaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
uniqueName, uniqueID, endpoint.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID),
mcsutils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
// If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group
// is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot
// be broken until the entire split process is completed.
Expand Down Expand Up @@ -1248,7 +1213,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
// Check if the keyspace group primaries in the merge map are all gone.
if len(mergeMap) != 0 {
for id := range mergeMap {
leaderPath := path.Join(kgm.primaryPathBuilder.getKeyspaceGroupIDPath(id), primaryKey)
leaderPath := endpoint.KeyspaceGroupPrimaryPath(kgm.tsoSvcRootPath, id)
val, err := kgm.tsoSvcStorage.Load(leaderPath)
if err != nil {
log.Error("failed to check if the keyspace group primary in the merge list has gone",
Expand Down
4 changes: 2 additions & 2 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() {
guid := uuid.New().String()
tsoServiceKey := discovery.ServicePath(guid, "tso") + "/"
legacySvcRootPath := path.Join("/pd", guid)
tsoSvcRootPath := path.Join("/ms", guid, "tso")
tsoSvcRootPath := path.Join(mcsutils.MicroserviceRootPath, guid, "tso")
electionNamePrefix := "tso-server-" + guid

kgm := NewKeyspaceGroupManager(
Expand Down Expand Up @@ -766,7 +766,7 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager(
tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.GetAdvertiseListenAddr()}
tsoServiceKey := discovery.ServicePath(uniqueStr, "tso") + "/"
legacySvcRootPath := path.Join("/pd", uniqueStr)
tsoSvcRootPath := path.Join("/ms", uniqueStr, "tso")
tsoSvcRootPath := path.Join(mcsutils.MicroserviceRootPath, uniqueStr, "tso")
electionNamePrefix := "kgm-test-" + cfg.GetAdvertiseListenAddr()

kgm := NewKeyspaceGroupManager(
Expand Down
9 changes: 1 addition & 8 deletions pkg/tso/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package tso

import (
"path"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -75,13 +74,7 @@ func TestExtractKeyspaceGroupIDFromKeyspaceGroupMembershipPath(t *testing.T) {
func TestExtractKeyspaceGroupIDFromKeyspaceGroupPrimaryPath(t *testing.T) {
re := require.New(t)

tsoSvcRootPath := "/ms/111/tso"
primaryPathBuilder := &kgPrimaryPathBuilder{
rootPath: tsoSvcRootPath,
defaultKeyspaceGroupIDPath: path.Join(tsoSvcRootPath, "00000"),
}

compiledRegexp := primaryPathBuilder.getCompiledNonDefaultIDRegexp()
compiledRegexp := endpoint.GetCompiledNonDefaultIDRegexp(uint64(111))

rightCases := []struct {
path string
Expand Down
7 changes: 2 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1844,13 +1844,10 @@ func (s *Server) SetServicePrimaryAddr(serviceName, addr string) {
s.servicePrimaryMap.Store(serviceName, addr)
}

func (s *Server) servicePrimaryKey(serviceName string) string {
return fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary")
}

func (s *Server) initTSOPrimaryWatcher() {
serviceName := mcs.TSOServiceName
tsoServicePrimaryKey := s.servicePrimaryKey(serviceName)
tsoRootPath := endpoint.TSOSvcRootPath(s.clusterID)
tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, mcs.DefaultKeyspaceGroupID)
putFn := func(kv *mvccpb.KeyValue) error {
primary := &tsopb.Participant{} // TODO: use Generics
if err := proto.Unmarshal(kv.Value, primary); err != nil {
Expand Down
6 changes: 2 additions & 4 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,15 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe
// for primary election.
var (
timestampPath string
primaryPath string
)
clusterID := strconv.FormatUint(suite.pdLeaderServer.GetClusterID(), 10)
rootPath := endpoint.TSOSvcRootPath(suite.pdLeaderServer.GetClusterID())
primaryPath := endpoint.KeyspaceGroupPrimaryPath(rootPath, param.keyspaceGroupID)
if param.keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID {
timestampPath = fmt.Sprintf("/pd/%s/timestamp", clusterID)
primaryPath = fmt.Sprintf("/ms/%s/tso/00000/primary", clusterID)
} else {
timestampPath = fmt.Sprintf("/ms/%s/tso/%05d/gta/timestamp",
clusterID, param.keyspaceGroupID)
primaryPath = fmt.Sprintf("/ms/%s/tso/%s/election/%05d/primary",
clusterID, mcsutils.KeyspaceGroupsKey, param.keyspaceGroupID)
}
re.Equal(timestampPath, am.GetTimestampPath(tsopkg.GlobalDCLocation))
re.Equal(primaryPath, am.GetMember().GetLeaderPath())
Expand Down

0 comments on commit b895327

Please sign in to comment.