Skip to content

Commit

Permalink
Merge pull request #1801 from Inist-CNRS/fix/precomputed-logs
Browse files Browse the repository at this point in the history
Fix(precomputed): Handle logs and reception from webhook
  • Loading branch information
JulienMattiussi authored Nov 27, 2023
2 parents 503858b + f9aaa1d commit 12d34e6
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 82 deletions.
2 changes: 2 additions & 0 deletions src/api/controller/api/precomputed.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ export const precomputedAction = async (ctx, action, id) => {
{
id,
jobType: PRECOMPUTER,
action: 'askForPrecomputed',
tenant: ctx.tenant,
subLabel: precomputed.name,
},
Expand All @@ -111,6 +112,7 @@ export const precomputedAction = async (ctx, action, id) => {
{
id,
jobType: PRECOMPUTER,
action: 'askForPrecomputed',
tenant: ctx.tenant,
subLabel: precomputed.name,
},
Expand Down
51 changes: 22 additions & 29 deletions src/api/controller/webhook.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,38 @@ import Koa from 'koa';
import route from 'koa-route';
import bodyParser from 'koa-bodyparser';

import getLogger from '../services/logger';
import { getPrecomputedCollectionForWebHook } from '../services/repositoryMiddleware';
import {
getComputedFromWebservice,
getFailureFromWebservice,
} from '../services/precomputed/precomputed';
import { v1 as uuid } from 'uuid';

import { workerQueues } from '../workers';
import { PRECOMPUTER } from '../workers/precomputer';

export const getComputedWebserviceData = async ctx => {
const { precomputedId, tenant, jobId, failure } = ctx.request.query;
const { identifier, generator, state } = ctx.request.body;
const logger = getLogger(ctx.tenant);
logger.info(`Precompute webhook call for ${tenant}`);
logger.info('Query', ctx.request.query);

ctx.precomputed = await getPrecomputedCollectionForWebHook(tenant);
const callId = JSON.stringify([{ id: generator, value: identifier }]);

const data = {
id: precomputedId,
jobType: PRECOMPUTER,
action: 'getPrecomputed',
tenant: tenant,
callId,
failure,
state,
askForPrecomputedJobId: jobId,
};

// if error message, pass it to the worker
if (failure !== undefined) {
const { type, message } = ctx.request.body.error;
logger.info('Precompute webservice call with failure');
await getFailureFromWebservice(
ctx,
tenant,
precomputedId,
jobId,
type,
message,
);
ctx.body = 'webhook failure';
ctx.status = 200;
return;
data.error = { type, message };
}

if (state !== 'ready') {
return;
}

await getComputedFromWebservice(ctx, tenant, precomputedId, callId, jobId);

await workerQueues[tenant].add(
PRECOMPUTER, // Name of the job
data,
{ jobId: uuid() },
);
ctx.body = 'webhook ok';
ctx.status = 200;
};
Expand Down
127 changes: 76 additions & 51 deletions src/api/services/precomputed/precomputed.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
ERROR,
CANCELED,
} from '../../../common/taskStatus';
import { PRECOMPUTING, PENDING } from '../../../common/progressStatus';
import { PRECOMPUTING } from '../../../common/progressStatus';
import { jobLogger } from '../../workers/tools';
import { CancelWorkerError, workerQueues } from '../../workers';
import { PRECOMPUTER } from '../../workers/precomputer';
Expand Down Expand Up @@ -305,14 +305,25 @@ const extractResultFromZip = async (tenant, job, room, data) => {
return result;
};

export const getComputedFromWebservice = async (
ctx,
tenant,
precomputedId,
callId,
jobId,
) => {
progress.incrementProgress(tenant, 60);
export const getComputedFromWebservice = async ctx => {
const tenant = ctx.tenant;
const { id: precomputedId, callId, askForPrecomputedJobId } = ctx.job.data;

const {
webServiceUrl,
name: precomputedName,
} = await ctx.precomputed.findOneById(precomputedId);
const webServiceBaseURL = new RegExp('.*\\/').exec(webServiceUrl)[0];

progress.start(ctx.tenant, {
status: PRECOMPUTING,
target: 100,
label: 'PRECOMPUTING',
subLabel: precomputedName,
type: 'precomputer',
});

progress.setProgress(tenant, 55);
if (!tenant || !precomputedId || !callId) {
throw new Error(
`Precompute webhook error: missing data ${JSON.stringify({
Expand All @@ -322,41 +333,39 @@ export const getComputedFromWebservice = async (
})}`,
);
}

progress.setProgress(tenant, 65);
const workerQueue = workerQueues[tenant];
const jobs = await workerQueue.getCompleted();
const job = jobs.filter(job => {
const { id, jobType, tenant: jobTenant } = job.data;
const completedJobs = await workerQueue.getCompleted();
const askForPrecomputedJob = completedJobs.filter(completedJob => {
const { id, jobType, tenant: jobTenant } = completedJob.data;

return (
id === precomputedId &&
jobType === PRECOMPUTER &&
jobTenant === tenant &&
`${tenant}-precomputed-job-${job.opts.jobId}` === jobId
`${tenant}-precomputed-job-${completedJob.opts.jobId}` ===
askForPrecomputedJobId
);
})?.[0];

if (!job) {
if (!askForPrecomputedJob) {
throw new CancelWorkerError('Job has been canceled');
}
progress.incrementProgress(tenant, 70);
const room = `${tenant}-precomputed-job-${jobId}`;
progress.setProgress(tenant, 75);
const room = `${tenant}-precomputed-job-${askForPrecomputedJobId}`;

const logData = JSON.stringify({
level: 'ok',
message: `[Instance: ${tenant}] Webservice response ok`,
timestamp: new Date(),
status: IN_PROGRESS,
});
jobLogger.info(job, logData);
jobLogger.info(askForPrecomputedJob, logData);
notifyListeners(room, logData);

//WS doc here:
//openapi.services.istex.fr/?urls.primaryName=data-computer%20-%20Calculs%20sur%20fichier%20coprus%20compress%C3%A9#/data-computer/post-v1-collect

const { webServiceUrl } = await ctx.precomputed.findOneById(precomputedId);
const webServiceBaseURL = new RegExp('.*\\/').exec(webServiceUrl)[0];

try {
const ROUTE = { RETRIEVE: 'retrieve', COLLECT: 'collect' };
const response = await fetch(`${webServiceBaseURL}${ANSWER_ROUTE}`, {
Expand All @@ -367,6 +376,7 @@ export const getComputedFromWebservice = async (
},
compress: false,
});
progress.setProgress(tenant, 85);
if (response.status === 200) {
let data = response.body;
if (ANSWER_ROUTE === ROUTE.RETRIEVE) {
Expand All @@ -376,19 +386,24 @@ export const getComputedFromWebservice = async (
timestamp: new Date(),
status: IN_PROGRESS,
});
jobLogger.info(job, logData);
jobLogger.info(askForPrecomputedJob, logData);
notifyListeners(room, logData);
data = await extractResultFromZip(tenant, job, room, data);
data = await extractResultFromZip(
tenant,
askForPrecomputedJob,
room,
data,
);
}

await ctx.precomputed.updateStatus(precomputedId, FINISHED, {
data,
});
await ctx.precomputed.updateStartedAt(precomputedId, null);

job.progress(100);
const isFailed = await job.isFailed();
notifyListeners(`${job.data.tenant}-precomputer`, {
askForPrecomputedJob.progress(100);
const isFailed = await askForPrecomputedJob.isFailed();
notifyListeners(`${askForPrecomputedJob.data.tenant}-precomputer`, {
isPrecomputing: false,
success: !isFailed,
});
Expand All @@ -399,7 +414,7 @@ export const getComputedFromWebservice = async (
timestamp: new Date(),
status: FINISHED,
});
jobLogger.info(job, logData);
jobLogger.info(askForPrecomputedJob, logData);
notifyListeners(room, logData);
} else {
throw new Error(
Expand All @@ -413,14 +428,10 @@ export const getComputedFromWebservice = async (
}
};

export const getFailureFromWebservice = async (
ctx,
tenant,
precomputedId,
jobId,
errorType,
errorMessage,
) => {
export const getFailureFromWebservice = async ctx => {
const { tenant } = ctx.tenant;
const { askForPrecomputedJobId, id: precomputedId, error } = ctx.job.data;

if (!tenant || !precomputedId) {
throw new Error(
`Precompute webhook failure error: missing data ${JSON.stringify({
Expand All @@ -430,34 +441,35 @@ export const getFailureFromWebservice = async (
);
}
const workerQueue = workerQueues[tenant];
const jobs = await workerQueue.getCompleted();
const job = jobs.filter(job => {
const completedJobs = await workerQueue.getCompleted();
const job = completedJobs.filter(job => {
const { id, jobType, tenant: jobTenant } = job.data;

return (
id === precomputedId &&
jobType === PRECOMPUTER &&
jobTenant === tenant &&
`${tenant}-precomputed-job-${job.opts.jobId}` === jobId
`${tenant}-precomputed-job-${job.opts.jobId}` ===
askForPrecomputedJobId
);
})?.[0];

if (!job) {
return;
}

const room = `${tenant}-precomputed-job-${jobId}`;
const room = `${tenant}-precomputed-job-${askForPrecomputedJobId}`;

await ctx.precomputed.updateStatus(precomputedId, ERROR, {
message: errorMessage,
message: error.message,
});
await ctx.precomputed.updateStartedAt(precomputedId, null);

job.progress(100);
progress.finish(tenant);
const logData = JSON.stringify({
level: 'error',
message: `[Instance: ${tenant}] Precomputing data failed ${errorType} ${errorMessage}`,
message: `[Instance: ${tenant}] Precomputing data failed ${error.type} ${error.message}`,
timestamp: new Date(),
status: ERROR,
});
Expand Down Expand Up @@ -503,6 +515,7 @@ export const processPrecomputed = async (precomputed, ctx) => {
});
jobLogger.info(ctx.job, logData);
notifyListeners(room, logData);

const token = await getTokenFromWebservice(
precomputed.webServiceUrl,
precomputed._id.toString(),
Expand Down Expand Up @@ -540,19 +553,18 @@ export const setPrecomputedJobId = async (ctx, precomputedID, job) => {
});
};

export const startPrecomputed = async ctx => {
export const startAskForPrecomputed = async ctx => {
const id = ctx.job?.data?.id;
const precomputed = await ctx.precomputed.findOneById(id);

if (progress.getProgress(ctx.tenant).status === PENDING) {
progress.start(ctx.tenant, {
status: PRECOMPUTING,
target: 100,
label: 'PRECOMPUTING',
subLabel: precomputed.name,
type: 'precomputer',
});
}
progress.start(ctx.tenant, {
status: PRECOMPUTING,
target: 100,
label: 'PRECOMPUTING',
subLabel: precomputed.name,
type: 'precomputer',
});

const room = `precomputed-job-${ctx.job.id}`;
const logData = JSON.stringify({
level: 'ok',
Expand All @@ -565,6 +577,19 @@ export const startPrecomputed = async ctx => {
await processPrecomputed(precomputed, ctx);
};

export const startGetPrecomputed = async ctx => {
const logger = getLogger(ctx.tenant);
logger.info(`Precompute webhook call for ${ctx.tenant}`);

if (ctx.job?.data?.failure !== undefined) {
logger.info('Precompute webservice call with failure');
await getFailureFromWebservice(ctx);
return;
}

await getComputedFromWebservice(ctx);
};

export const setPrecomputedError = async (ctx, err) => {
const id = ctx.job?.data?.id;
await ctx.precomputed.updateStatus(
Expand Down
14 changes: 12 additions & 2 deletions src/api/workers/precomputer.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import {
startPrecomputed,
startAskForPrecomputed,
setPrecomputedError,
notifyListeners,
startGetPrecomputed,
} from '../services/precomputed/precomputed';
import repositoryMiddleware from '../services/repositoryMiddleware';

Expand Down Expand Up @@ -29,7 +30,16 @@ const startJobPrecomputed = async job => {
success: false,
});
const ctx = await prepareContext({ job });
await startPrecomputed(ctx);

if (job.data.action === 'askForPrecomputed') {
await startAskForPrecomputed(ctx);
return;
}

if (job.data.action === 'getPrecomputed') {
await startGetPrecomputed(ctx);
return;
}
};

const handlePrecomputedError = async (job, err) => {
Expand Down

0 comments on commit 12d34e6

Please sign in to comment.