Skip to content

Commit

Permalink
feat: konsistent write ahead log
Browse files Browse the repository at this point in the history
  • Loading branch information
7sete7 committed Jan 10, 2025
1 parent 573a5f4 commit 9b6147a
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 71 deletions.
106 changes: 48 additions & 58 deletions src/imports/data/data.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ import { TRANSACTION_OPTIONS } from '@imports/consts';
import { find } from "@imports/data/api";
import { client } from '@imports/database';
import { Konsistent } from '@imports/konsistent';
import processIncomingChange from '@imports/konsistent/processIncomingChange';
import eventManager from '@imports/lib/EventManager';
import queueManager from '@imports/queue/QueueManager';
import objectsDiff from '@imports/utils/objectsDiff';
import { dateToString, stringToDate } from '../data/dateParser';
import { populateLookupsData } from '../data/populateLookupsData';
import { processCollectionLogin } from '../data/processCollectionLogin';
Expand Down Expand Up @@ -751,11 +748,19 @@ export async function create({ authTokenId, document, data, contextUser, upsert,
}, dbSession);

if (loginFieldResult.success === false) {
await dbSession.abortTransaction();
return loginFieldResult;
}

set(newRecord, get(metaObject, 'login.field', 'login'), loginFieldResult.data);
}

const walResult = await Konsistent.writeAheadLog(document, 'create', newRecord, user, dbSession);
if (walResult.success === false) {
await dbSession.abortTransaction();
return walResult;
}

