Skip to content

Commit

Permalink
Merge branch 'tikv:master' into test-etcd-panic
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 authored Sep 26, 2024
2 parents 55552e1 + 76dc560 commit 12da40d
Show file tree
Hide file tree
Showing 35 changed files with 1,397 additions and 256 deletions.
6 changes: 6 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,12 @@ func (c *client) UpdateOption(option DynamicOption, value any) error {
return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool")
}
c.option.setEnableFollowerHandle(enable)
case TSOClientRPCConcurrency:
value, ok := value.(int)
if !ok {
return errors.New("[pd] invalid value type for TSOClientRPCConcurrency option, it should be int")
}
c.option.setTSOClientRPCConcurrency(value)
default:
return errors.New("[pd] unsupported client option")
}
Expand Down
3 changes: 2 additions & 1 deletion client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (
RegionLabelRulesByIDs = "/pd/api/v1/config/region-label/rules/ids"
// Scheduler
Schedulers = "/pd/api/v1/schedulers"
scatterRangeScheduler = "/pd/api/v1/schedulers/scatter-range-"
scatterRangeScheduler = "/pd/api/v1/schedulers/scatter-range-scheduler-"
// Admin
ResetTS = "/pd/api/v1/admin/reset-ts"
BaseAllocID = "/pd/api/v1/admin/base-alloc-id"
Expand Down Expand Up @@ -183,6 +183,7 @@ func SchedulerByName(name string) string {
}

// ScatterRangeSchedulerWithName returns the scatter range scheduler API with name parameter.
// It is used in https://github.com/pingcap/tidb/blob/2a3352c45dd0f8dd5102adb92879bbfa964e7f5f/pkg/server/handler/tikvhandler/tikv_handler.go#L1252.
func ScatterRangeSchedulerWithName(name string) string {
return fmt.Sprintf("%s%s", scatterRangeScheduler, name)
}
Expand Down
10 changes: 10 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
tsoBatchSendLatency prometheus.Histogram
requestForwarded *prometheus.GaugeVec
ongoingRequestCountGauge *prometheus.GaugeVec
estimateTSOLatencyGauge *prometheus.GaugeVec
)

func initMetrics(constLabels prometheus.Labels) {
Expand Down Expand Up @@ -127,6 +128,14 @@ func initMetrics(constLabels prometheus.Labels) {
Help: "Current count of ongoing batch tso requests",
ConstLabels: constLabels,
}, []string{"stream"})
estimateTSOLatencyGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "estimate_tso_latency",
Help: "Estimated latency of an RTT of getting TSO",
ConstLabels: constLabels,
}, []string{"stream"})
}

var (
Expand Down Expand Up @@ -236,4 +245,5 @@ func registerMetrics() {
prometheus.MustRegister(tsoBatchSize)
prometheus.MustRegister(tsoBatchSendLatency)
prometheus.MustRegister(requestForwarded)
prometheus.MustRegister(estimateTSOLatencyGauge)
}
15 changes: 15 additions & 0 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
defaultMaxTSOBatchWaitInterval time.Duration = 0
defaultEnableTSOFollowerProxy = false
defaultEnableFollowerHandle = false
defaultTSOClientRPCConcurrency = 1
)

// DynamicOption is used to distinguish the dynamic option type.
Expand All @@ -43,6 +44,8 @@ const (
EnableTSOFollowerProxy
// EnableFollowerHandle is the follower handle option.
EnableFollowerHandle
// TSOClientRPCConcurrency controls the amount of ongoing TSO RPC requests at the same time in a single TSO client.
TSOClientRPCConcurrency

dynamicOptionCount
)
Expand Down Expand Up @@ -77,6 +80,7 @@ func newOption() *option {
co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)
co.dynamicOptions[EnableTSOFollowerProxy].Store(defaultEnableTSOFollowerProxy)
co.dynamicOptions[EnableFollowerHandle].Store(defaultEnableFollowerHandle)
co.dynamicOptions[TSOClientRPCConcurrency].Store(defaultTSOClientRPCConcurrency)
return co
}

Expand Down Expand Up @@ -127,3 +131,14 @@ func (o *option) setEnableTSOFollowerProxy(enable bool) {
func (o *option) getEnableTSOFollowerProxy() bool {
return o.dynamicOptions[EnableTSOFollowerProxy].Load().(bool)
}

func (o *option) setTSOClientRPCConcurrency(value int) {
old := o.getTSOClientRPCConcurrency()
if value != old {
o.dynamicOptions[TSOClientRPCConcurrency].Store(value)
}
}

func (o *option) getTSOClientRPCConcurrency() int {
return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int)
}
42 changes: 42 additions & 0 deletions client/tso_batch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, tsoRequ
// TODO: `tbc.collectedRequestCount` should never be non-empty here. Consider do assertion here.
tbc.collectedRequestCount = 0
for {
// If the batch size reaches the maxBatchSize limit but the token haven't arrived yet, don't receive more
// requests, and return when token is ready.
if tbc.collectedRequestCount >= tbc.maxBatchSize && !tokenAcquired {
select {
case <-ctx.Done():
return ctx.Err()
case <-tokenCh:
return nil
}
}

select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -146,6 +157,37 @@ fetchPendingRequestsLoop:
return nil
}

// fetchRequestsWithTimer tries to fetch requests until the given timer ticks. The caller must set the timer properly
// before calling this function.
func (tbc *tsoBatchController) fetchRequestsWithTimer(ctx context.Context, tsoRequestCh <-chan *tsoRequest, timer *time.Timer) error {
batchingLoop:
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case <-ctx.Done():
return ctx.Err()
case req := <-tsoRequestCh:
tbc.pushRequest(req)
case <-timer.C:
break batchingLoop
}
}

// Try to collect more requests in non-blocking way.
nonWaitingBatchLoop:
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case <-ctx.Done():
return ctx.Err()
case req := <-tsoRequestCh:
tbc.pushRequest(req)
default:
break nonWaitingBatchLoop
}
}

return nil
}

func (tbc *tsoBatchController) pushRequest(tsoReq *tsoRequest) {
tbc.collectedRequests[tbc.collectedRequestCount] = tsoReq
tbc.collectedRequestCount++
Expand Down
Loading

0 comments on commit 12da40d

Please sign in to comment.