Skip to content

Commit

Permalink
Fixed topic reader and writer WaitInit hanging
Browse files Browse the repository at this point in the history
  • Loading branch information
rekby committed Mar 27, 2024
1 parent a0abd92 commit 440d5de
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Fixed topic reader and writer WaitInit hunging on unretriable connection error

## v3.61.2
* Changed default transaction control to `NoTx` for execute query through query service client

Expand Down
4 changes: 4 additions & 0 deletions internal/topic/topicreaderinternal/stream_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead
r.reconnectFromBadStream <- newReconnectRequest(oldReader, reason)
trace.TopicOnReaderReconnectRequest(r.tracer, err, true)
}(err)
} else {
_ = r.CloseWithError(ctx, err)
}

r.m.WithLock(func() {
Expand Down Expand Up @@ -356,6 +358,8 @@ func (r *readerReconnector) WaitInit(ctx context.Context) error {
return ctx.Err()
case <-r.initDoneCh:
return r.initErr
case <-r.background.Done():
return r.background.CloseReason()
}
}

Expand Down
19 changes: 19 additions & 0 deletions internal/topic/topicreaderinternal/stream_reconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,25 @@ func TestTopicReaderReconnectorWaitInit(t *testing.T) {

require.ErrorIs(t, err, ctx.Err())
})

t.Run("UnretriableError", func(t *testing.T) {
reconnector := &readerReconnector{
tracer: &trace.Topic{},
}
reconnector.initChannelsAndClock()

testErr := errors.New("test error")
ctx := context.Background()
reconnector.readerConnect = readerConnectFuncMock(readerConnectFuncAnswer{
callback: func(ctx context.Context) (batchedStreamReader, error) {
return nil, testErr
},
})
reconnector.start()

err := reconnector.WaitInit(ctx)
require.ErrorIs(t, err, testErr)
})
}

func TestTopicReaderReconnectorFireReconnectOnRetryableError(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,8 @@ func (w *WriterReconnector) WaitInit(ctx context.Context) (info InitialInfo, err
select {
case <-ctx.Done():
return InitialInfo{}, ctx.Err()
case <-w.background.Done():
return InitialInfo{}, w.background.CloseReason()
case <-w.initDoneCh:
return w.initInfo, nil
}
Expand Down
16 changes: 16 additions & 0 deletions internal/topic/topicwriterinternal/writer_reconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,22 @@ func TestWriterImpl_WaitInit(t *testing.T) {
w.onWriterChange(&SingleStreamWriter{})
require.True(t, isClosed(w.firstInitResponseProcessedChan))
})

t.Run("CloseWriter", func(t *testing.T) {
ctx := context.Background()
w := newTestWriterStopped(WithAutoSetSeqNo(true))

testErr := errors.New("test error")
go func() {
_ = w.close(ctx, testErr)
}()

_, err := w.WaitInit(ctx)
require.ErrorIs(t, err, testErr)

w.onWriterChange(&SingleStreamWriter{})
require.True(t, isClosed(w.firstInitResponseProcessedChan))
})
}

func TestWriterImpl_Reconnect(t *testing.T) {
Expand Down

0 comments on commit 440d5de

Please sign in to comment.