Add linear histogram for tests
Signed-off-by: MyonKeminta <[email protected]>
MyonKeminta committed Jul 10, 2024
1 parent f5e0b03 commit 95de873
Showing 4 changed files with 236 additions and 20 deletions.
170 changes: 170 additions & 0 deletions client/histogram.go
@@ -0,0 +1,170 @@
package pd

import (
	"fmt"
	"strings"
	"sync/atomic"
	"time"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// Histogram is a simple linear histogram: values are counted in fixed-width
// buckets of size `interval`, and values at or above `cutoff` are dropped.
type Histogram struct {
	buckets   []int
	sum       float64
	sumSquare float64
	count     int
	interval  float64
	cutoff    float64
}

// NewHistogram creates a Histogram with `bucketsCount` pre-allocated buckets of
// width `interval`; observations at or above `cutoff` are ignored.
func NewHistogram(interval float64, bucketsCount int, cutoff float64) *Histogram {
	return &Histogram{
		buckets:  make([]int, bucketsCount),
		interval: interval,
		count:    0,
		cutoff:   cutoff,
	}
}

// Observe records a single value, growing the bucket slice on demand. Values at
// or above the cutoff are silently discarded.
func (h *Histogram) Observe(value float64) {
	if value >= h.cutoff {
		return
	}

	index := int(value / h.interval)
	for index >= len(h.buckets) {
		h.buckets = append(h.buckets, 0)
	}

	h.buckets[index]++
	h.count++
	h.sum += value
	h.sumSquare += value * value
}

// GetPercentile estimates the p-quantile (0 <= p <= 1) by walking the buckets
// and interpolating linearly within the bucket where the quantile falls.
func (h *Histogram) GetPercentile(p float64) float64 {
	if h.count == 0 {
		return 0
	}
	limit := float64(h.count) * p
	result := 0.
	for i := 0; i < len(h.buckets); i++ {
		samplesInBucket := float64(h.buckets[i])
		if samplesInBucket >= limit {
			result += limit / samplesInBucket * h.interval
			break
		}
		result += h.interval
		limit -= samplesInBucket
	}
	return result
}
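
For intuition, a quick worked example of the interpolation (illustrative only, not part of the diff):

h := NewHistogram(1.0, 3, 100) // bucket width 1.0, cutoff 100
for _, v := range []float64{0.1, 0.2, 0.3, 0.4, 1.1, 1.2, 1.3, 1.4, 2.1, 2.2} {
	h.Observe(v)
}
// buckets = [4, 4, 2], count = 10. GetPercentile(0.5) sets limit = 5:
// bucket 0 holds 4 (< 5), so result = 1.0 and limit drops to 1;
// bucket 1 holds 4 (>= 1), so result += 1/4*1.0 = 0.25 and the loop stops.
fmt.Println(h.GetPercentile(0.5)) // 1.25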

// GetAvg returns the mean of the observed values, or 0 when nothing has been
// observed yet (guarding against a 0/0 NaN).
func (h *Histogram) GetAvg() float64 {
	if h.count == 0 {
		return 0
	}
	return h.sum / float64(h.count)
}

// String renders the histogram summary. The bucket list is run-length encoded:
// each run of equal counts is written as "value,count;", or just "value;" when
// the run has length one.
func (h *Histogram) String() string {
	sb := &strings.Builder{}
	_, err := fmt.Fprintf(sb, "{ count: %v, sum: %v, sum_square: %v, interval: %v, buckets.len: %v, buckets: [", h.count, h.sum, h.sumSquare, h.interval, len(h.buckets))
	if err != nil {
		panic("unreachable")
	}

	if len(h.buckets) > 0 {
		put := func(value, count int) {
			if count == 1 {
				_, err = fmt.Fprintf(sb, "%v;", value)
			} else {
				_, err = fmt.Fprintf(sb, "%v,%v;", value, count)
			}
			if err != nil {
				panic("unreachable")
			}
		}

		lastValue := h.buckets[0]
		lastValueCount := 1

		for i := 1; i < len(h.buckets); i++ {
			if h.buckets[i] == lastValue {
				lastValueCount++
				continue
			}

			put(lastValue, lastValueCount)
			lastValue = h.buckets[i]
			lastValueCount = 1
		}

		put(lastValue, lastValueCount)
	}

	_, err = sb.WriteString("] }")
	if err != nil {
		panic("unreachable")
	}

	return sb.String()
}
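
To make the encoding concrete (illustrative values, not from the diff): a bucket slice [0, 0, 3, 3, 5] is rendered as 0,2;3,2;5; — the two runs of length two collapse to "value,count;" and the single trailing 5 prints as just "5;". This keeps the dumped log line short even when most of the buckets are empty.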

// Clear resets the counters and bucket contents while keeping the allocated buckets.
func (h *Histogram) Clear() {
	h.sum = 0
	h.sumSquare = 0
	h.count = 0
	for i := 0; i < len(h.buckets); i++ {
		h.buckets[i] = 0
	}
}

// AutoDumpHistogram double-buffers two Histograms: the main one receives
// observations, and once per dumpInterval the buffers are swapped so the filled
// one can be merged into `accumulated` and logged in the background.
type AutoDumpHistogram struct {
	name                  string
	mainHistogram         *Histogram
	backHistogram         *Histogram
	accumulated           *Histogram
	isDumping             atomic.Bool
	lastDumpHistogramTime time.Time
	dumpInterval          time.Duration
}

// NewAutoDumpingHistogram creates an AutoDumpHistogram whose accumulated
// distribution is logged roughly once per dumpInterval.
func NewAutoDumpingHistogram(name string, interval float64, bucketsCount int, cutoff float64, dumpInterval time.Duration) *AutoDumpHistogram {
	return &AutoDumpHistogram{
		name:                  name,
		mainHistogram:         NewHistogram(interval, bucketsCount, cutoff),
		backHistogram:         NewHistogram(interval, bucketsCount, cutoff),
		accumulated:           NewHistogram(interval, bucketsCount, cutoff),
		lastDumpHistogramTime: time.Now(),
		dumpInterval:          dumpInterval,
	}
}

// Observe records a value; if dumpInterval has elapsed since the last dump and
// no dump is in flight, it swaps the double buffers and dumps in the background.
// Not thread-safe: all calls must come from a single goroutine.
func (h *AutoDumpHistogram) Observe(value float64, now time.Time) {
	h.mainHistogram.Observe(value)
	if now.Sub(h.lastDumpHistogramTime) >= h.dumpInterval && !h.isDumping.Load() {
		h.isDumping.Store(true)
		h.mainHistogram, h.backHistogram = h.backHistogram, h.mainHistogram
		h.lastDumpHistogramTime = now
		go h.dump(now)
	}
}

func (h *AutoDumpHistogram) dump(now time.Time) {
	defer h.isDumping.Store(false)

	// Merge the swapped-out buffer into the running total, growing the
	// accumulated bucket slice if the back buffer has grown beyond it.
	h.accumulated.sum += h.backHistogram.sum
	h.accumulated.sumSquare += h.backHistogram.sumSquare
	h.accumulated.count += h.backHistogram.count
	for i := 0; i < len(h.accumulated.buckets) && i < len(h.backHistogram.buckets); i++ {
		h.accumulated.buckets[i] += h.backHistogram.buckets[i]
	}
	if len(h.backHistogram.buckets) > len(h.accumulated.buckets) {
		h.accumulated.buckets = append(h.accumulated.buckets, h.backHistogram.buckets[len(h.accumulated.buckets):]...)
	}

	log.Info("dumping histogram", zap.String("name", h.name), zap.Time("time", now), zap.Stringer("histogram", h.accumulated))

	h.backHistogram.Clear()
}
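
A minimal usage sketch (the caller and the durations channel are assumed for illustration; the dispatcher changes below show the real wiring): all Observe calls come from one goroutine, and roughly once per dumpInterval the buffers swap and the accumulated distribution is logged.

hist := NewAutoDumpingHistogram("exampleDurationHist", 2e-5, 2000, 1, time.Minute)
for d := range durations { // durations: a chan time.Duration, assumed for the example
	hist.Observe(d.Seconds(), time.Now())
}

Because dump only touches backHistogram and accumulated while new observations keep landing in mainHistogram, the single isDumping flag is enough to keep the writer and the background dumper apart.
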
22 changes: 15 additions & 7 deletions client/tso_batch_controller.go
@@ -49,7 +49,7 @@ func newTSOBatchController(tsoRequestCh chan *tsoRequest, maxBatchSize int) *tso

// fetchPendingRequests will start a new round of the batch collecting from the channel.
// It returns nil if everything goes well; otherwise a non-nil error, which means we should stop the service.
func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, maxBatchWaitInterval time.Duration) error {
func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, maxBatchWaitInterval time.Duration, beforeReceiveDurationHist *AutoDumpHistogram) error {
	var firstRequest *tsoRequest
	select {
	case <-ctx.Done():
@@ -59,14 +59,14 @@ func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, maxBatc
	// Start to batch when the first TSO request arrives.
	tbc.batchStartTime = time.Now()
	tbc.collectedRequestCount = 0
	tbc.pushRequest(firstRequest)
	tbc.pushRequest(firstRequest, beforeReceiveDurationHist, tbc.batchStartTime)

	// This loop tries its best to collect more requests, so we use `tbc.maxBatchSize` here.
fetchPendingRequestsLoop:
	for tbc.collectedRequestCount < tbc.maxBatchSize {
		select {
		case tsoReq := <-tbc.tsoRequestCh:
			tbc.pushRequest(tsoReq)
			tbc.pushRequest(tsoReq, beforeReceiveDurationHist, tbc.batchStartTime)
		case <-ctx.Done():
			return ctx.Err()
		default:
@@ -89,7 +89,7 @@ fetchPendingRequestsLoop:
	for tbc.collectedRequestCount < tbc.bestBatchSize {
		select {
		case tsoReq := <-tbc.tsoRequestCh:
			tbc.pushRequest(tsoReq)
			tbc.pushRequest(tsoReq, beforeReceiveDurationHist, time.Now())
		case <-ctx.Done():
			return ctx.Err()
		case <-after.C:
@@ -104,7 +104,7 @@ fetchPendingRequestsLoop:
	for tbc.collectedRequestCount < tbc.maxBatchSize {
		select {
		case tsoReq := <-tbc.tsoRequestCh:
			tbc.pushRequest(tsoReq)
			tbc.pushRequest(tsoReq, beforeReceiveDurationHist, time.Now())
		case <-ctx.Done():
			return ctx.Err()
		default:
@@ -114,9 +114,10 @@ fetchPendingRequestsLoop:
	return nil
}

func (tbc *tsoBatchController) pushRequest(tsoReq *tsoRequest) {
func (tbc *tsoBatchController) pushRequest(tsoReq *tsoRequest, beforeReceiveDurationHist *AutoDumpHistogram, now time.Time) {
	tbc.collectedRequests[tbc.collectedRequestCount] = tsoReq
	tbc.collectedRequestCount++
	beforeReceiveDurationHist.Observe(now.Sub(tsoReq.start).Seconds(), now)
}
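
With the extra arguments, every pushed request now records in beforeReceiveDurationHist how long it sat in the request channel between its creation (tsoReq.start) and being collected into a batch. The first collection loop passes tbc.batchStartTime, presumably to avoid a time.Now() call per request on the fast path, while the later loops, which run after a wait, stamp a fresh time.Now().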

func (tbc *tsoBatchController) getCollectedRequests() []*tsoRequest {
@@ -135,14 +136,21 @@ func (tbc *tsoBatchController) adjustBestBatchSize() {
		tbc.bestBatchSize++
	}
}

func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) {
	tbc.finishCollectedRequestsWithStatFunc(physical, firstLogical, suffixBits, err, nil)
}

func (tbc *tsoBatchController) finishCollectedRequestsWithStatFunc(physical, firstLogical int64, suffixBits uint32, err error, statFunc func(latency time.Duration, now time.Time)) {
	now := time.Now()
	for i := 0; i < tbc.collectedRequestCount; i++ {
		tsoReq := tbc.collectedRequests[i]
		// Retrieve the request context before the request is done to trace without race.
		requestCtx := tsoReq.requestCtx
		tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
		tsoReq.tryDone(err)
		if statFunc != nil {
			statFunc(now.Sub(tsoReq.start), now)
		}
		trace.StartRegion(requestCtx, "pdclient.tsoReqDequeue").End()
	}
	// Prevent the finished requests from being processed again.
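
Note that time.Now() is stamped once per batch, so every request in the batch shares the same completion time. The hook is nil-tolerant, so existing call sites keep the old behavior; a hedged sketch of a consumer (surrounding names assumed, mirroring td.observeLatency in the dispatcher below):

statFunc := func(latency time.Duration, now time.Time) {
	callDurationHist.Observe(latency.Seconds(), now) // callDurationHist: an *AutoDumpHistogram, assumed
}
tbc.finishCollectedRequestsWithStatFunc(physical, firstLogical, suffixBits, nil, statFunc)
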
37 changes: 27 additions & 10 deletions client/tso_dispatcher.go
@@ -19,7 +19,9 @@ import (
"fmt"
"math/rand"
"runtime/trace"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
@@ -63,6 +65,8 @@ type tsoInfo struct {
	logical int64
}

// tsoDispatcherIDAlloc allocates a unique ID per dispatcher, used to name its histograms.
var tsoDispatcherIDAlloc atomic.Int32

type tsoServiceProvider interface {
	getOption() *option
	getServiceDiscovery() ServiceDiscovery
@@ -82,6 +86,11 @@ type tsoDispatcher struct {
	lastTSOInfo *tsoInfo

	updateConnectionCtxsCh chan struct{}

	tsoCallDurationHist      *AutoDumpHistogram
	beforeHandleDurationHist *AutoDumpHistogram

	statFunc func(time.Duration, time.Time)
}

func newTSODispatcher(
@@ -90,6 +99,7 @@ func newTSODispatcher(
	maxBatchSize int,
	provider tsoServiceProvider,
) *tsoDispatcher {
	id := strconv.Itoa(int(tsoDispatcherIDAlloc.Add(1)))
	dispatcherCtx, dispatcherCancel := context.WithCancel(ctx)
	tsoBatchController := newTSOBatchController(
		make(chan *tsoRequest, maxBatchSize*2),
@@ -102,15 +112,18 @@ func newTSODispatcher(
	)
	})
	td := &tsoDispatcher{
		ctx:                    dispatcherCtx,
		cancel:                 dispatcherCancel,
		dc:                     dc,
		provider:               provider,
		connectionCtxs:         &sync.Map{},
		batchController:        tsoBatchController,
		tsDeadlineCh:           make(chan *deadline, 1),
		updateConnectionCtxsCh: make(chan struct{}, 1),
		ctx:                      dispatcherCtx,
		cancel:                   dispatcherCancel,
		dc:                       dc,
		provider:                 provider,
		connectionCtxs:           &sync.Map{},
		batchController:          tsoBatchController,
		tsDeadlineCh:             make(chan *deadline, 1),
		updateConnectionCtxsCh:   make(chan struct{}, 1),
		tsoCallDurationHist:      NewAutoDumpingHistogram("tsoCallDurationHist-"+id, 2e-5, 2000, 1, time.Minute),
		beforeHandleDurationHist: NewAutoDumpingHistogram("beforeHandleDurationHist-"+id, 2e-5, 2000, 1, time.Minute),
	}
	td.statFunc = td.observeLatency
	go td.watchTSDeadline()
	return td
}
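
For scale: with interval 2e-5 s and 2000 initial buckets, each histogram pre-allocates 20 µs resolution over the first 40 ms of latency; Observe grows the slice on demand up to the 1 s cutoff, i.e. at most 1 / 2e-5 = 50,000 buckets. Each dispatcher's pair of histograms is dumped to the log once per minute under names suffixed with its allocated ID.
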
@@ -203,7 +216,7 @@ tsoBatchLoop:
		maxBatchWaitInterval := option.getMaxTSOBatchWaitInterval()
		// Once the TSO requests are collected, must make sure they could be finished or revoked eventually,
		// otherwise the upper caller may get blocked on waiting for the results.
		if err = batchController.fetchPendingRequests(ctx, maxBatchWaitInterval); err != nil {
		if err = batchController.fetchPendingRequests(ctx, maxBatchWaitInterval, td.beforeHandleDurationHist); err != nil {
			// Finish the collected requests if the fetch failed.
			batchController.finishCollectedRequests(0, 0, 0, errors.WithStack(err))
			if err == context.Canceled {
@@ -392,6 +405,10 @@ func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext
	return connectionCtx
}

func (td *tsoDispatcher) observeLatency(latency time.Duration, now time.Time) {
	td.tsoCallDurationHist.Observe(latency.Seconds(), now)
}

func (td *tsoDispatcher) processRequests(
	stream *tsoStream, dcLocation string, tbc *tsoBatchController,
) error {
@@ -440,7 +457,7 @@ func (td *tsoDispatcher) processRequests(
	// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
	firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
	td.compareAndSwapTS(curTSOInfo, firstLogical)
	tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil)
	tbc.finishCollectedRequestsWithStatFunc(physical, firstLogical, suffixBits, nil, td.statFunc)
	return nil
}
