diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index 4498f0f3905be2..e89bf67006b752 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -83,15 +83,19 @@ module.exports = function duplexify(body, name) { } if (typeof body === 'function') { - const { value, write, final, destroy } = fromAsyncGen(body); + let d; + + const { value, write, final, destroy } = fromAsyncGen(body, () => { + if (d) destroyer(d); + }); // Body might be a constructor function instead of an async generator function. if (isDuplexNodeStream(value)) { - return value; + return d = value; } if (isIterable(value)) { - return from(Duplexify, value, { + return d = from(Duplexify, value, { // TODO (ronag): highWaterMark? objectMode: true, write, @@ -102,12 +106,11 @@ module.exports = function duplexify(body, name) { const then = value?.then; if (typeof then === 'function') { - let d; - const promise = FunctionPrototypeCall( then, value, (val) => { + destroyer(d, null); if (val != null) { throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val); } @@ -208,11 +211,12 @@ module.exports = function duplexify(body, name) { body); }; -function fromAsyncGen(fn) { +function fromAsyncGen(fn, destructor) { let { promise, resolve } = createDeferredPromise(); const ac = new AbortController(); const signal = ac.signal; - const value = fn(async function*() { + + const asyncGenerator = (async function* () { while (true) { const _promise = promise; promise = null; @@ -224,7 +228,44 @@ function fromAsyncGen(fn) { ({ promise, resolve } = createDeferredPromise()); yield chunk; } - }(), { signal }); + })(); + + // Not using try/finally because asyncGenerator.return() should work even + // if the generator was never started. + + const originalReturn = asyncGenerator.return; + asyncGenerator.return = async function(value) { + try { + return await originalReturn.call(this, value); + } finally { + if (promise) { + const _promise = promise; + promise = null; + const { cb } = await _promise; + process.nextTick(cb); + + destructor(); + } + } + }; + + const originalThrow = asyncGenerator.throw; + asyncGenerator.throw = async function(err) { + try { + return await originalThrow.call(this, err); + } finally { + if (promise) { + const _promise = promise; + promise = null; + const { cb } = await _promise; + + // asyncGenerator.throw(undefined) should cause a callback error + process.nextTick(cb, err ?? new AbortError()); + } + } + }; + + const value = fn(asyncGenerator, { signal }); return { value, diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index e3c117ff8dedb0..bb83527582d21b 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -5,6 +5,7 @@ const assert = require('assert'); const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream'); const { ReadableStream, WritableStream } = require('stream/web'); const { Blob } = require('buffer'); +const sleep = require('util').promisify(setTimeout); { const d = Duplex.from({ @@ -401,3 +402,136 @@ function makeATestWritableStream(writeFunc) { assert.strictEqual(d.writable, false); })); } + +{ + const r = Readable.from(['foo', 'bar', 'bar']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + const values = await Array.fromAsync(asyncGenerator); + assert.deepStrictEqual(values, ['foo', 'bar', 'bar']); + + await asyncGenerator.return(); + await asyncGenerator.return(); + await asyncGenerator.return(); + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'bar']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // eslint-disable-next-line no-unused-vars + for await (const _ of asyncGenerator) break; + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + const a = await asyncGenerator.next(); + assert.strictEqual(a.done, false); + assert.strictEqual(a.value.toString(), 'foo'); + const b = await asyncGenerator.return(); + assert.strictEqual(b.done, true); + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // Note: the generator is not even started at this point + await asyncGenerator.return(); + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // Same as before, with a delay + await sleep(100); + await asyncGenerator.return(); + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) {}), + common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + await sleep(100); + }), + common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + await asyncGenerator.throw(new Error('my error')); + }), + common.mustCall((err) => { + assert.strictEqual(err.message, 'my error'); + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + await asyncGenerator.next(); + await asyncGenerator.throw(new Error('my error')); + }), + common.mustCall((err) => { + assert.strictEqual(err.message, 'my error'); + assert.strictEqual(r.destroyed, true); + }) + ); +}