Skip to content

Commit

Permalink
Add basic tests for rewritten tsoStream
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Aug 7, 2024
1 parent 140a7c2 commit 88d9f5b
Show file tree
Hide file tree
Showing 2 changed files with 307 additions and 17 deletions.
61 changes: 44 additions & 17 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,25 @@ func (s *tsoStream) processRequests(

func (s *tsoStream) recvLoop(ctx context.Context) {
var finishWithErr error
var currentReq batchedRequests
var hasReq bool

defer func() {
if r := recover(); r != nil {
log.Fatal("tsoStream.recvLoop internal panic", zap.Stack("stacktrace"), zap.Any("panicMessage", r))
}

if finishWithErr == nil {
// The loop must exit with a non-nil error (including io.EOF and context.Canceled). This should be
// unreachable code.
log.Fatal("tsoStream.recvLoop exited without error info")
}

if hasReq {
// There's an unfinished request, cancel it, otherwise it will be blocked forever.
currentReq.callback(tsoRequestResult{}, currentReq.reqKeyspaceGroupID, s.serverURL, finishWithErr)
}

s.cancel()
for !s.state.CompareAndSwap(streamStateIdle, streamStateClosing) {
switch state := s.state.Load(); state {
Expand All @@ -318,13 +335,11 @@ func (s *tsoStream) recvLoop(ctx context.Context) {
}
}

// The loop must end with an error (including context.Canceled).
if finishWithErr == nil {
log.Fatal("tsoStream recvLoop ended without error", zap.String("stream", s.streamID))
}
log.Info("tsoStream.recvLoop ended", zap.String("stream", s.streamID), zap.Error(finishWithErr))

close(s.pendingRequests)

// Cancel remaining pending requests.
for req := range s.pendingRequests {
req.callback(tsoRequestResult{}, req.reqKeyspaceGroupID, s.serverURL, finishWithErr)
}
Expand All @@ -345,42 +360,54 @@ recvLoop:

res, err := s.stream.Recv()

// Load the corresponding batchedRequests
var req batchedRequests
// Try to load the corresponding `batchedRequests`. If `Recv` is successful, there must be a request pending
// in the queue.
select {
case req = <-s.pendingRequests:
case currentReq = <-s.pendingRequests:
hasReq = true
default:
finishWithErr = errors.New("tsoStream timing order broken")
break
hasReq = false
}

durationSeconds := time.Since(req.startTime).Seconds()
durationSeconds := time.Since(currentReq.startTime).Seconds()

if err != nil {
requestFailedDurationTSO.Observe(durationSeconds)
// If a request is pending and error occurs, observe the duration it has cost.
// Note that it's also possible that the stream is broken due to network without being requested. In this
// case, `Recv` may return an error while no request is pending.
if hasReq {
requestFailedDurationTSO.Observe(durationSeconds)
}
if err == io.EOF {
finishWithErr = errs.ErrClientTSOStreamClosed
} else {
finishWithErr = errors.WithStack(err)
}
break
break recvLoop
} else if !hasReq {
finishWithErr = errors.New("tsoStream timing order broken")
break recvLoop
}

latencySeconds := durationSeconds
requestDurationTSO.Observe(latencySeconds)
tsoBatchSize.Observe(float64(res.count))

if res.count != uint32(req.count) {
if res.count != uint32(currentReq.count) {
finishWithErr = errors.WithStack(errTSOLength)
break
break recvLoop
}

req.callback(res, req.reqKeyspaceGroupID, s.serverURL, nil)
currentReq.callback(res, currentReq.reqKeyspaceGroupID, s.serverURL, nil)
// After finishing the requests, unset these variables which will be checked in the defer block.
currentReq = batchedRequests{}
hasReq = false

s.ongoingRequestCountGauge.Set(float64(s.ongoingRequests.Add(-1)))
}
}

func (s *tsoStream) Close() {
s.cancel()
// WaitForClosed blocks until the stream is closed and the inner loop exits.
func (s *tsoStream) WaitForClosed() {
s.wg.Wait()
}
263 changes: 263 additions & 0 deletions client/tso_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pd

import (
"context"
"io"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/client/errs"
)

type resultMsg struct {
r tsoRequestResult
err error
breakStream bool
}

type mockTSOStreamImpl struct {
requestCh chan struct{}
resultCh chan resultMsg
keyspaceID uint32
}

func newMockTSOStreamImpl() *mockTSOStreamImpl {
return &mockTSOStreamImpl{
requestCh: make(chan struct{}, 64),
resultCh: make(chan resultMsg, 64),
keyspaceID: 0,
}
}

func (s *mockTSOStreamImpl) Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64) error {
s.requestCh <- struct{}{}
return nil
}

func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) {
res := <-s.resultCh
if !res.breakStream {
<-s.requestCh
}
return res.r, res.err
}

func (s *mockTSOStreamImpl) returnResult(physical int64, logical int64, count uint32) {
s.resultCh <- resultMsg{
r: tsoRequestResult{
physical: physical,
logical: logical,
count: count,
suffixBits: 0,
respKeyspaceGroupID: s.keyspaceID,
},
}
}

func (s *mockTSOStreamImpl) returnError(err error) {
s.resultCh <- resultMsg{
err: err,
}
}

func (s *mockTSOStreamImpl) breakStream(err error) {
s.resultCh <- resultMsg{
err: err,
breakStream: true,
}
}

func (s *mockTSOStreamImpl) stop() {
s.breakStream(io.EOF)
}

type callbackInvocation struct {
result tsoRequestResult
streamURL string
err error
}

type testTSOStreamSuite struct {
suite.Suite

inner *mockTSOStreamImpl
stream *tsoStream
}

func (s *testTSOStreamSuite) SetupTest() {
s.inner = newMockTSOStreamImpl()
s.stream = newTSOStream("mock:///", s.inner)
}

func (s *testTSOStreamSuite) TearDownTest() {
s.inner.stop()
s.stream.WaitForClosed()
s.inner = nil
s.stream = nil
}

func TestTSOStreamTestSuite(t *testing.T) {
suite.Run(t, new(testTSOStreamSuite))
}

func (s *testTSOStreamSuite) noResult(ch <-chan callbackInvocation) {
select {
case res := <-ch:
s.FailNowf("result received unexpectedly", "received result: %+v", res)
case <-time.After(time.Millisecond * 20):
}
}

func (s *testTSOStreamSuite) getResult(ch <-chan callbackInvocation) callbackInvocation {
select {
case res := <-ch:
return res
case <-time.After(time.Second * 10000):
s.FailNow("result not ready in time")
panic("result not ready in time")
}
}

func (s *testTSOStreamSuite) processRequestWithResultCh(count int64) <-chan callbackInvocation {
ch := make(chan callbackInvocation, 1)
err := s.stream.processRequests(1, 2, 3, globalDCLocation, count, time.Now(), func(result tsoRequestResult, reqKeyspaceGroupID uint32, streamURL string, err error) {
if err == nil {
s.Equal(uint32(3), reqKeyspaceGroupID)
s.Equal(uint32(0), result.suffixBits)
}
ch <- callbackInvocation{
result: result,
streamURL: streamURL,
err: err,
}
})
s.NoError(err)
return ch
}

func (s *testTSOStreamSuite) TestTSOStreamBasic() {
ch := s.processRequestWithResultCh(1)
s.noResult(ch)
s.inner.returnResult(10, 1, 1)
res := s.getResult(ch)

s.NoError(res.err)
s.Equal("mock:///", res.streamURL)
s.Equal(int64(10), res.result.physical)
s.Equal(int64(1), res.result.logical)
s.Equal(uint32(1), res.result.count)

ch = s.processRequestWithResultCh(2)
s.noResult(ch)
s.inner.returnResult(20, 3, 2)
res = s.getResult(ch)

s.NoError(res.err)
s.Equal("mock:///", res.streamURL)
s.Equal(int64(20), res.result.physical)
s.Equal(int64(3), res.result.logical)
s.Equal(uint32(2), res.result.count)

ch = s.processRequestWithResultCh(3)
s.noResult(ch)
s.inner.returnError(errors.New("mock rpc error"))
res = s.getResult(ch)
s.Error(res.err)
s.Equal("mock rpc error", res.err.Error())

// After an error from the (simulated) RPC stream, the tsoStream should be in a broken status and can't accept
// new request anymore.
err := s.stream.processRequests(1, 2, 3, globalDCLocation, 1, time.Now(), func(result tsoRequestResult, reqKeyspaceGroupID uint32, streamURL string, err error) {
panic("unreachable")
})
s.Error(err)
}

func (s *testTSOStreamSuite) testTSOStreamBrokenImpl(err error, pendingRequests int) {
var resultCh []<-chan callbackInvocation

for i := 0; i < pendingRequests; i++ {
ch := s.processRequestWithResultCh(1)
resultCh = append(resultCh, ch)
s.noResult(ch)
}

s.inner.breakStream(err)
closedCh := make(chan struct{})
go func() {
s.stream.WaitForClosed()
closedCh <- struct{}{}
}()
select {
case <-closedCh:
case <-time.After(time.Second):
s.FailNow("stream receiver loop didn't exit")
}

for _, ch := range resultCh {
res := s.getResult(ch)
s.Error(res.err)
if err == io.EOF {
s.ErrorIs(res.err, errs.ErrClientTSOStreamClosed)
} else {
s.ErrorIs(res.err, err)
}
}
}

func (s *testTSOStreamSuite) TestTSOStreamBrokenWithEOFNoPendingReq() {
s.testTSOStreamBrokenImpl(io.EOF, 0)
}

func (s *testTSOStreamSuite) TestTSOStreamCanceledNoPendingReq() {
s.testTSOStreamBrokenImpl(context.Canceled, 0)
}

func (s *testTSOStreamSuite) TestTSOStreamBrokenWithEOFWithPendingReq() {
s.testTSOStreamBrokenImpl(io.EOF, 5)
}

func (s *testTSOStreamSuite) TestTSOStreamCanceledWithPendingReq() {
s.testTSOStreamBrokenImpl(context.Canceled, 5)
}

func (s *testTSOStreamSuite) TestTSOStreamFIFO() {
var resultChs []<-chan callbackInvocation
const COUNT = 5
for i := 0; i < COUNT; i++ {
ch := s.processRequestWithResultCh(int64(i + 1))
resultChs = append(resultChs, ch)
}

for _, ch := range resultChs {
s.noResult(ch)
}

for i := 0; i < COUNT; i++ {
s.inner.returnResult(int64((i+1)*10), int64(i), uint32(i+1))
}

for i, ch := range resultChs {
res := s.getResult(ch)
s.NoError(res.err)
s.Equal(int64((i+1)*10), res.result.physical)
s.Equal(int64(i), res.result.logical)
s.Equal(uint32(i+1), res.result.count)
}
}

0 comments on commit 88d9f5b

Please sign in to comment.