Skip to content

Commit

Permalink
removed wait finish of background worker for finish same (loop wait, …
Browse files Browse the repository at this point in the history
…deadlock with timeout).

reorder close steps
  • Loading branch information
rekby committed Oct 4, 2024
1 parent 883bd5d commit 792d24d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
36 changes: 24 additions & 12 deletions internal/topic/topiclistenerinternal/stream_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

var (
errTopicListenerStreamClosed = xerrors.Wrap(errors.New("ydb: the topic listener stream closed already"))
)

type streamListener struct {
cfg *StreamListenerConfig

Expand All @@ -36,6 +40,8 @@ type streamListener struct {
hasNewMessagesToSend empty.Chan
syncCommitter *topicreadercommon.Committer

closing atomic.Bool

m xsync.Mutex
messagesToSend []rawtopicreader.ClientMessage
}
Expand All @@ -56,7 +62,7 @@ func newStreamListener(

res.initVars(sessionIDCounter)
if err := res.initStream(connectionCtx, client); err != nil {
res.closeWithTimeout(connectionCtx, err)
res.goClose(connectionCtx, err)

return nil, err
}
Expand All @@ -75,17 +81,21 @@ func newStreamListener(
}

func (l *streamListener) Close(ctx context.Context, reason error) error {
var resErrors []error

// should be first for prevent race between main close process and error handling in streams
if err := l.background.Close(ctx, reason); err != nil {
resErrors = append(resErrors, err)
if !l.closing.CompareAndSwap(false, true) {
return errTopicListenerClosed
}

var resErrors []error

// should be first because background wait stop of steams
if l.stream != nil {
l.streamClose(reason)
}

if err := l.background.Close(ctx, reason); err != nil {
resErrors = append(resErrors, err)
}

if err := l.syncCommitter.Close(ctx, reason); err != nil {
resErrors = append(resErrors, err)
}
Expand All @@ -110,10 +120,12 @@ func (l *streamListener) Close(ctx context.Context, reason error) error {
return errors.Join(resErrors...)
}

func (l *streamListener) closeWithTimeout(ctx context.Context, reason error) {
func (l *streamListener) goClose(ctx context.Context, reason error) {
ctx, cancel := context.WithTimeout(xcontext.ValueOnly(ctx), time.Second)
l.streamClose(reason)
_ = l.background.Close(ctx, reason)
go func() {
_ = l.background.Close(ctx, reason)
}()

cancel()
}
Expand Down Expand Up @@ -146,7 +158,7 @@ func (l *streamListener) initStream(ctx context.Context, client TopicClient) err
err := xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: topic listener stream init timeout: %w", ctx.Err(),
)))
l.closeWithTimeout(ctx, err)
l.goClose(ctx, err)
l.streamClose(err)
case <-initDone:
// pass
Expand Down Expand Up @@ -217,7 +229,7 @@ func (l *streamListener) sendMessagesLoop(ctx context.Context) {

for _, m := range messages {
if err := l.stream.Send(m); err != nil {
l.closeWithTimeout(ctx, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
l.goClose(ctx, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: failed send message by grpc to topic reader stream from listener: %w",
err,
))))
Expand All @@ -237,7 +249,7 @@ func (l *streamListener) receiveMessagesLoop(ctx context.Context) {

mess, err := l.stream.Recv()
if err != nil {
l.closeWithTimeout(ctx, xerrors.WithStackTrace(
l.goClose(ctx, xerrors.WithStackTrace(
fmt.Errorf("ydb: failed read message from the stream in the topic reader listener: %w", err),
))

Expand All @@ -264,7 +276,7 @@ func (l *streamListener) onReceiveServerMessage(ctx context.Context, mess rawtop
// todo log
}
if err != nil {
l.closeWithTimeout(ctx, err)
l.goClose(ctx, err)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
)

var ErrUserCloseTopic = errors.New("ydb: user closed topic listener")
var (
ErrUserCloseTopic = errors.New("ydb: user closed topic listener")
errTopicListenerClosed = errors.New("ydb: the topic listener already closed")
)

type TopicListenerReconnector struct {
streamConfig *StreamListenerConfig
Expand All @@ -22,6 +25,7 @@ type TopicListenerReconnector struct {
connectionResult error
connectionCompleted empty.Chan
connectionIDCounter atomic.Int64
closing atomic.Bool

m sync.Mutex
streamListener *streamListener
Expand All @@ -45,6 +49,9 @@ func NewTopicListenerReconnector(
}

func (lr *TopicListenerReconnector) Close(ctx context.Context, reason error) error {
if !lr.closing.CompareAndSwap(false, true) {
return errTopicListenerClosed
}
var closeErrors []error
err := lr.background.Close(ctx, reason)
closeErrors = append(closeErrors, err)
Expand Down

0 comments on commit 792d24d

Please sign in to comment.