diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 8abb022a1450..464a988be3f7 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -11,7 +11,15 @@ ### Bug fixes -- [Bug Fix] `expiresAtUtc` is `Invalid Date` in the received message when the ttl is not defined. Has been fixed in [#13543](https://github.com/Azure/azure-sdk-for-js/pull/13543) +- `expiresAtUtc` is `Invalid Date` in the received message when the ttl is not defined. Has been fixed in [#13543](https://github.com/Azure/azure-sdk-for-js/pull/13543) + +- If there are many(2047) outstanding messages that are not settled, + + - `receiveMessages` hangs forever and doesn't return for unpartitioned queues [Issue #11633](https://github.com/Azure/azure-sdk-for-js/issues/11633), + - `subscribe` doesn't receive more messages even after settling the unsettled messages for partitioned queues [Issue #15114](https://github.com/Azure/azure-sdk-for-js/issues/15114) + + [#14060](https://github.com/Azure/azure-sdk-for-js/pull/14060) fixes the problem by returning an empty array for receiveMessages. If used `subscribe`, more messages will be received after the outstanding messages are settled. + - Some of the queue properties such as "forwardTo" and "autoDeleteOnIdle" were not being set as requested through the `ServiceBusAdministrationClient.createQueue` method because of a bug w.r.t the ordering of XML properties. The issue has been fixed in [#14692](https://github.com/Azure/azure-sdk-for-js/pull/14692). - Settling messages now use the `retryOptions` passed to `ServiceBusClient`, making it more resilient against network failures. [PR#14867](https://github.com/Azure/azure-sdk-for-js/pull/14867/files) diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index 0e9ade3842ec..66b98d532190 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -20,7 +20,8 @@ import { checkAndRegisterWithAbortSignal } from "../util/utils"; import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs"; import { createAndEndProcessingSpan } from "../diagnostics/instrumentServiceBusMessage"; import { ReceiveMode } from "../models"; -import { ServiceBusError, translateServiceBusError } from "../serviceBusError"; +import { ServiceBusError, ServiceBusErrorCode, translateServiceBusError } from "../serviceBusError"; +import { incomingBufferProperties, UnsettledMessagesLimitExceededError } from "./shared"; /** * Describes the batching receiver where the user can receive a specified number of messages for @@ -195,7 +196,10 @@ type EventEmitterLike = Pick & +export type MinimalReceiver = Pick< + Receiver, + "name" | "isOpen" | "credit" | "addCredit" | "drain" | "session" +> & EventEmitterLike & { session: EventEmitterLike; } & { @@ -493,15 +497,24 @@ export class BatchingReceiverLite { reject(err); }, args.abortSignal); - logger.verbose( - `${loggingPrefix} Adding credit for receiving ${args.maxMessageCount} messages.` - ); + const { numberOfEmptySlots } = incomingBufferProperties(receiver); + if (numberOfEmptySlots === 0) { + throw new ServiceBusError( + UnsettledMessagesLimitExceededError, + "UnsettledMessagesLimitExceeded" as ServiceBusErrorCode + ); + } + const creditsToAdd = + this._receiveMode === "peekLock" + ? Math.min(args.maxMessageCount, numberOfEmptySlots) + : args.maxMessageCount; + logger.verbose(`${loggingPrefix} Adding credit for receiving ${creditsToAdd} messages.`); - // By adding credit here, we let the service know that at max we can handle `maxMessageCount` + // By adding credit here, we let the service know that at max we can handle `creditsToAdd` // number of messages concurrently. We will return the user an array of messages that can - // be of size upto maxMessageCount. Then the user needs to accordingly dispose + // be of size upto `creditsToAdd`. Then the user needs to accordingly dispose // (complete/abandon/defer/deadletter) the messages from the array. - receiver.addCredit(args.maxMessageCount); + receiver.addCredit(creditsToAdd); logger.verbose( `${loggingPrefix} Setting the wait timer for ${args.maxWaitTimeInMs} milliseconds.` diff --git a/sdk/servicebus/service-bus/src/core/messageReceiver.ts b/sdk/servicebus/service-bus/src/core/messageReceiver.ts index a02ac7b8b720..9f3ed0ad8546 100644 --- a/sdk/servicebus/service-bus/src/core/messageReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/messageReceiver.ts @@ -136,6 +136,13 @@ export abstract class MessageReceiver extends LinkEntity { */ protected _lockRenewer: LockRenewer | undefined; + /** + * Method to be called after the message is settled when receiving messages through subscribe. + * + * Currently, more credits will be added at this spot based on the maxConcurrentCalls and empty slots. + */ + public settlementNotifierForSubscribe: (() => void) | undefined; + constructor( context: ConnectionContext, entityPath: string, diff --git a/sdk/servicebus/service-bus/src/core/shared.ts b/sdk/servicebus/service-bus/src/core/shared.ts index c81309cdff94..58fcab051109 100644 --- a/sdk/servicebus/service-bus/src/core/shared.ts +++ b/sdk/servicebus/service-bus/src/core/shared.ts @@ -5,6 +5,7 @@ import { Delivery, ReceiverOptions, Source } from "rhea-promise"; import { translateServiceBusError } from "../serviceBusError"; import { receiverLogger } from "../log"; import { ReceiveMode } from "../models"; +import { Receiver } from "rhea-promise"; /** * @internal @@ -103,3 +104,38 @@ export function createReceiverOptions( return rcvrOptions; } + +/** + * @internal + */ +export const UnsettledMessagesLimitExceededError = + "Failed to fetch new messages as the limit for unsettled messages is reached. Please settle received messages using settlement methods(such as `completeMessage()`) on the receiver to receive the next message."; + +/** + * Returns the number of empty/filled slots in the Circular buffer of incoming deliveries + * based on the capacity and size of the buffer. + * + * @internal + */ +export function incomingBufferProperties( + receiver: Pick | undefined +): { + numberOfEmptySlots: number; // 2048(total) - filled + numberOfFilledSlots: number; // number of unsettled messages +} { + const incomingDeliveries = receiver?.session?.incoming?.deliveries; + let numberOfEmptySlots = 0; + if (incomingDeliveries && incomingDeliveries.capacity - 1 > incomingDeliveries.size) { + // Exmpty slots should have been `incomingDeliveries.capacity - 1 - incomingDeliveries.size`. Why -1? + // - If the number of slots is set to (capacity - size), + // the number of unsettled messages that can be held in the buffer would equal to the "capacity". + // At that limiting point, service doesn't respond to the drain request for unpartitioned queues. + // Service team is tracking the issue. + // -1 allows us to not fill up the buffer entirely, it fills up to 2047 if the capacity is 2048 + numberOfEmptySlots = incomingDeliveries.capacity - 1 - incomingDeliveries.size; + } + return { + numberOfEmptySlots, + numberOfFilledSlots: incomingDeliveries ? incomingDeliveries.size : 0 + }; +} diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index 51e4d403183b..9673788eb5ac 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -10,7 +10,7 @@ import { } from "./messageReceiver"; import { ConnectionContext } from "../connectionContext"; -import { ReceiverHelper } from "./receiverHelper"; +import { StreamingReceiverHelper } from "./streamingReceiverHelper"; import { throwErrorIfConnectionClosed } from "../util/errors"; import { @@ -65,7 +65,7 @@ export class StreamingReceiver extends MessageReceiver { */ private _retryOptions: RetryOptions; - private _receiverHelper: ReceiverHelper; + private _streamingReceiverHelper: StreamingReceiverHelper; /** * Used so we can stub out retry in tests. @@ -108,7 +108,7 @@ export class StreamingReceiver extends MessageReceiver { // for the streaming receiver so long as we can receive messages then we // _are_ receiving messages - there's no in-between state like there is // with BatchingReceiver. - return this._receiverHelper.canReceiveMessages(); + return this._streamingReceiverHelper.canReceiveMessages(); } /** @@ -127,10 +127,16 @@ export class StreamingReceiver extends MessageReceiver { this._retryOptions = options?.retryOptions || {}; this._retry = retry; - this._receiverHelper = new ReceiverHelper(() => ({ - receiver: this.link, - logPrefix: this.logPrefix - })); + this._streamingReceiverHelper = new StreamingReceiverHelper( + () => ({ + receiver: this.link, + logPrefix: this.logPrefix + }), + this.receiveMode, + this.maxConcurrentCalls + ); + + this.settlementNotifierForSubscribe = () => this._streamingReceiverHelper.postProcessing(); this._onAmqpClose = async (context: EventContext) => { const receiverError = context.receiver && context.receiver.error; @@ -300,7 +306,7 @@ export class StreamingReceiver extends MessageReceiver { } return; } finally { - this._receiverHelper.addCredit(1); + this._streamingReceiverHelper.onReceive(); } // If we've made it this far, then user's message handler completed fine. Let us try @@ -358,7 +364,7 @@ export class StreamingReceiver extends MessageReceiver { } async stopReceivingMessages(): Promise { - await this._receiverHelper.suspend(); + await this._streamingReceiverHelper.suspend(); } /** @@ -433,22 +439,21 @@ export class StreamingReceiver extends MessageReceiver { // this might seem odd but in reality this entire class is like one big function call that // results in a receive(). Once we're being initialized we should consider ourselves the // "owner" of the receiver and that it's now being locked into being the actual receiver. - this._receiverHelper.resume(); + this._streamingReceiverHelper.resume(); } /** * Starts the receiver by establishing an AMQP session and an AMQP receiver link on the session. * * @param onMessage - The message handler to receive servicebus messages. - * @param onError - The error handler to receive an error that occurs while receivin messages. + * @param onError - The error handler to receive an error that occurs while receiving messages. */ subscribe(onMessage: OnMessage, onError: OnError): void { throwErrorIfConnectionClosed(this._context); this._onMessage = onMessage; this._onError = onError; - - this._receiverHelper.addCredit(this.maxConcurrentCalls); + this._streamingReceiverHelper.addInitialCredits(); } /** @@ -510,10 +515,10 @@ export class StreamingReceiver extends MessageReceiver { onError: (args) => this._onError && this._onError(args) }); - this._receiverHelper.addCredit(this.maxConcurrentCalls); logger.verbose( - `${this.logPrefix} onDetached: link has been reestablished, added ${this.maxConcurrentCalls} credits.` + `${this.logPrefix} onDetached: link has been reestablished, attempting to add credits.` ); + this._streamingReceiverHelper.addInitialCredits(); } finally { this._isDetaching = false; } diff --git a/sdk/servicebus/service-bus/src/core/receiverHelper.ts b/sdk/servicebus/service-bus/src/core/streamingReceiverHelper.ts similarity index 50% rename from sdk/servicebus/service-bus/src/core/receiverHelper.ts rename to sdk/servicebus/service-bus/src/core/streamingReceiverHelper.ts index cf21c81062da..3920c30ca5e5 100644 --- a/sdk/servicebus/service-bus/src/core/receiverHelper.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiverHelper.ts @@ -3,18 +3,25 @@ import { Receiver, ReceiverEvents } from "rhea-promise"; import { receiverLogger as logger } from "../log"; +import { ReceiveMode } from "../models"; +import { incomingBufferProperties } from "./shared"; /** * Wraps the receiver with some higher level operations for managing state * like credits, draining, etc... * + * Provides helper methods to manage the credits on the link for the + * streaming messages scenarios. + * (Used by both sessions(MessageSession) and non-sessions(StreamingReceiver)) * @internal */ -export class ReceiverHelper { +export class StreamingReceiverHelper { private _isSuspended: boolean = false; constructor( - private _getCurrentReceiver: () => { receiver: Receiver | undefined; logPrefix: string } + private _getCurrentReceiver: () => { receiver: Receiver | undefined; logPrefix: string }, + private receiveMode: ReceiveMode, + private maxConcurrentCalls: number ) {} /** @@ -26,7 +33,7 @@ export class ReceiverHelper { * @returns true if credits were added, false if there is no current receiver instance * or `stopReceivingMessages` has been called. */ - addCredit(credits: number): boolean { + private addCredit(credits: number): boolean { const { receiver, logPrefix } = this._getCurrentReceiver(); if (!this.canReceiveMessages()) { @@ -45,7 +52,7 @@ export class ReceiverHelper { } /** - * Drains the credits for the receiver and prevents the `receiverHelper.addCredit()` method from adding credits. + * Drains the credits for the receiver and prevents the `streamingReceiverHelper.addCredit()` method from adding credits. * Call `resume()` to enable the `addCredit()` method. */ async suspend(): Promise { @@ -116,4 +123,69 @@ export class ReceiverHelper { private _isValidReceiver(receiver: Receiver | undefined): receiver is Receiver { return receiver != null && receiver.isOpen(); } + + addInitialCredits() { + const { receiver, logPrefix } = this._getCurrentReceiver(); + const emptySlots = incomingBufferProperties(receiver).numberOfEmptySlots; + const creditsToAdd = + this.receiveMode === "peekLock" + ? Math.min(this.maxConcurrentCalls, emptySlots) + : this.maxConcurrentCalls; + this.addCredit(creditsToAdd); + logger.verbose( + `${logPrefix} creditManager: added ${creditsToAdd} credits (initial); total credits = ${receiver?.credit}` + ); + } + + /** + * Upon receiving a new message, this method is to be called in the streaming receiver logic to add credits to receive more messages. + * + * @internal + */ + onReceive() { + const { receiver, logPrefix } = this._getCurrentReceiver(); + if (this.receiveMode === "receiveAndDelete") { + this.addCredit(1); + logger.verbose( + `${logPrefix} creditManager: added 1 credits upon receiving a message; total credits = ${receiver?.credit}` + ); + return; + } + + const { numberOfEmptySlots } = incomingBufferProperties(receiver); + if (receiver && numberOfEmptySlots > 0) { + const possibleMaxCredits = Math.min(this.maxConcurrentCalls, numberOfEmptySlots); + if (possibleMaxCredits > receiver.credit) { + const creditsToAdd = possibleMaxCredits - receiver.credit; + this.addCredit(creditsToAdd); + logger.verbose( + `${logPrefix} creditManager: added ${creditsToAdd} credits upon receiving a message; total credits = ${receiver?.credit}` + ); + } + return; + } + } + + /** + * Meant to be called after a message is settled with the receive link. + * Replenishes the number of credits on the link based on the maxConcurrentCalls and the numberOfEmptySlots to receive more messages. + * + * @internal + */ + postProcessing() { + const { receiver, logPrefix } = this._getCurrentReceiver(); + const { numberOfEmptySlots } = incomingBufferProperties(receiver); + if (this.receiveMode === "peekLock") { + if (receiver && numberOfEmptySlots > 0) { + const possibleMaxCredits = Math.min(this.maxConcurrentCalls, numberOfEmptySlots); + if (possibleMaxCredits > receiver.credit) { + const creditsToAdd = possibleMaxCredits - receiver.credit; + this.addCredit(creditsToAdd); + logger.verbose( + `${logPrefix} creditManager: added ${creditsToAdd} credits after message settlement; total credits = ${receiver?.credit}` + ); + } + } + } + } } diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index e178574034da..af65c0788f94 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -41,7 +41,11 @@ import "@azure/core-asynciterator-polyfill"; import { LockRenewer } from "../core/autoLockRenewer"; import { createProcessingSpan } from "../diagnostics/instrumentServiceBusMessage"; import { receiverLogger as logger } from "../log"; -import { translateServiceBusError } from "../serviceBusError"; +import { + InternalServiceBusErrorCode, + isServiceBusError, + translateServiceBusError +} from "../serviceBusError"; /** * The default time to wait for messages _after_ the first message @@ -505,7 +509,18 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { retryOptions: this._retryOptions }; return retry(config).catch((err) => { - throw translateServiceBusError(err); + const error = translateServiceBusError(err); + if ( + isServiceBusError(error) && + (error.code as InternalServiceBusErrorCode) === "UnsettledMessagesLimitExceeded" + ) { + // To be consistent with other languages, not throwing for UnsettledMessagesLimitExceeded case. + // If the unsettled messages limit is reached, we'd just return 0 messages instead of throwing + // (until the service fixes the "draining" bug at the terminal case and/or we extend the 2048 limit in rhea to infinite like the .NET SDK) + return []; + } else { + throw error; + } }); } diff --git a/sdk/servicebus/service-bus/src/receivers/receiverCommon.ts b/sdk/servicebus/service-bus/src/receivers/receiverCommon.ts index 7879617e2541..b964621b6ae8 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiverCommon.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiverCommon.ts @@ -5,7 +5,7 @@ import { MessageHandlers, ProcessErrorArgs } from "../models"; import { ServiceBusReceiver } from "./receiver"; import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs"; import { receiverLogger, ServiceBusLogger } from "../log"; -import { translateServiceBusError } from "../serviceBusError"; +import { ServiceBusError, translateServiceBusError } from "../serviceBusError"; import { DeadLetterOptions, DispositionType, @@ -15,6 +15,7 @@ import { import { DispositionStatusOptions } from "../core/managementClient"; import { ConnectionContext } from "../connectionContext"; import { + delay, ErrorNameConditionMapper, retry, RetryOperationType, @@ -206,7 +207,7 @@ export function settleMessage( /** * @internal */ -export function settleMessageOperation( +export async function settleMessageOperation( message: ServiceBusMessageImpl, operation: DispositionType, context: ConnectionContext, @@ -219,49 +220,68 @@ export function settleMessageOperation( : context.getReceiverFromCache(message.delivery.link.name, message.sessionId); const associatedLinkName = receiver?.name; - let error: Error | undefined; - if (message.delivery.remote_settled) { - error = new Error(MessageAlreadySettled); - } else if ( - !isDeferredMessage && - (!receiver || !receiver.isOpen()) && - isDefined(message.sessionId) - ) { - error = translateServiceBusError({ - description: - `Failed to ${operation} the message as the AMQP link with which the message was ` + - `received is no longer alive.`, - condition: ErrorNameConditionMapper.SessionLockLostError + const settlementWithManagementLink = () => + context.getManagementClient(entityPath).updateDispositionStatus(message.lockToken!, operation, { + ...options, + associatedLinkName, + sessionId: message.sessionId }); - } - if (error) { + const logError = (error: Error) => { receiverLogger.logError( error, "[%s] An error occurred when settling a message with id '%s'", context.connectionId, message.messageId ); + }; + + let error: Error | undefined; + if (message.delivery.remote_settled) { + error = new Error(MessageAlreadySettled); + logError(error); throw error; } - // Message Settlement with managementLink - // 1. If the received message is deferred as such messages can only be settled using managementLink - // 2. If the associated receiver link is not available. This does not apply to messages from sessions as we need a lock on the session to do so. - if (isDeferredMessage || ((!receiver || !receiver.isOpen()) && !isDefined(message.sessionId))) { - return context - .getManagementClient(entityPath) - .updateDispositionStatus(message.lockToken!, operation, { - ...options, - associatedLinkName, - sessionId: message.sessionId - }) - .catch((err) => { - throw translateServiceBusError(err); - }); - } + try { + if (isDeferredMessage) { + // Message Settlement with managementLink + // 1. If the received message is deferred as such messages can only be settled using managementLink + return await settlementWithManagementLink(); + } + + if (!isDefined(receiver)) { + error = new ServiceBusError( + `Failed to ${operation} the message as the receiver is undefined.`, + "GeneralError" + ); + logError(error); + throw error; + } - return receiver!.settleMessage(message, operation, options).catch((err) => { + if (!receiver.isOpen()) { + if (!isDefined(message.sessionId)) { + // Message Settlement with managementLink + // 2. If the associated receiver link is not available. This does not apply to messages from sessions as we need a lock on the session to do so. + return await settlementWithManagementLink(); + } + if (isDefined(message.sessionId)) { + error = translateServiceBusError({ + description: + `Failed to ${operation} the message as the AMQP link with which the message was ` + + `received is no longer alive.`, + condition: ErrorNameConditionMapper.SessionLockLostError + }); + logError(error); + throw error; + } + } + + await receiver.settleMessage(message, operation, options); + // delay (setTimeout) ensures that the delivery is popped, size is decremented with respect to the settlement that was done + await delay(0); + receiver.settlementNotifierForSubscribe?.(); + } catch (err) { throw translateServiceBusError(err); - }); + } } diff --git a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts index 3e841c7658a9..36a2f9fa0750 100644 --- a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts @@ -41,7 +41,11 @@ import "@azure/core-asynciterator-polyfill"; import { AmqpError } from "rhea-promise"; import { createProcessingSpan } from "../diagnostics/instrumentServiceBusMessage"; import { receiverLogger as logger } from "../log"; -import { translateServiceBusError } from "../serviceBusError"; +import { + InternalServiceBusErrorCode, + isServiceBusError, + translateServiceBusError +} from "../serviceBusError"; /** *A receiver that handles sessions, including renewing the session lock. @@ -410,7 +414,18 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver abortSignal: options?.abortSignal }; return retry(config).catch((err) => { - throw translateServiceBusError(err); + const error = translateServiceBusError(err); + if ( + isServiceBusError(error) && + (error.code as InternalServiceBusErrorCode) === "UnsettledMessagesLimitExceeded" + ) { + // To be consistent with other languages, not throwing for UnsettledMessagesLimitExceeded case. + // If the unsettled messages limit is reached, we'd just return 0 messages instead of throwing + // (until the service fixes the "draining" bug at the terminal case and/or we extend the 2048 limit in rhea to infinite like the .NET SDK) + return []; + } else { + throw error; + } }); } @@ -438,7 +453,7 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver return { close: async (): Promise => { - return this._messageSession?.receiverHelper.suspend(); + return this._messageSession?.streamingReceiverHelper.suspend(); } }; } diff --git a/sdk/servicebus/service-bus/src/serviceBusError.ts b/sdk/servicebus/service-bus/src/serviceBusError.ts index 1eb70a8f455d..faa47a4e158f 100644 --- a/sdk/servicebus/service-bus/src/serviceBusError.ts +++ b/sdk/servicebus/service-bus/src/serviceBusError.ts @@ -89,6 +89,17 @@ export const wellKnownMessageCodesToServiceBusCodes: Map { return this._batchingReceiverLite.isReceivingMessages || this._isReceivingMessagesForSubscriber; } + /** + * Method to be called after the message is settled when receiving messages through subscribe. + * + * Currently, more credits will be added at this spot based on the maxConcurrentCalls and empty slots. + */ + public settlementNotifierForSubscribe: (() => void) | undefined; private _batchingReceiverLite: BatchingReceiverLite; private _isReceivingMessagesForSubscriber: boolean; @@ -180,10 +186,10 @@ export class MessageSession extends LinkEntity { private _totalAutoLockRenewDuration: number; - public get receiverHelper(): ReceiverHelper { - return this._receiverHelper; + public get streamingReceiverHelper(): StreamingReceiverHelper { + return this._streamingReceiverHelper; } - private _receiverHelper: ReceiverHelper; + private _streamingReceiverHelper: StreamingReceiverHelper; /** * Ensures that the session lock is renewed before it expires. The lock will not be renewed for @@ -367,14 +373,18 @@ export class MessageSession extends LinkEntity { address: entityPath, audience: `${connectionContext.config.endpoint}${entityPath}` }); - this._receiverHelper = new ReceiverHelper(() => ({ - receiver: this.link, - logPrefix: this.logPrefix - })); + this.receiveMode = options.receiveMode || "peekLock"; + this._streamingReceiverHelper = new StreamingReceiverHelper( + () => ({ + receiver: this.link, + logPrefix: this.logPrefix + }), + this.receiveMode, + this.maxConcurrentCalls + ); this._retryOptions = options.retryOptions; this.autoComplete = false; if (isDefined(this._providedSessionId)) this.sessionId = this._providedSessionId; - this.receiveMode = options.receiveMode || "peekLock"; this.maxAutoRenewDurationInMs = options.maxAutoLockRenewalDurationInMs != null ? options.maxAutoLockRenewalDurationInMs @@ -607,6 +617,7 @@ export class MessageSession extends LinkEntity { this._onError = onError; if (this.link && this.link.isOpen()) { + this.settlementNotifierForSubscribe = () => this._streamingReceiverHelper.postProcessing(); const onSessionMessage = async (context: EventContext): Promise => { // If the receiver got closed in PeekLock mode, avoid processing the message as we // cannot settle the message. @@ -684,7 +695,7 @@ export class MessageSession extends LinkEntity { } return; } finally { - this.receiverHelper.addCredit(1); + this._streamingReceiverHelper.onReceive(); } // If we've made it this far, then user's message handler completed fine. Let us try @@ -721,7 +732,7 @@ export class MessageSession extends LinkEntity { // setting the "message" event listener. this.link.on(ReceiverEvents.message, onSessionMessage); // adding credit - this.receiverHelper.addCredit(this.maxConcurrentCalls); + this._streamingReceiverHelper.addInitialCredits(); } else { this._isReceivingMessagesForSubscriber = false; const msg = diff --git a/sdk/servicebus/service-bus/test/internal/2048-limit-test.spec.ts b/sdk/servicebus/service-bus/test/internal/2048-limit-test.spec.ts new file mode 100644 index 000000000000..1bf2f15b5358 --- /dev/null +++ b/sdk/servicebus/service-bus/test/internal/2048-limit-test.spec.ts @@ -0,0 +1,394 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +import chaiAsPromised from "chai-as-promised"; +import { ProcessErrorArgs, ServiceBusReceivedMessage, ServiceBusSender } from "../../src"; +import { checkWithTimeout, TestClientType, TestMessage } from "../public/utils/testUtils"; +import { ServiceBusReceiver, ServiceBusReceiverImpl } from "../../src/receivers/receiver"; +import { + ServiceBusClientForTests, + createServiceBusClientForTests, + EntityName, + getRandomTestClientTypeWithNoSessions, + getRandomTestClientTypeWithSessions +} from "../public/utils/testutils2"; +import { verifyMessageCount } from "../public/utils/managementUtils"; +import { ServiceBusSessionReceiverImpl } from "../../src/receivers/sessionReceiver"; +import { MinimalReceiver } from "../../src/core/batchingReceiver"; +import { InternalMessageHandlers } from "../../src/models"; + +chai.use(chaiAsPromised); + +let serviceBusClient: ServiceBusClientForTests; +let entityName: EntityName; +let sender: ServiceBusSender; +let receiver: ServiceBusReceiver; +const retryOptions = { maxRetries: 1, retryDelayInMs: 10 }; +const bufferCapacityToSet = 20; +const numberOfMessagesToSend = bufferCapacityToSet * 1.5; +const getTestClientTypes = () => [ + getRandomTestClientTypeWithNoSessions(), + getRandomTestClientTypeWithSessions() +]; + +async function beforeEachTest( + entityType: TestClientType, + receiveMode: "peekLock" | "receiveAndDelete" = "peekLock" +): Promise { + entityName = await serviceBusClient.test.createTestEntities(entityType); + if (receiveMode === "receiveAndDelete") { + receiver = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityName); + } else { + receiver = await serviceBusClient.test.createPeekLockReceiver(entityName); + } + + sender = serviceBusClient.test.addToCleanup( + serviceBusClient.createSender(entityName.queue ?? entityName.topic!) + ); + + reduceBufferCapacity(receiver, bufferCapacityToSet); +} + +async function sendMessages(numberOfMessagesToSend: number) { + let current = 0; + const messageBodies = []; + while (current < numberOfMessagesToSend) { + const batch = await sender.createMessageBatch(); + let body = `message-${current}`; + while ( + current < numberOfMessagesToSend && + batch.tryAddMessage({ + body, + timeToLive: 10000, + sessionId: entityName.usesSessions ? TestMessage.sessionId : undefined + }) + ) { + messageBodies.push(body); + current++; + body = `message-${current}`; + } + + await sender.sendMessages(batch); + } + return messageBodies; +} + +function setIncomingCapacityOnLink(link: MinimalReceiver | undefined, newCapacity: number) { + if (link && link.session.incoming.deliveries.capacity > 0) { + link.session.incoming.deliveries.capacity = newCapacity; + } +} + +function reduceBufferCapacity(receiver: ServiceBusReceiver, newCapacity: number) { + if (entityName.usesSessions) { + const receiverTemp = receiver as ServiceBusSessionReceiverImpl; + const link = receiverTemp["_messageSession"]["link"]; + return setIncomingCapacityOnLink(link, newCapacity); + } + + const receiverTemp = receiver as ServiceBusReceiverImpl; + if (receiverTemp["_streamingReceiver"]) { + const link = receiverTemp["_streamingReceiver"]["link"]; + return setIncomingCapacityOnLink(link, newCapacity); + } + + const createBatchingReceiver = receiverTemp["_createBatchingReceiver"]; + (receiver as ServiceBusReceiverImpl)["_createBatchingReceiver"] = function(...args: any) { + const batchingReceiver = createBatchingReceiver.apply(this, args); + const _getCurrentReceiver = batchingReceiver["_batchingReceiverLite"]["_getCurrentReceiver"]; + batchingReceiver["_batchingReceiverLite"]["_getCurrentReceiver"] = async function( + ...newArgs: any + ) { + const link = await _getCurrentReceiver.apply(this, newArgs); + setIncomingCapacityOnLink(link, newCapacity); + return link; + }; + return batchingReceiver; + }; +} + +describe("2048 scenarios - receiveBatch in a loop", function(): void { + beforeEach(() => { + serviceBusClient = createServiceBusClientForTests({ + retryOptions + }); + }); + + afterEach(async () => { + await serviceBusClient.test.afterEach(); + return serviceBusClient.test.after(); + }); + + async function receiveMessages(numberOfMessagesToReceive: number) { + let messages: ServiceBusReceivedMessage[] = []; + while (messages.length < numberOfMessagesToReceive) { + messages = messages.concat(await receiver.receiveMessages(50, { maxWaitTimeInMs: 5000 })); + } + chai.assert.equal( + messages.length, + numberOfMessagesToReceive, + "Unexpected number of messages received" + ); + return messages; + } + + describe("receiveAndDelete", () => { + getTestClientTypes().forEach((clientType) => { + it( + clientType + ": would be able to receive more than bufferCapacity messages", + async function(): Promise { + await beforeEachTest(clientType, "receiveAndDelete"); + await sendMessages(numberOfMessagesToSend); + await receiveMessages(numberOfMessagesToSend); + await verifyMessageCount(0, entityName); + } + ); + }); + }); + + describe("peekLock: can receive a max of (bufferCapacity-1) messages when not being settled", () => { + getTestClientTypes().forEach((clientType) => { + it( + clientType + + ": deliveryCount will be incremented for (bufferCapacity-1) messages if closed the receiver and received again", + async function(): Promise { + await beforeEachTest(clientType); + await sendMessages(numberOfMessagesToSend); + await Promise.all( + (await receiveMessages(bufferCapacityToSet - 1)).map((msg) => + receiver.abandonMessage(msg) + ) + ); + await verifyMessageCount(numberOfMessagesToSend, entityName); + await serviceBusClient.close(); + serviceBusClient = createServiceBusClientForTests(); + await verifyMessageCount(numberOfMessagesToSend, entityName); + receiver = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityName); + const messages = await receiveMessages(numberOfMessagesToSend); + const delCount = new Array(10).fill(0, 0, 10); + for (const message of messages) { + if (message.deliveryCount) { + delCount[message.deliveryCount]++; + } + } + chai.assert.equal( + delCount[1], + bufferCapacityToSet - 1, + "Unexpected number of messages have deliveryCount = 1" + ); + await verifyMessageCount(0, entityName); + } + ); + + function mockBachingReceive(receiver: ServiceBusReceiver, receiveCalled: { count: number }) { + if (entityName.usesSessions) { + const sessionReceiverTemp = receiver as ServiceBusSessionReceiverImpl; + if (sessionReceiverTemp["_messageSession"]) { + const tempFunc = + sessionReceiverTemp["_messageSession"]["_batchingReceiverLite"].receiveMessages; + sessionReceiverTemp["_messageSession"][ + "_batchingReceiverLite" + ].receiveMessages = function(...args: any) { + receiveCalled.count++; + return tempFunc.apply(this, args); + }; + } + } else { + const receiverTemp = receiver as ServiceBusReceiverImpl; + if (receiverTemp["_batchingReceiver"]) { + const tempFunc = + receiverTemp["_batchingReceiver"]["_batchingReceiverLite"].receiveMessages; + receiverTemp["_batchingReceiver"]["_batchingReceiverLite"].receiveMessages = function( + ...args: any + ) { + receiveCalled.count++; + return tempFunc.apply(this, args); + }; + } + } + } + + it( + clientType + ": new messageBatch returns zero after (bufferCapacity-1) messages", + async function(): Promise { + await beforeEachTest(clientType); + await sendMessages(numberOfMessagesToSend); + const firstBatch = await receiveMessages(bufferCapacityToSet - 1); + await verifyMessageCount(numberOfMessagesToSend, entityName); + + const receiveCalled = { count: 0 }; + mockBachingReceive(receiver, receiveCalled); + chai.assert.equal( + (await receiver.receiveMessages(1, { maxWaitTimeInMs: 2000 })).length, + 0, + "Should have received 0 messages!" + ); + chai.assert.equal( + receiveCalled.count, + retryOptions.maxRetries + 1, + "Unexpected number of times receive called" + ); + await verifyMessageCount(numberOfMessagesToSend, entityName); + await Promise.all(firstBatch.map((msg) => receiver.completeMessage(msg))); + const leftOver = await receiveMessages( + numberOfMessagesToSend - (bufferCapacityToSet - 1) + ); + chai.assert.equal( + leftOver.length, + numberOfMessagesToSend - (bufferCapacityToSet - 1), + "Unexpected leftover number of messages received" + ); + } + ); + }); + }); +}); + +describe("2048 scenarios - subscribe", function(): void { + beforeEach(() => { + serviceBusClient = createServiceBusClientForTests({ + retryOptions + }); + }); + + afterEach(async () => { + await serviceBusClient.test.afterEach(); + await serviceBusClient.test.after(); + }); + + describe("receiveAndDelete", () => { + getTestClientTypes().forEach((clientType) => { + it( + clientType + ": would be able to receive more than bufferCapacity messages", + async function(): Promise { + await beforeEachTest(clientType, "receiveAndDelete"); + await sendMessages(numberOfMessagesToSend); + let numberOfMessagesReceived = 0; + receiver.subscribe( + { + async processMessage(_msg: ServiceBusReceivedMessage) { + numberOfMessagesReceived++; + }, + async processError() {}, + async processInitialize() { + reduceBufferCapacity(receiver, bufferCapacityToSet); // Used for non-sessions only, sessions being mocked at beforeEachTest + } + } as InternalMessageHandlers, + { maxConcurrentCalls: 2000 } + ); + chai.assert.equal( + await checkWithTimeout( + () => numberOfMessagesReceived === numberOfMessagesToSend, + 1000, + 100000 + ), + true, + `Could not receive the messages in expected time. RECEIVED=${numberOfMessagesReceived}, SENT=${numberOfMessagesToSend}` + ); + await verifyMessageCount(0, entityName); + await receiver.close(); + } + ); + }); + }); + + describe("peekLock", () => { + getTestClientTypes().forEach((clientType) => { + it( + clientType + ": receives more than bufferCapacity messages once settled", + async function(): Promise { + // subscribe - peekLock + // - send + // - receive 2048 messages + // - do not settle them + // - wait for 10 seconds + // - make sure no new messages were received + // - settle one message + // - wait for 30 seconds + // - we should receive one new message by now + // - settle all the messages + // - rest would have been received + // - settle all of them + // - verifyMessageCount + await beforeEachTest(clientType); + let sentBodies = await sendMessages(numberOfMessagesToSend); + const receivedBodies: any[] = []; + let numberOfMessagesReceived = 0; + let reachedBufferCapacity = false; + const firstBatch: ServiceBusReceivedMessage[] = []; + const secondBatch: ServiceBusReceivedMessage[] = []; + receiver.subscribe( + { + async processMessage(msg: ServiceBusReceivedMessage) { + numberOfMessagesReceived++; + receivedBodies.push(msg.body); + if (!reachedBufferCapacity) { + firstBatch.push(msg); + } else { + secondBatch.push(msg); + } + }, + async processError(_args: ProcessErrorArgs) { + // if ( + // ((args.error as ServiceBusError).code as InternalServiceBusErrorCode) === + // "UnsettledMessagesLimitExceeded" + // ) { + // unsettledMessagesLimitErrorSeen = true; + // } + }, + async processInitialize() { + reduceBufferCapacity(receiver, bufferCapacityToSet); // Used for non-sessions only, sessions being mocked at beforeEachTest + } + } as InternalMessageHandlers, + { + maxConcurrentCalls: 2000, + autoCompleteMessages: false + } + ); + reachedBufferCapacity = await checkWithTimeout( + () => numberOfMessagesReceived === bufferCapacityToSet - 1, + 1000, + 100000 + ); + chai.assert.equal( + numberOfMessagesReceived, + bufferCapacityToSet - 1, + "Unexpected - messages were not settled, so new messages should not have been received" + ); + // chai.assert.equal( + // unsettledMessagesLimitErrorSeen, + // true, + // "UnsettledMessagesLimitExceeded should have been observed in the processError callback" + // ); + await receiver.completeMessage(firstBatch.shift()!); // settle the first message + chai.assert.equal( + await checkWithTimeout( + () => numberOfMessagesReceived >= bufferCapacityToSet, + 1000, + 30000 + ), + true, + `Unexpected - one message was settled, count should have been bufferCapacityToSet: ${bufferCapacityToSet}` + ); + await Promise.all(firstBatch.map((msg) => receiver.completeMessage(msg))); + chai.assert.equal( + await checkWithTimeout( + () => { + sentBodies = sentBodies.filter((sentBody) => !receivedBodies.includes(sentBody)); + return sentBodies.length === 0; + }, + 1000, + 100000 + ), + true, + "Unexpected - all the sent messages should have been received" + ); + await Promise.all(secondBatch.map((msg) => receiver.completeMessage(msg))); + await verifyMessageCount(0, entityName); + await receiver.close(); + } + ); + }); + }); +}); diff --git a/sdk/servicebus/service-bus/test/internal/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/streamingReceiver.spec.ts index 345eea15e75c..8c5a5fbddc9b 100644 --- a/sdk/servicebus/service-bus/test/internal/streamingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/streamingReceiver.spec.ts @@ -798,14 +798,9 @@ describe("Streaming Receiver Tests", () => { `Expected 0 messages, but received ${receivedMsgs.length}` ); receiver = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityNames); - await verifyMessageCount( - totalNumOfMessages, - entityNames.queue, - entityNames.topic, - entityNames.subscription - ); + await verifyMessageCount(totalNumOfMessages, entityNames); await drainReceiveAndDeleteReceiver(receiver); - await verifyMessageCount(0, entityNames.queue, entityNames.topic, entityNames.subscription); + await verifyMessageCount(0, entityNames); } it( diff --git a/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts index 111a7d1c782b..d50fe77037e8 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts @@ -467,7 +467,7 @@ describe("BatchingReceiver unit tests", () => { const remainingRegisteredListeners = new Set(); - const fakeRheaReceiver = { + const fakeRheaReceiver = ({ on(evt: ReceiverEvents, handler: OnAmqpEventAsPromise) { emitter.on(evt, handler); @@ -498,6 +498,15 @@ describe("BatchingReceiver unit tests", () => { removeListener(evt: SessionEvents, handler: OnAmqpEventAsPromise) { remainingRegisteredListeners.delete(evt.toString()); emitter.removeListener(evt, handler); + }, + incoming: { + deliveries: { + capacity: 2048, + size: 0, + head: 0, + tail: 0, + entries: [] + } } }, isOpen: () => true, @@ -516,7 +525,7 @@ describe("BatchingReceiver unit tests", () => { connection: { id: "connection-id" } - } as RheaReceiver; + } as unknown) as Receiver; return { receiveIsReady, diff --git a/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts index f1af33f1e979..5e96fc4ba378 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts @@ -13,10 +13,10 @@ import sinon from "sinon"; import { EventEmitter } from "events"; import { ReceiverEvents, - Receiver as RheaReceiver, EventContext, Message as RheaMessage, - SessionEvents + SessionEvents, + Receiver } from "rhea-promise"; import { OnAmqpEventAsPromise } from "../../../src/core/messageReceiver"; import { ServiceBusMessageImpl } from "../../../src/serviceBusMessage"; @@ -282,7 +282,7 @@ describe("Message session unit tests", () => { const remainingRegisteredListeners = new Set(); - const fakeRheaReceiver = { + const fakeRheaReceiver = ({ on(evt: ReceiverEvents, handler: OnAmqpEventAsPromise) { emitter.on(evt, handler); @@ -313,6 +313,15 @@ describe("Message session unit tests", () => { removeListener(evt: SessionEvents, handler: OnAmqpEventAsPromise) { remainingRegisteredListeners.delete(evt.toString()); emitter.removeListener(evt, handler); + }, + incoming: { + deliveries: { + capacity: 2048, + size: 0, + head: 0, + tail: 0, + entries: [] + } } }, isOpen: () => true, @@ -331,7 +340,7 @@ describe("Message session unit tests", () => { connection: { id: "connection-id" } - } as RheaReceiver; + } as unknown) as Receiver; batchingReceiver["_link"] = fakeRheaReceiver; diff --git a/sdk/servicebus/service-bus/test/internal/unit/receiverHelper.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/receiverHelper.spec.ts index 6a0125d8c5dd..51f51795c2ca 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/receiverHelper.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/receiverHelper.spec.ts @@ -4,7 +4,7 @@ import chai from "chai"; import chaiAsPromised from "chai-as-promised"; import { Receiver, ReceiverEvents } from "rhea-promise"; -import { ReceiverHelper } from "../../../src/core/receiverHelper"; +import { StreamingReceiverHelper } from "../../../src/core/streamingReceiverHelper"; import { createRheaReceiverForTests } from "./unittestUtils"; chai.use(chaiAsPromised); const assert = chai.assert; @@ -28,12 +28,16 @@ describe("ReceiverHelper unit tests", () => { * checking. */ it(`operations on an invalid receiver should just no-op harmlessly: ${invalidReceiver}`, async () => { - const helper = new ReceiverHelper(() => ({ - receiver: invalidReceiver, - logPrefix: "whatever" - })); + const helper = new StreamingReceiverHelper( + () => ({ + receiver: invalidReceiver, + logPrefix: "whatever" + }), + "peekLock", + 1000 + ); - assert.isFalse(helper.addCredit(101)); + assert.isFalse(helper["addCredit"](101)); await helper.drain(); await helper.suspend(); @@ -49,13 +53,17 @@ describe("ReceiverHelper unit tests", () => { ); // should still do nothing. - helper.addCredit(101); + helper["addCredit"](101); }); }); it("operations on an open receiver", async () => { const receiver = createRheaReceiverForTests(); - const helper = new ReceiverHelper(() => ({ receiver, logPrefix: "hello" })); + const helper = new StreamingReceiverHelper( + () => ({ receiver, logPrefix: "hello" }), + "peekLock", + 1000 + ); let drainWasCalled = false; @@ -64,7 +72,7 @@ describe("ReceiverHelper unit tests", () => { }); // we can explicitly drain - helper.addCredit(101); + helper["addCredit"](101); await helper.drain(); assert.isTrue(drainWasCalled); @@ -74,7 +82,7 @@ describe("ReceiverHelper unit tests", () => { // or we can drain as part of suspending a receiver. drainWasCalled = false; - helper.addCredit(101); + helper["addCredit"](101); await helper.suspend(); assert.isTrue(helper["_isSuspended"]); @@ -83,7 +91,7 @@ describe("ReceiverHelper unit tests", () => { assert.equal(receiver.credit, 0); // if we suspend() a receiver it will no longer have credits added. - helper.addCredit(101); + helper["addCredit"](101); assert.equal( receiver.credit, 0, @@ -92,7 +100,7 @@ describe("ReceiverHelper unit tests", () => { helper.resume(); assert.isFalse(helper["_isSuspended"]); - helper.addCredit(101); + helper["addCredit"](101); assert.equal(receiver.credit, 101); }); }); diff --git a/sdk/servicebus/service-bus/test/internal/unit/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/streamingReceiver.spec.ts index a15800977f2c..e7a4cce82844 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/streamingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/streamingReceiver.spec.ts @@ -92,7 +92,7 @@ describe("StreamingReceiver unit tests", () => { // start of the receive call. assert.isTrue( streamingReceiver.isReceivingMessages, - "receive() sets the isReceivingMessages flag immediately to avoid race conditions" + "receive() is supposed to set the isReceivingMessages flag immediately to avoid race conditions" ); streamingReceiver.subscribe( @@ -107,22 +107,26 @@ describe("StreamingReceiver unit tests", () => { assert.equal( streamingReceiver["link"]!.credit, 101, - "Credits are added when receive() is called" + "Credits are not added when receive() is called" ); assert.isTrue( streamingReceiver.isReceivingMessages, - "receive() sets the isReceivingMessages flag immediately to avoid race conditions" + "receive() is supposed to set the isReceivingMessages flag immediately to avoid race conditions" ); // now we'll stop the streaming receiver and then start it back up again. await streamingReceiver.stopReceivingMessages(); assert.isFalse( streamingReceiver.isReceivingMessages, - "We've stopped receiving messages explicitly" + "We've stopped receiving messages explicitly, isReceivingMessages should have been false" ); - assert.equal(streamingReceiver["link"]?.credit, 0, "All receiver credits have been drained"); // ie, receiver drained + assert.equal( + streamingReceiver["link"]?.credit, + 0, + "All receiver credits should have been drained" + ); // ie, receiver drained await streamingReceiver.init({ useNewName: false, @@ -133,7 +137,7 @@ describe("StreamingReceiver unit tests", () => { assert.isTrue( streamingReceiver.isReceivingMessages, - "we've initialized the streaming receiver again so we're ready to receive again" + "we've initialized the streaming receiver again so isReceivingMessages should have been true" ); streamingReceiver.subscribe( @@ -148,7 +152,7 @@ describe("StreamingReceiver unit tests", () => { assert.equal( streamingReceiver["link"]?.credit, 101, - "subscribe has started again, and is revitalized with 101 credits." + "subscribe has started again, and is supposed to be revitalized with 101 credits." ); }); @@ -452,11 +456,11 @@ describe("StreamingReceiver unit tests", () => { streamingReceiver["init"] = initMock; streamingReceiver["_onError"] = onErrorMock; - streamingReceiver["_receiverHelper"]["addCredit"] = addCreditMock; + streamingReceiver["_streamingReceiverHelper"]["addCredit"] = addCreditMock; await streamingReceiver.onDetached(new Error("let's detach")); assert.isTrue( - addCreditMock.calledWith(101), + addCreditMock.called, "Credits need to be re-added to the link since it's been recreated." ); diff --git a/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts b/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts index e13058a34cac..2e105d716495 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts @@ -86,6 +86,20 @@ export function createConnectionContextForTests( mockLinkProperties(receiver); (receiver as any).connection = { id: "connection-id" }; + (receiver as any).session = { + _session: { + incoming: { + deliveries: { + capacity: 2048, + size: 0, + head: 0, + tail: 0, + entries: [] + } + } + } + }; + (receiver as any).session.incoming = (receiver as any).session._session.incoming; return receiver; }, async close(): Promise { diff --git a/sdk/servicebus/service-bus/test/public/utils/managementUtils.ts b/sdk/servicebus/service-bus/test/public/utils/managementUtils.ts index 13126d28a8d5..a4d4dd015b51 100644 --- a/sdk/servicebus/service-bus/test/public/utils/managementUtils.ts +++ b/sdk/servicebus/service-bus/test/public/utils/managementUtils.ts @@ -9,6 +9,7 @@ import { ServiceBusAdministrationClient } from "../../../src"; import { EnvVarNames, getEnvVars } from "./envVarUtils"; import chai from "chai"; import { CreateQueueOptions } from "../../../src"; +import { EntityName } from "./testutils2"; const should = chai.should(); let client: ServiceBusAdministrationClient; @@ -181,15 +182,13 @@ export async function recreateSubscription( */ export async function verifyMessageCount( expectedMessageCount: number, - queueName?: string, - topicName?: string, - subscriptionName?: string + entityName: Pick ): Promise { getManagementClient(); should.equal( - queueName - ? (await client.getQueueRuntimeProperties(queueName)).totalMessageCount - : (await client.getSubscriptionRuntimeProperties(topicName!, subscriptionName!)) + entityName.queue + ? (await client.getQueueRuntimeProperties(entityName.queue)).totalMessageCount + : (await client.getSubscriptionRuntimeProperties(entityName.topic!, entityName.subscription!)) .totalMessageCount, expectedMessageCount, `Unexpected number of messages are present in the entity.` diff --git a/sdk/servicebus/service-bus/test/public/utils/testutils2.ts b/sdk/servicebus/service-bus/test/public/utils/testutils2.ts index b5bfddf365cf..a140b1a61e41 100644 --- a/sdk/servicebus/service-bus/test/public/utils/testutils2.ts +++ b/sdk/servicebus/service-bus/test/public/utils/testutils2.ts @@ -291,7 +291,7 @@ export class ServiceBusTestHelpers { // Relying on Atom mgmt client for the message count verification instead of the `testPeekMsgsLength` // because creating the session receivers might encounter timeouts or // "MessagingError: No unlocked sessions were available" when there are no available sessions - await verifyMessageCount(0, entityNames.queue, entityNames.topic, entityNames.subscription); + await verifyMessageCount(0, entityNames); } /**