Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: support specifying target member #7909

Merged
merged 3 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ coverage.xml
coverage
*.txt
go.work*
embedded_assets_handler.go
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientNoAvailableMember = errors.Normalize("no available member", errors.RFCCodeText("PD:client:ErrClientNoAvailableMember"))
ErrClientNoTargetMember = errors.Normalize("no target member", errors.RFCCodeText("PD:client:ErrClientNoTargetMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
Expand Down
10 changes: 5 additions & 5 deletions client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,25 +124,25 @@ func getValueFromMetadata(ctx context.Context, key string, f func(context.Contex

// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr.
// Returns the old one if's already existed in the clientConns; otherwise creates a new one and returns it.
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string, tlsCfg *tls.Config, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(addr)
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, url string, tlsCfg *tls.Config, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(url)
if ok {
// TODO: check the connection state.
return conn.(*grpc.ClientConn), nil
}
dCtx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
cc, err := GetClientConn(dCtx, addr, tlsCfg, opt...)
cc, err := GetClientConn(dCtx, url, tlsCfg, opt...)
failpoint.Inject("unreachableNetwork2", func(val failpoint.Value) {
if val, ok := val.(string); ok && val == addr {
if val, ok := val.(string); ok && val == url {
cc = nil
err = errors.Errorf("unreachable network")
}
})
if err != nil {
return nil, err
}
conn, loaded := clientConns.LoadOrStore(addr, cc)
conn, loaded := clientConns.LoadOrStore(url, cc)
if !loaded {
// Successfully stored the connection.
return cc, nil
Expand Down
19 changes: 18 additions & 1 deletion client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,23 @@
if len(clients) == 0 {
return errs.ErrClientNoAvailableMember
}
skipNum := 0
for _, cli := range clients {
url := cli.GetURL()
if reqInfo.targetURL != "" && reqInfo.targetURL != url {
skipNum++
continue

Check warning on line 137 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L136-L137

Added lines #L136 - L137 were not covered by tests
}
statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request url failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err))
}
if skipNum == len(clients) {
return errs.ErrClientNoTargetMember

Check warning on line 147 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L147

Added line #L147 was not covered by tests
}
return err
}
if reqInfo.bo == nil {
Expand Down Expand Up @@ -244,6 +252,7 @@
callerID string
respHandler respHandleFunc
bo *retry.Backoffer
targetURL string
}

// ClientOption configures the HTTP client.
Expand Down Expand Up @@ -343,6 +352,13 @@
return &newClient
}

// WithTargetURL sets and returns a new client with the given target URL.
func (c *client) WithTargetURL(targetURL string) Client {
newClient := *c
newClient.targetURL = targetURL
return &newClient

Check warning on line 359 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L357-L359

Added lines #L357 - L359 were not covered by tests
}

// Header key definition constants.
const (
pdAllowFollowerHandleKey = "PD-Allow-Follower-Handle"
Expand All @@ -363,7 +379,8 @@
return c.inner.requestWithRetry(ctx, reqInfo.
WithCallerID(c.callerID).
WithRespHandler(c.respHandler).
WithBackoffer(c.bo),
WithBackoffer(c.bo).
WithTargetURL(c.targetURL),
headerOpts...)
}

Expand Down
16 changes: 15 additions & 1 deletion client/http/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -49,7 +50,7 @@ func TestPDAllowFollowerHandleHeader(t *testing.T) {
re.Equal(2, checked)
}

func TestCallerID(t *testing.T) {
func TestWithCallerID(t *testing.T) {
re := require.New(t)
checked := 0
expectedVal := atomic.NewString(defaultCallerID)
Expand Down Expand Up @@ -96,3 +97,16 @@ func TestWithBackoffer(t *testing.T) {
re.InDelta(3*time.Second, time.Since(start), float64(250*time.Millisecond))
re.ErrorIs(err, context.DeadlineExceeded)
}

func TestWithTargetURL(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := newClientWithMockServiceDiscovery("test-with-target-url", []string{"http://127.0.0.1", "http://127.0.0.2", "http://127.0.0.3"})
defer c.Close()

_, err := c.WithTargetURL("http://127.0.0.4").GetStatus(ctx)
re.ErrorIs(err, errs.ErrClientNoTargetMember)
_, err = c.WithTargetURL("http://127.0.0.2").GetStatus(ctx)
re.ErrorContains(err, "connect: connection refused")
}
5 changes: 4 additions & 1 deletion client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ type Client interface {
WithRespHandler(func(resp *http.Response, res any) error) Client
// WithBackoffer sets and returns a new client with the given backoffer.
WithBackoffer(*retry.Backoffer) Client
// WithTargetURL sets and returns a new client with the given target URL.
WithTargetURL(string) Client
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to make it a request-level option rather than a client one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have the request level option now.

// Close gracefully closes the HTTP client.
Close()
}
Expand Down Expand Up @@ -472,7 +474,8 @@ func (c *client) GetStatus(ctx context.Context) (*State, error) {
WithName(getStatusName).
WithURI(Status).
WithMethod(http.MethodGet).
WithResp(&status))
WithResp(&status),
WithAllowFollowerHandle())
if err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type requestInfo struct {
res any
respHandler respHandleFunc
bo *retry.Backoffer
targetURL string
}

// newRequestInfo creates a new request info.
Expand Down Expand Up @@ -146,6 +147,12 @@ func (ri *requestInfo) WithBackoffer(bo *retry.Backoffer) *requestInfo {
return ri
}

// WithTargetURL sets the target URL of the request.
func (ri *requestInfo) WithTargetURL(targetURL string) *requestInfo {
ri.targetURL = targetURL
return ri
}

func (ri *requestInfo) getURL(addr string) string {
return fmt.Sprintf("%s%s", addr, ri.uri)
}
2 changes: 1 addition & 1 deletion client/mock_pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
func (m *mockPDServiceDiscovery) Init() error {
m.clients = make([]ServiceClient, 0, len(m.urls))
for _, url := range m.urls {
m.clients = append(m.clients, newPDServiceClient(url, url, nil, false))
m.clients = append(m.clients, newPDServiceClient(url, m.urls[0], nil, false))

Check warning on line 44 in client/mock_pd_service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/mock_pd_service_discovery.go#L44

Added line #L44 was not covered by tests
}
return nil
}
Expand Down
Loading