Skip to content

Commit

Permalink
persist: improve errors logging + lastSyncDate = new Date if undefined (
Browse files Browse the repository at this point in the history
#1586)

* persist: improve errors logging

* persist API: pass lastSyncDate = new Date if undefined

* persist: use nangoConnectionId in route path

connectionId is user-defined and can contain url-unfriendly character
let's use the nangoConnectionId instead which is an int
  • Loading branch information
TBonnin authored Jan 31, 2024
1 parent e098f61 commit e22bd0c
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 56 deletions.
61 changes: 32 additions & 29 deletions packages/persist/lib/controllers/persist.controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Request, Response } from 'express';
import type { NextFunction, Request, Response } from 'express';
import {
setLastSyncDate,
createActivityLogMessage,
Expand All @@ -22,7 +22,7 @@ type persistType = 'save' | 'delete' | 'update';
type RecordRequest = Request<
{
environmentId: number;
connectionId: string;
nangoConnectionId: number;
syncId: string;
syncJobId: number;
},
Expand All @@ -31,7 +31,7 @@ type RecordRequest = Request<
model: string;
records: Record<string, any>[];
providerConfigKey: string;
nangoConnectionId: number;
connectionId: string;
activityLogId: number;
trackDeletes: boolean;
lastSyncDate: Date;
Expand All @@ -41,7 +41,7 @@ type RecordRequest = Request<
>;

class PersistController {
public async saveLastSyncDate(req: Request<{ syncId: string }, any, { lastSyncDate: Date }, any, Record<string, any>>, res: Response) {
public async saveLastSyncDate(req: Request<{ syncId: string }, any, { lastSyncDate: Date }, any, Record<string, any>>, res: Response, next: NextFunction) {
const {
params: { syncId },
body: { lastSyncDate }
Expand All @@ -50,13 +50,14 @@ class PersistController {
if (result) {
res.status(201).send();
} else {
res.status(500).json({ error: `Failed to save last sync date '${lastSyncDate}' for sync '${syncId}'` });
next(new Error(`Failed to save last sync date '${lastSyncDate}' for sync '${syncId}'`));
}
}

public async saveActivityLog(
req: Request<{ environmentId: number }, any, { activityLogId: number; level: LogLevel; msg: string }, any, Record<string, any>>,
res: Response
res: Response,
next: NextFunction
) {
const {
params: { environmentId },
Expand All @@ -75,14 +76,14 @@ class PersistController {
if (result) {
res.status(201).send();
} else {
res.status(500).json({ error: `Failed to save log ${activityLogId}` });
next(new Error(`Failed to save log ${activityLogId}`));
}
}

public async saveRecords(req: RecordRequest, res: Response) {
public async saveRecords(req: RecordRequest, res: Response, next: NextFunction) {
const {
params: { environmentId, connectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, nangoConnectionId, trackDeletes, lastSyncDate, activityLogId }
params: { environmentId, nangoConnectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, connectionId, trackDeletes, lastSyncDate, activityLogId }
} = req;
const persist = async (dataRecords: DataRecord[]) => {
return await dataService.upsert(
Expand Down Expand Up @@ -113,13 +114,17 @@ class PersistController {
softDelete: false,
persistFunction: persist
});
PersistController.sendRes(res, result, 'Failed to save records');
if (result.ok) {
res.status(201).send();
} else {
next(new Error(`'Failed to save records': ${result.error.message}`));
}
}

public async deleteRecords(req: RecordRequest, res: Response) {
public async deleteRecords(req: RecordRequest, res: Response, next: NextFunction) {
const {
params: { environmentId, connectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, nangoConnectionId, trackDeletes, lastSyncDate, activityLogId }
params: { environmentId, nangoConnectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, connectionId, trackDeletes, lastSyncDate, activityLogId }
} = req;
const persist = async (dataRecords: DataRecord[]) => {
return await dataService.upsert(
Expand Down Expand Up @@ -150,13 +155,17 @@ class PersistController {
softDelete: true,
persistFunction: persist
});
PersistController.sendRes(res, result, 'Failed to delete records');
if (result.ok) {
res.status(201).send();
} else {
next(new Error(`'Failed to delete records': ${result.error.message}`));
}
}

public async updateRecords(req: RecordRequest, res: Response) {
public async updateRecords(req: RecordRequest, res: Response, next: NextFunction) {
const {
params: { environmentId, connectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, nangoConnectionId, trackDeletes, lastSyncDate, activityLogId }
params: { environmentId, nangoConnectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, connectionId, trackDeletes, lastSyncDate, activityLogId }
} = req;
const persist = async (dataRecords: DataRecord[]) => {
return await dataService.updateRecord(
Expand Down Expand Up @@ -185,7 +194,11 @@ class PersistController {
softDelete: false,
persistFunction: persist
});
PersistController.sendRes(res, result, 'Failed to update records');
if (result.ok) {
res.status(201).send();
} else {
next(new Error(`'Failed to update records': ${result.error.message}`));
}
}

private static async persistRecords({
Expand Down Expand Up @@ -325,16 +338,6 @@ class PersistController {
return err(errMsg);
}
}

private static sendRes(res: Response, result: Result<void>, errorMsg: string) {
if (result.ok) {
res.status(201).send();
} else {
res.status(500).json({
error: `${errorMsg}: ${result.error.message}`
});
}
}
}

export default new PersistController();
32 changes: 22 additions & 10 deletions packages/persist/lib/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,18 @@ export const server = express();
server.use(express.json());

server.use((req: Request, res: Response, next: NextFunction) => {
const originalSend = res.send;
res.send = function (body: any) {
if (res.statusCode >= 400) {
console.log(`[Persist] [Error] ${req.method} ${req.path} ${res.statusCode} '${JSON.stringify(body)}'`);
}
originalSend.call(this, body) as any;
return this;
};
next();
console.log(`[Persist] ${req.method} ${req.path} ${res.statusCode}`);
if (res.statusCode < 400) {
console.log(`[Persist] ${req.method} ${req.path} ${res.statusCode}`);
}
});

server.get('/health', (_req: Request, res: Response) => {
Expand Down Expand Up @@ -52,15 +62,15 @@ server.post(
const validateRecordsRequest = validateRequest({
params: z.object({
environmentId: z.string().transform(Number).pipe(z.number().int().positive()) as unknown as z.ZodNumber,
connectionId: z.string(),
nangoConnectionId: z.string().transform(Number).pipe(z.number().int().positive()) as unknown as z.ZodNumber,
syncId: z.string(),
syncJobId: z.string().transform(Number).pipe(z.number().int().positive()) as unknown as z.ZodNumber
}),
body: z.object({
model: z.string(),
records: z.any().array().nonempty(),
providerConfigKey: z.string(),
nangoConnectionId: z.number(),
connectionId: z.string(),
activityLogId: z.number(),
lastSyncDate: z
.string()
Expand All @@ -70,13 +80,15 @@ const validateRecordsRequest = validateRequest({
trackDeletes: z.boolean()
})
});
server.post('/environment/:environmentId/connection/:connectionId/sync/:syncId/job/:syncJobId/records', validateRecordsRequest, persistController.saveRecords);
server.delete(
'/environment/:environmentId/connection/:connectionId/sync/:syncId/job/:syncJobId/records',
validateRecordsRequest,
persistController.deleteRecords
);
server.put('/environment/:environmentId/connection/:connectionId/sync/:syncId/job/:syncJobId/records', validateRecordsRequest, persistController.updateRecords);
const recordPath = '/environment/:environmentId/connection/:nangoConnectionId/sync/:syncId/job/:syncJobId/records';
server.post(recordPath, validateRecordsRequest, persistController.saveRecords);
server.delete(recordPath, validateRecordsRequest, persistController.deleteRecords);
server.put(recordPath, validateRecordsRequest, persistController.updateRecords);

server.use((_req: Request, res: Response, next: NextFunction) => {
res.status(404);
next();
});

server.use((err: Error, _req: Request, res: Response, next: NextFunction) => {
if (err) {
Expand Down
16 changes: 8 additions & 8 deletions packages/persist/lib/server.unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ describe('Persist API', () => {

describe('save records', () => {
it('should error if no records', async () => {
const response = await fetch(`${serverUrl}/environment/123/connection/myconn/sync/abc/job/101/records`, {
const response = await fetch(`${serverUrl}/environment/123/connection/456/sync/abc/job/101/records`, {
method: 'POST',
body: JSON.stringify({
model: 'MyModel',
records: [],
providerConfigKey: 'provider',
nangoConnectionId: 456,
connectionId: 'myconn',
lastSyncDate: new Date(),
trackDeletes: false,
softDelete: true
Expand All @@ -93,13 +93,13 @@ describe('Persist API', () => {
{ id: 2, name: 'r2' }
];
dbTracker.on('query', DBTracker.persistQueries(model));
const response = await fetch(`${serverUrl}/environment/123/connection/myconn/sync/abc/job/101/records`, {
const response = await fetch(`${serverUrl}/environment/123/connection/456/sync/abc/job/101/records`, {
method: 'POST',
body: JSON.stringify({
model,
records: records,
providerConfigKey: 'provider',
nangoConnectionId: 456,
connectionId: 'myconn',
activityLogId: 12,
lastSyncDate: new Date(),
trackDeletes: false
Expand All @@ -119,13 +119,13 @@ describe('Persist API', () => {
{ id: 2, name: 'r2' }
];
dbTracker.on('query', DBTracker.persistQueries(model));
const response = await fetch(`${serverUrl}/environment/123/connection/myconn/sync/abc/job/101/records`, {
const response = await fetch(`${serverUrl}/environment/123/connection/456/sync/abc/job/101/records`, {
method: 'DELETE',
body: JSON.stringify({
model,
records: records,
providerConfigKey: 'provider',
nangoConnectionId: 456,
connectionId: 'myconn',
activityLogId: 12,
lastSyncDate: new Date(),
trackDeletes: false
Expand All @@ -144,13 +144,13 @@ describe('Persist API', () => {
{ id: 2, name: 'r2' }
];
dbTracker.on('query', DBTracker.persistQueries(model));
const response = await fetch(`${serverUrl}/environment/123/connection/myconn/sync/abc/job/101/records`, {
const response = await fetch(`${serverUrl}/environment/123/connection/456/sync/abc/job/101/records`, {
method: 'PUT',
body: JSON.stringify({
model,
records: records,
providerConfigKey: 'provider',
nangoConnectionId: 456,
connectionId: 'myconn',
activityLogId: 12,
lastSyncDate: new Date(),
trackDeletes: false,
Expand Down
18 changes: 9 additions & 9 deletions packages/shared/lib/sdk/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -607,14 +607,14 @@ export class NangoSync extends NangoAction {
const batch = results.slice(i, i + this.batchSize);
const response = await persistApi({
method: 'POST',
url: `/environment/${this.environmentId}/connection/${this.connectionId}/sync/${this.syncId}/job/${this.syncJobId}/records`,
url: `/environment/${this.environmentId}/connection/${this.nangoConnectionId}/sync/${this.syncId}/job/${this.syncJobId}/records`,
data: {
model,
records: batch,
providerConfigKey: this.providerConfigKey,
nangoConnectionId: this.nangoConnectionId,
connectionId: this.connectionId,
activityLogId: this.activityLogId,
lastSyncDate: this.lastSyncDate,
lastSyncDate: this.lastSyncDate || new Date(),
trackDeletes: this.track_deletes
}
});
Expand Down Expand Up @@ -744,14 +744,14 @@ export class NangoSync extends NangoAction {
const batch = results.slice(i, i + this.batchSize);
const response = await persistApi({
method: 'DELETE',
url: `/environment/${this.environmentId}/connection/${this.connectionId}/sync/${this.syncId}/job/${this.syncJobId}/records`,
url: `/environment/${this.environmentId}/connection/${this.nangoConnectionId}/sync/${this.syncId}/job/${this.syncJobId}/records`,
data: {
model,
records: batch,
providerConfigKey: this.providerConfigKey,
nangoConnectionId: this.nangoConnectionId,
connectionId: this.connectionId,
activityLogId: this.activityLogId,
lastSyncDate: this.lastSyncDate,
lastSyncDate: this.lastSyncDate || new Date(),
trackDeletes: this.track_deletes
}
});
Expand Down Expand Up @@ -883,14 +883,14 @@ export class NangoSync extends NangoAction {
const batch = results.slice(i, i + this.batchSize);
const response = await persistApi({
method: 'PUT',
url: `/environment/${this.environmentId}/connection/${this.connectionId}/sync/${this.syncId}/job/${this.syncJobId}/records`,
url: `/environment/${this.environmentId}/connection/${this.nangoConnectionId}/sync/${this.syncId}/job/${this.syncJobId}/records`,
data: {
model,
records: batch,
providerConfigKey: this.providerConfigKey,
nangoConnectionId: this.nangoConnectionId,
connectionId: this.connectionId,
activityLogId: this.activityLogId,
lastSyncDate: this.lastSyncDate,
lastSyncDate: this.lastSyncDate || new Date(),
trackDeletes: this.track_deletes
}
});
Expand Down

0 comments on commit e22bd0c

Please sign in to comment.