Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Jun 28, 2023
2 parents a58172f + c1601ca commit 4ca73d0
Show file tree
Hide file tree
Showing 37 changed files with 1,003 additions and 312 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20230530111525-e4919c190b46
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5
github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/common v0.26.0
github.com/sasha-s/go-deadlock v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 h1:adV4kUWI7v+v/6joR7lfjFngHhS4eiqwr4g3dHCjHtA=
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM=
github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45 h1:/jqjj+ydlqjp144LAlHDfHtr7eWJyaNIIXX5viv0RZo=
github.com/pingcap/tidb-dashboard v0.0.0-20230626093106-fcc40851da45/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
4 changes: 2 additions & 2 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,10 +702,10 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
return errors.Errorf("default keyspace group %d not found", utils.DefaultKeyspaceGroupID)
}
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
return ErrKeyspaceGroupInSplit(utils.DefaultKeyspaceGroupID)
}
if defaultKeyspaceGroup.IsMerging() {
return ErrKeyspaceGroupInMerging
return ErrKeyspaceGroupInMerging(utils.DefaultKeyspaceGroupID)
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, maxEtcdTxnOps)
if err != nil {
Expand Down
79 changes: 45 additions & 34 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (m *GroupManager) DeleteKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGro
return nil
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
return ErrKeyspaceGroupInSplit(id)
}
return m.store.DeleteKeyspaceGroup(txn, id)
}); err != nil {
Expand Down Expand Up @@ -339,10 +339,10 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
return ErrKeyspaceGroupExists
}
if oldKG.IsSplitting() && overwrite {
return ErrKeyspaceGroupInSplit
return ErrKeyspaceGroupInSplit(keyspaceGroup.ID)
}
if oldKG.IsMerging() && overwrite {
return ErrKeyspaceGroupInMerging
return ErrKeyspaceGroupInMerging(keyspaceGroup.ID)
}
newKG := &endpoint.KeyspaceGroup{
ID: keyspaceGroup.ID,
Expand Down Expand Up @@ -414,13 +414,13 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI
func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind, groupID uint64, keyspaceID uint32, mutation int) error {
kg := m.groups[userKind].Get(uint32(groupID))
if kg == nil {
return errors.Errorf("keyspace group %d not found", groupID)
return ErrKeyspaceGroupNotExists(uint32(groupID))
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
return ErrKeyspaceGroupInSplit(uint32(groupID))
}
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
return ErrKeyspaceGroupInMerging(uint32(groupID))
}

