Skip to content

Commit

Permalink
feat(bundles ans-104): add a 30 second timeout when streaming bundle …
Browse files Browse the repository at this point in the history
…data PE-4306

Adds a timeout that aborts the bundle data stream if it pauses for more
than 30 seconds (default). This prevents slow streams from blocking the
unbundling queue.

Also adds temp file cleanup in more error conditions.
  • Loading branch information
djwhitt committed Aug 8, 2023
1 parent e84b52c commit 18f5adc
Showing 1 changed file with 46 additions and 3 deletions.
49 changes: 46 additions & 3 deletions src/lib/ans-104.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const DATA_ITEM_MATCHED: ParseEventName = 'data-item-matched';
const UNBUNDLE_COMPLETE: ParseEventName = 'unbundle-complete';
const UNBUNDLE_ERROR: ParseEventName = 'unbundle-error';

const DEFAULT_STREAM_TIMEOUT = 1000 * 30; // 30 seconds

interface ParserMessage {
eventName: ParseEventName;
dataItem?: NormalizedDataItem;
Expand Down Expand Up @@ -100,6 +102,7 @@ export class Ans104Parser {
private log: winston.Logger;
private worker: Worker;
private contiguousDataSource: ContiguousDataSource;
private streamTimeout: number;
private unbundlePromiseResolve: (() => void) | undefined;
private unbundlePromiseReject: ((reason?: any) => void) | undefined;

Expand All @@ -108,14 +111,17 @@ export class Ans104Parser {
eventEmitter,
contiguousDataSource,
dataItemIndexFilterString,
streamTimeout = DEFAULT_STREAM_TIMEOUT,
}: {
log: winston.Logger;
eventEmitter: EventEmitter;
contiguousDataSource: ContiguousDataSource;
dataItemIndexFilterString: string;
streamTimeout?: number;
}) {
this.log = log.child({ class: 'Ans104Parser' });
this.contiguousDataSource = contiguousDataSource;
this.streamTimeout = streamTimeout;

const workerUrl = new URL('./ans-104.js', import.meta.url);
this.worker = new Worker(workerUrl, {
Expand Down Expand Up @@ -176,25 +182,55 @@ export class Ans104Parser {
}): Promise<void> {
const unbundlePromise: Promise<void> = new Promise(
async (resolve, reject) => {
let bundlePath: string | undefined;
try {
this.unbundlePromiseResolve = resolve;
this.unbundlePromiseReject = reject;
const log = this.log.child({ parentId });

// Get data stream
const data = await this.contiguousDataSource.getData(parentId);

// Construct temp path for passing data to worker
await fsPromises.mkdir(path.join(process.cwd(), 'data/tmp/ans-104'), {
recursive: true,
});
const data = await this.contiguousDataSource.getData(parentId);
const bundlePath = path.join(
bundlePath = path.join(
process.cwd(),
'data/tmp/ans-104',
`${parentId}-${Math.random().toString(36).substring(2, 15)}`,
);

// Setup timeout for stalled data streams
let timeout: NodeJS.Timeout;
const resetTimeout = () => {
if (timeout !== undefined) {
clearTimeout(timeout);
}
timeout = setTimeout(() => {
data.stream.destroy(new Error('Timeout'));
}, this.streamTimeout);
};
data.stream.on('data', resetTimeout);
data.stream.pause();

// Write data stream to temp file
const writeStream = fs.createWriteStream(bundlePath);
pipeline(data.stream, writeStream, (error) => {
pipeline(data.stream, writeStream, async (error) => {
if (error !== undefined) {
reject(error);
this.resetUnbundlePromise();
log.error('Error writing ANS-104 bundle stream', error);
if (bundlePath !== undefined) {
try {
await fsPromises.unlink(bundlePath);
} catch (error) {
log.error(
'Error deleting ANS-104 temporary bundle file',
error,
);
}
}
} else {
log.info('Parsing ANS-104 bundle stream...');
this.worker.postMessage({
Expand All @@ -208,6 +244,13 @@ export class Ans104Parser {
} catch (error) {
reject(error);
this.resetUnbundlePromise();
if (bundlePath !== undefined) {
try {
await fsPromises.unlink(bundlePath);
} catch (error) {
log.error('Error deleting ANS-104 temporary bundle file', error);
}
}
}
},
);
Expand Down

0 comments on commit 18f5adc

Please sign in to comment.