Skip to content

Commit

Permalink
Merge branch 'master' into remove-some-store-config
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored Aug 4, 2023
2 parents c53d54d + 365e384 commit acae11c
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 28 deletions.
6 changes: 4 additions & 2 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math"
"path"
"runtime/trace"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -1135,7 +1136,8 @@ func (am *AllocatorManager) deleteAllocatorGroup(dcLocation string) {
}

// HandleRequest forwards TSO allocation requests to correct TSO Allocators.
func (am *AllocatorManager) HandleRequest(dcLocation string, count uint32) (pdpb.Timestamp, error) {
func (am *AllocatorManager) HandleRequest(ctx context.Context, dcLocation string, count uint32) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "AllocatorManager.HandleRequest").End()
if len(dcLocation) == 0 {
dcLocation = GlobalDCLocation
}
Expand All @@ -1145,7 +1147,7 @@ func (am *AllocatorManager) HandleRequest(dcLocation string, count uint32) (pdpb
return pdpb.Timestamp{}, err
}

return allocatorGroup.allocator.GenerateTSO(count)
return allocatorGroup.allocator.GenerateTSO(ctx, count)
}

// ResetAllocatorGroup will reset the allocator's leadership and TSO initialized in memory.
Expand Down
23 changes: 14 additions & 9 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"runtime/trace"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -61,7 +62,7 @@ type Allocator interface {
SetTSO(tso uint64, ignoreSmaller, skipUpperBoundCheck bool) error
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
GenerateTSO(count uint32) (pdpb.Timestamp, error)
GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error)
// Reset is used to reset the TSO allocator.
Reset()
}
Expand Down Expand Up @@ -151,8 +152,8 @@ func (gta *GlobalTSOAllocator) GetTimestampPath() string {
return gta.timestampOracle.GetTimestampPath()
}

