Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: Support parallel TSO RPC requests on single dispatcher loop #8633

Open
wants to merge 42 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
140a7c2
Add implementation of making tsoStream asynchronous
MyonKeminta Aug 5, 2024
88d9f5b
Add basic tests for rewritten tsoStream
MyonKeminta Aug 7, 2024
623d896
Add a concurrency test
MyonKeminta Aug 8, 2024
86d0f42
Fix some of CI failures
MyonKeminta Aug 8, 2024
ff0f11a
Fix integration tests
MyonKeminta Aug 30, 2024
ef47b9d
Fix tso deadline
MyonKeminta Sep 2, 2024
ff1d50d
Fix stream broken handling
MyonKeminta Sep 2, 2024
9076c9d
Fix comments
MyonKeminta Sep 2, 2024
39e4e0b
try to avoid double invocation to the callback
MyonKeminta Sep 4, 2024
632781b
Fix lint
MyonKeminta Sep 4, 2024
8809c45
Address comments
MyonKeminta Sep 6, 2024
ca64ed0
Pass context outside the tsoStream (push for running CI)
MyonKeminta Sep 9, 2024
67967e4
client: Add benchmark for tsoStream and dispatcher
MyonKeminta Sep 11, 2024
1362aac
Merge branch 'm/add-benchmarks-for-tso-stream-and-dispatcher' into m/…
MyonKeminta Sep 12, 2024
4699b41
Fix build
MyonKeminta Sep 12, 2024
7dbcd77
Reimplement mock receiving
MyonKeminta Sep 12, 2024
f4dc99a
Merge branch 'm/add-benchmarks-for-tso-stream-and-dispatcher' into m/…
MyonKeminta Sep 12, 2024
1f2dcd8
Fix disptacher benchmark
MyonKeminta Sep 12, 2024
123a869
Fix incorrect error generating
MyonKeminta Sep 12, 2024
2c6aaba
Merge branch 'm/add-benchmarks-for-tso-stream-and-dispatcher' into m/…
MyonKeminta Sep 12, 2024
bff9eae
Disable logs in BenchmarkTSOStreamSendRecv
MyonKeminta Sep 12, 2024
ef6e365
Fix lint
MyonKeminta Sep 12, 2024
a9f4aee
Merge branch 'm/add-benchmarks-for-tso-stream-and-dispatcher' into m/…
MyonKeminta Sep 12, 2024
388d705
Make the comments more clear; unset req.streamID when getting new tso…
MyonKeminta Sep 12, 2024
a183aca
Fix lint
MyonKeminta Sep 12, 2024
4296c3f
Add latency estimation
MyonKeminta Sep 12, 2024
9b5b054
Support concurrent RPC
MyonKeminta Sep 14, 2024
c44339f
Merge commit '098b802fc' into m/support-parallel-tso-requests-in-one-…
MyonKeminta Sep 14, 2024
364101f
Merge remote-tracking branch 'upstream/master' into m/support-paralle…
MyonKeminta Sep 14, 2024
4bb33a2
Fix token not acquired when maxBatchSize is reached before token is r…
MyonKeminta Sep 14, 2024
f016265
Merge branch 'master' of https://github.com/tikv/pd into m/support-pa…
MyonKeminta Sep 23, 2024
e7590ec
fix lint
MyonKeminta Sep 23, 2024
e5f9355
Add test to the rcFilter
MyonKeminta Sep 23, 2024
1b022ce
Adapt the monotonicity check for parallel RPC
MyonKeminta Sep 23, 2024
c4c9b4f
Add test TestEstimatedLatency
MyonKeminta Sep 23, 2024
f451595
Add tests for concurrency limiting
MyonKeminta Sep 24, 2024
708bbe9
Add tests for calculating batch delay time
MyonKeminta Sep 24, 2024
4a795fe
Fix lint
MyonKeminta Sep 24, 2024
0bfe55f
Address comments; add more comments to explain checkMonotonicity
MyonKeminta Sep 25, 2024
0bfe9a6
Fix data race
MyonKeminta Sep 25, 2024
f67467e
Update estimated latency metric in tsoStream instead of tsoDispatcher
MyonKeminta Sep 25, 2024
8f45037
Merge branch 'master' into m/support-parallel-tso-requests-in-one-stream
ti-chi-bot[bot] Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,12 @@
return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool")
}
c.option.setEnableFollowerHandle(enable)
case TSOClientRPCConcurrency:
value, ok := value.(int)
if !ok {
return errors.New("[pd] invalid value type for TSOClientRPCConcurrency option, it should be int")

Check warning on line 803 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L800-L803

Added lines #L800 - L803 were not covered by tests
}
c.option.setTSOClientRPCConcurrency(value)

Check warning on line 805 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L805

Added line #L805 was not covered by tests
default:
return errors.New("[pd] unsupported client option")
}
Expand Down
10 changes: 10 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
tsoBatchSendLatency prometheus.Histogram
requestForwarded *prometheus.GaugeVec
ongoingRequestCountGauge *prometheus.GaugeVec
estimateTSOLatencyGauge *prometheus.GaugeVec
)

