diff --git a/src/imports/data/data.js b/src/imports/data/data.js index 31b6d3b..3f7df0f 100644 --- a/src/imports/data/data.js +++ b/src/imports/data/data.js @@ -573,7 +573,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert, }); if (fieldPermissionResult.some(result => result.success === false)) { - dbSession.abortTransaction(); + await dbSession.abortTransaction(); return errorReturn( fieldPermissionResult .filter(result => result.success === false) @@ -611,7 +611,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert, ); if (validationResults.some(result => result.success === false)) { - dbSession.abortTransaction(); + await dbSession.abortTransaction(); return errorReturn( validationResults .filter(result => result.success === false) @@ -693,7 +693,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert, }); if (validateAllFieldsResult.some(result => result.success === false)) { - dbSession.abortTransaction(); + await dbSession.abortTransaction(); return errorReturn( validateAllFieldsResult .filter(result => result.success === false) @@ -707,7 +707,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert, const validation = await processValidationScript({ script: metaObject.validationScript, validationData: metaObject.validationData, fullData: extend({}, data, cleanedData), user }); if (validation.success === false) { - dbSession.abortTransaction(); + await dbSession.abortTransaction(); logger.error(validation, `Create - Script Validation Error - ${validation.reason}`); return errorReturn(`[${document}] ${validation.reason}`); } @@ -743,7 +743,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert, }); if (autoNumberResult.some(result => result.success === false)) { - dbSession.abortTransaction(); + await dbSession.abortTransaction(); return errorReturn( autoNumberResult .filter(result => result.success === false) @@ -836,7 +836,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert, logger.error(e, `Error on insert ${MetaObject.Namespace.ns}.${document}: ${e.message}`); tracingSpan?.addEvent('Error on insert', { error: e.message }); tracingSpan?.setAttribute({ error: e.message }); - dbSession.abortTransaction(); + await dbSession.abortTransaction(); if (e.code === 11000) { return errorReturn(`[${document}] Duplicate key error`); @@ -920,6 +920,8 @@ export async function create({ authTokenId, document, data, contextUser, upsert, } catch (e) { tracingSpan?.addEvent('Error on Konsistent', { error: e.message }); logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`); + await dbSession.abortTransaction(); + return errorReturn(`[${document}] Error on Konsistent: ${e.message}`); } } return successReturn([dateToString(resultRecord)]); @@ -1392,6 +1394,9 @@ export async function update({ authTokenId, document, data, contextUser, tracing } catch (e) { logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`); tracingSpan?.addEvent('Error on Konsistent', { error: e.message }); + await dbSession.abortTransaction(); + + return errorReturn(`[${document}] Error on Konsistent: ${e.message}`); } } diff --git a/src/imports/konsistent/createHistory.js b/src/imports/konsistent/createHistory.js index 92b1bc7..69f344c 100644 --- a/src/imports/konsistent/createHistory.js +++ b/src/imports/konsistent/createHistory.js @@ -10,7 +10,6 @@ export default async function createHistory(metaName, action, id, data, updatedB return; } - const startTime = process.hrtime(); const changeId = uuidV4(); const historyData = {}; @@ -50,24 +49,6 @@ export default async function createHistory(metaName, action, id, data, updatedB // Create history! try { await history.updateOne(historyQuery, { $set: historyItem, $setOnInsert: historyQuery }, { upsert: true, session: dbSession }); - - const updateTime = process.hrtime(startTime); - // Log operation to shell - let log = metaName; - - switch (action) { - case 'create': - log = `${updateTime[0]}s ${updateTime[1] / 1000000}ms => Create history to create action over ${log}`; - break; - case 'update': - log = `${updateTime[0]}s ${updateTime[1] / 1000000}ms => Create history to update action over ${log}`; - break; - case 'delete': - log = `${updateTime[0]}s ${updateTime[1] / 1000000}ms => Create history to delete action over ${log}`; - break; - } - - logger.debug(log); } catch (e) { logger.error(e, 'Error on create history'); } diff --git a/src/imports/konsistent/processIncomingChange.ts b/src/imports/konsistent/processIncomingChange.ts index 53d44d6..300f215 100644 --- a/src/imports/konsistent/processIncomingChange.ts +++ b/src/imports/konsistent/processIncomingChange.ts @@ -26,7 +26,7 @@ export default async function processIncomingChange( let startTime = process.hrtime(); if (action === 'update') { - await References.updateLookups(metaName, incomingChange._id, changedProps); + await References.updateLookups(metaName, incomingChange._id, changedProps, dbSession); logTimeSpent(startTime, `Updated lookup references for ${metaName}`); } diff --git a/src/imports/konsistent/updateReferences/lookupReference.js b/src/imports/konsistent/updateReferences/lookupReference.js index 7a5694f..95611af 100644 --- a/src/imports/konsistent/updateReferences/lookupReference.js +++ b/src/imports/konsistent/updateReferences/lookupReference.js @@ -10,7 +10,7 @@ import uniq from 'lodash/uniq'; import { MetaObject } from '@imports/model/MetaObject'; import { logger } from '@imports/utils/logger'; -export default async function updateLookupReference(metaName, fieldName, field, record, relatedMetaName) { +export default async function updateLookupReference(metaName, fieldName, field, record, relatedMetaName, dbSession) { // Try to get related meta const meta = MetaObject.Meta[metaName]; if (!meta) { @@ -88,7 +88,7 @@ export default async function updateLookupReference(metaName, fieldName, field, subQuery = { _id: record[inheritedField.fieldName]._id.valueOf() }; // Find records - lookupRecord = await lookupCollection.findOne(subQuery); + lookupRecord = await lookupCollection.findOne(subQuery, { sesion: dbSession }); // If no record found log error if (!lookupRecord) { @@ -125,7 +125,7 @@ export default async function updateLookupReference(metaName, fieldName, field, }, }; - const subOptions = {}; + const subOptions = { session: dbSession }; if (isArray(inheritedMetaField.descriptionFields)) { subOptions.projection = inheritedMetaField.descriptionFields.reduce((obj, item) => { const key = item.split('.')[0]; @@ -175,7 +175,7 @@ export default async function updateLookupReference(metaName, fieldName, field, try { // Execute update and get affected records - const updateResult = await collection.updateMany(query, updateData); + const updateResult = await collection.updateMany(query, updateData, { session: dbSession }); // If there are affected records then log if (updateResult.modifiedCount > 0) { diff --git a/src/imports/konsistent/updateReferences/lookupReferences.js b/src/imports/konsistent/updateReferences/lookupReferences.js index a35d5d5..bf80de1 100644 --- a/src/imports/konsistent/updateReferences/lookupReferences.js +++ b/src/imports/konsistent/updateReferences/lookupReferences.js @@ -10,9 +10,8 @@ import updateLookupReference from '@imports/konsistent/updateReferences/lookupRe import { MetaObject } from '@imports/model/MetaObject'; import { logger } from '@imports/utils/logger'; -export default async function updateLookupReferences(metaName, id, data) { +export default async function updateLookupReferences(metaName, id, data, dbSession) { // Get references from meta - let field, fieldName, fields; const references = MetaObject.References[metaName]; // Verify if exists reverse relations @@ -34,10 +33,10 @@ export default async function updateLookupReferences(metaName, id, data) { // Iterate over all relations to verify if each relation have fields in changed keys for (var referenceDocumentName in references.from) { - fields = references.from[referenceDocumentName]; - for (fieldName in fields) { + const fields = references.from[referenceDocumentName]; + for (const fieldName in fields) { var key; - field = fields[fieldName]; + const field = fields[fieldName]; let keysToUpdate = []; // Split each key to get only first key of array of paths if (size(field.descriptionFields) > 0) { @@ -73,7 +72,7 @@ export default async function updateLookupReferences(metaName, id, data) { } // Find record with all information, not only udpated data, to can copy all related fields - const record = await collection.findOne({ _id: id }); + const record = await collection.findOne({ _id: id }, { session: dbSession }); // If no record was found log error and abort if (!record) { @@ -83,11 +82,11 @@ export default async function updateLookupReferences(metaName, id, data) { logger.debug(`Updating references for ${metaName} - ${Object.keys(referencesToUpdate).join(", ")}`); // Iterate over relations to process and iterate over each related field to execute a method to update relations - await BluebirdPromise.mapSeries(Object.keys(referencesToUpdate), async referenceDocumentName => { - fields = referencesToUpdate[referenceDocumentName]; - await BluebirdPromise.mapSeries(Object.keys(fields), async fieldName => { - field = fields[fieldName]; - return updateLookupReference(referenceDocumentName, fieldName, field, record, metaName); - }); - }); + await BluebirdPromise.map(Object.keys(referencesToUpdate), async referenceDocumentName => { + const fields = referencesToUpdate[referenceDocumentName]; + await BluebirdPromise.map(Object.keys(fields), async fieldName => { + const field = fields[fieldName]; + return updateLookupReference(referenceDocumentName, fieldName, field, record, metaName, dbSession); + }, { concurrency: 5 }); + }, { concurrency: 5 }); } \ No newline at end of file diff --git a/src/imports/konsistent/updateReferences/relationReference.ts b/src/imports/konsistent/updateReferences/relationReference.ts index f2ac8ea..881c003 100644 --- a/src/imports/konsistent/updateReferences/relationReference.ts +++ b/src/imports/konsistent/updateReferences/relationReference.ts @@ -125,6 +125,10 @@ export default async function updateRelationReference(metaName: string, relation group.$group.value[`$${aggregator.aggregator}`] = `$${aggregator.field}`; } + if (group.$group.currency == null) { + delete group.$group.currency; + } + pipeline.push(group); // Try to execute agg and log error if fails @@ -148,6 +152,7 @@ export default async function updateRelationReference(metaName: string, relation } catch (error) { e = error as Error; logger.error(e, `Error on aggregate relation ${relation.document} on document ${metaName}: ${e.message}`); + throw e; } }); @@ -177,28 +182,23 @@ export default async function updateRelationReference(metaName: string, relation const updateQuery: Filter<{ _id: string }> = { _id: lookupId }; // Try to execute update query - try { - const { modifiedCount: affected } = await referenceCollection.updateOne(updateQuery, valuesToUpdate, { session: dbSession }); - - // If there are affected records - if (affected > 0) { - // Log Status - logger.info(`∑ ${documentName} < ${metaName} (${affected})`); - - // And log all aggregatores for this status - Object.entries(relation.aggregators).forEach(([fieldName, aggregator]) => { - if (aggregator.field) { - logger.info(` ${documentName}.${fieldName} < ${aggregator.aggregator} ${metaName}.${aggregator.field}`); - } else { - logger.info(` ${documentName}.${fieldName} < ${aggregator.aggregator} ${metaName}`); - } - }); - } - return affected; - } catch (error1) { - logger.error(error1, 'Error on updateRelationReference'); + const { modifiedCount: affected } = await referenceCollection.updateOne(updateQuery, valuesToUpdate, { session: dbSession }); + + // If there are affected records + if (affected > 0) { + // Log Status + logger.info(`∑ ${documentName} < ${metaName} (${affected})`); + + // And log all aggregatores for this status + Object.entries(relation.aggregators).forEach(([fieldName, aggregator]) => { + if (aggregator.field) { + logger.info(` ${documentName}.${fieldName} < ${aggregator.aggregator} ${metaName}.${aggregator.field}`); + } else { + logger.info(` ${documentName}.${fieldName} < ${aggregator.aggregator} ${metaName}`); + } + }); } - return 0; + return affected; }