Skip to content

Commit

Permalink
Merge pull request #2153 from Inist-CNRS/2136-erreur-import-gros-fich…
Browse files Browse the repository at this point in the history
…iers

catch the right error and avoid cancelling and deleting the entire da…
  • Loading branch information
touv authored Sep 18, 2024
2 parents c4f56c2 + e3a7686 commit ecc7327
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 123 deletions.
95 changes: 48 additions & 47 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@
"@emotion/react": "11.7.1",
"@emotion/styled": "11.6.0",
"@ezs/analytics": "2.3.2",
"@ezs/basics": "2.7.1",
"@ezs/basics": "2.7.2",
"@ezs/conditor": "2.12.2",
"@ezs/core": "3.10.3",
"@ezs/core": "3.10.4",
"@ezs/istex": "1.5.9",
"@ezs/lodex": "file:./packages/ezsLodex",
"@ezs/sparql": "1.2.3",
Expand Down Expand Up @@ -144,7 +144,7 @@
"mongodb": "4.17.1",
"mongodb-restore": "1.6.2",
"mui-file-dropzone": "4.0.2",
"multistream": "^2.1.1",
"multistream": "4.1.0",
"node-polyglot": "^2.3.1",
"pdfkit": "0.14.0",
"pm2": "5.3.1",
Expand Down
11 changes: 3 additions & 8 deletions src/api/controller/api/upload.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ import route from 'koa-route';
import asyncBusboy from '@recuperateur/async-busboy';
import config from 'config';
import koaBodyParser from 'koa-bodyparser';
import fs from 'fs';

import progress from '../../services/progress';
import { PENDING, UPLOADING_DATASET } from '../../../common/progressStatus';
import {
saveStreamInFile,
checkFileExists,
getUploadedFileSize,
unlinkFile,
} from '../../services/fsHelpers';

import { v1 as uuid } from 'uuid';
Expand All @@ -19,13 +19,8 @@ import { IMPORT } from '../../workers/import';

export const requestToStream = (asyncBusboyImpl) => async (req) => {
const { files, fields } = await asyncBusboyImpl(req);

files[0].once('close', () => {
try {
fs.unlinkSync(files[0].path);
} catch (error) {
console.warn(error);
}
files[0].once('close', async () => {
await unlinkFile(files[0].path);
});
return { stream: files[0], fields };
};
Expand Down
8 changes: 4 additions & 4 deletions src/api/services/fsHelpers.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import fs from 'fs';
import range from 'lodash/range';
import rangeRight from 'lodash/rangeRight';
import multiStream from 'multistream';
import MultiStream from 'multistream';

import composeAsync from '../../common/lib/composeAsync';
import safePipe from './safePipe';
Expand Down Expand Up @@ -29,15 +29,15 @@ export const createWriteStream = (chunkname) => fs.createWriteStream(chunkname);
export const createReadStream = (chunkname) => fs.createReadStream(chunkname);

export const mergeChunksFactory =
(createReadStreamImpl, multiStreamImpl) => (filename, nbChunks) => {
(createReadStreamImpl) => (filename, nbChunks) => {
const sourceStreams = range(1, nbChunks + 1)
.map((nb) => `${filename}.${nb}`)
.map((chunkname) => createReadStreamImpl(chunkname));

return multiStreamImpl(sourceStreams);
return new MultiStream(sourceStreams);
};

export const mergeChunks = mergeChunksFactory(createReadStream, multiStream);
export const mergeChunks = mergeChunksFactory(createReadStream);

export const getFileStats = (filename) =>
new Promise((resolve, reject) => {
Expand Down
25 changes: 0 additions & 25 deletions src/api/services/fsHelpers.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,29 +99,4 @@ describe('fsHelpers', () => {
});
});
});

describe('mergeChunksFactory', () => {
const createReadStreamImpl = jest.fn((v) => `read stream for ${v}`);
const multiStreamImpl = jest.fn(() => 'merged stream');
beforeAll(async () => {
await mergeChunksFactory(createReadStreamImpl, multiStreamImpl)(
'filename',
3,
);
});

it('should have called createReadStreamImpl with each generated chunkname', () => {
expect(createReadStreamImpl).toHaveBeenCalledWith('filename.1');
expect(createReadStreamImpl).toHaveBeenCalledWith('filename.2');
expect(createReadStreamImpl).toHaveBeenCalledWith('filename.3');
});

it('should have called multiStreamImpl with created array of readStream and write stream', () => {
expect(multiStreamImpl).toHaveBeenCalledWith([
'read stream for filename.1',
'read stream for filename.2',
'read stream for filename.3',
]);
});
});
});
60 changes: 31 additions & 29 deletions src/api/services/import.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,25 @@ import localConfig from '../../../config.json';

