Skip to content

Commit

Permalink
Merge pull request #69 from ayeo-flex-org/bug/fixProducerReconnect
Browse files Browse the repository at this point in the history
Bug/fix producer handling reconnect & topic unload
  • Loading branch information
ronfarkash authored Oct 31, 2021
2 parents c58b492 + 4802110 commit 42d61c1
Show file tree
Hide file tree
Showing 14 changed files with 88 additions and 106 deletions.
38 changes: 0 additions & 38 deletions .run/Test all.run.xml

This file was deleted.

15 changes: 0 additions & 15 deletions .run/Test client resolver.run.xml

This file was deleted.

15 changes: 0 additions & 15 deletions .run/Test client serde.run.xml

This file was deleted.

15 changes: 0 additions & 15 deletions .run/Test client.run.xml

This file was deleted.

15 changes: 0 additions & 15 deletions .run/Test consumer.run.xml

This file was deleted.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "pulsar-flex",
"version": "1.0.1-beta.4",
"version": "1.0.1-beta.5",
"description": "A package that natively supports pulsar api",
"main": "src/index.js",
"scripts": {
Expand Down
9 changes: 9 additions & 0 deletions src/errors/PulsarFlexProducerAlreadyCreatedError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class PulsarFlexProducerAlreadyCreatedError extends Error {
constructor({ message }) {
super();
this.message = message;
this.name = 'PulsarFlexProducerAlreadyCreatedError';
}
}

module.exports = PulsarFlexProducerAlreadyCreatedError;
2 changes: 2 additions & 0 deletions src/errors/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const PulsarFlexBrokerTimeoutError = require('./PulsarFlexBrokerTimeoutError');
const PulsarFlexProducerCreationError = require('./PulsarFlexProducerCreationError');
const PulsarFlexProducerAlreadyCreatedError = require('./PulsarFlexProducerAlreadyCreatedError');
const PulsarFlexNoPayloadError = require('./PulsarFlexNoPayloadError');
const PulsarFlexProducerCloseError = require('./PulsarFlexProducerCloseError');
const PulsarFlexConnectionError = require('./PulsarFlexConnectionError');
Expand All @@ -16,6 +17,7 @@ const PulsarFlexConsumerCloseError = require('./PulsarFlexConsumerCloseError');
module.exports = {
PulsarFlexBrokerTimeoutError,
PulsarFlexProducerCreationError,
PulsarFlexProducerAlreadyCreatedError,
PulsarFlexNoPayloadError,
PulsarFlexProducerCloseError,
PulsarFlexConnectionError,
Expand Down
17 changes: 15 additions & 2 deletions src/producer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Producer {
this._logger.info(`Creating producer to topic: ${this._topic}`);

if (this._connected)
throw new errors.PulsarFlexProducerCreationError({
throw new errors.PulsarFlexProducerAlreadyCreatedError({
message: 'Already connected, please close before trying again',
});
this._logger.info(`Creating client connection for producer to topic: ${this._topic}`);
Expand All @@ -65,7 +65,10 @@ class Producer {
await this._client.getCnx().addCleanUpListener(() => {
this._logger.warn(`Starting reconnection because socket ended unexpectedly`);
this._connected = false;
this._created && services.reconnect(this.create).then(() => (this._connected = true));
this._created &&
services.reconnect(this.create).then(() => {
this._connected = true;
});
});

this._logger.info(
Expand Down Expand Up @@ -119,6 +122,10 @@ class Producer {
});
if (utils.isNil(payload)) throw new errors.PulsarFlexNoPayloadError();
try {
if (!this._connected)
throw new errors.PulsarFlexProducerCreationError({
message: 'Cannot send messages over not connected producer',
});
const { command } = await services.sendMessage({
producerId: this._producerId,
producerName: this._producerName,
Expand All @@ -135,6 +142,8 @@ class Producer {
this._pendingMessageQueue.push({
func: () =>
services.sendMessage({
connected: this._connected,
isResend: true,
producerId: this._producerId,
producerName: this._producerName,
client: this._client,
Expand Down Expand Up @@ -169,6 +178,10 @@ class Producer {
message: 'Pending messages queue size has been exceeded',
});
try {
if (!this._connected)
throw new errors.PulsarFlexProducerCreationError({
message: 'Cannot send batch over not connected producer',
});
const { command } = await services.sendBatch({
producerId: this._producerId,
producerName: this._producerName,
Expand Down
3 changes: 0 additions & 3 deletions src/producer/services/producerClose.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
const reconnect = require('./reconnect');
const errors = require('../../errors');

const producerClose = ({ client, create, setConnected, sendResponseMediator }) => {
Expand All @@ -7,8 +6,6 @@ const producerClose = ({ client, create, setConnected, sendResponseMediator }) =
setConnected(false);
sendResponseMediator.purgeRequests({ error: errors.PulsarFlexProducerCloseError });
client.getCnx().close();
await reconnect(create, setConnected);
setConnected(true);
});
};

Expand Down
2 changes: 1 addition & 1 deletion src/producer/services/reconnect.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const utils = require('../../utils');

const reconnect = (create) =>
create().catch(async () => {
create().catch(async (e) => {
await utils.sleep(5000);
await reconnect(create);
});
Expand Down
2 changes: 1 addition & 1 deletion src/producer/services/resendMessages.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const resendMessages = (client, messageQueue, logger) => {
client.getResponseEvents().on('producerSuccess', async () => {
logger.info('Starting resend message progress');
logger.info('Starting resend messages');

while (messageQueue.length > 0) {
logger.info(`De queueing message from messageQueue current length ${messageQueue.length}`);
Expand Down
2 changes: 2 additions & 0 deletions src/producer/services/sendMessage.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const sendMessage = async ({
responseMediator,
payload,
properties,
connected,
isResend = false,
}) => {
const { sendPayloadCommandRequest } = client.getCnx();
const messageMetadata = commands.messageMetadata({
Expand Down
57 changes: 57 additions & 0 deletions test/producer/index-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@ const utils = require('../utils');
const { jwt, discoveryServers, topic, containerName } = config;

describe('Producer tests', function () {
const producer = new Producer({
discoveryServers,
jwt,
topic,
});
beforeEach(async function () {
if (producer._connected) {
await producer.close();
}
});

afterEach(async function () {
if (producer._connected) {
await producer.close();
}
});

describe('on creating & closing ', function () {
it('should not throw exception', async function () {
try {
Expand Down Expand Up @@ -191,6 +208,46 @@ describe('Producer tests', function () {
});
await producer.close();
});
it('should resend messages after reconnect', async function () {
await producer.create();
let msgsSent = 0;
await new Promise(async (resolve, reject) => {
while (msgsSent <= 2) {
await producer
.sendMessage({ payload: 'sinai', properties: { k: 'v' } })
.then(() => {
msgsSent++;
})
.catch(reject);
if (msgsSent >= 2) {
resolve();
break;
}
producer._client.getCnx().close();
}
});
});
it('should resend messages after unload', async function () {
await producer.create();
let msgsSent = 0;
await new Promise(async (resolve, reject) => {
while (msgsSent <= 5) {
await producer
.sendMessage({ payload: 'sinai', properties: { k: 'v' } })
.then(() => {
msgsSent++;
})
.catch(reject);
if (msgsSent === 3) {
await utils.unloadTopic();
}
if (msgsSent >= 5) {
resolve();
break;
}
}
});
});
});
describe('on connection exception should resend batch', function () {
it('should not throw exception', async function () {
Expand Down

0 comments on commit 42d61c1

Please sign in to comment.