From 600f6bb05ad85c3f51d8c023ed389edc0a7f4052 Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Wed, 18 Sep 2024 15:18:17 +0200 Subject: [PATCH 1/2] catch the right error and avoid cancelling and deleting the entire dataset --- package-lock.json | 45 +++++++++++------------- package.json | 2 +- src/api/controller/api/upload.js | 10 ++---- src/api/services/fsHelpers.js | 8 ++--- src/api/services/import.js | 60 +++++++++++++++++--------------- src/api/services/saveStream.js | 12 +++---- 6 files changed, 65 insertions(+), 72 deletions(-) diff --git a/package-lock.json b/package-lock.json index e0598f853..95745d00b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -98,7 +98,7 @@ "mongodb": "4.17.1", "mongodb-restore": "1.6.2", "mui-file-dropzone": "4.0.2", - "multistream": "^2.1.1", + "multistream": "4.1.0", "node-polyglot": "^2.3.1", "pdfkit": "0.14.0", "pm2": "5.3.1", @@ -25180,31 +25180,26 @@ } }, "node_modules/multistream": { - "version": "2.1.1", - "license": "MIT", - "dependencies": { - "inherits": "^2.0.1", - "readable-stream": "^2.0.5" - } - }, - "node_modules/multistream/node_modules/readable-stream": { - "version": "2.3.7", - "license": "MIT", - "dependencies": { - "core-util-is": "~1.0.0", - "inherits": "~2.0.3", - "isarray": "~1.0.0", - "process-nextick-args": "~2.0.0", - "safe-buffer": "~5.1.1", - "string_decoder": "~1.1.1", - "util-deprecate": "~1.0.1" - } - }, - "node_modules/multistream/node_modules/string_decoder": { - "version": "1.1.1", - "license": "MIT", + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/multistream/-/multistream-4.1.0.tgz", + "integrity": "sha512-J1XDiAmmNpRCBfIWJv+n0ymC4ABcf/Pl+5YvC5B/D2f/2+8PtHvCNxMPKiQcZyi922Hq69J2YOpb1pTywfifyw==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], "dependencies": { - "safe-buffer": "~5.1.0" + "once": "^1.4.0", + "readable-stream": "^3.6.0" } }, "node_modules/mute-stream": { diff --git a/package.json b/package.json index 5ef47d28b..4ac834359 100644 --- a/package.json +++ b/package.json @@ -144,7 +144,7 @@ "mongodb": "4.17.1", "mongodb-restore": "1.6.2", "mui-file-dropzone": "4.0.2", - "multistream": "^2.1.1", + "multistream": "4.1.0", "node-polyglot": "^2.3.1", "pdfkit": "0.14.0", "pm2": "5.3.1", diff --git a/src/api/controller/api/upload.js b/src/api/controller/api/upload.js index 1bce65d3f..b2e9ececb 100644 --- a/src/api/controller/api/upload.js +++ b/src/api/controller/api/upload.js @@ -11,6 +11,7 @@ import { saveStreamInFile, checkFileExists, getUploadedFileSize, + unlinkFile, } from '../../services/fsHelpers'; import { v1 as uuid } from 'uuid'; @@ -19,13 +20,8 @@ import { IMPORT } from '../../workers/import'; export const requestToStream = (asyncBusboyImpl) => async (req) => { const { files, fields } = await asyncBusboyImpl(req); - - files[0].once('close', () => { - try { - fs.unlinkSync(files[0].path); - } catch (error) { - console.warn(error); - } + files[0].once('close', async () => { + await unlinkFile(files[0].path); }); return { stream: files[0], fields }; }; diff --git a/src/api/services/fsHelpers.js b/src/api/services/fsHelpers.js index 28c93e834..3a108ad41 100644 --- a/src/api/services/fsHelpers.js +++ b/src/api/services/fsHelpers.js @@ -1,7 +1,7 @@ import fs from 'fs'; import range from 'lodash/range'; import rangeRight from 'lodash/rangeRight'; -import multiStream from 'multistream'; +import MultiStream from 'multistream'; import composeAsync from '../../common/lib/composeAsync'; import safePipe from './safePipe'; @@ -29,15 +29,15 @@ export const createWriteStream = (chunkname) => fs.createWriteStream(chunkname); export const createReadStream = (chunkname) => fs.createReadStream(chunkname); export const mergeChunksFactory = - (createReadStreamImpl, multiStreamImpl) => (filename, nbChunks) => { + (createReadStreamImpl) => (filename, nbChunks) => { const sourceStreams = range(1, nbChunks + 1) .map((nb) => `${filename}.${nb}`) .map((chunkname) => createReadStreamImpl(chunkname)); - return multiStreamImpl(sourceStreams); + return new MultiStream(sourceStreams); }; -export const mergeChunks = mergeChunksFactory(createReadStream, multiStream); +export const mergeChunks = mergeChunksFactory(createReadStream); export const getFileStats = (filename) => new Promise((resolve, reject) => { diff --git a/src/api/services/import.js b/src/api/services/import.js index 7ad516d01..32875548e 100644 --- a/src/api/services/import.js +++ b/src/api/services/import.js @@ -14,24 +14,25 @@ import localConfig from '../../../config.json'; ezs.use(ezsBasics); -export const getLoader = (loaderName, loaderEnvironment, loaderHeader) => (stream) => { - const env2query = new URLSearchParams(loaderEnvironment); - return stream - .pipe( - ezs('URLConnect', { - url: `${ - process.env.WORKERS_URL || 'http://localhost:31976' - }/loaders/${loaderName}?${env2query}`, - streaming: true, - timeout: Number(localConfig.timeout) || 120000, - json: false, - encoder: 'transit', - header: loaderHeader, - }), - ) - .pipe(ezs('unpack')); -}; -export const getCustomLoader = async (script, loaderEnvironment) => { +export const getLoader = + (loaderName, loaderEnvironment, loaderHeader) => (stream) => { + const env2query = new URLSearchParams(loaderEnvironment); + return stream + .pipe( + ezs('URLConnect', { + url: `${ + process.env.WORKERS_URL || 'http://localhost:31976' + }/loaders/${loaderName}?${env2query}`, + streaming: true, + timeout: Number(localConfig.timeout) || 120000, + json: false, + encoder: 'transit', + header: loaderHeader, + }), + ) + .pipe(ezs('unpack')); + }; +export const getCustomLoader = (script, loaderEnvironment) => { return (stream) => stream.pipe(ezs('delegate', { script }, loaderEnvironment)); }; @@ -86,37 +87,38 @@ export const startImport = async (ctx) => { if (customLoader) { loaderEnvironment.parser = loaderEnvironment.parser.concat('/custom'); - parseStream = await ctx.getCustomLoader( - customLoader, + parseStream = ctx.getCustomLoader(customLoader, loaderEnvironment); + } else { + parseStream = ctx.getLoader( + parser, loaderEnvironment, + `'X-Request-ID:${fusible}`, ); - } else { - parseStream = ctx.getLoader(parser, loaderEnvironment, `'X-Request-ID:${fusible}`); } let stream; if (url) { stream = await ctx.getStreamFromUrl(url); } if (filename && totalChunks) { - try { - stream = ctx.mergeChunks(filename, totalChunks); - } catch (error) { - throw new Error(`Error while merging chunks: ${error}`); - } + stream = ctx.mergeChunks(filename, totalChunks); } if (text) { loaderEnvironment.source = 'text input'; stream = ctx.getStreamFromText(text); } const inputStream = stream.pipe(ezs(breaker, { fusible })); - const parsedStream = await parseStream(inputStream); + const parsedStream = parseStream(inputStream); const outputStream = parsedStream.pipe(ezs(breaker, { fusible })); - await ctx.saveParsedStream(ctx, outputStream); + + const insertedTotal = await ctx.saveParsedStream(ctx, outputStream); progress.start(ctx.tenant, { status: INDEXATION, type: 'import', }); await ctx.dataset.indexColumns(); + } catch (error) { + console.error('Error during import', error); + throw new Error(`Error during import: ${error}`); } finally { await disableFusible(fusible); progress.finish(ctx.tenant); diff --git a/src/api/services/saveStream.js b/src/api/services/saveStream.js index 9545552ec..809166b66 100644 --- a/src/api/services/saveStream.js +++ b/src/api/services/saveStream.js @@ -1,23 +1,22 @@ import ezs from '@ezs/core'; import progress from './progress'; -import { CancelWorkerError } from '../workers'; async function insert(data, feed) { const method = this.getParam('method'); const ctx = this.getEnv(); + if (!this.nb) { + this.nb = 0; + } if (this.isLast()) { return feed.close(); } - const isActive = await ctx.job?.isActive(); - if (!isActive) { - return feed.stop(new CancelWorkerError('Job has been canceled')); - } try { + this.nb += data.length; const result = await ctx.dataset[method](data); progress.incrementProgress(ctx.tenant, data.length); return feed.send(result); } catch (error) { - return feed.stop(error); + return feed.send(error); } } @@ -31,6 +30,7 @@ export default async (stream, ctx) => { .pipe(ezs(insert, { method }, ctx)) .pipe(ezs.catch()) .on('error', (e) => { + console.error('Error in the import stream pipeline', e); reject(e.sourceError || e); }) .on('data', ({ insertedCount = 0 }) => { From e3a76861df4fc3a6df4a09daba1ae9467ba32e4d Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Wed, 18 Sep 2024 16:55:50 +0200 Subject: [PATCH 2/2] usage fixed version of ezs --- package-lock.json | 50 +++++++++++++++++------------- package.json | 4 +-- src/api/controller/api/upload.js | 1 - src/api/services/fsHelpers.spec.js | 25 --------------- src/api/services/import.spec.js | 2 +- 5 files changed, 31 insertions(+), 51 deletions(-) diff --git a/package-lock.json b/package-lock.json index 95745d00b..642dfd609 100644 --- a/package-lock.json +++ b/package-lock.json @@ -29,9 +29,9 @@ "@emotion/react": "11.7.1", "@emotion/styled": "11.6.0", "@ezs/analytics": "2.3.2", - "@ezs/basics": "2.7.1", + "@ezs/basics": "2.7.2", "@ezs/conditor": "2.12.2", - "@ezs/core": "3.10.3", + "@ezs/core": "3.10.4", "@ezs/istex": "1.5.9", "@ezs/lodex": "file:./packages/ezsLodex", "@ezs/sparql": "1.2.3", @@ -3140,9 +3140,9 @@ } }, "node_modules/@ezs/basics": { - "version": "2.7.1", - "resolved": "https://registry.npmjs.org/@ezs/basics/-/basics-2.7.1.tgz", - "integrity": "sha512-t/IPlXOCHlWq5R9S2P6IfQTahbLJb7v4FwLQvt0QzjtTjMgcPENWTmZ+Ew7ml7pRcKIpCit3KkVzha1f65FpIg==", + "version": "2.7.2", + "resolved": "https://registry.npmjs.org/@ezs/basics/-/basics-2.7.2.tgz", + "integrity": "sha512-eJcaWgQuWzg5VTuFngLOG56X97ZWwAiFS/2+K39reuN3w9WF9pycK36M6R0Ehmm4H7I7ZyCblrDZ84Dr4EtdRA==", "dependencies": { "async-retry": "1.3.3", "better-https-proxy-agent": "1.0.9", @@ -3158,7 +3158,7 @@ "JSONStream": "1.3.5", "lodash": "4.17.21", "make-dir": "4.0.0", - "micromatch": "4.0.4", + "micromatch": "4.0.8", "node-abort-controller": "1.1.0", "parse-headers": "2.0.4", "path-exists": "4.0.0", @@ -3441,9 +3441,9 @@ } }, "node_modules/@ezs/core": { - "version": "3.10.3", - "resolved": "https://registry.npmjs.org/@ezs/core/-/core-3.10.3.tgz", - "integrity": "sha512-0YpXZZfV0WEr26+mFmBpXScfvr5/DKpf2/1rJP9Vzleing2OJYkAyxep8wzgh+NEp+fZbkdXsXHaCKVyG3Q2AA==", + "version": "3.10.4", + "resolved": "https://registry.npmjs.org/@ezs/core/-/core-3.10.4.tgz", + "integrity": "sha512-Xf6bGDTSE1xd2NHmN/OTQVzWml3DWqr0oatvca4NyzzKMxdUdTsIRnppvft67KAacPQYxzLxQHrsSDTRpKK2vQ==", "dependencies": { "app-module-path": "2.2.0", "autocast": "0.0.4", @@ -9956,10 +9956,11 @@ } }, "node_modules/braces": { - "version": "3.0.2", - "license": "MIT", + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.3.tgz", + "integrity": "sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==", "dependencies": { - "fill-range": "^7.0.1" + "fill-range": "^7.1.1" }, "engines": { "node": ">=8" @@ -15643,8 +15644,9 @@ } }, "node_modules/fill-range": { - "version": "7.0.1", - "license": "MIT", + "version": "7.1.1", + "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz", + "integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==", "dependencies": { "to-regex-range": "^5.0.1" }, @@ -18349,7 +18351,8 @@ }, "node_modules/is-number": { "version": "7.0.0", - "license": "MIT", + "resolved": "https://registry.npmjs.org/is-number/-/is-number-7.0.0.tgz", + "integrity": "sha512-41Cifkg6e8TylSpdtTpeLVMqvSBEVzTttHvERD741+pnZ8ANv0004MRL43QKPDlK9cGvNp6NZWZUBlbGXYxxng==", "engines": { "node": ">=0.12.0" } @@ -24089,11 +24092,12 @@ } }, "node_modules/micromatch": { - "version": "4.0.4", - "license": "MIT", + "version": "4.0.8", + "resolved": "https://registry.npmjs.org/micromatch/-/micromatch-4.0.8.tgz", + "integrity": "sha512-PXwfBhYu0hBCPw8Dn0E+WDYb7af3dSLVWKi3HGv84IdF4TyFoC0ysxFd0Goxw7nSv4T/PzEJQxsYsEiFCKo2BA==", "dependencies": { - "braces": "^3.0.1", - "picomatch": "^2.2.3" + "braces": "^3.0.3", + "picomatch": "^2.3.1" }, "engines": { "node": ">=8.6" @@ -26518,8 +26522,9 @@ "license": "ISC" }, "node_modules/picomatch": { - "version": "2.3.0", - "license": "MIT", + "version": "2.3.1", + "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-2.3.1.tgz", + "integrity": "sha512-JU3teHTNjmE2VCGFzuY8EXzCDVwEqB2a8fsIvwaStHhAWJEeVd1o1QD80CU6+ZdEXXSLbSsuLwJjkCBWqRQUVA==", "engines": { "node": ">=8.6" }, @@ -30992,7 +30997,8 @@ }, "node_modules/to-regex-range": { "version": "5.0.1", - "license": "MIT", + "resolved": "https://registry.npmjs.org/to-regex-range/-/to-regex-range-5.0.1.tgz", + "integrity": "sha512-65P7iz6X5yEr1cwcgvQxbbIw7Uk3gOy5dIdtZ4rDveLqhrdJP+Li/Hx6tyK0NEb+2GCyneCMJiGqrADCSNk8sQ==", "dependencies": { "is-number": "^7.0.0" }, diff --git a/package.json b/package.json index 4ac834359..813ee10b9 100644 --- a/package.json +++ b/package.json @@ -75,9 +75,9 @@ "@emotion/react": "11.7.1", "@emotion/styled": "11.6.0", "@ezs/analytics": "2.3.2", - "@ezs/basics": "2.7.1", + "@ezs/basics": "2.7.2", "@ezs/conditor": "2.12.2", - "@ezs/core": "3.10.3", + "@ezs/core": "3.10.4", "@ezs/istex": "1.5.9", "@ezs/lodex": "file:./packages/ezsLodex", "@ezs/sparql": "1.2.3", diff --git a/src/api/controller/api/upload.js b/src/api/controller/api/upload.js index b2e9ececb..4470cd404 100644 --- a/src/api/controller/api/upload.js +++ b/src/api/controller/api/upload.js @@ -3,7 +3,6 @@ import route from 'koa-route'; import asyncBusboy from '@recuperateur/async-busboy'; import config from 'config'; import koaBodyParser from 'koa-bodyparser'; -import fs from 'fs'; import progress from '../../services/progress'; import { PENDING, UPLOADING_DATASET } from '../../../common/progressStatus'; diff --git a/src/api/services/fsHelpers.spec.js b/src/api/services/fsHelpers.spec.js index 28bbd165f..8f7e20bba 100644 --- a/src/api/services/fsHelpers.spec.js +++ b/src/api/services/fsHelpers.spec.js @@ -99,29 +99,4 @@ describe('fsHelpers', () => { }); }); }); - - describe('mergeChunksFactory', () => { - const createReadStreamImpl = jest.fn((v) => `read stream for ${v}`); - const multiStreamImpl = jest.fn(() => 'merged stream'); - beforeAll(async () => { - await mergeChunksFactory(createReadStreamImpl, multiStreamImpl)( - 'filename', - 3, - ); - }); - - it('should have called createReadStreamImpl with each generated chunkname', () => { - expect(createReadStreamImpl).toHaveBeenCalledWith('filename.1'); - expect(createReadStreamImpl).toHaveBeenCalledWith('filename.2'); - expect(createReadStreamImpl).toHaveBeenCalledWith('filename.3'); - }); - - it('should have called multiStreamImpl with created array of readStream and write stream', () => { - expect(multiStreamImpl).toHaveBeenCalledWith([ - 'read stream for filename.1', - 'read stream for filename.2', - 'read stream for filename.3', - ]); - }); - }); }); diff --git a/src/api/services/import.spec.js b/src/api/services/import.spec.js index 079cb19c1..9d299199b 100644 --- a/src/api/services/import.spec.js +++ b/src/api/services/import.spec.js @@ -107,7 +107,7 @@ describe.skip('import', () => { }, update: jest.fn(), }, - getCustomLoader: jest.fn().mockImplementation(() => loader), + getCustomLoader: () => loader, mergeChunks: jest.fn().mockImplementation(() => 'stream'), clearChunks: jest.fn(), saveParsedStream: jest.fn(),