diff --git a/src/api/controller/api/precomputed.js b/src/api/controller/api/precomputed.js index 542905cf8..97a99b024 100644 --- a/src/api/controller/api/precomputed.js +++ b/src/api/controller/api/precomputed.js @@ -90,6 +90,7 @@ export const precomputedAction = async (ctx, action, id) => { { id, jobType: PRECOMPUTER, + action: 'askForPrecomputed', tenant: ctx.tenant, subLabel: precomputed.name, }, @@ -111,6 +112,7 @@ export const precomputedAction = async (ctx, action, id) => { { id, jobType: PRECOMPUTER, + action: 'askForPrecomputed', tenant: ctx.tenant, subLabel: precomputed.name, }, diff --git a/src/api/controller/webhook.js b/src/api/controller/webhook.js index ec18bb142..69c789a05 100644 --- a/src/api/controller/webhook.js +++ b/src/api/controller/webhook.js @@ -2,45 +2,38 @@ import Koa from 'koa'; import route from 'koa-route'; import bodyParser from 'koa-bodyparser'; -import getLogger from '../services/logger'; -import { getPrecomputedCollectionForWebHook } from '../services/repositoryMiddleware'; -import { - getComputedFromWebservice, - getFailureFromWebservice, -} from '../services/precomputed/precomputed'; +import { v1 as uuid } from 'uuid'; + +import { workerQueues } from '../workers'; +import { PRECOMPUTER } from '../workers/precomputer'; export const getComputedWebserviceData = async ctx => { const { precomputedId, tenant, jobId, failure } = ctx.request.query; const { identifier, generator, state } = ctx.request.body; - const logger = getLogger(ctx.tenant); - logger.info(`Precompute webhook call for ${tenant}`); - logger.info('Query', ctx.request.query); - - ctx.precomputed = await getPrecomputedCollectionForWebHook(tenant); const callId = JSON.stringify([{ id: generator, value: identifier }]); + const data = { + id: precomputedId, + jobType: PRECOMPUTER, + action: 'getPrecomputed', + tenant: tenant, + callId, + failure, + state, + askForPrecomputedJobId: jobId, + }; + + // if error message, pass it to the worker if (failure !== undefined) { const { type, message } = ctx.request.body.error; - logger.info('Precompute webservice call with failure'); - await getFailureFromWebservice( - ctx, - tenant, - precomputedId, - jobId, - type, - message, - ); - ctx.body = 'webhook failure'; - ctx.status = 200; - return; + data.error = { type, message }; } - if (state !== 'ready') { - return; - } - - await getComputedFromWebservice(ctx, tenant, precomputedId, callId, jobId); - + await workerQueues[tenant].add( + PRECOMPUTER, // Name of the job + data, + { jobId: uuid() }, + ); ctx.body = 'webhook ok'; ctx.status = 200; }; diff --git a/src/api/services/precomputed/precomputed.js b/src/api/services/precomputed/precomputed.js index 742d787fb..99ed2bdfa 100644 --- a/src/api/services/precomputed/precomputed.js +++ b/src/api/services/precomputed/precomputed.js @@ -14,7 +14,7 @@ import { ERROR, CANCELED, } from '../../../common/taskStatus'; -import { PRECOMPUTING, PENDING } from '../../../common/progressStatus'; +import { PRECOMPUTING } from '../../../common/progressStatus'; import { jobLogger } from '../../workers/tools'; import { CancelWorkerError, workerQueues } from '../../workers'; import { PRECOMPUTER } from '../../workers/precomputer'; @@ -305,14 +305,25 @@ const extractResultFromZip = async (tenant, job, room, data) => { return result; }; -export const getComputedFromWebservice = async ( - ctx, - tenant, - precomputedId, - callId, - jobId, -) => { - progress.incrementProgress(tenant, 60); +export const getComputedFromWebservice = async ctx => { + const tenant = ctx.tenant; + const { id: precomputedId, callId, askForPrecomputedJobId } = ctx.job.data; + + const { + webServiceUrl, + name: precomputedName, + } = await ctx.precomputed.findOneById(precomputedId); + const webServiceBaseURL = new RegExp('.*\\/').exec(webServiceUrl)[0]; + + progress.start(ctx.tenant, { + status: PRECOMPUTING, + target: 100, + label: 'PRECOMPUTING', + subLabel: precomputedName, + type: 'precomputer', + }); + + progress.setProgress(tenant, 55); if (!tenant || !precomputedId || !callId) { throw new Error( `Precompute webhook error: missing data ${JSON.stringify({ @@ -322,25 +333,26 @@ export const getComputedFromWebservice = async ( })}`, ); } - + progress.setProgress(tenant, 65); const workerQueue = workerQueues[tenant]; - const jobs = await workerQueue.getCompleted(); - const job = jobs.filter(job => { - const { id, jobType, tenant: jobTenant } = job.data; + const completedJobs = await workerQueue.getCompleted(); + const askForPrecomputedJob = completedJobs.filter(completedJob => { + const { id, jobType, tenant: jobTenant } = completedJob.data; return ( id === precomputedId && jobType === PRECOMPUTER && jobTenant === tenant && - `${tenant}-precomputed-job-${job.opts.jobId}` === jobId + `${tenant}-precomputed-job-${completedJob.opts.jobId}` === + askForPrecomputedJobId ); })?.[0]; - if (!job) { + if (!askForPrecomputedJob) { throw new CancelWorkerError('Job has been canceled'); } - progress.incrementProgress(tenant, 70); - const room = `${tenant}-precomputed-job-${jobId}`; + progress.setProgress(tenant, 75); + const room = `${tenant}-precomputed-job-${askForPrecomputedJobId}`; const logData = JSON.stringify({ level: 'ok', @@ -348,15 +360,12 @@ export const getComputedFromWebservice = async ( timestamp: new Date(), status: IN_PROGRESS, }); - jobLogger.info(job, logData); + jobLogger.info(askForPrecomputedJob, logData); notifyListeners(room, logData); //WS doc here: //openapi.services.istex.fr/?urls.primaryName=data-computer%20-%20Calculs%20sur%20fichier%20coprus%20compress%C3%A9#/data-computer/post-v1-collect - const { webServiceUrl } = await ctx.precomputed.findOneById(precomputedId); - const webServiceBaseURL = new RegExp('.*\\/').exec(webServiceUrl)[0]; - try { const ROUTE = { RETRIEVE: 'retrieve', COLLECT: 'collect' }; const response = await fetch(`${webServiceBaseURL}${ANSWER_ROUTE}`, { @@ -367,6 +376,7 @@ export const getComputedFromWebservice = async ( }, compress: false, }); + progress.setProgress(tenant, 85); if (response.status === 200) { let data = response.body; if (ANSWER_ROUTE === ROUTE.RETRIEVE) { @@ -376,9 +386,14 @@ export const getComputedFromWebservice = async ( timestamp: new Date(), status: IN_PROGRESS, }); - jobLogger.info(job, logData); + jobLogger.info(askForPrecomputedJob, logData); notifyListeners(room, logData); - data = await extractResultFromZip(tenant, job, room, data); + data = await extractResultFromZip( + tenant, + askForPrecomputedJob, + room, + data, + ); } await ctx.precomputed.updateStatus(precomputedId, FINISHED, { @@ -386,9 +401,9 @@ export const getComputedFromWebservice = async ( }); await ctx.precomputed.updateStartedAt(precomputedId, null); - job.progress(100); - const isFailed = await job.isFailed(); - notifyListeners(`${job.data.tenant}-precomputer`, { + askForPrecomputedJob.progress(100); + const isFailed = await askForPrecomputedJob.isFailed(); + notifyListeners(`${askForPrecomputedJob.data.tenant}-precomputer`, { isPrecomputing: false, success: !isFailed, }); @@ -399,7 +414,7 @@ export const getComputedFromWebservice = async ( timestamp: new Date(), status: FINISHED, }); - jobLogger.info(job, logData); + jobLogger.info(askForPrecomputedJob, logData); notifyListeners(room, logData); } else { throw new Error( @@ -413,14 +428,10 @@ export const getComputedFromWebservice = async ( } }; -export const getFailureFromWebservice = async ( - ctx, - tenant, - precomputedId, - jobId, - errorType, - errorMessage, -) => { +export const getFailureFromWebservice = async ctx => { + const { tenant } = ctx.tenant; + const { askForPrecomputedJobId, id: precomputedId, error } = ctx.job.data; + if (!tenant || !precomputedId) { throw new Error( `Precompute webhook failure error: missing data ${JSON.stringify({ @@ -430,15 +441,16 @@ export const getFailureFromWebservice = async ( ); } const workerQueue = workerQueues[tenant]; - const jobs = await workerQueue.getCompleted(); - const job = jobs.filter(job => { + const completedJobs = await workerQueue.getCompleted(); + const job = completedJobs.filter(job => { const { id, jobType, tenant: jobTenant } = job.data; return ( id === precomputedId && jobType === PRECOMPUTER && jobTenant === tenant && - `${tenant}-precomputed-job-${job.opts.jobId}` === jobId + `${tenant}-precomputed-job-${job.opts.jobId}` === + askForPrecomputedJobId ); })?.[0]; @@ -446,10 +458,10 @@ export const getFailureFromWebservice = async ( return; } - const room = `${tenant}-precomputed-job-${jobId}`; + const room = `${tenant}-precomputed-job-${askForPrecomputedJobId}`; await ctx.precomputed.updateStatus(precomputedId, ERROR, { - message: errorMessage, + message: error.message, }); await ctx.precomputed.updateStartedAt(precomputedId, null); @@ -457,7 +469,7 @@ export const getFailureFromWebservice = async ( progress.finish(tenant); const logData = JSON.stringify({ level: 'error', - message: `[Instance: ${tenant}] Precomputing data failed ${errorType} ${errorMessage}`, + message: `[Instance: ${tenant}] Precomputing data failed ${error.type} ${error.message}`, timestamp: new Date(), status: ERROR, }); @@ -503,6 +515,7 @@ export const processPrecomputed = async (precomputed, ctx) => { }); jobLogger.info(ctx.job, logData); notifyListeners(room, logData); + const token = await getTokenFromWebservice( precomputed.webServiceUrl, precomputed._id.toString(), @@ -540,19 +553,18 @@ export const setPrecomputedJobId = async (ctx, precomputedID, job) => { }); }; -export const startPrecomputed = async ctx => { +export const startAskForPrecomputed = async ctx => { const id = ctx.job?.data?.id; const precomputed = await ctx.precomputed.findOneById(id); - if (progress.getProgress(ctx.tenant).status === PENDING) { - progress.start(ctx.tenant, { - status: PRECOMPUTING, - target: 100, - label: 'PRECOMPUTING', - subLabel: precomputed.name, - type: 'precomputer', - }); - } + progress.start(ctx.tenant, { + status: PRECOMPUTING, + target: 100, + label: 'PRECOMPUTING', + subLabel: precomputed.name, + type: 'precomputer', + }); + const room = `precomputed-job-${ctx.job.id}`; const logData = JSON.stringify({ level: 'ok', @@ -565,6 +577,19 @@ export const startPrecomputed = async ctx => { await processPrecomputed(precomputed, ctx); }; +export const startGetPrecomputed = async ctx => { + const logger = getLogger(ctx.tenant); + logger.info(`Precompute webhook call for ${ctx.tenant}`); + + if (ctx.job?.data?.failure !== undefined) { + logger.info('Precompute webservice call with failure'); + await getFailureFromWebservice(ctx); + return; + } + + await getComputedFromWebservice(ctx); +}; + export const setPrecomputedError = async (ctx, err) => { const id = ctx.job?.data?.id; await ctx.precomputed.updateStatus( diff --git a/src/api/workers/precomputer.js b/src/api/workers/precomputer.js index 581ab6456..ed4bbfa19 100644 --- a/src/api/workers/precomputer.js +++ b/src/api/workers/precomputer.js @@ -1,7 +1,8 @@ import { - startPrecomputed, + startAskForPrecomputed, setPrecomputedError, notifyListeners, + startGetPrecomputed, } from '../services/precomputed/precomputed'; import repositoryMiddleware from '../services/repositoryMiddleware'; @@ -29,7 +30,16 @@ const startJobPrecomputed = async job => { success: false, }); const ctx = await prepareContext({ job }); - await startPrecomputed(ctx); + + if (job.data.action === 'askForPrecomputed') { + await startAskForPrecomputed(ctx); + return; + } + + if (job.data.action === 'getPrecomputed') { + await startGetPrecomputed(ctx); + return; + } }; const handlePrecomputedError = async (job, err) => {