Skip to content

Commit

Permalink
Merge branch 'master' into name8
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Sep 24, 2024
2 parents 7f14343 + ade5a93 commit 942495b
Show file tree
Hide file tree
Showing 121 changed files with 3,070 additions and 1,457 deletions.
4 changes: 1 addition & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,7 @@ CLEAN_UT_BINARY := find . -name '*.test.bin'| xargs rm -f

clean-test:
# Cleaning test tmp...
rm -rf /tmp/test_pd*
rm -rf /tmp/pd-tests*
rm -rf /tmp/test_etcd*
rm -rf /tmp/pd_tests*
rm -f $(REAL_CLUSTER_TEST_PATH)/playground.log
go clean -testcache
@$(CLEAN_UT_BINARY)
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7
github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 h1:V9XS3FQ/P6u+kFaoSyY5DBswIA774BMpIOLDBMrpxKc=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31 h1:6BY+3T6Hqpw9UZ/D7Om/xB+Xik3NkkYxBV6qCzUdUvU=
github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
24 changes: 17 additions & 7 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ func initAndRegisterMetrics(constLabels prometheus.Labels) {
}

var (
cmdDuration *prometheus.HistogramVec
cmdFailedDuration *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
tsoBestBatchSize prometheus.Histogram
tsoBatchSize prometheus.Histogram
tsoBatchSendLatency prometheus.Histogram
requestForwarded *prometheus.GaugeVec
cmdDuration *prometheus.HistogramVec
cmdFailedDuration *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
tsoBestBatchSize prometheus.Histogram
tsoBatchSize prometheus.Histogram
tsoBatchSendLatency prometheus.Histogram
requestForwarded *prometheus.GaugeVec
ongoingRequestCountGauge *prometheus.GaugeVec
)

func initMetrics(constLabels prometheus.Labels) {
Expand Down Expand Up @@ -117,6 +118,15 @@ func initMetrics(constLabels prometheus.Labels) {
Help: "The status to indicate if the request is forwarded",
ConstLabels: constLabels,
}, []string{"host", "delegate"})

ongoingRequestCountGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "ongoing_requests_count",
Help: "Current count of ongoing batch tso requests",
ConstLabels: constLabels,
}, []string{"stream"})
}

var (
Expand Down
93 changes: 55 additions & 38 deletions client/tso_batch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,53 +19,85 @@ import (
"runtime/trace"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
)

type tsoBatchController struct {
maxBatchSize int
// bestBatchSize is a dynamic size that changed based on the current batch effect.
bestBatchSize int

tsoRequestCh chan *tsoRequest
collectedRequests []*tsoRequest
collectedRequestCount int

batchStartTime time.Time
// The time after getting the first request and the token, and before performing extra batching.
extraBatchingStartTime time.Time
}

func newTSOBatchController(tsoRequestCh chan *tsoRequest, maxBatchSize int) *tsoBatchController {
func newTSOBatchController(maxBatchSize int) *tsoBatchController {
return &tsoBatchController{
maxBatchSize: maxBatchSize,
bestBatchSize: 8, /* Starting from a low value is necessary because we need to make sure it will be converged to (current_batch_size - 4) */
tsoRequestCh: tsoRequestCh,
collectedRequests: make([]*tsoRequest, maxBatchSize+1),
collectedRequestCount: 0,
}
}

// fetchPendingRequests will start a new round of the batch collecting from the channel.
// It returns true if everything goes well, otherwise false which means we should stop the service.
func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, maxBatchWaitInterval time.Duration) error {
var firstRequest *tsoRequest
select {
case <-ctx.Done():
return ctx.Err()
case firstRequest = <-tbc.tsoRequestCh:
}
// Start to batch when the first TSO request arrives.
tbc.batchStartTime = time.Now()
// It returns nil error if everything goes well, otherwise a non-nil error which means we should stop the service.
// It's guaranteed that if this function failed after collecting some requests, then these requests will be cancelled
// when the function returns, so the caller don't need to clear them manually.
func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, tsoRequestCh <-chan *tsoRequest, tokenCh chan struct{}, maxBatchWaitInterval time.Duration) (errRet error) {
var tokenAcquired bool
defer func() {
if errRet != nil {
// Something went wrong when collecting a batch of requests. Release the token and cancel collected requests
// if any.
if tokenAcquired {
tokenCh <- struct{}{}
}
tbc.finishCollectedRequests(0, 0, 0, invalidStreamID, errRet)
}
}()

// Wait until BOTH the first request and the token have arrived.
// TODO: `tbc.collectedRequestCount` should never be non-empty here. Consider do assertion here.
tbc.collectedRequestCount = 0
tbc.pushRequest(firstRequest)
for {
select {
case <-ctx.Done():
return ctx.Err()
case req := <-tsoRequestCh:
// Start to batch when the first TSO request arrives.
tbc.pushRequest(req)
// A request arrives but the token is not ready yet. Continue waiting, and also allowing collecting the next
// request if it arrives.
continue
case <-tokenCh:
tokenAcquired = true
}

// The token is ready. If the first request didn't arrive, wait for it.
if tbc.collectedRequestCount == 0 {
select {
case <-ctx.Done():
return ctx.Err()
case firstRequest := <-tsoRequestCh:
tbc.pushRequest(firstRequest)
}
}

// Both token and the first request have arrived.
break
}

tbc.extraBatchingStartTime = time.Now()

// This loop is for trying best to collect more requests, so we use `tbc.maxBatchSize` here.
fetchPendingRequestsLoop:
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
case tsoReq := <-tsoRequestCh:
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
Expand All @@ -88,7 +120,7 @@ fetchPendingRequestsLoop:
defer after.Stop()
for tbc.collectedRequestCount < tbc.bestBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
case tsoReq := <-tsoRequestCh:
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
Expand All @@ -103,7 +135,7 @@ fetchPendingRequestsLoop:
// we can adjust the `tbc.bestBatchSize` dynamically later.
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
case tsoReq := <-tsoRequestCh:
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -136,31 +168,16 @@ func (tbc *tsoBatchController) adjustBestBatchSize() {
}
}

func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) {
func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, streamID string, err error) {
for i := 0; i < tbc.collectedRequestCount; i++ {
tsoReq := tbc.collectedRequests[i]
// Retrieve the request context before the request is done to trace without race.
requestCtx := tsoReq.requestCtx
tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
tsoReq.streamID = streamID
tsoReq.tryDone(err)
trace.StartRegion(requestCtx, "pdclient.tsoReqDequeue").End()
}
// Prevent the finished requests from being processed again.
tbc.collectedRequestCount = 0
}

func (tbc *tsoBatchController) revokePendingRequests(err error) {
for i := 0; i < len(tbc.tsoRequestCh); i++ {
req := <-tbc.tsoRequestCh
req.tryDone(err)
}
}

func (tbc *tsoBatchController) clear() {
log.Info("[pd] clear the tso batch controller",
zap.Int("max-batch-size", tbc.maxBatchSize), zap.Int("best-batch-size", tbc.bestBatchSize),
zap.Int("collected-request-count", tbc.collectedRequestCount), zap.Int("pending-request-count", len(tbc.tsoRequestCh)))
tsoErr := errors.WithStack(errClosing)
tbc.finishCollectedRequests(0, 0, 0, tsoErr)
tbc.revokePendingRequests(tsoErr)
}
1 change: 1 addition & 0 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRe
req.physical = 0
req.logical = 0
req.dcLocation = dcLocation
req.streamID = ""
return req
}

Expand Down
Loading

0 comments on commit 942495b

Please sign in to comment.