Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <[email protected]>
  • Loading branch information
CabinfeverB committed Mar 7, 2024
2 parents b3741b5 + e72d49b commit c3bed08
Show file tree
Hide file tree
Showing 63 changed files with 838 additions and 354 deletions.
66 changes: 33 additions & 33 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func newClientWithKeyspaceName(
return nil
}

// Create a PD service discovery with null keyspace id, then query the real id wth the keyspace name,
// Create a PD service discovery with null keyspace id, then query the real id with the keyspace name,
// finally update the keyspace id to the PD service discovery for the following interactions.
c.pdSvcDiscovery = newPDServiceDiscovery(
clientCtx, clientCancel, &c.wg, c.setServiceMode, updateKeyspaceIDCb, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option)
Expand Down Expand Up @@ -805,10 +805,10 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
}

func (c *client) getLocalTSAsyncWithRetry(ctx context.Context, dcLocation string, bo *retry.Backoffer) TSFuture {
defer trace.StartRegion(ctx, "GetLocalTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("GetLocalTSAsync", opentracing.ChildOf(span.Context()))
ctx = opentracing.ContextWithSpan(ctx, span)
defer trace.StartRegion(ctx, "pdclient.GetLocalTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetLocalTSAsync", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

req := tsoReqPool.Get().(*tsoRequest)
Expand Down Expand Up @@ -931,8 +931,8 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
}

func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -969,8 +969,8 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
}

func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1007,8 +1007,8 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
}

func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1045,8 +1045,8 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
}

func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1083,8 +1083,8 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
}

func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1158,8 +1158,8 @@ func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
}

func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1202,8 +1202,8 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
opt(options)
}

if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetAllStores", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetAllStores", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand All @@ -1229,8 +1229,8 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
}

func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateGCSafePoint", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateGCSafePoint", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1260,8 +1260,8 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
// determine the safepoint for multiple services, it does not trigger a GC
// job. Use UpdateGCSafePoint to trigger the GC job if needed.
func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateServiceGCSafePoint", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateServiceGCSafePoint", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

Expand Down Expand Up @@ -1290,8 +1290,8 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
}

func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
return c.scatterRegionsWithGroup(ctx, regionID, "")
Expand Down Expand Up @@ -1324,16 +1324,16 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
}

func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...RegionsOption) (*pdpb.ScatterRegionResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScatterRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScatterRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
return c.scatterRegionsWithOptions(ctx, regionsID, opts...)
}

func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.SplitAndScatterRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.SplitAndScatterRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand All @@ -1360,8 +1360,8 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,
}

func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand All @@ -1383,8 +1383,8 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe

// SplitRegions split regions by given split keys
func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitRegionsResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.SplitRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.SplitRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
8 changes: 4 additions & 4 deletions client/gc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type GCClient interface {

// UpdateGCSafePointV2 update gc safe point for the given keyspace.
func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -63,8 +63,8 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf

// UpdateServiceSafePointV2 update service safe point for the given keyspace.
func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
2 changes: 1 addition & 1 deletion client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func RegionByKey(key []byte) string {
// RegionsByKeyRange returns the path of PD HTTP API to scan regions with given start key, end key and limit parameters.
func RegionsByKeyRange(keyRange *KeyRange, limit int) string {
startKeyStr, endKeyStr := keyRange.EscapeAsUTF8Str()
return fmt.Sprintf("%s?start_key=%s&end_key=%s&limit=%d",
return fmt.Sprintf("%s?key=%s&end_key=%s&limit=%d",
regionsByKey, startKeyStr, endKeyStr, limit)
}

Expand Down
12 changes: 6 additions & 6 deletions client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func (c *client) keyspaceClient() keyspacepb.KeyspaceClient {

// LoadKeyspace loads and returns target keyspace's metadata.
func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.KeyspaceMeta, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("keyspaceClient.LoadKeyspace", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("keyspaceClient.LoadKeyspace", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -84,8 +84,8 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key
//
// Updated keyspace meta will be returned.
func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keyspacepb.KeyspaceState) (*keyspacepb.KeyspaceMeta, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("keyspaceClient.UpdateKeyspaceState", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("keyspaceClient.UpdateKeyspaceState", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -123,8 +123,8 @@ func (c *client) WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.Keyspac

// GetAllKeyspaces get all keyspaces metadata.
func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint32) ([]*keyspacepb.KeyspaceMeta, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("keyspaceClient.GetAllKeyspaces", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("keyspaceClient.GetAllKeyspaces", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
8 changes: 4 additions & 4 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (
opt(options)
}

if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.Put", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.Put", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -148,8 +148,8 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s
options.rangeEnd = getPrefix(key)
}

if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.Get", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.Get", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
6 changes: 4 additions & 2 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ func (c *pdServiceDiscovery) GetServiceClient() ServiceClient {
return leaderClient
}

// GetAllServiceClients implments ServiceDiscovery
// GetAllServiceClients implements ServiceDiscovery
func (c *pdServiceDiscovery) GetAllServiceClients() []ServiceClient {
all := c.all.Load()
if all == nil {
Expand Down Expand Up @@ -893,7 +893,9 @@ func (c *pdServiceDiscovery) checkServiceModeChanged() error {
// If the method is not supported, we set it to pd mode.
// TODO: it's a hack way to solve the compatibility issue.
// we need to remove this after all maintained version supports the method.
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
if c.serviceModeUpdateCb != nil {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
}
return nil
}
return err
Expand Down
45 changes: 22 additions & 23 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *tsoClient) dispatchRequest(ctx context.Context, dcLocation string, requ
return err
}

defer trace.StartRegion(request.requestCtx, "tsoReqEnqueue").End()
defer trace.StartRegion(request.requestCtx, "pdclient.tsoReqEnqueue").End()
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -115,7 +115,7 @@ func (req *tsoRequest) Wait() (physical int64, logical int64, err error) {
cmdDurationTSOAsyncWait.Observe(start.Sub(req.start).Seconds())
select {
case err = <-req.done:
defer trace.StartRegion(req.requestCtx, "tsoReqDone").End()
defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqDone").End()
err = errors.WithStack(err)
defer tsoReqPool.Put(req)
if err != nil {
Expand Down Expand Up @@ -326,6 +326,14 @@ func (c *tsoClient) createTSODispatcher(dcLocation string) {
make(chan *tsoRequest, defaultMaxTSOBatchSize*2),
defaultMaxTSOBatchSize),
}
failpoint.Inject("shortDispatcherChannel", func() {
dispatcher = &tsoDispatcher{
dispatcherCancel: dispatcherCancel,
tsoBatchController: newTSOBatchController(
make(chan *tsoRequest, 1),
defaultMaxTSOBatchSize),
}
})

if _, ok := c.tsoDispatcher.LoadOrStore(dcLocation, dispatcher); !ok {
// Successfully stored the value. Start the following goroutine.
Expand Down Expand Up @@ -353,7 +361,6 @@ func (c *tsoClient) handleDispatcher(
cancel context.CancelFunc
// addr -> connectionContext
connectionCtxs sync.Map
opts []opentracing.StartSpanOption
)
defer func() {
log.Info("[tso] exit tso dispatcher", zap.String("dc-location", dc))
Expand Down Expand Up @@ -430,7 +437,7 @@ tsoBatchLoop:
} else {
log.Error("[tso] fetch pending tso requests error",
zap.String("dc-location", dc),
errs.ZapError(errs.ErrClientGetTSO, err))
zap.Error(errs.ErrClientGetTSO.FastGenByArgs(err.Error())))
}
return
}
Expand Down Expand Up @@ -502,8 +509,7 @@ tsoBatchLoop:
return
case tsDeadlineCh.(chan *deadline) <- dl:
}
opts = extractSpanReference(tbc, opts[:0])
err = c.processRequests(stream, dc, tbc, opts)
err = c.processRequests(stream, dc, tbc)
close(done)
// If error happens during tso stream handling, reset stream and run the next trial.
if err != nil {
Expand All @@ -516,7 +522,7 @@ tsoBatchLoop:
log.Error("[tso] getTS error after processing requests",
zap.String("dc-location", dc),
zap.String("stream-addr", streamAddr),
errs.ZapError(errs.ErrClientGetTSO, err))
zap.Error(errs.ErrClientGetTSO.FastGenByArgs(err.Error())))
// Set `stream` to nil and remove this stream from the `connectionCtxs` due to error.
connectionCtxs.Delete(streamAddr)
cancel()
Expand Down Expand Up @@ -761,24 +767,17 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s
return nil
}

func extractSpanReference(tbc *tsoBatchController, opts []opentracing.StartSpanOption) []opentracing.StartSpanOption {
for _, req := range tbc.getCollectedRequests() {
if span := opentracing.SpanFromContext(req.requestCtx); span != nil {
opts = append(opts, opentracing.ChildOf(span.Context()))
}
}
return opts
}

func (c *tsoClient) processRequests(
stream tsoStream, dcLocation string, tbc *tsoBatchController, opts []opentracing.StartSpanOption,
stream tsoStream, dcLocation string, tbc *tsoBatchController,
) error {
if len(opts) > 0 {
span := opentracing.StartSpan("pdclient.processRequests", opts...)
defer span.Finish()
}

requests := tbc.getCollectedRequests()
for _, req := range requests {
defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqSend").End()
if span := opentracing.SpanFromContext(req.requestCtx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.processRequests", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
}
count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
Expand Down Expand Up @@ -861,7 +860,7 @@ func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical
continue
}
}
tr := trace.StartRegion(requests[i].requestCtx, "tsoReqDequeue")
tr := trace.StartRegion(requests[i].requestCtx, "pdclient.tsoReqDequeue")
requests[i].done <- err
tr.End()
}
Expand Down
Loading

0 comments on commit c3bed08

Please sign in to comment.