diff --git a/README.md b/README.md index bd81a99..a904cd3 100644 --- a/README.md +++ b/README.md @@ -386,6 +386,20 @@ the connections defined in the `forRoot` method contains a connection named `Con > Note: If you leave out the name of the connection, the listener will be attached to the default connection (if exists). +### Removing a listener + +If you want to remove a listener, you can use the `removeListener()` method of the `QueueService`. The method's first parameter is the name string or `Source` object of the queue, and the second (optional) parameter is the connection name if you have set it up. Here is an example: + +```typescript +// with the default connection +await this.queueService.removeListener('example'); +``` + +```typescript +// with named connection +await this.queueService.removeListener('example', Connections.TEST); +``` + ### Message control When a new message arrives at a queue, the assigned method with `@Listen()` decorator receives the transformed and validated message body diff --git a/package-lock.json b/package-lock.json index ea1d481..9490997 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3473,14 +3473,24 @@ } }, "node_modules/caniuse-lite": { - "version": "1.0.30001248", - "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001248.tgz", - "integrity": "sha512-NwlQbJkxUFJ8nMErnGtT0QTM2TJ33xgz4KXJSMIrjXIbDVdaYueGyjOrLKRtJC+rTiWfi6j5cnZN1NBiSBJGNw==", + "version": "1.0.30001585", + "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001585.tgz", + "integrity": "sha512-yr2BWR1yLXQ8fMpdS/4ZZXpseBgE7o4g41x3a6AJOqZuOi+iE/WdJYAuZ6Y95i4Ohd2Y+9MzIWRR+uGABH4s3Q==", "dev": true, - "funding": { - "type": "opencollective", - "url": "https://opencollective.com/browserslist" - } + "funding": [ + { + "type": "opencollective", + "url": "https://opencollective.com/browserslist" + }, + { + "type": "tidelift", + "url": "https://tidelift.com/funding/github/npm/caniuse-lite" + }, + { + "type": "github", + "url": "https://github.com/sponsors/ai" + } + ] }, "node_modules/chalk": { "version": "4.1.2", @@ -14812,9 +14822,9 @@ } }, "caniuse-lite": { - "version": "1.0.30001248", - "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001248.tgz", - "integrity": "sha512-NwlQbJkxUFJ8nMErnGtT0QTM2TJ33xgz4KXJSMIrjXIbDVdaYueGyjOrLKRtJC+rTiWfi6j5cnZN1NBiSBJGNw==", + "version": "1.0.30001585", + "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001585.tgz", + "integrity": "sha512-yr2BWR1yLXQ8fMpdS/4ZZXpseBgE7o4g41x3a6AJOqZuOi+iE/WdJYAuZ6Y95i4Ohd2Y+9MzIWRR+uGABH4s3Q==", "dev": true }, "chalk": { diff --git a/src/service/queue/queue.service.spec.ts b/src/service/queue/queue.service.spec.ts index ee758f1..4b00821 100644 --- a/src/service/queue/queue.service.spec.ts +++ b/src/service/queue/queue.service.spec.ts @@ -61,7 +61,7 @@ describe('QueueService', () => { { provide: AMQPService, useValue: { - createReceiver: jest.fn().mockResolvedValue(jest.fn()), + createReceiver: jest.fn().mockResolvedValue(jest.fn().mockResolvedValue(new EventContextMock().receiver)), createSender: jest.fn().mockResolvedValue(new EventContextMock().sender), disconnect: jest.fn().mockResolvedValue(jest.fn()), getModuleOptions(): QueueModuleOptions { @@ -349,6 +349,62 @@ describe('QueueService', () => { }); }); + describe('removeListener()', () => { + it('should remove listener', async () => { + (amqpService.createReceiver as jest.Mock).mockResolvedValue(new EventContextMock().receiver); + await queueService.listen(defaultQueue, () => void 0, {}); + expect(queueService['receivers'].size).toBe(1); + + const receiver = queueService['receivers'].get(queueService['receivers'].keys().next().value); + + const result = await queueService.removeListener(defaultQueue); + expect(receiver.close).toBeCalled(); + expect(result).toBe(true); + expect(queueService['receivers'].size).toBe(0); + }); + + it('should remove listener with connectionName', async () => { + (amqpService.createReceiver as jest.Mock).mockResolvedValue(new EventContextMock().receiver); + const connection = 'test_connection'; + await queueService.listen(defaultQueue, () => void 0, {}, connection); + + const receiver = queueService['receivers'].get(queueService['receivers'].keys().next().value); + + expect(queueService['receivers'].size).toBe(1); + + const result = await queueService.removeListener(defaultQueue, connection); + expect(receiver.close).toBeCalled(); + expect(result).toBe(true); + expect(queueService['receivers'].size).toBe(0); + }); + + it('should remove listener with Source object', async () => { + (amqpService.createReceiver as jest.Mock).mockResolvedValue(new EventContextMock().receiver); + const source: Source = { + address: defaultQueue, + filter: filter.selector("((JMSCorrelationID) <> ''"), + }; + await queueService.listen(source, () => void 0, {}); + expect(queueService['receivers'].size).toBe(1); + const receiver = queueService['receivers'].get(queueService['receivers'].keys().next().value); + + const result = await queueService.removeListener(source); + expect(receiver.close).toBeCalled(); + expect(result).toBe(true); + expect(queueService['receivers'].size).toBe(0); + }); + + it('should not do anything with non-existing listener', async () => { + (amqpService.createReceiver as jest.Mock).mockResolvedValue(new EventContextMock().receiver); + await queueService.listen(defaultQueue, () => void 0, {}); + expect(queueService['receivers'].size).toBe(1); + + const result = await queueService.removeListener('otherQueue'); + expect(result).toBe(false); + expect(queueService['receivers'].size).toBe(1); + }); + }); + it('should shutdown', async () => { (amqpService.createReceiver as jest.Mock).mockResolvedValue(new EventContextMock().receiver); await queueService.listen(defaultQueue, () => void 0, {}); diff --git a/src/service/queue/queue.service.ts b/src/service/queue/queue.service.ts index 18c44ad..8c4c24b 100644 --- a/src/service/queue/queue.service.ts +++ b/src/service/queue/queue.service.ts @@ -42,7 +42,7 @@ export class QueueService { * objects when a new message arrives on the queue. If a receiver is already * created for the given queue then a new receiver won't be created. * - * @param {string} source Name of the queue. + * @param {string} source Name or Source object of the queue. * @param {function(body: T, control: MessageControl, metadata: Omit) => Promise} callback Function what will invoked when message arrives. * @param {ListenOptions} options Options for message processing. * @param {string} connection Name of the connection @@ -300,27 +300,46 @@ export class QueueService { this.receivers.clear(); } + /** + * Removes listener from active listeners + * + * @param {string} source Name or Source object of the queue. + * @param {string} connection Name of the connection + * + * @returns {Promise} Returns true if listener was removed, otherwise false. If listener was not found, returns false. + * + * @public + */ + public async removeListener(source: string | Source, connection: string = AMQP_DEFAULT_CONNECTION_TOKEN): Promise { + const sourceToken = typeof source === 'string' ? source : JSON.stringify(source); + const receiverToken = this.getLinkToken(sourceToken, connection); + + if (this.receivers.has(receiverToken)) { + const receiver = this.receivers.get(receiverToken); + await receiver.close(); + + return this.receivers.delete(receiverToken); + } + + return false; + } + private async getReceiver( source: string | Source, credit: number, messageHandler: (context: EventContext) => Promise, connection: string, ): Promise { - let receiver; - const sourceToken = typeof source === 'string' ? source : JSON.stringify(source); const receiverToken = this.getLinkToken(sourceToken, connection); - if (this.receivers.has(receiverToken)) { - receiver = this.receivers.get(receiverToken); - } else { - receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection); - + if (!this.receivers.has(receiverToken)) { + const receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection); this.receivers.set(receiverToken, receiver); } - return receiver; + return this.receivers.get(receiverToken); } private async getSender(target: string, connection: string): Promise { diff --git a/src/test/event-context.mock.ts b/src/test/event-context.mock.ts index ccbc17c..6fd7455 100644 --- a/src/test/event-context.mock.ts +++ b/src/test/event-context.mock.ts @@ -29,7 +29,7 @@ export class EventContextMock implements EventContext { }, credit: 0, addCredit: jest.fn(), - close: jest.fn().mockResolvedValue(true), + close: jest.fn(() => true), }; public sender: any = { send: jest.fn().mockResolvedValue({ sent: true }),