Skip to content

Commit

Permalink
Merge pull request #9 from banananyo/new-conn-if-cannot-open-conn
Browse files Browse the repository at this point in the history
New conn if cannot open conn
  • Loading branch information
c18s authored Jun 23, 2022
2 parents 13d9412 + c6a2fcb commit 89ff58b
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions lib/services/amqp.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,17 @@ import { Injectable } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { EventEmitter } from 'events';
import { hostname } from 'os';
import { AwaitableSender, Connection, ConnectionEvents, Container, EventContext, Receiver, ReceiverEvents, SenderEvents } from 'rhea-promise';
import {
AwaitableSender,
Connection,
ConnectionEvents,
ConnectionOptions,
Container,
EventContext,
Receiver,
ReceiverEvents,
SenderEvents,
} from 'rhea-promise';

import { AMQP_CONNECTION_RECONNECT } from '../constants';
import { AMQPModuleOptions, CreateReceiverOptions, CreateSenderOptions } from '../interfaces';
Expand Down Expand Up @@ -32,7 +42,7 @@ export class AMQPService {
const container = new Container({
id: `${connectionToken}:${hostname()}:${new Date().getTime()}`.toLowerCase(),
});
const connection = container.createConnection({
let connection = container.createConnection({
...(!!connectionUri ? parseURL(connectionUri) : {}),
...connectionOptions,
});
Expand All @@ -54,34 +64,26 @@ export class AMQPService {
} else {
this.logger.warn(error);
}
const timeoutHandler = setTimeout(async () => {
(context.connection as any)._connection.dispatch(ConnectionEvents.disconnected, void 0);
await context.connection
.open()
.then(() => {
this.logger.silly('connection successfully reopened');
const emitted = AMQPService.eventEmitter.emit(AMQP_CONNECTION_RECONNECT);

// istanbul ignore next: mocking out the event emitter is unnecessary
if (!emitted) {
this.logger.warn('reconnect event not emitted');
}
})
.catch(error => {
this.logger.error(`reopening connection failed with error: ${error.message}`, error);
});
clearTimeout(timeoutHandler);
}, 1000);
});
try {
await connection.open();
} catch (err) {
const errorMessage = ErrorMessage.fromError(err);
this.logger.error(`Connection open failed: ${connectionToken}`, errorMessage);
connection?.removeAllListeners();
connection = await AMQPService.handleConnectionDisconnected(options);
if (connection?.isOpen()) {
AMQPService.eventEmitter.emit(AMQP_CONNECTION_RECONNECT);
}
}
return connection;
}

public static async handleConnectionDisconnected(options: AMQPModuleOptions) {
await new Promise(resolve => setTimeout(resolve, options.connectionOptions?.initial_reconnect_delay ?? 3000));
return AMQPService.createConnection(options);
}

/**
* @param options - create sender options
* @returns sender
Expand Down

0 comments on commit 89ff58b

Please sign in to comment.