Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Fix credit management based on the buffer capacity for the receivers #14060

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
127 commits
Select commit Hold shift + click to select a range
0e7de1c
Conditionally add credits
HarshaNalluru Mar 2, 2021
dd940e6
// Additionally.. have a checkWithTimeout that keeps chec…
HarshaNalluru Mar 2, 2021
76eb13e
Copy the 2048-limit-test.spec.ts test file from the other PR
HarshaNalluru Mar 2, 2021
a7191fe
simplify verifyMessageCount
HarshaNalluru Mar 2, 2021
e0ca3f5
peeklock - deliveryCount will be incremented for 2047 messages if clo…
HarshaNalluru Mar 2, 2021
bf789d0
new messageBatch returns zero after 2047 messages
HarshaNalluru Mar 2, 2021
fd9d20a
receiveAndDelete for subscribe
HarshaNalluru Mar 3, 2021
ecca8b0
handle streaming receiver case
HarshaNalluru Mar 3, 2021
b19575a
copy the logic for sessions
HarshaNalluru Mar 3, 2021
9a26c5e
addcredit(1) logic when hitting the limit
HarshaNalluru Mar 3, 2021
fddbd28
no need of asynclock
HarshaNalluru Mar 3, 2021
a209104
same fpr session receiver
HarshaNalluru Mar 3, 2021
144b904
creditsToAdd - only for peekLock - sessions
HarshaNalluru Mar 3, 2021
e7d7a82
tests
HarshaNalluru Mar 3, 2021
67bd418
bug fix in sending - in the test
HarshaNalluru Mar 3, 2021
26a30e1
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 8, 2021
ceedf58
Handle undefined props while calculating numberOfEmptyIncomingSlots
HarshaNalluru Mar 8, 2021
d1c16ad
Increase timeout to 300000 for receiveAndDelete test
HarshaNalluru Mar 8, 2021
8f25785
Streaming Receiver: complete() after lock expiry with auto-renewal di…
HarshaNalluru Mar 8, 2021
fe6c0c2
We add credits based on the empty slots now
HarshaNalluru Mar 8, 2021
a4b867b
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 8, 2021
ad2ef9b
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 8, 2021
f36fc9e
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 9, 2021
420af6d
Mock incoming circular buffer now that we look for capacity and the a…
HarshaNalluru Mar 9, 2021
eebb4fe
Update sdk/servicebus/service-bus/src/core/streamingReceiver.ts
HarshaNalluru Mar 11, 2021
68b1ee7
Simplifying streaming receiver credit addition
HarshaNalluru Mar 11, 2021
1ac91f3
Merge branch 'harshan/sb/issue/11633-solution-7' of https://github.co…
HarshaNalluru Mar 11, 2021
aa6b359
Update error message for sessions as well
HarshaNalluru Mar 11, 2021
478a7ae
Add receiver type for numberOfEmptyIncomingSlots
HarshaNalluru Mar 11, 2021
629de9a
depend on rhea-promise's commit with the type fix
HarshaNalluru Mar 11, 2021
9f2fd9b
numberOfEmptyIncomingSlots - remove "any"
HarshaNalluru Mar 11, 2021
db35c49
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 12, 2021
6a5fd66
pnpm-lock
HarshaNalluru Mar 12, 2021
55138fa
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Mar 12, 2021
418a4f8
ExcessUnsettledMessagesInBuffer
HarshaNalluru Mar 13, 2021
38a5fff
Refactoring streaming receiver parts for sessions and non-sessions
HarshaNalluru Mar 15, 2021
1d0203b
API Report
HarshaNalluru Mar 15, 2021
8324a30
mocking fakeReceiver and remove a console.log that had been present s…
HarshaNalluru Mar 16, 2021
a38830c
fix session tests as well
HarshaNalluru Mar 16, 2021
8090238
mock it properly - the getter from rhea-promise
HarshaNalluru Mar 16, 2021
814765e
update assertion messages to explain why the assertions failed
HarshaNalluru Mar 16, 2021
cb3975c
New mthod addCreditsInit to refactor common code b/w sessions and non…
HarshaNalluru Mar 16, 2021
88bb497
_getCurrentReceiver for ProcessMessageCreditManager
HarshaNalluru Mar 16, 2021
99841cc
rename to StreamingReceiverCreditManager
HarshaNalluru Mar 16, 2021
b782d56
remove rename todo
HarshaNalluru Mar 16, 2021
2c048b3
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Apr 26, 2021
0fc6f9b
lock file
HarshaNalluru Apr 26, 2021
fddc414
lock file from master
HarshaNalluru Apr 26, 2021
00f6b4b
Update sdk/servicebus/service-bus/src/core/shared.ts
HarshaNalluru Apr 26, 2021
93f6a97
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Apr 26, 2021
7293fa4
changel;og
HarshaNalluru Apr 26, 2021
6188a4e
formatting
HarshaNalluru Apr 26, 2021
d7e914a
Update sdk/servicebus/service-bus/src/core/shared.ts
HarshaNalluru Apr 26, 2021
65630c8
TODOs
HarshaNalluru Apr 26, 2021
42e1b8f
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Apr 27, 2021
ffd2b79
todo
HarshaNalluru Apr 27, 2021
eea0cff
Merge branch 'harshan/sb/issue/11633-solution-7' of https://github.co…
HarshaNalluru Apr 27, 2021
8994c2d
_settlementNotifierForSubscribe
HarshaNalluru Apr 27, 2021
4bbbfbf
remove underscore - settlementNotifierForSubscribe
HarshaNalluru Apr 27, 2021
b72b80d
UnsettledMessagesLimitExceeded
HarshaNalluru Apr 27, 2021
d67a552
rushx format
HarshaNalluru Apr 27, 2021
a4c93f5
set .only
HarshaNalluru Apr 27, 2021
ae3109d
partitioned and unpartitioned had no diff
HarshaNalluru Apr 27, 2021
4a1b205
set to partitioned queue
HarshaNalluru Apr 27, 2021
b692c80
omit doc settings
HarshaNalluru Apr 27, 2021
25d465b
Revert "omit doc settings"
HarshaNalluru Apr 27, 2021
906f9f7
delete logs
HarshaNalluru Apr 27, 2021
7192d04
add console.logs and set .only for the slow tests
HarshaNalluru Apr 27, 2021
f385a24
add debug variable for debugging in CI
HarshaNalluru Apr 27, 2021
a466bbd
remove .only
HarshaNalluru Apr 28, 2021
5fb5562
revert package.json
HarshaNalluru Apr 28, 2021
b63d88a
Rename ReceiverHelper -> StreamingReceiverHelper
HarshaNalluru Apr 28, 2021
9634b74
- numberOfFilledSlots as well
HarshaNalluru Apr 28, 2021
f30bba5
this.maxConcurrentCalls - numberOfFilledSlots instead of this.maxConc…
HarshaNalluru Apr 28, 2021
e53e93d
add log messages
HarshaNalluru Apr 28, 2021
270573d
build failure: in the test
HarshaNalluru Apr 28, 2021
741ada6
remove old postProcessing code
HarshaNalluru Apr 28, 2021
a91db19
final major fixes to credit manager
HarshaNalluru Apr 28, 2021
1298c16
js doc
HarshaNalluru Apr 28, 2021
1711ded
numberOfEmptySlots<1
HarshaNalluru Apr 28, 2021
7820768
doc comment
HarshaNalluru Apr 28, 2021
ca98f09
UnsettledMessagesLimitExceeded Test TODO
HarshaNalluru Apr 28, 2021
50f1217
Update sdk/servicebus/service-bus/CHANGELOG.md
HarshaNalluru Apr 28, 2021
6cbebff
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Apr 28, 2021
9bf59cb
more comments for -1
HarshaNalluru Apr 29, 2021
d0ba78e
Merge branch 'harshan/sb/issue/11633-solution-7' of https://github.co…
HarshaNalluru Apr 29, 2021
dbd1097
UnsettledMessagesLimitExceeded => return 0 messages instead of throwing
HarshaNalluru Apr 30, 2021
c25ef93
InternalServiceBusErrorCode
HarshaNalluru Apr 30, 2021
11b8206
cast UnsettledMessagesLimitExceeded as ServiceBusErrorCode
HarshaNalluru Apr 30, 2021
8bbc2d9
mockBachingReceive for count
HarshaNalluru Apr 30, 2021
d366486
Api Report
HarshaNalluru Apr 30, 2021
c6ca65b
ServiceBusError not thrown for subscribe
HarshaNalluru Apr 30, 2021
34fdc32
not check for unsettledMessagesLimitErrorSeen
HarshaNalluru Apr 30, 2021
1f75e73
Update sdk/servicebus/service-bus/CHANGELOG.md
HarshaNalluru Apr 30, 2021
4415097
formatting
HarshaNalluru Apr 30, 2021
9416b33
Merge branch 'harshan/sb/issue/11633-solution-7' of https://github.co…
HarshaNalluru Apr 30, 2021
3f3def6
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Apr 30, 2021
97b5c8b
comment
HarshaNalluru Apr 30, 2021
e1bc672
unneeded arrow function
HarshaNalluru Apr 30, 2021
f66789e
add the arrow functions back
HarshaNalluru Apr 30, 2021
71780be
remove delays
HarshaNalluru Apr 30, 2021
516d270
update comments
HarshaNalluru Apr 30, 2021
bb07f09
reduceBufferCapacity
HarshaNalluru May 3, 2021
db05be0
- remove .only
HarshaNalluru May 3, 2021
3969549
Add subscribe issue too
HarshaNalluru May 3, 2021
69be170
Update sdk/servicebus/service-bus/src/core/shared.ts
HarshaNalluru May 3, 2021
1d6a152
elaborating a comment
HarshaNalluru May 3, 2021
e71dfdd
elaborating a comment - onReceive method
HarshaNalluru May 3, 2021
d0c9426
(until the service fixes the "draining" bug at the terminal case and/…
HarshaNalluru May 3, 2021
a2f75c5
Repeat it for sessions
HarshaNalluru May 3, 2021
0e9091c
settlementNotifierForSubscribe
HarshaNalluru May 3, 2021
7647e3d
Merge branch 'harshan/sb/issue/11633-solution-7' of https://github.co…
HarshaNalluru May 3, 2021
27e0480
new line
HarshaNalluru May 3, 2021
eebffb3
more text in changelog
HarshaNalluru May 3, 2021
7cad41d
remove as Receiver cast in test
HarshaNalluru May 3, 2021
716f7b1
abandon seems to have added the delivery count prop back on the message
HarshaNalluru May 3, 2021
931b9fd
@internal tag
HarshaNalluru May 3, 2021
b551bf8
remove _notifyError at onReceive(_notifyError: OnError | undefined)
HarshaNalluru May 3, 2021
7c9136c
addInitialCredits
HarshaNalluru May 3, 2021
f1332bf
@internal tag
HarshaNalluru May 3, 2021
4d26b7f
dead code in test
HarshaNalluru May 3, 2021
fce9703
settleMessageOperation refactor to remove !
HarshaNalluru May 3, 2021
afc9760
Add awaits and wrap with try catch
HarshaNalluru May 3, 2021
2d5800d
Merge StreamingReceiverCreditManager into StreamingReceiverHelper
HarshaNalluru May 3, 2021
02710aa
revert changelog deletion
HarshaNalluru May 3, 2021
fd07e8c
resolve build failures
HarshaNalluru May 3, 2021
5295fe0
missed to throw
HarshaNalluru May 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { createAndEndProcessingSpan } from "../diagnostics/instrumentServiceBusMessage";
import { ReceiveMode } from "../models";
import { ServiceBusError, translateServiceBusError } from "../serviceBusError";
import { numberOfEmptyIncomingSlots } from "../receivers/shared";

/**
* Describes the batching receiver where the user can receive a specified number of messages for
Expand Down Expand Up @@ -490,15 +491,17 @@ export class BatchingReceiverLite {
reject(err);
}, args.abortSignal);

logger.verbose(
`${loggingPrefix} Adding credit for receiving ${args.maxMessageCount} messages.`
);
const creditsToAdd =
this._receiveMode === "peekLock"
? Math.min(args.maxMessageCount, numberOfEmptyIncomingSlots(receiver) - 1)
: args.maxMessageCount;
logger.verbose(`${loggingPrefix} Adding credit for receiving ${creditsToAdd} messages.`);

// By adding credit here, we let the service know that at max we can handle `maxMessageCount`
// By adding credit here, we let the service know that at max we can handle `creditsToAdd`
// number of messages concurrently. We will return the user an array of messages that can
// be of size upto maxMessageCount. Then the user needs to accordingly dispose
// be of size upto `creditsToAdd`. Then the user needs to accordingly dispose
// (complete/abandon/defer/deadletter) the messages from the array.
receiver.addCredit(args.maxMessageCount);
receiver.addCredit(creditsToAdd);
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved

logger.verbose(
`${loggingPrefix} Setting the wait timer for ${args.maxWaitTimeInMs} milliseconds.`
Expand Down
40 changes: 32 additions & 8 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import { receiverLogger as logger } from "../log";
import { AmqpError, EventContext, OnAmqpEvent } from "rhea-promise";
import { ServiceBusMessageImpl } from "../serviceBusMessage";
import { AbortSignalLike } from "@azure/abort-controller";
import { translateServiceBusError } from "../serviceBusError";
import { abandonMessage, completeMessage } from "../receivers/shared";
import { ServiceBusError, translateServiceBusError } from "../serviceBusError";
import { abandonMessage, completeMessage, numberOfEmptyIncomingSlots } from "../receivers/shared";
import { ReceiverHandlers } from "./shared";

/**
Expand Down Expand Up @@ -294,7 +294,24 @@ export class StreamingReceiver extends MessageReceiver {
}
return;
} finally {
this._receiverHelper.addCredit(1);
if (this.receiveMode === "receiveAndDelete") {
this._receiverHelper.addCredit(1);
} else if (numberOfEmptyIncomingSlots(this.link) - 1 > 1) {
this._receiverHelper.addCredit(1);
} else {
// Additionally.. have a checkWithTimeout that keeps checking if the above if-condition satisfies
// If it ever satisfies - add the credit
this._onError?.({
error: new ServiceBusError(
`Circular buffer that contains the incoming deliveries is full, please settle the messages using settlement methods such as .completeMessage() on the receiver.
Or set the "autoComplete" flag to true to let the library complete the messages on your behalf.`,
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
"GeneralError"
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
),
errorSource: "receive",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
}
}

// If we've made it this far, then user's message handler completed fine. Let us try
Expand Down Expand Up @@ -434,15 +451,19 @@ export class StreamingReceiver extends MessageReceiver {
* Starts the receiver by establishing an AMQP session and an AMQP receiver link on the session.
*
* @param onMessage - The message handler to receive servicebus messages.
* @param onError - The error handler to receive an error that occurs while receivin messages.
* @param onError - The error handler to receive an error that occurs while receiving messages.
*/
subscribe(onMessage: OnMessage, onError: OnError): void {
throwErrorIfConnectionClosed(this._context);

this._onMessage = onMessage;
this._onError = onError;

this._receiverHelper.addCredit(this.maxConcurrentCalls);
this._receiverHelper.addCredit(
this.receiveMode === "peekLock"
? Math.min(this.maxConcurrentCalls, numberOfEmptyIncomingSlots(this.link) - 1)
: this.maxConcurrentCalls
);
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down Expand Up @@ -503,10 +524,13 @@ export class StreamingReceiver extends MessageReceiver {
connectionId: this._context.connectionId,
onError: (args) => this._onError && this._onError(args)
});

this._receiverHelper.addCredit(this.maxConcurrentCalls);
const creditsToAdd =
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
this.receiveMode === "peekLock"
? Math.min(this.maxConcurrentCalls, numberOfEmptyIncomingSlots(this.link) - 1)
: this.maxConcurrentCalls;
this._receiverHelper.addCredit(creditsToAdd);
logger.verbose(
`${this.logPrefix} onDetached: link has been reestablished, added ${this.maxConcurrentCalls} credits.`
`${this.logPrefix} onDetached: link has been reestablished, added ${creditsToAdd} credits.`
);
} finally {
this._isDetaching = false;
Expand Down
7 changes: 7 additions & 0 deletions sdk/servicebus/service-bus/src/receivers/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,10 @@ function settleMessage(
throw translateServiceBusError(err);
});
}

