diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 563bcc9fc6e..956a7daace6 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -17,6 +17,7 @@ package pd import ( "context" "fmt" + "math" "math/rand" "runtime/trace" "sync" @@ -203,10 +204,8 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) { return true }) if batchController != nil && batchController.collectedRequestCount != 0 { - if r := recover(); r != nil { - panic(r) - } - log.Fatal("batched tso requests not cleared when exiting the tso dispatcher loop") + // If you encounter this failure, please check the stack in the logs to see if it's a panic. + log.Fatal("batched tso requests not cleared when exiting the tso dispatcher loop", zap.Any("panic", recover())) } tsoErr := errors.WithStack(errClosing) td.revokePendingRequests(tsoErr) @@ -351,14 +350,28 @@ tsoBatchLoop: // the batch, instead of the time when the first request arrives. // Here, if the elapsed time since starting collecting this batch didn't reach the expected batch time, then // continue collecting. - if td.isConcurrentRPCEnabled() && !noDelay { + if td.isConcurrentRPCEnabled() { estimatedLatency := stream.EstimatedRPCLatency() estimateTSOLatencyGauge.WithLabelValues(streamURL).Set(estimatedLatency.Seconds()) + goalBatchTime := estimatedLatency / time.Duration(td.rpcConcurrency) + + failpoint.Inject("tsoDispatcherConcurrentModeAssertDelayDuration", func(val failpoint.Value) { + if s, ok := val.(string); ok { + expected, err := time.ParseDuration(s) + if err != nil { + panic(err) + } + if math.Abs(expected.Seconds()-goalBatchTime.Seconds()) > 1e-6 { + log.Fatal("tsoDispatcher: trying to delay for unexpected duration for the batch", zap.Duration("goalBatchTime", goalBatchTime), zap.Duration("expectedBatchTime", expected)) + } + } else { + panic("invalid value for failpoint tsoDispatcherConcurrentModeAssertDelayDuration: expected string") + } + }) - totalBatchTime := estimatedLatency / time.Duration(td.rpcConcurrency) waitTimerStart := time.Now() - remainingBatchTime := totalBatchTime - waitTimerStart.Sub(currentBatchStartTime) - if remainingBatchTime > 0 { + remainingBatchTime := goalBatchTime - waitTimerStart.Sub(currentBatchStartTime) + if remainingBatchTime > 0 && !noDelay { if !batchingTimer.Stop() { select { case <-batchingTimer.C: diff --git a/client/tso_dispatcher_test.go b/client/tso_dispatcher_test.go index 2aa082a4a64..2f7a1ef6e1e 100644 --- a/client/tso_dispatcher_test.go +++ b/client/tso_dispatcher_test.go @@ -32,6 +32,7 @@ import ( type mockTSOServiceProvider struct { option *option createStream func(ctx context.Context) *tsoStream + updateConnMu sync.Mutex } func newMockTSOServiceProvider(option *option, createStream func(ctx context.Context) *tsoStream) *mockTSOServiceProvider { @@ -50,6 +51,11 @@ func (*mockTSOServiceProvider) getServiceDiscovery() ServiceDiscovery { } func (m *mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context, _dc string, connectionCtxs *sync.Map) bool { + // Avoid concurrent updating in the background updating goroutine and active updating in the dispatcher loop when + // stream is missing. + m.updateConnMu.Lock() + defer m.updateConnMu.Unlock() + _, ok := connectionCtxs.Load(mockStreamURL) if ok { return true @@ -271,6 +277,45 @@ func (s *testTSODispatcherSuite) TestConcurrentRPC() { s.testStaticConcurrencyImpl(16) } +func (s *testTSODispatcherSuite) TestBatchDelaying() { + ctx := context.Background() + s.option.setTSOClientRPCConcurrency(2) + + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay", "return")) + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency", `return("12ms")`)) + defer func() { + s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay")) + s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency")) + }() + + // Make sure concurrency option takes effect. + req := s.sendReq(ctx) + s.streamInner.generateNext() + s.reqMustReady(req) + + // Trigger the check. + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("6ms")`)) + defer func() { + s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration")) + }() + req = s.sendReq(ctx) + s.streamInner.generateNext() + s.reqMustReady(req) + + // Try other concurrency. + s.option.setTSOClientRPCConcurrency(3) + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("4ms")`)) + req = s.sendReq(ctx) + s.streamInner.generateNext() + s.reqMustReady(req) + + s.option.setTSOClientRPCConcurrency(4) + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("3ms")`)) + req = s.sendReq(ctx) + s.streamInner.generateNext() + s.reqMustReady(req) +} + func BenchmarkTSODispatcherHandleRequests(b *testing.B) { log.SetLevel(zapcore.FatalLevel)