Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Avoid filling the buffer to not let the receiveMessages hanging #14039

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
"build:types": "downlevel-dts types/latest types/3.1",
"build": "tsc -p . && rollup -c 2>&1 && npm run extract-api && npm run build:types",
"check-format": "prettier --list-different --config ../../../.prettierrc.json --ignore-path ../../../.prettierignore \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"",
"clean": "rimraf dist dist-esm test-dist types *.tgz *.log coverage coverage-browser .nyc_output",
"clean": "rimraf dist dist-esm test-dist types *.tgz coverage coverage-browser .nyc_output",
"execute:js-samples": "node ../../../common/scripts/run-samples.js samples/javascript/",
"execute:ts-samples": "node ../../../common/scripts/run-samples.js samples/typescript/dist/samples/typescript/src/",
"execute:samples": "echo skipped",
Expand Down
21 changes: 19 additions & 2 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ export class BatchingReceiver extends MessageReceiver {

return this.link;
},
this.receiveMode
this.receiveMode,
this._outstandingDeliveries
);
}

Expand Down Expand Up @@ -116,6 +117,10 @@ export class BatchingReceiver extends MessageReceiver {
this.name
);

if (this._outstandingDeliveries.length === 2047) {
return [];
}

const messages = await this._batchingReceiverLite.receiveMessages({
maxMessageCount,
maxWaitTimeInMs,
Expand Down Expand Up @@ -235,7 +240,8 @@ export class BatchingReceiverLite {
private _getCurrentReceiver: (
abortSignal?: AbortSignalLike
) => Promise<MinimalReceiver | undefined>,
private _receiveMode: ReceiveMode
private _receiveMode: ReceiveMode,
private _outstandingDeliveries: number[]
) {
this._createAndEndProcessingSpan = createAndEndProcessingSpan;

Expand Down Expand Up @@ -440,6 +446,17 @@ export class BatchingReceiverLite {
);
reject(errObj);
}
if (this._receiveMode === "peekLock") {
this._outstandingDeliveries.push(context.delivery!.id);
if (this._outstandingDeliveries.length === 2047) {
// TODO: Make the circular buffer size configurable in rhea
logger.verbose(
`${loggingPrefix} Batching, circular buffer that contains the incoming deliveries is full,
please settle the messages using settlement methods such as .completeMessage() on the receiver.`
);
finalAction();
}
}
if (brokeredMessages.length === args.maxMessageCount) {
finalAction();
}
Expand Down
12 changes: 11 additions & 1 deletion sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ export abstract class MessageReceiver extends LinkEntity<Receiver> {
* Default: false.
*/
autoComplete: boolean;
/**
* Maintains a deliveries that are yet to be settled.
* Once the limit of 2048(rhea's limit) is reached, receiver doesn't provide anymore messages.
*/
protected _outstandingDeliveries: number[] = [];
/**
* Maintains a map of deliveries that
* are being actively disposed. It acts as a store for correlating the responses received for
Expand Down Expand Up @@ -171,7 +176,12 @@ export abstract class MessageReceiver extends LinkEntity<Receiver> {
},
{
onSettled: (context: EventContext) => {
return onMessageSettled(this.logPrefix, context.delivery, this._deliveryDispositionMap);
return onMessageSettled(
this.logPrefix,
context.delivery,
this._deliveryDispositionMap,
this._outstandingDeliveries
);
},
...handlers
}
Expand Down
6 changes: 5 additions & 1 deletion sdk/servicebus/service-bus/src/core/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ export interface DeferredPromiseAndTimer {
export function onMessageSettled(
logPrefix: string,
delivery: Delivery | undefined,
deliveryDispositionMap: Map<number, DeferredPromiseAndTimer>
deliveryDispositionMap: Map<number, DeferredPromiseAndTimer>,
_outstandingDeliveries: number[]
): void {
if (delivery) {
const id = delivery.id;
Expand Down Expand Up @@ -66,6 +67,9 @@ export function onMessageSettled(
id,
deleteResult
);

_outstandingDeliveries = _outstandingDeliveries.filter((item) => item !== id);

if (state && state.error && (state.error.condition || state.error.description)) {
const error = translateServiceBusError(state.error);
return promise.reject(error);
Expand Down
20 changes: 18 additions & 2 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { receiverLogger as logger } from "../log";
import { AmqpError, EventContext, OnAmqpEvent } from "rhea-promise";
import { ServiceBusMessageImpl } from "../serviceBusMessage";
import { AbortSignalLike } from "@azure/abort-controller";
import { translateServiceBusError } from "../serviceBusError";
import { ServiceBusError, translateServiceBusError } from "../serviceBusError";
import { abandonMessage, completeMessage } from "../receivers/shared";
import { ReceiverHandlers } from "./shared";

Expand Down Expand Up @@ -236,6 +236,22 @@ export class StreamingReceiver extends MessageReceiver {

try {
await this._onMessage(bMessage);
if (this.receiveMode === "peekLock") {
this._outstandingDeliveries.push(context.delivery!.id);
if (this._outstandingDeliveries.length === 2047) {
// TODO: Make the circular buffer size configurable in rhea
this._onError?.({
error: new ServiceBusError(
`Circular buffer that contains the incoming deliveries is full, please settle the messages using settlement methods such as .completeMessage() on the receiver.
Or set the "autoComplete" flag to true to let the library complete the messages on your behalf.`,
"GeneralError"
),
errorSource: "receive",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
}
}
} catch (err) {
logger.logError(
err,
Expand All @@ -245,7 +261,7 @@ export class StreamingReceiver extends MessageReceiver {
bMessage.messageId,
this.name
);
this._onError!({
this._onError?.({
error: err,
errorSource: "processMessageCallback",
entityPath: this.entityPath,
Expand Down
34 changes: 31 additions & 3 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import {
SubscribeOptions
} from "../models";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { translateServiceBusError } from "../serviceBusError";
import { ServiceBusError, translateServiceBusError } from "../serviceBusError";
import { abandonMessage, completeMessage } from "../receivers/shared";

/**
Expand Down Expand Up @@ -119,6 +119,11 @@ export class MessageSession extends LinkEntity<Receiver> {
private _batchingReceiverLite: BatchingReceiverLite;
private _isReceivingMessagesForSubscriber: boolean;

/**
* Maintains a deliveries that are yet to be settled.
* Once the limit of 2048(rhea's limit) is reached, receiver doesn't provide anymore messages.
*/
protected _outstandingDeliveries: number[] = [];
/**
* Maintains a map of deliveries that
* are being actively disposed. It acts as a store for correlating the responses received for
Expand Down Expand Up @@ -383,14 +388,20 @@ export class MessageSession extends LinkEntity<Receiver> {
async (_abortSignal?: AbortSignalLike): Promise<MinimalReceiver> => {
return this.link!;
},
this.receiveMode
this.receiveMode,
this._outstandingDeliveries
);

// setting all the handlers
this._onSettled = (context: EventContext) => {
const delivery = context.delivery;

onMessageSettled(this.logPrefix, delivery, this._deliveryDispositionMap);
onMessageSettled(
this.logPrefix,
delivery,
this._deliveryDispositionMap,
this._outstandingDeliveries
);
};

this._notifyError = (args: ProcessErrorArgs) => {
Expand Down Expand Up @@ -622,6 +633,23 @@ export class MessageSession extends LinkEntity<Receiver> {

try {
await this._onMessage(bMessage);

if (this.receiveMode === "peekLock") {
this._outstandingDeliveries.push(context.delivery!.id);
if (this._outstandingDeliveries.length === 2047) {
// TODO: Make the circular buffer size configurable in rhea
this._notifyError?.({
error: new ServiceBusError(
`Circular buffer that contains the incoming deliveries is full, please settle the messages using settlement methods such as .completeMessage() on the receiver.
Or set the "autoComplete" flag to true to let the library complete the messages on your behalf.`,
"GeneralError"
),
errorSource: "receive",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
}
}
} catch (err) {
logger.logError(
err,
Expand Down
147 changes: 147 additions & 0 deletions sdk/servicebus/service-bus/test/internal/2048-limit-test.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import chai from "chai";
import chaiAsPromised from "chai-as-promised";
import { ServiceBusAdministrationClient, ServiceBusSender } from "../../src";
import { TestClientType } from "../public/utils/testUtils";
import { ServiceBusReceiver } from "../../src/receivers/receiver";
import {
ServiceBusClientForTests,
createServiceBusClientForTests,
EntityName,
getConnectionString
} from "../public/utils/testutils2";
import { verifyMessageCount } from "../public/utils/managementUtils";

chai.use(chaiAsPromised);

let serviceBusClient: ServiceBusClientForTests;
let entityName: EntityName;
let sender: ServiceBusSender;
let receiver: ServiceBusReceiver;
const testClientTypes = [
// TestClientType.PartitionedQueue
// TestClientType.PartitionedQueueWithSessions,
TestClientType.UnpartitionedQueue
// TestClientType.UnpartitionedQueueWithSessions
];

async function beforeEachTest(
entityType: TestClientType,
receiveMode: "peekLock" | "receiveAndDelete" = "peekLock"
): Promise<void> {
entityName = await serviceBusClient.test.createTestEntities(entityType);
if (receiveMode === "receiveAndDelete") {
receiver = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityName);
} else {
receiver = await serviceBusClient.test.createPeekLockReceiver(entityName, {
maxAutoLockRenewalDurationInMs: 0
});
}

sender = serviceBusClient.test.addToCleanup(
serviceBusClient.createSender(entityName.queue ?? entityName.topic!)
);
}

function afterEachTest(): Promise<void> {
return serviceBusClient.test.afterEach();
}

describe("2048 scenarios - receiveBatch in a loop", function(): void {
const numberOfMessagesToSend = 3000;

before(() => {
serviceBusClient = createServiceBusClientForTests();
});

after(() => {
return serviceBusClient.test.after();
});

afterEach(async () => {
await afterEachTest();
});

async function sendMessages() {
let current = 0;
while (current < numberOfMessagesToSend) {
const batch = await sender.createMessageBatch();
while (
batch.tryAddMessage({ body: `message-${current}` }) &&
current++ &&
current < numberOfMessagesToSend
);
await sender.sendMessages(batch);
}
console.log(
`Sent ${
(
await new ServiceBusAdministrationClient(getConnectionString()).getQueueRuntimeProperties(
entityName.queue!
)
).activeMessageCount
} messages!!!`
);
}

async function receiveMessages(receiveMode: "peekLock" | "receiveAndDelete" = "peekLock") {
const startedAt = new Date();
// const testDurationInMs = 50000;
let maxMessageCount = 50;
let maxWaitTimeInMs = 3000;
let numberOfMessagesReceived = 0;
let elapsedTime = new Date().valueOf() - startedAt.valueOf();
let numberOfMessagesToReceive = numberOfMessagesToSend;
if (receiveMode === "peekLock") {
numberOfMessagesToReceive = 2047;
}
while (numberOfMessagesReceived < numberOfMessagesToReceive) {
console.log(`New receive started... ${elapsedTime / 1000} seconds`);
numberOfMessagesReceived += (
await receiver.receiveMessages(maxMessageCount, { maxWaitTimeInMs })
).length;
elapsedTime = new Date().valueOf() - startedAt.valueOf();
console.log(
`Receive ended... ${elapsedTime /
1000} seconds. Total received so far = ${numberOfMessagesReceived}`
);
}
}

describe("receiveAndDelete", () => {
testClientTypes.forEach((clientType) => {
it(
clientType + ": would be able to receive more than 2048 messages",
async function(): Promise<void> {
await beforeEachTest(clientType, "receiveAndDelete");
await sendMessages();
await receiveMessages();
await verifyMessageCount(0, entityName);
}
);
});
});

describe("peekLock", () => {
testClientTypes.forEach((clientType) => {
it(
clientType + ": can receive upto 2048 messages without message loss",
async function(): Promise<void> {
await beforeEachTest(clientType);
await sendMessages();
await receiveMessages();
await verifyMessageCount(numberOfMessagesToSend, entityName);
// TODO:
// - Close the client
// - Receive all the messages again
// - Settle the messages
// - Delivery count should have been 1(or incremented) for 2048 of the messages
// - Rest 952 messages should have zero delivery count
// This makes sure there is no message loss
}
);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -798,14 +798,9 @@ describe("Streaming Receiver Tests", () => {
`Expected 0 messages, but received ${receivedMsgs.length}`
);
receiver = await serviceBusClient.test.createReceiveAndDeleteReceiver(entityNames);
await verifyMessageCount(
totalNumOfMessages,
entityNames.queue,
entityNames.topic,
entityNames.subscription
);
await verifyMessageCount(totalNumOfMessages, entityNames);
await drainReceiveAndDeleteReceiver(receiver);
await verifyMessageCount(0, entityNames.queue, entityNames.topic, entityNames.subscription);
await verifyMessageCount(0, entityNames);
}

it(
Expand Down
Loading