Skip to content

Commit

Permalink
tso, tests: implement the keyspace group merge checker (#6625)
Browse files Browse the repository at this point in the history
ref #6589

Implement the keyspace group merge checker.

Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Jun 25, 2023
1 parent f4d774a commit 16d3b51
Show file tree
Hide file tree
Showing 9 changed files with 426 additions and 52 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,11 @@ error = '''
the keyspace group id is invalid, %s
'''

["PD:tso:ErrKeyspaceGroupIsMerging"]
error = '''
the keyspace group %d is merging
'''

["PD:tso:ErrKeyspaceGroupNotInitialized"]
error = '''
the keyspace group %d isn't initialized
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
ErrKeyspaceGroupNotInitialized = errors.Normalize("the keyspace group %d isn't initialized", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupNotInitialized"))
ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned"))
ErrGetMinTS = errors.Normalize("get min ts failed, %s", errors.RFCCodeText("PD:tso:ErrGetMinTS"))
ErrKeyspaceGroupIsMerging = errors.Normalize("the keyspace group %d is merging", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIsMerging"))
)

// member errors
Expand Down
13 changes: 13 additions & 0 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1406,3 +1406,16 @@ func (am *AllocatorManager) GetLeaderAddr() string {
}
return leaderAddrs[0]
}

// getKeyspaceGroupTSPath constructs the timestampOracle path prefix of the
// keyspace group identified by groupID, which is:
//  1. for the default keyspace group:
//     "" in /pd/{cluster_id}/timestamp
//  2. for the non-default keyspace groups:
//     {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
//
// The result depends only on groupID, not on the keyspace group this manager
// serves, so that a manager can compute the path of another group (e.g. the
// merging checker of a merge target looking up the groups being merged).
func (am *AllocatorManager) getKeyspaceGroupTSPath(groupID uint32) string {
	if groupID == mcsutils.DefaultKeyspaceGroupID {
		return ""
	}
	return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix)
}
14 changes: 1 addition & 13 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"errors"
"fmt"
"path"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -89,16 +88,6 @@ func NewGlobalTSOAllocator(
am *AllocatorManager,
startGlobalLeaderLoop bool,
) Allocator {
// Construct the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
tsPath := ""
if am.kgID != mcsutils.DefaultKeyspaceGroupID {
tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), globalTSOAllocatorEtcdPrefix)
}

ctx, cancel := context.WithCancel(ctx)
gta := &GlobalTSOAllocator{
ctx: ctx,
Expand All @@ -107,8 +96,7 @@ func NewGlobalTSOAllocator(
member: am.member,
timestampOracle: &timestampOracle{
client: am.member.GetLeadership().GetClient(),
rootPath: am.rootPath,
tsPath: tsPath,
tsPath: am.getKeyspaceGroupTSPath(am.kgID),
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
Expand Down
216 changes: 209 additions & 7 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ import (
"github.com/tikv/pd/pkg/mcs/discovery"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
Expand All @@ -50,6 +52,9 @@ const (
keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election"
// primaryKey is the key for keyspace group primary election.
primaryKey = "primary"
// mergingCheckInterval is the interval for merging check to see if the keyspace groups
// merging process could be moved forward.
mergingCheckInterval = 5 * time.Second
)

type state struct {
Expand Down Expand Up @@ -241,6 +246,9 @@ type KeyspaceGroupManager struct {
groupWatcher *etcdutil.LoopWatcher

primaryPathBuilder *kgPrimaryPathBuilder

// mergeCheckerCancelMap is the cancel function map for the merge checker of each keyspace group.
mergeCheckerCancelMap sync.Map // GroupID -> context.CancelFunc
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand Down Expand Up @@ -384,12 +392,9 @@ func (kgm *KeyspaceGroupManager) Close() {
}

// isAssignedToMe reports whether any member of the given keyspace group has
// the same address as this TSO service (kgm.tsoServiceID.ServiceAddr).
func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool {
	return slice.AnyOf(group.Members, func(i int) bool {
		return group.Members[i].Address == kgm.tsoServiceID.ServiceAddr
	})
}

// updateKeyspaceGroup applies the given keyspace group. If the keyspace group is just assigned to
Expand All @@ -416,9 +421,25 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
return
}

oldAM, oldGroup := kgm.getKeyspaceGroupMeta(group.ID)
// If this host owns a replica of the keyspace group which is the merge target,
// it should start the merging checker the first time the group enters the merge state.
if !oldGroup.IsMergeTarget() && group.IsMergeTarget() {
ctx, cancel := context.WithCancel(kgm.ctx)
kgm.mergeCheckerCancelMap.Store(group.ID, cancel)
kgm.wg.Add(1)
go kgm.mergingChecker(ctx, group.ID, group.MergeState.MergeList)
}
// If the merge state has been finished, cancel its merging checker.
if oldGroup.IsMergeTarget() && !group.IsMergeTarget() {
if cancel, loaded := kgm.mergeCheckerCancelMap.LoadAndDelete(group.ID); loaded && cancel != nil {
cancel.(context.CancelFunc)()
}
}

// If this host is already assigned a replica of this keyspace group, i.e., the election member
// is already initialized, just update the meta.
if oldAM, oldGroup := kgm.getKeyspaceGroupMeta(group.ID); oldAM != nil {
if oldAM != nil {
kgm.updateKeyspaceGroupMembership(oldGroup, group, true)
return
}
Expand Down Expand Up @@ -738,6 +759,10 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
err = kgm.checkTSOMerge(curKeyspaceGroupID)
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
ts, err = am.HandleRequest(dcLocation, count)
return ts, curKeyspaceGroupID, err
}
Expand Down Expand Up @@ -898,3 +923,180 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error {
kgm.kgs[id] = splitGroup
return nil
}

// finishMergeKeyspaceGroup asks the backend to finish the merge of the keyspace
// group with the given id by issuing a delete request on its "/{id}/merge" API
// path, and on success clears the merge state of the group held in memory.
//
// It returns nil without sending anything when the group is not a merge target
// or when the HTTP client has not been initialized. It returns the request
// error, or errs.ErrSendRequest when the response status is not http.StatusOK.
// The manager lock is held for the whole call, including the HTTP request.
func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error {
	kgm.Lock()
	defer kgm.Unlock()
	// Nothing to do unless the keyspace group is a merge target and the
	// HTTP client is ready.
	target := kgm.kgs[id]
	if !target.IsMergeTarget() || kgm.httpClient == nil {
		return nil
	}
	mergeURL := kgm.cfg.GeBackendEndpoints() + keyspaceGroupsAPIPrefix + fmt.Sprintf("/%d/merge", id)
	code, err := apiutil.DoDelete(kgm.httpClient, mergeURL)
	switch {
	case err != nil:
		return err
	case code != http.StatusOK:
		log.Warn("failed to finish merging keyspace group",
			zap.Uint32("keyspace-group-id", id),
			zap.Int("status-code", code))
		return errs.ErrSendRequest.FastGenByArgs()
	}
	// Pre-update the merge state of the merge target keyspace group in memory.
	target.MergeState = nil
	kgm.kgs[id] = target
	return nil
}

// mergingChecker periodically tries to move forward the merge of the keyspace
// groups in mergeList into the keyspace group mergeTargetID. It wakes up every
// mergingCheckInterval and, only while this node is the primary of the merge
// target, performs the following steps in order, retrying on the next tick
// whenever one of them fails:
//  1. wait until the primary keys of all the groups in mergeList are gone;
//  2. load the persisted timestamp of every group in mergeList and keep the latest;
//  3. set the Global TSO of the merge target from that timestamp so that the
//     newly merged TSO stays consistent with the original ones;
//  4. finish the merge through finishMergeKeyspaceGroup.
//
// It returns once the merge has been finished or when ctx is done, and marks
// itself done on kgm.wg when it exits.
func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTargetID uint32, mergeList []uint32) {
	// logFields returns the fields shared by most of the log entries below,
	// followed by the given extra fields.
	logFields := func(extra ...zap.Field) []zap.Field {
		fields := make([]zap.Field, 0, 3+len(extra))
		fields = append(fields,
			zap.String("member", kgm.tsoServiceID.ServiceAddr),
			zap.Uint32("merge-target-id", mergeTargetID),
			zap.Any("merge-list", mergeList))
		return append(fields, extra...)
	}

	log.Info("start to merge the keyspace group", logFields()...)
	defer logutil.LogPanic()
	defer kgm.wg.Done()

	ticker := time.NewTicker(mergingCheckInterval)
	defer ticker.Stop()
	// Track the merged keyspace groups whose primaries are not yet known to be gone.
	pending := make(map[uint32]struct{}, len(mergeList))
	for _, groupID := range mergeList {
		pending[groupID] = struct{}{}
	}

	for {
		select {
		case <-ctx.Done():
			log.Info("merging checker is closed", logFields()...)
			return
		case <-ticker.C:
		}

		// Only the primary of the merge target is allowed to drive the merge.
		targetAM, err := kgm.GetAllocatorManager(mergeTargetID)
		if err != nil {
			log.Warn("unable to get the merge target allocator manager",
				zap.String("member", kgm.tsoServiceID.ServiceAddr),
				zap.Uint32("keyspace-group-id", mergeTargetID),
				zap.Any("merge-list", mergeList),
				zap.Error(err))
			continue
		}
		// Keep looping on non-primary nodes as well, since the primary may
		// change unexpectedly while the merge is in progress.
		if !targetAM.IsLeader() {
			log.Debug("current tso node is not the merge target primary", logFields()...)
			continue
		}

		// Forget every merged keyspace group whose primary key has no value anymore.
		for groupID := range pending {
			primaryPath := path.Join(kgm.primaryPathBuilder.getKeyspaceGroupIDPath(groupID), primaryKey)
			primary, err := kgm.tsoSvcStorage.Load(primaryPath)
			if err != nil {
				log.Error("failed to check if the keyspace group primary in the merge list has gone",
					logFields(
						zap.Uint32("merge-id", groupID),
						zap.Any("remaining", pending),
						zap.Error(err))...)
				continue
			}
			if len(primary) == 0 {
				delete(pending, groupID)
			}
		}
		if len(pending) > 0 {
			continue
		}

		// All the keyspace group primaries in the merge list are gone, so pick
		// the latest of their persisted timestamps. A zero mergedTS after the
		// loop means that at least one of them could not be loaded.
		var mergedTS time.Time
		for _, groupID := range mergeList {
			ts, err := kgm.tsoSvcStorage.LoadTimestamp(targetAM.getKeyspaceGroupTSPath(groupID))
			if err == nil && ts != typeutil.ZeroTime {
				if ts.After(mergedTS) {
					mergedTS = ts
				}
				continue
			}
			log.Error("failed to load the keyspace group TSO",
				logFields(
					zap.Uint32("merge-id", groupID),
					zap.Time("ts", ts),
					zap.Error(err))...)
			mergedTS = typeutil.ZeroTime
			break
		}
		if mergedTS == typeutil.ZeroTime {
			continue
		}

		// Update the newly merged TSO.
		// TODO: support the Local TSO Allocator.
		globalAllocator, err := targetAM.GetAllocator(GlobalDCLocation)
		if err != nil {
			log.Error("failed to get the allocator", logFields(zap.Error(err))...)
			continue
		}
		mergedTSO := tsoutil.GenerateTS(tsoutil.GenerateTimestamp(mergedTS, 1))
		if err = globalAllocator.SetTSO(mergedTSO, true, true); err != nil {
			log.Error("failed to update the newly merged TSO",
				logFields(
					zap.Time("merged-ts", mergedTS),
					zap.Error(err))...)
			continue
		}

		// Finish the merge.
		if err = kgm.finishMergeKeyspaceGroup(mergeTargetID); err != nil {
			log.Error("failed to finish the merge", logFields(zap.Error(err))...)
			continue
		}
		log.Info("finished merging keyspace group", logFields(zap.Time("merged-ts", mergedTS))...)
		return
	}
}

// checkTSOMerge rejects the request with errs.ErrKeyspaceGroupIsMerging while
// the given keyspace group is in the merging state, since callers have to wait
// for the merging checker to finish the TSO merging first. It returns nil otherwise.
func (kgm *KeyspaceGroupManager) checkTSOMerge(
	keyspaceGroupID uint32,
) error {
	if _, kg := kgm.getKeyspaceGroupMeta(keyspaceGroupID); kg.IsMerging() {
		return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(keyspaceGroupID)
	}
	return nil
}
1 change: 0 additions & 1 deletion pkg/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func NewLocalTSOAllocator(
leadership: leadership,
timestampOracle: &timestampOracle{
client: leadership.GetClient(),
rootPath: am.rootPath,
tsPath: tsPath,
storage: am.storage,
saveInterval: am.saveInterval,
Expand Down
3 changes: 1 addition & 2 deletions pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ type tsoObject struct {

// timestampOracle is used to maintain the logic of TSO.
type timestampOracle struct {
client *clientv3.Client
rootPath string
client *clientv3.Client
// When tsPath is empty, it means that it is a global timestampOracle.
tsPath string
storage endpoint.TSOStorage
Expand Down
Loading

0 comments on commit 16d3b51

Please sign in to comment.