diff --git a/CHANGELOG.md b/CHANGELOG.md index e22b09d4f..5175f0fbd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Returned `topicwriter.ErrQueueLimitExceed`, accidental removed at `v3.81.0` + ## v3.81.3 * Fixed tracing details check for some metrics diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 3ab1dad4c..d0e182a05 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -36,7 +36,8 @@ var ( errNonZeroSeqNo = xerrors.Wrap(errors.New("ydb: non zero seqno for auto set seqno mode")) //nolint:lll errNonZeroCreatedAt = xerrors.Wrap(errors.New("ydb: non zero Message.CreatedAt and set auto fill created at option")) //nolint:lll errNoAllowedCodecs = xerrors.Wrap(errors.New("ydb: no allowed codecs for write to topic")) - errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) //nolint:lll + errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) //nolint:lll + PublicErrQueueIsFull = xerrors.Wrap(errors.New("ydb: queue is full")) PublicErrMessagesPutToInternalQueueBeforeError = xerrors.Wrap(errors.New("ydb: the messages was put to internal buffer before the error happened. It mean about the messages can be delivered to the server")) //nolint:lll errDiffetentTransactions = xerrors.Wrap(errors.New("ydb: internal writer has messages from different trasactions. It is internal logic error, write issue please: https://github.com/ydb-platform/ydb-go-sdk/issues/new?assignees=&labels=bug&projects=&template=01_BUG_REPORT.md&title=bug%3A+")) //nolint:lll @@ -223,16 +224,18 @@ func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) semaphoreWeight := int64(len(messages)) if semaphoreWeight > int64(w.cfg.MaxQueueLen) { return xerrors.WithStackTrace(fmt.Errorf( - "ydb: add more messages, then max queue limit. max queue: %v, try to add: %v", + "ydb: add more messages, then max queue limit. max queue: %v, try to add: %v: %w", w.cfg.MaxQueueLen, semaphoreWeight, + PublicErrQueueIsFull, )) } if err := w.semaphore.Acquire(ctx, semaphoreWeight); err != nil { return xerrors.WithStackTrace( - fmt.Errorf("ydb: add new messages exceed max queue size limit. Add count: %v, max size: %v", + fmt.Errorf("ydb: add new messages exceed max queue size limit. Add count: %v, max size: %v: %w", semaphoreWeight, w.cfg.MaxQueueLen, + PublicErrQueueIsFull, )) } defer func() { diff --git a/topic/topicwriter/topicwriter.go b/topic/topicwriter/topicwriter.go index 96d39e9c9..63ba723a6 100644 --- a/topic/topicwriter/topicwriter.go +++ b/topic/topicwriter/topicwriter.go @@ -10,7 +10,10 @@ type ( Message = topicwriterinternal.PublicMessage ) -var ErrMessagesPutToInternalQueueBeforeError = topicwriterinternal.PublicErrMessagesPutToInternalQueueBeforeError +var ( + ErrQueueLimitExceed = topicwriterinternal.PublicErrQueueIsFull + ErrMessagesPutToInternalQueueBeforeError = topicwriterinternal.PublicErrMessagesPutToInternalQueueBeforeError +) // Writer represent write session to topic // It handles connection problems, reconnect to server when need and resend buffered messages