diff --git a/internal/topic/topiclistenerinternal/stream_listener.go b/internal/topic/topiclistenerinternal/stream_listener.go index 04e99add5..3627f94a4 100644 --- a/internal/topic/topiclistenerinternal/stream_listener.go +++ b/internal/topic/topiclistenerinternal/stream_listener.go @@ -21,6 +21,10 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) +var ( + errTopicListenerStreamClosed = xerrors.Wrap(errors.New("ydb: the topic listener stream closed already")) +) + type streamListener struct { cfg *StreamListenerConfig @@ -36,6 +40,8 @@ type streamListener struct { hasNewMessagesToSend empty.Chan syncCommitter *topicreadercommon.Committer + closing atomic.Bool + m xsync.Mutex messagesToSend []rawtopicreader.ClientMessage } @@ -56,7 +62,7 @@ func newStreamListener( res.initVars(sessionIDCounter) if err := res.initStream(connectionCtx, client); err != nil { - res.closeWithTimeout(connectionCtx, err) + res.goClose(connectionCtx, err) return nil, err } @@ -75,17 +81,21 @@ func newStreamListener( } func (l *streamListener) Close(ctx context.Context, reason error) error { - var resErrors []error - - // should be first for prevent race between main close process and error handling in streams - if err := l.background.Close(ctx, reason); err != nil { - resErrors = append(resErrors, err) + if !l.closing.CompareAndSwap(false, true) { + return errTopicListenerClosed } + var resErrors []error + + // should be first because background wait stop of steams if l.stream != nil { l.streamClose(reason) } + if err := l.background.Close(ctx, reason); err != nil { + resErrors = append(resErrors, err) + } + if err := l.syncCommitter.Close(ctx, reason); err != nil { resErrors = append(resErrors, err) } @@ -110,10 +120,12 @@ func (l *streamListener) Close(ctx context.Context, reason error) error { return errors.Join(resErrors...) } -func (l *streamListener) closeWithTimeout(ctx context.Context, reason error) { +func (l *streamListener) goClose(ctx context.Context, reason error) { ctx, cancel := context.WithTimeout(xcontext.ValueOnly(ctx), time.Second) l.streamClose(reason) - _ = l.background.Close(ctx, reason) + go func() { + _ = l.background.Close(ctx, reason) + }() cancel() } @@ -146,7 +158,7 @@ func (l *streamListener) initStream(ctx context.Context, client TopicClient) err err := xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: topic listener stream init timeout: %w", ctx.Err(), ))) - l.closeWithTimeout(ctx, err) + l.goClose(ctx, err) l.streamClose(err) case <-initDone: // pass @@ -217,7 +229,7 @@ func (l *streamListener) sendMessagesLoop(ctx context.Context) { for _, m := range messages { if err := l.stream.Send(m); err != nil { - l.closeWithTimeout(ctx, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( + l.goClose(ctx, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: failed send message by grpc to topic reader stream from listener: %w", err, )))) @@ -237,7 +249,7 @@ func (l *streamListener) receiveMessagesLoop(ctx context.Context) { mess, err := l.stream.Recv() if err != nil { - l.closeWithTimeout(ctx, xerrors.WithStackTrace( + l.goClose(ctx, xerrors.WithStackTrace( fmt.Errorf("ydb: failed read message from the stream in the topic reader listener: %w", err), )) @@ -264,7 +276,7 @@ func (l *streamListener) onReceiveServerMessage(ctx context.Context, mess rawtop // todo log } if err != nil { - l.closeWithTimeout(ctx, err) + l.goClose(ctx, err) } } diff --git a/internal/topic/topiclistenerinternal/topic_listener_reconnector.go b/internal/topic/topiclistenerinternal/topic_listener_reconnector.go index 9a9afc5d5..23604c725 100644 --- a/internal/topic/topiclistenerinternal/topic_listener_reconnector.go +++ b/internal/topic/topiclistenerinternal/topic_listener_reconnector.go @@ -10,7 +10,10 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/empty" ) -var ErrUserCloseTopic = errors.New("ydb: user closed topic listener") +var ( + ErrUserCloseTopic = errors.New("ydb: user closed topic listener") + errTopicListenerClosed = errors.New("ydb: the topic listener already closed") +) type TopicListenerReconnector struct { streamConfig *StreamListenerConfig @@ -22,6 +25,7 @@ type TopicListenerReconnector struct { connectionResult error connectionCompleted empty.Chan connectionIDCounter atomic.Int64 + closing atomic.Bool m sync.Mutex streamListener *streamListener @@ -45,6 +49,9 @@ func NewTopicListenerReconnector( } func (lr *TopicListenerReconnector) Close(ctx context.Context, reason error) error { + if !lr.closing.CompareAndSwap(false, true) { + return errTopicListenerClosed + } var closeErrors []error err := lr.background.Close(ctx, reason) closeErrors = append(closeErrors, err)