Skip to content

Commit

Permalink
attemps to use ezs fusible for loaders
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed Jun 28, 2024
1 parent e4e4caf commit bf65a66
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
25 changes: 21 additions & 4 deletions src/api/services/import.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import ezs from '@ezs/core';
import ezsBasics from '@ezs/basics';
import {
createFusible,
enableFusible,
disableFusible,
} from '@ezs/core/lib/fusible';
import breaker from '@ezs/core/lib/statements/breaker';
import fetch from 'fetch-with-proxy';
import progress from './progress';
import { INDEXATION, SAVING_DATASET } from '../../common/progressStatus';
Expand All @@ -8,7 +14,7 @@ import localConfig from '../../../config.json';

ezs.use(ezsBasics);

export const getLoader = (loaderName, loaderEnvironment) => (stream) => {
export const getLoader = (loaderName, loaderEnvironment, loaderHeader) => (stream) => {
const env2query = new URLSearchParams(loaderEnvironment);
return stream
.pipe(
Expand All @@ -20,6 +26,7 @@ export const getLoader = (loaderName, loaderEnvironment) => (stream) => {
timeout: Number(localConfig.timeout) || 120000,
json: false,
encoder: 'transit',
header: loaderHeader,
}),
)
.pipe(ezs('unpack'));
Expand Down Expand Up @@ -51,6 +58,7 @@ export const startImport = async (ctx) => {
extension,
customLoader,
} = ctx.job?.data || {};
let fusible;
try {
if (progress.status !== SAVING_DATASET) {
progress.start(ctx.tenant, {
Expand All @@ -66,6 +74,12 @@ export const startImport = async (ctx) => {
source,
parser,
};
fusible = await createFusible();
await enableFusible(fusible);
ctx.job.update({
...ctx.job.data,
fusible,
});
let parseStream;
if (customLoader) {
loaderEnvironment.parser =
Expand All @@ -75,7 +89,7 @@ export const startImport = async (ctx) => {
loaderEnvironment,
);
} else {
parseStream = ctx.getLoader(parser, loaderEnvironment);
parseStream = ctx.getLoader(parser, loaderEnvironment, `'X-Request-ID:${fusible}`);
}
let stream;
if (url) {
Expand All @@ -92,14 +106,17 @@ export const startImport = async (ctx) => {
loaderEnvironment.source = 'text input';
stream = ctx.getStreamFromText(text);
}
const parsedStream = await parseStream(stream);
await ctx.saveParsedStream(ctx, parsedStream);
const inputStream = stream.pipe(ezs(breaker, { fusible }));
const parsedStream = await parseStream(inputStream);
const outputStream = parsedStream.pipe(ezs(breaker, { fusible }));
await ctx.saveParsedStream(ctx, outputStream);
progress.start(ctx.tenant, {
status: INDEXATION,
type: 'import',
});
await ctx.dataset.indexColumns();
} finally {
await disableFusible(fusible);
progress.finish(ctx.tenant);
if (filename && totalChunks) {
await ctx.clearChunks(filename, totalChunks);
Expand Down
5 changes: 5 additions & 0 deletions src/api/workers/tools.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { cleanWaitingJobsOfType, workerQueues } from '.';
import { disableFusible } from '@ezs/core/lib/fusible';

import { ERROR } from '../../common/progressStatus';
import getLogger from '../services/logger';
import progress from '../services/progress';
Expand Down Expand Up @@ -59,6 +61,9 @@ export const getWaitingJobs = async (tenant) => {

export const cancelJob = async (ctx, jobType, subLabel = null) => {
const activeJob = await getActiveJob(ctx.tenant);
if (activeJob.data.fusible) {
await disableFusible(activeJob.data.fusible);
}
if (activeJob?.data?.jobType === jobType) {
if (jobType === 'publisher') {
await cleanWaitingJobsOfType(ctx.tenant, activeJob.data.jobType);
Expand Down

0 comments on commit bf65a66

Please sign in to comment.