Skip to content

Commit

Permalink
Merge branch 'master' into lp-fix-7992
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Apr 3, 2024
2 parents 00cfdaf + 6fe44d7 commit 6da6e25
Show file tree
Hide file tree
Showing 19 changed files with 345 additions and 80 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ coverage.xml
coverage
*.txt
go.work*
embedded_assets_handler.go
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientNoAvailableMember = errors.Normalize("no available member", errors.RFCCodeText("PD:client:ErrClientNoAvailableMember"))
ErrClientNoTargetMember = errors.Normalize("no target member", errors.RFCCodeText("PD:client:ErrClientNoTargetMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.59.0
Expand All @@ -33,7 +34,6 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
10 changes: 5 additions & 5 deletions client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,25 +124,25 @@ func getValueFromMetadata(ctx context.Context, key string, f func(context.Contex

// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr.
// Returns the old one if's already existed in the clientConns; otherwise creates a new one and returns it.
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string, tlsCfg *tls.Config, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(addr)
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, url string, tlsCfg *tls.Config, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(url)
if ok {
// TODO: check the connection state.
return conn.(*grpc.ClientConn), nil
}
dCtx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
cc, err := GetClientConn(dCtx, addr, tlsCfg, opt...)
cc, err := GetClientConn(dCtx, url, tlsCfg, opt...)
failpoint.Inject("unreachableNetwork2", func(val failpoint.Value) {
if val, ok := val.(string); ok && val == addr {
if val, ok := val.(string); ok && val == url {
cc = nil
err = errors.Errorf("unreachable network")
}
})
if err != nil {
return nil, err
}
conn, loaded := clientConns.LoadOrStore(addr, cc)
conn, loaded := clientConns.LoadOrStore(url, cc)
if !loaded {
// Successfully stored the connection.
return cc, nil
Expand Down
19 changes: 18 additions & 1 deletion client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,23 @@ func (ci *clientInner) requestWithRetry(
if len(clients) == 0 {
return errs.ErrClientNoAvailableMember
}
skipNum := 0
for _, cli := range clients {
url := cli.GetURL()
if reqInfo.targetURL != "" && reqInfo.targetURL != url {
skipNum++
continue
}
statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request url failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err))
}
if skipNum == len(clients) {
return errs.ErrClientNoTargetMember
}
return err
}
if reqInfo.bo == nil {
Expand Down Expand Up @@ -244,6 +252,7 @@ type client struct {
callerID string
respHandler respHandleFunc
bo *retry.Backoffer
targetURL string
}

// ClientOption configures the HTTP client.
Expand Down Expand Up @@ -343,6 +352,13 @@ func (c *client) WithBackoffer(bo *retry.Backoffer) Client {
return &newClient
}

// WithTargetURL sets and returns a new client with the given target URL.
func (c *client) WithTargetURL(targetURL string) Client {
newClient := *c
newClient.targetURL = targetURL
return &newClient
}

// Header key definition constants.
const (
pdAllowFollowerHandleKey = "PD-Allow-Follower-Handle"
Expand All @@ -363,7 +379,8 @@ func (c *client) request(ctx context.Context, reqInfo *requestInfo, headerOpts .
return c.inner.requestWithRetry(ctx, reqInfo.
WithCallerID(c.callerID).
WithRespHandler(c.respHandler).
WithBackoffer(c.bo),
WithBackoffer(c.bo).
WithTargetURL(c.targetURL),
headerOpts...)
}

Expand Down
16 changes: 15 additions & 1 deletion client/http/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -49,7 +50,7 @@ func TestPDAllowFollowerHandleHeader(t *testing.T) {
re.Equal(2, checked)
}

func TestCallerID(t *testing.T) {
func TestWithCallerID(t *testing.T) {
re := require.New(t)
checked := 0
expectedVal := atomic.NewString(defaultCallerID)
Expand Down Expand Up @@ -96,3 +97,16 @@ func TestWithBackoffer(t *testing.T) {
re.InDelta(3*time.Second, time.Since(start), float64(250*time.Millisecond))
re.ErrorIs(err, context.DeadlineExceeded)
}

func TestWithTargetURL(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := newClientWithMockServiceDiscovery("test-with-target-url", []string{"http://127.0.0.1", "http://127.0.0.2", "http://127.0.0.3"})
defer c.Close()

_, err := c.WithTargetURL("http://127.0.0.4").GetStatus(ctx)
re.ErrorIs(err, errs.ErrClientNoTargetMember)
_, err = c.WithTargetURL("http://127.0.0.2").GetStatus(ctx)
re.ErrorContains(err, "connect: connection refused")
}
20 changes: 14 additions & 6 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,12 @@ type Client interface {
DeleteOperators(context.Context) error

/* Keyspace interface */
UpdateKeyspaceSafePointVersion(ctx context.Context, keyspaceName string, keyspaceSafePointVersion *KeyspaceSafePointVersionConfig) error

// UpdateKeyspaceGCManagementType update the `gc_management_type` in keyspace meta config.
// If `gc_management_type` is `global_gc`, it means the current keyspace requires a tidb without 'keyspace-name'
// configured to run a global gc worker to calculate a global gc safe point.
// If `gc_management_type` is `keyspace_level_gc` it means the current keyspace can calculate gc safe point by its own.
UpdateKeyspaceGCManagementType(ctx context.Context, keyspaceName string, keyspaceGCManagementType *KeyspaceGCManagementTypeConfig) error
GetKeyspaceMetaByName(ctx context.Context, keyspaceName string) (*keyspacepb.KeyspaceMeta, error)

/* Client-related methods */
Expand All @@ -111,6 +116,8 @@ type Client interface {
WithRespHandler(func(resp *http.Response, res any) error) Client
// WithBackoffer sets and returns a new client with the given backoffer.
WithBackoffer(*retry.Backoffer) Client
// WithTargetURL sets and returns a new client with the given target URL.
WithTargetURL(string) Client
// Close gracefully closes the HTTP client.
Close()
}
Expand Down Expand Up @@ -467,7 +474,8 @@ func (c *client) GetStatus(ctx context.Context) (*State, error) {
WithName(getStatusName).
WithURI(Status).
WithMethod(http.MethodGet).
WithResp(&status))
WithResp(&status),
WithAllowFollowerHandle())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -921,14 +929,14 @@ func (c *client) DeleteOperators(ctx context.Context) error {
WithMethod(http.MethodDelete))
}

// UpdateKeyspaceSafePointVersion patches the keyspace config.
func (c *client) UpdateKeyspaceSafePointVersion(ctx context.Context, keyspaceName string, keyspaceSafePointVersion *KeyspaceSafePointVersionConfig) error {
keyspaceConfigPatchJSON, err := json.Marshal(keyspaceSafePointVersion)
// UpdateKeyspaceGCManagementType patches the keyspace config.
func (c *client) UpdateKeyspaceGCManagementType(ctx context.Context, keyspaceName string, keyspaceGCmanagementType *KeyspaceGCManagementTypeConfig) error {
keyspaceConfigPatchJSON, err := json.Marshal(keyspaceGCmanagementType)
if err != nil {
return errors.Trace(err)
}
return c.request(ctx, newRequestInfo().
WithName(UpdateKeyspaceSafePointVersionName).
WithName(UpdateKeyspaceGCManagementTypeName).
WithURI(GetUpdateKeyspaceConfigURL(keyspaceName)).
WithMethod(http.MethodPatch).
WithBody(keyspaceConfigPatchJSON))
Expand Down
9 changes: 8 additions & 1 deletion client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const (
setSnapshotRecoveringMarkName = "SetSnapshotRecoveringMark"
deleteSnapshotRecoveringMarkName = "DeleteSnapshotRecoveringMark"
deleteOperators = "DeleteOperators"
UpdateKeyspaceSafePointVersionName = "UpdateKeyspaceSafePointVersion"
UpdateKeyspaceGCManagementTypeName = "UpdateKeyspaceGCManagementType"
GetKeyspaceMetaByNameName = "GetKeyspaceMetaByName"
)

Expand All @@ -91,6 +91,7 @@ type requestInfo struct {
res any
respHandler respHandleFunc
bo *retry.Backoffer
targetURL string
}

// newRequestInfo creates a new request info.
Expand Down Expand Up @@ -146,6 +147,12 @@ func (ri *requestInfo) WithBackoffer(bo *retry.Backoffer) *requestInfo {
return ri
}

// WithTargetURL sets the target URL of the request.
func (ri *requestInfo) WithTargetURL(targetURL string) *requestInfo {
ri.targetURL = targetURL
return ri
}

func (ri *requestInfo) getURL(addr string) string {
return fmt.Sprintf("%s%s", addr, ri.uri)
}
15 changes: 9 additions & 6 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,14 +624,17 @@ type MicroServiceMember struct {
StartTimestamp int64 `json:"start-timestamp"`
}

// KeyspaceSafePointVersion represents parameters needed to modify the safe point version.
type KeyspaceSafePointVersion struct {
SafePointVersion string `json:"safe_point_version,omitempty"`
// KeyspaceGCManagementType represents parameters needed to modify the gc management type.
// If `gc_management_type` is `global_gc`, it means the current keyspace requires a tidb without 'keyspace-name'
// configured to run a global gc worker to calculate a global gc safe point.
// If `gc_management_type` is `keyspace_level_gc` it means the current keyspace can calculate gc safe point by its own.
type KeyspaceGCManagementType struct {
GCManagementType string `json:"gc_management_type,omitempty"`
}

// KeyspaceSafePointVersionConfig represents parameters needed to modify target keyspace's configs.
type KeyspaceSafePointVersionConfig struct {
Config KeyspaceSafePointVersion `json:"config"`
// KeyspaceGCManagementTypeConfig represents parameters needed to modify target keyspace's configs.
type KeyspaceGCManagementTypeConfig struct {
Config KeyspaceGCManagementType `json:"config"`
}

// tempKeyspaceMeta is the keyspace meta struct that returned from the http interface.
Expand Down
2 changes: 1 addition & 1 deletion client/mock_pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewMockPDServiceDiscovery(urls []string, tlsCfg *tls.Config) *mockPDService
func (m *mockPDServiceDiscovery) Init() error {
m.clients = make([]ServiceClient, 0, len(m.urls))
for _, url := range m.urls {
m.clients = append(m.clients, newPDServiceClient(url, url, nil, false))
m.clients = append(m.clients, newPDServiceClient(url, m.urls[0], nil, false))
}
return nil
}
Expand Down
21 changes: 16 additions & 5 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.uber.org/multierr"
)

const maxRecordErrorCount = 20

// Backoffer is a backoff policy for retrying operations.
type Backoffer struct {
// base defines the initial time interval to wait before each retry.
Expand All @@ -34,6 +37,7 @@ type Backoffer struct {
// By default, all errors are retryable.
retryableChecker func(err error) bool

attempt int
next time.Duration
currentTotal time.Duration
}
Expand All @@ -45,11 +49,16 @@ func (bo *Backoffer) Exec(
) error {
defer bo.resetBackoff()
var (
err error
after *time.Timer
allErrors error
after *time.Timer
)
for {
err = fn()
err := fn()
bo.attempt++
if bo.attempt < maxRecordErrorCount {
// multierr.Append will ignore nil error.
allErrors = multierr.Append(allErrors, err)
}
if !bo.isRetryable(err) {
break
}
Expand All @@ -62,7 +71,7 @@ func (bo *Backoffer) Exec(
select {
case <-ctx.Done():
after.Stop()
return errors.Trace(ctx.Err())
return multierr.Append(allErrors, errors.Trace(ctx.Err()))
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
Expand All @@ -77,7 +86,7 @@ func (bo *Backoffer) Exec(
}
}
}
return err
return allErrors
}

// InitialBackoffer make the initial state for retrying.
Expand All @@ -102,6 +111,7 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer {
},
next: base,
currentTotal: 0,
attempt: 0,
}
}

Expand Down Expand Up @@ -141,6 +151,7 @@ func (bo *Backoffer) exponentialInterval() time.Duration {
func (bo *Backoffer) resetBackoff() {
bo.next = bo.base
bo.currentTotal = 0
bo.attempt = 0
}

// Only used for test.
Expand Down
1 change: 1 addition & 0 deletions client/retry/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestBackoffer(t *testing.T) {
return expectedErr
})
re.InDelta(total, time.Since(start), float64(250*time.Millisecond))
re.ErrorContains(err, "test; test; test; test")
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
Expand Down
Loading

0 comments on commit 6da6e25

Please sign in to comment.