From f85da35b1dca32168c1db6727b00f17e40aa899e Mon Sep 17 00:00:00 2001 From: Yaroslav Grishajev Date: Tue, 21 Jul 2020 10:50:26 +0300 Subject: [PATCH] fix(subscriber): user current channel only for message management --- Dockerfile | 3 +- docker-compose.yml | 3 +- package-lock.json | 166 ++++++++----- package.json | 3 +- spec/{client.spec.js => subscriber.spec.js} | 42 ++-- src/channel.js | 24 +- src/index.js | 1 + src/subscriber.js | 261 +++++++++++--------- 8 files changed, 281 insertions(+), 222 deletions(-) rename spec/{client.spec.js => subscriber.spec.js} (70%) diff --git a/Dockerfile b/Dockerfile index ae3ed28..82184f4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM node:10-alpine +FROM node:10 RUN mkdir -p /usr/src/app WORKDIR /usr/src/app @@ -13,3 +13,4 @@ COPY ./src ./src ARG YG_NPM_TOKEN RUN npm install +RUN npm install jest -g diff --git a/docker-compose.yml b/docker-compose.yml index e60d514..6aa4f83 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,8 +19,9 @@ services: - docker-compose.env environment: YG_NPM_TOKEN: ${YG_NPM_TOKEN} + AMQP_URI: amqp://guest:guest@rabbitmq rabbitmq: - image: registry.gitlab.com/trysimply/cloud/rabbitmq-test + image: ygrishajev/rabbitmq-dev restart: on-failure ports: - "5672:5672" diff --git a/package-lock.json b/package-lock.json index 9beb227..b853778 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "rxamqp", - "version": "0.2.13", + "version": "0.3.4", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -2234,13 +2234,11 @@ } }, "define-properties": { - "version": "1.1.2", - "resolved": "https://registry.npmjs.org/define-properties/-/define-properties-1.1.2.tgz", - "integrity": "sha1-g6c/L+pWmJj7c3GTyPhzyvbUXJQ=", - "dev": true, + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/define-properties/-/define-properties-1.1.3.tgz", + "integrity": "sha512-3MqfYKj2lLzdMSf8ZIZE/V+Zuy+BgD6f164e8K2w7dgnpKArBDerGYpM46IYYcjnkdPNMjPk9A6VFB8+3SKlXQ==", "requires": { - "foreach": "^2.0.5", - "object-keys": "^1.0.8" + "object-keys": "^1.0.12" } }, "define-property": { @@ -2362,27 +2360,41 @@ } }, "es-abstract": { - "version": "1.12.0", - "resolved": "https://registry.npmjs.org/es-abstract/-/es-abstract-1.12.0.tgz", - "integrity": "sha512-C8Fx/0jFmV5IPoMOFPA9P9G5NtqW+4cOPit3MIuvR2t7Ag2K15EJTpxnHAYTzL+aYQJIESYeXZmDBfOBE1HcpA==", - "dev": true, + "version": "1.17.6", + "resolved": "https://registry.npmjs.org/es-abstract/-/es-abstract-1.17.6.tgz", + "integrity": "sha512-Fr89bON3WFyUi5EvAeI48QTWX0AyekGgLA8H+c+7fbfCkJwRWRMLd8CQedNEyJuoYYhmtEqY92pgte1FAhBlhw==", "requires": { - "es-to-primitive": "^1.1.1", + "es-to-primitive": "^1.2.1", "function-bind": "^1.1.1", - "has": "^1.0.1", - "is-callable": "^1.1.3", - "is-regex": "^1.0.4" + "has": "^1.0.3", + "has-symbols": "^1.0.1", + "is-callable": "^1.2.0", + "is-regex": "^1.1.0", + "object-inspect": "^1.7.0", + "object-keys": "^1.1.1", + "object.assign": "^4.1.0", + "string.prototype.trimend": "^1.0.1", + "string.prototype.trimstart": "^1.0.1" + }, + "dependencies": { + "has": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/has/-/has-1.0.3.tgz", + "integrity": "sha512-f2dvO0VU6Oej7RkWJGrehjbzMAjFp5/VKPp5tTpWIV4JHHZK1/BxbFRtf/siA2SWTe09caDmVtYYzWEIbBS4zw==", + "requires": { + "function-bind": "^1.1.1" + } + } } }, "es-to-primitive": { - "version": "1.1.1", - "resolved": "https://registry.npmjs.org/es-to-primitive/-/es-to-primitive-1.1.1.tgz", - "integrity": "sha1-RTVSSKiJeQNLZ5Lhm7gfK3l13Q0=", - "dev": true, + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/es-to-primitive/-/es-to-primitive-1.2.1.tgz", + "integrity": "sha512-QCOllgZJtaUo9miYBcLChTUaHNjJF3PYs1VidD7AwiEj1kYxKeQTctLAezAOH5ZKRH0g2IgPn6KwB4IT8iRpvA==", "requires": { - "is-callable": "^1.1.1", + "is-callable": "^1.1.4", "is-date-object": "^1.0.1", - "is-symbol": "^1.0.1" + "is-symbol": "^1.0.2" } }, "escape-string-regexp": { @@ -2896,12 +2908,6 @@ "for-in": "^1.0.1" } }, - "foreach": { - "version": "2.0.5", - "resolved": "https://registry.npmjs.org/foreach/-/foreach-2.0.5.tgz", - "integrity": "sha1-C+4AUBiusmDQo6865ljdATbsG5k=", - "dev": true - }, "forever-agent": { "version": "0.6.1", "resolved": "https://registry.npmjs.org/forever-agent/-/forever-agent-0.6.1.tgz", @@ -3890,8 +3896,7 @@ "function-bind": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.1.tgz", - "integrity": "sha512-yIovAzMX49sF8Yl58fSCWJ5svSLuaibPxXQJFLmBObTuCr0Mf1KiPopGM9NiFjiYBCbfaa2Fh6breQ6ANVTI0A==", - "dev": true + "integrity": "sha512-yIovAzMX49sF8Yl58fSCWJ5svSLuaibPxXQJFLmBObTuCr0Mf1KiPopGM9NiFjiYBCbfaa2Fh6breQ6ANVTI0A==" }, "functional-red-black-tree": { "version": "1.0.1", @@ -4093,6 +4098,11 @@ "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-3.0.0.tgz", "integrity": "sha1-tdRU3CGZriJWmfNGfloH87lVuv0=" }, + "has-symbols": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/has-symbols/-/has-symbols-1.0.1.tgz", + "integrity": "sha512-PLcsoqu++dmEIZB+6totNFKq/7Do+Z0u4oT0zKOJNl3lYK6vGwwu2hjHs+68OEZbTjiUE9bgOABXbP/GvrS0Kg==" + }, "has-value": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/has-value/-/has-value-1.0.0.tgz", @@ -4391,10 +4401,9 @@ } }, "is-callable": { - "version": "1.1.3", - "resolved": "https://registry.npmjs.org/is-callable/-/is-callable-1.1.3.tgz", - "integrity": "sha1-hut1OSgF3cM69xySoO7fdO52BLI=", - "dev": true + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/is-callable/-/is-callable-1.2.0.tgz", + "integrity": "sha512-pyVD9AaGLxtg6srb2Ng6ynWJqkHU9bEM087AKck0w8QwDarTfNcpIYoU8x8Hv2Icm8u6kFJM18Dag8lyqGkviw==" }, "is-ci": { "version": "1.1.0", @@ -4423,10 +4432,9 @@ } }, "is-date-object": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/is-date-object/-/is-date-object-1.0.1.tgz", - "integrity": "sha1-mqIOtq7rv/d/vTPnTKAbM1gdOhY=", - "dev": true + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/is-date-object/-/is-date-object-1.0.2.tgz", + "integrity": "sha512-USlDT524woQ08aoZFzh3/Z6ch9Y/EWXEHQ/AaRN0SkKq4t2Jw2R2339tSXmwuVoY7LLlBCbOIlx2myP/L5zk0g==" }, "is-descriptor": { "version": "1.0.2", @@ -4636,12 +4644,11 @@ "dev": true }, "is-regex": { - "version": "1.0.4", - "resolved": "https://registry.npmjs.org/is-regex/-/is-regex-1.0.4.tgz", - "integrity": "sha1-VRdIm1RwkbCTDglWVM7SXul+lJE=", - "dev": true, + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/is-regex/-/is-regex-1.1.0.tgz", + "integrity": "sha512-iI97M8KTWID2la5uYXlkbSDQIg4F6o1sYboZKKTDpnDQMLtUL86zxhgDet3Q2SriaYsyGqZ6Mn2SjbRKeLHdqw==", "requires": { - "has": "^1.0.1" + "has-symbols": "^1.0.1" } }, "is-resolvable": { @@ -4663,10 +4670,12 @@ "dev": true }, "is-symbol": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/is-symbol/-/is-symbol-1.0.1.tgz", - "integrity": "sha1-PMWfAAJRlLarLjjbrmaJJWtmBXI=", - "dev": true + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/is-symbol/-/is-symbol-1.0.3.tgz", + "integrity": "sha512-OwijhaRSgqvhm/0ZdAcXNZt9lYdKFpcRDT5ULUuYXPoT794UNOdU+gpT6Rzo7b4V2HUl/op6GqY894AZwv9faQ==", + "requires": { + "has-symbols": "^1.0.1" + } }, "is-text-path": { "version": "1.0.1", @@ -6418,11 +6427,15 @@ } } }, + "object-inspect": { + "version": "1.8.0", + "resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.8.0.tgz", + "integrity": "sha512-jLdtEOB112fORuypAyl/50VRVIBIdVQOSUUGQHzJ4xBSbit81zRarz7GThkEFZy1RceYrWYcPcBFPQwHyAc1gA==" + }, "object-keys": { - "version": "1.0.11", - "resolved": "https://registry.npmjs.org/object-keys/-/object-keys-1.0.11.tgz", - "integrity": "sha1-xUYBd4rVYPEULODgG8yotW0TQm0=", - "dev": true + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/object-keys/-/object-keys-1.1.1.tgz", + "integrity": "sha512-NuAESUOUMrlIXOfHKzD6bpPu3tYt3xvjNdRIQ+FeT0lNb4K8WR70CaDxhuNguS2XG+GjkyMwOzsN5ZktImfhLA==" }, "object-visit": { "version": "1.0.1", @@ -6441,14 +6454,24 @@ } } }, - "object.getownpropertydescriptors": { - "version": "2.0.3", - "resolved": "https://registry.npmjs.org/object.getownpropertydescriptors/-/object.getownpropertydescriptors-2.0.3.tgz", - "integrity": "sha1-h1jIRvW0B62rDyNuCYbxSwUcqhY=", - "dev": true, + "object.assign": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/object.assign/-/object.assign-4.1.0.tgz", + "integrity": "sha512-exHJeq6kBKj58mqGyTQ9DFvrZC/eR6OwxzoM9YRoGBqrXYonaFyGiFMuc9VZrXf7DarreEwMpurG3dd+CNyW5w==", "requires": { "define-properties": "^1.1.2", - "es-abstract": "^1.5.1" + "function-bind": "^1.1.1", + "has-symbols": "^1.0.0", + "object-keys": "^1.0.11" + } + }, + "object.getownpropertydescriptors": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/object.getownpropertydescriptors/-/object.getownpropertydescriptors-2.1.0.tgz", + "integrity": "sha512-Z53Oah9A3TdLoblT7VKJaTDdXdT+lQO+cNpKVnya5JDe9uLvzu1YyY1yFDFrcxrlRgWrEFH0jJtD/IbuwjcEVg==", + "requires": { + "define-properties": "^1.1.3", + "es-abstract": "^1.17.0-next.1" } }, "object.omit": { @@ -8665,6 +8688,24 @@ } } }, + "string.prototype.trimend": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/string.prototype.trimend/-/string.prototype.trimend-1.0.1.tgz", + "integrity": "sha512-LRPxFUaTtpqYsTeNKaFOw3R4bxIzWOnbQ837QfBylo8jIxtcbK/A/sMV7Q+OAV/vWo+7s25pOE10KYSjaSO06g==", + "requires": { + "define-properties": "^1.1.3", + "es-abstract": "^1.17.5" + } + }, + "string.prototype.trimstart": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/string.prototype.trimstart/-/string.prototype.trimstart-1.0.1.tgz", + "integrity": "sha512-XxZn+QpvrBI1FOcg6dIpxUPgWCPuNXvMD72aaRaUQv1eD4e/Qy8i/hFTe0BUmD60p/QA6bh1avmuPTfNjqVWRw==", + "requires": { + "define-properties": "^1.1.3", + "es-abstract": "^1.17.5" + } + }, "string_decoder": { "version": "0.10.31", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", @@ -9643,13 +9684,14 @@ "dev": true }, "util.promisify": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/util.promisify/-/util.promisify-1.0.0.tgz", - "integrity": "sha512-i+6qA2MPhvoKLuxnJNpXAGhg7HphQOSUq2LKMZD0m15EiskXUkMvKdF4Uui0WYeCUGea+o2cw/ZuwehtfsrNkA==", - "dev": true, + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/util.promisify/-/util.promisify-1.0.1.tgz", + "integrity": "sha512-g9JpC/3He3bm38zsLupWryXHoEcS22YHthuPQSJdMy6KNrzIRzWqcsHzD/WUnqe45whVou4VIsPew37DoXWNrA==", "requires": { - "define-properties": "^1.1.2", - "object.getownpropertydescriptors": "^2.0.3" + "define-properties": "^1.1.3", + "es-abstract": "^1.17.2", + "has-symbols": "^1.0.1", + "object.getownpropertydescriptors": "^2.1.0" } }, "uuid": { diff --git a/package.json b/package.json index f9261b9..ca84ebe 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "rxamqp", - "version": "0.3.4", + "version": "0.3.5", "description": "Library for easy amqplib configuration and usage", "main": "./src/index.js", "scripts": { @@ -36,6 +36,7 @@ "lodash.flow": "^3.5.0", "rxjs": "^6.2.1", "rxjs-compat": "^6.2.1", + "util.promisify": "^1.0.1", "uuid": "^3.0.1" }, "devDependencies": { diff --git a/spec/client.spec.js b/spec/subscriber.spec.js similarity index 70% rename from spec/client.spec.js rename to spec/subscriber.spec.js index a5041fc..d03a984 100644 --- a/spec/client.spec.js +++ b/spec/subscriber.spec.js @@ -19,23 +19,22 @@ const usageOptions = { } const use = (...middlewares) => client.use(usageOptions, ...middlewares) -beforeEach(() => { client = createClient(Object.assign({}, { config, logger: false })) }) +beforeEach(() => { client = createClient(Object.assign({}, config, { logger: false })) }) afterEach(() => client.shutdown()) describe('Client', () => { - test('#publish message payload is properly delivered subscriber', () => { - return new Promise(resolve => { - use((msg, ctx) => { - ctx.ack().then(() => resolve(msg)) - }) - .listen() - - client.events.on('request.queue.configured', () => client.publish(EXCHANGE, ROUTING_KEY, MESSAGE)) + it('#publish message payload is properly delivered subscriber', () => new Promise(resolve => { + use((msg, ctx) => { + ctx.ack() + resolve(msg) }) - .then(message => expect(message).toMatchObject(MESSAGE)) + .listen() + + client.events.on('request.queue.configured', () => client.publish(EXCHANGE, ROUTING_KEY, MESSAGE)) }) + .then(message => expect(message).toMatchObject(MESSAGE))) - test('#publish message payload is properly delivered subscriber with multiple bindings', () => { + it('#publish message payload is properly delivered subscriber with multiple bindings', () => { expect.assertions(1) return new Promise(resolve => { @@ -45,11 +44,10 @@ describe('Client', () => { routingKey: [ROUTING_KEY, ROUTING_KEY_SECONDARY] }), (payload, { message, ack }) => { messages[message.routingKey] = payload - ack().then(() => { - if (Object.keys(messages).length === 2) { - resolve(messages) - } - }) + ack() + if (Object.keys(messages).length === 2) { + resolve(messages) + } }) .listen() @@ -64,7 +62,7 @@ describe('Client', () => { })) }) - test('#request receives a proper success response from subscriber', () => { + it('#request receives a proper success response from subscriber', () => { expect.assertions(1) use((msg, ctx) => ctx.respond(MESSAGE)) @@ -74,7 +72,7 @@ describe('Client', () => { .then(message => expect(message).toMatchObject({ data: MESSAGE })) }) - test('#request throws a proper rejection error from subscriber', () => { + it('#request throws a proper rejection error from subscriber', () => { expect.assertions(1) use((msg, ctx) => ctx.rejectAndRespond(ERROR)) @@ -84,7 +82,7 @@ describe('Client', () => { .catch(error => expect(error).toMatchObject({ error: ERROR })) }) - test('#request receives a proper success response from subscriber\'s secondary middleware', () => { + it('#request receives a proper success response from subscriber\'s secondary middleware', () => { expect.assertions(1) use( @@ -97,7 +95,7 @@ describe('Client', () => { .then(message => expect(message).toMatchObject({ data: MESSAGE })) }) - test('#request receives a proper success response from subscriber\'s global middleware', () => { + it('#request receives a proper success response from subscriber\'s global middleware', () => { expect.assertions(1) use((msg, ctx, next) => next()) @@ -108,7 +106,7 @@ describe('Client', () => { .then(message => expect(message).toMatchObject({ data: MESSAGE })) }) - test('#request throws a proper error from subscriber\'s sync global error handler', () => { + it('#request throws a proper error from subscriber\'s sync global error handler', () => { expect.assertions(1) use(() => { throw new Error(ERROR.message) }) @@ -120,7 +118,7 @@ describe('Client', () => { .catch(error => expect(error).toMatchObject({ error: ERROR })) }) - test('#request throws a proper error from subscriber\'s async global error handler', () => { + it('#request throws a proper error from subscriber\'s async global error handler', () => { use((msg, ctx, next) => setTimeout(() => next(new Error(ERROR.message)), 0)) // eslint-disable-next-line no-unused-vars .use((error, msg, ctx, next) => ctx.rejectAndRespond({ message: error.message })) diff --git a/src/channel.js b/src/channel.js index f4d5449..db1b1d4 100644 --- a/src/channel.js +++ b/src/channel.js @@ -1,20 +1,18 @@ +const { promisify } = require('util') const { IllegalOperationError } = require('amqplib/lib/error') +const { ConfirmChannel } = require('amqplib/lib/channel_model') const { BehaviorSubject } = require('rxjs/BehaviorSubject') require('rxjs/add/operator/filter') const { withDefault, toPromise, createMeta } = require('./helpers') +const callChannelMethod = (rxChannel, method) => (...args) => toPromise(rxChannel) + .then(channel => channel[method](...args)) + const promisifyChannelMethod = (rxChannel, method) => (...args) => toPromise(rxChannel) - .then(channel => { - if ( - ['ack', 'reject'].includes(method) - && !channel.consumers[args[0].consumerTag] - ) { - return Promise.reject(new Error(`cannot ${method} message via closed channel`)) - } - - return channel[method](...args) - }) + .then(channel => (channel instanceof ConfirmChannel + ? promisify(channel[method].bind(channel, ...args))() + : channel[method](...args))) const openChannel = (connectionStore, options) => { const store = new BehaviorSubject(null) @@ -24,12 +22,10 @@ const openChannel = (connectionStore, options) => { .subscribe(connection => startRxChannel(connection, store, options)) Object.assign(store, { - ack: promisifyChannelMethod(store, 'ack'), - reject: promisifyChannelMethod(store, 'reject'), publish: promisifyChannelMethod(store, 'publish'), sendToQueue: promisifyChannelMethod(store, 'sendToQueue'), - assertQueue: promisifyChannelMethod(store, 'assertQueue'), - consume: promisifyChannelMethod(store, 'consume') + assertQueue: callChannelMethod(store, 'assertQueue'), + consume: callChannelMethod(store, 'consume') }) return store diff --git a/src/index.js b/src/index.js index 0d06913..4c6750a 100644 --- a/src/index.js +++ b/src/index.js @@ -1,4 +1,5 @@ const flow = require('lodash.flow') +require('util.promisify').shim() const createPublisher = require('./publisher') const createRequester = require('./requester') diff --git a/src/subscriber.js b/src/subscriber.js index 0ccc7bd..dfa002a 100644 --- a/src/subscriber.js +++ b/src/subscriber.js @@ -10,101 +10,46 @@ const REPLY_OPTIONS = { } module.exports = context => { - const sendToQueue = (payload, message) => context.channel - .sendToQueue(message.replyTo, toBuffer(payload), Object.assign({ - appId: context.appId, - correlationId: message.id - }, REPLY_OPTIONS)) - - const respond = (payload, message, options = { ack: true }) => { - context.events.emit('response.success.sent', message.setResponse(payload)) + const shutdown = new Subject() + const resubscribe = new Subject() - if (options.ack) { - context.channel.ack(message) + const exports = { + use, + listen, + resubscribe, + deleteQueue, + shutdown: () => { + shutdown.next(true) + shutdown.complete() } - - return sendToQueue(payload, message) - } - - const ack = message => { - context.events.emit('event.ack', message) - return context.channel.ack(message) - } - - const reject = (payload, message) => { - context.events.emit('response.error.sent', message.setResponse(payload)) - context.channel.reject(message, false) - - return sendToQueue(payload, message) } - const createContext = message => ({ - message, - channel: context.channel, - respond: (payload, options) => (message.replyTo - ? respond({ - data: payload, - status: (options && options.status) || 200 - }, message, options) - : ack(message)), - rejectAndRespond: (payload, status) => message.replyTo && reject({ - error: payload, - status: status || payload.status || 500 - }, message), - ackAndRespond: (payload, status) => message.replyTo && respond({ - data: payload, - status: status || 200 - }, message, { ack: true }), - ack: () => ack(message), - reject: (requeue = false) => { - context.events.emit('event.nack', message) - return context.channel.reject(message, requeue) - } - }) - - const prepareOrReject = (message, handlerId) => { - const incoming = new IncomingMessage(Object.assign(message, { handlerId })) - const emit = () => context.events.emit(`${incoming.replyTo ? 'request' : 'event'}.received`, incoming) - - try { - incoming.parse() - emit() - } catch (error) { - emit() - return reject({ error }, incoming) - } + let isListening = false + const uses = {} + const common = [] + let errorHandler = error => defaultErrorHandler(error) - return incoming + function listen() { + return context.channel + .takeUntil(shutdown) + .subscribe(subscribeToChannel) } - const uses = {} - - const defaultErrorHandler = error => { - if (error) { - console.warn('Error: unhandled error passed to \'next\'') // eslint-disable-line no-console - console.warn(error) // eslint-disable-line no-console + function subscribeToChannel(channel) { + if (channel && typeof use === 'function' && !isListening) { + Promise.all(Object.keys(uses).map(key => uses[key](channel))) + .then(() => resubscribe.next()) + isListening = true } - } - let errorHandler = error => defaultErrorHandler(error) - const common = [] - let isListening = false - const wrap = middleware => (...args) => { - try { - return middleware(...args) - } catch (error) { - const [payload, ctx] = args - return errorHandler(error, payload, ctx, defaultErrorHandler) + if (!channel) { + isListening = false } } - const toQueueName = params => params.queue || [context.appId, params.handlerId, castArray(params.routingKey).join('.')] - .filter(value => value) - .join('.') - // TODO: implement global middlewares // TODO: ensure proper handling of multiple message ack error to avoid reconnection - const use = (...args) => { + function use(...args) { if (typeof args[0] === 'function') { args.forEach(middleware => { if (middleware.length === 4) { @@ -118,10 +63,44 @@ module.exports = context => { const [params, ...middlewares] = args - const consume = message => { + const routingKeys = castArray(params.routingKey) + const queue = toQueueName(params) + + const doUse = channel => channel.assertQueue(queue, params.queueOptions) + .then(() => routingKeys.length && Promise.all(routingKeys + .map(routingKey => channel + .bindQueue(queue, params.exchange, routingKey)))) + .then(() => context.events.emit('request.queue.configured', queue)) + .then(() => channel.consume(queue, toConsumer(params, middlewares, channel), params.consumer)) + + uses[queue] = doUse + + if (isListening) { + context.channel + .first() + .subscribe(doUse) + } + } + + function toQueueName(params) { + return params.queue || [context.appId, params.handlerId, castArray(params.routingKey).join('.')] + .filter(value => value) + .join('.') + } + + function defaultErrorHandler(error) { + if (error) { + console.warn('Error: unhandled error passed to \'next\'') // eslint-disable-line no-console + console.warn(error) // eslint-disable-line no-console + } + } + + function toConsumer(params, middlewares, channel) { + return message => { if (!message) { return null } - const handlerContext = createContext(prepareOrReject(message, params.handlerId)) + const incomingMessage = prepareOrReject(message, params.handlerId, channel) + const handlerContext = createContext(incomingMessage, channel) const handleError = error => errorHandler( error, handlerContext.message.payload, @@ -137,46 +116,95 @@ module.exports = context => { return pipeline() } + } - const routingKeys = castArray(params.routingKey) - const queue = toQueueName(params) + function prepareOrReject(message, handlerId, channel) { + const incoming = new IncomingMessage(Object.assign(message, { handlerId })) + const emit = () => context.events.emit(`${incoming.replyTo ? 'request' : 'event'}.received`, incoming) - const doUse = channel => channel.assertQueue(queue, params.queueOptions) - .then(() => routingKeys.length && Promise.all(routingKeys - .map(routingKey => channel - .bindQueue(queue, params.exchange, routingKey)))) - .then(() => context.events.emit('request.queue.configured', queue)) - .then(() => channel.consume(queue, consume, params.consumer)) + try { + incoming.parse() + emit() + } catch (error) { + emit() + return toChannelCtxBase(channel).reject(channel)({ error }, false, channel) + } - uses[queue] = doUse + return incoming + } - if (isListening) { - context.channel - .first() - .subscribe(doUse) + function createContext(message, channel) { + const { respond, reject, ack } = toChannelCtxBase(channel) + + return { + message, + channel: context.channel, + respond: (payload, options) => (message.replyTo + ? respond({ + data: payload, + status: (options && options.status) || 200 + }, message, options) + : ack(message)), + rejectAndRespond: (payload, status) => message.replyTo && reject({ + error: payload, + status: status || payload.status || 500 + }, message), + ackAndRespond: (payload, status) => message.replyTo && respond({ + data: payload, + status: status || 200 + }, message, { ack: true }), + ack: () => ack(message), + reject: (requeue = false) => { + context.events.emit('event.nack', message) + return Promise.resolve(channel.reject(message, requeue)) + } } } - const shutdown = new Subject() - const resubscribe = new Subject() + function toChannelCtxBase(channel) { + return { + respond: (payload, message, options = { ack: true }) => { + context.events.emit('response.success.sent', message.setResponse(payload)) - const listen = () => { - context.channel - .takeUntil(shutdown) - .subscribe(channel => { - if (channel && typeof use === 'function' && !isListening) { - Promise.all(Object.keys(uses).map(key => uses[key](channel))) - .then(() => resubscribe.next()) - isListening = true + if (options.ack) { + channel.ack(message) } - if (!channel) { - isListening = false - } - }) + return sendToQueue(payload, message) + }, + reject: (payload, message) => { + context.events.emit('response.error.sent', message.setResponse(payload)) + channel.reject(message, false) + + return sendToQueue(payload, message) + }, + ack: message => { + context.events.emit('event.ack', message) + return Promise.resolve(channel.ack(message)) + } + } } - const deleteQueue = params => { + function sendToQueue(payload, message) { + return context.confirmChannel + .sendToQueue(message.replyTo, toBuffer(payload), Object.assign({ + appId: context.appId, + correlationId: message.id + }, REPLY_OPTIONS)) + } + + function wrap(middleware) { + return (...args) => { + try { + return middleware(...args) + } catch (error) { + const [payload, ctx] = args + return errorHandler(error, payload, ctx, defaultErrorHandler) + } + } + } + + function deleteQueue(params) { const queue = toQueueName(params) return context.channel .first() @@ -186,14 +214,5 @@ module.exports = context => { .then(() => context.events.emit('request.queue.deleted', queue)) } - return { - use, - listen, - resubscribe, - deleteQueue, - shutdown: () => { - shutdown.next(true) - shutdown.complete() - } - } + return exports }