Skip to content

Commit

Permalink
Add tests for calculating batch delay time
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Sep 24, 2024
1 parent f451595 commit 708bbe9
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 8 deletions.
29 changes: 21 additions & 8 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pd
import (
"context"
"fmt"
"math"
"math/rand"
"runtime/trace"
"sync"
Expand Down Expand Up @@ -203,10 +204,8 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) {
return true
})
if batchController != nil && batchController.collectedRequestCount != 0 {
if r := recover(); r != nil {
panic(r)
}
log.Fatal("batched tso requests not cleared when exiting the tso dispatcher loop")
// If you encounter this failure, please check the stack in the logs to see if it's a panic.
log.Fatal("batched tso requests not cleared when exiting the tso dispatcher loop", zap.Any("panic", recover()))
}
tsoErr := errors.WithStack(errClosing)
td.revokePendingRequests(tsoErr)
Expand Down Expand Up @@ -351,14 +350,28 @@ tsoBatchLoop:
// the batch, instead of the time when the first request arrives.
// Here, if the elapsed time since starting collecting this batch didn't reach the expected batch time, then
// continue collecting.
if td.isConcurrentRPCEnabled() && !noDelay {
if td.isConcurrentRPCEnabled() {
estimatedLatency := stream.EstimatedRPCLatency()
estimateTSOLatencyGauge.WithLabelValues(streamURL).Set(estimatedLatency.Seconds())
goalBatchTime := estimatedLatency / time.Duration(td.rpcConcurrency)

failpoint.Inject("tsoDispatcherConcurrentModeAssertDelayDuration", func(val failpoint.Value) {
if s, ok := val.(string); ok {
expected, err := time.ParseDuration(s)
if err != nil {
panic(err)
}
if math.Abs(expected.Seconds()-goalBatchTime.Seconds()) > 1e-6 {
log.Fatal("tsoDispatcher: trying to delay for unexpected duration for the batch", zap.Duration("goalBatchTime", goalBatchTime), zap.Duration("expectedBatchTime", expected))
}
} else {
panic("invalid value for failpoint tsoDispatcherConcurrentModeAssertDelayDuration: expected string")
}
})

totalBatchTime := estimatedLatency / time.Duration(td.rpcConcurrency)
waitTimerStart := time.Now()
remainingBatchTime := totalBatchTime - waitTimerStart.Sub(currentBatchStartTime)
if remainingBatchTime > 0 {
remainingBatchTime := goalBatchTime - waitTimerStart.Sub(currentBatchStartTime)
if remainingBatchTime > 0 && !noDelay {
if !batchingTimer.Stop() {
select {
case <-batchingTimer.C:
Expand Down
45 changes: 45 additions & 0 deletions client/tso_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
type mockTSOServiceProvider struct {
option *option
createStream func(ctx context.Context) *tsoStream
updateConnMu sync.Mutex
}

func newMockTSOServiceProvider(option *option, createStream func(ctx context.Context) *tsoStream) *mockTSOServiceProvider {
Expand All @@ -50,6 +51,11 @@ func (*mockTSOServiceProvider) getServiceDiscovery() ServiceDiscovery {
}

func (m *mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context, _dc string, connectionCtxs *sync.Map) bool {
// Avoid concurrent updating in the background updating goroutine and active updating in the dispatcher loop when
// stream is missing.
m.updateConnMu.Lock()
defer m.updateConnMu.Unlock()

_, ok := connectionCtxs.Load(mockStreamURL)
if ok {
return true
Expand Down Expand Up @@ -271,6 +277,45 @@ func (s *testTSODispatcherSuite) TestConcurrentRPC() {
s.testStaticConcurrencyImpl(16)
}

func (s *testTSODispatcherSuite) TestBatchDelaying() {
ctx := context.Background()
s.option.setTSOClientRPCConcurrency(2)

s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay", "return"))
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency", `return("12ms")`))
defer func() {
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay"))
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency"))
}()

// Make sure concurrency option takes effect.
req := s.sendReq(ctx)
s.streamInner.generateNext()
s.reqMustReady(req)

// Trigger the check.
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("6ms")`))
defer func() {
s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration"))
}()
req = s.sendReq(ctx)
s.streamInner.generateNext()
s.reqMustReady(req)

// Try other concurrency.
s.option.setTSOClientRPCConcurrency(3)
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("4ms")`))
req = s.sendReq(ctx)
s.streamInner.generateNext()
s.reqMustReady(req)

s.option.setTSOClientRPCConcurrency(4)
s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("3ms")`))
req = s.sendReq(ctx)
s.streamInner.generateNext()
s.reqMustReady(req)
}

func BenchmarkTSODispatcherHandleRequests(b *testing.B) {
log.SetLevel(zapcore.FatalLevel)

Expand Down

0 comments on commit 708bbe9

Please sign in to comment.