Skip to content

Commit

Permalink
Add test TestEstimatedLatency
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Sep 23, 2024
1 parent 1b022ce commit c4c9b4f
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 9 deletions.
4 changes: 3 additions & 1 deletion client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ var streamIDAlloc atomic.Int32

const invalidStreamID = "<invalid>"

const maxPendingRequestsInTSOStream = 64

func newTSOStream(ctx context.Context, serverURL string, stream grpcTSOStreamAdapter) *tsoStream {
streamID := fmt.Sprintf("%s-%d", serverURL, streamIDAlloc.Add(1))
// To make error handling in `tsoDispatcher` work, the internal `cancel` and external `cancel` is better to be
Expand All @@ -241,7 +243,7 @@ func newTSOStream(ctx context.Context, serverURL string, stream grpcTSOStreamAda
stream: stream,
streamID: streamID,

pendingRequests: make(chan batchedRequests, 64),
pendingRequests: make(chan batchedRequests, maxPendingRequestsInTSOStream),

cancel: cancel,

Expand Down
70 changes: 62 additions & 8 deletions client/tso_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,14 +455,68 @@ func (s *testTSOStreamSuite) TestTSOStreamConcurrentRunning() {
}
}

//func (s *testTSOStreamSuite) TestEstimatedLatency() {
// s.inner.returnResult(100, 0, 1)
// res := s.getResult(s.mustProcessRequestWithResultCh(1))
// s.NoError(res.err)
// s.Equal(int64(100), res.result.physical)
// s.Equal(int64(0), res.result.logical)
// s.InDelta()
//}
func (s *testTSOStreamSuite) TestEstimatedLatency() {
s.inner.returnResult(100, 0, 1)
res := s.getResult(s.mustProcessRequestWithResultCh(1))
s.re.NoError(res.err)
s.re.Equal(int64(100), res.result.physical)
s.re.Equal(int64(0), res.result.logical)
estimation := s.stream.EstimatedRPCLatency().Seconds()
s.re.Greater(estimation, 0.0)
s.re.InDelta(0.0, estimation, 0.01)

// For each began request, record its startTime and send it to the result returning goroutine.
reqStartTimeCh := make(chan time.Time, maxPendingRequestsInTSOStream)
// Limit concurrent requests to be less than the capacity of tsoStream.pendingRequests.
tokenCh := make(chan struct{}, maxPendingRequestsInTSOStream-1)
for i := 0; i < 40; i++ {
tokenCh <- struct{}{}
}
// Return a result after 50ms delay for each requests
const delay = time.Millisecond * 50
// The goroutine to delay and return the result.
go func() {
allocated := int64(1)
for reqStartTime := range reqStartTimeCh {
now := time.Now()
elapsed := now.Sub(reqStartTime)
if elapsed < delay {
time.Sleep(delay - elapsed)
}
s.inner.returnResult(100, allocated, 1)
allocated++
}
}()

// Limit the test time within 1s
startTime := time.Now()
resCh := make(chan (<-chan callbackInvocation), 100)
// The sending goroutine
go func() {
for time.Since(startTime) < time.Second {
<-tokenCh
reqStartTimeCh <- time.Now()
r := s.mustProcessRequestWithResultCh(1)
resCh <- r
}
close(reqStartTimeCh)
close(resCh)
}()
// Check the result
index := 0
for r := range resCh {
// The first is 1
index++
res := s.getResult(r)
tokenCh <- struct{}{}
s.re.NoError(res.err)
s.re.Equal(int64(100), res.result.physical)
s.re.Equal(int64(index), res.result.logical)
}

s.re.Greater(s.stream.EstimatedRPCLatency(), time.Duration(int64(0.9*float64(delay))))
s.re.Less(s.stream.EstimatedRPCLatency(), time.Duration(math.Floor(1.1*float64(delay))))
}

func TestRCFilter(t *testing.T) {
re := require.New(t)
Expand Down

0 comments on commit c4c9b4f

Please sign in to comment.