Skip to content

Commit

Permalink
Merge pull request #78 from ayeo-flex-org/fix/receiveQueueOverflow
Browse files Browse the repository at this point in the history
changed the reflow to happen when a consumer reads a message
  • Loading branch information
galrose authored Dec 14, 2021
2 parents df6d956 + 0632c4b commit 83e3aef
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 25 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "pulsar-flex",
"version": "1.0.1-beta.7",
"version": "1.0.1-beta.8",
"description": "A package that natively supports pulsar api",
"main": "src/index.js",
"scripts": {
Expand Down
27 changes: 20 additions & 7 deletions src/consumer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const pulsarApi = require('../commands/protocol/pulsar/pulsar_pb');
const { createLogger, LEVELS } = require('../logger');
const defaultLogger = require('../logger/default');
const PriorityQueue = require('./priorityQueue');
const { EventEmitter } = require('events');
const SUB_TYPES = pulsarApi.CommandSubscribe.SubType;
const ACK_TYPES = { ...pulsarApi.CommandAck.AckType, NEGATIVE: -1 };
const INITIAL_POSITION = pulsarApi.CommandSubscribe.InitialPosition;
Expand Down Expand Up @@ -52,6 +53,7 @@ module.exports = class Consumer {
this._receiveQueueSize = receiveQueueSize;
this._reconnectInterval = reconnectInterval;
this._isRedeliveringUnacknowledgedMessages = false;
this._emitter = new EventEmitter();
this._prioritizeUnacknowledgedMessages = prioritizeUnacknowledgedMessages;
this._requestId = 0;
this._curFlow = receiveQueueSize;
Expand All @@ -61,7 +63,10 @@ module.exports = class Consumer {
this._onMessageParams = {};
this._processTimeoutInterval = null;

this._receiveQueue = new PriorityQueue();
this._receiveQueue = new PriorityQueue({
maxQueueSize: receiveQueueSize,
logger: this._logger,
});

this._requestIdMediator = new responseMediators.RequestIdResponseMediator({
client: this._client,
Expand All @@ -74,6 +79,7 @@ module.exports = class Consumer {

this._isSubscribed = false;
this._enqueueMessage = (data) => {
if (this._receiveQueue.length() === this._receiveQueueSize) return;
// Classic for, for performance
for (let i = 0; i < data.messages.length; i++) {
this._receiveQueue.enqueue(
Expand All @@ -95,9 +101,9 @@ module.exports = class Consumer {

this._reflow = async () => {
const nextFlow = Math.ceil(this._receiveQueueSize / 2);
if (--this._curFlow <= nextFlow) {
this._curFlow += nextFlow;
if (--this._curFlow < nextFlow) {
this._logger.info(`Re-flow, asking for ${nextFlow} more messages.`);
this._curFlow += nextFlow;
await this._flow(nextFlow);
}
};
Expand Down Expand Up @@ -184,7 +190,10 @@ module.exports = class Consumer {
redeliveringUnacknowledgedMessages &&
(this._subType === SUB_TYPES.EXCLUSIVE || this._subType === SUB_TYPES.FAILOVER)
)
this._receiveQueue = new PriorityQueue();
this._receiveQueue = new PriorityQueue({
maxQueueSize: this._receiveQueueSize,
logger: this._logger,
});
this._isRedeliveringUnacknowledgedMessages = redeliveringUnacknowledgedMessages;
};

Expand All @@ -199,10 +208,13 @@ module.exports = class Consumer {

_cleanState = () => {
clearTimeout(this._processTimeoutInterval);
this._client.getResponseEvents().off('message', this._reflow);
this._emitter.off('dequeueMessage', this._reflow);
this._client.getResponseEvents().off('message', this._enqueueMessage);
this._isSubscribed = false;
this._receiveQueue = new PriorityQueue();
this._receiveQueue = new PriorityQueue({
maxQueueSize: this._receiveQueueSize,
logger: this._logger,
});
};

unsubscribe = async () => {
Expand Down Expand Up @@ -268,8 +280,8 @@ module.exports = class Consumer {
}
this._onMessageParams = { onMessage, autoAck };
this._setState(STATES.ACTIVE);
this._client.getResponseEvents().on('message', this._reflow);
this._client.getResponseEvents().on('message', this._enqueueMessage);
this._emitter.on('dequeueMessage', this._reflow);

const process = async () => {
if (!this._isSubscribed) {
Expand All @@ -285,6 +297,7 @@ module.exports = class Consumer {
return;
}
const message = this._receiveQueue.dequeue();
this._emitter.emit('dequeueMessage');
if (autoAck) {
await this._ack({
messageIdData: message.command.messageId,
Expand Down
7 changes: 0 additions & 7 deletions src/consumer/priorityQueue/element.js

This file was deleted.

11 changes: 8 additions & 3 deletions src/consumer/priorityQueue/index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
const QElement = require('./element');
class PriorityQueue {
constructor() {
constructor({ maxQueueSize, logger }) {
this.items = [];
this.maxQueueSize = maxQueueSize;
this.logger = logger;
}

enqueue(element, priority) {
const qElement = new QElement(element, priority);
if (this.items.length === this.maxQueueSize + 1)
this.logger.warn(
`The queue has reached the maximum size, still adding to queue but may cause issues.`
);
const qElement = { element, priority };
let contain = false;
// for, for performance
for (let i = 0; i < this.items.length; i++) {
Expand Down
1 change: 1 addition & 0 deletions test/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const config = {
discoveryServers: ['localhost:6650'],
topic: 'persistent://public/default/test',
subscription: 'subscription',
receiveQueueSize: 1000,
producerConfiguration: { producerAccessMode: 'EXCLUSIVE' },
commandConnectedBuffer: Buffer.from([
0, 0, 0, 35, 0, 0, 0, 31, 8, 3, 26, 27, 10, 18, 80, 117, 108, 115, 97, 114, 32, 83, 101, 114,
Expand Down
39 changes: 32 additions & 7 deletions test/consumer/index-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const assert = require('assert');
const utils = require('../utils');
const { LEVELS } = require('../../src/logger');

const { jwt, discoveryServers, topic, containerName } = config;
const { jwt, discoveryServers, topic, containerName, receiveQueueSize } = config;

describe('Consumer tests', function () {
const cons = new Consumer({
Expand All @@ -15,7 +15,7 @@ describe('Consumer tests', function () {
subType: Consumer.SUB_TYPES.FAILOVER,
consumerName: 'Consy',
readCompacted: false,
receiveQueueSize: 1000,
receiveQueueSize,
logLevel: LEVELS.INFO,
});
const cons2 = new Consumer({
Expand All @@ -26,7 +26,7 @@ describe('Consumer tests', function () {
subType: Consumer.SUB_TYPES.FAILOVER,
consumerName: 'Consy2',
readCompacted: false,
receiveQueueSize: 1000,
receiveQueueSize,
logLevel: LEVELS.INFO,
});
const sharedConsumer1 = new Consumer({
Expand All @@ -37,7 +37,7 @@ describe('Consumer tests', function () {
subType: Consumer.SUB_TYPES.SHARED,
consumerName: 'Consy3',
readCompacted: false,
receiveQueueSize: 1000,
receiveQueueSize,
logLevel: LEVELS.INFO,
});
const sharedConsumer2 = new Consumer({
Expand All @@ -48,7 +48,7 @@ describe('Consumer tests', function () {
subType: Consumer.SUB_TYPES.SHARED,
consumerName: 'Consy4',
readCompacted: false,
receiveQueueSize: 1000,
receiveQueueSize,
logLevel: LEVELS.INFO,
});
const unackPrioritySharedConsumer = new Consumer({
Expand All @@ -59,7 +59,7 @@ describe('Consumer tests', function () {
subType: Consumer.SUB_TYPES.SHARED,
consumerName: 'Consy5',
readCompacted: false,
receiveQueueSize: 1000,
receiveQueueSize,
logLevel: LEVELS.INFO,
prioritizeUnacknowledgedMessages: true,
});
Expand Down Expand Up @@ -97,7 +97,29 @@ describe('Consumer tests', function () {
await smallReceiveQueueConsumer.unsubscribe();
}
});
describe('Reflow tests', function () {
describe('Flow tests', function () {
it('Should empty the receive queue after reading all the messages', async function () {
try {
await cons.subscribe();
let messageCounter = 0;
const numberOfMessages = 20;
const messages = Array(numberOfMessages).fill('message');
await utils.produceMessages({ messages });
await new Promise((resolve, reject) => {
cons.run({
onMessage: async ({ message }) => {
messageCounter++;
if (messageCounter === numberOfMessages) resolve();
},
});
});
assert.equal(cons._receiveQueue.length(), 0);
assert.equal(cons._curFlow, receiveQueueSize - numberOfMessages);
} catch (e) {
console.log(e);
throw e;
}
});
it('Should read the messages after multiple reflows', async function () {
try {
await smallReceiveQueueConsumer.subscribe();
Expand All @@ -109,12 +131,15 @@ describe('Consumer tests', function () {
smallReceiveQueueConsumer.run({
onMessage: async ({ message }) => {
messageCounter++;
console.log(message.toString());
if (messageCounter === numberOfMessages) resolve();
},
});
});
assert.equal(smallReceiveQueueConsumer._curFlow, 3);
} catch (e) {
console.log(e);
throw e;
}
});
});
Expand Down

0 comments on commit 83e3aef

Please sign in to comment.