Skip to content

Commit

Permalink
Merge branch 'master' into unsafe-recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored Apr 3, 2024
2 parents 0acae8a + a2b0e3c commit 6eb1499
Show file tree
Hide file tree
Showing 65 changed files with 1,456 additions and 340 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/pd-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ on:
- release-5.*
- release-6.*
- release-7.*
- release-8.*
pull_request:
branches:
- master
- release-4.0
- release-5.*
- release-6.*
- release-7.*
- release-8.*
concurrency:
group: ${{ github.ref }}-${{ github.workflow }}
cancel-in-progress: true
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/tso-function-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ on:
- release-5.*
- release-6.*
- release-7.*
- release-8.*
pull_request:
branches:
- master
- release-5.*
- release-6.*
- release-7.*
- release-8.*
concurrency:
group: ${{ github.ref }}-${{ github.workflow }}
cancel-in-progress: true
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ coverage.xml
coverage
*.txt
go.work*
embedded_assets_handler.go
76 changes: 57 additions & 19 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,22 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
log.Info("[pd] changing service mode",
zap.String("old-mode", c.serviceMode.String()),
zap.String("new-mode", newMode.String()))
c.resetTSOClientLocked(newMode)
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
}

// resetTSOClientLocked re-creates the TSO client for the given service mode.
func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) {
// Re-create a new TSO client.
var (
newTSOCli *tsoClient
newTSOSvcDiscovery ServiceDiscovery
)
switch newMode {
switch mode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = newTSOClient(c.ctx, c.option,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
Expand Down Expand Up @@ -649,11 +659,6 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
// We are switching from API service mode to PD service mode, so delete the old tso microservice discovery.
oldTSOSvcDiscovery.Close()
}
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
}

func (c *client) getTSOClient() *tsoClient {
Expand All @@ -662,6 +667,13 @@ func (c *client) getTSOClient() *tsoClient {
return c.tsoClient
}

// ResetTSOClient re-creates the TSO client for the service mode the client is
// currently in. It takes the client's write lock for the duration of the reset
// and is intended for tests only.
func (c *client) ResetTSOClient() {
	c.Lock()
	defer c.Unlock()
	currentMode := c.serviceMode
	c.resetTSOClientLocked(currentMode)
}

func (c *client) getServiceMode() pdpb.ServiceMode {
c.RLock()
defer c.RUnlock()
Expand Down Expand Up @@ -779,26 +791,52 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
defer span.Finish()
}

req := c.getTSORequest(ctx, dcLocation)
if err := c.dispatchTSORequestWithRetry(req); err != nil {
req.tryDone(err)
}
return req
}

func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest {
req := tsoReqPool.Get().(*tsoRequest)
req.requestCtx = ctx
req.clientCtx = c.ctx
tsoClient := c.getTSOClient()
// Set needed fields in the request before using it.
req.start = time.Now()
req.clientCtx = c.ctx
req.requestCtx = ctx
req.physical = 0
req.logical = 0
req.dcLocation = dcLocation
return req
}

if tsoClient == nil {
req.done <- errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
return req
}
const (
// dispatchRetryDelay is how long dispatchTSORequestWithRetry sleeps before
// each attempt after the first one.
dispatchRetryDelay = 50 * time.Millisecond
// dispatchRetryCount is the maximum number of dispatch attempts made by
// dispatchTSORequestWithRetry, including the first one.
dispatchRetryCount = 2
)

if err := tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil {
// Wait for a while and try again
time.Sleep(50 * time.Millisecond)
if err = tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil {
req.done <- err
func (c *client) dispatchTSORequestWithRetry(req *tsoRequest) error {
var (
retryable bool
err error
)
for i := 0; i < dispatchRetryCount; i++ {
// Do not delay for the first time.
if i > 0 {
time.Sleep(dispatchRetryDelay)
}
// Get the tsoClient each time, as it may be initialized or switched during the process.
tsoClient := c.getTSOClient()
if tsoClient == nil {
err = errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
continue
}
retryable, err = tsoClient.dispatchRequest(req)
if !retryable {
break
}
}
return req
return err
}

func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) {
Expand Down
6 changes: 6 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func TestUpdateURLs(t *testing.T) {
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members)
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]}), cli.GetServiceURLs())
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[2:])
re.Equal(getURLs([]*pdpb.Member{members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[3:])
re.Equal(getURLs([]*pdpb.Member{members[3]}), cli.GetServiceURLs())
}

const testClientURL = "tmp://test.url:5255"
Expand Down
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientNoAvailableMember = errors.Normalize("no available member", errors.RFCCodeText("PD:client:ErrClientNoAvailableMember"))
ErrClientNoTargetMember = errors.Normalize("no target member", errors.RFCCodeText("PD:client:ErrClientNoTargetMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.59.0
Expand All @@ -33,7 +34,6 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
10 changes: 5 additions & 5 deletions client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,25 +124,25 @@ func getValueFromMetadata(ctx context.Context, key string, f func(context.Contex

// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr.
// Returns the existing one if it is already in clientConns; otherwise creates a new one and returns it.
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string, tlsCfg *tls.Config, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(addr)
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, url string, tlsCfg *tls.Config, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(url)
if ok {
// TODO: check the connection state.
return conn.(*grpc.ClientConn), nil
}
dCtx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
cc, err := GetClientConn(dCtx, addr, tlsCfg, opt...)
cc, err := GetClientConn(dCtx, url, tlsCfg, opt...)
failpoint.Inject("unreachableNetwork2", func(val failpoint.Value) {
if val, ok := val.(string); ok && val == addr {
if val, ok := val.(string); ok && val == url {
cc = nil
err = errors.Errorf("unreachable network")
}
})
if err != nil {
return nil, err
}
conn, loaded := clientConns.LoadOrStore(addr, cc)
conn, loaded := clientConns.LoadOrStore(url, cc)
if !loaded {
// Successfully stored the connection.
return cc, nil
Expand Down
19 changes: 18 additions & 1 deletion client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,23 @@ func (ci *clientInner) requestWithRetry(
if len(clients) == 0 {
return errs.ErrClientNoAvailableMember
}
skipNum := 0
for _, cli := range clients {
url := cli.GetURL()
if reqInfo.targetURL != "" && reqInfo.targetURL != url {
skipNum++
continue
}
statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request url failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err))
}
if skipNum == len(clients) {
return errs.ErrClientNoTargetMember
}
return err
}
if reqInfo.bo == nil {
Expand Down Expand Up @@ -244,6 +252,7 @@ type client struct {
callerID string
respHandler respHandleFunc
bo *retry.Backoffer
targetURL string
}

// ClientOption configures the HTTP client.
Expand Down Expand Up @@ -343,6 +352,13 @@ func (c *client) WithBackoffer(bo *retry.Backoffer) Client {
return &newClient
}

// WithTargetURL returns a shallow copy of the client with its target URL set
// to targetURL; the receiver itself is left unchanged.
func (c *client) WithTargetURL(targetURL string) Client {
	clone := *c
	clone.targetURL = targetURL
	return &clone
}

// Header key definition constants.
const (
pdAllowFollowerHandleKey = "PD-Allow-Follower-Handle"
Expand All @@ -363,7 +379,8 @@ func (c *client) request(ctx context.Context, reqInfo *requestInfo, headerOpts .
return c.inner.requestWithRetry(ctx, reqInfo.
WithCallerID(c.callerID).
WithRespHandler(c.respHandler).
WithBackoffer(c.bo),
WithBackoffer(c.bo).
WithTargetURL(c.targetURL),
headerOpts...)
}

Expand Down
16 changes: 15 additions & 1 deletion client/http/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -49,7 +50,7 @@ func TestPDAllowFollowerHandleHeader(t *testing.T) {
re.Equal(2, checked)
}

func TestCallerID(t *testing.T) {
func TestWithCallerID(t *testing.T) {
re := require.New(t)
checked := 0
expectedVal := atomic.NewString(defaultCallerID)
Expand Down Expand Up @@ -96,3 +97,16 @@ func TestWithBackoffer(t *testing.T) {
re.InDelta(3*time.Second, time.Since(start), float64(250*time.Millisecond))
re.ErrorIs(err, context.DeadlineExceeded)
}

// TestWithTargetURL checks that WithTargetURL restricts a request to the given
// URL among the URLs known to the client.
func TestWithTargetURL(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := newClientWithMockServiceDiscovery("test-with-target-url", []string{"http://127.0.0.1", "http://127.0.0.2", "http://127.0.0.3"})
defer c.Close()

// 127.0.0.4 is not one of the three URLs given to the mock service discovery,
// so the request is expected to fail with ErrClientNoTargetMember.
_, err := c.WithTargetURL("http://127.0.0.4").GetStatus(ctx)
re.ErrorIs(err, errs.ErrClientNoTargetMember)
// 127.0.0.2 is one of the known URLs, so the request is actually attempted;
// presumably nothing listens on that address in the test environment, hence
// the expected connection-refused error.
_, err = c.WithTargetURL("http://127.0.0.2").GetStatus(ctx)
re.ErrorContains(err, "connect: connection refused")
}
20 changes: 14 additions & 6 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,12 @@ type Client interface {
DeleteOperators(context.Context) error

/* Keyspace interface */
UpdateKeyspaceSafePointVersion(ctx context.Context, keyspaceName string, keyspaceSafePointVersion *KeyspaceSafePointVersionConfig) error

// UpdateKeyspaceGCManagementType updates the `gc_management_type` in the keyspace meta config.
// If `gc_management_type` is `global_gc`, the current keyspace requires a TiDB without 'keyspace-name'
// configured to run a global GC worker that calculates a global GC safe point.
// If `gc_management_type` is `keyspace_level_gc`, the current keyspace can calculate the GC safe point on its own.
UpdateKeyspaceGCManagementType(ctx context.Context, keyspaceName string, keyspaceGCManagementType *KeyspaceGCManagementTypeConfig) error
GetKeyspaceMetaByName(ctx context.Context, keyspaceName string) (*keyspacepb.KeyspaceMeta, error)

/* Client-related methods */
Expand All @@ -111,6 +116,8 @@ type Client interface {
WithRespHandler(func(resp *http.Response, res any) error) Client
// WithBackoffer sets and returns a new client with the given backoffer.
WithBackoffer(*retry.Backoffer) Client
// WithTargetURL sets and returns a new client with the given target URL.
WithTargetURL(string) Client
// Close gracefully closes the HTTP client.
Close()
}
Expand Down Expand Up @@ -467,7 +474,8 @@ func (c *client) GetStatus(ctx context.Context) (*State, error) {
WithName(getStatusName).
WithURI(Status).
WithMethod(http.MethodGet).
WithResp(&status))
WithResp(&status),
WithAllowFollowerHandle())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -921,14 +929,14 @@ func (c *client) DeleteOperators(ctx context.Context) error {
WithMethod(http.MethodDelete))
}

// UpdateKeyspaceSafePointVersion patches the keyspace config.
func (c *client) UpdateKeyspaceSafePointVersion(ctx context.Context, keyspaceName string, keyspaceSafePointVersion *KeyspaceSafePointVersionConfig) error {
keyspaceConfigPatchJSON, err := json.Marshal(keyspaceSafePointVersion)
// UpdateKeyspaceGCManagementType patches the keyspace config.
func (c *client) UpdateKeyspaceGCManagementType(ctx context.Context, keyspaceName string, keyspaceGCmanagementType *KeyspaceGCManagementTypeConfig) error {
keyspaceConfigPatchJSON, err := json.Marshal(keyspaceGCmanagementType)
if err != nil {
return errors.Trace(err)
}
return c.request(ctx, newRequestInfo().
WithName(UpdateKeyspaceSafePointVersionName).
WithName(UpdateKeyspaceGCManagementTypeName).
WithURI(GetUpdateKeyspaceConfigURL(keyspaceName)).
WithMethod(http.MethodPatch).
WithBody(keyspaceConfigPatchJSON))
Expand Down
9 changes: 8 additions & 1 deletion client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const (
setSnapshotRecoveringMarkName = "SetSnapshotRecoveringMark"
deleteSnapshotRecoveringMarkName = "DeleteSnapshotRecoveringMark"
deleteOperators = "DeleteOperators"
UpdateKeyspaceSafePointVersionName = "UpdateKeyspaceSafePointVersion"
UpdateKeyspaceGCManagementTypeName = "UpdateKeyspaceGCManagementType"
GetKeyspaceMetaByNameName = "GetKeyspaceMetaByName"
)

Expand All @@ -91,6 +91,7 @@ type requestInfo struct {
res any
respHandler respHandleFunc
bo *retry.Backoffer
targetURL string
}

// newRequestInfo creates a new request info.
Expand Down Expand Up @@ -146,6 +147,12 @@ func (ri *requestInfo) WithBackoffer(bo *retry.Backoffer) *requestInfo {
return ri
}

// WithTargetURL sets the target URL of the request and returns the same
// requestInfo so that calls can be chained. In requestWithRetry, a non-empty
// target URL causes clients whose URL differs from it to be skipped.
func (ri *requestInfo) WithTargetURL(targetURL string) *requestInfo {
ri.targetURL = targetURL
return ri
}

// getURL returns the request URL formed by appending the request's URI to addr.
func (ri *requestInfo) getURL(addr string) string {
return fmt.Sprintf("%s%s", addr, ri.uri)
}
Loading

0 comments on commit 6eb1499

Please sign in to comment.