diff --git a/client/tso_stream.go b/client/tso_stream.go index 0779f7fb1b9..5576d89c0cb 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -231,6 +231,8 @@ var streamIDAlloc atomic.Int32 const invalidStreamID = "" +const maxPendingRequestsInTSOStream = 64 + func newTSOStream(ctx context.Context, serverURL string, stream grpcTSOStreamAdapter) *tsoStream { streamID := fmt.Sprintf("%s-%d", serverURL, streamIDAlloc.Add(1)) // To make error handling in `tsoDispatcher` work, the internal `cancel` and external `cancel` is better to be @@ -241,7 +243,7 @@ func newTSOStream(ctx context.Context, serverURL string, stream grpcTSOStreamAda stream: stream, streamID: streamID, - pendingRequests: make(chan batchedRequests, 64), + pendingRequests: make(chan batchedRequests, maxPendingRequestsInTSOStream), cancel: cancel, diff --git a/client/tso_stream_test.go b/client/tso_stream_test.go index 5016c3cef1f..ed857eb714d 100644 --- a/client/tso_stream_test.go +++ b/client/tso_stream_test.go @@ -455,14 +455,68 @@ func (s *testTSOStreamSuite) TestTSOStreamConcurrentRunning() { } } -//func (s *testTSOStreamSuite) TestEstimatedLatency() { -// s.inner.returnResult(100, 0, 1) -// res := s.getResult(s.mustProcessRequestWithResultCh(1)) -// s.NoError(res.err) -// s.Equal(int64(100), res.result.physical) -// s.Equal(int64(0), res.result.logical) -// s.InDelta() -//} +func (s *testTSOStreamSuite) TestEstimatedLatency() { + s.inner.returnResult(100, 0, 1) + res := s.getResult(s.mustProcessRequestWithResultCh(1)) + s.re.NoError(res.err) + s.re.Equal(int64(100), res.result.physical) + s.re.Equal(int64(0), res.result.logical) + estimation := s.stream.EstimatedRPCLatency().Seconds() + s.re.Greater(estimation, 0.0) + s.re.InDelta(0.0, estimation, 0.01) + + // For each began request, record its startTime and send it to the result returning goroutine. + reqStartTimeCh := make(chan time.Time, maxPendingRequestsInTSOStream) + // Limit concurrent requests to be less than the capacity of tsoStream.pendingRequests. + tokenCh := make(chan struct{}, maxPendingRequestsInTSOStream-1) + for i := 0; i < 40; i++ { + tokenCh <- struct{}{} + } + // Return a result after 50ms delay for each requests + const delay = time.Millisecond * 50 + // The goroutine to delay and return the result. + go func() { + allocated := int64(1) + for reqStartTime := range reqStartTimeCh { + now := time.Now() + elapsed := now.Sub(reqStartTime) + if elapsed < delay { + time.Sleep(delay - elapsed) + } + s.inner.returnResult(100, allocated, 1) + allocated++ + } + }() + + // Limit the test time within 1s + startTime := time.Now() + resCh := make(chan (<-chan callbackInvocation), 100) + // The sending goroutine + go func() { + for time.Since(startTime) < time.Second { + <-tokenCh + reqStartTimeCh <- time.Now() + r := s.mustProcessRequestWithResultCh(1) + resCh <- r + } + close(reqStartTimeCh) + close(resCh) + }() + // Check the result + index := 0 + for r := range resCh { + // The first is 1 + index++ + res := s.getResult(r) + tokenCh <- struct{}{} + s.re.NoError(res.err) + s.re.Equal(int64(100), res.result.physical) + s.re.Equal(int64(index), res.result.logical) + } + + s.re.Greater(s.stream.EstimatedRPCLatency(), time.Duration(int64(0.9*float64(delay)))) + s.re.Less(s.stream.EstimatedRPCLatency(), time.Duration(math.Floor(1.1*float64(delay)))) +} func TestRCFilter(t *testing.T) { re := require.New(t)