Skip to content

Commit

Permalink
Merge branch 'master' into fix-test23
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Aug 9, 2024
2 parents 76fab87 + 92adb24 commit 83ffa10
Show file tree
Hide file tree
Showing 37 changed files with 218 additions and 133 deletions.
25 changes: 25 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ func (c *client) setup() error {
return nil
}

// Close closes the client.
func (c *client) Close() {
c.cancel()
c.wg.Wait()
Expand Down Expand Up @@ -802,6 +803,7 @@ func (c *client) UpdateOption(option DynamicOption, value any) error {
return nil
}

// GetAllMembers gets the members Info from PD.
func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
start := time.Now()
defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }()
Expand Down Expand Up @@ -848,10 +850,12 @@ func (c *client) getRegionAPIClientAndContext(ctx context.Context, allowFollower
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower)
}

// GetTSAsync implements the TSOClient interface.
func (c *client) GetTSAsync(ctx context.Context) TSFuture {
return c.GetLocalTSAsync(ctx, globalDCLocation)
}

// GetLocalTSAsync implements the TSOClient interface.
func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture {
defer trace.StartRegion(ctx, "pdclient.GetLocalTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
Expand Down Expand Up @@ -902,16 +906,19 @@ func (c *client) dispatchTSORequestWithRetry(ctx context.Context, dcLocation str
return req
}

// GetTS implements the TSOClient interface.
func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) {
resp := c.GetTSAsync(ctx)
return resp.Wait()
}

// GetLocalTS implements the TSOClient interface.
func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical int64, logical int64, err error) {
resp := c.GetLocalTSAsync(ctx, dcLocation)
return resp.Wait()
}

// GetMinTS implements the TSOClient interface.
func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
// Handle compatibility issue in case of PD/API server doesn't support GetMinTS API.
serviceMode := c.getServiceMode()
Expand Down Expand Up @@ -975,6 +982,7 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
return r
}

// GetRegionFromMember implements the RPCClient interface.
func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1013,6 +1021,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
return handleRegionResponse(resp), nil
}

// GetRegion implements the RPCClient interface.
func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1051,6 +1060,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
return handleRegionResponse(resp), nil
}

// GetPrevRegion implements the RPCClient interface.
func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1089,6 +1099,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
return handleRegionResponse(resp), nil
}

// GetRegionByID implements the RPCClient interface.
func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1127,6 +1138,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
return handleRegionResponse(resp), nil
}

// ScanRegions implements the RPCClient interface.
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1176,6 +1188,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
return handleRegionsResponse(resp), nil
}

// BatchScanRegions implements the RPCClient interface.
func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.BatchScanRegions", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1274,6 +1287,7 @@ func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
return regions
}

// GetStore implements the RPCClient interface.
func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1312,6 +1326,7 @@ func handleStoreResponse(resp *pdpb.GetStoreResponse) (*metapb.Store, error) {
return store, nil
}

// GetAllStores implements the RPCClient interface.
func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*metapb.Store, error) {
// Applies options
options := &GetStoreOp{}
Expand Down Expand Up @@ -1345,6 +1360,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
return resp.GetStores(), nil
}

// UpdateGCSafePoint implements the RPCClient interface.
func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateGCSafePoint", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1406,6 +1422,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
return resp.GetMinSafePoint(), nil
}

// ScatterRegion implements the RPCClient interface.
func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1440,6 +1457,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
return nil
}

// ScatterRegions implements the RPCClient interface.
func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...RegionsOption) (*pdpb.ScatterRegionResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScatterRegions", opentracing.ChildOf(span.Context()))
Expand All @@ -1448,6 +1466,7 @@ func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts ..
return c.scatterRegionsWithOptions(ctx, regionsID, opts...)
}

// SplitAndScatterRegions implements the RPCClient interface.
func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.SplitAndScatterRegions", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1476,6 +1495,7 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,
return protoClient.SplitAndScatterRegions(ctx, req)
}

// GetOperator implements the RPCClient interface.
func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context()))
Expand Down Expand Up @@ -1575,6 +1595,7 @@ func trimHTTPPrefix(str string) string {
return str
}

// LoadGlobalConfig implements the RPCClient interface.
func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
Expand Down Expand Up @@ -1602,6 +1623,7 @@ func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPat
return res, resp.GetRevision(), nil
}

