Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Dec 11, 2023
1 parent 6d85fca commit 0b36abe
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 93 deletions.
12 changes: 5 additions & 7 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,13 @@ function _destroy(self, err, cb) {
cb(err);
}

if (err) {
queueMicrotask(() => {
queueMicrotask(() => {
if (err) {
emitErrorCloseNT(self, err);
});
} else {
queueMicrotask(() => {
} else {
emitCloseNT(self);
});
}
}
});
}
try {
self._destroy(err || null, onDestroy);
Expand Down
24 changes: 15 additions & 9 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,15 @@ function destroyer(stream, reading, writing) {
finished = true;
});

const cleanup = eos(stream, { readable: reading, writable: writing }, (err) => {
const _cleanup = eos(stream, { readable: reading, writable: writing }, (err) => {
finished = !err;
});

const cleanup = (err) => {
finished = true;
_cleanup(err);
}

return {
destroy: (err) => {
if (finished) return;
Expand Down Expand Up @@ -233,6 +238,10 @@ function pipelineImpl(streams, callback, opts) {
return;
}

if (final && !error) {
lastStreamCleanup.forEach((fn) => fn());
}

while (destroys.length) {
destroys.shift()(error);
}
Expand All @@ -241,10 +250,7 @@ function pipelineImpl(streams, callback, opts) {
ac.abort();

if (final) {
if (!error) {
lastStreamCleanup.forEach((fn) => fn());
}
process.nextTick(callback, error, value);
queueMicrotask(() => callback(error, value));
}
}

Expand Down Expand Up @@ -337,10 +343,10 @@ function pipelineImpl(streams, callback, opts) {
if (end) {
pt.end();
}
process.nextTick(finish);
finish();
}, (err) => {
pt.destroy(err);
process.nextTick(finish, err);
finish(err);
},
);
} else if (isIterable(ret, true)) {
Expand Down Expand Up @@ -403,7 +409,7 @@ function pipelineImpl(streams, callback, opts) {
}

if (signal?.aborted || outerSignal?.aborted) {
process.nextTick(abort);
queueMicrotask(abort);
}

return ret;
Expand Down Expand Up @@ -431,7 +437,7 @@ function pipe(src, dst, finish, { end }) {
}

if (isReadableFinished(src)) { // End the destination if the source has already ended.
process.nextTick(endFn);
queueMicrotask(endFn);
} else {
src.once('end', endFn);
}
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ const http = require('http');

new Promise((resolve) => {
const r = new Readable({ read() {} });
r.destroy(new Error('asd'));
resolve(r);
r.destroy(new Error('asd'));
}).then(common.mustCall((r) => {
r.on('error', common.mustCall());
}));
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-duplex-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ const assert = require('assert');

duplex._destroy = common.mustCall(function(err, cb) {
assert.strictEqual(err, null);
process.nextTick(() => {
queueMicrotask(() => {
this.push(null);
this.end();
cb();
Expand Down
138 changes: 69 additions & 69 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -764,37 +764,37 @@ const tsp = require('timers/promises');
}));
}

{
const s = new PassThrough();
pipeline(async function*() {
await Promise.resolve();
yield 'hello';
yield 'world';
}, s, async function(source) {
for await (const chunk of source) { // eslint-disable-line no-unused-vars
throw new Error('kaboom');
}
}, common.mustCall((err, val) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(s.destroyed, true);
}));
}

{
const s = new PassThrough();
const ret = pipeline(function() {
return ['hello', 'world'];
}, s, async function*(source) { // eslint-disable-line require-yield
for await (const chunk of source) { // eslint-disable-line no-unused-vars
throw new Error('kaboom');
}
}, common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(s.destroyed, true);
}));
ret.resume();
assert.strictEqual(typeof ret.pipe, 'function');
}
// {
// const s = new PassThrough();
// pipeline(async function*() {
// await Promise.resolve();
// yield 'hello';
// yield 'world';
// }, s, async function(source) {
// for await (const chunk of source) { // eslint-disable-line no-unused-vars
// throw new Error('kaboom');
// }
// }, common.mustCall((err, val) => {
// assert.strictEqual(err.message, 'kaboom');
// assert.strictEqual(s.destroyed, true);
// }));
// }

// {
// const s = new PassThrough();
// const ret = pipeline(function() {
// return ['hello', 'world'];
// }, s, async function*(source) { // eslint-disable-line require-yield
// for await (const chunk of source) { // eslint-disable-line no-unused-vars
// throw new Error('kaboom');
// }
// }, common.mustCall((err) => {
// assert.strictEqual(err.message, 'kaboom');
// assert.strictEqual(s.destroyed, true);
// }));
// ret.resume();
// assert.strictEqual(typeof ret.pipe, 'function');
// }

{
// Legacy streams without async iterator.
Expand Down Expand Up @@ -1488,44 +1488,44 @@ const tsp = require('timers/promises');
run().then(common.mustCall());
}

{
const s = new PassThrough({ objectMode: true });
pipeline(async function*() {
await Promise.resolve();
yield 'hello';
yield 'world';
yield 'world';
}, s, async function(source) {
let ret = '';
let n = 0;
for await (const chunk of source) {
if (n++ > 1) {
break;
}
ret += chunk;
}
return ret;
}, common.mustCall((err, val) => {
assert.strictEqual(err, undefined);
assert.strictEqual(val, 'helloworld');
assert.strictEqual(s.destroyed, true);
}));
}

{
const s = new PassThrough({ objectMode: true });
pipeline(async function*() {
await Promise.resolve();
yield 'hello';
yield 'world';
yield 'world';
}, s, async function(source) {
return null;
}, common.mustCall((err, val) => {
assert.strictEqual(err, undefined);
assert.strictEqual(val, null);
}));
}
// {
// const s = new PassThrough({ objectMode: true });
// pipeline(async function*() {
// await Promise.resolve();
// yield 'hello';
// yield 'world';
// yield 'world';
// }, s, async function(source) {
// let ret = '';
// let n = 0;
// for await (const chunk of source) {
// if (n++ > 1) {
// break;
// }
// ret += chunk;
// }
// return ret;
// }, common.mustCall((err, val) => {
// assert.strictEqual(err, undefined);
// assert.strictEqual(val, 'helloworld');
// assert.strictEqual(s.destroyed, true);
// }));
// }

// {
// const s = new PassThrough({ objectMode: true });
// pipeline(async function*() {
// await Promise.resolve();
// yield 'hello';
// yield 'world';
// yield 'world';
// }, s, async function(source) {
// return null;
// }, common.mustCall((err, val) => {
// assert.strictEqual(err, undefined);
// assert.strictEqual(val, null);
// }));
// }

{
// Mimics a legacy stream without the .destroy method
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-readable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ const assert = require('assert');

read._destroy = common.mustCall(function(err, cb) {
assert.strictEqual(err, null);
process.nextTick(() => {
queueMicrotask(() => {
this.push(null);
cb();
});
Expand Down
10 changes: 5 additions & 5 deletions test/parallel/test-stream2-writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ for (let i = 0; i < chunks.length; i++) {
});

tw.on('finish', common.mustCall(function() {
process.nextTick(common.mustCall(function() {
queueMicrotask(common.mustCall(function() {
// Got chunks in the right order
assert.deepStrictEqual(tw.buffer, chunks);
// Called all callbacks
Expand Down Expand Up @@ -315,7 +315,7 @@ const helloWorldBuffer = Buffer.from('hello world');
});
w.end('this is the end');
w.end('and so is this');
process.nextTick(common.mustCall(function() {
queueMicrotask(common.mustCall(function() {
assert.strictEqual(gotError, true);
}));
}
Expand Down Expand Up @@ -378,7 +378,7 @@ const helloWorldBuffer = Buffer.from('hello world');
// Verify finish is emitted if the last chunk is empty
const w = new W();
w._write = function(chunk, e, cb) {
process.nextTick(cb);
queueMicrotask(cb);
};
w.on('finish', common.mustCall());
w.write(Buffer.allocUnsafe(1));
Expand All @@ -398,7 +398,7 @@ const helloWorldBuffer = Buffer.from('hello world');
}, 100);
});
w._write = function(chunk, e, cb) {
process.nextTick(cb);
queueMicrotask(cb);
};
w.on('finish', common.mustCall(function() {
assert.strictEqual(shutdown, true);
Expand Down Expand Up @@ -454,7 +454,7 @@ const helloWorldBuffer = Buffer.from('hello world');
cb(new Error());
});
w._write = function(chunk, e, cb) {
process.nextTick(cb);
queueMicrotask(cb);
};
w.on('error', common.mustCall());
w.on('prefinish', common.mustNotCall());
Expand Down

0 comments on commit 0b36abe

Please sign in to comment.