Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: Make tsoStream receives asynchronously #8483

Merged
merged 26 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
140a7c2
Add implementation of making tsoStream asynchronous
MyonKeminta Aug 5, 2024
88d9f5b
Add basic tests for rewritten tsoStream
MyonKeminta Aug 7, 2024
623d896
Add a concurrency test
MyonKeminta Aug 8, 2024
86d0f42
Fix some of CI failures
MyonKeminta Aug 8, 2024
ff0f11a
Fix integration tests
MyonKeminta Aug 30, 2024
ef47b9d
Fix tso deadline
MyonKeminta Sep 2, 2024
ff1d50d
Fix stream broken handling
MyonKeminta Sep 2, 2024
9076c9d
Fix comments
MyonKeminta Sep 2, 2024
39e4e0b
try to avoid double invocation to the callback
MyonKeminta Sep 4, 2024
632781b
Fix lint
MyonKeminta Sep 4, 2024
8809c45
Address comments
MyonKeminta Sep 6, 2024
ca64ed0
Pass context outside the tsoStream (push for running CI)
MyonKeminta Sep 9, 2024
67967e4
client: Add benchmark for tsoStream and dispatcher
MyonKeminta Sep 11, 2024
1362aac
Merge branch 'm/add-benchmarks-for-tso-stream-and-dispatcher' into m/…
MyonKeminta Sep 12, 2024
4699b41
Fix build
MyonKeminta Sep 12, 2024
7dbcd77
Reimplement mock receiving
MyonKeminta Sep 12, 2024
f4dc99a
Merge branch 'm/add-benchmarks-for-tso-stream-and-dispatcher' into m/…
MyonKeminta Sep 12, 2024
1f2dcd8
Fix disptacher benchmark
MyonKeminta Sep 12, 2024
123a869
Fix incorrect error generating
MyonKeminta Sep 12, 2024
2c6aaba
Merge branch 'm/add-benchmarks-for-tso-stream-and-dispatcher' into m/…
MyonKeminta Sep 12, 2024
bff9eae
Disable logs in BenchmarkTSOStreamSendRecv
MyonKeminta Sep 12, 2024
ef6e365
Fix lint
MyonKeminta Sep 12, 2024
a9f4aee
Merge branch 'm/add-benchmarks-for-tso-stream-and-dispatcher' into m/…
MyonKeminta Sep 12, 2024
388d705
Make the comments more clear; unset req.streamID when getting new tso…
MyonKeminta Sep 12, 2024
a183aca
Fix lint
MyonKeminta Sep 12, 2024
5314485
Merge branch 'master' into m/make-tso-stream-async
ti-chi-bot[bot] Sep 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ func initAndRegisterMetrics(constLabels prometheus.Labels) {
}

var (
cmdDuration *prometheus.HistogramVec
cmdFailedDuration *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
tsoBestBatchSize prometheus.Histogram
tsoBatchSize prometheus.Histogram
tsoBatchSendLatency prometheus.Histogram
requestForwarded *prometheus.GaugeVec
cmdDuration *prometheus.HistogramVec
cmdFailedDuration *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
tsoBestBatchSize prometheus.Histogram
tsoBatchSize prometheus.Histogram
tsoBatchSendLatency prometheus.Histogram
requestForwarded *prometheus.GaugeVec
ongoingRequestCountGauge *prometheus.GaugeVec
)

func initMetrics(constLabels prometheus.Labels) {
Expand Down Expand Up @@ -117,6 +118,15 @@ func initMetrics(constLabels prometheus.Labels) {
Help: "The status to indicate if the request is forwarded",
ConstLabels: constLabels,
}, []string{"host", "delegate"})

ongoingRequestCountGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "ongoing_requests_count",
Help: "Current count of ongoing batch tso requests",
ConstLabels: constLabels,
}, []string{"stream"})
}

var (
Expand Down
93 changes: 55 additions & 38 deletions client/tso_batch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,53 +19,85 @@
"runtime/trace"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
)

type tsoBatchController struct {
maxBatchSize int
// bestBatchSize is a dynamic size that changed based on the current batch effect.
bestBatchSize int

tsoRequestCh chan *tsoRequest
collectedRequests []*tsoRequest
collectedRequestCount int

batchStartTime time.Time
// The time after getting the first request and the token, and before performing extra batching.
extraBatchingStartTime time.Time
}

