diff --git a/src/interface/queue-module-options.interface.ts b/src/interface/queue-module-options.interface.ts index 0c45643..34c8389 100644 --- a/src/interface/queue-module-options.interface.ts +++ b/src/interface/queue-module-options.interface.ts @@ -71,4 +71,17 @@ export interface AMQPConnectionOptions { * Connection options directly used by `rhea` */ connectionOptions?: ConnectionOptions; + + /** + * Retry configuration for senders and receivers + */ + retryConnection?: { + receiver?: RetryConfig; + sender?: RetryConfig; + }; +} + +export interface RetryConfig { + retryDelay?: number; + maxRetryAttempts?: number; } diff --git a/src/service/queue/queue.service.spec.ts b/src/service/queue/queue.service.spec.ts index d99464f..5f92933 100644 --- a/src/service/queue/queue.service.spec.ts +++ b/src/service/queue/queue.service.spec.ts @@ -64,6 +64,19 @@ describe('QueueService', () => { createReceiver: jest.fn().mockResolvedValue(jest.fn().mockResolvedValue(new EventContextMock().receiver)), createSender: jest.fn().mockResolvedValue(new EventContextMock().sender), disconnect: jest.fn().mockResolvedValue(jest.fn()), + getConnectionOptions: jest.fn(() => ({ + connectionUri: 'amqp://test', + retryConnection: { + receiver: { + retryDelay: 1000, + maxRetryAttempts: 3, + }, + sender: { + retryDelay: 1000, + maxRetryAttempts: 3, + }, + }, + })), getModuleOptions(): QueueModuleOptions { return moduleOptions; }, @@ -277,6 +290,47 @@ describe('QueueService', () => { expect(result).toBeDefined(); expect(amqpService.createReceiver).toHaveBeenCalledTimes(2); }); + + it('should not retry creating a receiver if maxRetryAttempts is 1', async () => { + (amqpService as any).getConnectionOptions.mockReturnValueOnce({ + retryConnection: { + receiver: { + retryDelay: 1000, + maxRetryAttempts: 1, + }, + }, + }); + + const source = 'test-queue'; + const messageHandler = jest.fn(); + + (amqpService as any).createReceiver.mockRejectedValue(new Error('Test error')); + + await expect(queueService['getReceiver'](source, 1, messageHandler, 'default')).rejects.toThrow('Test error'); + + expect(amqpService.createReceiver).toHaveBeenCalledTimes(1); + }); + + it('should not retry if both retryDelay and maxRetryAttempts are set to zero', async () => { + (amqpService.getConnectionOptions as jest.Mock).mockReturnValueOnce({ + connectionUri: 'amqp://test', + retryConnection: { + receiver: { + retryDelay: 0, + maxRetryAttempts: 0, + }, + }, + }); + + const source = 'test-queue'; + const messageHandler = jest.fn(); + + (amqpService.createReceiver as jest.Mock).mockRejectedValue(new Error('Test error')); + + await expect(queueService['getReceiver'](source, 1, messageHandler, 'default')).rejects.toThrow('Test error'); + + expect(amqpService.createReceiver).toHaveBeenCalledTimes(1); + }); }); }); @@ -417,6 +471,47 @@ describe('QueueService', () => { expect(result).toBeDefined(); expect(amqpService.createSender).toHaveBeenCalledTimes(2); }); + + it('should retry creating a sender with custom retry configuration', async () => { + (amqpService.getConnectionOptions as jest.Mock).mockReturnValueOnce({ + connectionUri: 'amqp://test', + retryConnection: { + sender: { + retryDelay: 500, + maxRetryAttempts: 2, + }, + }, + }); + + const target = 'test-queue'; + + (amqpService.createSender as jest.Mock).mockRejectedValueOnce(new Error('Test error')).mockResolvedValueOnce({} as AwaitableSender); + + const result = await queueService['getSender'](target, 'default'); + + expect(result).toBeDefined(); + expect(amqpService.createSender).toHaveBeenCalledTimes(2); + }); + + it('should not retry creating a sender if maxRetryAttempts is 1', async () => { + (amqpService.getConnectionOptions as jest.Mock).mockReturnValueOnce({ + connectionUri: 'amqp://test', + retryConnection: { + sender: { + retryDelay: 1000, + maxRetryAttempts: 1, + }, + }, + }); + + const target = 'test-queue'; + + (amqpService.createSender as jest.Mock).mockRejectedValue(new Error('Test error')); + + await expect(queueService['getSender'](target, 'default')).rejects.toThrow('Test error'); + + expect(amqpService.createSender).toHaveBeenCalledTimes(1); + }); }); describe('removeListener()', () => { diff --git a/src/service/queue/queue.service.ts b/src/service/queue/queue.service.ts index b5898cd..98b07cf 100644 --- a/src/service/queue/queue.service.ts +++ b/src/service/queue/queue.service.ts @@ -29,7 +29,6 @@ const toString = Object.prototype.toString; export class QueueService { private readonly receivers: Map; private readonly senders: Map; - private readonly reconnectDelay: number = 5000; // 5 seconds constructor(private readonly amqpService: AMQPService, private readonly objectValidatorService: ObjectValidatorService) { // this means only one sender and receiver / app / queue @@ -332,22 +331,36 @@ export class QueueService { connection: string, ): Promise { const sourceToken = typeof source === 'string' ? source : JSON.stringify(source); - const receiverToken = this.getLinkToken(sourceToken, connection); if (this.receivers.has(receiverToken)) { return this.receivers.get(receiverToken); } - try { - const receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection); - this.receivers.set(receiverToken, receiver); - return receiver; - } catch (error) { - logger.error(`Error creating receiver: ${error.message}`, error.stack); - await sleep(this.reconnectDelay); - return this.getReceiver(source, credit, messageHandler, connection); - } + const connectionOptions = this.amqpService.getConnectionOptions(connection); + const retryDelay = connectionOptions.retryConnection?.receiver?.retryDelay ?? 0; + const maxRetryAttempts = connectionOptions.retryConnection?.receiver?.maxRetryAttempts ?? 1; + + let attempt = 0; + + do { + try { + const receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection); + this.receivers.set(receiverToken, receiver); + return receiver; + } catch (error) { + logger.error(`Error creating receiver (attempt ${attempt + 1}): ${error.message}`, error.stack); + + attempt = attempt + 1; + if (attempt >= maxRetryAttempts) { + throw new Error(`Max retry attempts reached for creating receiver: ${error.message}`); + } + + if (retryDelay > 0) { + await sleep(retryDelay); + } + } + } while (attempt < maxRetryAttempts); } private async getSender(target: string, connection: string): Promise { @@ -357,15 +370,30 @@ export class QueueService { return this.senders.get(senderToken); } - try { - const sender = await this.amqpService.createSender(target, connection); - this.senders.set(senderToken, sender); - return sender; - } catch (error) { - logger.error(`Error creating sender: ${error.message}`, error.stack); - await sleep(this.reconnectDelay); - return this.getSender(target, connection); - } + const connectionOptions = this.amqpService.getConnectionOptions(connection); + const retryDelay = connectionOptions.retryConnection?.sender?.retryDelay ?? 0; + const maxRetryAttempts = connectionOptions.retryConnection?.sender?.maxRetryAttempts ?? 1; + + let attempt = 0; + + do { + try { + const sender = await this.amqpService.createSender(target, connection); + this.senders.set(senderToken, sender); + return sender; + } catch (error) { + logger.error(`Error creating sender (attempt ${attempt + 1}): ${error.message}`, error.stack); + + attempt++; + if (attempt >= maxRetryAttempts) { + throw new Error(`Max retry attempts reached for creating sender: ${error.message}`); + } + + if (retryDelay > 0) { + await sleep(retryDelay); + } + } + } while (attempt < maxRetryAttempts); } private encodeMessage(message: any): string {