Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Fix credit management based on the buffer capacity for the receivers #14060

Closed
Closed
Show file tree
Hide file tree
Changes from 116 commits
Commits
Show all changes
127 commits
Select commit Hold shift + click to select a range
0e7de1c
Conditionally add credits
HarshaNalluru Mar 2, 2021
dd940e6
// Additionally.. have a checkWithTimeout that keeps chec…
HarshaNalluru Mar 2, 2021
76eb13e
Copy the 2048-limit-test.spec.ts test file from the other PR
HarshaNalluru Mar 2, 2021
a7191fe
simplify verifyMessageCount
HarshaNalluru Mar 2, 2021
e0ca3f5
peeklock - deliveryCount will be incremented for 2047 messages if clo…
HarshaNalluru Mar 2, 2021
bf789d0
new messageBatch returns zero after 2047 messages
HarshaNalluru Mar 2, 2021
fd9d20a
receiveAndDelete for subscribe
HarshaNalluru Mar 3, 2021
ecca8b0
handle streaming receiver case
HarshaNalluru Mar 3, 2021
b19575a
copy the logic for sessions
HarshaNalluru Mar 3, 2021
9a26c5e
addcredit(1) logic when hitting the limit
HarshaNalluru Mar 3, 2021
fddbd28
no need of asynclock
HarshaNalluru Mar 3, 2021
a209104
same fpr session receiver
HarshaNalluru Mar 3, 2021
144b904
creditsToAdd - only for peekLock - sessions
HarshaNalluru Mar 3, 2021
e7d7a82
tests
HarshaNalluru Mar 3, 2021
67bd418
bug fix in sending - in the test
HarshaNalluru Mar 3, 2021
26a30e1
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 8, 2021
ceedf58
Handle undefined props while calculating numberOfEmptyIncomingSlots
HarshaNalluru Mar 8, 2021
d1c16ad
Increase timeout to 300000 for receiveAndDelete test
HarshaNalluru Mar 8, 2021
8f25785
Streaming Receiver: complete() after lock expiry with auto-renewal di…
HarshaNalluru Mar 8, 2021
fe6c0c2
We add credits based on the empty slots now
HarshaNalluru Mar 8, 2021
a4b867b
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 8, 2021
ad2ef9b
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 8, 2021
f36fc9e
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 9, 2021
420af6d
Mock incoming circular buffer now that we look for capacity and the a…
HarshaNalluru Mar 9, 2021
eebb4fe
Update sdk/servicebus/service-bus/src/core/streamingReceiver.ts
HarshaNalluru Mar 11, 2021
68b1ee7
Simplifying streaming receiver credit addition
HarshaNalluru Mar 11, 2021
1ac91f3
Merge branch 'harshan/sb/issue/11633-solution-7' of https://github.co…
HarshaNalluru Mar 11, 2021
aa6b359
Update error message for sessions as well
HarshaNalluru Mar 11, 2021
478a7ae
Add receiver type for numberOfEmptyIncomingSlots
HarshaNalluru Mar 11, 2021
629de9a
depend on rhea-promise's commit with the type fix
HarshaNalluru Mar 11, 2021
9f2fd9b
numberOfEmptyIncomingSlots - remove "any"
HarshaNalluru Mar 11, 2021
db35c49
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 12, 2021
6a5fd66
pnpm-lock
HarshaNalluru Mar 12, 2021
55138fa
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 12, 2021
418a4f8
ExcessUnsettledMessagesInBuffer
HarshaNalluru Mar 13, 2021
38a5fff
Refactoring streaming receiver parts for sessions and non-sessions
HarshaNalluru Mar 15, 2021
1d0203b
API Report
HarshaNalluru Mar 15, 2021
8324a30
mocking fakeReceiver and remove a console.log that had been present s…
HarshaNalluru Mar 16, 2021
a38830c
fix session tests as well
HarshaNalluru Mar 16, 2021
8090238
mock it properly - the getter from rhea-promise
HarshaNalluru Mar 16, 2021
814765e
update assertion messages to explain why the assertions failed
HarshaNalluru Mar 16, 2021
cb3975c
New mthod addCreditsInit to refactor common code b/w sessions and non…
HarshaNalluru Mar 16, 2021
88bb497
_getCurrentReceiver for ProcessMessageCreditManager
HarshaNalluru Mar 16, 2021
99841cc
rename to StreamingReceiverCreditManager
HarshaNalluru Mar 16, 2021
b782d56
remove rename todo
HarshaNalluru Mar 16, 2021
2c048b3
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Apr 26, 2021
0fc6f9b
lock file
HarshaNalluru Apr 26, 2021
fddc414
lock file from master
HarshaNalluru Apr 26, 2021
00f6b4b
Update sdk/servicebus/service-bus/src/core/shared.ts
HarshaNalluru Apr 26, 2021
93f6a97
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Apr 26, 2021
7293fa4
changel;og
HarshaNalluru Apr 26, 2021
6188a4e
formatting
HarshaNalluru Apr 26, 2021
d7e914a
Update sdk/servicebus/service-bus/src/core/shared.ts
HarshaNalluru Apr 26, 2021
65630c8
TODOs
HarshaNalluru Apr 26, 2021
42e1b8f
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Apr 27, 2021
ffd2b79
todo
HarshaNalluru Apr 27, 2021
eea0cff
Merge branch 'harshan/sb/issue/11633-solution-7' of https://github.co…
HarshaNalluru Apr 27, 2021
8994c2d
_settlementNotifierForSubscribe
HarshaNalluru Apr 27, 2021
4bbbfbf
remove underscore - settlementNotifierForSubscribe
HarshaNalluru Apr 27, 2021
b72b80d
UnsettledMessagesLimitExceeded
HarshaNalluru Apr 27, 2021
d67a552
rushx format
HarshaNalluru Apr 27, 2021
a4c93f5
set .only
HarshaNalluru Apr 27, 2021
ae3109d
partitioned and unpartitioned had no diff
HarshaNalluru Apr 27, 2021
4a1b205
set to partitioned queue
HarshaNalluru Apr 27, 2021
b692c80
omit doc settings
HarshaNalluru Apr 27, 2021
25d465b
Revert "omit doc settings"
HarshaNalluru Apr 27, 2021
906f9f7
delete logs
HarshaNalluru Apr 27, 2021
7192d04
add console.logs and set .only for the slow tests
HarshaNalluru Apr 27, 2021
f385a24
add debug variable for debugging in CI
HarshaNalluru Apr 27, 2021
a466bbd
remove .only
HarshaNalluru Apr 28, 2021
5fb5562
revert package.json
HarshaNalluru Apr 28, 2021
b63d88a
Rename ReceiverHelper -> StreamingReceiverHelper
HarshaNalluru Apr 28, 2021
9634b74
- numberOfFilledSlots as well
HarshaNalluru Apr 28, 2021
f30bba5
this.maxConcurrentCalls - numberOfFilledSlots instead of this.maxConc…
HarshaNalluru Apr 28, 2021
e53e93d
add log messages
HarshaNalluru Apr 28, 2021
270573d
build failure: in the test
HarshaNalluru Apr 28, 2021
741ada6
remove old postProcessing code
HarshaNalluru Apr 28, 2021
a91db19
final major fixes to credit manager
HarshaNalluru Apr 28, 2021
1298c16
js doc
HarshaNalluru Apr 28, 2021
1711ded
numberOfEmptySlots<1
HarshaNalluru Apr 28, 2021
7820768
doc comment
HarshaNalluru Apr 28, 2021
ca98f09
UnsettledMessagesLimitExceeded Test TODO
HarshaNalluru Apr 28, 2021
50f1217
Update sdk/servicebus/service-bus/CHANGELOG.md
HarshaNalluru Apr 28, 2021
6cbebff
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Apr 28, 2021
9bf59cb
more comments for -1
HarshaNalluru Apr 29, 2021
d0ba78e
Merge branch 'harshan/sb/issue/11633-solution-7' of https://github.co…
HarshaNalluru Apr 29, 2021
dbd1097
UnsettledMessagesLimitExceeded => return 0 messages instead of throwing
HarshaNalluru Apr 30, 2021
c25ef93
InternalServiceBusErrorCode
HarshaNalluru Apr 30, 2021
11b8206
cast UnsettledMessagesLimitExceeded as ServiceBusErrorCode
HarshaNalluru Apr 30, 2021
8bbc2d9
mockBachingReceive for count
HarshaNalluru Apr 30, 2021
d366486
Api Report
HarshaNalluru Apr 30, 2021
c6ca65b
ServiceBusError not thrown for subscribe
HarshaNalluru Apr 30, 2021
34fdc32
not check for unsettledMessagesLimitErrorSeen
HarshaNalluru Apr 30, 2021
1f75e73
Update sdk/servicebus/service-bus/CHANGELOG.md
HarshaNalluru Apr 30, 2021
4415097
formatting
HarshaNalluru Apr 30, 2021
9416b33
Merge branch 'harshan/sb/issue/11633-solution-7' of https://github.co…
HarshaNalluru Apr 30, 2021
3f3def6
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Apr 30, 2021
97b5c8b
comment
HarshaNalluru Apr 30, 2021
e1bc672
unneeded arrow function
HarshaNalluru Apr 30, 2021
f66789e
add the arrow functions back
HarshaNalluru Apr 30, 2021
71780be
remove delays
HarshaNalluru Apr 30, 2021
516d270
update comments
HarshaNalluru Apr 30, 2021
bb07f09
reduceBufferCapacity
HarshaNalluru May 3, 2021
db05be0
- remove .only
HarshaNalluru May 3, 2021
3969549
Add subscribe issue too
HarshaNalluru May 3, 2021
69be170
Update sdk/servicebus/service-bus/src/core/shared.ts
HarshaNalluru May 3, 2021
1d6a152
elaborating a comment
HarshaNalluru May 3, 2021
e71dfdd
elaborating a comment - onReceive method
HarshaNalluru May 3, 2021
d0c9426
(until the service fixes the "draining" bug at the terminal case and/…
HarshaNalluru May 3, 2021
a2f75c5
Repeat it for sessions
HarshaNalluru May 3, 2021
0e9091c
settlementNotifierForSubscribe
HarshaNalluru May 3, 2021
7647e3d
Merge branch 'harshan/sb/issue/11633-solution-7' of https://github.co…
HarshaNalluru May 3, 2021
27e0480
new line
HarshaNalluru May 3, 2021
eebffb3
more text in changelog
HarshaNalluru May 3, 2021
7cad41d
remove as Receiver cast in test
HarshaNalluru May 3, 2021
716f7b1
abandon seems to have added the delivery count prop back on the message
HarshaNalluru May 3, 2021
931b9fd
@internal tag
HarshaNalluru May 3, 2021
b551bf8
remove _notifyError at onReceive(_notifyError: OnError | undefined)
HarshaNalluru May 3, 2021
7c9136c
addInitialCredits
HarshaNalluru May 3, 2021
f1332bf
@internal tag
HarshaNalluru May 3, 2021
4d26b7f
dead code in test
HarshaNalluru May 3, 2021
fce9703
settleMessageOperation refactor to remove !
HarshaNalluru May 3, 2021
afc9760
Add awaits and wrap with try catch
HarshaNalluru May 3, 2021
2d5800d
Merge StreamingReceiverCreditManager into StreamingReceiverHelper
HarshaNalluru May 3, 2021
02710aa
revert changelog deletion
HarshaNalluru May 3, 2021
fd07e8c
resolve build failures
HarshaNalluru May 3, 2021
5295fe0
missed to throw
HarshaNalluru May 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@

### Bug fixes

- [Bug Fix] `expiresAtUtc` is `Invalid Date` in the received message when the ttl is not defined. Has been fixed in [#13543](https://github.com/Azure/azure-sdk-for-js/pull/13543)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering why this changelog entry was removed. Was there a merge conflict?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh no, I'll revert. Thanks! 🙂

- If there are many(2047) outstanding messages that are not settled,
- `receiveMessages` hangs forever and doesn't return for unpartitioned queues [Issue #11633](https://github.com/Azure/azure-sdk-for-js/issues/11633),
- `subscribe` doesn't receive more messages even after settling the unsettled messages for partitioned queues [Issue #15114](https://github.com/Azure/azure-sdk-for-js/issues/15114)

[#14060](https://github.com/Azure/azure-sdk-for-js/pull/14060) fixes the problem by returning an empty array for receiveMessages. If used `subscribe`, more messages will be received after the outstanding messages are settled.
- Some of the queue properties such as "forwardTo" and "autoDeleteOnIdle" were not being set as requested through the `ServiceBusAdministrationClient.createQueue` method because of a bug w.r.t the ordering of XML properties. The issue has been fixed in [#14692](https://github.com/Azure/azure-sdk-for-js/pull/14692).
- Settling messages now use the `retryOptions` passed to `ServiceBusClient`, making it more resilient against network failures.
[PR#14867](https://github.com/Azure/azure-sdk-for-js/pull/14867/files)
Expand Down
29 changes: 21 additions & 8 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import { checkAndRegisterWithAbortSignal } from "../util/utils";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { createAndEndProcessingSpan } from "../diagnostics/instrumentServiceBusMessage";
import { ReceiveMode } from "../models";
import { ServiceBusError, translateServiceBusError } from "../serviceBusError";
import { ServiceBusError, ServiceBusErrorCode, translateServiceBusError } from "../serviceBusError";
import { incomingBufferProperties, UnsettledMessagesLimitExceededError } from "./shared";

/**
* Describes the batching receiver where the user can receive a specified number of messages for
Expand Down Expand Up @@ -195,7 +196,10 @@ type EventEmitterLike<T extends Receiver | Session> = Pick<T, "once" | "removeLi
*
* @internal
*/
export type MinimalReceiver = Pick<Receiver, "name" | "isOpen" | "credit" | "addCredit" | "drain"> &
export type MinimalReceiver = Pick<
Receiver,
"name" | "isOpen" | "credit" | "addCredit" | "drain" | "session"
> &
EventEmitterLike<Receiver> & {
session: EventEmitterLike<Session>;
} & {
Expand Down Expand Up @@ -493,15 +497,24 @@ export class BatchingReceiverLite {
reject(err);
}, args.abortSignal);

logger.verbose(
`${loggingPrefix} Adding credit for receiving ${args.maxMessageCount} messages.`
);
const { numberOfEmptySlots } = incomingBufferProperties(receiver);
if (numberOfEmptySlots === 0) {
throw new ServiceBusError(
UnsettledMessagesLimitExceededError,
"UnsettledMessagesLimitExceeded" as ServiceBusErrorCode
);
}
const creditsToAdd =
this._receiveMode === "peekLock"
? Math.min(args.maxMessageCount, numberOfEmptySlots)
: args.maxMessageCount;
logger.verbose(`${loggingPrefix} Adding credit for receiving ${creditsToAdd} messages.`);

// By adding credit here, we let the service know that at max we can handle `maxMessageCount`
// By adding credit here, we let the service know that at max we can handle `creditsToAdd`
// number of messages concurrently. We will return the user an array of messages that can
// be of size upto maxMessageCount. Then the user needs to accordingly dispose
// be of size upto `creditsToAdd`. Then the user needs to accordingly dispose
// (complete/abandon/defer/deadletter) the messages from the array.
receiver.addCredit(args.maxMessageCount);
receiver.addCredit(creditsToAdd);
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved

logger.verbose(
`${loggingPrefix} Setting the wait timer for ${args.maxWaitTimeInMs} milliseconds.`
Expand Down
7 changes: 7 additions & 0 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ export abstract class MessageReceiver extends LinkEntity<Receiver> {
*/
protected _lockRenewer: LockRenewer | undefined;

/**
* Method to be called after the message is settled when receiving messages through subscribe.
*
* Currently, more credits will be added at this spot based on the maxConcurrentCalls and empty slots.
*/
public settlementNotifierForSubscribe: (() => void) | undefined;

constructor(
context: ConnectionContext,
entityPath: string,
Expand Down
134 changes: 133 additions & 1 deletion sdk/servicebus/service-bus/src/core/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

import { Delivery, ReceiverOptions, Source } from "rhea-promise";
import { translateServiceBusError } from "../serviceBusError";
import { receiverLogger } from "../log";
import { logger, receiverLogger } from "../log";
import { ReceiveMode } from "../models";
import { Receiver } from "rhea-promise";
import { OnError } from "./messageReceiver";
import { StreamingReceiverHelper } from "./streamingReceiverHelper";

/**
* @internal
Expand Down Expand Up @@ -103,3 +106,132 @@ export function createReceiverOptions(

return rcvrOptions;
}

export const UnsettledMessagesLimitExceededError =
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
"Failed to fetch new messages as the limit for unsettled messages is reached. Please settle received messages using settlement methods(such as `completeMessage()`) on the receiver to receive the next message.";

/**
* Returns the number of empty/filled slots in the Circular buffer of incoming deliveries
* based on the capacity and size of the buffer.
*
* @internal
*/
export function incomingBufferProperties(
receiver: Pick<Receiver, "session"> | undefined
): {
numberOfEmptySlots: number; // 2048(total) - filled
numberOfFilledSlots: number; // number of unsettled messages
} {
const incomingDeliveries = receiver?.session?.incoming?.deliveries;
let numberOfEmptySlots = 0;
if (incomingDeliveries && incomingDeliveries.capacity - 1 > incomingDeliveries.size) {
// Exmpty slots should have been `incomingDeliveries.capacity - 1 - incomingDeliveries.size`. Why -1?
// - If the number of slots is set to (capacity - size),
// the number of unsettled messages that can be held in the buffer would equal to the "capacity".
// At that limiting point, service doesn't respond to the drain request for unpartitioned queues.
// Service team is tracking the issue.
// -1 allows us to not fill up the buffer entirely, it fills up to 2047 if the capacity is 2048
numberOfEmptySlots = incomingDeliveries.capacity - 1 - incomingDeliveries.size;
}
return {
numberOfEmptySlots,
numberOfFilledSlots: incomingDeliveries ? incomingDeliveries.size : 0
};
}

/**
* Provides helper methods to manage the credits on the link for the
* streaming messages scenarios.
* (Used by both sessions(MessageSession) and non-sessions(StreamingReceiver))
*
* @internal
*/
export class StreamingReceiverCreditManager {
constructor(
private _getCurrentReceiver: () => { receiver: Receiver | undefined; logPrefix: string },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we allow receiver to be undefined? Is there a scenario where that is actually intended?

Copy link
Member Author

@HarshaNalluru HarshaNalluru May 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not actually.
I copied this part of code from the StreamingReceiverHelper.
Once I checked(after your comment), I realized that StreamingReceiverHelper has guards to make sure the receiver not defined. (And CreditManager only ever calls StreamingReceiverHelper for adding credits, so the guards are carried over in a way.)

Instead of passing the StreamingReceiverHelper, I've now merged my StreamingReceiverCreditManager into the existing StreamingReceiverHelper.

Let me know if that looks fine.

private streamingReceiverHelper: StreamingReceiverHelper,
private receiveMode: ReceiveMode,
private maxConcurrentCalls: number
) {}

addCreditsInit() {
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
const { receiver, logPrefix } = this._getCurrentReceiver();
const emptySlots = incomingBufferProperties(receiver).numberOfEmptySlots;
const creditsToAdd =
this.receiveMode === "peekLock"
? Math.min(this.maxConcurrentCalls, emptySlots)
: this.maxConcurrentCalls;
this.streamingReceiverHelper.addCredit(creditsToAdd);
logger.verbose(
`${logPrefix} creditManager: added ${creditsToAdd} credits (initial); total credits = ${receiver?.credit}`
);
}
/**
* Upon receiving a new message, this method is to be called in the streaming receiver logic to add credits to receive more messages.
*
* @internal
*/
onReceive(_notifyError: OnError | undefined) {
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
const { receiver, logPrefix } = this._getCurrentReceiver();
if (this.receiveMode === "receiveAndDelete") {
this.streamingReceiverHelper.addCredit(1);
logger.verbose(
`${logPrefix} creditManager: added 1 credits upon receiving a message; total credits = ${receiver?.credit}`
);
return;
}

const { numberOfEmptySlots } = incomingBufferProperties(receiver);
if (receiver && numberOfEmptySlots > 0) {
const possibleMaxCredits = Math.min(this.maxConcurrentCalls, numberOfEmptySlots);
if (possibleMaxCredits > receiver.credit) {
const creditsToAdd = possibleMaxCredits - receiver.credit;
this.streamingReceiverHelper.addCredit(creditsToAdd);
logger.verbose(
`${logPrefix} creditManager: added ${creditsToAdd} credits upon receiving a message; total credits = ${receiver?.credit}`
);
}
return;
}

// if (receiver) {
// No empty slots left, so notify the user with an error
// Commented out because we want to be consistent with other languages
// notifyError?.({
// error: new ServiceBusError(
// UnsettledMessagesLimitExceededError,
// "UnsettledMessagesLimitExceeded" as ServiceBusErrorCode
// ),
// errorSource: "receive",
// entityPath: this.entityPath,
// fullyQualifiedNamespace: this.fullyQualifiedNamespace
// });
// } else {
// receiver is not defined
// SessionLockLost for sessions/onAMQPError for non-sessions will be notified in one of the listeners - nothing to do here
// }
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Meant to be called after a message is settled with the receive link.
* Replenishes the number of credits on the link based on the maxConcurrentCalls and the numberOfEmptySlots to receive more messages.
*
* @internal
*/
postProcessing() {
const { receiver, logPrefix } = this._getCurrentReceiver();
const { numberOfEmptySlots } = incomingBufferProperties(receiver);
if (this.receiveMode === "peekLock") {
if (receiver && numberOfEmptySlots > 0) {
const possibleMaxCredits = Math.min(this.maxConcurrentCalls, numberOfEmptySlots);
if (possibleMaxCredits > receiver.credit) {
const creditsToAdd = possibleMaxCredits - receiver.credit;
this.streamingReceiverHelper.addCredit(creditsToAdd);
logger.verbose(
`${logPrefix} creditManager: added ${creditsToAdd} credits after message settlement; total credits = ${receiver?.credit}`
);
}
}
}
}
}
42 changes: 29 additions & 13 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
} from "./messageReceiver";
import { ConnectionContext } from "../connectionContext";

import { ReceiverHelper } from "./receiverHelper";
import { StreamingReceiverHelper } from "./streamingReceiverHelper";

import { throwErrorIfConnectionClosed } from "../util/errors";
import {
Expand All @@ -28,7 +28,7 @@ import { ServiceBusMessageImpl } from "../serviceBusMessage";
import { AbortSignalLike } from "@azure/abort-controller";
import { translateServiceBusError } from "../serviceBusError";
import { abandonMessage, completeMessage } from "../receivers/receiverCommon";
import { ReceiverHandlers } from "./shared";
import { ReceiverHandlers, StreamingReceiverCreditManager } from "./shared";

/**
* @internal
Expand Down Expand Up @@ -65,7 +65,7 @@ export class StreamingReceiver extends MessageReceiver {
*/
private _retryOptions: RetryOptions;

private _receiverHelper: ReceiverHelper;
private _streamingReceiverHelper: StreamingReceiverHelper;

/**
* Used so we can stub out retry in tests.
Expand Down Expand Up @@ -95,6 +95,11 @@ export class StreamingReceiver extends MessageReceiver {
*/
private _onAmqpError: OnAmqpEvent;

/**
* Provides helper methods to allow adding credits during initialization, on receiving a message, and after processing a message.
*/
private _creditManager: StreamingReceiverCreditManager;

/**
* The message handler that will be set as the handler on the
* underlying rhea receiver for the "message" event.
Expand All @@ -108,7 +113,7 @@ export class StreamingReceiver extends MessageReceiver {
// for the streaming receiver so long as we can receive messages then we
// _are_ receiving messages - there's no in-between state like there is
// with BatchingReceiver.
return this._receiverHelper.canReceiveMessages();
return this._streamingReceiverHelper.canReceiveMessages();
}

/**
Expand All @@ -127,11 +132,23 @@ export class StreamingReceiver extends MessageReceiver {
this._retryOptions = options?.retryOptions || {};
this._retry = retry;

this._receiverHelper = new ReceiverHelper(() => ({
this._streamingReceiverHelper = new StreamingReceiverHelper(() => ({
receiver: this.link,
logPrefix: this.logPrefix
}));

this._creditManager = new StreamingReceiverCreditManager(
() => ({
receiver: this.link,
logPrefix: this.logPrefix
}),
this._streamingReceiverHelper,
this.receiveMode,
this.maxConcurrentCalls
);

this.settlementNotifierForSubscribe = () => this._creditManager.postProcessing();

this._onAmqpClose = async (context: EventContext) => {
const receiverError = context.receiver && context.receiver.error;
const receiver = this.link || context.receiver!;
Expand Down Expand Up @@ -300,7 +317,7 @@ export class StreamingReceiver extends MessageReceiver {
}
return;
} finally {
this._receiverHelper.addCredit(1);
this._creditManager.onReceive(this._onError);
}

// If we've made it this far, then user's message handler completed fine. Let us try
Expand Down Expand Up @@ -358,7 +375,7 @@ export class StreamingReceiver extends MessageReceiver {
}

async stopReceivingMessages(): Promise<void> {
await this._receiverHelper.suspend();
await this._streamingReceiverHelper.suspend();
}

/**
Expand Down Expand Up @@ -433,22 +450,21 @@ export class StreamingReceiver extends MessageReceiver {
// this might seem odd but in reality this entire class is like one big function call that
// results in a receive(). Once we're being initialized we should consider ourselves the
// "owner" of the receiver and that it's now being locked into being the actual receiver.
this._receiverHelper.resume();
this._streamingReceiverHelper.resume();
}

/**
* Starts the receiver by establishing an AMQP session and an AMQP receiver link on the session.
*
* @param onMessage - The message handler to receive servicebus messages.
* @param onError - The error handler to receive an error that occurs while receivin messages.
* @param onError - The error handler to receive an error that occurs while receiving messages.
*/
subscribe(onMessage: OnMessage, onError: OnError): void {
throwErrorIfConnectionClosed(this._context);

this._onMessage = onMessage;
this._onError = onError;

this._receiverHelper.addCredit(this.maxConcurrentCalls);
this._creditManager.addCreditsInit();
}

/**
Expand Down Expand Up @@ -510,10 +526,10 @@ export class StreamingReceiver extends MessageReceiver {
onError: (args) => this._onError && this._onError(args)
});

this._receiverHelper.addCredit(this.maxConcurrentCalls);
logger.verbose(
`${this.logPrefix} onDetached: link has been reestablished, added ${this.maxConcurrentCalls} credits.`
`${this.logPrefix} onDetached: link has been reestablished, attempting to add credits.`
);
this._creditManager.addCreditsInit();
} finally {
this._isDetaching = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { receiverLogger as logger } from "../log";
*
* @internal
*/
export class ReceiverHelper {
export class StreamingReceiverHelper {
private _isSuspended: boolean = false;

constructor(
Expand Down Expand Up @@ -45,7 +45,7 @@ export class ReceiverHelper {
}

/**
* Drains the credits for the receiver and prevents the `receiverHelper.addCredit()` method from adding credits.
* Drains the credits for the receiver and prevents the `streamingReceiverHelper.addCredit()` method from adding credits.
* Call `resume()` to enable the `addCredit()` method.
*/
async suspend(): Promise<void> {
Expand Down
Loading