From 13b4ef11c239fe6569548da31ec5380fa44621fb Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 25 Sep 2024 23:58:29 +0300 Subject: [PATCH 1/4] fix stop order for listener --- internal/topic/topiclistenerinternal/stream_listener.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/internal/topic/topiclistenerinternal/stream_listener.go b/internal/topic/topiclistenerinternal/stream_listener.go index 81c09d49e..04e99add5 100644 --- a/internal/topic/topiclistenerinternal/stream_listener.go +++ b/internal/topic/topiclistenerinternal/stream_listener.go @@ -77,6 +77,11 @@ func newStreamListener( func (l *streamListener) Close(ctx context.Context, reason error) error { var resErrors []error + // should be first for prevent race between main close process and error handling in streams + if err := l.background.Close(ctx, reason); err != nil { + resErrors = append(resErrors, err) + } + if l.stream != nil { l.streamClose(reason) } @@ -85,10 +90,6 @@ func (l *streamListener) Close(ctx context.Context, reason error) error { resErrors = append(resErrors, err) } - if err := l.background.Close(ctx, reason); err != nil { - resErrors = append(resErrors, err) - } - for _, session := range l.sessions.GetAll() { session.Close() err := l.onStopPartitionRequest(session.Context(), &rawtopicreader.StopPartitionSessionRequest{ From f3d8a5a7ce09b09a16ec2bbdf3e9e343f8339384 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 25 Sep 2024 23:59:17 +0300 Subject: [PATCH 2/4] fix stop order for listener --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8846f366d..735d37537 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Fixed error on TopicListener.Close * Added error ErrMessagesPutToInternalQueueBeforeError to topic writer * Added write to topics within transactions From 792d24d3e732baa20c42ff8f7dee72a3de2d7302 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Fri, 4 Oct 2024 12:09:26 +0300 Subject: [PATCH 3/4] removed wait finish of background worker for finish same (loop wait, deadlock with timeout). reorder close steps --- .../topiclistenerinternal/stream_listener.go | 36 ++++++++++++------- .../topic_listener_reconnector.go | 9 ++++- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/internal/topic/topiclistenerinternal/stream_listener.go b/internal/topic/topiclistenerinternal/stream_listener.go index 04e99add5..3627f94a4 100644 --- a/internal/topic/topiclistenerinternal/stream_listener.go +++ b/internal/topic/topiclistenerinternal/stream_listener.go @@ -21,6 +21,10 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) +var ( + errTopicListenerStreamClosed = xerrors.Wrap(errors.New("ydb: the topic listener stream closed already")) +) + type streamListener struct { cfg *StreamListenerConfig @@ -36,6 +40,8 @@ type streamListener struct { hasNewMessagesToSend empty.Chan syncCommitter *topicreadercommon.Committer + closing atomic.Bool + m xsync.Mutex messagesToSend []rawtopicreader.ClientMessage } @@ -56,7 +62,7 @@ func newStreamListener( res.initVars(sessionIDCounter) if err := res.initStream(connectionCtx, client); err != nil { - res.closeWithTimeout(connectionCtx, err) + res.goClose(connectionCtx, err) return nil, err } @@ -75,17 +81,21 @@ func newStreamListener( } func (l *streamListener) Close(ctx context.Context, reason error) error { - var resErrors []error - - // should be first for prevent race between main close process and error handling in streams - if err := l.background.Close(ctx, reason); err != nil { - resErrors = append(resErrors, err) + if !l.closing.CompareAndSwap(false, true) { + return errTopicListenerClosed } + var resErrors []error + + // should be first because background wait stop of steams if l.stream != nil { l.streamClose(reason) } + if err := l.background.Close(ctx, reason); err != nil { + resErrors = append(resErrors, err) + } + if err := l.syncCommitter.Close(ctx, reason); err != nil { resErrors = append(resErrors, err) } @@ -110,10 +120,12 @@ func (l *streamListener) Close(ctx context.Context, reason error) error { return errors.Join(resErrors...) } -func (l *streamListener) closeWithTimeout(ctx context.Context, reason error) { +func (l *streamListener) goClose(ctx context.Context, reason error) { ctx, cancel := context.WithTimeout(xcontext.ValueOnly(ctx), time.Second) l.streamClose(reason) - _ = l.background.Close(ctx, reason) + go func() { + _ = l.background.Close(ctx, reason) + }() cancel() } @@ -146,7 +158,7 @@ func (l *streamListener) initStream(ctx context.Context, client TopicClient) err err := xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: topic listener stream init timeout: %w", ctx.Err(), ))) - l.closeWithTimeout(ctx, err) + l.goClose(ctx, err) l.streamClose(err) case <-initDone: // pass @@ -217,7 +229,7 @@ func (l *streamListener) sendMessagesLoop(ctx context.Context) { for _, m := range messages { if err := l.stream.Send(m); err != nil { - l.closeWithTimeout(ctx, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( + l.goClose(ctx, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: failed send message by grpc to topic reader stream from listener: %w", err, )))) @@ -237,7 +249,7 @@ func (l *streamListener) receiveMessagesLoop(ctx context.Context) { mess, err := l.stream.Recv() if err != nil { - l.closeWithTimeout(ctx, xerrors.WithStackTrace( + l.goClose(ctx, xerrors.WithStackTrace( fmt.Errorf("ydb: failed read message from the stream in the topic reader listener: %w", err), )) @@ -264,7 +276,7 @@ func (l *streamListener) onReceiveServerMessage(ctx context.Context, mess rawtop // todo log } if err != nil { - l.closeWithTimeout(ctx, err) + l.goClose(ctx, err) } } diff --git a/internal/topic/topiclistenerinternal/topic_listener_reconnector.go b/internal/topic/topiclistenerinternal/topic_listener_reconnector.go index 9a9afc5d5..23604c725 100644 --- a/internal/topic/topiclistenerinternal/topic_listener_reconnector.go +++ b/internal/topic/topiclistenerinternal/topic_listener_reconnector.go @@ -10,7 +10,10 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/empty" ) -var ErrUserCloseTopic = errors.New("ydb: user closed topic listener") +var ( + ErrUserCloseTopic = errors.New("ydb: user closed topic listener") + errTopicListenerClosed = errors.New("ydb: the topic listener already closed") +) type TopicListenerReconnector struct { streamConfig *StreamListenerConfig @@ -22,6 +25,7 @@ type TopicListenerReconnector struct { connectionResult error connectionCompleted empty.Chan connectionIDCounter atomic.Int64 + closing atomic.Bool m sync.Mutex streamListener *streamListener @@ -45,6 +49,9 @@ func NewTopicListenerReconnector( } func (lr *TopicListenerReconnector) Close(ctx context.Context, reason error) error { + if !lr.closing.CompareAndSwap(false, true) { + return errTopicListenerClosed + } var closeErrors []error err := lr.background.Close(ctx, reason) closeErrors = append(closeErrors, err) From 896a0127672e5c3c817fe65f42d1791438336ced Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Fri, 4 Oct 2024 13:19:56 +0300 Subject: [PATCH 4/4] linter --- internal/topic/topiclistenerinternal/stream_listener.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/internal/topic/topiclistenerinternal/stream_listener.go b/internal/topic/topiclistenerinternal/stream_listener.go index 3627f94a4..0d170a527 100644 --- a/internal/topic/topiclistenerinternal/stream_listener.go +++ b/internal/topic/topiclistenerinternal/stream_listener.go @@ -21,10 +21,6 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) -var ( - errTopicListenerStreamClosed = xerrors.Wrap(errors.New("ydb: the topic listener stream closed already")) -) - type streamListener struct { cfg *StreamListenerConfig