Skip to content

Commit

Permalink
Merge branch 'master' into fix-test-2024-33
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Nov 13, 2024
2 parents c8ca4ec + ac133d8 commit 4c624ae
Show file tree
Hide file tree
Showing 18 changed files with 205 additions and 690 deletions.
43 changes: 18 additions & 25 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/utils/tlsutil"
"github.com/tikv/pd/client/utils/tsoutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -714,26 +713,28 @@ func (c *client) getRegionAPIClientAndContext(ctx context.Context, allowFollower

// GetTSAsync implements the TSOClient interface.
func (c *client) GetTSAsync(ctx context.Context) TSFuture {
return c.GetLocalTSAsync(ctx, globalDCLocation)
}

// GetLocalTSAsync implements the TSOClient interface.
func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture {
defer trace.StartRegion(ctx, "pdclient.GetLocalTSAsync").End()
defer trace.StartRegion(ctx, "pdclient.GetTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetLocalTSAsync", opentracing.ChildOf(span.Context()))
span = span.Tracer().StartSpan("pdclient.GetTSAsync", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
return c.dispatchTSORequestWithRetry(ctx)
}

return c.dispatchTSORequestWithRetry(ctx, dcLocation)
// GetLocalTSAsync implements the TSOClient interface.
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
func (c *client) GetLocalTSAsync(ctx context.Context, _ string) TSFuture {
return c.GetTSAsync(ctx)
}

const (
dispatchRetryDelay = 50 * time.Millisecond
dispatchRetryCount = 2
)

func (c *client) dispatchTSORequestWithRetry(ctx context.Context, dcLocation string) TSFuture {
func (c *client) dispatchTSORequestWithRetry(ctx context.Context) TSFuture {
var (
retryable bool
err error
Expand All @@ -752,7 +753,7 @@ func (c *client) dispatchTSORequestWithRetry(ctx context.Context, dcLocation str
}
// Get a new request from the pool if it's nil or not from the current pool.
if req == nil || req.pool != tsoClient.tsoReqPool {
req = tsoClient.getTSORequest(ctx, dcLocation)
req = tsoClient.getTSORequest(ctx)
}
retryable, err = tsoClient.dispatchRequest(req)
if !retryable {
Expand All @@ -775,9 +776,11 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err
}

// GetLocalTS implements the TSOClient interface.
func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical int64, logical int64, err error) {
resp := c.GetLocalTSAsync(ctx, dcLocation)
return resp.Wait()
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logical int64, err error) {
return c.GetTS(ctx)
}

// GetMinTS implements the TSOClient interface.
Expand Down Expand Up @@ -823,7 +826,7 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
}

minTS := resp.GetTimestamp()
return minTS.Physical, tsoutil.AddLogical(minTS.Logical, 0, minTS.SuffixBits), nil
return minTS.Physical, minTS.Logical, nil
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
Expand Down Expand Up @@ -1600,13 +1603,3 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e
}
return nil
}

// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
// For test only.
func (c *client) GetTSOAllocators() *sync.Map {
tsoClient := c.getTSOClient()
if tsoClient == nil {
return nil
}
return tsoClient.GetTSOAllocators()
}
99 changes: 25 additions & 74 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pd
import (
"context"
"crypto/tls"
"fmt"
"net/url"
"reflect"
"sort"
Expand All @@ -40,7 +41,6 @@ import (
)

const (
globalDCLocation = "global"
memberUpdateInterval = time.Minute
serviceModeUpdateInterval = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
Expand Down Expand Up @@ -383,21 +383,17 @@ func (c *pdServiceBalancer) get() (ret ServiceClient) {
}

type updateKeyspaceIDFunc func() error
type tsoLocalServURLsUpdatedFunc func(map[string]string) error
type tsoGlobalServURLUpdatedFunc func(string) error
type tsoLeaderURLUpdatedFunc func(string) error

type tsoAllocatorEventSource interface {
// SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso
// allocator leader list is updated.
SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc)
// SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso
// allocator leader is updated.
SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc)
// tsoEventSource subscribes to events related to changes in the TSO leader/primary from the service discovery.
type tsoEventSource interface {
// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader/primary is updated.
SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc)
}

var (
_ ServiceDiscovery = (*pdServiceDiscovery)(nil)
_ tsoAllocatorEventSource = (*pdServiceDiscovery)(nil)
_ ServiceDiscovery = (*pdServiceDiscovery)(nil)
_ tsoEventSource = (*pdServiceDiscovery)(nil)
)

// pdServiceDiscovery is the service discovery client of PD/API service which is quorum based
Expand Down Expand Up @@ -426,12 +422,8 @@ type pdServiceDiscovery struct {
// membersChangedCbs will be called after there is any membership change in the
// leader and followers
membersChangedCbs []func()
// tsoLocalAllocLeadersUpdatedCb will be called when the local tso allocator
// leader list is updated. The input is a map {DC Location -> Leader URL}
tsoLocalAllocLeadersUpdatedCb tsoLocalServURLsUpdatedFunc
// tsoGlobalAllocLeaderUpdatedCb will be called when the global tso allocator
// leader is updated.
tsoGlobalAllocLeaderUpdatedCb tsoGlobalServURLUpdatedFunc
// tsoLeaderUpdatedCb will be called when the TSO leader is updated.
tsoLeaderUpdatedCb tsoLeaderURLUpdatedFunc

checkMembershipCh chan struct{}

Expand Down Expand Up @@ -801,22 +793,15 @@ func (c *pdServiceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func())
c.membersChangedCbs = append(c.membersChangedCbs, callbacks...)
}

// SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso
// allocator leader list is updated.
func (c *pdServiceDiscovery) SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc) {
c.tsoLocalAllocLeadersUpdatedCb = callback
}

// SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso
// allocator leader is updated.
func (c *pdServiceDiscovery) SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc) {
// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated.
func (c *pdServiceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) {
url := c.getLeaderURL()
if len(url) > 0 {
if err := callback(url); err != nil {
log.Error("[tso] failed to call back when tso global service url update", zap.String("url", url), errs.ZapError(err))
log.Error("[tso] failed to call back when tso leader url update", zap.String("url", url), errs.ZapError(err))
}
}
c.tsoGlobalAllocLeaderUpdatedCb = callback
c.tsoLeaderUpdatedCb = callback
}

// getLeaderURL returns the leader URL.
Expand Down Expand Up @@ -901,19 +886,16 @@ func (c *pdServiceDiscovery) updateMember() error {

members, err := c.getMembers(c.ctx, url, updateMemberTimeout)
// Check the cluster ID.
if err == nil && members.GetHeader().GetClusterId() != c.clusterID {
err = errs.ErrClientUpdateMember.FastGenByArgs("cluster id does not match")
updatedClusterID := members.GetHeader().GetClusterId()
if err == nil && updatedClusterID != c.clusterID {
log.Warn("[pd] cluster id does not match",
zap.Uint64("updated-cluster-id", updatedClusterID),
zap.Uint64("expected-cluster-id", c.clusterID))
err = errs.ErrClientUpdateMember.FastGenByArgs(fmt.Sprintf("cluster id does not match: %d != %d", updatedClusterID, c.clusterID))
}
// Check the TSO Allocator Leader.
var errTSO error
if err == nil {
if members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 {
err = errs.ErrClientGetLeader.FastGenByArgs("leader url doesn't exist")
}
// Still need to update TsoAllocatorLeaders, even if there is no PD leader
errTSO = c.switchTSOAllocatorLeaders(members.GetTsoAllocatorLeaders())
if err == nil && (members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0) {
err = errs.ErrClientGetLeader.FastGenByArgs("leader url doesn't exist")
}

// Failed to get members
if err != nil {
log.Info("[pd] cannot update member from this url",
Expand All @@ -926,15 +908,9 @@ func (c *pdServiceDiscovery) updateMember() error {
continue
}
}

c.updateURLs(members.GetMembers())
if err := c.updateServiceClient(members.GetMembers(), members.GetLeader()); err != nil {
return err
}

// If `switchLeader` succeeds but `switchTSOAllocatorLeader` has an error,
// the error of `switchTSOAllocatorLeader` will be returned.
return errTSO
return c.updateServiceClient(members.GetMembers(), members.GetLeader())
}
return errs.ErrClientGetMember.FastGenByArgs()
}
Expand Down Expand Up @@ -1009,13 +985,12 @@ func (c *pdServiceDiscovery) switchLeader(url string) (bool, error) {
newConn, err := c.GetOrCreateGRPCConn(url)
// If gRPC connect is created successfully or leader is new, still saves.
if url != oldLeader.GetURL() || newConn != nil {
// Set PD leader and Global TSO Allocator (which is also the PD leader)
leaderClient := newPDServiceClient(url, url, newConn, true)
c.leader.Store(leaderClient)
}
// Run callbacks
if c.tsoGlobalAllocLeaderUpdatedCb != nil {
if err := c.tsoGlobalAllocLeaderUpdatedCb(url); err != nil {
if c.tsoLeaderUpdatedCb != nil {
if err := c.tsoLeaderUpdatedCb(url); err != nil {
return true, err
}
}
Expand Down Expand Up @@ -1102,30 +1077,6 @@ func (c *pdServiceDiscovery) updateServiceClient(members []*pdpb.Member, leader
return err
}

func (c *pdServiceDiscovery) switchTSOAllocatorLeaders(allocatorMap map[string]*pdpb.Member) error {
if len(allocatorMap) == 0 {
return nil
}

allocMap := make(map[string]string)
// Switch to the new one
for dcLocation, member := range allocatorMap {
if len(member.GetClientUrls()) == 0 {
continue
}
allocMap[dcLocation] = member.GetClientUrls()[0]
}

// Run the callback to reflect any possible change in the local tso allocators.
if c.tsoLocalAllocLeadersUpdatedCb != nil {
if err := c.tsoLocalAllocLeadersUpdatedCb(allocMap); err != nil {
return err
}
}

return nil
}

// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given URL.
func (c *pdServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) {
return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.gRPCDialOptions...)
Expand Down
Loading

0 comments on commit 4c624ae

Please sign in to comment.