changed := false
Expand Down Expand Up @@ -473,11 +473,14 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse
if newKG == nil {
return errors.Errorf("keyspace group %s not found in %s group", newGroupID, newUserKind)
}
if oldKG.IsSplitting() || newKG.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
if oldKG.IsMerging() || newKG.IsMerging() {
return ErrKeyspaceGroupInMerging
if oldKG.IsSplitting() {
return ErrKeyspaceGroupInSplit(uint32(oldID))
} else if newKG.IsSplitting() {
return ErrKeyspaceGroupInSplit(uint32(newID))
} else if oldKG.IsMerging() {
return ErrKeyspaceGroupInMerging(uint32(oldID))
} else if newKG.IsMerging() {
return ErrKeyspaceGroupInMerging(uint32(newID))
}

var updateOld, updateNew bool
Expand Down Expand Up @@ -523,15 +526,15 @@ func (m *GroupManager) SplitKeyspaceGroupByID(
return err
}
if splitSourceKg == nil {
return ErrKeyspaceGroupNotExists
return ErrKeyspaceGroupNotExists(splitSourceID)
}
// A keyspace group can not take part in multiple split processes.
if splitSourceKg.IsSplitting() {
return ErrKeyspaceGroupInSplit
return ErrKeyspaceGroupInSplit(splitSourceID)
}
// A keyspace group can not be split when it is in merging.
if splitSourceKg.IsMerging() {
return ErrKeyspaceGroupInMerging
return ErrKeyspaceGroupInMerging(splitSourceID)
}
// Check if the source keyspace group has enough replicas.
if len(splitSourceKg.Members) < utils.DefaultKeyspaceGroupReplicaCount {
Expand Down Expand Up @@ -615,7 +618,15 @@ func buildSplitKeyspaces(
oldSplit = append(oldSplit, keyspace)
}
}
return oldSplit, new, nil
// Dedup new keyspaces if it's necessary.
if newNum == len(newKeyspaceMap) {
return oldSplit, new, nil
}
newSplit := make([]uint32, 0, len(newKeyspaceMap))
for keyspace := range newKeyspaceMap {
newSplit = append(newSplit, keyspace)
}
return oldSplit, newSplit, nil
}
// Split according to the start and end keyspace ID.
if startKeyspaceID == 0 && endKeyspaceID == 0 {
Expand Down Expand Up @@ -653,22 +664,22 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error {
return err
}
if splitTargetKg == nil {
return ErrKeyspaceGroupNotExists
return ErrKeyspaceGroupNotExists(splitTargetID)
}
// Check if it's in the split state.
if !splitTargetKg.IsSplitTarget() {
return ErrKeyspaceGroupNotInSplit
return ErrKeyspaceGroupNotInSplit(splitTargetID)
}
// Load the split source keyspace group then.
splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetKg.SplitSource())
if err != nil {
return err
}
if splitSourceKg == nil {
return ErrKeyspaceGroupNotExists
return ErrKeyspaceGroupNotExists(splitTargetKg.SplitSource())
}
if !splitSourceKg.IsSplitSource() {
return ErrKeyspaceGroupNotInSplit
return ErrKeyspaceGroupNotInSplit(splitTargetKg.SplitSource())
}
splitTargetKg.SplitState = nil
splitSourceKg.SplitState = nil
Expand Down Expand Up @@ -713,13 +724,13 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
return err
}
if kg == nil {
return ErrKeyspaceGroupNotExists
return ErrKeyspaceGroupNotExists(id)
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
return ErrKeyspaceGroupInSplit(id)
}
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
return ErrKeyspaceGroupInMerging(id)
}
exists := make(map[string]struct{})
for _, member := range kg.Members {
Expand Down Expand Up @@ -775,13 +786,13 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error
return err
}
if kg == nil {
return ErrKeyspaceGroupNotExists
return ErrKeyspaceGroupNotExists(id)
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
return ErrKeyspaceGroupInSplit(id)
}
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
return ErrKeyspaceGroupInMerging(id)
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
for _, node := range nodes {
Expand Down Expand Up @@ -812,13 +823,13 @@ func (m *GroupManager) SetPriorityForKeyspaceGroup(id uint32, node string, prior
return err
}
if kg == nil {
return ErrKeyspaceGroupNotExists
return ErrKeyspaceGroupNotExists(id)
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
return ErrKeyspaceGroupInSplit(id)
}
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
return ErrKeyspaceGroupInMerging(id)
}
inKeyspaceGroup := false
members := make([]endpoint.KeyspaceGroupMember, 0, len(kg.Members))
Expand Down Expand Up @@ -883,15 +894,15 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin
return err
}
if kg == nil {
return ErrKeyspaceGroupNotExists
return ErrKeyspaceGroupNotExists(kgID)
}
// A keyspace group can not be merged if it's in splitting.
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
return ErrKeyspaceGroupInSplit(kgID)
}
// A keyspace group can not be split when it is in merging.
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
return ErrKeyspaceGroupInMerging(kgID)
}
groups[kgID] = kg
}
Expand Down Expand Up @@ -947,11 +958,11 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error {
return err
}
if mergeTargetKg == nil {
return ErrKeyspaceGroupNotExists
return ErrKeyspaceGroupNotExists(mergeTargetID)
}
// Check if it's in the merging state.
if !mergeTargetKg.IsMergeTarget() {
return ErrKeyspaceGroupNotInMerging
return ErrKeyspaceGroupNotInMerging(mergeTargetID)
}
// Make sure all merging keyspace groups are deleted.
for _, kgID := range mergeTargetKg.MergeState.MergeList {
Expand All @@ -960,7 +971,7 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error {
return err
}
if kg != nil {
return ErrKeyspaceGroupNotInMerging
return ErrKeyspaceGroupNotInMerging(kgID)
}
}
mergeTargetKg.MergeState = nil
Expand Down
18 changes: 9 additions & 9 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,25 +276,25 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {

// finish the split of the keyspace group 2
err = suite.kgm.FinishSplitKeyspaceByID(2)
re.ErrorIs(err, ErrKeyspaceGroupNotInSplit)
re.ErrorContains(err, ErrKeyspaceGroupNotInSplit(2).Error())
// finish the split of a non-existing keyspace group
err = suite.kgm.FinishSplitKeyspaceByID(5)
re.ErrorIs(err, ErrKeyspaceGroupNotExists)
re.ErrorContains(err, ErrKeyspaceGroupNotExists(5).Error())
// split the in-split keyspace group
err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{333})
re.ErrorIs(err, ErrKeyspaceGroupInSplit)
re.ErrorContains(err, ErrKeyspaceGroupInSplit(2).Error())
// remove the in-split keyspace group
kg2, err = suite.kgm.DeleteKeyspaceGroupByID(2)
re.Nil(kg2)
re.ErrorIs(err, ErrKeyspaceGroupInSplit)
re.ErrorContains(err, ErrKeyspaceGroupInSplit(2).Error())
kg4, err = suite.kgm.DeleteKeyspaceGroupByID(4)
re.Nil(kg4)
re.ErrorIs(err, ErrKeyspaceGroupInSplit)
re.ErrorContains(err, ErrKeyspaceGroupInSplit(4).Error())
// update the in-split keyspace group
err = suite.kg.kgm.UpdateKeyspaceForGroup(endpoint.Standard, "2", 444, opAdd)
re.ErrorIs(err, ErrKeyspaceGroupInSplit)
re.ErrorContains(err, ErrKeyspaceGroupInSplit(2).Error())
err = suite.kg.kgm.UpdateKeyspaceForGroup(endpoint.Standard, "4", 444, opAdd)
re.ErrorIs(err, ErrKeyspaceGroupInSplit)
re.ErrorContains(err, ErrKeyspaceGroupInSplit(4).Error())

