Skip to content

Commit

Permalink
fix: send message retries
Browse files Browse the repository at this point in the history
  • Loading branch information
7sete7 committed Jan 7, 2025
1 parent c484e5b commit 8c33b2c
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions src/imports/queue/RabbitMQResource.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import { KonectyResult } from '@imports/types/result';
import { errorReturn, successReturn } from '@imports/utils/return';
import { Channel, connect, Connection } from 'amqplib';
import { QueueResource } from './QueueResource';

export class RabbitMQResource extends QueueResource {
private connection: Connection | null = null;
private channel: Channel | null = null;
private connectionUrl: string = '';

async connect(url: string): Promise<void> {
try {
this.connectionUrl = url;
this.connection = await connect(url);
this.channel = await this.connection.createChannel();

Expand Down Expand Up @@ -38,10 +41,21 @@ export class RabbitMQResource extends QueueResource {
await this.channel?.assertQueue(name, driverParams);
}

async sendMessage(queue: string, message: unknown) {
const strMessage = typeof message === 'object' ? JSON.stringify(message) : String(message);
const success = this.channel?.sendToQueue(queue, Buffer.from(strMessage), { appId: 'konecty' });
async sendMessage(queue: string, message: unknown, retries: number = 0): Promise<KonectyResult> {
try {
const strMessage = typeof message === 'object' ? JSON.stringify(message) : String(message);
const success = this.channel?.sendToQueue(queue, Buffer.from(strMessage), { appId: 'konecty', persistent: true, deliveryMode: 2 });

return success ? successReturn('Message sent') : errorReturn('Failed to send message');
return success ? successReturn('Message sent') : errorReturn('Failed to send message');
} catch (error) {
if (retries > 2) {
await this.handleError(error as Error, 'sendMessage');
return errorReturn('Failed to send message');
}

await this.disconnect();
await this.handleConnectionError(error as Error, () => this.connect(this.connectionUrl));
return this.sendMessage(queue, message, retries + 1);
}
}
}

0 comments on commit 8c33b2c

Please sign in to comment.