tso: eliminate unnecessary abstractions in the allocator #9158

Open · wants to merge 3 commits into base: master
3 changes: 1 addition & 2 deletions pkg/mcs/tso/server/apis/v1/api.go
@@ -34,7 +34,6 @@ import (
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/logutil"
@@ -311,7 +310,7 @@ func transferPrimary(c *gin.Context) {
memberMap[member.Address] = true
}

if err := utils.TransferPrimary(svr.GetClient(), allocator.GetAllocator().(*tso.GlobalTSOAllocator).GetExpectedPrimaryLease(),
if err := utils.TransferPrimary(svr.GetClient(), allocator.GetAllocator().GetExpectedPrimaryLease(),
constant.TSOServiceName, svr.Name(), newPrimary, keyspaceGroupID, memberMap); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
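
For context on the api.go hunk above: `GetAllocator()` now returns the concrete `*GlobalTSOAllocator` rather than the `Allocator` interface, so `transferPrimary` can drop the type assertion and the `pkg/tso` import. The following is a self-contained toy sketch (hypothetical names, not the real pd packages) of why the assertion disappears:

```go
package main

import "fmt"

// Allocator is a stand-in for the interface that GetAllocator() used to return.
type Allocator interface {
	IsInitialize() bool
}

// GlobalTSOAllocator is a stand-in for the only remaining implementation.
type GlobalTSOAllocator struct{ initialized bool }

func (a *GlobalTSOAllocator) IsInitialize() bool { return a.initialized }

// GetExpectedPrimaryLease exists only on the concrete type, not on the interface.
func (a *GlobalTSOAllocator) GetExpectedPrimaryLease() string { return "expected-primary-lease" }

// getAllocatorOld mimics the old accessor: it returns the interface,
// so callers that need GetExpectedPrimaryLease must type-assert.
func getAllocatorOld() Allocator { return &GlobalTSOAllocator{} }

// getAllocatorNew mimics the new accessor: it returns the concrete type,
// so the assertion (and the extra import in api.go) is unnecessary.
func getAllocatorNew() *GlobalTSOAllocator { return &GlobalTSOAllocator{} }

func main() {
	// Old call shape, as on the removed line in transferPrimary.
	lease := getAllocatorOld().(*GlobalTSOAllocator).GetExpectedPrimaryLease()
	// New call shape, as on the added line.
	lease = getAllocatorNew().GetExpectedPrimaryLease()
	fmt.Println(lease)
}
```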
209 changes: 62 additions & 147 deletions pkg/tso/allocator_manager.go
@@ -32,9 +32,7 @@
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
)

const (
@@ -50,23 +48,6 @@
PriorityCheck = time.Minute
)

// AllocatorGroupFilter is used to select AllocatorGroup.
type AllocatorGroupFilter func(ag *allocatorGroup) bool

type allocatorGroup struct {
// ctx is built with cancel from a parent context when set up which can be different
// in order to receive Done() signal correctly.
// cancel would be call when allocatorGroup is deleted to stop background loop.
ctx context.Context
cancel context.CancelFunc
// For the Global TSO Allocator, leadership is a PD leader's
// leadership, and for the Local TSO Allocator, leadership
// is a DC-level certificate to allow an allocator to generate
// TSO for local transactions in its DC.
leadership *election.Leadership
allocator Allocator
}

// ElectionMember defines the interface for the election related logic.
type ElectionMember interface {
// ID returns the unique ID in the election group. For example, it can be unique
@@ -117,15 +98,8 @@
// It is in charge of maintaining TSO allocators' leadership, checking election
// priority, and forwarding TSO allocation requests to correct TSO Allocators.
type AllocatorManager struct {
mu struct {
syncutil.RWMutex
// Global TSO Allocator, as a global single point to allocate
// TSO for global transactions, such as cross-region cases.
allocatorGroup *allocatorGroup
// The max suffix sign we have so far, it will be used to calculate
// the number of suffix bits we need in the TSO logical part.
maxSuffix int32
}
// Global TSO Allocator, as a global single point to allocate TSO for global transactions.
allocator *GlobalTSOAllocator
// for the synchronization purpose of the service loops
svcLoopWG sync.WaitGroup

@@ -134,17 +108,12 @@
// kgID is the keyspace group ID
kgID uint32
// member is for election use
member ElectionMember
member ElectionMember
storage endpoint.TSOStorage
// TSO config
storage endpoint.TSOStorage
saveInterval time.Duration
updatePhysicalInterval time.Duration
// leaderLease defines the time within which a TSO primary/leader must update its TTL
// in etcd, otherwise etcd will expire the leader key and other servers can campaign
// the primary/leader again. Etcd only supports seconds TTL, so here is second too.
leaderLease int64
maxResetTSGap func() time.Duration
securityConfig *grpcutil.TLSConfig
cfg Config

logFields []zap.Field
}

// NewAllocatorManager creates a new TSO Allocator Manager.
@@ -157,45 +126,25 @@
) *AllocatorManager {
ctx, cancel := context.WithCancel(ctx)
am := &AllocatorManager{
ctx: ctx,
cancel: cancel,
kgID: keyspaceGroupID,
member: member,
storage: storage,
saveInterval: cfg.GetTSOSaveInterval(),
updatePhysicalInterval: cfg.GetTSOUpdatePhysicalInterval(),
leaderLease: cfg.GetLeaderLease(),
maxResetTSGap: cfg.GetMaxResetTSGap,
securityConfig: cfg.GetTLSConfig(),
ctx: ctx,
cancel: cancel,
kgID: keyspaceGroupID,
member: member,
storage: storage,
cfg: cfg,
logFields: []zap.Field{
logutil.CondUint32("keyspace-group-id", keyspaceGroupID, keyspaceGroupID > 0),
zap.String("name", member.Name()),
},
}
am.mu.allocatorGroup = &allocatorGroup{}
am.allocator = NewGlobalTSOAllocator(ctx, am)

// Set up the TSO Allocator here, it will be initialized once the member campaigns leader successfully.
am.SetUpAllocator(am.ctx, am.member.GetLeadership())
am.svcLoopWG.Add(1)
go am.tsoAllocatorLoop()

return am
}

// SetUpAllocator is used to set up the allocator, which will initialize the allocator and put it into
// an allocator daemon. An TSO Allocator should only be set once, and may be initialized and reset multiple times
// depending on the election.
func (am *AllocatorManager) SetUpAllocator(ctx context.Context, leadership *election.Leadership) {
am.mu.Lock()
defer am.mu.Unlock()

allocator := NewGlobalTSOAllocator(ctx, am)
// Create a new allocatorGroup
ctx, cancel := context.WithCancel(ctx)
am.mu.allocatorGroup = &allocatorGroup{
ctx: ctx,
cancel: cancel,
leadership: leadership,
allocator: allocator,
}
}

// getGroupID returns the keyspace group ID of the allocator manager.
func (am *AllocatorManager) getGroupID() uint32 {
if am == nil {
@@ -217,112 +166,76 @@
defer logutil.LogPanic()
defer am.svcLoopWG.Done()

am.AllocatorDaemon(am.ctx)
log.Info("exit allocator loop", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
}

// close is used to shutdown TSO Allocator updating daemon.
// tso service call this function to shutdown the loop here, but pd manages its own loop.
func (am *AllocatorManager) close() {
log.Info("closing the allocator manager", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))

am.GetAllocator().(*GlobalTSOAllocator).close()
am.cancel()
am.svcLoopWG.Wait()

log.Info("closed the allocator manager", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
}

// GetMember returns the ElectionMember of this AllocatorManager.
func (am *AllocatorManager) GetMember() ElectionMember {
return am.member
}

// AllocatorDaemon is used to update every allocator's TSO and check whether we have
// any new local allocator that needs to be set up.
func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
log.Info("entering into allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))

tsTicker := time.NewTicker(am.updatePhysicalInterval)
tsTicker := time.NewTicker(am.cfg.GetTSOUpdatePhysicalInterval())
failpoint.Inject("fastUpdatePhysicalInterval", func() {
tsTicker.Reset(time.Millisecond)
})
defer tsTicker.Stop()

log.Info("entering into allocator update loop", am.logFields...)
for {
select {
case <-tsTicker.C:
allocatorGroup := am.mu.allocatorGroup
// Update the initialized TSO Allocator to advance TSO.
if allocatorGroup.allocator.IsInitialize() && allocatorGroup.leadership.Check() {
am.updateAllocator(allocatorGroup)
// Only try to update when the member is leader and the allocator is initialized.
if !am.member.IsLeader() || !am.allocator.IsInitialize() {
continue
}
if err := am.allocator.UpdateTSO(); err != nil {
log.Warn("failed to update allocator's timestamp", append(am.logFields, errs.ZapError(err))...)
am.ResetAllocatorGroup(false)
return
}
case <-ctx.Done():
log.Info("exit allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
case <-am.ctx.Done():
am.allocator.reset()
log.Info("exit the allocator update loop", am.logFields...)
return
}
}
}

// updateAllocator is used to update the allocator in the group.
func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
defer logutil.LogPanic()
// close is used to shutdown TSO Allocator updating daemon.
// tso service call this function to shutdown the loop here, but pd manages its own loop.
func (am *AllocatorManager) close() {
log.Info("closing the allocator manager", am.logFields...)

select {
case <-ag.ctx.Done():
// Resetting the allocator will clear TSO in memory
ag.allocator.Reset()
log.Info("exit the allocator update loop", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
return
default:
}
if !ag.leadership.Check() {
log.Info("allocator doesn't campaign leadership yet",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
time.Sleep(200 * time.Millisecond)
return
}
if err := ag.allocator.UpdateTSO(); err != nil {
log.Warn("failed to update allocator's timestamp",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.String("name", am.member.Name()),
errs.ZapError(err))
am.ResetAllocatorGroup(false)
return
}
am.allocator.close()
am.cancel()
am.svcLoopWG.Wait()

log.Info("closed the allocator manager", am.logFields...)
}

// GetMember returns the ElectionMember of this AllocatorManager.
func (am *AllocatorManager) GetMember() ElectionMember {
return am.member
}

// HandleRequest forwards TSO allocation requests to correct TSO Allocators.
func (am *AllocatorManager) HandleRequest(ctx context.Context, count uint32) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "AllocatorManager.HandleRequest").End()
return am.GetAllocator().GenerateTSO(ctx, count)
return am.allocator.generateTSO(ctx, count)
}

// ResetAllocatorGroup will reset the allocator's leadership and TSO initialized in memory.
// It usually should be called before re-triggering an Allocator leader campaign.
func (am *AllocatorManager) ResetAllocatorGroup(skipResetLeader bool) {
am.mu.Lock()
defer am.mu.Unlock()
am.mu.allocatorGroup.allocator.Reset()
am.allocator.reset()
// Reset if it still has the leadership. Otherwise the data race may occur because of the re-campaigning.
if !skipResetLeader && am.mu.allocatorGroup.leadership.Check() {
am.mu.allocatorGroup.leadership.Reset()
if !skipResetLeader && am.member.IsLeader() {
am.member.ResetLeader()
}
}

// GetAllocator get the allocator by dc-location.
func (am *AllocatorManager) GetAllocator() Allocator {
am.mu.RLock()
defer am.mu.RUnlock()
return am.mu.allocatorGroup.allocator
// GetAllocator returns the global TSO allocator.
func (am *AllocatorManager) GetAllocator() *GlobalTSOAllocator {
return am.allocator
}

// IsLeader returns whether the current member is the leader in the election group.
func (am *AllocatorManager) IsLeader() bool {
if am == nil || am.member == nil || !am.member.IsLeader() {
func (am *AllocatorManager) isLeader() bool {
if am == nil || am.member == nil {
return false
}
return true
return am.member.IsLeader()
}

// GetLeaderAddr returns the address of leader in the election group.
Expand All @@ -337,13 +250,15 @@
return leaderAddrs[0]
}

// The PD server will conduct its own leadership election independently of the allocator manager,
// while the TSO service will manage its leadership election within the allocator manager.
// This function is used to manually initiate the allocator leadership election loop.
func (am *AllocatorManager) startGlobalAllocatorLoop() {
globalTSOAllocator, ok := am.mu.allocatorGroup.allocator.(*GlobalTSOAllocator)
if !ok {
if am.allocator == nil {
// it should never happen
log.Error("failed to start global allocator loop, global allocator not found")
log.Error("failed to start global allocator loop, global allocator not found", am.logFields...)

Codecov / codecov/patch warning: added line pkg/tso/allocator_manager.go#L259 was not covered by tests.
return
}
globalTSOAllocator.wg.Add(1)
go globalTSOAllocator.primaryElectionLoop()
am.allocator.wg.Add(1)
go am.allocator.primaryElectionLoop(am.getGroupID())
}
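
Not part of the diff — a self-contained toy sketch of the loop structure the simplified manager follows after this change: one concrete allocator, one context, one WaitGroup-guarded update loop that skips ticks while not leader and resets the allocator on shutdown. All names here (`toyManager`, `toyAllocator`) are hypothetical stand-ins, and the behavior is deliberately reduced to the control flow shown in the hunks above:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// toyAllocator stands in for the GlobalTSOAllocator: it only advances a counter.
type toyAllocator struct{ physical int64 }

func (a *toyAllocator) UpdateTSO() error { a.physical++; return nil }
func (a *toyAllocator) reset()           { a.physical = 0 }

// toyManager mirrors the reduced AllocatorManager shape: a single allocator,
// a cancelable context, and a WaitGroup for the background update loop.
type toyManager struct {
	ctx       context.Context
	cancel    context.CancelFunc
	allocator *toyAllocator
	isLeader  func() bool
	wg        sync.WaitGroup
}

func newToyManager(isLeader func() bool) *toyManager {
	ctx, cancel := context.WithCancel(context.Background())
	m := &toyManager{ctx: ctx, cancel: cancel, allocator: &toyAllocator{}, isLeader: isLeader}
	m.wg.Add(1)
	go m.updateLoop()
	return m
}

// updateLoop follows the same shape as the rewritten update loop above:
// tick, skip when not leader, update otherwise, and reset on shutdown.
func (m *toyManager) updateLoop() {
	defer m.wg.Done()
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if !m.isLeader() {
				continue
			}
			if err := m.allocator.UpdateTSO(); err != nil {
				return
			}
		case <-m.ctx.Done():
			m.allocator.reset()
			return
		}
	}
}

// close mirrors the close() sequence: cancel the context, wait for the loop.
func (m *toyManager) close() {
	m.cancel()
	m.wg.Wait()
}

func main() {
	m := newToyManager(func() bool { return true })
	time.Sleep(50 * time.Millisecond)
	m.close()
	fmt.Println("ticks observed:", m.allocator.physical)
}
```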