// finish the split of keyspace group 4
err = suite.kgm.FinishSplitKeyspaceByID(4)
Expand All @@ -314,7 +314,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {

// split a non-existing keyspace group
err = suite.kgm.SplitKeyspaceGroupByID(3, 5, nil)
re.ErrorIs(err, ErrKeyspaceGroupNotExists)
re.ErrorContains(err, ErrKeyspaceGroupNotExists(3).Error())
// split into an existing keyspace group
err = suite.kgm.SplitKeyspaceGroupByID(2, 4, nil)
re.ErrorIs(err, ErrKeyspaceGroupExists)
Expand Down Expand Up @@ -442,7 +442,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() {

// merge a non-existing keyspace group
err = suite.kgm.MergeKeyspaceGroups(4, []uint32{5})
re.ErrorIs(err, ErrKeyspaceGroupNotExists)
re.ErrorContains(err, ErrKeyspaceGroupNotExists(5).Error())
// merge with the number of keyspace groups exceeds the limit
err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, maxEtcdTxnOps/2))
re.ErrorIs(err, ErrExceedMaxEtcdTxnOps)
Expand Down
20 changes: 15 additions & 5 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,25 @@ var (
// ErrKeyspaceGroupExists indicates target keyspace group already exists.
ErrKeyspaceGroupExists = errors.New("keyspace group already exists")
// ErrKeyspaceGroupNotExists is used to indicate target keyspace group does not exist.
ErrKeyspaceGroupNotExists = errors.New("keyspace group does not exist")
ErrKeyspaceGroupNotExists = func(groupID uint32) error {
return errors.Errorf("keyspace group %v does not exist", groupID)
}
// ErrKeyspaceGroupInSplit is used to indicate target keyspace group is in split state.
ErrKeyspaceGroupInSplit = errors.New("keyspace group is in split state")
ErrKeyspaceGroupInSplit = func(groupID uint32) error {
return errors.Errorf("keyspace group %v is in split state", groupID)
}
// ErrKeyspaceGroupNotInSplit is used to indicate target keyspace group is not in split state.
ErrKeyspaceGroupNotInSplit = errors.New("keyspace group is not in split state")
ErrKeyspaceGroupNotInSplit = func(groupID uint32) error {
return errors.Errorf("keyspace group %v is not in split state", groupID)
}
// ErrKeyspaceGroupInMerging is used to indicate target keyspace group is in merging state.
ErrKeyspaceGroupInMerging = errors.New("keyspace group is in merging state")
ErrKeyspaceGroupInMerging = func(groupID uint32) error {
return errors.Errorf("keyspace group %v is in merging state", groupID)
}
// ErrKeyspaceGroupNotInMerging is used to indicate target keyspace group is not in merging state.
ErrKeyspaceGroupNotInMerging = errors.New("keyspace group is not in merging state")
ErrKeyspaceGroupNotInMerging = func(groupID uint32) error {
return errors.Errorf("keyspace group %v is not in merging state", groupID)
}
// ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group.
ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group")
// ErrNodeNotInKeyspaceGroup is used to indicate the tso node is not in this keyspace group.
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

// Discover is used to get all the service instances of the specified service name.
func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) {
key := discoveryPath(clusterID, serviceName) + "/"
key := ServicePath(clusterID, serviceName) + "/"
endKey := clientv3.GetPrefixRangeEnd(key)

withRange := clientv3.WithRange(endKey)
Expand Down
8 changes: 5 additions & 3 deletions pkg/mcs/discovery/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ const (
registryKey = "registry"
)

func registryPath(clusterID, serviceName, serviceAddr string) string {
// RegistryPath returns the full path to store microservice addresses.
func RegistryPath(clusterID, serviceName, serviceAddr string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey, serviceAddr}, "/")
}

