diff --git a/client/client.go b/client/client.go index 81bf809ef4d..e2ceb41cfd2 100644 --- a/client/client.go +++ b/client/client.go @@ -76,9 +76,9 @@ type Client interface { GetClusterID(ctx context.Context) uint64 // GetAllMembers gets the members Info from PD GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) - // GetLeaderAddr returns current leader's address. It returns "" before + // GetLeaderURL returns current leader's URL. It returns "" before // syncing leader from server. - GetLeaderAddr() string + GetLeaderURL() string // GetRegion gets a region and its leader Peer from PD by key. // The region may expire after split. Caller is responsible for caching and // taking care of region change. @@ -575,7 +575,7 @@ func (c *client) setup() error { } // Register callbacks - c.pdSvcDiscovery.AddServingAddrSwitchedCallback(c.scheduleUpdateTokenConnection) + c.pdSvcDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection) // Create dispatchers c.createTokenDispatcher() @@ -680,9 +680,9 @@ func (c *client) GetClusterID(context.Context) uint64 { return c.pdSvcDiscovery.GetClusterID() } -// GetLeaderAddr returns the leader address. -func (c *client) GetLeaderAddr() string { - return c.pdSvcDiscovery.GetServingAddr() +// GetLeaderURL returns the leader URL. +func (c *client) GetLeaderURL() string { + return c.pdSvcDiscovery.GetServingURL() } // GetServiceDiscovery returns the client-side service discovery object @@ -1402,9 +1402,14 @@ func IsLeaderChange(err error) bool { strings.Contains(errMsg, errs.NotServedErr) } +const ( + httpSchemePrefix = "http://" + httpsSchemePrefix = "https://" +) + func trimHTTPPrefix(str string) string { - str = strings.TrimPrefix(str, "http://") - str = strings.TrimPrefix(str, "https://") + str = strings.TrimPrefix(str, httpSchemePrefix) + str = strings.TrimPrefix(str, httpsSchemePrefix) return str } diff --git a/client/grpcutil/grpcutil.go b/client/grpcutil/grpcutil.go index 070cdf7822f..fb9e84f0ca1 100644 --- a/client/grpcutil/grpcutil.go +++ b/client/grpcutil/grpcutil.go @@ -84,8 +84,8 @@ func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...g // BuildForwardContext creates a context with receiver metadata information. // It is used in client side. -func BuildForwardContext(ctx context.Context, addr string) context.Context { - md := metadata.Pairs(ForwardMetadataKey, addr) +func BuildForwardContext(ctx context.Context, url string) context.Context { + md := metadata.Pairs(ForwardMetadataKey, url) return metadata.NewOutgoingContext(ctx, md) } diff --git a/client/http/client.go b/client/http/client.go index 5ac00a8a43b..18802346a4c 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -130,13 +130,13 @@ func (ci *clientInner) requestWithRetry( return errs.ErrClientNoAvailableMember } for _, cli := range clients { - addr := cli.GetHTTPAddress() - statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...) + url := cli.GetURL() + statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...) 
if err == nil || noNeedRetry(statusCode) { return err } - log.Debug("[pd] request addr failed", - zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("addr", addr), zap.Error(err)) + log.Debug("[pd] request url failed", + zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err)) } return err } @@ -160,19 +160,19 @@ func noNeedRetry(statusCode int) bool { func (ci *clientInner) doRequest( ctx context.Context, - addr string, reqInfo *requestInfo, + url string, reqInfo *requestInfo, headerOpts ...HeaderOption, ) (int, error) { var ( source = ci.source callerID = reqInfo.callerID name = reqInfo.name - url = reqInfo.getURL(addr) method = reqInfo.method body = reqInfo.body res = reqInfo.res respHandler = reqInfo.respHandler ) + url = reqInfo.getURL(url) logFields := []zap.Field{ zap.String("source", source), zap.String("name", name), diff --git a/client/http/interface.go b/client/http/interface.go index f112cf8362a..6d1bb413145 100644 --- a/client/http/interface.go +++ b/client/http/interface.go @@ -58,6 +58,7 @@ type Client interface { GetClusterVersion(context.Context) (string, error) GetCluster(context.Context) (*metapb.Cluster, error) GetClusterStatus(context.Context) (*ClusterState, error) + GetStatus(context.Context) (*State, error) GetReplicateConfig(context.Context) (map[string]any, error) /* Scheduler-related interfaces */ GetSchedulers(context.Context) ([]string, error) @@ -459,6 +460,20 @@ func (c *client) GetClusterStatus(ctx context.Context) (*ClusterState, error) { return clusterStatus, nil } +// GetStatus gets the status of PD. +func (c *client) GetStatus(ctx context.Context) (*State, error) { + var status *State + err := c.request(ctx, newRequestInfo(). + WithName(getStatusName). + WithURI(Status). + WithMethod(http.MethodGet). + WithResp(&status)) + if err != nil { + return nil, err + } + return status, nil +} + // GetReplicateConfig gets the replication configurations. func (c *client) GetReplicateConfig(ctx context.Context) (map[string]any, error) { var config map[string]any diff --git a/client/http/request_info.go b/client/http/request_info.go index e5f1ee4c3f3..d63cd534a5e 100644 --- a/client/http/request_info.go +++ b/client/http/request_info.go @@ -46,6 +46,7 @@ const ( getClusterVersionName = "GetClusterVersion" getClusterName = "GetCluster" getClusterStatusName = "GetClusterStatus" + getStatusName = "GetStatus" getReplicateConfigName = "GetReplicateConfig" getSchedulersName = "GetSchedulers" createSchedulerName = "CreateScheduler" diff --git a/client/http/types.go b/client/http/types.go index 7d2cbcfaaa8..0cd9abb3843 100644 --- a/client/http/types.go +++ b/client/http/types.go @@ -34,6 +34,15 @@ type ClusterState struct { ReplicationStatus string `json:"replication_status"` } +// State is the status of the PD server. +// NOTE: This type is synced with https://github.com/tikv/pd/blob/1d77b25656bc18e1f5aa82337d4ab62a34b10087/pkg/versioninfo/versioninfo.go#L29 +type State struct { + BuildTS string `json:"build_ts"` + Version string `json:"version"` + GitHash string `json:"git_hash"` + StartTimestamp int64 `json:"start_timestamp"` +} + // KeyRange defines a range of keys in bytes.
type KeyRange struct { startKey []byte diff --git a/client/meta_storage_client.go b/client/meta_storage_client.go index 8b158af2212..fe7e8a33e93 100644 --- a/client/meta_storage_client.go +++ b/client/meta_storage_client.go @@ -124,7 +124,7 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) ( Lease: options.lease, PrevKv: options.prevKv, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) + ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderURL()) cli := c.metaStorageClient() if cli == nil { cancel() @@ -162,7 +162,7 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s Limit: options.limit, Revision: options.revision, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) + ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderURL()) cli := c.metaStorageClient() if cli == nil { cancel() diff --git a/client/mock_pd_service_discovery.go b/client/mock_pd_service_discovery.go index 10f7f080106..b33c8405af9 100644 --- a/client/mock_pd_service_discovery.go +++ b/client/mock_pd_service_discovery.go @@ -41,7 +41,7 @@ func NewMockPDServiceDiscovery(urls []string, tlsCfg *tls.Config) *mockPDService func (m *mockPDServiceDiscovery) Init() error { m.clients = make([]ServiceClient, 0, len(m.urls)) for _, url := range m.urls { - m.clients = append(m.clients, newPDServiceClient(url, url, m.tlsCfg, nil, false)) + m.clients = append(m.clients, newPDServiceClient(url, url, nil, false)) } return nil } @@ -62,13 +62,13 @@ func (m *mockPDServiceDiscovery) GetKeyspaceGroupID() uint32 func (m *mockPDServiceDiscovery) GetServiceURLs() []string { return nil } func (m *mockPDServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { return nil } func (m *mockPDServiceDiscovery) GetClientConns() *sync.Map { return nil } -func (m *mockPDServiceDiscovery) GetServingAddr() string { return "" } -func (m *mockPDServiceDiscovery) GetBackupAddrs() []string { return nil } +func (m *mockPDServiceDiscovery) GetServingURL() string { return "" } +func (m *mockPDServiceDiscovery) GetBackupURLs() []string { return nil } func (m *mockPDServiceDiscovery) GetServiceClient() ServiceClient { return nil } -func (m *mockPDServiceDiscovery) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) { +func (m *mockPDServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) { return nil, nil } -func (m *mockPDServiceDiscovery) ScheduleCheckMemberChanged() {} -func (m *mockPDServiceDiscovery) CheckMemberChanged() error { return nil } -func (m *mockPDServiceDiscovery) AddServingAddrSwitchedCallback(callbacks ...func()) {} -func (m *mockPDServiceDiscovery) AddServiceAddrsSwitchedCallback(callbacks ...func()) {} +func (m *mockPDServiceDiscovery) ScheduleCheckMemberChanged() {} +func (m *mockPDServiceDiscovery) CheckMemberChanged() error { return nil } +func (m *mockPDServiceDiscovery) AddServingURLSwitchedCallback(callbacks ...func()) {} +func (m *mockPDServiceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func()) {} diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index 5d9105e7681..bf627d76ac2 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -17,7 +17,7 @@ package pd import ( "context" "crypto/tls" - "fmt" + "net/url" "reflect" "sort" "strings" @@ -87,15 +87,15 @@ type ServiceDiscovery interface { // which is the leader in a quorum-based cluster or the primary in a primary/secondary // configured cluster. 
GetServingEndpointClientConn() *grpc.ClientConn - // GetClientConns returns the mapping {addr -> a gRPC connection} + // GetClientConns returns the mapping {URL -> a gRPC connection} GetClientConns() *sync.Map - // GetServingAddr returns the serving endpoint which is the leader in a quorum-based cluster + // GetServingURL returns the serving endpoint which is the leader in a quorum-based cluster // or the primary in a primary/secondary configured cluster. - GetServingAddr() string - // GetBackupAddrs gets the addresses of the current reachable backup service + GetServingURL() string + // GetBackupURLs gets the URLs of the current reachable backup service // endpoints. Backup service endpoints are followers in a quorum-based cluster or // secondaries in a primary/secondary configured cluster. - GetBackupAddrs() []string + GetBackupURLs() []string // GetServiceClient tries to get the leader/primary ServiceClient. // If the leader ServiceClient meets network problem, // it returns a follower/secondary ServiceClient which can forward the request to leader. @@ -103,8 +103,8 @@ type ServiceDiscovery interface { // GetAllServiceClients tries to get all ServiceClient. // If the leader is not nil, it will put the leader service client first in the slice. GetAllServiceClients() []ServiceClient - // GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr - GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) + // GetOrCreateGRPCConn returns the corresponding grpc client connection of the given url. + GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) // ScheduleCheckMemberChanged is used to trigger a check to see if there is any membership change // among the leader/followers in a quorum-based cluster or among the primary/secondaries in a // primary/secondary configured cluster. @@ -112,22 +112,20 @@ type ServiceDiscovery interface { // CheckMemberChanged immediately check if there is any membership change among the leader/followers // in a quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster. CheckMemberChanged() error - // AddServingAddrSwitchedCallback adds callbacks which will be called when the leader + // AddServingURLSwitchedCallback adds callbacks which will be called when the leader // in a quorum-based cluster or the primary in a primary/secondary configured cluster // is switched. - AddServingAddrSwitchedCallback(callbacks ...func()) - // AddServiceAddrsSwitchedCallback adds callbacks which will be called when any leader/follower + AddServingURLSwitchedCallback(callbacks ...func()) + // AddServiceURLsSwitchedCallback adds callbacks which will be called when any leader/follower // in a quorum-based cluster or any primary/secondary in a primary/secondary configured cluster // is changed. - AddServiceAddrsSwitchedCallback(callbacks ...func()) + AddServiceURLsSwitchedCallback(callbacks ...func()) } // ServiceClient is an interface that defines a set of operations for a raw PD gRPC client to specific PD server. type ServiceClient interface { - // GetAddress returns the address information of the PD server. - GetAddress() string - // GetHTTPAddress returns the address with HTTP scheme of the PD server. - GetHTTPAddress() string + // GetURL returns the client url of the PD/etcd server. + GetURL() string // GetClientConn returns the gRPC connection of the service client GetClientConn() *grpc.ClientConn // BuildGRPCTargetContext builds a context object with a gRPC context. 
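Editor's note: with `GetAddress`/`GetHTTPAddress` collapsed into a single `GetURL`, callers now work with one scheme-qualified URL and strip the scheme themselves when they need a bare `host:port` (for example for metrics labels), which is what `trimHTTPPrefix` in `client/client.go` above does. The following standalone sketch is illustrative only; the `serviceClient` stub is hypothetical and mirrors just the `GetURL` method:

```go
package main

import (
	"fmt"
	"strings"
)

// serviceClient is a minimal stand-in for the ServiceClient interface;
// only GetURL is reproduced here for illustration.
type serviceClient struct{ url string }

func (c *serviceClient) GetURL() string { return c.url }

func main() {
	cli := &serviceClient{url: "https://127.0.0.1:2379"}

	// HTTP callers can use the URL as-is, since it already carries the scheme.
	httpEndpoint := cli.GetURL() + "/pd/api/v1/status"

	// Callers that need a plain host:port (e.g. metrics labels) strip the
	// scheme, mirroring trimHTTPPrefix in client/client.go.
	hostPort := strings.TrimPrefix(strings.TrimPrefix(cli.GetURL(), "https://"), "http://")

	fmt.Println(httpEndpoint) // https://127.0.0.1:2379/pd/api/v1/status
	fmt.Println(hostPort)     // 127.0.0.1:2379
}
```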
@@ -149,43 +147,23 @@ var ( ) type pdServiceClient struct { - addr string - httpAddress string - conn *grpc.ClientConn - isLeader bool - leaderAddr string + url string + conn *grpc.ClientConn + isLeader bool + leaderURL string networkFailure atomic.Bool } -func newPDServiceClient(addr, leaderAddr string, tlsCfg *tls.Config, conn *grpc.ClientConn, isLeader bool) ServiceClient { - var httpAddress string - if tlsCfg == nil { - if strings.HasPrefix(addr, httpsScheme) { - addr = strings.TrimPrefix(addr, httpsScheme) - httpAddress = fmt.Sprintf("%s%s", httpScheme, addr) - } else if strings.HasPrefix(addr, httpScheme) { - httpAddress = addr - } else { - httpAddress = fmt.Sprintf("%s://%s", httpScheme, addr) - } - } else { - if strings.HasPrefix(addr, httpsScheme) { - httpAddress = addr - } else if strings.HasPrefix(addr, httpScheme) { - addr = strings.TrimPrefix(addr, httpScheme) - httpAddress = fmt.Sprintf("%s%s", httpsScheme, addr) - } else { - httpAddress = fmt.Sprintf("%s://%s", httpsScheme, addr) - } - } - +// NOTE: In the current implementation, the URL passed in is bound to have a scheme, +// because it is processed in `newPDServiceDiscovery`, and the URL returned by the etcd member already carries the scheme. +// When testing, the URL is also bound to have a scheme. +func newPDServiceClient(url, leaderURL string, conn *grpc.ClientConn, isLeader bool) ServiceClient { cli := &pdServiceClient{ - addr: addr, - httpAddress: httpAddress, - conn: conn, - isLeader: isLeader, - leaderAddr: leaderAddr, + url: url, + conn: conn, + isLeader: isLeader, + leaderURL: leaderURL, } if conn == nil { cli.networkFailure.Store(true) @@ -193,20 +171,12 @@ func newPDServiceClient(addr, leaderAddr string, tlsCfg *tls.Config, conn *grpc. return cli } -// GetAddress implements ServiceClient. -func (c *pdServiceClient) GetAddress() string { - if c == nil { - return "" - } - return c.addr -} - -// GetHTTPAddress implements ServiceClient. -func (c *pdServiceClient) GetHTTPAddress() string { +// GetURL implements ServiceClient. +func (c *pdServiceClient) GetURL() string { if c == nil { return "" } - return c.httpAddress + return c.url } // BuildGRPCTargetContext implements ServiceClient.
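Editor's note: the scheme-derivation logic removed above now lives in the `modifyURLScheme` helper added later in this diff, which upholds the invariant stated in the NOTE. A trimmed-down, self-contained copy is shown here purely to illustrate that invariant; the real helper uses the package's scheme constants instead of string literals:

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net/url"
)

// Trimmed-down copy of the modifyURLScheme helper added later in this diff
// (pd_service_discovery.go): by the time newPDServiceClient runs, every URL
// already carries an http/https scheme that matches the TLS configuration.
func modifyURLScheme(uStr string, tlsCfg *tls.Config) string {
	u, err := url.Parse(uStr)
	if err != nil {
		// Bare host:port values fail to parse; prepend the scheme directly.
		if tlsCfg != nil {
			return "https://" + uStr
		}
		return "http://" + uStr
	}
	if tlsCfg != nil {
		u.Scheme = "https"
	} else {
		u.Scheme = "http"
	}
	return u.String()
}

func main() {
	fmt.Println(modifyURLScheme("127.0.0.1:2379", nil))                  // http://127.0.0.1:2379
	fmt.Println(modifyURLScheme("http://127.0.0.1:2379", &tls.Config{})) // https://127.0.0.1:2379
}
```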
@@ -215,7 +185,7 @@ func (c *pdServiceClient) BuildGRPCTargetContext(ctx context.Context, toLeader b return ctx } if toLeader { - return grpcutil.BuildForwardContext(ctx, c.leaderAddr) + return grpcutil.BuildForwardContext(ctx, c.leaderURL) } return grpcutil.BuildFollowerHandleContext(ctx) } @@ -243,7 +213,7 @@ func (c *pdServiceClient) checkNetworkAvailable(ctx context.Context) { healthCli := healthpb.NewHealthClient(c.conn) resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) failpoint.Inject("unreachableNetwork1", func(val failpoint.Value) { - if val, ok := val.(string); (ok && val == c.GetAddress()) || !ok { + if val, ok := val.(string); (ok && val == c.GetURL()) || !ok { resp = nil err = status.New(codes.Unavailable, "unavailable").Err() } @@ -412,16 +382,16 @@ func (c *pdServiceBalancer) get() (ret ServiceClient) { } type updateKeyspaceIDFunc func() error -type tsoLocalServAddrsUpdatedFunc func(map[string]string) error -type tsoGlobalServAddrUpdatedFunc func(string) error +type tsoLocalServURLsUpdatedFunc func(map[string]string) error +type tsoGlobalServURLUpdatedFunc func(string) error type tsoAllocatorEventSource interface { - // SetTSOLocalServAddrsUpdatedCallback adds a callback which will be called when the local tso + // SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso // allocator leader list is updated. - SetTSOLocalServAddrsUpdatedCallback(callback tsoLocalServAddrsUpdatedFunc) - // SetTSOGlobalServAddrUpdatedCallback adds a callback which will be called when the global tso + SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc) + // SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso // allocator leader is updated. - SetTSOGlobalServAddrUpdatedCallback(callback tsoGlobalServAddrUpdatedFunc) + SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc) } var ( @@ -442,10 +412,10 @@ type pdServiceDiscovery struct { all atomic.Value // Store as []pdServiceClient apiCandidateNodes [apiKindCount]*pdServiceBalancer // PD follower URLs. Only for tso. - followerAddresses atomic.Value // Store as []string + followerURLs atomic.Value // Store as []string clusterID uint64 - // addr -> a gRPC connection + // url -> a gRPC connection clientConns sync.Map // Store as map[string]*grpc.ClientConn // serviceModeUpdateCb will be called when the service mode gets updated @@ -456,11 +426,11 @@ type pdServiceDiscovery struct { // leader and followers membersChangedCbs []func() // tsoLocalAllocLeadersUpdatedCb will be called when the local tso allocator - // leader list is updated. The input is a map {DC Location -> Leader Addr} - tsoLocalAllocLeadersUpdatedCb tsoLocalServAddrsUpdatedFunc + // leader list is updated. The input is a map {DC Location -> Leader URL} + tsoLocalAllocLeadersUpdatedCb tsoLocalServURLsUpdatedFunc // tsoGlobalAllocLeaderUpdatedCb will be called when the global tso allocator // leader is updated. 
- tsoGlobalAllocLeaderUpdatedCb tsoGlobalServAddrUpdatedFunc + tsoGlobalAllocLeaderUpdatedCb tsoGlobalServURLUpdatedFunc checkMembershipCh chan struct{} @@ -506,7 +476,7 @@ func newPDServiceDiscovery( tlsCfg: tlsCfg, option: option, } - urls = addrsToUrls(urls) + urls = addrsToURLs(urls, tlsCfg) pdsd.urls.Store(urls) return pdsd } @@ -701,17 +671,17 @@ func (c *pdServiceDiscovery) discoverMicroservice(svcType serviceType) (urls []s case apiService: urls = c.GetServiceURLs() case tsoService: - leaderAddr := c.getLeaderAddr() - if len(leaderAddr) > 0 { - clusterInfo, err := c.getClusterInfo(c.ctx, leaderAddr, c.option.timeout) + leaderURL := c.getLeaderURL() + if len(leaderURL) > 0 { + clusterInfo, err := c.getClusterInfo(c.ctx, leaderURL, c.option.timeout) if err != nil { log.Error("[pd] failed to get cluster info", - zap.String("leader-addr", leaderAddr), errs.ZapError(err)) + zap.String("leader-url", leaderURL), errs.ZapError(err)) return nil, err } urls = clusterInfo.TsoUrls } else { - err = errors.New("failed to get leader addr") + err = errors.New("failed to get leader url") return nil, err } default: @@ -731,26 +701,26 @@ func (c *pdServiceDiscovery) GetServiceURLs() []string { // which is the leader in a quorum-based cluster or the primary in a primary/secondary // configured cluster. func (c *pdServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { - if cc, ok := c.clientConns.Load(c.getLeaderAddr()); ok { + if cc, ok := c.clientConns.Load(c.getLeaderURL()); ok { return cc.(*grpc.ClientConn) } return nil } -// GetClientConns returns the mapping {addr -> a gRPC connection} +// GetClientConns returns the mapping {URL -> a gRPC connection} func (c *pdServiceDiscovery) GetClientConns() *sync.Map { return &c.clientConns } -// GetServingAddr returns the leader address -func (c *pdServiceDiscovery) GetServingAddr() string { - return c.getLeaderAddr() +// GetServingURL returns the leader url +func (c *pdServiceDiscovery) GetServingURL() string { + return c.getLeaderURL() } -// GetBackupAddrs gets the addresses of the current reachable followers +// GetBackupURLs gets the URLs of the current reachable followers // in a quorum-based cluster. Used for tso currently. -func (c *pdServiceDiscovery) GetBackupAddrs() []string { - return c.getFollowerAddrs() +func (c *pdServiceDiscovery) GetBackupURLs() []string { + return c.getFollowerURLs() } // getLeaderServiceClient returns the leader ServiceClient. @@ -776,7 +746,7 @@ func (c *pdServiceDiscovery) GetServiceClient() ServiceClient { leaderClient := c.getLeaderServiceClient() if c.option.enableForwarding && !leaderClient.Available() { if followerClient := c.getServiceClientByKind(forwardAPIKind); followerClient != nil { - log.Debug("[pd] use follower client", zap.String("addr", followerClient.GetAddress())) + log.Debug("[pd] use follower client", zap.String("url", followerClient.GetURL())) return followerClient } } @@ -811,46 +781,46 @@ func (c *pdServiceDiscovery) CheckMemberChanged() error { return c.updateMember() } -// AddServingAddrSwitchedCallback adds callbacks which will be called +// AddServingURLSwitchedCallback adds callbacks which will be called // when the leader is switched. -func (c *pdServiceDiscovery) AddServingAddrSwitchedCallback(callbacks ...func()) { +func (c *pdServiceDiscovery) AddServingURLSwitchedCallback(callbacks ...func()) { c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, callbacks...) 
} -// AddServiceAddrsSwitchedCallback adds callbacks which will be called when +// AddServiceURLsSwitchedCallback adds callbacks which will be called when // any leader/follower is changed. -func (c *pdServiceDiscovery) AddServiceAddrsSwitchedCallback(callbacks ...func()) { +func (c *pdServiceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func()) { c.membersChangedCbs = append(c.membersChangedCbs, callbacks...) } -// SetTSOLocalServAddrsUpdatedCallback adds a callback which will be called when the local tso +// SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso // allocator leader list is updated. -func (c *pdServiceDiscovery) SetTSOLocalServAddrsUpdatedCallback(callback tsoLocalServAddrsUpdatedFunc) { +func (c *pdServiceDiscovery) SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc) { c.tsoLocalAllocLeadersUpdatedCb = callback } -// SetTSOGlobalServAddrUpdatedCallback adds a callback which will be called when the global tso +// SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso // allocator leader is updated. -func (c *pdServiceDiscovery) SetTSOGlobalServAddrUpdatedCallback(callback tsoGlobalServAddrUpdatedFunc) { - addr := c.getLeaderAddr() - if len(addr) > 0 { - callback(addr) +func (c *pdServiceDiscovery) SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc) { + url := c.getLeaderURL() + if len(url) > 0 { + callback(url) } c.tsoGlobalAllocLeaderUpdatedCb = callback } -// getLeaderAddr returns the leader address. -func (c *pdServiceDiscovery) getLeaderAddr() string { - return c.getLeaderServiceClient().GetAddress() +// getLeaderURL returns the leader URL. +func (c *pdServiceDiscovery) getLeaderURL() string { + return c.getLeaderServiceClient().GetURL() } -// getFollowerAddrs returns the follower address. -func (c *pdServiceDiscovery) getFollowerAddrs() []string { - followerAddrs := c.followerAddresses.Load() - if followerAddrs == nil { +// getFollowerURLs returns the follower URLs. +func (c *pdServiceDiscovery) getFollowerURLs() []string { + followerURLs := c.followerURLs.Load() + if followerURLs == nil { return []string{} } - return followerAddrs.([]string) + return followerURLs.([]string) } func (c *pdServiceDiscovery) initClusterID() error { @@ -884,12 +854,12 @@ func (c *pdServiceDiscovery) initClusterID() error { } func (c *pdServiceDiscovery) checkServiceModeChanged() error { - leaderAddr := c.getLeaderAddr() - if len(leaderAddr) == 0 { + leaderURL := c.getLeaderURL() + if len(leaderURL) == 0 { return errors.New("no leader found") } - clusterInfo, err := c.getClusterInfo(c.ctx, leaderAddr, c.option.timeout) + clusterInfo, err := c.getClusterInfo(c.ctx, leaderURL, c.option.timeout) if err != nil { if strings.Contains(err.Error(), "Unimplemented") { // If the method is not supported, we set it to pd mode. 
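Editor's note: `getFollowerURLs` above reads the follower list from an `atomic.Value` without locking, treating a nil `Load` as "nothing stored yet". A minimal reproduction of that access pattern follows; the `discovery` struct is a stand-in, not the real `pdServiceDiscovery`:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// discovery keeps only the followerURLs field to mirror the
// getFollowerURLs pattern above.
type discovery struct {
	followerURLs atomic.Value // stores a []string
}

func (d *discovery) getFollowerURLs() []string {
	urls := d.followerURLs.Load()
	if urls == nil {
		// Nothing stored yet, e.g. before the first member update.
		return []string{}
	}
	return urls.([]string)
}

func main() {
	d := &discovery{}
	fmt.Println(d.getFollowerURLs()) // []
	d.followerURLs.Store([]string{"http://127.0.0.1:2379", "http://127.0.0.1:2382"})
	fmt.Println(d.getFollowerURLs()) // [http://127.0.0.1:2379 http://127.0.0.1:2382]
}
```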
@@ -928,7 +898,7 @@ func (c *pdServiceDiscovery) updateMember() error { var errTSO error if err == nil { if members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 { - err = errs.ErrClientGetLeader.FastGenByArgs("leader address doesn't exist") + err = errs.ErrClientGetLeader.FastGenByArgs("leader url doesn't exist") } // Still need to update TsoAllocatorLeaders, even if there is no PD leader errTSO = c.switchTSOAllocatorLeaders(members.GetTsoAllocatorLeaders()) @@ -936,8 +906,8 @@ func (c *pdServiceDiscovery) updateMember() error { // Failed to get members if err != nil { - log.Info("[pd] cannot update member from this address", - zap.String("address", url), + log.Info("[pd] cannot update member from this url", + zap.String("url", url), errs.ZapError(err)) select { case <-c.ctx.Done(): @@ -1020,68 +990,67 @@ func (c *pdServiceDiscovery) updateURLs(members []*pdpb.Member) { log.Info("[pd] update member urls", zap.Strings("old-urls", oldURLs), zap.Strings("new-urls", urls)) } -func (c *pdServiceDiscovery) switchLeader(addrs []string) (bool, error) { - // FIXME: How to safely compare leader urls? For now, only allows one client url. - addr := addrs[0] +func (c *pdServiceDiscovery) switchLeader(url string) (bool, error) { oldLeader := c.getLeaderServiceClient() - if addr == oldLeader.GetAddress() && oldLeader.GetClientConn() != nil { + if url == oldLeader.GetURL() && oldLeader.GetClientConn() != nil { return false, nil } - newConn, err := c.GetOrCreateGRPCConn(addr) + newConn, err := c.GetOrCreateGRPCConn(url) // If gRPC connect is created successfully or leader is new, still saves. - if addr != oldLeader.GetAddress() || newConn != nil { + if url != oldLeader.GetURL() || newConn != nil { // Set PD leader and Global TSO Allocator (which is also the PD leader) - leaderClient := newPDServiceClient(addr, addr, c.tlsCfg, newConn, true) + leaderClient := newPDServiceClient(url, url, newConn, true) c.leader.Store(leaderClient) } // Run callbacks if c.tsoGlobalAllocLeaderUpdatedCb != nil { - if err := c.tsoGlobalAllocLeaderUpdatedCb(addr); err != nil { + if err := c.tsoGlobalAllocLeaderUpdatedCb(url); err != nil { return true, err } } for _, cb := range c.leaderSwitchedCbs { cb() } - log.Info("[pd] switch leader", zap.String("new-leader", addr), zap.String("old-leader", oldLeader.GetAddress())) + log.Info("[pd] switch leader", zap.String("new-leader", url), zap.String("old-leader", oldLeader.GetURL())) return true, err } -func (c *pdServiceDiscovery) updateFollowers(members []*pdpb.Member, leader *pdpb.Member) (changed bool) { +func (c *pdServiceDiscovery) updateFollowers(members []*pdpb.Member, leaderID uint64, leaderURL string) (changed bool) { followers := make(map[string]*pdServiceClient) c.followers.Range(func(key, value any) bool { followers[key.(string)] = value.(*pdServiceClient) return true }) - var followerAddrs []string + var followerURLs []string for _, member := range members { - if member.GetMemberId() != leader.GetMemberId() { + if member.GetMemberId() != leaderID { if len(member.GetClientUrls()) > 0 { - followerAddrs = append(followerAddrs, member.GetClientUrls()...) + // Now we don't apply ServiceClient for TSO Follower Proxy, so just keep the all URLs. + followerURLs = append(followerURLs, member.GetClientUrls()...) // FIXME: How to safely compare urls(also for leader)? For now, only allows one client url. 
- addr := member.GetClientUrls()[0] - if client, ok := c.followers.Load(addr); ok { + url := pickMatchedURL(member.GetClientUrls(), c.tlsCfg) + if client, ok := c.followers.Load(url); ok { if client.(*pdServiceClient).GetClientConn() == nil { - conn, err := c.GetOrCreateGRPCConn(addr) + conn, err := c.GetOrCreateGRPCConn(url) if err != nil || conn == nil { - log.Warn("[pd] failed to connect follower", zap.String("follower", addr), errs.ZapError(err)) + log.Warn("[pd] failed to connect follower", zap.String("follower", url), errs.ZapError(err)) continue } - follower := newPDServiceClient(addr, leader.GetClientUrls()[0], c.tlsCfg, conn, false) - c.followers.Store(addr, follower) + follower := newPDServiceClient(url, leaderURL, conn, false) + c.followers.Store(url, follower) changed = true } - delete(followers, addr) + delete(followers, url) } else { changed = true - conn, err := c.GetOrCreateGRPCConn(addr) - follower := newPDServiceClient(addr, leader.GetClientUrls()[0], c.tlsCfg, conn, false) + conn, err := c.GetOrCreateGRPCConn(url) + follower := newPDServiceClient(url, leaderURL, conn, false) if err != nil || conn == nil { - log.Warn("[pd] failed to connect follower", zap.String("follower", addr), errs.ZapError(err)) + log.Warn("[pd] failed to connect follower", zap.String("follower", url), errs.ZapError(err)) } - c.followers.LoadOrStore(addr, follower) + c.followers.LoadOrStore(url, follower) } } } @@ -1092,13 +1061,15 @@ func (c *pdServiceDiscovery) updateFollowers(members []*pdpb.Member, leader *pdp c.followers.Delete(key) } } - c.followerAddresses.Store(followerAddrs) + c.followerURLs.Store(followerURLs) return } func (c *pdServiceDiscovery) updateServiceClient(members []*pdpb.Member, leader *pdpb.Member) error { - leaderChanged, err := c.switchLeader(leader.GetClientUrls()) - followerChanged := c.updateFollowers(members, leader) + // FIXME: How to safely compare leader urls? For now, only allows one client url. + leaderURL := pickMatchedURL(leader.GetClientUrls(), c.tlsCfg) + leaderChanged, err := c.switchLeader(leaderURL) + followerChanged := c.updateFollowers(members, leader.GetMemberId(), leaderURL) // don't need to recreate balancer if no changess. if !followerChanged && !leaderChanged { return err @@ -1145,20 +1116,54 @@ func (c *pdServiceDiscovery) switchTSOAllocatorLeaders(allocatorMap map[string]* return nil } -// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr -func (c *pdServiceDiscovery) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) { - return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, addr, c.tlsCfg, c.option.gRPCDialOptions...) +// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given URL. +func (c *pdServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) { + return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.gRPCDialOptions...) } -func addrsToUrls(addrs []string) []string { +func addrsToURLs(addrs []string, tlsCfg *tls.Config) []string { // Add default schema "http://" to addrs. 
urls := make([]string, 0, len(addrs)) for _, addr := range addrs { - if strings.Contains(addr, "://") { - urls = append(urls, addr) - } else { - urls = append(urls, "http://"+addr) - } + urls = append(urls, modifyURLScheme(addr, tlsCfg)) } return urls } + +func modifyURLScheme(uStr string, tlsCfg *tls.Config) string { + u, err := url.Parse(uStr) + if err != nil { + if tlsCfg != nil { + return httpsSchemePrefix + uStr + } + return httpSchemePrefix + uStr + } + if tlsCfg != nil { + u.Scheme = httpsScheme + } else { + u.Scheme = httpScheme + } + return u.String() +} + +// pickMatchedURL picks the matched URL based on the TLS config. +// Note: please make sure the URLs are valid. +func pickMatchedURL(urls []string, tlsCfg *tls.Config) string { + for _, uStr := range urls { + u, err := url.Parse(uStr) + if err != nil { + continue + } + if tlsCfg != nil && u.Scheme == httpsScheme { + return uStr + } + if tlsCfg == nil && u.Scheme == httpScheme { + return uStr + } + } + ret := modifyURLScheme(urls[0], tlsCfg) + log.Warn("[pd] no matched url found", zap.Strings("urls", urls), + zap.Bool("tls-enabled", tlsCfg != nil), + zap.String("attempted-url", ret)) + return ret +} diff --git a/client/pd_service_discovery_test.go b/client/pd_service_discovery_test.go index 226d407b56b..2373fc4c304 100644 --- a/client/pd_service_discovery_test.go +++ b/client/pd_service_discovery_test.go @@ -140,10 +140,16 @@ func (suite *serviceClientTestSuite) SetupSuite() { leaderConn, err1 := grpc.Dial(suite.leaderServer.addr, grpc.WithTransportCredentials(insecure.NewCredentials())) followerConn, err2 := grpc.Dial(suite.followerServer.addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err1 == nil && err2 == nil { - suite.followerClient = newPDServiceClient(suite.followerServer.addr, suite.leaderServer.addr, nil, followerConn, false) - suite.leaderClient = newPDServiceClient(suite.leaderServer.addr, suite.leaderServer.addr, nil, leaderConn, true) + suite.followerClient = newPDServiceClient( + modifyURLScheme(suite.followerServer.addr, nil), + modifyURLScheme(suite.leaderServer.addr, nil), + followerConn, false) + suite.leaderClient = newPDServiceClient( + modifyURLScheme(suite.leaderServer.addr, nil), + modifyURLScheme(suite.leaderServer.addr, nil), + leaderConn, true) suite.followerServer.server.leaderConn = suite.leaderClient.GetClientConn() - suite.followerServer.server.leaderAddr = suite.leaderClient.GetAddress() + suite.followerServer.server.leaderAddr = suite.leaderClient.GetURL() return } time.Sleep(50 * time.Millisecond) @@ -166,16 +172,14 @@ func (suite *serviceClientTestSuite) TearDownSuite() { func (suite *serviceClientTestSuite) TestServiceClient() { re := suite.Require() - leaderAddress := suite.leaderServer.addr - followerAddress := suite.followerServer.addr + leaderAddress := modifyURLScheme(suite.leaderServer.addr, nil) + followerAddress := modifyURLScheme(suite.followerServer.addr, nil) follower := suite.followerClient leader := suite.leaderClient - re.Equal(follower.GetAddress(), followerAddress) - re.Equal(leader.GetAddress(), leaderAddress) - re.Equal(follower.GetHTTPAddress(), "http://"+followerAddress) - re.Equal(leader.GetHTTPAddress(), "http://"+leaderAddress) + re.Equal(follower.GetURL(), followerAddress) + re.Equal(leader.GetURL(), leaderAddress) re.True(follower.Available()) re.True(leader.Available()) @@ -301,18 +305,48 @@ func (suite *serviceClientTestSuite) TestServiceClientBalancer() { re.Equal(int32(5), suite.followerServer.server.getForwardCount()) } -func TestHTTPScheme(t 
*testing.T) { +func TestServiceClientScheme(t *testing.T) { re := require.New(t) - cli := newPDServiceClient("127.0.0.1:2379", "127.0.0.1:2379", nil, nil, false) - re.Equal("http://127.0.0.1:2379", cli.GetHTTPAddress()) - cli = newPDServiceClient("https://127.0.0.1:2379", "127.0.0.1:2379", nil, nil, false) - re.Equal("http://127.0.0.1:2379", cli.GetHTTPAddress()) - cli = newPDServiceClient("http://127.0.0.1:2379", "127.0.0.1:2379", nil, nil, false) - re.Equal("http://127.0.0.1:2379", cli.GetHTTPAddress()) - cli = newPDServiceClient("127.0.0.1:2379", "127.0.0.1:2379", &tls.Config{}, nil, false) - re.Equal("https://127.0.0.1:2379", cli.GetHTTPAddress()) - cli = newPDServiceClient("https://127.0.0.1:2379", "127.0.0.1:2379", &tls.Config{}, nil, false) - re.Equal("https://127.0.0.1:2379", cli.GetHTTPAddress()) - cli = newPDServiceClient("http://127.0.0.1:2379", "127.0.0.1:2379", &tls.Config{}, nil, false) - re.Equal("https://127.0.0.1:2379", cli.GetHTTPAddress()) + cli := newPDServiceClient(modifyURLScheme("127.0.0.1:2379", nil), modifyURLScheme("127.0.0.1:2379", nil), nil, false) + re.Equal("http://127.0.0.1:2379", cli.GetURL()) + cli = newPDServiceClient(modifyURLScheme("https://127.0.0.1:2379", nil), modifyURLScheme("127.0.0.1:2379", nil), nil, false) + re.Equal("http://127.0.0.1:2379", cli.GetURL()) + cli = newPDServiceClient(modifyURLScheme("http://127.0.0.1:2379", nil), modifyURLScheme("127.0.0.1:2379", nil), nil, false) + re.Equal("http://127.0.0.1:2379", cli.GetURL()) + cli = newPDServiceClient(modifyURLScheme("127.0.0.1:2379", &tls.Config{}), modifyURLScheme("127.0.0.1:2379", &tls.Config{}), nil, false) + re.Equal("https://127.0.0.1:2379", cli.GetURL()) + cli = newPDServiceClient(modifyURLScheme("https://127.0.0.1:2379", &tls.Config{}), modifyURLScheme("127.0.0.1:2379", &tls.Config{}), nil, false) + re.Equal("https://127.0.0.1:2379", cli.GetURL()) + cli = newPDServiceClient(modifyURLScheme("http://127.0.0.1:2379", &tls.Config{}), modifyURLScheme("127.0.0.1:2379", &tls.Config{}), nil, false) + re.Equal("https://127.0.0.1:2379", cli.GetURL()) +} + +func TestSchemeFunction(t *testing.T) { + re := require.New(t) + tlsCfg := &tls.Config{} + re.Equal("https://127.0.0.1:2379", modifyURLScheme("https://127.0.0.1:2379", tlsCfg)) + re.Equal("https://127.0.0.1:2379", modifyURLScheme("http://127.0.0.1:2379", tlsCfg)) + re.Equal("https://127.0.0.1:2379", modifyURLScheme("127.0.0.1:2379", tlsCfg)) + re.Equal("http://127.0.0.1:2379", modifyURLScheme("https://127.0.0.1:2379", nil)) + re.Equal("http://127.0.0.1:2379", modifyURLScheme("http://127.0.0.1:2379", nil)) + re.Equal("http://127.0.0.1:2379", modifyURLScheme("127.0.0.1:2379", nil)) + + urls := []string{ + "http://127.0.0.1:2379", + "https://127.0.0.1:2379", + } + re.Equal("https://127.0.0.1:2379", pickMatchedURL(urls, tlsCfg)) + urls = []string{ + "http://127.0.0.1:2379", + } + re.Equal("https://127.0.0.1:2379", pickMatchedURL(urls, tlsCfg)) + urls = []string{ + "http://127.0.0.1:2379", + "https://127.0.0.1:2379", + } + re.Equal("http://127.0.0.1:2379", pickMatchedURL(urls, nil)) + urls = []string{ + "https://127.0.0.1:2379", + } + re.Equal("http://127.0.0.1:2379", pickMatchedURL(urls, nil)) } diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 433d17ceeee..872b241cfe7 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -74,7 +74,7 @@ func WithRUStats(op *GetResourceGroupOp) { // resourceManagerClient gets the ResourceManager client of current PD leader. 
func (c *client) resourceManagerClient() (rmpb.ResourceManagerClient, error) { - cc, err := c.pdSvcDiscovery.GetOrCreateGRPCConn(c.GetLeaderAddr()) + cc, err := c.pdSvcDiscovery.GetOrCreateGRPCConn(c.GetLeaderURL()) if err != nil { return nil, err } diff --git a/client/tso_client.go b/client/tso_client.go index 465db1dbd5f..158d84e043a 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -74,9 +74,9 @@ type tsoClient struct { tsoStreamBuilderFactory // tsoAllocators defines the mapping {dc-location -> TSO allocator leader URL} tsoAllocators sync.Map // Store as map[string]string - // tsoAllocServingAddrSwitchedCallback will be called when any global/local + // tsoAllocServingURLSwitchedCallback will be called when any global/local // tso allocator leader is switched. - tsoAllocServingAddrSwitchedCallback []func() + tsoAllocServingURLSwitchedCallback []func() // tsoDispatcher is used to dispatch different TSO requests to // the corresponding dc-location TSO channel. @@ -109,9 +109,9 @@ func newTSOClient( } eventSrc := svcDiscovery.(tsoAllocatorEventSource) - eventSrc.SetTSOLocalServAddrsUpdatedCallback(c.updateTSOLocalServAddrs) - eventSrc.SetTSOGlobalServAddrUpdatedCallback(c.updateTSOGlobalServAddr) - c.svcDiscovery.AddServiceAddrsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs) + eventSrc.SetTSOLocalServURLsUpdatedCallback(c.updateTSOLocalServURLs) + eventSrc.SetTSOGlobalServURLUpdatedCallback(c.updateTSOGlobalServURL) + c.svcDiscovery.AddServiceURLsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs) return c } @@ -155,8 +155,8 @@ func (c *tsoClient) GetTSOAllocators() *sync.Map { return &c.tsoAllocators } -// GetTSOAllocatorServingAddrByDCLocation returns the tso allocator of the given dcLocation -func (c *tsoClient) GetTSOAllocatorServingAddrByDCLocation(dcLocation string) (string, bool) { +// GetTSOAllocatorServingURLByDCLocation returns the tso allocator of the given dcLocation +func (c *tsoClient) GetTSOAllocatorServingURLByDCLocation(dcLocation string) (string, bool) { url, exist := c.tsoAllocators.Load(dcLocation) if !exist { return "", false @@ -179,13 +179,13 @@ func (c *tsoClient) GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*g return cc.(*grpc.ClientConn), url.(string) } -// AddTSOAllocatorServingAddrSwitchedCallback adds callbacks which will be called +// AddTSOAllocatorServingURLSwitchedCallback adds callbacks which will be called // when any global/local tso allocator service endpoint is switched. -func (c *tsoClient) AddTSOAllocatorServingAddrSwitchedCallback(callbacks ...func()) { - c.tsoAllocServingAddrSwitchedCallback = append(c.tsoAllocServingAddrSwitchedCallback, callbacks...) +func (c *tsoClient) AddTSOAllocatorServingURLSwitchedCallback(callbacks ...func()) { + c.tsoAllocServingURLSwitchedCallback = append(c.tsoAllocServingURLSwitchedCallback, callbacks...) 
} -func (c *tsoClient) updateTSOLocalServAddrs(allocatorMap map[string]string) error { +func (c *tsoClient) updateTSOLocalServURLs(allocatorMap map[string]string) error { if len(allocatorMap) == 0 { return nil } @@ -193,31 +193,31 @@ func (c *tsoClient) updateTSOLocalServAddrs(allocatorMap map[string]string) erro updated := false // Switch to the new one - for dcLocation, addr := range allocatorMap { - if len(addr) == 0 { + for dcLocation, url := range allocatorMap { + if len(url) == 0 { continue } - oldAddr, exist := c.GetTSOAllocatorServingAddrByDCLocation(dcLocation) - if exist && addr == oldAddr { + oldURL, exist := c.GetTSOAllocatorServingURLByDCLocation(dcLocation) + if exist && url == oldURL { continue } updated = true - if _, err := c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { - log.Warn("[tso] failed to connect dc tso allocator serving address", + if _, err := c.svcDiscovery.GetOrCreateGRPCConn(url); err != nil { + log.Warn("[tso] failed to connect dc tso allocator serving url", zap.String("dc-location", dcLocation), - zap.String("serving-address", addr), + zap.String("serving-url", url), errs.ZapError(err)) return err } - c.tsoAllocators.Store(dcLocation, addr) - log.Info("[tso] switch dc tso local allocator serving address", + c.tsoAllocators.Store(dcLocation, url) + log.Info("[tso] switch dc tso local allocator serving url", zap.String("dc-location", dcLocation), - zap.String("new-address", addr), - zap.String("old-address", oldAddr)) + zap.String("new-url", url), + zap.String("old-url", oldURL)) } // Garbage collection of the old TSO allocator primaries - c.gcAllocatorServingAddr(allocatorMap) + c.gcAllocatorServingURL(allocatorMap) if updated { c.scheduleCheckTSODispatcher() @@ -226,16 +226,16 @@ func (c *tsoClient) updateTSOLocalServAddrs(allocatorMap map[string]string) erro return nil } -func (c *tsoClient) updateTSOGlobalServAddr(addr string) error { - c.tsoAllocators.Store(globalDCLocation, addr) - log.Info("[tso] switch dc tso global allocator serving address", +func (c *tsoClient) updateTSOGlobalServURL(url string) error { + c.tsoAllocators.Store(globalDCLocation, url) + log.Info("[tso] switch dc tso global allocator serving url", zap.String("dc-location", globalDCLocation), - zap.String("new-address", addr)) + zap.String("new-url", url)) c.scheduleCheckTSODispatcher() return nil } -func (c *tsoClient) gcAllocatorServingAddr(curAllocatorMap map[string]string) { +func (c *tsoClient) gcAllocatorServingURL(curAllocatorMap map[string]string) { // Clean up the old TSO allocators c.tsoAllocators.Range(func(dcLocationKey, _ any) bool { dcLocation := dcLocationKey.(string) @@ -255,24 +255,24 @@ func (c *tsoClient) gcAllocatorServingAddr(curAllocatorMap map[string]string) { // backup service endpoints randomly. Backup service endpoints are followers in a // quorum-based cluster or secondaries in a primary/secondary configured cluster. 
func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) { - addrs := c.svcDiscovery.GetBackupAddrs() - if len(addrs) < 1 { + urls := c.svcDiscovery.GetBackupURLs() + if len(urls) < 1 { return nil, "" } var ( cc *grpc.ClientConn err error ) - for i := 0; i < len(addrs); i++ { - addr := addrs[rand.Intn(len(addrs))] - if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { + for i := 0; i < len(urls); i++ { + url := urls[rand.Intn(len(urls))] + if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(url); err != nil { continue } healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout) resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) healthCancel() if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { - return cc, addr + return cc, url } } return nil, "" diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 3159a77d135..defe7de2afd 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -250,12 +250,12 @@ func (c *tsoClient) tsoDispatcherCheckLoop() { func (c *tsoClient) checkAllocator( dispatcherCtx context.Context, forwardCancel context.CancelFunc, - dc, forwardedHostTrim, addrTrim, url string, + dc, forwardedHostTrim, addr, url string, updateAndClear func(newAddr string, connectionCtx *tsoConnectionContext)) { defer func() { // cancel the forward stream forwardCancel() - requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(0) + requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(0) }() cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc) var healthCli healthpb.HealthClient @@ -343,12 +343,12 @@ func (c *tsoClient) handleDispatcher( dc string, tbc *tsoBatchController) { var ( - err error - streamAddr string - stream tsoStream - streamCtx context.Context - cancel context.CancelFunc - // addr -> connectionContext + err error + streamURL string + stream tsoStream + streamCtx context.Context + cancel context.CancelFunc + // url -> connectionContext connectionCtxs sync.Map ) defer func() { @@ -448,7 +448,7 @@ tsoBatchLoop: for { connectionCtx := c.chooseStream(&connectionCtxs) if connectionCtx != nil { - streamAddr, stream, streamCtx, cancel = connectionCtx.streamAddr, connectionCtx.stream, connectionCtx.ctx, connectionCtx.cancel + streamURL, stream, streamCtx, cancel = connectionCtx.streamURL, connectionCtx.stream, connectionCtx.ctx, connectionCtx.cancel } // Check stream and retry if necessary. if stream == nil { @@ -475,9 +475,9 @@ tsoBatchLoop: } select { case <-streamCtx.Done(): - log.Info("[tso] tso stream is canceled", zap.String("dc", dc), zap.String("stream-addr", streamAddr)) + log.Info("[tso] tso stream is canceled", zap.String("dc", dc), zap.String("stream-url", streamURL)) // Set `stream` to nil and remove this stream from the `connectionCtxs` due to being canceled. - connectionCtxs.Delete(streamAddr) + connectionCtxs.Delete(streamURL) cancel() stream = nil continue @@ -510,10 +510,10 @@ tsoBatchLoop: c.svcDiscovery.ScheduleCheckMemberChanged() log.Error("[tso] getTS error after processing requests", zap.String("dc-location", dc), - zap.String("stream-addr", streamAddr), + zap.String("stream-url", streamURL), zap.Error(errs.ErrClientGetTSO.FastGenByArgs(err.Error()))) // Set `stream` to nil and remove this stream from the `connectionCtxs` due to error. 
- connectionCtxs.Delete(streamAddr) + connectionCtxs.Delete(streamURL) cancel() stream = nil // Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP. @@ -557,7 +557,7 @@ func (c *tsoClient) chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoCo } type tsoConnectionContext struct { - streamAddr string + streamURL string // Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluster, // or tsopb.TSO_TsoClient for a primary/secondary in the TSO cluster stream tsoStream @@ -594,16 +594,16 @@ func (c *tsoClient) tryConnectToTSO( url string cc *grpc.ClientConn ) - updateAndClear := func(newAddr string, connectionCtx *tsoConnectionContext) { - if cc, loaded := connectionCtxs.LoadOrStore(newAddr, connectionCtx); loaded { + updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) { + if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded { // If the previous connection still exists, we should close it first. cc.(*tsoConnectionContext).cancel() - connectionCtxs.Store(newAddr, connectionCtx) + connectionCtxs.Store(newURL, connectionCtx) } - connectionCtxs.Range(func(addr, cc any) bool { - if addr.(string) != newAddr { + connectionCtxs.Range(func(url, cc any) bool { + if url.(string) != newURL { cc.(*tsoConnectionContext).cancel() - connectionCtxs.Delete(addr) + connectionCtxs.Delete(url) } return true }) @@ -650,10 +650,10 @@ func (c *tsoClient) tryConnectToTSO( if networkErrNum == maxRetryTimes { // encounter the network error - backupClientConn, addr := c.backupClientConn() + backupClientConn, backupURL := c.backupClientConn() if backupClientConn != nil { - log.Info("[tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("addr", addr)) - forwardedHost, ok := c.GetTSOAllocatorServingAddrByDCLocation(dc) + log.Info("[tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("follower-url", backupURL)) + forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) if !ok { return errors.Errorf("cannot find the allocator leader in %s", dc) } @@ -664,11 +664,11 @@ func (c *tsoClient) tryConnectToTSO( stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout) if err == nil { forwardedHostTrim := trimHTTPPrefix(forwardedHost) - addrTrim := trimHTTPPrefix(addr) + addr := trimHTTPPrefix(backupURL) // the goroutine is used to check the network and change back to the original stream - go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addrTrim, url, updateAndClear) - requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1) - updateAndClear(addr, &tsoConnectionContext{addr, stream, cctx, cancel}) + go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear) + requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1) + updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel}) return nil } cancel() @@ -707,8 +707,8 @@ func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder { // a TSO proxy to reduce the pressure of the main serving service endpoint. 
func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error { tsoStreamBuilders := c.getAllTSOStreamBuilders() - leaderAddr := c.svcDiscovery.GetServingAddr() - forwardedHost, ok := c.GetTSOAllocatorServingAddrByDCLocation(dc) + leaderAddr := c.svcDiscovery.GetServingURL() + forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) if !ok { return errors.Errorf("cannot find the allocator leader in %s", dc) } @@ -779,7 +779,7 @@ func (c *tsoClient) processRequests( // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits) curTSOInfo := &tsoInfo{ - tsoServer: stream.getServerAddr(), + tsoServer: stream.getServerURL(), reqKeyspaceGroupID: reqKeyspaceGroupID, respKeyspaceGroupID: respKeyspaceGroupID, respReceivedAt: time.Now(), diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index 03638a1161c..f6c46346d5d 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -58,45 +58,45 @@ var _ tsoAllocatorEventSource = (*tsoServiceDiscovery)(nil) type keyspaceGroupSvcDiscovery struct { sync.RWMutex group *tsopb.KeyspaceGroup - // primaryAddr is the primary serving address - primaryAddr string - // secondaryAddrs are TSO secondary serving addresses - secondaryAddrs []string - // addrs are the primary/secondary serving addresses - addrs []string + // primaryURL is the primary serving URL + primaryURL string + // secondaryURLs are TSO secondary serving URL + secondaryURLs []string + // urls are the primary/secondary serving URL + urls []string } func (k *keyspaceGroupSvcDiscovery) update( keyspaceGroup *tsopb.KeyspaceGroup, - newPrimaryAddr string, - secondaryAddrs, addrs []string, -) (oldPrimaryAddr string, primarySwitched, secondaryChanged bool) { + newPrimaryURL string, + secondaryURLs, urls []string, +) (oldPrimaryURL string, primarySwitched, secondaryChanged bool) { k.Lock() defer k.Unlock() - // If the new primary address is empty, we don't switch the primary address. - oldPrimaryAddr = k.primaryAddr - if len(newPrimaryAddr) > 0 { - primarySwitched = !strings.EqualFold(oldPrimaryAddr, newPrimaryAddr) - k.primaryAddr = newPrimaryAddr + // If the new primary URL is empty, we don't switch the primary URL. + oldPrimaryURL = k.primaryURL + if len(newPrimaryURL) > 0 { + primarySwitched = !strings.EqualFold(oldPrimaryURL, newPrimaryURL) + k.primaryURL = newPrimaryURL } - if !reflect.DeepEqual(k.secondaryAddrs, secondaryAddrs) { - k.secondaryAddrs = secondaryAddrs + if !reflect.DeepEqual(k.secondaryURLs, secondaryURLs) { + k.secondaryURLs = secondaryURLs secondaryChanged = true } k.group = keyspaceGroup - k.addrs = addrs + k.urls = urls return } // tsoServerDiscovery is for discovering the serving endpoints of the TSO servers -// TODO: dynamically update the TSO server addresses in the case of TSO server failover +// TODO: dynamically update the TSO server URLs in the case of TSO server failover // and scale-out/in. 
type tsoServerDiscovery struct { sync.RWMutex - addrs []string + urls []string // used for round-robin load balancing selectIdx int // failureCount counts the consecutive failures for communicating with the tso servers @@ -107,7 +107,7 @@ func (t *tsoServerDiscovery) countFailure() bool { t.Lock() defer t.Unlock() t.failureCount++ - return t.failureCount >= len(t.addrs) + return t.failureCount >= len(t.urls) } func (t *tsoServerDiscovery) resetFailure() { @@ -133,14 +133,14 @@ type tsoServiceDiscovery struct { // keyspaceGroupSD is for discovering the serving endpoints of the keyspace group keyspaceGroupSD *keyspaceGroupSvcDiscovery - // addr -> a gRPC connection + // URL -> a gRPC connection clientConns sync.Map // Store as map[string]*grpc.ClientConn // localAllocPrimariesUpdatedCb will be called when the local tso allocator primary list is updated. - // The input is a map {DC Location -> Leader Addr} - localAllocPrimariesUpdatedCb tsoLocalServAddrsUpdatedFunc + // The input is a map {DC Location -> Leader URL} + localAllocPrimariesUpdatedCb tsoLocalServURLsUpdatedFunc // globalAllocPrimariesUpdatedCb will be called when the local tso allocator primary list is updated. - globalAllocPrimariesUpdatedCb tsoGlobalServAddrUpdatedFunc + globalAllocPrimariesUpdatedCb tsoGlobalServURLUpdatedFunc checkMembershipCh chan struct{} @@ -173,11 +173,11 @@ func newTSOServiceDiscovery( } c.keyspaceID.Store(keyspaceID) c.keyspaceGroupSD = &keyspaceGroupSvcDiscovery{ - primaryAddr: "", - secondaryAddrs: make([]string, 0), - addrs: make([]string, 0), + primaryURL: "", + secondaryURLs: make([]string, 0), + urls: make([]string, 0), } - c.tsoServerDiscovery = &tsoServerDiscovery{addrs: make([]string, 0)} + c.tsoServerDiscovery = &tsoServerDiscovery{urls: make([]string, 0)} // Start with the default keyspace group. The actual keyspace group, to which the keyspace belongs, // will be discovered later. c.defaultDiscoveryKey = fmt.Sprintf(tsoSvcDiscoveryFormat, clusterID, defaultKeySpaceGroupID) @@ -288,44 +288,44 @@ func (c *tsoServiceDiscovery) GetKeyspaceGroupID() uint32 { return c.keyspaceGroupSD.group.Id } -// GetServiceURLs returns the URLs of the tso primary/secondary addresses of this keyspace group. +// GetServiceURLs returns the URLs of the tso primary/secondary URL of this keyspace group. // For testing use. It should only be called when the client is closed. func (c *tsoServiceDiscovery) GetServiceURLs() []string { c.keyspaceGroupSD.RLock() defer c.keyspaceGroupSD.RUnlock() - return c.keyspaceGroupSD.addrs + return c.keyspaceGroupSD.urls } -// GetServingAddr returns the grpc client connection of the serving endpoint +// GetServingURL returns the grpc client connection of the serving endpoint // which is the primary in a primary/secondary configured cluster. func (c *tsoServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { - if cc, ok := c.clientConns.Load(c.getPrimaryAddr()); ok { + if cc, ok := c.clientConns.Load(c.getPrimaryURL()); ok { return cc.(*grpc.ClientConn) } return nil } -// GetClientConns returns the mapping {addr -> a gRPC connection} +// GetClientConns returns the mapping {URL -> a gRPC connection} func (c *tsoServiceDiscovery) GetClientConns() *sync.Map { return &c.clientConns } -// GetServingAddr returns the serving endpoint which is the primary in a +// GetServingURL returns the serving endpoint which is the primary in a // primary/secondary configured cluster. 
-func (c *tsoServiceDiscovery) GetServingAddr() string { - return c.getPrimaryAddr() +func (c *tsoServiceDiscovery) GetServingURL() string { + return c.getPrimaryURL() } -// GetBackupAddrs gets the addresses of the current reachable and healthy +// GetBackupURLs gets the URLs of the current reachable and healthy // backup service endpoints. Backup service endpoints are secondaries in // a primary/secondary configured cluster. -func (c *tsoServiceDiscovery) GetBackupAddrs() []string { - return c.getSecondaryAddrs() +func (c *tsoServiceDiscovery) GetBackupURLs() []string { + return c.getSecondaryURLs() } -// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr. -func (c *tsoServiceDiscovery) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) { - return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, addr, c.tlsCfg, c.option.gRPCDialOptions...) +// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given URL. +func (c *tsoServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) { + return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.gRPCDialOptions...) } // ScheduleCheckMemberChanged is used to trigger a check to see if there is any change in service endpoints. @@ -347,28 +347,28 @@ func (c *tsoServiceDiscovery) CheckMemberChanged() error { return nil } -// AddServingAddrSwitchedCallback adds callbacks which will be called when the primary in +// AddServingURLSwitchedCallback adds callbacks which will be called when the primary in // a primary/secondary configured cluster is switched. -func (c *tsoServiceDiscovery) AddServingAddrSwitchedCallback(callbacks ...func()) { +func (c *tsoServiceDiscovery) AddServingURLSwitchedCallback(callbacks ...func()) { } -// AddServiceAddrsSwitchedCallback adds callbacks which will be called when any primary/secondary +// AddServiceURLsSwitchedCallback adds callbacks which will be called when any primary/secondary // in a primary/secondary configured cluster is changed. -func (c *tsoServiceDiscovery) AddServiceAddrsSwitchedCallback(callbacks ...func()) { +func (c *tsoServiceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func()) { } -// SetTSOLocalServAddrsUpdatedCallback adds a callback which will be called when the local tso +// SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso // allocator leader list is updated. -func (c *tsoServiceDiscovery) SetTSOLocalServAddrsUpdatedCallback(callback tsoLocalServAddrsUpdatedFunc) { +func (c *tsoServiceDiscovery) SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc) { c.localAllocPrimariesUpdatedCb = callback } -// SetTSOGlobalServAddrUpdatedCallback adds a callback which will be called when the global tso +// SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso // allocator leader is updated. -func (c *tsoServiceDiscovery) SetTSOGlobalServAddrUpdatedCallback(callback tsoGlobalServAddrUpdatedFunc) { - addr := c.getPrimaryAddr() - if len(addr) > 0 { - callback(addr) +func (c *tsoServiceDiscovery) SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc) { + url := c.getPrimaryURL() + if len(url) > 0 { + callback(url) } c.globalAllocPrimariesUpdatedCb = callback } @@ -383,18 +383,18 @@ func (c *tsoServiceDiscovery) GetAllServiceClients() []ServiceClient { return c.apiSvcDiscovery.GetAllServiceClients() } -// getPrimaryAddr returns the primary address. 
-func (c *tsoServiceDiscovery) getPrimaryAddr() string { +// getPrimaryURL returns the primary URL. +func (c *tsoServiceDiscovery) getPrimaryURL() string { c.keyspaceGroupSD.RLock() defer c.keyspaceGroupSD.RUnlock() - return c.keyspaceGroupSD.primaryAddr + return c.keyspaceGroupSD.primaryURL } -// getSecondaryAddrs returns the secondary addresses. -func (c *tsoServiceDiscovery) getSecondaryAddrs() []string { +// getSecondaryURLs returns the secondary URLs. +func (c *tsoServiceDiscovery) getSecondaryURLs() []string { c.keyspaceGroupSD.RLock() defer c.keyspaceGroupSD.RUnlock() - return c.keyspaceGroupSD.secondaryAddrs + return c.keyspaceGroupSD.secondaryURLs } func (c *tsoServiceDiscovery) afterPrimarySwitched(oldPrimary, newPrimary string) error { @@ -411,9 +411,9 @@ func (c *tsoServiceDiscovery) afterPrimarySwitched(oldPrimary, newPrimary string } func (c *tsoServiceDiscovery) updateMember() error { - // The keyspace membership or the primary serving address of the keyspace group, to which this + // The keyspace membership or the primary serving URL of the keyspace group, to which this // keyspace belongs, might have been changed. We need to query tso servers to get the latest info. - tsoServerAddr, err := c.getTSOServer(c.apiSvcDiscovery) + tsoServerURL, err := c.getTSOServer(c.apiSvcDiscovery) if err != nil { log.Error("[tso] failed to get tso server", errs.ZapError(err)) return err @@ -421,41 +421,41 @@ func (c *tsoServiceDiscovery) updateMember() error { keyspaceID := c.GetKeyspaceID() var keyspaceGroup *tsopb.KeyspaceGroup - if len(tsoServerAddr) > 0 { - keyspaceGroup, err = c.findGroupByKeyspaceID(keyspaceID, tsoServerAddr, updateMemberTimeout) + if len(tsoServerURL) > 0 { + keyspaceGroup, err = c.findGroupByKeyspaceID(keyspaceID, tsoServerURL, updateMemberTimeout) if err != nil { if c.tsoServerDiscovery.countFailure() { log.Error("[tso] failed to find the keyspace group", zap.Uint32("keyspace-id-in-request", keyspaceID), - zap.String("tso-server-addr", tsoServerAddr), + zap.String("tso-server-url", tsoServerURL), errs.ZapError(err)) } return err } c.tsoServerDiscovery.resetFailure() } else { - // There is no error but no tso server address found, which means + // There is no error but no tso server URL found, which means // the server side hasn't been upgraded to the version that // processes and returns GetClusterInfoResponse.TsoUrls. In this case, - // we fall back to the old way of discovering the tso primary addresses + // we fall back to the old way of discovering the tso primary URL // from etcd directly. 
c.printFallbackLogOnce.Do(func() { - log.Warn("[tso] no tso server address found,"+ + log.Warn("[tso] no tso server URL found,"+ " fallback to the legacy path to discover from etcd directly", zap.Uint32("keyspace-id-in-request", keyspaceID), - zap.String("tso-server-addr", tsoServerAddr), + zap.String("tso-server-url", tsoServerURL), zap.String("discovery-key", c.defaultDiscoveryKey)) }) - addrs, err := c.discoverWithLegacyPath() + urls, err := c.discoverWithLegacyPath() if err != nil { return err } - if len(addrs) == 0 { - return errors.New("no tso server address found") + if len(urls) == 0 { + return errors.New("no tso server url found") } - members := make([]*tsopb.KeyspaceGroupMember, 0, len(addrs)) - for _, addr := range addrs { - members = append(members, &tsopb.KeyspaceGroupMember{Address: addr}) + members := make([]*tsopb.KeyspaceGroupMember, 0, len(urls)) + for _, url := range urls { + members = append(members, &tsopb.KeyspaceGroupMember{Address: url}) } members[0].IsPrimary = true keyspaceGroup = &tsopb.KeyspaceGroup{ @@ -472,49 +472,49 @@ func (c *tsoServiceDiscovery) updateMember() error { zap.Uint32("old-keyspace-group-id", oldGroupID)) } - // Initialize the serving addresses from the returned keyspace group info. - primaryAddr := "" - secondaryAddrs := make([]string, 0) - addrs := make([]string, 0, len(keyspaceGroup.Members)) + // Initialize the serving URLs from the returned keyspace group info. + primaryURL := "" + secondaryURLs := make([]string, 0) + urls := make([]string, 0, len(keyspaceGroup.Members)) for _, m := range keyspaceGroup.Members { - addrs = append(addrs, m.Address) + urls = append(urls, m.Address) if m.IsPrimary { - primaryAddr = m.Address + primaryURL = m.Address } else { - secondaryAddrs = append(secondaryAddrs, m.Address) + secondaryURLs = append(secondaryURLs, m.Address) } } - // If the primary address is not empty, we need to create a grpc connection to it, and do it + // If the primary URL is not empty, we need to create a grpc connection to it, and do it // out of the critical section of the keyspace group service discovery.
- if len(primaryAddr) > 0 { - if primarySwitched := !strings.EqualFold(primaryAddr, c.getPrimaryAddr()); primarySwitched { - if _, err := c.GetOrCreateGRPCConn(primaryAddr); err != nil { + if len(primaryURL) > 0 { + if primarySwitched := !strings.EqualFold(primaryURL, c.getPrimaryURL()); primarySwitched { + if _, err := c.GetOrCreateGRPCConn(primaryURL); err != nil { log.Warn("[tso] failed to connect the next primary", zap.Uint32("keyspace-id-in-request", keyspaceID), - zap.String("tso-server-addr", tsoServerAddr), - zap.String("next-primary", primaryAddr), errs.ZapError(err)) + zap.String("tso-server-url", tsoServerURL), + zap.String("next-primary", primaryURL), errs.ZapError(err)) return err } } } oldPrimary, primarySwitched, _ := - c.keyspaceGroupSD.update(keyspaceGroup, primaryAddr, secondaryAddrs, addrs) + c.keyspaceGroupSD.update(keyspaceGroup, primaryURL, secondaryURLs, urls) if primarySwitched { log.Info("[tso] updated keyspace group service discovery info", zap.Uint32("keyspace-id-in-request", keyspaceID), - zap.String("tso-server-addr", tsoServerAddr), + zap.String("tso-server-url", tsoServerURL), zap.String("keyspace-group-service", keyspaceGroup.String())) - if err := c.afterPrimarySwitched(oldPrimary, primaryAddr); err != nil { + if err := c.afterPrimarySwitched(oldPrimary, primaryURL); err != nil { return err } } - // Even if the primary address is empty, we still updated other returned info above, including the - // keyspace group info and the secondary addresses. - if len(primaryAddr) == 0 { - return errors.New("no primary address found") + // Even if the primary URL is empty, we still updated other returned info above, including the + // keyspace group info and the secondary URLs. + if len(primaryURL) == 0 { + return errors.New("no primary URL found") } return nil @@ -523,7 +523,7 @@ func (c *tsoServiceDiscovery) updateMember() error { // Query the keyspace group info from the tso server by the keyspace ID. The server side will return // the info of the keyspace group to which this keyspace belongs.
func (c *tsoServiceDiscovery) findGroupByKeyspaceID( - keyspaceID uint32, tsoSrvAddr string, timeout time.Duration, + keyspaceID uint32, tsoSrvURL string, timeout time.Duration, ) (*tsopb.KeyspaceGroup, error) { failpoint.Inject("unexpectedCallOfFindGroupByKeyspaceID", func(val failpoint.Value) { keyspaceToCheck, ok := val.(int) @@ -534,7 +534,7 @@ func (c *tsoServiceDiscovery) findGroupByKeyspaceID( ctx, cancel := context.WithTimeout(c.ctx, timeout) defer cancel() - cc, err := c.GetOrCreateGRPCConn(tsoSrvAddr) + cc, err := c.GetOrCreateGRPCConn(tsoSrvURL) if err != nil { return nil, err } @@ -572,40 +572,40 @@ func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error) defer c.Unlock() var ( - addrs []string - err error + urls []string + err error ) t := c.tsoServerDiscovery - if len(t.addrs) == 0 || t.failureCount == len(t.addrs) { - addrs, err = sd.(*pdServiceDiscovery).discoverMicroservice(tsoService) + if len(t.urls) == 0 || t.failureCount == len(t.urls) { + urls, err = sd.(*pdServiceDiscovery).discoverMicroservice(tsoService) if err != nil { return "", err } failpoint.Inject("serverReturnsNoTSOAddrs", func() { - log.Info("[failpoint] injected error: server returns no tso addrs") - addrs = nil + log.Info("[failpoint] injected error: server returns no tso URLs") + urls = nil }) - if len(addrs) == 0 { - // There is no error but no tso server address found, which means + if len(urls) == 0 { + // There is no error but no tso server url found, which means // the server side hasn't been upgraded to the version that // processes and returns GetClusterInfoResponse.TsoUrls. Return here // and handle the fallback logic outside of this function. return "", nil } - log.Info("update tso server addresses", zap.Strings("addrs", addrs)) + log.Info("update tso server URLs", zap.Strings("urls", urls)) - t.addrs = addrs + t.urls = urls t.selectIdx = 0 t.failureCount = 0 } // Pick a TSO server in a round-robin way. 
- tsoServerAddr := t.addrs[t.selectIdx] + tsoServerURL := t.urls[t.selectIdx] t.selectIdx++ - t.selectIdx %= len(t.addrs) + t.selectIdx %= len(t.urls) - return tsoServerAddr, nil + return tsoServerURL, nil } func (c *tsoServiceDiscovery) discoverWithLegacyPath() ([]string, error) { diff --git a/client/tso_stream.go b/client/tso_stream.go index e3203818938..acefa19d21c 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -35,13 +35,13 @@ type tsoStreamBuilderFactory interface { type pdTSOStreamBuilderFactory struct{} func (f *pdTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder { - return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc), serverAddr: cc.Target()} + return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc), serverURL: cc.Target()} } type tsoTSOStreamBuilderFactory struct{} func (f *tsoTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder { - return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc), serverAddr: cc.Target()} + return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc), serverURL: cc.Target()} } // TSO Stream Builder @@ -51,8 +51,8 @@ type tsoStreamBuilder interface { } type pdTSOStreamBuilder struct { - serverAddr string - client pdpb.PDClient + serverURL string + client pdpb.PDClient } func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) { @@ -62,14 +62,14 @@ func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFun stream, err := b.client.Tso(ctx) done <- struct{}{} if err == nil { - return &pdTSOStream{stream: stream, serverAddr: b.serverAddr}, nil + return &pdTSOStream{stream: stream, serverURL: b.serverURL}, nil } return nil, err } type tsoTSOStreamBuilder struct { - serverAddr string - client tsopb.TSOClient + serverURL string + client tsopb.TSOClient } func (b *tsoTSOStreamBuilder) build( @@ -81,7 +81,7 @@ func (b *tsoTSOStreamBuilder) build( stream, err := b.client.Tso(ctx) done <- struct{}{} if err == nil { - return &tsoTSOStream{stream: stream, serverAddr: b.serverAddr}, nil + return &tsoTSOStream{stream: stream, serverURL: b.serverURL}, nil } return nil, err } @@ -102,7 +102,7 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha // TSO Stream type tsoStream interface { - getServerAddr() string + getServerURL() string // processRequests processes TSO requests in streaming mode to get timestamps processRequests( clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, @@ -111,12 +111,12 @@ type tsoStream interface { } type pdTSOStream struct { - serverAddr string - stream pdpb.PD_TsoClient + serverURL string + stream pdpb.PD_TsoClient } -func (s *pdTSOStream) getServerAddr() string { - return s.serverAddr +func (s *pdTSOStream) getServerURL() string { + return s.serverURL } func (s *pdTSOStream) processRequests( @@ -165,12 +165,12 @@ func (s *pdTSOStream) processRequests( } type tsoTSOStream struct { - serverAddr string - stream tsopb.TSO_TsoClient + serverURL string + stream tsopb.TSO_TsoClient } -func (s *tsoTSOStream) getServerAddr() string { - return s.serverAddr +func (s *tsoTSOStream) getServerURL() string { + return s.serverURL } func (s *tsoTSOStream) processRequests( diff --git a/metrics/alertmanager/pd.rules.yml b/metrics/alertmanager/pd.rules.yml index cf8cea9c3b4..5d51dc4a1c5 100644 --- a/metrics/alertmanager/pd.rules.yml +++ b/metrics/alertmanager/pd.rules.yml @@ -195,3 +195,27 @@ groups: description: 'cluster: ENV_LABELS_ENV, instance: {{ 
$labels.instance }}, values:{{ $value }}' value: '{{ $value }}' summary: PD_cluster_slow_tikv_nums + + - alert: PD_cpu_quota + expr: irate(process_cpu_seconds_total{job="pd"}[30s]) / pd_service_maxprocs > 0.8 + for: 45s + labels: + env: ENV_LABELS_ENV + level: warning + expr: irate(process_cpu_seconds_total{job="pd"}[30s]) / pd_service_maxprocs > 0.8 + annotations: + description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values:{{ $value }}' + value: '{{ $value }}' + summary: PD CPU usage is over 80% of CPU quota + + - alert: PD_memory_quota + expr: process_resident_memory_bytes{job="pd"} / pd_service_memory_quota_bytes > 0.8 + for: 15s + labels: + env: ENV_LABELS_ENV + level: warning + expr: process_resident_memory_bytes{job="pd"} / pd_service_memory_quota_bytes > 0.8 + annotations: + description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values:{{ $value }}' + value: '{{ $value }}' + summary: PD memory usage is over 80% of memory quota diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 2ead4b1e249..fdab784bf94 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -1822,7 +1822,7 @@ "expr": "pd_service_maxprocs{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",job=~\".*(pd|tso|scheduling).*\"}", "hide": false, "interval": "", - "legendFormat": "{{job}}-{{instance}}-limit", + "legendFormat": "quota-{{job}}-{{instance}}", "refId": "B" } ], @@ -1967,6 +1967,13 @@ "interval": "", "legendFormat": "GCTrigger-{{job}}-{{instance}}", "refId": "G" + }, + { + "expr": "pd_service_memory_quota_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",job=~\".*pd.*\"}", + "hide": false, + "interval": "", + "legendFormat": "quota-{{job}}-{{instance}}", + "refId": "H" } ], "thresholds": [], @@ -13633,4 +13640,4 @@ "title": "Test-Cluster-PD", "uid": "Q6RuHYIWk", "version": 1 -} \ No newline at end of file +} diff --git a/pkg/basicserver/metrics.go b/pkg/basicserver/metrics.go index 8f26216d696..4e4ab214ed5 100644 --- a/pkg/basicserver/metrics.go +++ b/pkg/basicserver/metrics.go @@ -17,7 +17,7 @@ package server import "github.com/prometheus/client_golang/prometheus" var ( - // ServerMaxProcsGauge record the maxprocs. + // ServerMaxProcsGauge records the maxprocs. ServerMaxProcsGauge = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: "pd", @@ -26,6 +26,15 @@ var ( Help: "The value of GOMAXPROCS.", }) + // ServerMemoryLimit records the cgroup memory limit. + ServerMemoryLimit = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "service", + Name: "memory_quota_bytes", + Help: "The value of memory quota bytes.", + }) + // ServerInfoGauge indicates the pd server info including version and git hash. 
ServerInfoGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -38,5 +47,6 @@ var ( func init() { prometheus.MustRegister(ServerMaxProcsGauge) + prometheus.MustRegister(ServerMemoryLimit) prometheus.MustRegister(ServerInfoGauge) } diff --git a/pkg/cgroup/cgroup.go b/pkg/cgroup/cgroup.go index 2a99d2fcd3d..e45dcbc0929 100644 --- a/pkg/cgroup/cgroup.go +++ b/pkg/cgroup/cgroup.go @@ -155,6 +155,33 @@ func readFile(filepath string) (res []byte, err error) { return res, err } +// The field in /proc/self/cgroup and /proc/self/mountinfo may appear as "cpuacct,cpu" or "rw,cpuacct,cpu" +// while the input controller is "cpu,cpuacct" +func controllerMatch(field string, controller string) bool { + if field == controller { + return true + } + + fs := strings.Split(field, ",") + if len(fs) < 2 { + return false + } + cs := strings.Split(controller, ",") + if len(fs) < len(cs) { + return false + } + fmap := make(map[string]struct{}, len(fs)) + for _, f := range fs { + fmap[f] = struct{}{} + } + for _, c := range cs { + if _, ok := fmap[c]; !ok { + return false + } + } + return true +} + // The controller is defined via either type `memory` for cgroup v1 or via empty type for cgroup v2, // where the type is the second field in /proc/[pid]/cgroup file func detectControlPath(cgroupFilePath string, controller string) (string, error) { @@ -185,7 +212,7 @@ func detectControlPath(cgroupFilePath string, controller string) (string, error) // but no known container solutions support it. if f0 == "0" && f1 == "" { unifiedPathIfFound = string(fields[2]) - } else if f1 == controller { + } else if controllerMatch(f1, controller) { var result []byte // In some case, the cgroup path contains `:`. We need to join them back. if len(fields) > 3 { @@ -314,7 +341,7 @@ func detectCgroupVersion(fields [][]byte, controller string) (_ int, found bool) // Check for controller specifically in cgroup v1 (it is listed in super // options field), as the value can't be found if it is not enforced. - if bytes.Equal(fields[pos], []byte("cgroup")) && bytes.Contains(fields[pos+2], []byte(controller)) { + if bytes.Equal(fields[pos], []byte("cgroup")) && controllerMatch(string(fields[pos+2]), controller) { return 1, true } else if bytes.Equal(fields[pos], []byte("cgroup2")) { return 2, true diff --git a/pkg/cgroup/cgroup_cpu.go b/pkg/cgroup/cgroup_cpu.go index 7063aa89bf9..67eace5363c 100644 --- a/pkg/cgroup/cgroup_cpu.go +++ b/pkg/cgroup/cgroup_cpu.go @@ -88,6 +88,49 @@ func getCgroupCPUHelper(root string) (CPUUsage, error) { return res, nil } +// Helper function for getCgroupCPUPeriodAndQuota. Root is always "/", except in tests. 
+func getCgroupCPUPeriodAndQuota(root string) (period int64, quota int64, err error) { + path, err := detectControlPath(filepath.Join(root, procPathCGroup), "cpu") + if err != nil { + return + } + + // No CPU controller detected + if path == "" { + err = errors.New("no cpu controller detected") + return + } + + mount, ver, err := getCgroupDetails(filepath.Join(root, procPathMountInfo), path, "cpu") + if err != nil { + return + } + + if len(mount) == 2 { + cgroupRootV1 := filepath.Join(root, mount[0]) + cgroupRootV2 := filepath.Join(root, mount[1], path) + period, quota, err = detectCPUQuotaInV2(cgroupRootV2) + if err != nil { + period, quota, err = detectCPUQuotaInV1(cgroupRootV1) + } + if err != nil { + return + } + } else { + switch ver[0] { + case 1: + cgroupRoot := filepath.Join(root, mount[0]) + period, quota, err = detectCPUQuotaInV1(cgroupRoot) + case 2: + cgroupRoot := filepath.Join(root, mount[0], path) + period, quota, err = detectCPUQuotaInV2(cgroupRoot) + default: + err = fmt.Errorf("detected unknown cgroup version index: %d", ver[0]) + } + } + return +} + // CPUShares returns the number of CPUs this cgroup can be expected to // max out. If there's no limit, NumCPU is returned. func (c CPUUsage) CPUShares() float64 { diff --git a/pkg/cgroup/cgroup_cpu_linux.go b/pkg/cgroup/cgroup_cpu_linux.go index 34bce632daa..fa7a8e84efa 100644 --- a/pkg/cgroup/cgroup_cpu_linux.go +++ b/pkg/cgroup/cgroup_cpu_linux.go @@ -44,6 +44,11 @@ func CPUQuotaToGOMAXPROCS(minValue int) (int, CPUQuotaStatus, error) { return maxProcs, CPUQuotaUsed, nil } +// GetCPUPeriodAndQuota returns CPU period and quota time of cgroup. +func GetCPUPeriodAndQuota() (period int64, quota int64, err error) { + return getCgroupCPUPeriodAndQuota("/") +} + // InContainer returns true if the process is running in a container. func InContainer() bool { v, err := os.ReadFile(procPathCGroup) diff --git a/pkg/cgroup/cgroup_cpu_unsupport.go b/pkg/cgroup/cgroup_cpu_unsupport.go index 9576ff52542..72c37aad396 100644 --- a/pkg/cgroup/cgroup_cpu_unsupport.go +++ b/pkg/cgroup/cgroup_cpu_unsupport.go @@ -27,6 +27,12 @@ func GetCgroupCPU() (CPUUsage, error) { return cpuUsage, nil } +// GetCPUPeriodAndQuota returns CPU period and quota time of cgroup. +// This is Linux-specific and not supported in the current OS. +func GetCPUPeriodAndQuota() (period int64, quota int64, err error) { + return -1, -1, nil +} + // CPUQuotaToGOMAXPROCS converts the CPU quota applied to the calling process // to a valid GOMAXPROCS value. This is Linux-specific and not supported in the // current OS. 
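For illustration, the period and quota returned by the new GetCPUPeriodAndQuota helper map to an effective CPU count the same way the cgroup monitor added later in this patch computes it: quota divided by period, rounded up, and never above the host CPU count. A minimal sketch under that assumption (not part of the patch):

package example

import (
	"math"
	"runtime"

	"github.com/tikv/pd/pkg/cgroup"
)

// effectiveCPUs mirrors the quota arithmetic used by the cgroup monitor:
// with period=100000us and quota=250000us the container may use 2.5 CPUs,
// which rounds up to 3, capped at runtime.NumCPU().
func effectiveCPUs() int {
	cpus := runtime.NumCPU()
	period, quota, err := cgroup.GetCPUPeriodAndQuota()
	if err != nil || period <= 0 || quota <= 0 {
		// No cgroup limit detected (or a non-Linux build): use all host CPUs.
		return cpus
	}
	if ratio := float64(quota) / float64(period); ratio < float64(cpus) {
		return int(math.Ceil(ratio))
	}
	return cpus
}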
diff --git a/pkg/cgroup/cgroup_mock_test.go b/pkg/cgroup/cgroup_mock_test.go index 949e93bb125..5a7ca9a73dc 100644 --- a/pkg/cgroup/cgroup_mock_test.go +++ b/pkg/cgroup/cgroup_mock_test.go @@ -18,6 +18,7 @@ import ( "os" "path/filepath" "regexp" + "strings" "testing" "github.com/stretchr/testify/require" @@ -370,6 +371,28 @@ const ( ) func TestCgroupsGetCPU(t *testing.T) { + for i := 0; i < 2; i++ { + if i == 1 { + // The field in /proc/self/cgroup and /proc/self/mountinfo may appear as "cpuacct,cpu" or "rw,cpuacct,cpu" + // while the input controller is "cpu,cpuacct" + v1CgroupWithCPUController = strings.ReplaceAll(v1CgroupWithCPUController, "cpu,cpuacct", "cpuacct,cpu") + v1CgroupWithCPUControllerNS = strings.ReplaceAll(v1CgroupWithCPUControllerNS, "cpu,cpuacct", "cpuacct,cpu") + v1CgroupWithCPUControllerNSMountRel = strings.ReplaceAll(v1CgroupWithCPUControllerNSMountRel, "cpu,cpuacct", "cpuacct,cpu") + v1CgroupWithCPUControllerNSMountRelRemount = strings.ReplaceAll(v1CgroupWithCPUControllerNSMountRelRemount, "cpu,cpuacct", "cpuacct,cpu") + v1CgroupWithCPUControllerNS2 = strings.ReplaceAll(v1CgroupWithCPUControllerNS2, "cpu,cpuacct", "cpuacct,cpu") + + v1MountsWithCPUController = strings.ReplaceAll(v1MountsWithCPUController, "rw,cpu,cpuacct", "rw,cpuacct,cpu") + v1MountsWithCPUControllerNS = strings.ReplaceAll(v1MountsWithCPUControllerNS, "rw,cpu,cpuacct", "rw,cpuacct,cpu") + v1MountsWithCPUControllerNSMountRel = strings.ReplaceAll(v1MountsWithCPUControllerNSMountRel, "rw,cpu,cpuacct", "rw,cpuacct,cpu") + v1MountsWithCPUControllerNSMountRelRemount = strings.ReplaceAll(v1MountsWithCPUControllerNSMountRelRemount, "rw,cpu,cpuacct", "rw,cpuacct,cpu") + v1MountsWithCPUControllerNS2 = strings.ReplaceAll(v1MountsWithCPUControllerNS2, "rw,cpu,cpuacct", "rw,cpuacct,cpu") + } + testCgroupGetCPUHelper(t) + testCgroupsGetCPUPeriodAndQuota(t) + } +} + +func testCgroupGetCPUHelper(t *testing.T) { for _, tc := range []struct { name string paths map[string]string @@ -552,6 +575,147 @@ func TestCgroupsGetCPU(t *testing.T) { } } +func testCgroupsGetCPUPeriodAndQuota(t *testing.T) { + for _, tc := range []struct { + name string + paths map[string]string + errMsg string + period int64 + quota int64 + }{ + { + errMsg: "failed to read cpu cgroup from cgroups file:", + }, + { + paths: map[string]string{ + "/proc/self/cgroup": v1CgroupWithoutCPUController, + "/proc/self/mountinfo": v1MountsWithoutCPUController, + }, + errMsg: "no cpu controller detected", + }, + { + paths: map[string]string{ + "/proc/self/cgroup": v1CgroupWithCPUController, + }, + errMsg: "failed to read mounts info from file:", + }, + { + paths: map[string]string{ + "/proc/self/cgroup": v1CgroupWithCPUController, + "/proc/self/mountinfo": v1MountsWithoutCPUController, + }, + errMsg: "failed to detect cgroup root mount and version", + }, + { + paths: map[string]string{ + "/proc/self/cgroup": v1CgroupWithCPUController, + "/proc/self/mountinfo": v1MountsWithCPUController, + "/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us": "12345", + "/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_period_us": "67890", + }, + quota: int64(12345), + period: int64(67890), + }, + { + paths: map[string]string{ + "/proc/self/cgroup": v1CgroupWithCPUControllerNS, + "/proc/self/mountinfo": v1MountsWithCPUControllerNS, + "/sys/fs/cgroup/cpu,cpuacct/crdb_test/cpu.cfs_quota_us": "12345", + "/sys/fs/cgroup/cpu,cpuacct/crdb_test/cpu.cfs_period_us": "67890", + }, + quota: int64(12345), + period: int64(67890), + }, + { + paths: map[string]string{ + "/proc/self/cgroup": 
v1CgroupWithCPUControllerNSMountRel, + "/proc/self/mountinfo": v1MountsWithCPUControllerNSMountRel, + }, + errMsg: "failed to detect cgroup root mount and version", + }, + { + paths: map[string]string{ + "/proc/self/cgroup": v1CgroupWithCPUControllerNSMountRelRemount, + "/proc/self/mountinfo": v1MountsWithCPUControllerNSMountRelRemount, + "/sys/fs/cgroup/cpu,cpuacct/crdb_test/cpu.cfs_quota_us": "12345", + "/sys/fs/cgroup/cpu,cpuacct/crdb_test/cpu.cfs_period_us": "67890", + }, + quota: int64(12345), + period: int64(67890), + }, + { + paths: map[string]string{ + "/proc/self/cgroup": v1CgroupWithCPUControllerNS2, + "/proc/self/mountinfo": v1MountsWithCPUControllerNS2, + "/sys/fs/cgroup/cpu,cpuacct/crdb_test/cpu.cfs_quota_us": "12345", + "/sys/fs/cgroup/cpu,cpuacct/crdb_test/cpu.cfs_period_us": "67890", + }, + quota: int64(12345), + period: int64(67890), + }, + { + paths: map[string]string{ + "/proc/self/cgroup": v1CgroupWithCPUController, + "/proc/self/mountinfo": v1MountsWithCPUController, + "/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us": "-1", + "/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_period_us": "67890", + }, + quota: int64(-1), + period: int64(67890), + }, + { + paths: map[string]string{ + "/proc/self/cgroup": v2CgroupWithMemoryController, + "/proc/self/mountinfo": v2Mounts, + }, + errMsg: "error when read cpu quota from cgroup v2", + }, + { + paths: map[string]string{ + "/proc/self/cgroup": v2CgroupWithMemoryController, + "/proc/self/mountinfo": v2Mounts, + "/sys/fs/cgroup/machine.slice/libpod-f1c6b44c0d61f273952b8daecf154cee1be2d503b7e9184ebf7fcaf48e139810.scope/cpu.max": "foo bar\n", + }, + errMsg: "error when reading cpu quota from cgroup v2 at", + }, + { + paths: map[string]string{ + "/proc/self/cgroup": v2CgroupWithMemoryController, + "/proc/self/mountinfo": v2Mounts, + "/sys/fs/cgroup/machine.slice/libpod-f1c6b44c0d61f273952b8daecf154cee1be2d503b7e9184ebf7fcaf48e139810.scope/cpu.max": "100 1000\n", + }, + quota: int64(100), + period: int64(1000), + }, + { + paths: map[string]string{ + "/proc/self/cgroup": v2CgroupWithMemoryController, + "/proc/self/mountinfo": v2Mounts, + "/sys/fs/cgroup/machine.slice/libpod-f1c6b44c0d61f273952b8daecf154cee1be2d503b7e9184ebf7fcaf48e139810.scope/cpu.max": "max 1000\n", + }, + quota: int64(-1), + period: int64(1000), + }, + { + paths: map[string]string{ + "/proc/self/cgroup": v2CgroupWithMemoryController, + "/proc/self/mountinfo": v2Mounts, + "/sys/fs/cgroup/machine.slice/libpod-f1c6b44c0d61f273952b8daecf154cee1be2d503b7e9184ebf7fcaf48e139810.scope/cpu.max": "100 1000\n", + }, + quota: int64(100), + period: int64(1000), + }, + } { + dir := createFiles(t, tc.paths) + + period, quota, err := getCgroupCPUPeriodAndQuota(dir) + require.True(t, isError(err, tc.errMsg), + "%v %v", err, tc.errMsg) + require.Equal(t, tc.quota, quota) + require.Equal(t, tc.period, period) + } +} + func createFiles(t *testing.T, paths map[string]string) (dir string) { dir = t.TempDir() @@ -564,7 +728,7 @@ func createFiles(t *testing.T, paths map[string]string) (dir string) { return dir } -const ( +var ( //#nosec G101 v1CgroupWithMemoryController = `11:blkio:/kubepods/besteffort/pod1bf924dd-3f6f-11ea-983d-0abc95f90166/c17eb535a47774285717e40bbda777ee72e81471272a5b8ebffd51fdf7f624e3 10:devices:/kubepods/besteffort/podcbfx2j5d-3f6f-11ea-983d-0abc95f90166/c17eb535a47774285717e40bbda777ee72e81471272a5b8ebffd51fdf7f624e3 diff --git a/pkg/utils/grpcutil/grpcutil.go b/pkg/utils/grpcutil/grpcutil.go index 3b233956fa8..9b8cc2feb49 100644 --- a/pkg/utils/grpcutil/grpcutil.go +++ 
b/pkg/utils/grpcutil/grpcutil.go @@ -169,8 +169,8 @@ func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...g // BuildForwardContext creates a context with receiver metadata information. // It is used in client side. -func BuildForwardContext(ctx context.Context, addr string) context.Context { - md := metadata.Pairs(ForwardMetadataKey, addr) +func BuildForwardContext(ctx context.Context, url string) context.Context { + md := metadata.Pairs(ForwardMetadataKey, url) return metadata.NewOutgoingContext(ctx, md) } diff --git a/server/cgmon.go b/server/cgmon.go new file mode 100644 index 00000000000..5debd85495d --- /dev/null +++ b/server/cgmon.go @@ -0,0 +1,148 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "math" + "runtime" + "sync" + "time" + + "github.com/pingcap/log" + "github.com/shirou/gopsutil/v3/mem" + bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/cgroup" + "go.uber.org/zap" +) + +const ( + refreshInterval = 10 * time.Second +) + +type cgroupMonitor struct { + started bool + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + cfgMaxProcs int + lastMaxProcs int + lastMemoryLimit uint64 +} + +// startCgroupMonitor starts the cgroup monitoring. +// WARN: this function is not thread-safe. +func (cgmon *cgroupMonitor) startCgroupMonitor(ctx context.Context) { + if cgmon.started { + return + } + cgmon.started = true + // Get configured maxprocs. + cgmon.cfgMaxProcs = runtime.GOMAXPROCS(0) + cgmon.ctx, cgmon.cancel = context.WithCancel(ctx) + cgmon.wg.Add(1) + go cgmon.refreshCgroupLoop() + log.Info("cgroup monitor started") +} + +// stopCgroupMonitor stops the cgroup monitoring. +// WARN: this function is not thread-safe. +func (cgmon *cgroupMonitor) stopCgroupMonitor() { + if !cgmon.started { + return + } + cgmon.started = false + if cgmon.cancel != nil { + cgmon.cancel() + } + cgmon.wg.Wait() + log.Info("cgroup monitor stopped") +} + +func (cgmon *cgroupMonitor) refreshCgroupLoop() { + ticker := time.NewTicker(refreshInterval) + defer func() { + if r := recover(); r != nil { + log.Error("[pd] panic in the recoverable goroutine", + zap.String("funcInfo", "refreshCgroupLoop"), + zap.Reflect("r", r), + zap.Stack("stack")) + } + cgmon.wg.Done() + ticker.Stop() + }() + + cgmon.refreshCgroupCPU() + cgmon.refreshCgroupMemory() + for { + select { + case <-cgmon.ctx.Done(): + return + case <-ticker.C: + cgmon.refreshCgroupCPU() + cgmon.refreshCgroupMemory() + } + } +} + +func (cgmon *cgroupMonitor) refreshCgroupCPU() { + // Get the number of CPUs. + quota := runtime.NumCPU() + + // Get CPU quota from cgroup.
+ cpuPeriod, cpuQuota, err := cgroup.GetCPUPeriodAndQuota() + if err != nil { + log.Warn("failed to get cgroup cpu quota", zap.Error(err)) + return + } + if cpuPeriod > 0 && cpuQuota > 0 { + ratio := float64(cpuQuota) / float64(cpuPeriod) + if ratio < float64(quota) { + quota = int(math.Ceil(ratio)) + } + } + + if quota != cgmon.lastMaxProcs && quota < cgmon.cfgMaxProcs { + runtime.GOMAXPROCS(quota) + log.Info("set the maxprocs", zap.Int("quota", quota)) + bs.ServerMaxProcsGauge.Set(float64(quota)) + cgmon.lastMaxProcs = quota + } else if cgmon.lastMaxProcs == 0 { + log.Info("set the maxprocs", zap.Int("cfgMaxProcs", cgmon.cfgMaxProcs)) + bs.ServerMaxProcsGauge.Set(float64(cgmon.cfgMaxProcs)) + cgmon.lastMaxProcs = cgmon.cfgMaxProcs + } +} + +func (cgmon *cgroupMonitor) refreshCgroupMemory() { + memLimit, err := cgroup.GetMemoryLimit() + if err != nil { + log.Warn("failed to get cgroup memory limit", zap.Error(err)) + return + } + vmem, err := mem.VirtualMemory() + if err != nil { + log.Warn("failed to get system memory size", zap.Error(err)) + return + } + if memLimit > vmem.Total { + memLimit = vmem.Total + } + if memLimit != cgmon.lastMemoryLimit { + log.Info("set the memory limit", zap.Uint64("memLimit", memLimit)) + bs.ServerMemoryLimit.Set(float64(memLimit)) + cgmon.lastMemoryLimit = memLimit + } +} diff --git a/server/server.go b/server/server.go index 6e9341298cd..68c69b38e53 100644 --- a/server/server.go +++ b/server/server.go @@ -235,6 +235,9 @@ type Server struct { servicePrimaryMap sync.Map /* Store as map[string]string */ tsoPrimaryWatcher *etcdutil.LoopWatcher schedulingPrimaryWatcher *etcdutil.LoopWatcher + + // Cgroup Monitor + cgmon cgroupMonitor } // HandlerBuilder builds a server HTTP handler. @@ -542,6 +545,8 @@ func (s *Server) Close() { log.Info("closing server") + s.cgmon.stopCgroupMonitor() + s.stopServerLoop() if s.IsAPIServiceMode() { s.keyspaceGroupManager.Close() @@ -617,6 +622,8 @@ func (s *Server) Run() error { return err } + s.cgmon.startCgroupMonitor(s.ctx) + failpoint.Inject("delayStartServerLoop", func() { time.Sleep(2 * time.Second) }) diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index ca6776aec71..0daa270b2fa 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -109,7 +109,12 @@ func TestClientLeaderChange(t *testing.T) { defer cluster.Destroy() endpoints := runServer(re, cluster) - cli := setupCli(re, ctx, endpoints) + endpointsWithWrongURL := append([]string{}, endpoints...) + // inject wrong http scheme + for i := range endpointsWithWrongURL { + endpointsWithWrongURL[i] = "https://" + strings.TrimPrefix(endpointsWithWrongURL[i], "http://") + } + cli := setupCli(re, ctx, endpointsWithWrongURL) defer cli.Close() innerCli, ok := cli.(interface{ GetServiceDiscovery() pd.ServiceDiscovery }) re.True(ok) @@ -127,14 +132,14 @@ func TestClientLeaderChange(t *testing.T) { re.True(cluster.CheckTSOUnique(ts1)) leader := cluster.GetLeader() - waitLeader(re, innerCli.GetServiceDiscovery(), cluster.GetServer(leader).GetConfig().ClientUrls) + waitLeader(re, innerCli.GetServiceDiscovery(), cluster.GetServer(leader)) err = cluster.GetServer(leader).Stop() re.NoError(err) leader = cluster.WaitLeader() re.NotEmpty(leader) - waitLeader(re, innerCli.GetServiceDiscovery(), cluster.GetServer(leader).GetConfig().ClientUrls) + waitLeader(re, innerCli.GetServiceDiscovery(), cluster.GetServer(leader)) // Check TS won't fall back after leader changed. 
testutil.Eventually(re, func() bool { @@ -955,10 +960,10 @@ func setupCli(re *require.Assertions, ctx context.Context, endpoints []string, o return cli } -func waitLeader(re *require.Assertions, cli pd.ServiceDiscovery, leader string) { +func waitLeader(re *require.Assertions, cli pd.ServiceDiscovery, leader *tests.TestServer) { testutil.Eventually(re, func() bool { cli.ScheduleCheckMemberChanged() - return cli.GetServingAddr() == leader + return cli.GetServingURL() == leader.GetConfig().ClientUrls && leader.GetAddr() == cli.GetServingURL() }) } @@ -1853,7 +1858,7 @@ func (suite *clientTestSuite) TestMemberUpdateBackOff() { re.True(ok) leader := cluster.GetLeader() - waitLeader(re, innerCli.GetServiceDiscovery(), cluster.GetServer(leader).GetConfig().ClientUrls) + waitLeader(re, innerCli.GetServiceDiscovery(), cluster.GetServer(leader)) memberID := cluster.GetServer(leader).GetLeader().GetMemberId() re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", fmt.Sprintf("return(\"%d\")", memberID))) diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 64a29b39fe0..f53174d8089 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -651,6 +651,22 @@ func (suite *httpClientTestSuite) checkVersion(mode mode, client pd.Client) { re.Equal(versioninfo.PDReleaseVersion, ver) } +func (suite *httpClientTestSuite) TestStatus() { + suite.RunTestInTwoModes(suite.checkStatus) +} + +func (suite *httpClientTestSuite) checkStatus(mode mode, client pd.Client) { + re := suite.Require() + env := suite.env[mode] + + status, err := client.GetStatus(env.ctx) + re.NoError(err) + re.Equal(versioninfo.PDReleaseVersion, status.Version) + re.Equal(versioninfo.PDGitHash, status.GitHash) + re.Equal(versioninfo.PDBuildTS, status.BuildTS) + re.GreaterOrEqual(time.Now().Unix(), status.StartTimestamp) +} + func (suite *httpClientTestSuite) TestAdmin() { suite.RunTestInTwoModes(suite.checkAdmin) } @@ -717,7 +733,7 @@ func (suite *httpClientTestSuite) TestRedirectWithMetrics() { re.Equal(float64(2), out.Counter.GetValue()) c.Close() - leader := sd.GetServingAddr() + leader := sd.GetServingURL() httpClient = pd.NewHTTPClientWithRequestChecker(func(req *http.Request) error { // mock leader success. if !strings.Contains(leader, req.Host) { diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 074988d9aba..aa7a264f5e6 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -140,7 +140,7 @@ func (suite *resourceManagerClientTestSuite) waitLeader(re *require.Assertions, re.NotNil(innerCli) testutil.Eventually(re, func() bool { innerCli.GetServiceDiscovery().ScheduleCheckMemberChanged() - return innerCli.GetServiceDiscovery().GetServingAddr() == leaderAddr + return innerCli.GetServiceDiscovery().GetServingURL() == leaderAddr }) }
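For illustration, the GetStatus method exercised by the TestStatus case above can be consumed as in the following sketch (not part of the patch; constructing the HTTP client is left to the caller):

package example

import (
	"context"
	"fmt"
	"time"

	pdhttp "github.com/tikv/pd/client/http"
)

// printServerStatus queries the newly added GetStatus API and prints the build
// information reported by the PD server.
func printServerStatus(ctx context.Context, cli pdhttp.Client) error {
	status, err := cli.GetStatus(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("version=%s git-hash=%s build-ts=%s\n", status.Version, status.GitHash, status.BuildTS)
	// StartTimestamp is a Unix timestamp (seconds), so uptime can be derived from it.
	fmt.Printf("started at %s\n", time.Unix(status.StartTimestamp, 0).Format(time.RFC3339))
	return nil
}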