Skip to content

Commit

Permalink
Merge branch 'master' into unsafe-recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed Mar 14, 2024
2 parents 1301df8 + b9ea01a commit e86e7ab
Show file tree
Hide file tree
Showing 28 changed files with 1,019 additions and 524 deletions.
25 changes: 15 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ type Client interface {
GetClusterID(ctx context.Context) uint64
// GetAllMembers gets the members Info from PD
GetAllMembers(ctx context.Context) ([]*pdpb.Member, error)
// GetLeaderAddr returns current leader's address. It returns "" before
// GetLeaderURL returns current leader's URL. It returns "" before
// syncing leader from server.
GetLeaderAddr() string
GetLeaderURL() string
// GetRegion gets a region and its leader Peer from PD by key.
// The region may expire after split. Caller is responsible for caching and
// taking care of region change.
Expand Down Expand Up @@ -575,7 +575,7 @@ func (c *client) setup() error {
}

// Register callbacks
c.pdSvcDiscovery.AddServingAddrSwitchedCallback(c.scheduleUpdateTokenConnection)
c.pdSvcDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection)

// Create dispatchers
c.createTokenDispatcher()
Expand Down Expand Up @@ -680,9 +680,9 @@ func (c *client) GetClusterID(context.Context) uint64 {
return c.pdSvcDiscovery.GetClusterID()
}

// GetLeaderAddr returns the leader address.
func (c *client) GetLeaderAddr() string {
return c.pdSvcDiscovery.GetServingAddr()
// GetLeaderURL returns the leader URL.
func (c *client) GetLeaderURL() string {
return c.pdSvcDiscovery.GetServingURL()
}

// GetServiceDiscovery returns the client-side service discovery object
Expand Down Expand Up @@ -745,7 +745,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
// follower pd client and the context which holds forward information.
func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) {
serviceClient := c.pdSvcDiscovery.GetServiceClient()
if serviceClient == nil {
if serviceClient == nil || serviceClient.GetClientConn() == nil {
return nil, ctx
}
return pdpb.NewPDClient(serviceClient.GetClientConn()), serviceClient.BuildGRPCTargetContext(ctx, true)
Expand All @@ -762,7 +762,7 @@ func (c *client) getRegionAPIClientAndContext(ctx context.Context, allowFollower
}
}
serviceClient = c.pdSvcDiscovery.GetServiceClient()
if serviceClient == nil {
if serviceClient == nil || serviceClient.GetClientConn() == nil {
return nil, ctx
}
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower)
Expand Down Expand Up @@ -1402,9 +1402,14 @@ func IsLeaderChange(err error) bool {
strings.Contains(errMsg, errs.NotServedErr)
}

const (
httpSchemePrefix = "http://"
httpsSchemePrefix = "https://"
)

func trimHTTPPrefix(str string) string {
str = strings.TrimPrefix(str, "http://")
str = strings.TrimPrefix(str, "https://")
str = strings.TrimPrefix(str, httpSchemePrefix)
str = strings.TrimPrefix(str, httpsSchemePrefix)
return str
}

Expand Down
4 changes: 2 additions & 2 deletions client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...g

// BuildForwardContext creates a context with receiver metadata information.
// It is used in client side.
func BuildForwardContext(ctx context.Context, addr string) context.Context {
md := metadata.Pairs(ForwardMetadataKey, addr)
func BuildForwardContext(ctx context.Context, url string) context.Context {
md := metadata.Pairs(ForwardMetadataKey, url)
return metadata.NewOutgoingContext(ctx, md)
}

Expand Down
12 changes: 6 additions & 6 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,13 @@ func (ci *clientInner) requestWithRetry(
return errs.ErrClientNoAvailableMember
}
for _, cli := range clients {
addr := cli.GetHTTPAddress()
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
url := cli.GetURL()
statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request addr failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("addr", addr), zap.Error(err))
log.Debug("[pd] request url failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err))
}
return err
}
Expand All @@ -160,19 +160,19 @@ func noNeedRetry(statusCode int) bool {

func (ci *clientInner) doRequest(
ctx context.Context,
addr string, reqInfo *requestInfo,
url string, reqInfo *requestInfo,
headerOpts ...HeaderOption,
) (int, error) {
var (
source = ci.source
callerID = reqInfo.callerID
name = reqInfo.name
url = reqInfo.getURL(addr)
method = reqInfo.method
body = reqInfo.body
res = reqInfo.res
respHandler = reqInfo.respHandler
)
url = reqInfo.getURL(url)
logFields := []zap.Field{
zap.String("source", source),
zap.String("name", name),
Expand Down
4 changes: 2 additions & 2 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (
Lease: options.lease,
PrevKv: options.prevKv,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderURL())
cli := c.metaStorageClient()
if cli == nil {
cancel()
Expand Down Expand Up @@ -162,7 +162,7 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s
Limit: options.limit,
Revision: options.revision,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderURL())
cli := c.metaStorageClient()
if cli == nil {
cancel()
Expand Down
16 changes: 8 additions & 8 deletions client/mock_pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewMockPDServiceDiscovery(urls []string, tlsCfg *tls.Config) *mockPDService
func (m *mockPDServiceDiscovery) Init() error {
m.clients = make([]ServiceClient, 0, len(m.urls))
for _, url := range m.urls {
m.clients = append(m.clients, newPDServiceClient(url, url, m.tlsCfg, nil, false))
m.clients = append(m.clients, newPDServiceClient(url, url, nil, false))
}
return nil
}
Expand All @@ -62,13 +62,13 @@ func (m *mockPDServiceDiscovery) GetKeyspaceGroupID() uint32
func (m *mockPDServiceDiscovery) GetServiceURLs() []string { return nil }
func (m *mockPDServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { return nil }
func (m *mockPDServiceDiscovery) GetClientConns() *sync.Map { return nil }
func (m *mockPDServiceDiscovery) GetServingAddr() string { return "" }
func (m *mockPDServiceDiscovery) GetBackupAddrs() []string { return nil }
func (m *mockPDServiceDiscovery) GetServingURL() string { return "" }
func (m *mockPDServiceDiscovery) GetBackupURLs() []string { return nil }
func (m *mockPDServiceDiscovery) GetServiceClient() ServiceClient { return nil }
func (m *mockPDServiceDiscovery) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
func (m *mockPDServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) {
return nil, nil
}
func (m *mockPDServiceDiscovery) ScheduleCheckMemberChanged() {}
func (m *mockPDServiceDiscovery) CheckMemberChanged() error { return nil }
func (m *mockPDServiceDiscovery) AddServingAddrSwitchedCallback(callbacks ...func()) {}
func (m *mockPDServiceDiscovery) AddServiceAddrsSwitchedCallback(callbacks ...func()) {}
func (m *mockPDServiceDiscovery) ScheduleCheckMemberChanged() {}
func (m *mockPDServiceDiscovery) CheckMemberChanged() error { return nil }
func (m *mockPDServiceDiscovery) AddServingURLSwitchedCallback(callbacks ...func()) {}
func (m *mockPDServiceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func()) {}
Loading

0 comments on commit e86e7ab

Please sign in to comment.