func discoveryPath(clusterID, serviceName string) string {
// ServicePath returns the path to store microservice addresses.
func ServicePath(clusterID, serviceName string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/")
}

// TSOPath returns the path to store TSO addresses.
func TSOPath(clusterID uint64) string {
return discoveryPath(strconv.FormatUint(clusterID, 10), "tso") + "/"
return ServicePath(strconv.FormatUint(clusterID, 10), "tso") + "/"
}
2 changes: 1 addition & 1 deletion pkg/mcs/discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type ServiceRegister struct {
// NewServiceRegister creates a new ServiceRegister.
func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister {
cctx, cancel := context.WithCancel(ctx)
serviceKey := registryPath(clusterID, serviceName, serviceAddr)
serviceKey := RegistryPath(clusterID, serviceName, serviceAddr)
return &ServiceRegister{
ctx: cctx,
cancel: cancel,
Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,8 @@ func (s *Server) startServer() (err error) {
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, legacySvcRootPath, tsoSvcRootPath, s.cfg)
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr,
discovery.TSOPath(s.clusterID), legacySvcRootPath, tsoSvcRootPath, s.cfg)
if err := s.keyspaceGroupManager.Initialize(); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,8 @@ const (
DefaultKeyspaceGroupReplicaCount = 2

// DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica.
// It's used in keyspace group primary weighted-election to balance primaries' distribution.
// Among multiple replicas of a keyspace group, the higher the priority, the more likely
// the replica is to be elected as primary.
DefaultKeyspaceGroupReplicaPriority = 0
)
Loading

0 comments on commit 4ca73d0

Please sign in to comment.