export function numberOfEmptyIncomingSlots(receiver: any): number {
return (
(receiver as any).session._session.incoming.deliveries.capacity -
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
(receiver as any).session._session.incoming.deliveries.size
);
}
29 changes: 25 additions & 4 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import {
SubscribeOptions
} from "../models";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { translateServiceBusError } from "../serviceBusError";
import { abandonMessage, completeMessage } from "../receivers/shared";
import { ServiceBusError, translateServiceBusError } from "../serviceBusError";
import { abandonMessage, completeMessage, numberOfEmptyIncomingSlots } from "../receivers/shared";

/**
* Describes the options that need to be provided while creating a message session receiver link.
Expand Down Expand Up @@ -672,7 +672,24 @@ export class MessageSession extends LinkEntity<Receiver> {
}
return;
} finally {
this.receiverHelper.addCredit(1);
if (this.receiveMode === "receiveAndDelete") {
this._receiverHelper.addCredit(1);
} else if (numberOfEmptyIncomingSlots(this.link) - 1 > 1) {
this._receiverHelper.addCredit(1);
} else {
// Additionally.. have a checkWithTimeout that keeps checking if the above if-condition satisfies
// If it ever satisfies - add the credit
this._notifyError?.({
error: new ServiceBusError(
`Circular buffer that contains the incoming deliveries is full, please settle the messages using settlement methods such as .completeMessage() on the receiver.
Or set the "autoComplete" flag to true to let the library complete the messages on your behalf.`,
"GeneralError"
),
errorSource: "receive",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
}
}

// If we've made it this far, then user's message handler completed fine. Let us try
Expand Down Expand Up @@ -709,7 +726,11 @@ export class MessageSession extends LinkEntity<Receiver> {
// setting the "message" event listener.
this.link.on(ReceiverEvents.message, onSessionMessage);
// adding credit
this.receiverHelper.addCredit(this.maxConcurrentCalls);
const creditsToAdd = Math.min(
this.maxConcurrentCalls,
numberOfEmptyIncomingSlots(this.link) - 1
);
this._receiverHelper.addCredit(creditsToAdd);
} else {
this._isReceivingMessagesForSubscriber = false;
const msg =
Expand Down