From fd8de670da4d2a80e21e0f8a12ec7a6b0a13014d Mon Sep 17 00:00:00 2001 From: jakecastelli <38635403+jakecastelli@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:19:34 +1030 Subject: [PATCH] stream: catch and forward error from dest.write PR-URL: https://github.com/nodejs/node/pull/55270 Fixes: https://github.com/nodejs/node/issues/54945 Reviewed-By: Matteo Collina Reviewed-By: Robert Nagy Reviewed-By: Benjamin Gruenbaum Reviewed-By: James M Snell --- lib/internal/streams/readable.js | 13 +++- ...tream-pipe-objectmode-to-non-objectmode.js | 73 +++++++++++++++++++ 2 files changed, 82 insertions(+), 4 deletions(-) create mode 100644 test/parallel/test-stream-pipe-objectmode-to-non-objectmode.js diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 26ff5ec17c6f0c..ca8b4bcc851684 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -1004,10 +1004,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) { src.on('data', ondata); function ondata(chunk) { debug('ondata'); - const ret = dest.write(chunk); - debug('dest.write', ret); - if (ret === false) { - pause(); + try { + const ret = dest.write(chunk); + debug('dest.write', ret); + + if (ret === false) { + pause(); + } + } catch (error) { + dest.destroy(error); } } diff --git a/test/parallel/test-stream-pipe-objectmode-to-non-objectmode.js b/test/parallel/test-stream-pipe-objectmode-to-non-objectmode.js new file mode 100644 index 00000000000000..bf5cfc0434bbe3 --- /dev/null +++ b/test/parallel/test-stream-pipe-objectmode-to-non-objectmode.js @@ -0,0 +1,73 @@ +'use strict'; + +const common = require('../common'); +const assert = require('node:assert'); +const { Readable, Transform, Writable } = require('node:stream'); + +// Pipeine objects from object mode to non-object mode should throw an error and +// catch by the consumer +{ + const objectReadable = Readable.from([ + { hello: 'hello' }, + { world: 'world' }, + ]); + + const passThrough = new Transform({ + transform(chunk, _encoding, cb) { + this.push(chunk); + cb(null); + }, + }); + + passThrough.on('error', common.mustCall()); + + objectReadable.pipe(passThrough); + + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of passThrough); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); +} + +// The error should be properly forwarded when the readable stream is in object mode, +// the writable stream is in non-object mode, and the data is string. +{ + const stringReadable = Readable.from(['hello', 'world']); + + const passThrough = new Transform({ + transform(chunk, _encoding, cb) { + this.push(chunk); + throw new Error('something went wrong'); + }, + }); + + passThrough.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'something went wrong'); + })); + + stringReadable.pipe(passThrough); +} + +// The error should be properly forwarded when the readable stream is in object mode, +// the writable stream is in non-object mode, and the data is buffer. +{ + const binaryData = Buffer.from('binary data'); + + const binaryReadable = new Readable({ + read() { + this.push(binaryData); + this.push(null); + } + }); + + const binaryWritable = new Writable({ + write(chunk, _encoding, cb) { + throw new Error('something went wrong'); + } + }); + + binaryWritable.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'something went wrong'); + })); + binaryReadable.pipe(binaryWritable); +}