Skip to content

Commit

Permalink
Add a concurrency test
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Aug 8, 2024
1 parent 88d9f5b commit 623d896
Showing 1 changed file with 80 additions and 9 deletions.
89 changes: 80 additions & 9 deletions client/tso_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type mockTSOStreamImpl struct {
requestCh chan struct{}
resultCh chan resultMsg
keyspaceID uint32
errorState error
}

func newMockTSOStreamImpl() *mockTSOStreamImpl {
Expand All @@ -51,10 +52,17 @@ func (s *mockTSOStreamImpl) Send(clusterID uint64, keyspaceID, keyspaceGroupID u
}

func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) {
// This stream have ever receive an error, it returns the error forever.
if s.errorState != nil {
return tsoRequestResult{}, s.errorState
}
res := <-s.resultCh
if !res.breakStream {
<-s.requestCh
}
if res.err != nil {
s.errorState = res.err
}
return res.r, res.err
}

Expand Down Expand Up @@ -134,7 +142,7 @@ func (s *testTSOStreamSuite) getResult(ch <-chan callbackInvocation) callbackInv
}
}

func (s *testTSOStreamSuite) processRequestWithResultCh(count int64) <-chan callbackInvocation {
func (s *testTSOStreamSuite) processRequestWithResultCh(count int64) (<-chan callbackInvocation, error) {
ch := make(chan callbackInvocation, 1)
err := s.stream.processRequests(1, 2, 3, globalDCLocation, count, time.Now(), func(result tsoRequestResult, reqKeyspaceGroupID uint32, streamURL string, err error) {
if err == nil {
Expand All @@ -147,12 +155,20 @@ func (s *testTSOStreamSuite) processRequestWithResultCh(count int64) <-chan call
err: err,
}
})
if err != nil {
return nil, err
}
return ch, nil
}

func (s *testTSOStreamSuite) mustProcessRequestWithResultCh(count int64) <-chan callbackInvocation {
ch, err := s.processRequestWithResultCh(count)
s.NoError(err)
return ch
}

func (s *testTSOStreamSuite) TestTSOStreamBasic() {
ch := s.processRequestWithResultCh(1)
ch := s.mustProcessRequestWithResultCh(1)
s.noResult(ch)
s.inner.returnResult(10, 1, 1)
res := s.getResult(ch)
Expand All @@ -163,7 +179,7 @@ func (s *testTSOStreamSuite) TestTSOStreamBasic() {
s.Equal(int64(1), res.result.logical)
s.Equal(uint32(1), res.result.count)

ch = s.processRequestWithResultCh(2)
ch = s.mustProcessRequestWithResultCh(2)
s.noResult(ch)
s.inner.returnResult(20, 3, 2)
res = s.getResult(ch)
Expand All @@ -174,7 +190,7 @@ func (s *testTSOStreamSuite) TestTSOStreamBasic() {
s.Equal(int64(3), res.result.logical)
s.Equal(uint32(2), res.result.count)

ch = s.processRequestWithResultCh(3)
ch = s.mustProcessRequestWithResultCh(3)
s.noResult(ch)
s.inner.returnError(errors.New("mock rpc error"))
res = s.getResult(ch)
Expand All @@ -193,7 +209,7 @@ func (s *testTSOStreamSuite) testTSOStreamBrokenImpl(err error, pendingRequests
var resultCh []<-chan callbackInvocation

for i := 0; i < pendingRequests; i++ {
ch := s.processRequestWithResultCh(1)
ch := s.mustProcessRequestWithResultCh(1)
resultCh = append(resultCh, ch)
s.noResult(ch)
}
Expand Down Expand Up @@ -239,17 +255,17 @@ func (s *testTSOStreamSuite) TestTSOStreamCanceledWithPendingReq() {

func (s *testTSOStreamSuite) TestTSOStreamFIFO() {
var resultChs []<-chan callbackInvocation
const COUNT = 5
for i := 0; i < COUNT; i++ {
ch := s.processRequestWithResultCh(int64(i + 1))
const count = 5
for i := 0; i < count; i++ {
ch := s.mustProcessRequestWithResultCh(int64(i + 1))
resultChs = append(resultChs, ch)
}

for _, ch := range resultChs {
s.noResult(ch)
}

for i := 0; i < COUNT; i++ {
for i := 0; i < count; i++ {
s.inner.returnResult(int64((i+1)*10), int64(i), uint32(i+1))
}

Expand All @@ -261,3 +277,58 @@ func (s *testTSOStreamSuite) TestTSOStreamFIFO() {
s.Equal(uint32(i+1), res.result.count)
}
}

func (s *testTSOStreamSuite) TestTSOStreamConcurrentRunning() {
resultChCh := make(chan (<-chan callbackInvocation), 10000)
const totalCount = 10000

// Continuously start requests
go func() {
for i := 1; i <= totalCount; i++ {
// Retry loop
for {
ch, err := s.processRequestWithResultCh(int64(i))
if err != nil {
// If the capacity of the request queue is exhausted, it returns this error. As a test, we simply
// spin and retry it until it has enough space, as a coverage of the almost-full case. But note that
// this should not happen in production, in which case the caller of tsoStream should have its own
// limit of concurrent RPC requests.
s.Contains(err.Error(), "unexpected channel full")
continue
}

resultChCh <- ch
break
}
}
}()

// Continuously send results
go func() {
for i := int64(1); i <= totalCount; i++ {
s.inner.returnResult(i*10, i%(1<<18), uint32(i))
}
s.inner.breakStream(io.EOF)
}()

// Check results
for i := int64(1); i <= totalCount; i++ {
ch := <-resultChCh
res := s.getResult(ch)
s.NoError(res.err)
s.Equal(i*10, res.result.physical)
s.Equal(i%(1<<18), res.result.logical)
s.Equal(uint32(i), res.result.count)
}

// After handling all these requests, the stream is ended by an EOF error. The next request won't succeed.
// So, either the `processRequests` function returns an error or the callback is called with an error.
ch, err := s.processRequestWithResultCh(1)
if err != nil {
s.ErrorIs(err, errs.ErrClientTSOStreamClosed)
} else {
res := s.getResult(ch)
s.Error(res.err)
s.ErrorIs(res.err, errs.ErrClientTSOStreamClosed)
}
}

0 comments on commit 623d896

Please sign in to comment.