func newTSOBatchController(tsoRequestCh chan *tsoRequest, maxBatchSize int) *tsoBatchController {
func newTSOBatchController(maxBatchSize int) *tsoBatchController {
return &tsoBatchController{
maxBatchSize: maxBatchSize,
bestBatchSize: 8, /* Starting from a low value is necessary because we need to make sure it will be converged to (current_batch_size - 4) */
tsoRequestCh: tsoRequestCh,
collectedRequests: make([]*tsoRequest, maxBatchSize+1),
collectedRequestCount: 0,
}
}

// fetchPendingRequests will start a new round of the batch collecting from the channel.
// It returns true if everything goes well, otherwise false which means we should stop the service.
func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, maxBatchWaitInterval time.Duration) error {
var firstRequest *tsoRequest
select {
case <-ctx.Done():
return ctx.Err()
case firstRequest = <-tbc.tsoRequestCh:
}
// Start to batch when the first TSO request arrives.
tbc.batchStartTime = time.Now()
// It returns nil error if everything goes well, otherwise a non-nil error which means we should stop the service.
// It's guaranteed that if this function failed after collecting some requests, then these requests will be cancelled
// when the function returns, so the caller don't need to clear them manually.
func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, tsoRequestCh <-chan *tsoRequest, tokenCh chan struct{}, maxBatchWaitInterval time.Duration) (errRet error) {
var tokenAcquired bool
defer func() {
if errRet != nil {
// Something went wrong when collecting a batch of requests. Release the token and cancel collected requests
// if any.
if tokenAcquired {
tokenCh <- struct{}{}
}
tbc.finishCollectedRequests(0, 0, 0, invalidStreamID, errRet)
}
}()

// Wait until BOTH the first request and the token have arrived.
// TODO: `tbc.collectedRequestCount` should never be non-empty here. Consider do assertion here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is also a TODO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertion should be true in my understanding. Is it true? If so, I can change it to an assertion here.

The assignment statement in the next line exists before my change. If I understood it right, it didn't make much sense here, since if it's non-zero and we set it zero, it might mean that some requests will be dropped without notifying their caller and leave them waiting forever.

tbc.collectedRequestCount = 0
tbc.pushRequest(firstRequest)
for {
select {
case <-ctx.Done():
return ctx.Err()
case req := <-tsoRequestCh:
// Start to batch when the first TSO request arrives.
tbc.pushRequest(req)
// A request arrives but the token is not ready yet. Continue waiting, and also allowing collecting the next
// request if it arrives.
continue
case <-tokenCh:
tokenAcquired = true
}

// The token is ready. If the first request didn't arrive, wait for it.
if tbc.collectedRequestCount == 0 {
select {
case <-ctx.Done():
return ctx.Err()
case firstRequest := <-tsoRequestCh:
tbc.pushRequest(firstRequest)
}
}

// Both token and the first request have arrived.
break
}

tbc.extraBatchingStartTime = time.Now()

// This loop is for trying best to collect more requests, so we use `tbc.maxBatchSize` here.
fetchPendingRequestsLoop:
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
case tsoReq := <-tsoRequestCh:
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
Expand All @@ -88,7 +120,7 @@
defer after.Stop()
for tbc.collectedRequestCount < tbc.bestBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
case tsoReq := <-tsoRequestCh:

Check warning on line 123 in client/tso_batch_controller.go

View check run for this annotation

Codecov / codecov/patch

client/tso_batch_controller.go#L123

Added line #L123 was not covered by tests
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
Expand All @@ -103,7 +135,7 @@
// we can adjust the `tbc.bestBatchSize` dynamically later.
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
case tsoReq := <-tsoRequestCh:

Check warning on line 138 in client/tso_batch_controller.go

View check run for this annotation

Codecov / codecov/patch

client/tso_batch_controller.go#L138

Added line #L138 was not covered by tests
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -136,31 +168,16 @@
}
}

func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) {
func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, streamID string, err error) {
for i := 0; i < tbc.collectedRequestCount; i++ {
tsoReq := tbc.collectedRequests[i]
// Retrieve the request context before the request is done to trace without race.
requestCtx := tsoReq.requestCtx
tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
tsoReq.streamID = streamID
tsoReq.tryDone(err)
trace.StartRegion(requestCtx, "pdclient.tsoReqDequeue").End()
}
// Prevent the finished requests from being processed again.
tbc.collectedRequestCount = 0
}

func (tbc *tsoBatchController) revokePendingRequests(err error) {
for i := 0; i < len(tbc.tsoRequestCh); i++ {
req := <-tbc.tsoRequestCh
req.tryDone(err)
}
}

func (tbc *tsoBatchController) clear() {
log.Info("[pd] clear the tso batch controller",
zap.Int("max-batch-size", tbc.maxBatchSize), zap.Int("best-batch-size", tbc.bestBatchSize),
zap.Int("collected-request-count", tbc.collectedRequestCount), zap.Int("pending-request-count", len(tbc.tsoRequestCh)))
tsoErr := errors.WithStack(errClosing)
tbc.finishCollectedRequests(0, 0, 0, tsoErr)
tbc.revokePendingRequests(tsoErr)
}
1 change: 1 addition & 0 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRe
req.physical = 0
req.logical = 0
req.dcLocation = dcLocation
req.streamID = ""
return req
}

Expand Down
Loading