if (upsert != null && isObject(upsert)) {
const updateOperation = {
$setOnInsert: {},
Expand Down Expand Up @@ -881,17 +886,13 @@ export async function create({ authTokenId, document, data, contextUser, upsert,
);
}

// Process sync Konsistent
if (MetaObject.Namespace.plan?.useExternalKonsistent !== true || Konsistent.isQueueEnabled === false) {
try {
tracingSpan?.addEvent('Processing sync Konsistent');
await processIncomingChange(document, resultRecord, 'create', user, resultRecord, dbSession);
} catch (e) {
tracingSpan?.addEvent('Error on Konsistent', { error: e.message });
logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`);
await dbSession.abortTransaction();
return errorReturn(`[${document}] Error on Konsistent: ${e.message}`);
}
try {
await Konsistent.processChangeSync(document, 'create', user, { newRecord: resultRecord }, dbSession);
} catch (e) {
tracingSpan?.addEvent('Error on sync Konsistent', { error: e.message });
logger.error(e, `Error on sync Konsistent ${document}: ${e.message}`);
await dbSession.abortTransaction();
return errorReturn(`[${document}] Error on sync Konsistent: ${e.message}`);
}

return successReturn([dateToString(resultRecord)]);
Expand All @@ -907,18 +908,10 @@ export async function create({ authTokenId, document, data, contextUser, upsert,
if (transactionResult.success === true && transactionResult.data?.[0] != null) {
const record = transactionResult.data[0];

// Process Konsistent
if (MetaObject.Namespace.plan?.useExternalKonsistent === true && Konsistent.isQueueEnabled) {
tracingSpan?.addEvent('Sending Konsistent message');
try {
await queueManager.sendMessage(Konsistent.queue.resource, Konsistent.queue.name, {
metaName: document,
operation: 'create',
data: record
});
} catch (e) {
logger.error(e, `Error sending Konsistent message: ${e.message}`);
}
try {
await Konsistent.processChangeAsync(record);
} catch (e) {
logger.error(e, `Error sending Konsistent message: ${e.message}`);
}

// Send events
Expand Down Expand Up @@ -1325,6 +1318,21 @@ export async function update({ authTokenId, document, data, contextUser, tracing
);
}

const walResults = await BluebirdPromise.map(
updateResults,
async result => await Konsistent.writeAheadLog(document, 'update', result.data, user, dbSession),
{ concurrency: 5 },
);

if (walResults.some(result => result.success === false)) {
await dbSession.abortTransaction();
return errorReturn(
walResults
.filter(result => result.success === false)
.map(result => result.errors)
.flat(),
);
}
const updatedIs = updateResults.map(result => result.data._id);

if (updatedIs.length > 0) {
Expand Down Expand Up @@ -1380,23 +1388,17 @@ export async function update({ authTokenId, document, data, contextUser, tracing
}

// Process sync Konsistent
if (MetaObject.Namespace.plan?.useExternalKonsistent !== true || Konsistent.isQueueEnabled === false) {
logger.debug('Processing Konsistent');
for await (const record of updatedRecords) {
const original = existsRecords.find(r => r._id === record._id);
const newRecord = omit(record, ['_id', '_createdAt', '_createdBy', '_updatedAt', '_updatedBy']);
const changedProps = objectsDiff(original, newRecord);
for await (const newRecord of updatedRecords) {
const originalRecord = existsRecords.find(r => r._id === newRecord._id);

try {
tracingSpan?.addEvent('Processing sync Konsistent');
await processIncomingChange(document, record, 'update', user, changedProps, dbSession);
} catch (e) {
logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`);
tracingSpan?.addEvent('Error on Konsistent', { error: e.message });
await dbSession.abortTransaction();
try {
await Konsistent.processChangeSync(document, 'update', user, { originalRecord, newRecord }, dbSession);
} catch (e) {
logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`);
tracingSpan?.addEvent('Error on Konsistent', { error: e.message });
await dbSession.abortTransaction();

return errorReturn(`[${document}] Error on Konsistent: ${e.message}`);
}
return errorReturn(`[${document}] Error on Konsistent: ${e.message}`);
}
}

Expand Down Expand Up @@ -1436,23 +1438,11 @@ export async function update({ authTokenId, document, data, contextUser, tracing
if (transactionResult.success === true && transactionResult.data?.length > 0) {
const updatedRecords = transactionResult.data;

// Process Konsistent
if (MetaObject.Namespace.plan?.useExternalKonsistent === true && Konsistent.isQueueEnabled) {
tracingSpan?.addEvent('Sending Konsistent messages');
for (const record of updatedRecords) {
try {
const original = existsRecords.find(r => r._id === record._id);
const newRecord = omit(record, ['_id', '_createdAt', '_createdBy', '_updatedAt', '_updatedBy']);
const changedProps = objectsDiff(original, newRecord);

await queueManager.sendMessage(Konsistent.queue.resource, Konsistent.queue.name, {
metaName: document,
operation: 'update',
data: changedProps
});
} catch (e) {
logger.error(e, `Error sending Konsistent message: ${e.message}`);
}
for (const record of updatedRecords) {
try {
await Konsistent.processChangeAsync(record);
} catch (e) {
logger.error(e, `Error sending Konsistent message: ${e.message}`);
}
}

Expand Down
78 changes: 77 additions & 1 deletion src/imports/konsistent/index.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
import { MetaObject } from '@imports/model/MetaObject';

import { db } from '@imports/database';
import { User } from '@imports/model/User';
import queueManager from '@imports/queue/QueueManager';
import { DataDocument } from '@imports/types/data';
import { LogDocument } from '@imports/types/Konsistent';
import getMissingParams from '@imports/utils/getMissingParams';
import objectsDiff from '@imports/utils/objectsDiff';
import { errorReturn, successReturn } from '@imports/utils/return';
import omit from 'lodash/omit';
import { ClientSession, Collection } from 'mongodb';
import { logger } from '../utils/logger';
import processIncomingChange from './processIncomingChange';

type RunningKonsistent = {
isQueueEnabled: boolean;
queue?: {
resource: string | undefined;
name: string | undefined;
};
LogCollection: Collection<LogDocument>;
processChangeSync: typeof processChangeSync;
processChangeAsync: typeof processChangeAsync;
writeAheadLog: typeof writeAheadLog;
removeWAL: typeof removeWAL;
};

export const Konsistent: RunningKonsistent = {
isQueueEnabled: false,
LogCollection: db.collection('Konsistent'),
processChangeSync,
processChangeAsync,
writeAheadLog,
removeWAL,
};

export async function setupKonsistent() {
Expand All @@ -28,6 +48,62 @@ export async function setupKonsistent() {
}

if (usingExternalKonsistent && !Konsistent.isQueueEnabled) {
logger.warn('[konsistent] is set to external but no config found');
logger.warn('[konsistent] is set to external but no config found - default to using sync');
}
}

async function processChangeAsync(data: DataDocument) {
if (MetaObject.Namespace.plan?.useExternalKonsistent === true && Konsistent.isQueueEnabled) {
await queueManager.sendMessage(Konsistent.queue!.resource!, Konsistent.queue!.name!, {
_id: data._id,
});
}
}

async function processChangeSync(metaName: string, operation: string, user: object, data: { originalRecord?: DataDocument; newRecord: DataDocument }, dbSession?: ClientSession) {
if (MetaObject.Namespace.plan?.useExternalKonsistent !== true || Konsistent.isQueueEnabled === false) {
logger.debug('Processing sync Konsistent');

const changedProps = data.originalRecord
? objectsDiff(data.originalRecord, omit(data.newRecord, ['_id', '_createdAt', '_createdBy', '_updatedAt', '_updatedBy']))
: omit(data.newRecord, ['_id', '_createdAt', '_createdBy', '_updatedAt', '_updatedBy']);
return processIncomingChange(metaName, data.newRecord, operation, user, changedProps, dbSession);
}
}

async function writeAheadLog(metaName: string, operation: string, data: DataDocument, user: User, dbSession: ClientSession) {
if (MetaObject.Namespace.plan?.useExternalKonsistent === true && Konsistent.isQueueEnabled) {
try {
const result = await Konsistent.LogCollection.insertOne(
{
_id: `${metaName}-${data._id}-${Date.now()}`,
dataId: data._id,
metaName: metaName,
operation: operation,
data: data,
userId: user._id,
ts: new Date(),
},
{ session: dbSession },
);

return result.insertedId ? successReturn(result.insertedId) : errorReturn('Error on writeAheadLog');
} catch (e) {
const message = `Error on writeAheadLog ${metaName}: ${(e as Error).message}`;
logger.error(e, message);
return errorReturn(message);
}
}

return successReturn(null);
}

async function removeWAL(payload: Awaited<ReturnType<typeof writeAheadLog>>) {
if (payload.success === false || payload.data === null) {
return;
}

if (MetaObject.Namespace.plan?.useExternalKonsistent === true && Konsistent.isQueueEnabled) {
await Konsistent.LogCollection.deleteOne({ _id: payload.data });
}
}
12 changes: 5 additions & 7 deletions src/imports/konsistent/processIncomingChange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import * as References from './updateReferences';
import { DataDocument } from '@imports/types/data';
import { ClientSession, MongoServerError } from 'mongodb';

type Action = 'create' | 'update' | 'delete';

const logTimeSpent = (startTime: [number, number], message: string) => {
const totalTime = process.hrtime(startTime);
logger.debug(`${totalTime[0]}s ${totalTime[1] / 1000000}ms => ${message}`);
Expand All @@ -16,26 +14,26 @@ const logTimeSpent = (startTime: [number, number], message: string) => {
export default async function processIncomingChange(
metaName: string,
incomingChange: DataDocument,
action: Action,
operation: string,
user: object,
changedProps: Record<string, any>,
dbSession?: ClientSession,
) {
let startTime = process.hrtime();

try {
if (action === 'update') {
if (operation === 'update') {
await References.updateLookups(metaName, incomingChange._id, changedProps, dbSession);
logTimeSpent(startTime, `Updated lookup references for ${metaName}`);
}

await processReverseLookups(metaName, incomingChange._id, incomingChange, action);
await processReverseLookups(metaName, incomingChange._id, incomingChange, operation);
logTimeSpent(startTime, `Process'd reverse lookups for ${metaName}`);

await References.updateRelations(metaName, action, incomingChange._id, incomingChange, dbSession);
await References.updateRelations(metaName, operation, incomingChange._id, incomingChange, dbSession);
logTimeSpent(startTime, `Updated relation references for ${metaName}`);

await createHistory(metaName, action, incomingChange._id, user, new Date(), changedProps, dbSession);
await createHistory(metaName, operation, incomingChange._id, user, new Date(), changedProps, dbSession);
logTimeSpent(startTime, `Created history for ${metaName}`);
} catch (e) {
if ((e as MongoServerError).codeName === 'NoSuchTransaction') {
Expand Down
9 changes: 4 additions & 5 deletions src/imports/konsistent/updateReferences/relationReferences.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import { logger } from '@imports/utils/logger';
import { ClientSession, Collection, FindOptions } from 'mongodb';
import updateRelationReference from './relationReference';

type Action = 'update' | 'create' | 'delete';
const CONCURRENCY = 5;

export default async function updateRelationReferences(metaName: string, action: Action, id: string, data: Record<string, any>, dbSession?: ClientSession) {
export default async function updateRelationReferences(metaName: string, operation: string, id: string, data: Record<string, any>, dbSession?: ClientSession) {
// Get references from meta
let relation, relations, relationsFromDocumentName;
const references = MetaObject.References[metaName];
Expand All @@ -33,14 +32,14 @@ export default async function updateRelationReferences(metaName: string, action:
let collection = MetaObject.Collections[metaName];

// If action is delete then get collection trash
if (action === 'delete') {
if (operation === 'delete') {
collection = MetaObject.Collections[`${metaName}.Trash`];
}

const referencesToUpdate: Record<string, Relation[]> = {};

// If action is create or delete then update all records with data related in this record
if (action !== 'update') {
if (operation !== 'update') {
for (relationsFromDocumentName in references.relationsFrom) {
relations = references.relationsFrom[relationsFromDocumentName];
referencesToUpdate[relationsFromDocumentName] = relations;
Expand Down Expand Up @@ -121,7 +120,7 @@ export default async function updateRelationReferences(metaName: string, action:
}

// If action is update and the lookup field of relation was updated go to hitory to update old relation
if (lookupId.length > 0 && action === 'update' && has(data, `${relation.lookup}._id`)) {
if (lookupId.length > 0 && operation === 'update' && has(data, `${relation.lookup}._id`)) {
// Try to get history model
const historyCollection = MetaObject.Collections[`${metaName}.History`];

Expand Down
9 changes: 9 additions & 0 deletions src/imports/types/Konsistent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export type LogDocument = {
_id: string;
dataId: string;
metaName: string;
operation: string;
data: Record<string, any>;
userId: string;
ts: Date;
};

0 comments on commit 9b6147a

Please sign in to comment.