Skip to content

Commit

Permalink
Merge branch 'master' into fix-pd-ctl
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] committed Jun 26, 2023
2 parents 531ffa0 + 4d26c8b commit 2fd34c4
Show file tree
Hide file tree
Showing 37 changed files with 1,568 additions and 303 deletions.
4 changes: 2 additions & 2 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (c *tsoClient) updateTSOLocalServAddrs(allocatorMap map[string]string) erro
return err
}
c.tsoAllocators.Store(dcLocation, addr)
log.Info("[tso] switch dc tso allocator serving address",
log.Info("[tso] switch dc tso local allocator serving address",
zap.String("dc-location", dcLocation),
zap.String("new-address", addr),
zap.String("old-address", oldAddr))
Expand All @@ -227,7 +227,7 @@ func (c *tsoClient) updateTSOLocalServAddrs(allocatorMap map[string]string) erro

func (c *tsoClient) updateTSOGlobalServAddr(addr string) error {
c.tsoAllocators.Store(globalDCLocation, addr)
log.Info("[tso] switch dc tso allocator serving address",
log.Info("[tso] switch dc tso global allocator serving address",
zap.String("dc-location", globalDCLocation),
zap.String("new-address", addr))
c.scheduleCheckTSODispatcher()
Expand Down
34 changes: 26 additions & 8 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ type tsoDispatcher struct {
}

type lastTSO struct {
physical int64
logical int64
keyspaceGroupID uint32
physical int64
logical int64
}

const (
Expand Down Expand Up @@ -708,7 +709,7 @@ func (c *tsoClient) processRequests(

requests := tbc.getCollectedRequests()
count := int64(len(requests))
physical, logical, suffixBits, err := stream.processRequests(
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID(),
dcLocation, requests, tbc.batchStartTime)
if err != nil {
Expand All @@ -717,33 +718,50 @@ func (c *tsoClient) processRequests(
}
// `logical` is the logical part of the largest timestamp in this batch, so we subtract `count-1` from it to get the first logical before finishing each TSO request.
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count)
c.compareAndSwapTS(dcLocation, respKeyspaceGroupID, physical, firstLogical, suffixBits, count)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
return nil
}

func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical int64, suffixBits uint32, count int64) {
func (c *tsoClient) compareAndSwapTS(
dcLocation string, respKeyspaceGroupID uint32,
physical, firstLogical int64, suffixBits uint32, count int64,
) {
largestLogical := tsoutil.AddLogical(firstLogical, count-1, suffixBits)
lastTSOInterface, loaded := c.lastTSMap.LoadOrStore(dcLocation, &lastTSO{
physical: physical,
keyspaceGroupID: respKeyspaceGroupID,
physical: physical,
// Save the largest logical part here
logical: largestLogical,
})
if !loaded {
return
}
lastTSOPointer := lastTSOInterface.(*lastTSO)
lastKeyspaceGroupID := lastTSOPointer.keyspaceGroupID
lastPhysical := lastTSOPointer.physical
lastLogical := lastTSOPointer.logical

if lastKeyspaceGroupID != respKeyspaceGroupID {
log.Info("[tso] keyspace group changed",
zap.String("dc-location", dcLocation),
zap.Uint32("old-group-id", lastKeyspaceGroupID),
zap.Uint32("new-group-id", respKeyspaceGroupID))
}

// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10].
if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
panic(errors.Errorf(
"%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). keyspace: %d, keyspace group: %d",
"%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). "+
"last keyspace group: %d, keyspace in request: %d, "+
"keyspace group in request: %d, keyspace group in response: %d",
dcLocation, physical, firstLogical, lastPhysical, lastLogical,
c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID()))
lastKeyspaceGroupID, c.svcDiscovery.GetKeyspaceID(),
c.svcDiscovery.GetKeyspaceGroupID(), respKeyspaceGroupID))
}
lastTSOPointer.keyspaceGroupID = respKeyspaceGroupID
lastTSOPointer.physical = physical
// Same as above, we save the largest logical part here.
lastTSOPointer.logical = largestLogical
Expand Down
11 changes: 6 additions & 5 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -121,7 +122,7 @@ type tsoServiceDiscovery struct {
metacli MetaStorageClient
apiSvcDiscovery ServiceDiscovery
clusterID uint64
keyspaceID uint32
keyspaceID atomic.Uint32

// defaultDiscoveryKey is the etcd path used for discovering the serving endpoints of
// the default keyspace group
Expand Down Expand Up @@ -165,12 +166,12 @@ func newTSOServiceDiscovery(
cancel: cancel,
metacli: metacli,
apiSvcDiscovery: apiSvcDiscovery,
keyspaceID: keyspaceID,
clusterID: clusterID,
tlsCfg: tlsCfg,
option: option,
checkMembershipCh: make(chan struct{}, 1),
}
c.keyspaceID.Store(keyspaceID)
c.keyspaceGroupSD = &keyspaceGroupSvcDiscovery{
primaryAddr: "",
secondaryAddrs: make([]string, 0),
Expand Down Expand Up @@ -269,12 +270,12 @@ func (c *tsoServiceDiscovery) GetClusterID() uint64 {

// GetKeyspaceID returns the ID of the keyspace.
func (c *tsoServiceDiscovery) GetKeyspaceID() uint32 {
return c.keyspaceID
return c.keyspaceID.Load()
}

// SetKeyspaceID sets the ID of the keyspace.
func (c *tsoServiceDiscovery) SetKeyspaceID(keyspaceID uint32) {
c.keyspaceID = keyspaceID
c.keyspaceID.Store(keyspaceID)
}

// GetKeyspaceGroupID returns the ID of the keyspace group. If the keyspace group is unknown,
Expand Down Expand Up @@ -426,7 +427,7 @@ func (c *tsoServiceDiscovery) updateMember() error {

var keyspaceGroup *tsopb.KeyspaceGroup
if len(tsoServerAddr) > 0 {
keyspaceGroup, err = c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout)
keyspaceGroup, err = c.findGroupByKeyspaceID(c.GetKeyspaceID(), tsoServerAddr, updateMemberTimeout)
if err != nil {
if c.tsoServerDiscovery.countFailure() {
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))
Expand Down
8 changes: 5 additions & 3 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type tsoStream interface {
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error)
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error)
}

type pdTSOStream struct {
Expand All @@ -111,7 +111,7 @@ type pdTSOStream struct {

func (s *pdTSOStream) processRequests(
clusterID uint64, _, _ uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error) {
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &pdpb.TsoRequest{
Expand Down Expand Up @@ -149,6 +149,7 @@ func (s *pdTSOStream) processRequests(
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = defaultKeySpaceGroupID
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
return
}
Expand All @@ -160,7 +161,7 @@ type tsoTSOStream struct {
func (s *tsoTSOStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error) {
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &tsopb.TsoRequest{
Expand Down Expand Up @@ -200,6 +201,7 @@ func (s *tsoTSOStream) processRequests(
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId()
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
return
}
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,11 @@ error = '''
the keyspace group id is invalid, %s
'''

["PD:tso:ErrKeyspaceGroupIsMerging"]
error = '''
the keyspace group %d is merging
'''

["PD:tso:ErrKeyspaceGroupNotInitialized"]
error = '''
the keyspace group %d isn't initialized
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
ErrKeyspaceGroupNotInitialized = errors.Normalize("the keyspace group %d isn't initialized", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupNotInitialized"))
ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned"))
ErrGetMinTS = errors.Normalize("get min ts failed, %s", errors.RFCCodeText("PD:tso:ErrGetMinTS"))
ErrKeyspaceGroupIsMerging = errors.Normalize("the keyspace group %d is merging", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIsMerging"))
)

// member errors
Expand Down
25 changes: 22 additions & 3 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,16 @@ func (manager *Manager) allocID() (uint32, error) {
}

// PatrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups.
func (manager *Manager) PatrolKeyspaceAssignment() error {
func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID uint32) error {
if startKeyspaceID > manager.nextPatrolStartID {
manager.nextPatrolStartID = startKeyspaceID
}
if endKeyspaceID != 0 && endKeyspaceID < manager.nextPatrolStartID {
log.Info("[keyspace] end keyspace id is smaller than the next patrol start id, skip patrol",
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("next-patrol-start-id", manager.nextPatrolStartID))
return nil
}
var (
// Some statistics info.
start = time.Now()
Expand All @@ -675,6 +684,8 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount),
zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount),
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID),
)
Expand Down Expand Up @@ -706,8 +717,8 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
currentStartID = keyspaces[0].GetId()
nextStartID = keyspaces[keyspaceNum-1].GetId() + 1
}
// If there are less than `maxEtcdTxnOps` keyspaces,
// we have reached the end of the keyspace list.
// If there are fewer than `maxEtcdTxnOps` keyspaces or the next start ID reaches the end,
// there is no need to patrol again.
moreToPatrol = keyspaceNum == maxEtcdTxnOps
var (
assigned = false
Expand All @@ -722,6 +733,10 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if ks == nil {
continue
}
if endKeyspaceID != 0 && ks.Id > endKeyspaceID {
moreToPatrol = false
break
}
patrolledKeyspaceCount++
manager.metaLock.Lock(ks.Id)
if ks.Config == nil {
Expand All @@ -744,6 +759,8 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID),
zap.Uint32("keyspace-id", ks.Id), zap.Error(err))
Expand All @@ -756,6 +773,8 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if err != nil {
log.Error("[keyspace] failed to save default keyspace group meta during patrol",
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID), zap.Error(err))
return err
Expand Down
49 changes: 46 additions & 3 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
re.NotNil(defaultKeyspaceGroup)
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(111))
// Patrol the keyspace assignment.
err = suite.manager.PatrolKeyspaceAssignment()
err = suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
// Check if the keyspace is attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
Expand Down Expand Up @@ -424,7 +424,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment.
err = suite.manager.PatrolKeyspaceAssignment()
err = suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
// Check if all the keyspaces are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
Expand All @@ -435,6 +435,49 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
}
}

// TestPatrolKeyspaceAssignmentWithRange verifies that a patrol bounded by a
// start and an end keyspace ID attaches only the keyspaces inside that
// inclusive range to the default keyspace group.
func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
	re := suite.Require()
	// Keyspaces with IDs 1..lastKeyspaceID are created for this test.
	lastKeyspaceID := maxEtcdTxnOps * 2

	// Save the keyspaces directly so that none of them is in any keyspace group.
	for id := 1; id <= lastKeyspaceID; id++ {
		createdAt := time.Now().Unix()
		meta := &keyspacepb.KeyspaceMeta{
			Id:             uint32(id),
			Name:           strconv.Itoa(id),
			State:          keyspacepb.KeyspaceState_ENABLED,
			CreatedAt:      createdAt,
			StateChangedAt: createdAt,
		}
		re.NoError(suite.manager.saveNewKeyspace(meta))
	}

	// Before the patrol, the default group must not contain any of them.
	defaultGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
	re.NoError(err)
	re.NotNil(defaultGroup)
	for id := 1; id <= lastKeyspaceID; id++ {
		re.NotContains(defaultGroup.Keyspaces, uint32(id))
	}

	// Patrol the range [maxEtcdTxnOps/2, maxEtcdTxnOps/2+maxEtcdTxnOps+1] so that
	// the range crosses the boundary of the etcd transaction operation limit.
	patrolStart := uint32(maxEtcdTxnOps / 2)
	patrolEnd := patrolStart + maxEtcdTxnOps + 1
	err = suite.manager.PatrolKeyspaceAssignment(patrolStart, patrolEnd)
	re.NoError(err)

	// After the patrol, only the keyspaces within the range are attached to the default group.
	defaultGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
	re.NoError(err)
	re.NotNil(defaultGroup)
	for id := 1; id <= lastKeyspaceID; id++ {
		keyspaceID := uint32(id)
		if keyspaceID < patrolStart || keyspaceID > patrolEnd {
			re.NotContains(defaultGroup.Keyspaces, keyspaceID)
			continue
		}
		re.Contains(defaultGroup.Keyspaces, keyspaceID)
	}
}

// Benchmark the keyspace assignment patrol.
func BenchmarkPatrolKeyspaceAssignment1000(b *testing.B) {
benchmarkPatrolKeyspaceAssignmentN(1000, b)
Expand Down Expand Up @@ -471,7 +514,7 @@ func benchmarkPatrolKeyspaceAssignmentN(
// Benchmark the keyspace assignment patrol.
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := suite.manager.PatrolKeyspaceAssignment()
err := suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
}
b.StopTimer()
Expand Down
Loading

0 comments on commit 2fd34c4

Please sign in to comment.