ezs.use(ezsBasics);

export const getLoader = (loaderName, loaderEnvironment, loaderHeader) => (stream) => {
const env2query = new URLSearchParams(loaderEnvironment);
return stream
.pipe(
ezs('URLConnect', {
url: `${
process.env.WORKERS_URL || 'http://localhost:31976'
}/loaders/${loaderName}?${env2query}`,
streaming: true,
timeout: Number(localConfig.timeout) || 120000,
json: false,
encoder: 'transit',
header: loaderHeader,
}),
)
.pipe(ezs('unpack'));
};
export const getCustomLoader = async (script, loaderEnvironment) => {
export const getLoader =
(loaderName, loaderEnvironment, loaderHeader) => (stream) => {
const env2query = new URLSearchParams(loaderEnvironment);
return stream
.pipe(
ezs('URLConnect', {
url: `${
process.env.WORKERS_URL || 'http://localhost:31976'
}/loaders/${loaderName}?${env2query}`,
streaming: true,
timeout: Number(localConfig.timeout) || 120000,
json: false,
encoder: 'transit',
header: loaderHeader,
}),
)
.pipe(ezs('unpack'));
};
export const getCustomLoader = (script, loaderEnvironment) => {
return (stream) =>
stream.pipe(ezs('delegate', { script }, loaderEnvironment));
};
Expand Down Expand Up @@ -86,37 +87,38 @@ export const startImport = async (ctx) => {
if (customLoader) {
loaderEnvironment.parser =
loaderEnvironment.parser.concat('/custom');
parseStream = await ctx.getCustomLoader(
customLoader,
parseStream = ctx.getCustomLoader(customLoader, loaderEnvironment);
} else {
parseStream = ctx.getLoader(
parser,
loaderEnvironment,
`'X-Request-ID:${fusible}`,
);
} else {
parseStream = ctx.getLoader(parser, loaderEnvironment, `'X-Request-ID:${fusible}`);
}
let stream;
if (url) {
stream = await ctx.getStreamFromUrl(url);
}
if (filename && totalChunks) {
try {
stream = ctx.mergeChunks(filename, totalChunks);
} catch (error) {
throw new Error(`Error while merging chunks: ${error}`);
}
stream = ctx.mergeChunks(filename, totalChunks);
}
if (text) {
loaderEnvironment.source = 'text input';
stream = ctx.getStreamFromText(text);
}
const inputStream = stream.pipe(ezs(breaker, { fusible }));
const parsedStream = await parseStream(inputStream);
const parsedStream = parseStream(inputStream);
const outputStream = parsedStream.pipe(ezs(breaker, { fusible }));
await ctx.saveParsedStream(ctx, outputStream);

const insertedTotal = await ctx.saveParsedStream(ctx, outputStream);
progress.start(ctx.tenant, {
status: INDEXATION,
type: 'import',
});
await ctx.dataset.indexColumns();
} catch (error) {
console.error('Error during import', error);
throw new Error(`Error during import: ${error}`);
} finally {
await disableFusible(fusible);
progress.finish(ctx.tenant);
Expand Down
2 changes: 1 addition & 1 deletion src/api/services/import.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ describe.skip('import', () => {
},
update: jest.fn(),
},
getCustomLoader: jest.fn().mockImplementation(() => loader),
getCustomLoader: () => loader,
mergeChunks: jest.fn().mockImplementation(() => 'stream'),
clearChunks: jest.fn(),
saveParsedStream: jest.fn(),
Expand Down
Loading

0 comments on commit ecc7327

Please sign in to comment.