func (gta *GlobalTSOAllocator) estimateMaxTS(count uint32, suffixBits int) (*pdpb.Timestamp, bool, error) {
physical, logical, lastUpdateTime := gta.timestampOracle.generateTSO(int64(count), 0)
func (gta *GlobalTSOAllocator) estimateMaxTS(ctx context.Context, count uint32, suffixBits int) (*pdpb.Timestamp, bool, error) {
physical, logical, lastUpdateTime := gta.timestampOracle.generateTSO(ctx, int64(count), 0)
if physical == 0 {
return &pdpb.Timestamp{}, false, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized")
}
Expand Down Expand Up @@ -202,7 +203,8 @@ func (gta *GlobalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundC
// 1. Collect the max Local TSO from all Local TSO Allocator leaders and write it back to them as MaxTS.
// 2. Estimate a MaxTS and try to write it to all Local TSO Allocator leaders directly to reduce the RTT.
// During the process, if the estimated MaxTS is not accurate, it will fallback to the collecting way.
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {
func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "GlobalTSOAllocator.GenerateTSO").End()
if !gta.member.GetLeadership().Check() {
tsoCounter.WithLabelValues("not_leader", gta.timestampOracle.dcLocation).Inc()
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr))
Expand All @@ -212,8 +214,9 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
// No dc-locations configured in the cluster, use the normal Global TSO generation way.
// (without synchronization with other Local TSO Allocators)
if len(dcLocationMap) == 0 {
return gta.timestampOracle.getTS(gta.member.GetLeadership(), count, 0)
return gta.timestampOracle.getTS(ctx, gta.member.GetLeadership(), count, 0)
}
ctx1 := ctx

// Have dc-locations configured in the cluster, use the Global TSO generation way.
// (whit synchronization with other Local TSO Allocators)
Expand All @@ -229,7 +232,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
)
// TODO: add a switch to control whether to enable the MaxTSO estimation.
// 1. Estimate a MaxTS among all Local TSO Allocator leaders according to the RTT.
estimatedMaxTSO, shouldRetry, err = gta.estimateMaxTS(count, suffixBits)
estimatedMaxTSO, shouldRetry, err = gta.estimateMaxTS(ctx1, count, suffixBits)
if err != nil {
log.Error("global tso allocator estimates MaxTS failed",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
Expand Down Expand Up @@ -271,7 +274,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
}
// 4. Persist MaxTS into memory, and etcd if needed
var currentGlobalTSO *pdpb.Timestamp
if currentGlobalTSO, err = gta.getCurrentTSO(); err != nil {
if currentGlobalTSO, err = gta.getCurrentTSO(ctx1); err != nil {
log.Error("global tso allocator gets the current global tso in memory failed",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
errs.ZapError(err))
Expand All @@ -280,7 +283,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
if tsoutil.CompareTimestamp(currentGlobalTSO, &globalTSOResp) < 0 {
tsoCounter.WithLabelValues("global_tso_persist", gta.timestampOracle.dcLocation).Inc()
// Update the Global TSO in memory
if err = gta.timestampOracle.resetUserTimestamp(gta.member.GetLeadership(), tsoutil.GenerateTS(&globalTSOResp), true); err != nil {
if err = gta.timestampOracle.resetUserTimestamp(ctx1, gta.member.GetLeadership(), tsoutil.GenerateTS(&globalTSOResp), true); err != nil {
tsoCounter.WithLabelValues("global_tso_persist_err", gta.timestampOracle.dcLocation).Inc()
log.Error("global tso allocator update the global tso in memory failed",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
Expand Down Expand Up @@ -350,6 +353,7 @@ func (gta *GlobalTSOAllocator) SyncMaxTS(
maxTSO *pdpb.Timestamp,
skipCheck bool,
) error {
defer trace.StartRegion(ctx, "GlobalTSOAllocator.SyncMaxTS").End()
originalMaxTSO := *maxTSO
for i := 0; i < syncMaxRetryCount; i++ {
// Collect all allocator leaders' client URLs
Expand Down Expand Up @@ -501,7 +505,8 @@ func (gta *GlobalTSOAllocator) checkSyncedDCs(dcLocationMap map[string]DCLocatio
return len(unsyncedDCs) == 0, unsyncedDCs
}

func (gta *GlobalTSOAllocator) getCurrentTSO() (*pdpb.Timestamp, error) {
func (gta *GlobalTSOAllocator) getCurrentTSO(ctx context.Context) (*pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "GlobalTSOAllocator.getCurrentTSO").End()
currentPhysical, currentLogical := gta.timestampOracle.getTSO()
if currentPhysical == typeutil.ZeroTime {
return &pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized")
Expand Down
8 changes: 4 additions & 4 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
ts, err = am.HandleRequest(dcLocation, count)
ts, err = am.HandleRequest(context.Background(), dcLocation, count)
return ts, curKeyspaceGroupID, err
}

Expand Down Expand Up @@ -1033,7 +1033,7 @@ func (kgm *KeyspaceGroupManager) GetMinTS(
if kgm.kgs[i] != nil && kgm.kgs[i].IsSplitTarget() {
continue
}
ts, err := am.HandleRequest(dcLocation, 1)
ts, err := am.HandleRequest(context.Background(), dcLocation, 1)
if err != nil {
return pdpb.Timestamp{}, kgAskedCount, kgTotalCount, err
}
Expand Down Expand Up @@ -1077,11 +1077,11 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
if err != nil {
return err
}
splitTargetTSO, err := splitTargetAllocator.GenerateTSO(1)
splitTargetTSO, err := splitTargetAllocator.GenerateTSO(context.Background(), 1)
if err != nil {
return err
}
splitSourceTSO, err := splitSourceAllocator.GenerateTSO(1)
splitSourceTSO, err := splitSourceAllocator.GenerateTSO(context.Background(), 1)
if err != nil {
return err
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"path"
"runtime/trace"
"sync/atomic"
"time"

Expand Down Expand Up @@ -122,13 +123,14 @@ func (lta *LocalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundCh

// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (lta *LocalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {
func (lta *LocalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "LocalTSOAllocator.GenerateTSO").End()
if !lta.leadership.Check() {
tsoCounter.WithLabelValues("not_leader", lta.timestampOracle.dcLocation).Inc()
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(
fmt.Sprintf("requested pd %s of %s allocator", errs.NotLeaderErr, lta.timestampOracle.dcLocation))
}
return lta.timestampOracle.getTS(lta.leadership, count, lta.allocatorManager.GetSuffixBits())
return lta.timestampOracle.getTS(ctx, lta.leadership, count, lta.allocatorManager.GetSuffixBits())
}

// Reset is used to reset the TSO allocator.
Expand Down Expand Up @@ -180,7 +182,7 @@ func (lta *LocalTSOAllocator) WriteTSO(maxTS *pdpb.Timestamp) error {
if tsoutil.CompareTimestamp(currentTSO, maxTS) >= 0 {
return nil
}
return lta.timestampOracle.resetUserTimestamp(lta.leadership, tsoutil.GenerateTS(maxTS), true)
return lta.timestampOracle.resetUserTimestamp(context.Background(), lta.leadership, tsoutil.GenerateTS(maxTS), true)
}

// EnableAllocatorLeader sets the Local TSO Allocator itself to a leader.
Expand Down
13 changes: 9 additions & 4 deletions pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package tso

import (
"context"
"fmt"
"runtime/trace"
"sync/atomic"
"time"

Expand Down Expand Up @@ -103,7 +105,8 @@ func (t *timestampOracle) getTSO() (time.Time, int64) {
}

// generateTSO will add the TSO's logical part with the given count and returns the new TSO result.
func (t *timestampOracle) generateTSO(count int64, suffixBits int) (physical int64, logical int64, lastUpdateTime time.Time) {
func (t *timestampOracle) generateTSO(ctx context.Context, count int64, suffixBits int) (physical int64, logical int64, lastUpdateTime time.Time) {
defer trace.StartRegion(ctx, "timestampOracle.generateTSO").End()
t.tsoMux.Lock()
defer t.tsoMux.Unlock()
if t.tsoMux.physical == typeutil.ZeroTime {
Expand Down Expand Up @@ -201,7 +204,8 @@ func (t *timestampOracle) isInitialized() bool {
// When ignoreSmaller is true, resetUserTimestamp will ignore the smaller tso resetting error and do nothing.
// It's used to write MaxTS during the Global TSO synchronization without failing the writing as much as possible.
// cannot set timestamp to one which >= current + maxResetTSGap
func (t *timestampOracle) resetUserTimestamp(leadership *election.Leadership, tso uint64, ignoreSmaller bool) error {
func (t *timestampOracle) resetUserTimestamp(ctx context.Context, leadership *election.Leadership, tso uint64, ignoreSmaller bool) error {
defer trace.StartRegion(ctx, "timestampOracle.resetUserTimestamp").End()
return t.resetUserTimestampInner(leadership, tso, ignoreSmaller, false)
}

Expand Down Expand Up @@ -336,7 +340,8 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error
var maxRetryCount = 10

// getTS is used to get a timestamp.
func (t *timestampOracle) getTS(leadership *election.Leadership, count uint32, suffixBits int) (pdpb.Timestamp, error) {
func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leadership, count uint32, suffixBits int) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "timestampOracle.getTS").End()
var resp pdpb.Timestamp
if count == 0 {
return resp, errs.ErrGenerateTimestamp.FastGenByArgs("tso count should be positive")
Expand All @@ -353,7 +358,7 @@ func (t *timestampOracle) getTS(leadership *election.Leadership, count uint32, s
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized")
}
// Get a new TSO result with the given count
resp.Physical, resp.Logical, _ = t.generateTSO(int64(count), suffixBits)
resp.Physical, resp.Logical, _ = t.generateTSO(ctx, int64(count), suffixBits)
if resp.GetPhysical() == 0 {
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory has been reset")
}
Expand Down
2 changes: 1 addition & 1 deletion server/gc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *GrpcServer) UpdateServiceSafePointV2(ctx context.Context, request *pdpb
if s.IsAPIServiceMode() {
nowTSO, err = s.getGlobalTSOFromTSOServer(ctx)
} else {
nowTSO, err = s.tsoAllocatorManager.HandleRequest(tso.GlobalDCLocation, 1)
nowTSO, err = s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1)
}
if err != nil {
return nil, err
Expand Down
11 changes: 7 additions & 4 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io"
"path"
"runtime/trace"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -148,7 +149,7 @@ func (s *GrpcServer) GetMinTS(
minTS, err = s.GetMinTSFromTSOService(tso.GlobalDCLocation)
} else {
start := time.Now()
ts, internalErr := s.tsoAllocatorManager.HandleRequest(tso.GlobalDCLocation, 1)
ts, internalErr := s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1)
if internalErr == nil {
tsoHandleDuration.Observe(time.Since(start).Seconds())
}
Expand Down Expand Up @@ -386,7 +387,9 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
"mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}
count := request.GetCount()
ts, err := s.tsoAllocatorManager.HandleRequest(request.GetDcLocation(), count)
ctx, task := trace.NewTask(ctx, "tso")
ts, err := s.tsoAllocatorManager.HandleRequest(ctx, request.GetDcLocation(), count)
task.End()
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
Expand Down Expand Up @@ -1732,7 +1735,7 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb
if s.IsAPIServiceMode() {
nowTSO, err = s.getGlobalTSOFromTSOServer(ctx)
} else {
nowTSO, err = s.tsoAllocatorManager.HandleRequest(tso.GlobalDCLocation, 1)
nowTSO, err = s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -2493,7 +2496,7 @@ func (s *GrpcServer) SetExternalTimestamp(ctx context.Context, request *pdpb.Set
if s.IsAPIServiceMode() {
nowTSO, err = s.getGlobalTSOFromTSOServer(ctx)
} else {
nowTSO, err = s.tsoAllocatorManager.HandleRequest(tso.GlobalDCLocation, 1)
nowTSO, err = s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1)
}
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion tests/server/tso/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func testTSOSuffix(re *require.Assertions, cluster *tests.TestCluster, am *tso.A
re.NoError(err)
var tso pdpb.Timestamp
testutil.Eventually(re, func() bool {
tso, err = allocator.GenerateTSO(1)
tso, err = allocator.GenerateTSO(context.Background(), 1)
re.NoError(err)
return tso.GetPhysical() != 0
})
Expand Down

0 comments on commit acae11c

Please sign in to comment.