Skip to content

Commit

Permalink
fix: await transactions & lookupReferences
Browse files Browse the repository at this point in the history
  • Loading branch information
7sete7 committed May 24, 2024
1 parent 330cc4d commit 7bf893f
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 64 deletions.
17 changes: 11 additions & 6 deletions src/imports/data/data.js
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert,
});

if (fieldPermissionResult.some(result => result.success === false)) {
dbSession.abortTransaction();
await dbSession.abortTransaction();
return errorReturn(
fieldPermissionResult
.filter(result => result.success === false)
Expand Down Expand Up @@ -611,7 +611,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert,
);

if (validationResults.some(result => result.success === false)) {
dbSession.abortTransaction();
await dbSession.abortTransaction();
return errorReturn(
validationResults
.filter(result => result.success === false)
Expand Down Expand Up @@ -693,7 +693,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert,
});

if (validateAllFieldsResult.some(result => result.success === false)) {
dbSession.abortTransaction();
await dbSession.abortTransaction();
return errorReturn(
validateAllFieldsResult
.filter(result => result.success === false)
Expand All @@ -707,7 +707,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert,

const validation = await processValidationScript({ script: metaObject.validationScript, validationData: metaObject.validationData, fullData: extend({}, data, cleanedData), user });
if (validation.success === false) {
dbSession.abortTransaction();
await dbSession.abortTransaction();
logger.error(validation, `Create - Script Validation Error - ${validation.reason}`);
return errorReturn(`[${document}] ${validation.reason}`);
}
Expand Down Expand Up @@ -743,7 +743,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert,
});

if (autoNumberResult.some(result => result.success === false)) {
dbSession.abortTransaction();
await dbSession.abortTransaction();
return errorReturn(
autoNumberResult
.filter(result => result.success === false)
Expand Down Expand Up @@ -836,7 +836,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert,
logger.error(e, `Error on insert ${MetaObject.Namespace.ns}.${document}: ${e.message}`);
tracingSpan?.addEvent('Error on insert', { error: e.message });
tracingSpan?.setAttribute({ error: e.message });
dbSession.abortTransaction();
await dbSession.abortTransaction();

if (e.code === 11000) {
return errorReturn(`[${document}] Duplicate key error`);
Expand Down Expand Up @@ -920,6 +920,8 @@ export async function create({ authTokenId, document, data, contextUser, upsert,
} catch (e) {
tracingSpan?.addEvent('Error on Konsistent', { error: e.message });
logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`);
await dbSession.abortTransaction();
return errorReturn(`[${document}] Error on Konsistent: ${e.message}`);
}
}
return successReturn([dateToString(resultRecord)]);
Expand Down Expand Up @@ -1392,6 +1394,9 @@ export async function update({ authTokenId, document, data, contextUser, tracing
} catch (e) {
logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`);
tracingSpan?.addEvent('Error on Konsistent', { error: e.message });
await dbSession.abortTransaction();

return errorReturn(`[${document}] Error on Konsistent: ${e.message}`);
}
}

Expand Down
19 changes: 0 additions & 19 deletions src/imports/konsistent/createHistory.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ export default async function createHistory(metaName, action, id, data, updatedB
return;
}

const startTime = process.hrtime();
const changeId = uuidV4();

const historyData = {};
Expand Down Expand Up @@ -50,24 +49,6 @@ export default async function createHistory(metaName, action, id, data, updatedB
// Create history!
try {
await history.updateOne(historyQuery, { $set: historyItem, $setOnInsert: historyQuery }, { upsert: true, session: dbSession });

const updateTime = process.hrtime(startTime);
// Log operation to shell
let log = metaName;

switch (action) {
case 'create':
log = `${updateTime[0]}s ${updateTime[1] / 1000000}ms => Create history to create action over ${log}`;
break;
case 'update':
log = `${updateTime[0]}s ${updateTime[1] / 1000000}ms => Create history to update action over ${log}`;
break;
case 'delete':
log = `${updateTime[0]}s ${updateTime[1] / 1000000}ms => Create history to delete action over ${log}`;
break;
}

logger.debug(log);
} catch (e) {
logger.error(e, 'Error on create history');
}
Expand Down
2 changes: 1 addition & 1 deletion src/imports/konsistent/processIncomingChange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export default async function processIncomingChange(
let startTime = process.hrtime();

if (action === 'update') {
await References.updateLookups(metaName, incomingChange._id, changedProps);
await References.updateLookups(metaName, incomingChange._id, changedProps, dbSession);
logTimeSpent(startTime, `Updated lookup references for ${metaName}`);
}

Expand Down
8 changes: 4 additions & 4 deletions src/imports/konsistent/updateReferences/lookupReference.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import uniq from 'lodash/uniq';
import { MetaObject } from '@imports/model/MetaObject';
import { logger } from '@imports/utils/logger';

export default async function updateLookupReference(metaName, fieldName, field, record, relatedMetaName) {
export default async function updateLookupReference(metaName, fieldName, field, record, relatedMetaName, dbSession) {
// Try to get related meta
const meta = MetaObject.Meta[metaName];
if (!meta) {
Expand Down Expand Up @@ -88,7 +88,7 @@ export default async function updateLookupReference(metaName, fieldName, field,
subQuery = { _id: record[inheritedField.fieldName]._id.valueOf() };

// Find records
lookupRecord = await lookupCollection.findOne(subQuery);
lookupRecord = await lookupCollection.findOne(subQuery, { sesion: dbSession });

// If no record found log error
if (!lookupRecord) {
Expand Down Expand Up @@ -125,7 +125,7 @@ export default async function updateLookupReference(metaName, fieldName, field,
},
};

const subOptions = {};
const subOptions = { session: dbSession };
if (isArray(inheritedMetaField.descriptionFields)) {
subOptions.projection = inheritedMetaField.descriptionFields.reduce((obj, item) => {
const key = item.split('.')[0];
Expand Down Expand Up @@ -175,7 +175,7 @@ export default async function updateLookupReference(metaName, fieldName, field,

try {
// Execute update and get affected records
const updateResult = await collection.updateMany(query, updateData);
const updateResult = await collection.updateMany(query, updateData, { session: dbSession });

// If there are affected records then log
if (updateResult.modifiedCount > 0) {
Expand Down
25 changes: 12 additions & 13 deletions src/imports/konsistent/updateReferences/lookupReferences.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import updateLookupReference from '@imports/konsistent/updateReferences/lookupRe
import { MetaObject } from '@imports/model/MetaObject';
import { logger } from '@imports/utils/logger';

export default async function updateLookupReferences(metaName, id, data) {
export default async function updateLookupReferences(metaName, id, data, dbSession) {
// Get references from meta
let field, fieldName, fields;
const references = MetaObject.References[metaName];

// Verify if exists reverse relations
Expand All @@ -34,10 +33,10 @@ export default async function updateLookupReferences(metaName, id, data) {

// Iterate over all relations to verify if each relation have fields in changed keys
for (var referenceDocumentName in references.from) {
fields = references.from[referenceDocumentName];
for (fieldName in fields) {
const fields = references.from[referenceDocumentName];
for (const fieldName in fields) {
var key;
field = fields[fieldName];
const field = fields[fieldName];
let keysToUpdate = [];
// Split each key to get only first key of array of paths
if (size(field.descriptionFields) > 0) {
Expand Down Expand Up @@ -73,7 +72,7 @@ export default async function updateLookupReferences(metaName, id, data) {
}

// Find record with all information, not only udpated data, to can copy all related fields
const record = await collection.findOne({ _id: id });
const record = await collection.findOne({ _id: id }, { session: dbSession });

// If no record was found log error and abort
if (!record) {
Expand All @@ -83,11 +82,11 @@ export default async function updateLookupReferences(metaName, id, data) {
logger.debug(`Updating references for ${metaName} - ${Object.keys(referencesToUpdate).join(", ")}`);

// Iterate over relations to process and iterate over each related field to execute a method to update relations
await BluebirdPromise.mapSeries(Object.keys(referencesToUpdate), async referenceDocumentName => {
fields = referencesToUpdate[referenceDocumentName];
await BluebirdPromise.mapSeries(Object.keys(fields), async fieldName => {
field = fields[fieldName];
return updateLookupReference(referenceDocumentName, fieldName, field, record, metaName);
});
});
await BluebirdPromise.map(Object.keys(referencesToUpdate), async referenceDocumentName => {
const fields = referencesToUpdate[referenceDocumentName];
await BluebirdPromise.map(Object.keys(fields), async fieldName => {
const field = fields[fieldName];
return updateLookupReference(referenceDocumentName, fieldName, field, record, metaName, dbSession);
}, { concurrency: 5 });
}, { concurrency: 5 });
}
42 changes: 21 additions & 21 deletions src/imports/konsistent/updateReferences/relationReference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ export default async function updateRelationReference(metaName: string, relation
group.$group.value[`$${aggregator.aggregator}`] = `$${aggregator.field}`;
}

if (group.$group.currency == null) {
delete group.$group.currency;
}

pipeline.push(group);

// Try to execute agg and log error if fails
Expand All @@ -148,6 +152,7 @@ export default async function updateRelationReference(metaName: string, relation
} catch (error) {
e = error as Error;
logger.error(e, `Error on aggregate relation ${relation.document} on document ${metaName}: ${e.message}`);
throw e;
}
});

Expand Down Expand Up @@ -177,28 +182,23 @@ export default async function updateRelationReference(metaName: string, relation
const updateQuery: Filter<{ _id: string }> = { _id: lookupId };

// Try to execute update query
try {
const { modifiedCount: affected } = await referenceCollection.updateOne(updateQuery, valuesToUpdate, { session: dbSession });

// If there are affected records
if (affected > 0) {
// Log Status
logger.info(`∑ ${documentName} < ${metaName} (${affected})`);

// And log all aggregatores for this status
Object.entries(relation.aggregators).forEach(([fieldName, aggregator]) => {
if (aggregator.field) {
logger.info(` ${documentName}.${fieldName} < ${aggregator.aggregator} ${metaName}.${aggregator.field}`);
} else {
logger.info(` ${documentName}.${fieldName} < ${aggregator.aggregator} ${metaName}`);
}
});
}

return affected;
} catch (error1) {
logger.error(error1, 'Error on updateRelationReference');
const { modifiedCount: affected } = await referenceCollection.updateOne(updateQuery, valuesToUpdate, { session: dbSession });

// If there are affected records
if (affected > 0) {
// Log Status
logger.info(`∑ ${documentName} < ${metaName} (${affected})`);

// And log all aggregatores for this status
Object.entries(relation.aggregators).forEach(([fieldName, aggregator]) => {
if (aggregator.field) {
logger.info(` ${documentName}.${fieldName} < ${aggregator.aggregator} ${metaName}.${aggregator.field}`);
} else {
logger.info(` ${documentName}.${fieldName} < ${aggregator.aggregator} ${metaName}`);
}
});
}

return 0;
return affected;
}

0 comments on commit 7bf893f

Please sign in to comment.