// StoreGlobalConfig implements the RPCClient interface.
func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error {
resArr := make([]*pdpb.GlobalConfigItem, len(items))
for i, it := range items {
Expand All @@ -1620,6 +1642,7 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items
return nil
}

// WatchGlobalConfig implements the RPCClient interface.
func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error) {
// TODO: Add retry mechanism
// register watch components there
Expand Down Expand Up @@ -1671,6 +1694,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
return globalConfigWatcherCh, err
}

// GetExternalTimestamp implements the RPCClient interface.
func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
Expand All @@ -1691,6 +1715,7 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
return resp.GetTimestamp(), nil
}

// SetExternalTimestamp implements the RPCClient interface.
func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) error {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
Expand Down
3 changes: 3 additions & 0 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func getPrefix(key []byte) []byte {
return []byte{0}
}

// Put implements the MetaStorageClient interface.
func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (*meta_storagepb.PutResponse, error) {
options := &Op{}
for _, opt := range opts {
Expand Down Expand Up @@ -139,6 +140,7 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (
return resp, nil
}

// Get implements the MetaStorageClient interface.
func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_storagepb.GetResponse, error) {
options := &Op{}
for _, opt := range opts {
Expand Down Expand Up @@ -177,6 +179,7 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s
return resp, nil
}

// Watch implements the MetaStorageClient interface.
func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error) {
eventCh := make(chan []*meta_storagepb.Event, 100)
options := &Op{}
Expand Down
49 changes: 38 additions & 11 deletions client/mock_pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,46 @@ func (m *mockPDServiceDiscovery) GetAllServiceClients() []ServiceClient {
return m.clients
}

func (*mockPDServiceDiscovery) GetClusterID() uint64 { return 0 }
func (*mockPDServiceDiscovery) GetKeyspaceID() uint32 { return 0 }
func (*mockPDServiceDiscovery) GetKeyspaceGroupID() uint32 { return 0 }
func (*mockPDServiceDiscovery) GetServiceURLs() []string { return nil }
// GetClusterID implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) GetClusterID() uint64 { return 0 }

// GetKeyspaceID implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) GetKeyspaceID() uint32 { return 0 }

// GetKeyspaceGroupID implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) GetKeyspaceGroupID() uint32 { return 0 }

// GetServiceURLs implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) GetServiceURLs() []string { return nil }

// GetServingEndpointClientConn implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { return nil }
func (*mockPDServiceDiscovery) GetClientConns() *sync.Map { return nil }
func (*mockPDServiceDiscovery) GetServingURL() string { return "" }
func (*mockPDServiceDiscovery) GetBackupURLs() []string { return nil }
func (*mockPDServiceDiscovery) GetServiceClient() ServiceClient { return nil }

// GetClientConns implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) GetClientConns() *sync.Map { return nil }

// GetServingURL implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) GetServingURL() string { return "" }

// GetBackupURLs implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) GetBackupURLs() []string { return nil }

// GetServiceClient implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) GetServiceClient() ServiceClient { return nil }

// GetOrCreateGRPCConn implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) GetOrCreateGRPCConn(string) (*grpc.ClientConn, error) {
return nil, nil
}
func (*mockPDServiceDiscovery) ScheduleCheckMemberChanged() {}
func (*mockPDServiceDiscovery) CheckMemberChanged() error { return nil }
func (*mockPDServiceDiscovery) AddServingURLSwitchedCallback(...func()) {}

// ScheduleCheckMemberChanged implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) ScheduleCheckMemberChanged() {}

// CheckMemberChanged implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) CheckMemberChanged() error { return nil }

// AddServingURLSwitchedCallback implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) AddServingURLSwitchedCallback(...func()) {}

// AddServiceURLsSwitchedCallback implements the ServiceDiscovery interface.
func (*mockPDServiceDiscovery) AddServiceURLsSwitchedCallback(...func()) {}
1 change: 1 addition & 0 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ func newPDServiceDiscovery(
return pdsd
}

// Init initializes the PD service discovery.
func (c *pdServiceDiscovery) Init() error {
if c.isInitialized {
return nil
Expand Down
10 changes: 8 additions & 2 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (c *client) ListResourceGroups(ctx context.Context, ops ...GetResourceGroup
return resp.GetGroups(), nil
}

// GetResourceGroup implements the ResourceManagerClient interface.
func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string, ops ...GetResourceGroupOption) (*rmpb.ResourceGroup, error) {
cc, err := c.resourceManagerClient()
if err != nil {
Expand All @@ -133,10 +134,12 @@ func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string,
return resp.GetGroup(), nil
}

// AddResourceGroup implements the ResourceManagerClient interface.
func (c *client) AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) {
return c.putResourceGroup(ctx, metaGroup, add)
}

// ModifyResourceGroup implements the ResourceManagerClient interface.
func (c *client) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) {
return c.putResourceGroup(ctx, metaGroup, modify)
}
Expand Down Expand Up @@ -167,6 +170,7 @@ func (c *client) putResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceG
return resp.GetBody(), nil
}

// DeleteResourceGroup implements the ResourceManagerClient interface.
func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) {
cc, err := c.resourceManagerClient()
if err != nil {
Expand All @@ -187,6 +191,7 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri
return resp.GetBody(), nil
}

// LoadResourceGroups implements the ResourceManagerClient interface.
func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) {
resp, err := c.Get(ctx, GroupSettingsPathPrefixBytes, WithPrefix())
if err != nil {
Expand All @@ -206,6 +211,7 @@ func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup,
return groups, resp.Header.Revision, nil
}

// AcquireTokenBuckets implements the ResourceManagerClient interface.
func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
req := &tokenRequest{
done: make(chan error, 1),
Expand All @@ -214,7 +220,7 @@ func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBuc
Request: request,
}
c.tokenDispatcher.tokenBatchController.tokenRequestCh <- req
grantedTokens, err := req.Wait()
grantedTokens, err := req.wait()
if err != nil {
return nil, err
}
Expand All @@ -229,7 +235,7 @@ type tokenRequest struct {
TokenBuckets []*rmpb.TokenBucketResponse
}

func (req *tokenRequest) Wait() (tokenBuckets []*rmpb.TokenBucketResponse, err error) {
func (req *tokenRequest) wait() (tokenBuckets []*rmpb.TokenBucketResponse, err error) {
select {
case err = <-req.done:
err = errors.WithStack(err)
Expand Down
4 changes: 4 additions & 0 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type pdTSOStreamAdapter struct {
stream pdpb.PD_TsoClient
}

// Send implements the grpcTSOStreamAdapter interface.
func (s pdTSOStreamAdapter) Send(clusterID uint64, _, _ uint32, dcLocation string, count int64) error {
req := &pdpb.TsoRequest{
Header: &pdpb.RequestHeader{
Expand All @@ -127,6 +128,7 @@ func (s pdTSOStreamAdapter) Send(clusterID uint64, _, _ uint32, dcLocation strin
return s.stream.Send(req)
}

// Recv implements the grpcTSOStreamAdapter interface.
func (s pdTSOStreamAdapter) Recv() (tsoRequestResult, error) {
resp, err := s.stream.Recv()
if err != nil {
Expand All @@ -145,6 +147,7 @@ type tsoTSOStreamAdapter struct {
stream tsopb.TSO_TsoClient
}

// Send implements the grpcTSOStreamAdapter interface.
func (s tsoTSOStreamAdapter) Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64) error {
req := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
Expand All @@ -158,6 +161,7 @@ func (s tsoTSOStreamAdapter) Send(clusterID uint64, keyspaceID, keyspaceGroupID
return s.stream.Send(req)
}

// Recv implements the grpcTSOStreamAdapter interface.
func (s tsoTSOStreamAdapter) Recv() (tsoRequestResult, error) {
resp, err := s.stream.Recv()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type server struct {
*scheserver.Server
}

// GetCluster returns the cluster.
func (s *server) GetCluster() sche.SchedulerCluster {
return s.Server.GetCluster()
}
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
// In order to avoid the patrolRegionScanLimit to be too big or too small, it will be limited to [128,8192].
// It takes about 10s to iterate 1,024,000 regions(with DefaultPatrolRegionInterval=10ms) where other steps are not considered.
MinPatrolRegionScanLimit = 128
// MaxPatrolScanRegionLimit is the max limit of regions to scan for a batch.
MaxPatrolScanRegionLimit = 8192
patrolRegionPartition = 1024
)
Expand Down
Loading

0 comments on commit 83ffa10

Please sign in to comment.