Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't use asynchronous queue module options #73

Open
amangeot opened this issue Feb 15, 2024 · 0 comments
Open

Can't use asynchronous queue module options #73

amangeot opened this issue Feb 15, 2024 · 0 comments

Comments

@amangeot
Copy link

Hello @raschan,
I'm trying to test url connection for handling failover urls before setting up connection uri in queue module options.

To do that I'm trying to use a useFactory with [email protected] defined as useFactory?: (...args: any[]) => Promise<QueueModuleOptions> | QueueModuleOptions;.

Looking at logs, it shows the factory being called twice: once synchronously, once asynchronously.
For my use case, the synchronous call makes all connection tests fail, hence the app won't start.

Any chance you can remove the extra synchronous call?

Thanks again,


Here is some code if you'd like to reproduce using a Nest's ConfigService with a .env and a single broker instance.

When canConnect returns false by default, the app won't start.
When canConnect returns true by default, the async config makes the app start, the sync config fails, and it triggers a reconnection.
You can see that behaviour within logs.

import { InternalServerErrorException, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { QueueModule, QueueModuleOptions } from '@team-supercharge/nest-amqp';
import { ConnectionDetails, ConnectionOptions } from 'rhea';
import { Connection } from 'rhea-promise';

export async function getQueueModuleFactory(
  configService: ConfigService<Record<string, any>, true>,
): Promise<QueueModuleOptions> {
  // Prepare logger and log levels.
  const logger = new Logger(QueueModule.name);
  if (!logger?.localInstance?.setLogLevels) throw new InternalServerErrorException('Logger not available');
  logger.localInstance.setLogLevels(['verbose', 'error', 'warn', 'log', 'debug']);

  const connectionOptions: ConnectionOptions = {
    password: configService.get<string>('MESSAGE_BROKER_PASSWORD'),
    rejectUnauthorized: configService.get<boolean>('MESSAGE_BROKER_REJECT_UNAUTHORIZED'),
    username: configService.get<string>('MESSAGE_BROKER_USERNAME'),
  };

  const urls = configService.get<string>('MESSAGE_BROKER_URLS').split(',');

  logger.debug(`Getting queue module options for: ${urls}`);

  // Retrieve first valid url.
  let url: string | null = null;
  for (const testUrl of urls) {
    logger.debug(`About to check connection: ${testUrl}`);

    const can = await canConnect(testUrl, { connectionOptions, logger });
    if (can) {
      url = testUrl;
      break;
    }
  }

  logger.debug(`url: ${url}`);

  // Don't start the app if we can't connect to broker.
  if (!url) throw new InternalServerErrorException('Could not connect to message broker.');

  // Register reconnection attempts.
  // NOTE: Reconnection attempts are not reset after a successful reconnection.
  let attempts = 0;

  return {
    connectionUri: url,
    connectionOptions: {
      ...connectionOptions,
      // Switch between urls.
      // NOTE: "connection_details" (rhea) takes over "connectionUri" when reconnecting.
      connection_details: function (): ConnectionDetails {
        attempts++;
        url = urls[attempts % urls.length];
        const { protocol, hostname: host, port } = new URL(url);

        logger.verbose(`Lost connection, retrying on: ${protocol}//${host}:${port}. (${attempts})`);

        return {
          port: Number.parseInt(port),
          host,
          transport: getTransport(protocol),
          options: connectionOptions,
        };
      },
      reconnect: true,
    },
    logger,
    throwExceptionOnConnectionError: false,
  };
}

export function getTransport(protocol: string) {
  switch (protocol) {
    case 'amqp:':
      return 'tcp';
    case 'amqps:':
      return 'ssl';
    case 'amqp+ssl:':
      return 'ssl';
    case 'amqp+tls:':
      return 'tls';
    default:
      return 'tcp';
  }
}

export async function canConnect(
  url: string,
  { connectionOptions, logger }: { connectionOptions: ConnectionOptions; logger?: Logger },
): Promise<boolean> {
  logger?.debug(`Checking connection: ${url}`);

  const { protocol, hostname: host, port } = new URL(url);

  const connection = new Connection({
    ...connectionOptions,
    host: host,
    port: Number.parseInt(port),
    transport: getTransport(protocol),
    reconnect: false,
  });

  try {
    await connection.open();

    logger?.debug(`Tried opening: ${url}`);
  } catch (error) {
    if (logger) logger.verbose(`Could not connect to ${protocol}//${host}:${port}. Error: ${error instanceof Error ? error.message : JSON.stringify(error)}.`); // prettier-ignore

    return false;
  }

  if (connection.isOpen()) {
    logger?.debug(`Opened: ${url}`);

    await connection.close();

    return true;
  }

  logger?.debug(`Did not open: ${url}`);
  return false; // NOTE: The app starts when returning true by default.
}

Used as:

QueueModule.forRootAsync({
      useFactory: getQueueModuleFactory,
      inject: [ConfigService],
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant