From 76c8a9447d1e4404b3eb8512867e974a466d4ac5 Mon Sep 17 00:00:00 2001 From: Steven Date: Fri, 9 Feb 2024 10:08:40 +0000 Subject: [PATCH] Execute the publish promise with await to prevent them from stacking up (#386) * CLI: add importData function * Execute the publish promise with await to prevent them from stacking up (memory leak) * Execute the publish promise with await to prevent them from stacking up (memory leak) * Execute the publish promise with await to prevent them from stacking up (memory leak) * Remove await behing publish functions * Remove await behing publish functions * Put necessary await * Put necessary await * Put necessary await * Put necessary await * rollback wrong rebase * rollback wrong rebase --- apps/core/src/domain/apiKey/apiKeyDomain.ts | 4 +-- .../domain/application/applicationDomain.ts | 4 +-- .../src/domain/attribute/attributeDomain.ts | 4 +-- .../eventsManager/eventsManagerDomain.ts | 21 +++++++-------- apps/core/src/domain/export/exportDomain.ts | 4 +-- .../domain/filesManager/filesManagerDomain.ts | 2 +- .../globalSettings/globalSettingsDomain.ts | 2 +- apps/core/src/domain/import/importDomain.ts | 27 +++++++++++-------- .../indexationManagerDomain.ts | 8 +++--- apps/core/src/domain/library/libraryDomain.ts | 4 +-- .../src/domain/permission/permissionDomain.ts | 2 +- .../record/helpers/sendRecordUpdateEvent.ts | 4 +-- apps/core/src/domain/record/recordDomain.ts | 7 ++--- .../domain/tasksManager/tasksManagerDomain.ts | 6 ++--- apps/core/src/domain/tree/treeDomain.ts | 8 +++--- apps/core/src/domain/value/valueDomain.ts | 4 +-- .../versionProfile/versionProfileDomain.ts | 4 +-- libs/message-broker/dist/amqpService.d.ts | 11 +++----- libs/message-broker/dist/amqpService.js | 3 ++- libs/message-broker/dist/amqpService.js.map | 2 +- libs/message-broker/src/amqpService.ts | 8 +++--- 21 files changed, 71 insertions(+), 68 deletions(-) diff --git a/apps/core/src/domain/apiKey/apiKeyDomain.ts b/apps/core/src/domain/apiKey/apiKeyDomain.ts index 9a5f3d56c..b30eab337 100644 --- a/apps/core/src/domain/apiKey/apiKeyDomain.ts +++ b/apps/core/src/domain/apiKey/apiKeyDomain.ts @@ -155,7 +155,7 @@ export default function ({ savedKey = _hideSecrets(savedKey); } - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.API_KEY_SAVE, topic: { @@ -183,7 +183,7 @@ export default function ({ const keyToReturn = _hideSecrets(deletedKey); - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.API_KEY_DELETE, topic: { diff --git a/apps/core/src/domain/application/applicationDomain.ts b/apps/core/src/domain/application/applicationDomain.ts index bff20834f..5762e3773 100644 --- a/apps/core/src/domain/application/applicationDomain.ts +++ b/apps/core/src/domain/application/applicationDomain.ts @@ -107,7 +107,7 @@ export default function ({ break; } - eventsManagerDomain.sendPubSubEvent( + await eventsManagerDomain.sendPubSubEvent( { data: { applicationEvent: { @@ -120,7 +120,7 @@ export default function ({ ctx ); - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: actionByType[type], topic: { diff --git a/apps/core/src/domain/attribute/attributeDomain.ts b/apps/core/src/domain/attribute/attributeDomain.ts index f6ce2e527..8427bb40a 100644 --- a/apps/core/src/domain/attribute/attributeDomain.ts +++ b/apps/core/src/domain/attribute/attributeDomain.ts @@ -250,7 +250,7 @@ export default function ({ ? await attributeRepo.updateAttribute({attrData: attrToSave, ctx}) : await attributeRepo.createAttribute({attrData: attrToSave, ctx}); - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.ATTRIBUTE_SAVE, topic: { @@ -313,7 +313,7 @@ export default function ({ const deletedAttribute = await attributeRepo.deleteAttribute({attrData: attrProps, ctx}); - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.ATTRIBUTE_DELETE, topic: { diff --git a/apps/core/src/domain/eventsManager/eventsManagerDomain.ts b/apps/core/src/domain/eventsManager/eventsManagerDomain.ts index 517c3ecea..d410859b0 100644 --- a/apps/core/src/domain/eventsManager/eventsManagerDomain.ts +++ b/apps/core/src/domain/eventsManager/eventsManagerDomain.ts @@ -13,8 +13,8 @@ import {IQueryInfos} from '_types/queryInfos'; import {Errors} from '../../_types/errors'; export interface IEventsManagerDomain { - sendDatabaseEvent(payload: IDbPayload, ctx: IQueryInfos): void; - sendPubSubEvent(payload: IPubSubPayload, ctx: IQueryInfos): void; + sendDatabaseEvent(payload: IDbPayload, ctx: IQueryInfos): Promise; + sendPubSubEvent(payload: IPubSubPayload, ctx: IQueryInfos): Promise; subscribe(triggersName: string[]): AsyncIterator; initPubSubEventsConsumer(): Promise; initCustomConsumer( @@ -86,8 +86,8 @@ export default function({ }; const _send = (routingKey: string, payload: any, ctx: IQueryInfos) => { - amqpService - .publish( + try { + return amqpService.publish( config.amqp.exchange, routingKey, JSON.stringify({ @@ -99,11 +99,10 @@ export default function({ trigger: ctx.trigger, payload }) - ) - .catch(err => { - // We don't want to have to await the _send function, so we handle the error here. - logger.error(err); - }); + ); + } catch (e) { + console.error('Error while sending event to rabbitMQ', e); + } }; return { @@ -130,10 +129,10 @@ export default function({ await amqpService.consume(queue, routingKey, msg => onMessage(msg, amqpService.consumer.channel)); }, sendDatabaseEvent(payload: IDbPayload, ctx: IQueryInfos) { - _send(config.eventsManager.routingKeys.data_events, payload, ctx); + return _send(config.eventsManager.routingKeys.data_events, payload, ctx); }, sendPubSubEvent(payload: IPubSubPayload, ctx: IQueryInfos) { - _send(config.eventsManager.routingKeys.pubsub_events, payload, ctx); + return _send(config.eventsManager.routingKeys.pubsub_events, payload, ctx); }, subscribe(triggersName: string[]): AsyncIterator { return pubsub.asyncIterator(triggersName); diff --git a/apps/core/src/domain/export/exportDomain.ts b/apps/core/src/domain/export/exportDomain.ts index a3465d12d..b5f63c1b2 100644 --- a/apps/core/src/domain/export/exportDomain.ts +++ b/apps/core/src/domain/export/exportDomain.ts @@ -180,7 +180,7 @@ export default function ({ return newTaskId; } - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.EXPORT_START, topic: null, @@ -289,7 +289,7 @@ export default function ({ const url = `/${config.export.endpoint}/${filename}`; await tasksManager.setLink(task.id, {name: 'export file', url}, ctx); - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.EXPORT_END, topic: { diff --git a/apps/core/src/domain/filesManager/filesManagerDomain.ts b/apps/core/src/domain/filesManager/filesManagerDomain.ts index 400ec17e8..206a6822c 100644 --- a/apps/core/src/domain/filesManager/filesManagerDomain.ts +++ b/apps/core/src/domain/filesManager/filesManagerDomain.ts @@ -495,7 +495,7 @@ export default function ({ progress[key] = Math.floor(progress[key]); }); - eventsManager.sendPubSubEvent( + await eventsManager.sendPubSubEvent( { triggerName: TriggerNames.UPLOAD_FILE, data: {upload: {uid: file.uid, userId: ctx.userId, progress}} diff --git a/apps/core/src/domain/globalSettings/globalSettingsDomain.ts b/apps/core/src/domain/globalSettings/globalSettingsDomain.ts index c36e03337..3bd74a0c6 100644 --- a/apps/core/src/domain/globalSettings/globalSettingsDomain.ts +++ b/apps/core/src/domain/globalSettings/globalSettingsDomain.ts @@ -43,7 +43,7 @@ export default function ({ // Save settings const savedSettings = await globalSettingsRepo.saveSettings({settings, ctx}); - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.GLOBAL_SETTINGS_SAVE, topic: null, diff --git a/apps/core/src/domain/import/importDomain.ts b/apps/core/src/domain/import/importDomain.ts index 097735c95..6ade0c1b4 100644 --- a/apps/core/src/domain/import/importDomain.ts +++ b/apps/core/src/domain/import/importDomain.ts @@ -701,7 +701,7 @@ export default function({ return newTaskId; } - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.CONFIG_IMPORT_START, topic: null @@ -779,7 +779,7 @@ export default function({ } } - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.CONFIG_IMPORT_END, topic: null @@ -826,7 +826,7 @@ export default function({ } ctx.trigger = 'data_import'; - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.DATA_IMPORT_START, topic: {filename} @@ -935,13 +935,18 @@ export default function({ cacheKey: index, isCacheActive: true }; - await _updateTaskProgress( - progress, - 1, - 'tasks.import_description.elements_process', - task.id, - ctx - ); + + // update progress every 1% of progress.elements + if (index % (progress.elements / 100) === 0) { + await _updateTaskProgress( + progress, + 1, + 'tasks.import_description.elements_process', + task.id, + ctx + ); + } + await _treatElement(element, recordIds, cacheParams, progress, ctx); // update import stats @@ -1075,7 +1080,7 @@ export default function({ ctx ); - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.DATA_IMPORT_END, topic: {filename}, diff --git a/apps/core/src/domain/indexationManager/indexationManagerDomain.ts b/apps/core/src/domain/indexationManager/indexationManagerDomain.ts index d3c895cc8..d91cbf130 100644 --- a/apps/core/src/domain/indexationManager/indexationManagerDomain.ts +++ b/apps/core/src/domain/indexationManager/indexationManagerDomain.ts @@ -420,9 +420,9 @@ export default function ({ return _createIndexationTask(findRecordParams, params, task); } - const _updateLibraryIndexationStatus = (inProgress: boolean) => { + const _updateLibraryIndexationStatus = async (inProgress: boolean) => { for (const libraryId of findRecordParams.map(e => e.library)) { - eventsManager.sendPubSubEvent( + await eventsManager.sendPubSubEvent( { triggerName: TriggerNames.INDEXATION, data: {indexation: {userId: params.ctx.userId, libraryId, inProgress}} @@ -432,13 +432,13 @@ export default function ({ } }; - _updateLibraryIndexationStatus(true); + await _updateLibraryIndexationStatus(true); for (const frp of findRecordParams) { await _indexRecords(frp, params.ctx); } - _updateLibraryIndexationStatus(false); + await _updateLibraryIndexationStatus(false); }; return { diff --git a/apps/core/src/domain/library/libraryDomain.ts b/apps/core/src/domain/library/libraryDomain.ts index 76d74967e..508d8072e 100644 --- a/apps/core/src/domain/library/libraryDomain.ts +++ b/apps/core/src/domain/library/libraryDomain.ts @@ -298,7 +298,7 @@ export default function ({ } // sending indexation event - eventsManager.sendDatabaseEvent( + await eventsManager.sendDatabaseEvent( { action: EventAction.LIBRARY_SAVE, topic: { @@ -354,7 +354,7 @@ export default function ({ const deletedLibrary = await libraryRepo.deleteLibrary({id, ctx}); // sending indexation event - eventsManager.sendDatabaseEvent( + await eventsManager.sendDatabaseEvent( { action: EventAction.LIBRARY_DELETE, topic: { diff --git a/apps/core/src/domain/permission/permissionDomain.ts b/apps/core/src/domain/permission/permissionDomain.ts index adaceea38..a2c2f7cf3 100644 --- a/apps/core/src/domain/permission/permissionDomain.ts +++ b/apps/core/src/domain/permission/permissionDomain.ts @@ -188,7 +188,7 @@ export default function(deps: IDeps = {}): IPermissionDomain { const savedPermission = await permissionRepo.savePermission({permData, ctx}); - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.PERMISSION_SAVE, topic: { diff --git a/apps/core/src/domain/record/helpers/sendRecordUpdateEvent.ts b/apps/core/src/domain/record/helpers/sendRecordUpdateEvent.ts index 8855171e1..23c432975 100644 --- a/apps/core/src/domain/record/helpers/sendRecordUpdateEvent.ts +++ b/apps/core/src/domain/record/helpers/sendRecordUpdateEvent.ts @@ -19,8 +19,8 @@ interface IDeps { export default function ({ 'core.domain.eventsManager': eventsManagerDomain = null }: IDeps): SendRecordUpdateEventHelper { - return (record, updatedValues, ctx) => { - eventsManagerDomain.sendPubSubEvent( + return async (record, updatedValues, ctx) => { + await eventsManagerDomain.sendPubSubEvent( { triggerName: TriggerNames.RECORD_UPDATE, data: {recordUpdate: {record, updatedValues}} diff --git a/apps/core/src/domain/record/recordDomain.ts b/apps/core/src/domain/record/recordDomain.ts index 7942122ed..a90c65e71 100644 --- a/apps/core/src/domain/record/recordDomain.ts +++ b/apps/core/src/domain/record/recordDomain.ts @@ -862,7 +862,8 @@ export default function ({ }); } - eventsManager.sendDatabaseEvent( + // await is necessary during importData(), otherwise it will generate a memory leak due to number of events incoming + await eventsManager.sendDatabaseEvent( { action: EventAction.RECORD_SAVE, topic: { @@ -881,7 +882,7 @@ export default function ({ async updateRecord({library, recordData, ctx}): Promise { const {old: oldRecord, new: savedRecord} = await recordRepo.updateRecord({libraryId: library, recordData}); - eventsManager.sendDatabaseEvent( + await eventsManager.sendDatabaseEvent( { action: EventAction.RECORD_SAVE, topic: { @@ -969,7 +970,7 @@ export default function ({ // Everything is clean, we can actually delete the record const deletedRecord = await recordRepo.deleteRecord({libraryId: library, recordId: id, ctx}); - eventsManager.sendDatabaseEvent( + await eventsManager.sendDatabaseEvent( { action: EventAction.RECORD_DELETE, topic: { diff --git a/apps/core/src/domain/tasksManager/tasksManagerDomain.ts b/apps/core/src/domain/tasksManager/tasksManagerDomain.ts index 0dbc98a6f..8dc6bfad8 100644 --- a/apps/core/src/domain/tasksManager/tasksManagerDomain.ts +++ b/apps/core/src/domain/tasksManager/tasksManagerDomain.ts @@ -291,7 +291,7 @@ export default function ({ ctx ); - eventsManager.sendPubSubEvent({triggerName: TriggerNames.TASK, data: {task}}, ctx); + await eventsManager.sendPubSubEvent({triggerName: TriggerNames.TASK, data: {task}}, ctx); return task; }; @@ -375,7 +375,7 @@ export default function ({ (({dbProfiler, ...c}) => c)(ctx) ); - eventsManager.sendPubSubEvent({triggerName: TriggerNames.TASK, data: {task}}, ctx); + await eventsManager.sendPubSubEvent({triggerName: TriggerNames.TASK, data: {task}}, ctx); return task.id; }; @@ -481,7 +481,7 @@ export default function ({ await _deleteTask(t, ctx); } - eventsManager.sendDatabaseEvent( + await eventsManager.sendDatabaseEvent( { action: EventAction.TASKS_DELETE, topic: null, diff --git a/apps/core/src/domain/tree/treeDomain.ts b/apps/core/src/domain/tree/treeDomain.ts index 99649b7d2..3786d5227 100644 --- a/apps/core/src/domain/tree/treeDomain.ts +++ b/apps/core/src/domain/tree/treeDomain.ts @@ -338,7 +338,7 @@ export default function ({ [TreeEventTypes.MOVE]: EventAction.TREE_MOVE_ELEMENT }; - eventsManagerDomain.sendPubSubEvent( + await eventsManagerDomain.sendPubSubEvent( { data: { treeEvent: { @@ -355,7 +355,7 @@ export default function ({ ctx ); - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: actionByType[type], topic: { @@ -416,7 +416,7 @@ export default function ({ ? await treeRepo.updateTree({treeData: dataToSave as ITree, ctx}) : await treeRepo.createTree({treeData: dataToSave as ITree, ctx}); - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.TREE_SAVE, topic: { @@ -479,7 +479,7 @@ export default function ({ const deletedTree = await treeRepo.deleteTree({id, ctx}); - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.TREE_DELETE, topic: { diff --git a/apps/core/src/domain/value/valueDomain.ts b/apps/core/src/domain/value/valueDomain.ts index a58d08e1e..7843fbc61 100644 --- a/apps/core/src/domain/value/valueDomain.ts +++ b/apps/core/src/domain/value/valueDomain.ts @@ -362,7 +362,7 @@ const valueDomain = function ({ // Make sure attribute is returned here res.attribute = attribute; - eventsManager.sendDatabaseEvent( + await eventsManager.sendDatabaseEvent( { action: EventAction.VALUE_DELETE, topic: { @@ -472,7 +472,7 @@ const valueDomain = function ({ } if (!areValuesIdentical) { - eventsManager.sendDatabaseEvent( + await eventsManager.sendDatabaseEvent( { action: EventAction.VALUE_SAVE, topic: { diff --git a/apps/core/src/domain/versionProfile/versionProfileDomain.ts b/apps/core/src/domain/versionProfile/versionProfileDomain.ts index 0666ff3ad..6e7859d68 100644 --- a/apps/core/src/domain/versionProfile/versionProfileDomain.ts +++ b/apps/core/src/domain/versionProfile/versionProfileDomain.ts @@ -130,7 +130,7 @@ export default function ({ ? await versionProfileRepo.createVersionProfile({profileData: profileToSave, ctx}) : await versionProfileRepo.updateVersionProfile({profileData: profileToSave, ctx}); - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.VERSION_PROFILE_SAVE, topic: { @@ -196,7 +196,7 @@ export default function ({ const deletedProfile = await versionProfileRepo.deleteVersionProfile({id, ctx}); - eventsManagerDomain.sendDatabaseEvent( + await eventsManagerDomain.sendDatabaseEvent( { action: EventAction.VERSION_PROFILE_DELETE, topic: { diff --git a/libs/message-broker/dist/amqpService.d.ts b/libs/message-broker/dist/amqpService.d.ts index a7de12102..a2d9d9e49 100644 --- a/libs/message-broker/dist/amqpService.d.ts +++ b/libs/message-broker/dist/amqpService.d.ts @@ -9,17 +9,12 @@ export interface IAmqpService { connection: amqp.Connection; channel: amqp.ConfirmChannel; }; - publish(exchange: string, routingKey: string, msg: string, priority?: number): Promise; - consume( - queue: string, - routingKey: string, - onMessage: onMessageFunc, - consumerTag?: string - ): Promise; + publish(exchange: string, routingKey: string, msg: string, priority?: number): Promise; + consume(queue: string, routingKey: string, onMessage: onMessageFunc, consumerTag?: string): Promise; close(): Promise; } interface IDeps { config: IAmqp; } -export default function ({config}: IDeps): Promise; +export default function ({ config }: IDeps): Promise; export {}; diff --git a/libs/message-broker/dist/amqpService.js b/libs/message-broker/dist/amqpService.js index 5ebee1c18..8acff882b 100644 --- a/libs/message-broker/dist/amqpService.js +++ b/libs/message-broker/dist/amqpService.js @@ -47,9 +47,10 @@ async function default_1({ config }) { const publish = async (exchange, routingKey, msg, priority) => { try { await publisher.channel.checkExchange(exchange); - publisher.channel.publish(exchange, routingKey, Buffer.from(msg), { persistent: true, priority }); + const response = publisher.channel.publish(exchange, routingKey, Buffer.from(msg), { persistent: true, priority }); await publisher.channel.waitForConfirms(); retries = 0; + return response; } catch (e) { if (!retries) { diff --git a/libs/message-broker/dist/amqpService.js.map b/libs/message-broker/dist/amqpService.js.map index b5b75223e..f28829a14 100644 --- a/libs/message-broker/dist/amqpService.js.map +++ b/libs/message-broker/dist/amqpService.js.map @@ -1 +1 @@ -{"version":3,"file":"amqpService.js","sourceRoot":"","sources":["../src/amqpService.ts"],"names":[],"mappings":";;;;;;;;;;;;;;;;;;;;;;;;;AAAA,gCAAgC;AAChC,sCAAsC;AACtC,sEAAsE;AACtE,8CAAgC;AAoBjB,KAAK,oBAAW,EAAC,MAAM,EAAQ;IAC1C,IAAI,SAAsE,CAAC;IAC3E,IAAI,QAAqE,CAAC;IAC1E,IAAI,OAAO,GAAG,CAAC,CAAC;IAEhB,MAAM,KAAK,GAAG,KAAK,IAAI,EAAE;QACrB,MAAM,mBAAmB,GAAG,MAAM,IAAI,CAAC,OAAO,CAAC,MAAM,CAAC,OAAO,CAAC,CAAC;QAC/D,MAAM,gBAAgB,GAAG,MAAM,mBAAmB,CAAC,oBAAoB,EAAE,CAAC;QAC1E,MAAM,gBAAgB,CAAC,cAAc,CAAC,MAAM,CAAC,QAAQ,EAAE,MAAM,CAAC,IAAI,CAAC,CAAC;QACpE,MAAM,gBAAgB,CAAC,QAAQ,CAAC,MAAM,CAAC,QAAQ,CAAC,CAAC;QAEjD,MAAM,kBAAkB,GAAG,MAAM,IAAI,CAAC,OAAO,CAAC,MAAM,CAAC,OAAO,CAAC,CAAC;QAC9D,MAAM,eAAe,GAAG,MAAM,kBAAkB,CAAC,oBAAoB,EAAE,CAAC;QACxE,MAAM,eAAe,CAAC,cAAc,CAAC,MAAM,CAAC,QAAQ,EAAE,MAAM,CAAC,IAAI,CAAC,CAAC;QACnE,MAAM,eAAe,CAAC,QAAQ,CAAC,MAAM,CAAC,QAAQ,CAAC,CAAC;QAEhD,SAAS,GAAG,EAAC,UAAU,EAAE,mBAAmB,EAAE,OAAO,EAAE,gBAAgB,EAAC,CAAC;QACzE,QAAQ,GAAG,EAAC,UAAU,EAAE,kBAAkB,EAAE,OAAO,EAAE,eAAe,EAAC,CAAC;IAC1E,CAAC,CAAC;IAEF,MAAM,KAAK,EAAE,CAAC;IAEd,MAAM,OAAO,GAA4B,KAAK,EAAE,QAAQ,EAAE,UAAU,EAAE,GAAG,EAAE,QAAQ,EAAiB,EAAE;QAClG,IAAI;YACA,MAAM,SAAS,CAAC,OAAO,CAAC,aAAa,CAAC,QAAQ,CAAC,CAAC;YAChD,SAAS,CAAC,OAAO,CAAC,OAAO,CAAC,QAAQ,EAAE,UAAU,EAAE,MAAM,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,EAAC,UAAU,EAAE,IAAI,EAAE,QAAQ,EAAC,CAAC,CAAC;YAChG,MAAM,SAAS,CAAC,OAAO,CAAC,eAAe,EAAE,CAAC;YAC1C,OAAO,GAAG,CAAC,CAAC;SACf;QAAC,OAAO,CAAC,EAAE;YACR,IAAI,CAAC,OAAO,EAAE;gBACV,OAAO,IAAI,CAAC,CAAC;gBAEb,IAAI;oBACA,MAAM,KAAK,EAAE,CAAC;oBACd,MAAM,OAAO,CAAC,QAAQ,EAAE,UAAU,EAAE,GAAG,EAAE,QAAQ,CAAC,CAAC;iBACtD;gBAAC,OAAO,GAAG,EAAE;oBACV,MAAM,IAAI,KAAK,CAAC,6BAA6B,CAAC,CAAC;iBAClD;aACJ;iBAAM;gBACH,MAAM,IAAI,KAAK,CAAC,6BAA6B,CAAC,CAAC;aAClD;SACJ;IACL,CAAC,CAAC;IAEF,MAAM,OAAO,GAAG,KAAK,EACjB,KAAa,EACb,UAAkB,EAClB,SAAwB,EACxB,WAAoB,EACS,EAAE;QAC/B,OAAO,QAAQ,CAAC,OAAO,CAAC,OAAO,CAC3B,KAAK,EACL,KAAK,EAAC,GAAG,EAAC,EAAE;YACR,IAAI,CAAC,GAAG,EAAE;gBACN,OAAO;aACV;YAED,IAAI;gBACA,MAAM,SAAS,CAAC,GAAG,CAAC,CAAC;aACxB;YAAC,OAAO,CAAC,EAAE;gBACR,OAAO,CAAC,KAAK,CAAC,OAAO,CAAC,GAAG,EAAE,UAAU,EAAE,CAAC,CAAC,CAAC;gBAC1C,OAAO,CAAC,KAAK,CACT,IAAI,KAAK,IAAI,UAAU;0BACrB,CAAC;uCACY,GAAG,CAAC,OAAO,CAAC,QAAQ,EAAE;qBACxC,CACA,CAAC;aACL;oBAAS;gBACN,0CAA0C;aAC7C;QACL,CAAC,EACD,EAAC,WAAW,EAAC,CAChB,CAAC;IACN,CAAC,CAAC;IAEF,MAAM,KAAK,GAAG,KAAK,IAAI,EAAE;QACrB,MAAM,SAAS,CAAC,OAAO,CAAC,KAAK,EAAE,CAAC;QAChC,MAAM,SAAS,CAAC,UAAU,CAAC,KAAK,EAAE,CAAC;QACnC,MAAM,QAAQ,CAAC,OAAO,CAAC,KAAK,EAAE,CAAC;QAC/B,MAAM,QAAQ,CAAC,UAAU,CAAC,KAAK,EAAE,CAAC;IACtC,CAAC,CAAC;IAEF,OAAO;QACH,SAAS;QACT,QAAQ;QACR,OAAO;QACP,OAAO;QACP,KAAK;KACR,CAAC;AACN,CAAC;AAzFD,4BAyFC"} \ No newline at end of file +{"version":3,"file":"amqpService.js","sourceRoot":"","sources":["../src/amqpService.ts"],"names":[],"mappings":";;;;;;;;;;;;;;;;;;;;;;;;;AAAA,gCAAgC;AAChC,sCAAsC;AACtC,sEAAsE;AACtE,8CAAgC;AAoBjB,KAAK,oBAAW,EAAC,MAAM,EAAQ;IAC1C,IAAI,SAAsE,CAAC;IAC3E,IAAI,QAAqE,CAAC;IAC1E,IAAI,OAAO,GAAG,CAAC,CAAC;IAEhB,MAAM,KAAK,GAAG,KAAK,IAAI,EAAE;QACrB,MAAM,mBAAmB,GAAG,MAAM,IAAI,CAAC,OAAO,CAAC,MAAM,CAAC,OAAO,CAAC,CAAC;QAC/D,MAAM,gBAAgB,GAAG,MAAM,mBAAmB,CAAC,oBAAoB,EAAE,CAAC;QAC1E,MAAM,gBAAgB,CAAC,cAAc,CAAC,MAAM,CAAC,QAAQ,EAAE,MAAM,CAAC,IAAI,CAAC,CAAC;QACpE,MAAM,gBAAgB,CAAC,QAAQ,CAAC,MAAM,CAAC,QAAQ,CAAC,CAAC;QAEjD,MAAM,kBAAkB,GAAG,MAAM,IAAI,CAAC,OAAO,CAAC,MAAM,CAAC,OAAO,CAAC,CAAC;QAC9D,MAAM,eAAe,GAAG,MAAM,kBAAkB,CAAC,oBAAoB,EAAE,CAAC;QACxE,MAAM,eAAe,CAAC,cAAc,CAAC,MAAM,CAAC,QAAQ,EAAE,MAAM,CAAC,IAAI,CAAC,CAAC;QACnE,MAAM,eAAe,CAAC,QAAQ,CAAC,MAAM,CAAC,QAAQ,CAAC,CAAC;QAEhD,SAAS,GAAG,EAAC,UAAU,EAAE,mBAAmB,EAAE,OAAO,EAAE,gBAAgB,EAAC,CAAC;QACzE,QAAQ,GAAG,EAAC,UAAU,EAAE,kBAAkB,EAAE,OAAO,EAAE,eAAe,EAAC,CAAC;IAC1E,CAAC,CAAC;IAEF,MAAM,KAAK,EAAE,CAAC;IAEd,MAAM,OAAO,GAA4B,KAAK,EAAE,QAAQ,EAAE,UAAU,EAAE,GAAG,EAAE,QAAQ,EAAoB,EAAE;QACrG,IAAI;YACA,MAAM,SAAS,CAAC,OAAO,CAAC,aAAa,CAAC,QAAQ,CAAC,CAAC;YAChD,MAAM,QAAQ,GAAG,SAAS,CAAC,OAAO,CAAC,OAAO,CAAC,QAAQ,EAAE,UAAU,EAAE,MAAM,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,EAAC,UAAU,EAAE,IAAI,EAAE,QAAQ,EAAC,CAAC,CAAC;YACjH,MAAM,SAAS,CAAC,OAAO,CAAC,eAAe,EAAE,CAAC;YAC1C,OAAO,GAAG,CAAC,CAAC;YAEZ,OAAO,QAAQ,CAAC;SACnB;QAAC,OAAO,CAAC,EAAE;YACR,IAAI,CAAC,OAAO,EAAE;gBACV,OAAO,IAAI,CAAC,CAAC;gBAEb,IAAI;oBACA,MAAM,KAAK,EAAE,CAAC;oBACd,MAAM,OAAO,CAAC,QAAQ,EAAE,UAAU,EAAE,GAAG,EAAE,QAAQ,CAAC,CAAC;iBACtD;gBAAC,OAAO,GAAG,EAAE;oBACV,MAAM,IAAI,KAAK,CAAC,6BAA6B,CAAC,CAAC;iBAClD;aACJ;iBAAM;gBACH,MAAM,IAAI,KAAK,CAAC,6BAA6B,CAAC,CAAC;aAClD;SACJ;IACL,CAAC,CAAC;IAEF,MAAM,OAAO,GAAG,KAAK,EACjB,KAAa,EACb,UAAkB,EAClB,SAAwB,EACxB,WAAoB,EACS,EAAE;QAC/B,OAAO,QAAQ,CAAC,OAAO,CAAC,OAAO,CAC3B,KAAK,EACL,KAAK,EAAC,GAAG,EAAC,EAAE;YACR,IAAI,CAAC,GAAG,EAAE;gBACN,OAAO;aACV;YAED,IAAI;gBACA,MAAM,SAAS,CAAC,GAAG,CAAC,CAAC;aACxB;YAAC,OAAO,CAAC,EAAE;gBACR,OAAO,CAAC,KAAK,CAAC,OAAO,CAAC,GAAG,EAAE,UAAU,EAAE,CAAC,CAAC,CAAC;gBAC1C,OAAO,CAAC,KAAK,CACT,IAAI,KAAK,IAAI,UAAU;0BACrB,CAAC;uCACY,GAAG,CAAC,OAAO,CAAC,QAAQ,EAAE;qBACxC,CACA,CAAC;aACL;oBAAS;gBACN,0CAA0C;aAC7C;QACL,CAAC,EACD,EAAC,WAAW,EAAC,CAChB,CAAC;IACN,CAAC,CAAC;IAEF,MAAM,KAAK,GAAG,KAAK,IAAI,EAAE;QACrB,MAAM,SAAS,CAAC,OAAO,CAAC,KAAK,EAAE,CAAC;QAChC,MAAM,SAAS,CAAC,UAAU,CAAC,KAAK,EAAE,CAAC;QACnC,MAAM,QAAQ,CAAC,OAAO,CAAC,KAAK,EAAE,CAAC;QAC/B,MAAM,QAAQ,CAAC,UAAU,CAAC,KAAK,EAAE,CAAC;IACtC,CAAC,CAAC;IAEF,OAAO;QACH,SAAS;QACT,QAAQ;QACR,OAAO;QACP,OAAO;QACP,KAAK;KACR,CAAC;AACN,CAAC;AA3FD,4BA2FC"} \ No newline at end of file diff --git a/libs/message-broker/src/amqpService.ts b/libs/message-broker/src/amqpService.ts index 44067aae3..0e753200f 100644 --- a/libs/message-broker/src/amqpService.ts +++ b/libs/message-broker/src/amqpService.ts @@ -7,7 +7,7 @@ import {IAmqp, onMessageFunc} from './types/amqp'; export interface IAmqpService { publisher: {connection: amqp.Connection; channel: amqp.ConfirmChannel}; consumer: {connection: amqp.Connection; channel: amqp.ConfirmChannel}; - publish(exchange: string, routingKey: string, msg: string, priority?: number): Promise; + publish(exchange: string, routingKey: string, msg: string, priority?: number): Promise; consume( queue: string, routingKey: string, @@ -43,12 +43,14 @@ export default async function ({config}: IDeps): Promise { await _init(); - const publish: IAmqpService['publish'] = async (exchange, routingKey, msg, priority): Promise => { + const publish: IAmqpService['publish'] = async (exchange, routingKey, msg, priority): Promise => { try { await publisher.channel.checkExchange(exchange); - publisher.channel.publish(exchange, routingKey, Buffer.from(msg), {persistent: true, priority}); + const response = publisher.channel.publish(exchange, routingKey, Buffer.from(msg), {persistent: true, priority}); await publisher.channel.waitForConfirms(); retries = 0; + + return response; } catch (e) { if (!retries) { retries += 1;