func initMetrics(constLabels prometheus.Labels) {
Expand Down Expand Up @@ -127,6 +128,14 @@ func initMetrics(constLabels prometheus.Labels) {
Help: "Current count of ongoing batch tso requests",
ConstLabels: constLabels,
}, []string{"stream"})
estimateTSOLatencyGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "estimate_tso_latency",
Help: "Estimated latency of an RTT of getting TSO",
ConstLabels: constLabels,
}, []string{"stream"})
}

var (
Expand Down Expand Up @@ -236,4 +245,5 @@ func registerMetrics() {
prometheus.MustRegister(tsoBatchSize)
prometheus.MustRegister(tsoBatchSendLatency)
prometheus.MustRegister(requestForwarded)
prometheus.MustRegister(estimateTSOLatencyGauge)
}
15 changes: 15 additions & 0 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
defaultMaxTSOBatchWaitInterval time.Duration = 0
defaultEnableTSOFollowerProxy = false
defaultEnableFollowerHandle = false
defaultTSOClientRPCConcurrency = 1
)

// DynamicOption is used to distinguish the dynamic option type.
Expand All @@ -43,6 +44,8 @@ const (
EnableTSOFollowerProxy
// EnableFollowerHandle is the follower handle option.
EnableFollowerHandle
// TSOClientRPCConcurrency controls the amount of ongoing TSO RPC requests at the same time in a single TSO client.
TSOClientRPCConcurrency

dynamicOptionCount
)
Expand Down Expand Up @@ -77,6 +80,7 @@ func newOption() *option {
co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)
co.dynamicOptions[EnableTSOFollowerProxy].Store(defaultEnableTSOFollowerProxy)
co.dynamicOptions[EnableFollowerHandle].Store(defaultEnableFollowerHandle)
co.dynamicOptions[TSOClientRPCConcurrency].Store(defaultTSOClientRPCConcurrency)
return co
}

Expand Down Expand Up @@ -127,3 +131,14 @@ func (o *option) setEnableTSOFollowerProxy(enable bool) {
func (o *option) getEnableTSOFollowerProxy() bool {
return o.dynamicOptions[EnableTSOFollowerProxy].Load().(bool)
}

func (o *option) setTSOClientRPCConcurrency(value int) {
old := o.getTSOClientRPCConcurrency()
if value != old {
o.dynamicOptions[TSOClientRPCConcurrency].Store(value)
}
}

func (o *option) getTSOClientRPCConcurrency() int {
return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int)
}
42 changes: 42 additions & 0 deletions client/tso_batch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@
// TODO: `tbc.collectedRequestCount` should never be non-empty here. Consider do assertion here.
tbc.collectedRequestCount = 0
for {
// If the batch size reaches the maxBatchSize limit but the token haven't arrived yet, don't receive more
// requests, and return when token is ready.
if tbc.collectedRequestCount >= tbc.maxBatchSize && !tokenAcquired {
select {
case <-ctx.Done():
return ctx.Err()
case <-tokenCh:
return nil

Check warning on line 74 in client/tso_batch_controller.go

View check run for this annotation

Codecov / codecov/patch

client/tso_batch_controller.go#L70-L74

Added lines #L70 - L74 were not covered by tests
}
}

select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -146,6 +157,37 @@
return nil
}

// fetchRequestsWithTimer tries to fetch requests until the given timer ticks. The caller must set the timer properly
// before calling this function.
func (tbc *tsoBatchController) fetchRequestsWithTimer(ctx context.Context, tsoRequestCh <-chan *tsoRequest, timer *time.Timer) error {
batchingLoop:
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case <-ctx.Done():
return ctx.Err()
case req := <-tsoRequestCh:
tbc.pushRequest(req)
case <-timer.C:
break batchingLoop

Check warning on line 171 in client/tso_batch_controller.go

View check run for this annotation

Codecov / codecov/patch

client/tso_batch_controller.go#L162-L171

Added lines #L162 - L171 were not covered by tests
}
}

// Try to collect more requests in non-blocking way.
nonWaitingBatchLoop:
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case <-ctx.Done():
return ctx.Err()
case req := <-tsoRequestCh:
tbc.pushRequest(req)
default:
break nonWaitingBatchLoop

Check warning on line 184 in client/tso_batch_controller.go

View check run for this annotation

Codecov / codecov/patch

client/tso_batch_controller.go#L176-L184

Added lines #L176 - L184 were not covered by tests
}
}

return nil

Check warning on line 188 in client/tso_batch_controller.go

View check run for this annotation

Codecov / codecov/patch

client/tso_batch_controller.go#L188

Added line #L188 was not covered by tests
}

func (tbc *tsoBatchController) pushRequest(tsoReq *tsoRequest) {
tbc.collectedRequests[tbc.collectedRequestCount] = tsoReq
tbc.collectedRequestCount++
Expand Down
Loading