Skip to content

Commit

Permalink
Merge pull request #1492 Returned topicwriter.ErrQueueLimitExceed, …
Browse files Browse the repository at this point in the history
…accidental removed at `v3.81.0`
  • Loading branch information
rekby authored Oct 2, 2024
2 parents dbaa3bf + 4e8254f commit 91eb73b
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Returned `topicwriter.ErrQueueLimitExceed`, accidental removed at `v3.81.0`

## v3.81.3
* Fixed tracing details check for some metrics

Expand Down
9 changes: 6 additions & 3 deletions internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ var (
errNonZeroSeqNo = xerrors.Wrap(errors.New("ydb: non zero seqno for auto set seqno mode")) //nolint:lll
errNonZeroCreatedAt = xerrors.Wrap(errors.New("ydb: non zero Message.CreatedAt and set auto fill created at option")) //nolint:lll
errNoAllowedCodecs = xerrors.Wrap(errors.New("ydb: no allowed codecs for write to topic"))
errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) //nolint:lll
errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) //nolint:lll
PublicErrQueueIsFull = xerrors.Wrap(errors.New("ydb: queue is full"))
PublicErrMessagesPutToInternalQueueBeforeError = xerrors.Wrap(errors.New("ydb: the messages was put to internal buffer before the error happened. It mean about the messages can be delivered to the server")) //nolint:lll
errDiffetentTransactions = xerrors.Wrap(errors.New("ydb: internal writer has messages from different trasactions. It is internal logic error, write issue please: https://github.com/ydb-platform/ydb-go-sdk/issues/new?assignees=&labels=bug&projects=&template=01_BUG_REPORT.md&title=bug%3A+")) //nolint:lll

Expand Down Expand Up @@ -223,16 +224,18 @@ func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage)
semaphoreWeight := int64(len(messages))
if semaphoreWeight > int64(w.cfg.MaxQueueLen) {
return xerrors.WithStackTrace(fmt.Errorf(
"ydb: add more messages, then max queue limit. max queue: %v, try to add: %v",
"ydb: add more messages, then max queue limit. max queue: %v, try to add: %v: %w",
w.cfg.MaxQueueLen,
semaphoreWeight,
PublicErrQueueIsFull,
))
}
if err := w.semaphore.Acquire(ctx, semaphoreWeight); err != nil {
return xerrors.WithStackTrace(
fmt.Errorf("ydb: add new messages exceed max queue size limit. Add count: %v, max size: %v",
fmt.Errorf("ydb: add new messages exceed max queue size limit. Add count: %v, max size: %v: %w",
semaphoreWeight,
w.cfg.MaxQueueLen,
PublicErrQueueIsFull,
))
}
defer func() {
Expand Down
5 changes: 4 additions & 1 deletion topic/topicwriter/topicwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ type (
Message = topicwriterinternal.PublicMessage
)

var ErrMessagesPutToInternalQueueBeforeError = topicwriterinternal.PublicErrMessagesPutToInternalQueueBeforeError
var (
ErrQueueLimitExceed = topicwriterinternal.PublicErrQueueIsFull
ErrMessagesPutToInternalQueueBeforeError = topicwriterinternal.PublicErrMessagesPutToInternalQueueBeforeError
)

// Writer represent write session to topic
// It handles connection problems, reconnect to server when need and resend buffered messages
Expand Down

0 comments on commit 91eb73b

Please sign in to comment.