From bf65a6601513da64a891234310ad7fedfb66d8cc Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Fri, 28 Jun 2024 16:19:43 +0200 Subject: [PATCH] attemps to use ezs fusible for loaders --- src/api/services/import.js | 25 +++++++++++++++++++++---- src/api/workers/tools.js | 5 +++++ 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/api/services/import.js b/src/api/services/import.js index e391c3ed4..effc1865d 100644 --- a/src/api/services/import.js +++ b/src/api/services/import.js @@ -1,5 +1,11 @@ import ezs from '@ezs/core'; import ezsBasics from '@ezs/basics'; +import { + createFusible, + enableFusible, + disableFusible, +} from '@ezs/core/lib/fusible'; +import breaker from '@ezs/core/lib/statements/breaker'; import fetch from 'fetch-with-proxy'; import progress from './progress'; import { INDEXATION, SAVING_DATASET } from '../../common/progressStatus'; @@ -8,7 +14,7 @@ import localConfig from '../../../config.json'; ezs.use(ezsBasics); -export const getLoader = (loaderName, loaderEnvironment) => (stream) => { +export const getLoader = (loaderName, loaderEnvironment, loaderHeader) => (stream) => { const env2query = new URLSearchParams(loaderEnvironment); return stream .pipe( @@ -20,6 +26,7 @@ export const getLoader = (loaderName, loaderEnvironment) => (stream) => { timeout: Number(localConfig.timeout) || 120000, json: false, encoder: 'transit', + header: loaderHeader, }), ) .pipe(ezs('unpack')); @@ -51,6 +58,7 @@ export const startImport = async (ctx) => { extension, customLoader, } = ctx.job?.data || {}; + let fusible; try { if (progress.status !== SAVING_DATASET) { progress.start(ctx.tenant, { @@ -66,6 +74,12 @@ export const startImport = async (ctx) => { source, parser, }; + fusible = await createFusible(); + await enableFusible(fusible); + ctx.job.update({ + ...ctx.job.data, + fusible, + }); let parseStream; if (customLoader) { loaderEnvironment.parser = @@ -75,7 +89,7 @@ export const startImport = async (ctx) => { loaderEnvironment, ); } else { - parseStream = ctx.getLoader(parser, loaderEnvironment); + parseStream = ctx.getLoader(parser, loaderEnvironment, `'X-Request-ID:${fusible}`); } let stream; if (url) { @@ -92,14 +106,17 @@ export const startImport = async (ctx) => { loaderEnvironment.source = 'text input'; stream = ctx.getStreamFromText(text); } - const parsedStream = await parseStream(stream); - await ctx.saveParsedStream(ctx, parsedStream); + const inputStream = stream.pipe(ezs(breaker, { fusible })); + const parsedStream = await parseStream(inputStream); + const outputStream = parsedStream.pipe(ezs(breaker, { fusible })); + await ctx.saveParsedStream(ctx, outputStream); progress.start(ctx.tenant, { status: INDEXATION, type: 'import', }); await ctx.dataset.indexColumns(); } finally { + await disableFusible(fusible); progress.finish(ctx.tenant); if (filename && totalChunks) { await ctx.clearChunks(filename, totalChunks); diff --git a/src/api/workers/tools.js b/src/api/workers/tools.js index c3c9d9e95..07d1f4759 100644 --- a/src/api/workers/tools.js +++ b/src/api/workers/tools.js @@ -1,4 +1,6 @@ import { cleanWaitingJobsOfType, workerQueues } from '.'; +import { disableFusible } from '@ezs/core/lib/fusible'; + import { ERROR } from '../../common/progressStatus'; import getLogger from '../services/logger'; import progress from '../services/progress'; @@ -59,6 +61,9 @@ export const getWaitingJobs = async (tenant) => { export const cancelJob = async (ctx, jobType, subLabel = null) => { const activeJob = await getActiveJob(ctx.tenant); + if (activeJob.data.fusible) { + await disableFusible(activeJob.data.fusible); + } if (activeJob?.data?.jobType === jobType) { if (jobType === 'publisher') { await cleanWaitingJobsOfType(ctx.tenant, activeJob.data.jobType);