Skip to content

Commit

Permalink
Execute the publish promise with await to prevent them from stacking …
Browse files Browse the repository at this point in the history
…up (#386)

* CLI: add importData function

* Execute the publish promise with await to prevent them from stacking up (memory leak)

* Execute the publish promise with await to prevent them from stacking up (memory leak)

* Execute the publish promise with await to prevent them from stacking up (memory leak)

* Remove await behing publish functions

* Remove await behing publish functions

* Put necessary await

* Put necessary await

* Put necessary await

* Put necessary await

* rollback wrong rebase

* rollback wrong rebase
  • Loading branch information
stevenfreville authored Feb 9, 2024
1 parent fcf9e6e commit 76c8a94
Show file tree
Hide file tree
Showing 21 changed files with 71 additions and 68 deletions.
4 changes: 2 additions & 2 deletions apps/core/src/domain/apiKey/apiKeyDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ export default function ({
savedKey = _hideSecrets(savedKey);
}

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.API_KEY_SAVE,
topic: {
Expand Down Expand Up @@ -183,7 +183,7 @@ export default function ({

const keyToReturn = _hideSecrets(deletedKey);

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.API_KEY_DELETE,
topic: {
Expand Down
4 changes: 2 additions & 2 deletions apps/core/src/domain/application/applicationDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export default function ({
break;
}

eventsManagerDomain.sendPubSubEvent(
await eventsManagerDomain.sendPubSubEvent(
{
data: {
applicationEvent: {
Expand All @@ -120,7 +120,7 @@ export default function ({
ctx
);

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: actionByType[type],
topic: {
Expand Down
4 changes: 2 additions & 2 deletions apps/core/src/domain/attribute/attributeDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ export default function ({
? await attributeRepo.updateAttribute({attrData: attrToSave, ctx})
: await attributeRepo.createAttribute({attrData: attrToSave, ctx});

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.ATTRIBUTE_SAVE,
topic: {
Expand Down Expand Up @@ -313,7 +313,7 @@ export default function ({

const deletedAttribute = await attributeRepo.deleteAttribute({attrData: attrProps, ctx});

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.ATTRIBUTE_DELETE,
topic: {
Expand Down
21 changes: 10 additions & 11 deletions apps/core/src/domain/eventsManager/eventsManagerDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import {IQueryInfos} from '_types/queryInfos';
import {Errors} from '../../_types/errors';

export interface IEventsManagerDomain {
sendDatabaseEvent(payload: IDbPayload, ctx: IQueryInfos): void;
sendPubSubEvent(payload: IPubSubPayload, ctx: IQueryInfos): void;
sendDatabaseEvent(payload: IDbPayload, ctx: IQueryInfos): Promise<boolean>;
sendPubSubEvent(payload: IPubSubPayload, ctx: IQueryInfos): Promise<boolean>;
subscribe(triggersName: string[]): AsyncIterator<any>;
initPubSubEventsConsumer(): Promise<void>;
initCustomConsumer(
Expand Down Expand Up @@ -86,8 +86,8 @@ export default function({
};

const _send = (routingKey: string, payload: any, ctx: IQueryInfos) => {
amqpService
.publish(
try {
return amqpService.publish(
config.amqp.exchange,
routingKey,
JSON.stringify({
Expand All @@ -99,11 +99,10 @@ export default function({
trigger: ctx.trigger,
payload
})
)
.catch(err => {
// We don't want to have to await the _send function, so we handle the error here.
logger.error(err);
});
);
} catch (e) {
console.error('Error while sending event to rabbitMQ', e);
}
};

return {
Expand All @@ -130,10 +129,10 @@ export default function({
await amqpService.consume(queue, routingKey, msg => onMessage(msg, amqpService.consumer.channel));
},
sendDatabaseEvent(payload: IDbPayload, ctx: IQueryInfos) {
_send(config.eventsManager.routingKeys.data_events, payload, ctx);
return _send(config.eventsManager.routingKeys.data_events, payload, ctx);
},
sendPubSubEvent(payload: IPubSubPayload, ctx: IQueryInfos) {
_send(config.eventsManager.routingKeys.pubsub_events, payload, ctx);
return _send(config.eventsManager.routingKeys.pubsub_events, payload, ctx);
},
subscribe(triggersName: string[]): AsyncIterator<any> {
return pubsub.asyncIterator(triggersName);
Expand Down
4 changes: 2 additions & 2 deletions apps/core/src/domain/export/exportDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ export default function ({
return newTaskId;
}

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.EXPORT_START,
topic: null,
Expand Down Expand Up @@ -289,7 +289,7 @@ export default function ({
const url = `/${config.export.endpoint}/${filename}`;
await tasksManager.setLink(task.id, {name: 'export file', url}, ctx);

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.EXPORT_END,
topic: {
Expand Down
2 changes: 1 addition & 1 deletion apps/core/src/domain/filesManager/filesManagerDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ export default function ({
progress[key] = Math.floor(progress[key]);
});

eventsManager.sendPubSubEvent(
await eventsManager.sendPubSubEvent(
{
triggerName: TriggerNames.UPLOAD_FILE,
data: {upload: {uid: file.uid, userId: ctx.userId, progress}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export default function ({
// Save settings
const savedSettings = await globalSettingsRepo.saveSettings({settings, ctx});

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.GLOBAL_SETTINGS_SAVE,
topic: null,
Expand Down
27 changes: 16 additions & 11 deletions apps/core/src/domain/import/importDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ export default function({
return newTaskId;
}

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.CONFIG_IMPORT_START,
topic: null
Expand Down Expand Up @@ -779,7 +779,7 @@ export default function({
}
}

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.CONFIG_IMPORT_END,
topic: null
Expand Down Expand Up @@ -826,7 +826,7 @@ export default function({
}

ctx.trigger = 'data_import';
eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.DATA_IMPORT_START,
topic: {filename}
Expand Down Expand Up @@ -935,13 +935,18 @@ export default function({
cacheKey: index,
isCacheActive: true
};
await _updateTaskProgress(
progress,
1,
'tasks.import_description.elements_process',
task.id,
ctx
);

// update progress every 1% of progress.elements
if (index % (progress.elements / 100) === 0) {
await _updateTaskProgress(
progress,
1,
'tasks.import_description.elements_process',
task.id,
ctx
);
}

await _treatElement(element, recordIds, cacheParams, progress, ctx);

// update import stats
Expand Down Expand Up @@ -1075,7 +1080,7 @@ export default function({
ctx
);

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.DATA_IMPORT_END,
topic: {filename},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,9 @@ export default function ({
return _createIndexationTask(findRecordParams, params, task);
}

const _updateLibraryIndexationStatus = (inProgress: boolean) => {
const _updateLibraryIndexationStatus = async (inProgress: boolean) => {
for (const libraryId of findRecordParams.map(e => e.library)) {
eventsManager.sendPubSubEvent(
await eventsManager.sendPubSubEvent(
{
triggerName: TriggerNames.INDEXATION,
data: {indexation: {userId: params.ctx.userId, libraryId, inProgress}}
Expand All @@ -432,13 +432,13 @@ export default function ({
}
};

_updateLibraryIndexationStatus(true);
await _updateLibraryIndexationStatus(true);

for (const frp of findRecordParams) {
await _indexRecords(frp, params.ctx);
}

_updateLibraryIndexationStatus(false);
await _updateLibraryIndexationStatus(false);
};

return {
Expand Down
4 changes: 2 additions & 2 deletions apps/core/src/domain/library/libraryDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ export default function ({
}

// sending indexation event
eventsManager.sendDatabaseEvent(
await eventsManager.sendDatabaseEvent(
{
action: EventAction.LIBRARY_SAVE,
topic: {
Expand Down Expand Up @@ -354,7 +354,7 @@ export default function ({
const deletedLibrary = await libraryRepo.deleteLibrary({id, ctx});

// sending indexation event
eventsManager.sendDatabaseEvent(
await eventsManager.sendDatabaseEvent(
{
action: EventAction.LIBRARY_DELETE,
topic: {
Expand Down
2 changes: 1 addition & 1 deletion apps/core/src/domain/permission/permissionDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ export default function(deps: IDeps = {}): IPermissionDomain {

const savedPermission = await permissionRepo.savePermission({permData, ctx});

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.PERMISSION_SAVE,
topic: {
Expand Down
4 changes: 2 additions & 2 deletions apps/core/src/domain/record/helpers/sendRecordUpdateEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ interface IDeps {
export default function ({
'core.domain.eventsManager': eventsManagerDomain = null
}: IDeps): SendRecordUpdateEventHelper {
return (record, updatedValues, ctx) => {
eventsManagerDomain.sendPubSubEvent(
return async (record, updatedValues, ctx) => {
await eventsManagerDomain.sendPubSubEvent(
{
triggerName: TriggerNames.RECORD_UPDATE,
data: {recordUpdate: {record, updatedValues}}
Expand Down
7 changes: 4 additions & 3 deletions apps/core/src/domain/record/recordDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,8 @@ export default function ({
});
}

eventsManager.sendDatabaseEvent(
// await is necessary during importData(), otherwise it will generate a memory leak due to number of events incoming
await eventsManager.sendDatabaseEvent(
{
action: EventAction.RECORD_SAVE,
topic: {
Expand All @@ -881,7 +882,7 @@ export default function ({
async updateRecord({library, recordData, ctx}): Promise<IRecord> {
const {old: oldRecord, new: savedRecord} = await recordRepo.updateRecord({libraryId: library, recordData});

eventsManager.sendDatabaseEvent(
await eventsManager.sendDatabaseEvent(
{
action: EventAction.RECORD_SAVE,
topic: {
Expand Down Expand Up @@ -969,7 +970,7 @@ export default function ({
// Everything is clean, we can actually delete the record
const deletedRecord = await recordRepo.deleteRecord({libraryId: library, recordId: id, ctx});

eventsManager.sendDatabaseEvent(
await eventsManager.sendDatabaseEvent(
{
action: EventAction.RECORD_DELETE,
topic: {
Expand Down
6 changes: 3 additions & 3 deletions apps/core/src/domain/tasksManager/tasksManagerDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ export default function ({
ctx
);

eventsManager.sendPubSubEvent({triggerName: TriggerNames.TASK, data: {task}}, ctx);
await eventsManager.sendPubSubEvent({triggerName: TriggerNames.TASK, data: {task}}, ctx);

return task;
};
Expand Down Expand Up @@ -375,7 +375,7 @@ export default function ({
(({dbProfiler, ...c}) => c)(ctx)
);

eventsManager.sendPubSubEvent({triggerName: TriggerNames.TASK, data: {task}}, ctx);
await eventsManager.sendPubSubEvent({triggerName: TriggerNames.TASK, data: {task}}, ctx);

return task.id;
};
Expand Down Expand Up @@ -481,7 +481,7 @@ export default function ({
await _deleteTask(t, ctx);
}

eventsManager.sendDatabaseEvent(
await eventsManager.sendDatabaseEvent(
{
action: EventAction.TASKS_DELETE,
topic: null,
Expand Down
8 changes: 4 additions & 4 deletions apps/core/src/domain/tree/treeDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ export default function ({
[TreeEventTypes.MOVE]: EventAction.TREE_MOVE_ELEMENT
};

eventsManagerDomain.sendPubSubEvent(
await eventsManagerDomain.sendPubSubEvent(
{
data: {
treeEvent: {
Expand All @@ -355,7 +355,7 @@ export default function ({
ctx
);

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: actionByType[type],
topic: {
Expand Down Expand Up @@ -416,7 +416,7 @@ export default function ({
? await treeRepo.updateTree({treeData: dataToSave as ITree, ctx})
: await treeRepo.createTree({treeData: dataToSave as ITree, ctx});

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.TREE_SAVE,
topic: {
Expand Down Expand Up @@ -479,7 +479,7 @@ export default function ({

const deletedTree = await treeRepo.deleteTree({id, ctx});

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.TREE_DELETE,
topic: {
Expand Down
4 changes: 2 additions & 2 deletions apps/core/src/domain/value/valueDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ const valueDomain = function ({
// Make sure attribute is returned here
res.attribute = attribute;

eventsManager.sendDatabaseEvent(
await eventsManager.sendDatabaseEvent(
{
action: EventAction.VALUE_DELETE,
topic: {
Expand Down Expand Up @@ -472,7 +472,7 @@ const valueDomain = function ({
}

if (!areValuesIdentical) {
eventsManager.sendDatabaseEvent(
await eventsManager.sendDatabaseEvent(
{
action: EventAction.VALUE_SAVE,
topic: {
Expand Down
4 changes: 2 additions & 2 deletions apps/core/src/domain/versionProfile/versionProfileDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export default function ({
? await versionProfileRepo.createVersionProfile({profileData: profileToSave, ctx})
: await versionProfileRepo.updateVersionProfile({profileData: profileToSave, ctx});

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.VERSION_PROFILE_SAVE,
topic: {
Expand Down Expand Up @@ -196,7 +196,7 @@ export default function ({

const deletedProfile = await versionProfileRepo.deleteVersionProfile({id, ctx});

eventsManagerDomain.sendDatabaseEvent(
await eventsManagerDomain.sendDatabaseEvent(
{
action: EventAction.VERSION_PROFILE_DELETE,
topic: {
Expand Down
Loading

0 comments on commit 76c8a94

Please sign in to comment.