diff --git a/.run/Test all.run.xml b/.run/Test all.run.xml deleted file mode 100644 index c95303e..0000000 --- a/.run/Test all.run.xml +++ /dev/null @@ -1,38 +0,0 @@ -<component name="ProjectRunConfigurationManager"> - <configuration default="false" name="Test all" type="mocha-javascript-test-runner"> - <node-interpreter>project</node-interpreter> - <node-options /> - <mocha-package>$PROJECT_DIR$/node_modules/mocha</mocha-package> - <working-directory>$PROJECT_DIR$</working-directory> - <pass-parent-env>true</pass-parent-env> - <ui>bdd</ui> - <extra-mocha-options>--exit --timeout 40000</extra-mocha-options> - <test-kind>DIRECTORY</test-kind> - <test-directory>$PROJECT_DIR$/test</test-directory> - <recursive>true</recursive> - <method v="2"> - <option name="NpmBeforeRunTask" enabled="true"> - <package-json value="$PROJECT_DIR$/package.json" /> - <command value="run" /> - <scripts> - <script value="test-setup" /> - </scripts> - <node-interpreter value="project" /> - <envs /> - </option> - </method> - </configuration> - <configuration default="false" name="Test without setup all" type="mocha-javascript-test-runner"> - <node-interpreter>project</node-interpreter> - <node-options /> - <mocha-package>$PROJECT_DIR$/node_modules/mocha</mocha-package> - <working-directory>$PROJECT_DIR$</working-directory> - <pass-parent-env>true</pass-parent-env> - <ui>bdd</ui> - <extra-mocha-options>--exit --timeout 40000</extra-mocha-options> - <test-kind>DIRECTORY</test-kind> - <test-directory>$PROJECT_DIR$/test</test-directory> - <recursive>true</recursive> - <method v="2" /> - </configuration> -</component> \ No newline at end of file diff --git a/.run/Test client resolver.run.xml b/.run/Test client resolver.run.xml deleted file mode 100644 index d007cff..0000000 --- a/.run/Test client resolver.run.xml +++ /dev/null @@ -1,15 +0,0 @@ -<component name="ProjectRunConfigurationManager"> - <configuration default="false" name="Test client resolver" type="mocha-javascript-test-runner"> - <node-interpreter>project</node-interpreter> - <node-options /> - <mocha-package>$PROJECT_DIR$/node_modules/mocha</mocha-package> - <working-directory>$PROJECT_DIR$</working-directory> - <pass-parent-env>true</pass-parent-env> - <ui>bdd</ui> - <extra-mocha-options>--exit</extra-mocha-options> - <test-kind>DIRECTORY</test-kind> - <test-directory>$PROJECT_DIR$/test/client/resolver</test-directory> - <recursive>true</recursive> - <method v="2" /> - </configuration> -</component> \ No newline at end of file diff --git a/.run/Test client serde.run.xml b/.run/Test client serde.run.xml deleted file mode 100644 index d1d931a..0000000 --- a/.run/Test client serde.run.xml +++ /dev/null @@ -1,15 +0,0 @@ -<component name="ProjectRunConfigurationManager"> - <configuration default="false" name="Test client serde" type="mocha-javascript-test-runner"> - <node-interpreter>project</node-interpreter> - <node-options /> - <mocha-package>$PROJECT_DIR$/node_modules/mocha</mocha-package> - <working-directory>$PROJECT_DIR$</working-directory> - <pass-parent-env>true</pass-parent-env> - <ui>bdd</ui> - <extra-mocha-options>--exit</extra-mocha-options> - <test-kind>DIRECTORY</test-kind> - <test-directory>$PROJECT_DIR$/test/client/serde</test-directory> - <recursive>true</recursive> - <method v="2" /> - </configuration> -</component> \ No newline at end of file diff --git a/.run/Test client.run.xml b/.run/Test client.run.xml deleted file mode 100644 index d524250..0000000 --- a/.run/Test client.run.xml +++ /dev/null @@ -1,15 +0,0 @@ -<component name="ProjectRunConfigurationManager"> - <configuration default="false" name="Test client" type="mocha-javascript-test-runner"> - <node-interpreter>project</node-interpreter> - <node-options /> - <mocha-package>$PROJECT_DIR$/node_modules/mocha</mocha-package> - <working-directory>$PROJECT_DIR$</working-directory> - <pass-parent-env>true</pass-parent-env> - <ui>bdd</ui> - <extra-mocha-options>--exit --timeout 20000</extra-mocha-options> - <test-kind>DIRECTORY</test-kind> - <test-directory>$PROJECT_DIR$/test/client</test-directory> - <recursive>true</recursive> - <method v="2" /> - </configuration> -</component> \ No newline at end of file diff --git a/.run/Test consumer.run.xml b/.run/Test consumer.run.xml deleted file mode 100644 index 3782f72..0000000 --- a/.run/Test consumer.run.xml +++ /dev/null @@ -1,15 +0,0 @@ -<component name="ProjectRunConfigurationManager"> - <configuration default="false" name="Test consumer" type="mocha-javascript-test-runner"> - <node-interpreter>project</node-interpreter> - <node-options /> - <mocha-package>$PROJECT_DIR$/node_modules/mocha</mocha-package> - <working-directory>$PROJECT_DIR$</working-directory> - <pass-parent-env>true</pass-parent-env> - <ui>bdd</ui> - <extra-mocha-options>--exit --timeout 40000</extra-mocha-options> - <test-kind>DIRECTORY</test-kind> - <test-directory>$PROJECT_DIR$/test/consumer</test-directory> - <recursive>true</recursive> - <method v="2" /> - </configuration> -</component> \ No newline at end of file diff --git a/package.json b/package.json index 6d6564c..c25fc9c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "pulsar-flex", - "version": "1.0.1-beta.4", + "version": "1.0.1-beta.5", "description": "A package that natively supports pulsar api", "main": "src/index.js", "scripts": { diff --git a/src/errors/PulsarFlexProducerAlreadyCreatedError.js b/src/errors/PulsarFlexProducerAlreadyCreatedError.js new file mode 100644 index 0000000..7ddc15e --- /dev/null +++ b/src/errors/PulsarFlexProducerAlreadyCreatedError.js @@ -0,0 +1,9 @@ +class PulsarFlexProducerAlreadyCreatedError extends Error { + constructor({ message }) { + super(); + this.message = message; + this.name = 'PulsarFlexProducerAlreadyCreatedError'; + } +} + +module.exports = PulsarFlexProducerAlreadyCreatedError; diff --git a/src/errors/index.js b/src/errors/index.js index 8dc12f1..889b066 100644 --- a/src/errors/index.js +++ b/src/errors/index.js @@ -1,5 +1,6 @@ const PulsarFlexBrokerTimeoutError = require('./PulsarFlexBrokerTimeoutError'); const PulsarFlexProducerCreationError = require('./PulsarFlexProducerCreationError'); +const PulsarFlexProducerAlreadyCreatedError = require('./PulsarFlexProducerAlreadyCreatedError'); const PulsarFlexNoPayloadError = require('./PulsarFlexNoPayloadError'); const PulsarFlexProducerCloseError = require('./PulsarFlexProducerCloseError'); const PulsarFlexConnectionError = require('./PulsarFlexConnectionError'); @@ -16,6 +17,7 @@ const PulsarFlexConsumerCloseError = require('./PulsarFlexConsumerCloseError'); module.exports = { PulsarFlexBrokerTimeoutError, PulsarFlexProducerCreationError, + PulsarFlexProducerAlreadyCreatedError, PulsarFlexNoPayloadError, PulsarFlexProducerCloseError, PulsarFlexConnectionError, diff --git a/src/producer/index.js b/src/producer/index.js index 1f7ee93..c3907df 100644 --- a/src/producer/index.js +++ b/src/producer/index.js @@ -56,7 +56,7 @@ class Producer { this._logger.info(`Creating producer to topic: ${this._topic}`); if (this._connected) - throw new errors.PulsarFlexProducerCreationError({ + throw new errors.PulsarFlexProducerAlreadyCreatedError({ message: 'Already connected, please close before trying again', }); this._logger.info(`Creating client connection for producer to topic: ${this._topic}`); @@ -65,7 +65,10 @@ class Producer { await this._client.getCnx().addCleanUpListener(() => { this._logger.warn(`Starting reconnection because socket ended unexpectedly`); this._connected = false; - this._created && services.reconnect(this.create).then(() => (this._connected = true)); + this._created && + services.reconnect(this.create).then(() => { + this._connected = true; + }); }); this._logger.info( @@ -119,6 +122,10 @@ class Producer { }); if (utils.isNil(payload)) throw new errors.PulsarFlexNoPayloadError(); try { + if (!this._connected) + throw new errors.PulsarFlexProducerCreationError({ + message: 'Cannot send messages over not connected producer', + }); const { command } = await services.sendMessage({ producerId: this._producerId, producerName: this._producerName, @@ -135,6 +142,8 @@ class Producer { this._pendingMessageQueue.push({ func: () => services.sendMessage({ + connected: this._connected, + isResend: true, producerId: this._producerId, producerName: this._producerName, client: this._client, @@ -169,6 +178,10 @@ class Producer { message: 'Pending messages queue size has been exceeded', }); try { + if (!this._connected) + throw new errors.PulsarFlexProducerCreationError({ + message: 'Cannot send batch over not connected producer', + }); const { command } = await services.sendBatch({ producerId: this._producerId, producerName: this._producerName, diff --git a/src/producer/services/producerClose.js b/src/producer/services/producerClose.js index 1cdaed8..50d18c3 100644 --- a/src/producer/services/producerClose.js +++ b/src/producer/services/producerClose.js @@ -1,4 +1,3 @@ -const reconnect = require('./reconnect'); const errors = require('../../errors'); const producerClose = ({ client, create, setConnected, sendResponseMediator }) => { @@ -7,8 +6,6 @@ const producerClose = ({ client, create, setConnected, sendResponseMediator }) = setConnected(false); sendResponseMediator.purgeRequests({ error: errors.PulsarFlexProducerCloseError }); client.getCnx().close(); - await reconnect(create, setConnected); - setConnected(true); }); }; diff --git a/src/producer/services/reconnect.js b/src/producer/services/reconnect.js index b4a5142..d55d223 100644 --- a/src/producer/services/reconnect.js +++ b/src/producer/services/reconnect.js @@ -1,7 +1,7 @@ const utils = require('../../utils'); const reconnect = (create) => - create().catch(async () => { + create().catch(async (e) => { await utils.sleep(5000); await reconnect(create); }); diff --git a/src/producer/services/resendMessages.js b/src/producer/services/resendMessages.js index acf5f0a..7ac5f17 100644 --- a/src/producer/services/resendMessages.js +++ b/src/producer/services/resendMessages.js @@ -1,6 +1,6 @@ const resendMessages = (client, messageQueue, logger) => { client.getResponseEvents().on('producerSuccess', async () => { - logger.info('Starting resend message progress'); + logger.info('Starting resend messages'); while (messageQueue.length > 0) { logger.info(`De queueing message from messageQueue current length ${messageQueue.length}`); diff --git a/src/producer/services/sendMessage.js b/src/producer/services/sendMessage.js index 5ce7038..6016aa5 100644 --- a/src/producer/services/sendMessage.js +++ b/src/producer/services/sendMessage.js @@ -8,6 +8,8 @@ const sendMessage = async ({ responseMediator, payload, properties, + connected, + isResend = false, }) => { const { sendPayloadCommandRequest } = client.getCnx(); const messageMetadata = commands.messageMetadata({ diff --git a/test/producer/index-spec.js b/test/producer/index-spec.js index 90d39e3..ee722c9 100644 --- a/test/producer/index-spec.js +++ b/test/producer/index-spec.js @@ -6,6 +6,23 @@ const utils = require('../utils'); const { jwt, discoveryServers, topic, containerName } = config; describe('Producer tests', function () { + const producer = new Producer({ + discoveryServers, + jwt, + topic, + }); + beforeEach(async function () { + if (producer._connected) { + await producer.close(); + } + }); + + afterEach(async function () { + if (producer._connected) { + await producer.close(); + } + }); + describe('on creating & closing ', function () { it('should not throw exception', async function () { try { @@ -191,6 +208,46 @@ describe('Producer tests', function () { }); await producer.close(); }); + it('should resend messages after reconnect', async function () { + await producer.create(); + let msgsSent = 0; + await new Promise(async (resolve, reject) => { + while (msgsSent <= 2) { + await producer + .sendMessage({ payload: 'sinai', properties: { k: 'v' } }) + .then(() => { + msgsSent++; + }) + .catch(reject); + if (msgsSent >= 2) { + resolve(); + break; + } + producer._client.getCnx().close(); + } + }); + }); + it('should resend messages after unload', async function () { + await producer.create(); + let msgsSent = 0; + await new Promise(async (resolve, reject) => { + while (msgsSent <= 5) { + await producer + .sendMessage({ payload: 'sinai', properties: { k: 'v' } }) + .then(() => { + msgsSent++; + }) + .catch(reject); + if (msgsSent === 3) { + await utils.unloadTopic(); + } + if (msgsSent >= 5) { + resolve(); + break; + } + } + }); + }); }); describe('on connection exception should resend batch', function () { it('should not throw exception', async function () {