From 55ff4311a33db49cbc00aa30e679f6e27038bfc9 Mon Sep 17 00:00:00 2001 From: Carlos Espa Date: Wed, 16 Oct 2024 09:28:42 +0200 Subject: [PATCH 1/5] feat: add streams guide Co-authored-by: Matteo Collina --- apps/site/navigation.json | 4 + .../en/learn/modules/how-to-use-streams.md | 595 ++++++++++++++++++ package.json | 2 +- packages/i18n/locales/en.json | 3 +- 4 files changed, 602 insertions(+), 2 deletions(-) create mode 100644 apps/site/pages/en/learn/modules/how-to-use-streams.md diff --git a/apps/site/navigation.json b/apps/site/navigation.json index ea4468a46f6f9..4b481276ead92 100644 --- a/apps/site/navigation.json +++ b/apps/site/navigation.json @@ -302,6 +302,10 @@ "backpressuringInStreams": { "link": "/learn/modules/backpressuring-in-streams", "label": "components.navigation.learn.modules.links.backpressuringInStreams" + }, + "howToUseStreams": { + "link": "/learn/modules/how-to-use-streams", + "label": "components.navigation.learn.modules.links.howToUseStreams" } } }, diff --git a/apps/site/pages/en/learn/modules/how-to-use-streams.md b/apps/site/pages/en/learn/modules/how-to-use-streams.md new file mode 100644 index 0000000000000..9aa583b972738 --- /dev/null +++ b/apps/site/pages/en/learn/modules/how-to-use-streams.md @@ -0,0 +1,595 @@ +--- +title: How to use Streams +layout: learn +--- + +# How To Use Streams + +Working with large amounts of data in Node.js applications can be a double-edged sword. The ability to handle massive amounts of data is extremely handy, but can also lead to performance bottlenecks and memory exhaustion. Traditionally, developers tackled this challenge by reading the entire dataset into memory at once. This approach, while intuitive for smaller datasets, becomes inefficient and resource-intensive for large data (e.g. files, network requests…). + +This is where Node.js streams come in. Streams offer a fundamentally different approach, allowing you to process data incrementally and optimize memory usage. By handling data in manageable chunks, streams empower you to build scalable applications that can efficiently tackle even the most daunting datasets. As popularly quoted, “streams are arrays over time”. + +In this guide we give an overview of the Stream concept, history and API as well as some recommendations on how to use and operate them. + +## What are Node.js Streams? + +Node.js streams offer a powerful abstraction for managing data flow in your applications. They excel at processing large datasets like reading or writing from files and network requests without compromising performance. + +This approach differs from loading the entire dataset into memory at once. Streams process data in chunks, significantly reducing memory usage. All streams in Node.js inherit from the [`EventEmitter`][] class, allowing them to emit events at various stages of data processing. These streams can be readable, writable, or both, providing flexibility for different data handling scenarios. + +### Event-Driven Architecture + +Node.js thrives on an event-driven architecture, making it ideal for real-time I/O. This means consuming input as soon as it's available and sending output as soon as the application generates it. Streams seamlessly integrate with this approach, enabling continuous data processing. + +They achieve this by emitting events at key stages. These events include signals for received data ([`data`][] event) and the stream's completion ([`end`][] event). Developers can listen to these events and execute custom logic accordingly. 
This event-driven nature makes streams highly efficient for the processing of data from external sources. + +## Why use Streams? + +Streams provide three key advantages over other data-handling methods: + +- **Memory Efficiency**: Streams process data incrementally, consuming and processing data in chunks rather than loading the entire dataset into memory. This is a major advantage when dealing with large datasets, as it significantly reduces memory usage and prevents memory-related performance issues. +- **Improved response time**: Streams allow for immediate data processing. When a chunk of data arrives, it can be processed without waiting for the entire payload or dataset to be received. This reduces latency and improves your application's overall responsiveness. +- **Scalability for real-time processing**: By handling data in chunks, Node.js streams can efficiently handle large amounts of data with limited resources. This scalability makes streams ideal for applications that process high volumes of data in real time. + +These advantages make streams a powerful tool for building high-performance, scalable Node.js applications, particularly when working with large datasets or real-time data processing. + +### Note on performance + +If your application already has all the data readily available in memory, using streams might add unnecessary overhead, complexity and slow down your application. + +## Stream history + +This section is just a reference of the history of streams in Node.js. Unless you’re working with a codebase written for a Node.js version prior to 0.11.5 (2013), you will rarely encounter older versions of the streams API, but the terms might still be in use. + +### Streams 0 + +The first version of streams was released at the same time as Node.js. Although there wasn't a Stream class yet, different modules used the concept and implemented the `read`/`write` functions. `util.pump()` function was available to control the flow of data between streams. + +### Streams 1 (Classic) + +With the release of Node v0.4.0 in 2011 the Stream class was introduced as well as the `pipe()` method. + +### Streams 2 + +In 2012, with the release of Node v0.10.0, Streams 2 were unveiled. This update brought new stream subclasses, including Readable, Writable, Duplex, and Transform. Additionally, the `readable` event was added. To maintain backwards compatibility, streams could be switched to the old mode by adding a `data` event listener or calling `pause()` or `resume()` methods. + +### Streams 3 + +In 2013, Streams 3 were released with Node v0.11.5, to address the problem of a stream having both a `data` and `readable` event handlers. This removed the need to choose between 'current' and 'old' modes. Streams3 is the current version of streams in Node.js. + +## Stream types + +### Readable + +[`Readable`][] is the class that we use to sequentially read a source of data. Typical examples of `Readable` streams in Node.js API are [`fs.ReadStream` ][] when reading files, [`http.IncomingMessage` ][] when reading HTTP requests, and [`process.stdin` ][] when reading from the standard input + +#### Key Methods and Events + +A readable stream operates with several core methods and events that allow fine control over data handling: + +- **[`on('data')`][]**: This event is triggered whenever data is available from the stream. It is very fast, as the stream pushes data as quickly as it can handle, making it suitable for high-throughput scenarios. 
+- **[`on('end')`][]**: Emitted when there is no more data to read from the stream. It signifies the completion of data delivery. This event is only fired when all the data from the stream has been consumed. +- **[`on('readable')`][]**: This event is triggered when there is data available to read from the stream or when the end of the stream has been reached. It allows for more controlled data reading when needed. +- **[`on('close')`][]**: This event is emitted when the stream and its underlying resources have been closed and indicates no more events will be emitted. +- **[`on('error')`][]**: This event can be emitted at any point signalling that there was an error processing. A handler of this event can be used to avoid uncaught exceptions + +A demonstration of the use of this events can be seen in the following sections + +#### Basic Readable Stream + +Here's an example of a simple readable stream implementation that generates data dynamically: + +```cjs +const { Readable } = require('node:stream'); + +class MyStream extends Readable { + #count = 0; + _read(size) { + this.push(':-)'); + if (++this.#count === 5) { + this.push(null); + } + } +} + +const stream = new MyStream(); + +stream.on('data', chunk => { + console.log(chunk.toString()); +}); +``` + +In this code, the `MyStream` class extends Readable and overrides the [`_read`][] method to push a string ":-)" to the internal buffer. After pushing the string five times, it signals the end of the stream by pushing `null`. The [`on('data')`][] event handler logs each chunk to the console as it is received. + +#### Advanced Control with the readable Event + +For even finer control over data flow, the readable event can be used. This event is more complex but provides better performance for certain applications by allowing explicit control over when data is read from the stream: + +```js +const stream = new MyStream({ + highWaterMark: 1, +}); + +stream.on('readable', () => { + console.count('>> readable event'); + let chunk; + while ((chunk = stream.read()) !== null) { + console.log(chunk.toString()); // Process the chunk + } +}); +stream.on('end', () => console.log('>> end event')); +``` + +Here, the readable event is used to pull data from the stream as needed manually. The loop inside the readable event handler continues to read data from the stream buffer until it returns `null`, indicating that the buffer is temporarily empty or the stream has ended. Setting `highWaterMark` to 1 keeps the buffer size small, triggering the readable event more frequently and allowing more granular control over data flow. + +With the previous code you’ll get an output like + +```bash +>> readable event: 1 +:-):-) +:-) +:-) +:-) +>> readable event: 2 +>> readable event: 3 +>> readable event: 4 +>> end event +``` + +Let’s try to digest that. When we attach the `on('readable')` event it does a first call to `read()` because that is what might triggers the emission of a `readable` event. After the emission of said event we call `read` on the first iteration of the `while` loop, that’s why we get the two first smileys in one row. After that we keep calling `read` until `null` is pushed. Each call to read programs the emission of a new `readable` event, but as we are in “flow” mode (i.e. using the `readable` event) the emission is programmed on `nextTick` that’s why we get them all at the end, when the synchronous code of the loop is finished. 
+ +NOTE: You can try to run the code with `NODE_DEBUG=stream` to see the that `emitReadable` is triggered after each `push`. + +If we want to see readable events called before each smiley we can wrap `push` into a `setImmediate` or `process.nextTick` like this: + +```js +class MyStream extends Readable { + #count = 0; + _read(size) { + setImmediate(() => { + this.push(':-)'); + if (++this.#count === 5) { + return this.push(null); + } + }); + } +} +``` + +And we’ll get: + +```bash +>> readable event: 1 +:-) +>> readable event: 2 +:-) +>> readable event: 3 +:-) +>> readable event: 4 +:-) +>> readable event: 5 +:-) +>> readable event: 6 +>> end event +``` + +### Writable + +[`Writable`][] streams are useful for creating files, uploading data, or any task that involves sequentially outputting data. While readable streams provide the source of data, writable streams in Node.js act as the destination for your data. Typical examples of writable streams in Node.js API are [`fs.WriteStream` ][], [`process.stdout` ][] and [`process.stderr` ][] + +#### Key Methods and Events in Writable Streams + +- **[`.write()`][]**: This method is used to write a chunk of data to the stream. It handles the data by buffering it up to a defined limit (highWaterMark), and returns a boolean indicating whether more data can be written immediately. +- **[`.end()`][]**: This method signals the end of the data writing process. It signals the stream to complete the write operation and potentially perform any necessary cleanup. + +#### Creating a Writable + +Here's an example of creating a writable stream that converts all incoming data to uppercase before writing it to the standard output: + +```mjs +import { Writable } from 'node:stream'; +import { once } from 'node:events'; + +class MyStream extends Writable { + constructor() { + super({ highWaterMark: 10 /* 10 bytes */ }); + } + _write(data, encode, cb) { + process.stdout.write(data.toString().toUpperCase() + '\n', cb); + } +} +const stream = new MyStream(); + +for (let i = 0; i < 10; i++) { + const waitDrain = !stream.write('hello'); + + if (waitDrain) { + console.log('>> wait drain'); + await once(stream, 'drain'); + } +} + +stream.end('world'); +``` + +In this code, `MyStream` is a custom [`Writable`][] stream with a buffer capacity ([`highWaterMark`][]) of 10 bytes. It overrides the [`_write`][] method to convert data to uppercase before writing it out. + +The loop attempts to write hello ten times to the stream. If the buffer fills up (`waitDrain` becomes `true`), it waits for a [`drain`][] event before continuing, ensuring we do not overwhelm the stream's buffer. + +The output will be: + +```bash +HELLO +>> wait drain +HELLO +HELLO +>> wait drain +HELLO +HELLO +>> wait drain +HELLO +HELLO +>> wait drain +HELLO +HELLO +>> wait drain +HELLO +WORLD +``` + +### Duplex + +[`Duplex`][] streams implement both the readable and writable interfaces. + +#### Key Methods and Events in Duplex Streams + +Duplex streams implement all the methods and events described in Readable and Writable Streams. 
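
Before looking at a real-world duplex stream, here is a minimal sketch of a custom one (the class name and chunk contents are made up for illustration and are not part of the Node.js API). Note how the readable and writable sides operate independently:

```cjs
const { Duplex } = require('node:stream');

class MyDuplex extends Duplex {
  #counter = 0;
  // Readable side: produce a few chunks, then signal the end with null
  _read(size) {
    this.push(this.#counter < 3 ? `chunk ${this.#counter++}\n` : null);
  }
  // Writable side: consume incoming data independently of the readable side
  _write(chunk, encoding, callback) {
    console.log(`received: ${chunk.toString()}`);
    callback();
  }
}

const duplex = new MyDuplex();
duplex.on('data', chunk => process.stdout.write(chunk.toString()));
duplex.write('hello');
duplex.end();
```

The two sides share nothing but the object itself, which is exactly the shape a network connection needs.
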
+ +A good example of duplex stream is the `Socket` class in the `net` module + +```cjs +const net = require('node:net'); + +// Create a TCP server +const server = net.createServer(socket => { + socket.write('Hello from server!\n'); + + socket.on('data', data => { + console.log(`Client says: ${data.toString()}`); + }); + + // Handle client disconnection + socket.on('end', () => { + console.log('Client disconnected'); + }); +}); + +// Start the server on port 8080 +server.listen(8080, () => { + console.log('Server listening on port 8080'); +}); +``` + +The previous code will open a TCP socket in port 8080 and send `Hello from server!` to any connecting client and log any data received + +```cjs +const net = require('node:net'); + +// Connect to the server at localhost:8080 +const client = net.createConnection({ port: 8080 }, () => { + client.write('Hello from client!\n'); +}); + +client.on('data', data => { + console.log(`Server says: ${data.toString()}`); +}); + +// Handle the server closing the connection +client.on('end', () => { + console.log('Disconnected from server'); +}); +``` + +The previous code will connect to the TCP socket and send a `Hello from client` and log any received data. + +### Transform + +[`Transform`][] streams are duplex streams where the output is computed based on the input. As the name suggests, they are usually used between a readable and a writable stream to transform the data as it passes through. + +#### Key Methods and Events in Transform Streams + +Apart from all the methods and events in Duplex Streams there is + +- **[`_transform`][]**: This function is called internally to handle the flow of data between the readable and the writable parts. This MUST NOT be called by application code. + +#### Creating a Transform Stream + +To create a new transform stream we can pass an `options` object to the `Transform` constructor including a `transform` function which handles how the output data is computed from the input data using the `push` method. + +```cjs +const { Transform } = require('node:stream'); + +const upper = new Transform({ + transform: function (data, enc, cb) { + this.push(data.toString().toUpperCase()); + cb(); + }, +}); +``` + +This stream will take any input and output it in uppercase. + +## How to operate with streams + +When working with streams, we usually want to perform read from a source and write to a destination, possibly needing some transformation of the data in between. The following sections will cover different ways to do so. + +### .pipe() + +The [`.pipe()`][] method concatenates one readable stream to a writable (or transform) stream. Although this seems a simple way to achieve our goal, it delegates all error handling to the programmer making it difficult to get it right. + +The following example shows a pipe trying to output the current file in upper case to the console. 
+ +```cjs +const fs = require('node:fs'); +const { Transform } = require('node:stream'); + +let errorCount = 0; +const upper = new Transform({ + transform: function (data, enc, cb) { + if (errorCount === 10) { + return cb(new Error('BOOM!')); + } + errorCount++; + this.push(data.toString().toUpperCase()); + cb(); + }, +}); + +const readStream = fs.createReadStream(__filename, { highWaterMark: 1 }); +const writeStream = process.stdout; + +readStream.pipe(upper).pipe(writeStream); + +readStream.on('close', () => { + console.log('Readable stream closed'); +}); + +upper.on('close', () => { + console.log('Transform stream closed'); +}); + +upper.on('error', err => { + console.error('\nError in transform stream:', err.message); +}); + +writeStream.on('close', () => { + console.log('Writable stream closed'); +}); +``` + +After writing 10 characters `upper` will return an error in the callback, which will cause the stream to close. However, the other streams won’t be notified resulting in memory leaks. The output will be: + +```bash +CONST FS = +Error in transform stream: BOOM! +Transform stream closed +``` + +### pipeline() + +To avoid the pitfalls low level complexity of the `.pipe()` method, in most cases it is recommended to use [`pipeline()`][] method. This method is a safer and more robust way to pipe streams together, handling errors and cleanup automatically. + +The following example demonstrates how using`pipeline()`prevents the pitfalls of the previous example: + +```cjs +const fs = require('node:fs'); +const { Transform, pipeline } = require('node:stream'); + +let errorCount = 0; +const upper = new Transform({ + transform: function (data, enc, cb) { + if (errorCount === 10) { + return cb(new Error('BOOM!')); + } + errorCount++; + this.push(data.toString().toUpperCase()); + cb(); + }, +}); + +const readStream = fs.createReadStream(__filename, { highWaterMark: 1 }); +const writeStream = process.stdout; + +readStream.on('close', () => { + console.log('Readable stream closed'); +}); + +upper.on('close', () => { + console.log('\nTransform stream closed'); +}); + +writeStream.on('close', () => { + console.log('Writable stream closed'); +}); + +pipeline(readStream, upper, writeStream, err => { + if (err) { + return console.error('Pipeline error:', err.message); + } + console.log('Pipeline succeeded'); +}); +``` + +In this case all streams will be closed with the following output: + +```bash +CONST FS = +Transform stream closed +Writable stream closed +Pipeline error: BOOM! +Readable stream closed +``` + +The [`pipeline()`][] method also has an [`async pipeline()`][] version which doesn’t accept a callback but instead returns a promise which is rejected if the pipeline fails. + +### Async Iterators + +Async Iterators are recommended as the standard way of interfacing with the Streams API. Compared to all the streams primitives on the Web and Node.js, Async iterators are easier to understand and to use, contributing to fewer bugs and more maintainable code. In recent versions of Node.js, async iterators have emerged as a more elegant and readable way to interact with streams. Building upon the foundation of events, async iterators provide a higher-level abstraction that simplifies stream consumption. + +In Node.js, all readable streams are asynchronous iterables. This means you can use the for await...of syntax to loop through the stream's data as it becomes available, handling each piece of data with the efficiency and simplicity of asynchronous code. 
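
As a minimal sketch of that syntax (reading the current file is only an illustrative choice), a readable stream can be consumed chunk by chunk, with errors handled by a regular `try...catch`:

```mjs
import fs from 'node:fs';

const readable = fs.createReadStream(import.meta.filename, {
  encoding: 'utf8',
});

try {
  // Each iteration waits for the next chunk to become available
  for await (const chunk of readable) {
    console.log(`read ${chunk.length} characters`);
  }
  console.log('done');
} catch (err) {
  console.error('stream failed:', err);
}
```
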
+ +#### Benefits of Using Async Iterators with Streams + +Using async iterators with streams simplifies the handling of asynchronous data flows in several ways: + +- **Enhanced Readability**: The code structure is cleaner and more readable, particularly when dealing with multiple asynchronous data sources. +- **Error Handling**: Async iterators allow straightforward error handling using try/catch blocks, akin to regular asynchronous functions. +- **Flow Control**: They inherently manage backpressure, as the consumer controls the flow by awaiting the next piece of data, allowing for more efficient memory usage and processing. + +Async iterators offer a more modern and often more readable way to work with readable streams, especially when dealing with asynchronous data sources or when you prefer a more sequential, loop-based approach to data processing. + +Here's an example demonstrating the use of async iterators with a readable stream: + +```mjs +import fs from 'fs'; +import { pipeline } from 'stream/promises'; + +await pipeline( + fs.createReadStream(import.meta.filename), + async function* (source) { + for await (let chunk of source) { + yield chunk.toString().toUpperCase(); + } + }, + process.stdout +); +``` + +This code achieves the same as the previous examples, without the need to define a new transform stream. The error from previous examples has been removed for the sake of brevity. The async version of pipeline has been used. It should be wrapped in a `try..catch` block to handle possible errors. + +### Object mode + +By default streams can work with strings, [`Buffer`], [`TypedArray`], or [`DataView`], if an arbitrary value different than those (e.g. an object) is pushed into a stream a `TypeError` will be thrown. However, it is possible to work with objects by setting the `objectMode` option to `true`. This allows the stream to work with any JavaScript value, with the exception of `null`, which is used to signal the end of the stream. This means you can `push` and `read` any value in a readable stream and `write` any value in a writable stream. + +```cjs +const { Readable } = require('stream'); + +const readable = Readable({ + objectMode: true, + read() { + this.push({ hello: 'world' }); + this.push(null); + }, +}); +``` + +When working in object mode it is important to remember that the `highWaterMark` option is the number of objects, not bytes. + +### Backpressure + +When using streams it is important to make sure the producer doesn't overwhelm the consumer. For this the backpressure mechanism is used in all streams in the Node.js API and implementors are responsible to keep that behaviour. + +In any scenario where the data buffer has exceeded the [`highWaterMark`](https://nodejs.org/api/stream.html#stream_buffering) or the write queue is currently busy, [`.write()`](https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback) will return `false`. + +When a `false` value is returned, the backpressure system kicks in. It will pause the incoming [`Readable`](https://nodejs.org/api/stream.html#stream_readable_streams) stream from sending any data and wait until the consumer is ready again. Once the data buffer is emptied, a [`'drain'`](https://nodejs.org/api/stream.html#stream_event_drain) event will be emitted and resume the incoming data flow. + +For a deeper understanding of backpressure, check the [`backpressure guide`][]. + +## Streams vs Web streams + +The stream concept is not exclusive to Node.js. 
In fact Node.js has a different implementation of the stream concept called [`Web Streams`][]. That implements the [`WHATWG Streams Standard`][]. Although the concepts behind them are similar, it is important to be aware that they have different APIs and are not directly compatible. + +[`Web Streams`][] implement [`ReadableStream`][], [`WritableStream`][] and [`TransformStream`][] classes, homologous to Node.js [`Readable`][], [`Writable`][] and [`Transform`][] streams. + +### Interoperability of streams and Web Streams + +Node.js provides utility functions to convert to/from Web Streams and Node.js streams. This functions are implemented as `toWeb` and `fromWeb` methods in each stream class. + +The following example in the [`Duplex`][] class covers how to work with both readable and writable streams converted to Web Streams: + +```cjs +const { Duplex } = require('node:stream'); + +const duplex = Duplex({ + read() { + this.push('world'); + this.push(null); + }, + write(chunk, encoding, callback) { + console.log('writable', chunk); + callback(); + }, +}); + +const { readable, writable } = Duplex.toWeb(duplex); +writable.getWriter().write('hello'); + +readable + .getReader() + .read() + .then(result => { + console.log('readable', result.value); + }); +``` + +The helper functions are useful if you need to return a Web Stream from a Node.js module or vice versa. For regular consumption of streams, async iterators enable seamless interaction with both Node.js and Web Streams. + +```mjs +import { pipeline } from 'node:stream/promises'; + +const { body } = await fetch('https://nodejs.org/api/stream.html'); + +await pipeline( + body, + new TextDecoderStream(), + async function* (source) { + for await (const chunk of source) { + yield chunk.toString().toUpperCase(); + } + }, + process.stdout +); +``` + +Be aware that fetch body is a `ReadableStream` and therefore a [`TextDecoderStream`][] is needed to work with chunks as strings. 
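
Conversely, when a Node.js stream is needed out of a Web Stream, a sketch along these lines should work (it assumes a Node.js version that ships `Readable.fromWeb`, and reuses the same URL as above):

```mjs
import { Readable } from 'node:stream';

const response = await fetch('https://nodejs.org/api/stream.html');

// Convert the Web ReadableStream returned by fetch into a Node.js Readable
const nodeReadable = Readable.fromWeb(response.body);

nodeReadable.on('data', chunk => {
  console.log(`received ${chunk.length} bytes`);
});
nodeReadable.on('end', () => console.log('done'));
```
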
+ +[`Stream`]: https://nodejs.org/api/stream.html +[`Buffer`]: https://nodejs.org/api/buffer.html +[`TypedArray`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypedArray +[`DataView`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/DataView +[`TextDecoderStream`]: https://developer.mozilla.org/en-US/docs/Web/API/TextDecoderStream +[`EventEmitter`]: https://nodejs.org/api/events.html#class-eventemitter +[`Writable`]: https://nodejs.org/api/stream.html#stream_writable_streams +[`Readable`]: https://nodejs.org/api/stream.html#stream_readable_streams +[`Duplex`]: https://nodejs.org/api/stream.html#stream_duplex_and_transform_streams +[`Transform`]: https://nodejs.org/api/stream.html#stream_duplex_and_transform_streams +[`drain`]: https://nodejs.org/api/stream.html#stream_event_drain +[`on('data')`]: https://nodejs.org/api/stream.html#stream_event_data +[`data`]: https://nodejs.org/api/stream.html#stream_event_data +[`on('end')`]: https://nodejs.org/api/stream.html#event-end +[`end`]: https://nodejs.org/api/stream.html#event-end +[`on('readable')`]: https://nodejs.org/api/stream.html#event-readable +[`on('close')`]: https://nodejs.org/api/stream.html#event-close_1 +[`on('error')`]: https://nodejs.org/api/stream.html#event-error_1 +[`.read()`]: https://nodejs.org/docs/latest/api/stream.html#stream_readable_read_size +[`.write()`]: https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback +[`_write`]: https://nodejs.org/api/stream.html#writable_writechunk-encoding-callback +[`.end()`]: https://nodejs.org/api/stream.html#writableendchunk-encoding-callback +[`_transform`]: https://nodejs.org/api/stream.html#transform_transformchunk-encoding-callback +[`Readable.from()`]: https://nodejs.org/api/stream.html#streamreadablefromiterable-options +[`highWaterMark`]: https://nodejs.org/api/stream.html#stream_buffering +[`.pipe()`]: https://nodejs.org/docs/latest/api/stream.html#stream_readable_pipe_destination_options +[`pipeline()`]: https://nodejs.org/api/stream.html#stream_stream_pipeline_streams_callback +[`async pipeline()`]: https://nodejs.org/api/stream.html#streampipelinesource-transforms-destination-options +[`Web Streams`]: https://nodejs.org/api/webstreams.html +[`ReadableStream`]: https://nodejs.org/api/webstreams.html#class-readablestream +[`WritableStream`]: https://nodejs.org/api/webstreams.html#class-writablestream +[`TransformStream`]: https://nodejs.org/api/webstreams.html#class-transformstream +[`WHATWG Streams Standard`]: https://streams.spec.whatwg.org/ +[`backpressure guide`]: /learn/modules/backpressuring-in-streams +[`fs.readStream`]: https://nodejs.org/api/fs.html#class-fsreadstream +[`http.IncomingMessage`]: https://nodejs.org/api/http.html#class-httpincomingmessage +[`process.stdin`]: https://nodejs.org/api/process.html#processstdin +[`fs.WriteStream`]: https://nodejs.org/api/fs.html#class-fswritestream +[`process.stdout`]: https://nodejs.org/api/process.html#processstdout +[`process.stderr`]: https://nodejs.org/api/process.html#processstderr diff --git a/package.json b/package.json index 91571d15bad8d..ddbd5e98a5ef6 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,7 @@ "lint": "turbo run lint", "lint:fix": "turbo run lint --force", "prettier": "prettier \"**/*.{js,mjs,ts,tsx,md,mdx,json,yml,css}\" --check --cache --cache-strategy=content --cache-location=.prettiercache", - "prettier:fix": "npm run prettier --write", + "prettier:fix": "npm run prettier -- --write", "format": 
"npm run lint:fix && npm run prettier:fix", "test": "turbo test:unit", "prepare": "husky" diff --git a/packages/i18n/locales/en.json b/packages/i18n/locales/en.json index 8b20d7964f6ef..0cd35640ce620 100644 --- a/packages/i18n/locales/en.json +++ b/packages/i18n/locales/en.json @@ -84,7 +84,8 @@ "publishingNodeApiModules": "How to publish a Node-API package", "anatomyOfAnHttpTransaction": "Anatomy of an HTTP Transaction", "abiStability": "ABI Stability", - "backpressuringInStreams": "Backpressuring in Streams" + "backpressuringInStreams": "Backpressuring in Streams", + "howToUseStreams": "How to use streams" } }, "diagnostics": { From d13ac49a53a4c51d51dec549e74bd7110cd5aed2 Mon Sep 17 00:00:00 2001 From: Carlos Espa <43477095+Ceres6@users.noreply.github.com> Date: Thu, 14 Nov 2024 07:45:30 +0000 Subject: [PATCH 2/5] Apply suggestions from code review Add authors and original content attribution Signed-off-by: Carlos Espa <43477095+Ceres6@users.noreply.github.com> --- apps/site/pages/en/learn/modules/how-to-use-streams.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/apps/site/pages/en/learn/modules/how-to-use-streams.md b/apps/site/pages/en/learn/modules/how-to-use-streams.md index 9aa583b972738..6b220204216bb 100644 --- a/apps/site/pages/en/learn/modules/how-to-use-streams.md +++ b/apps/site/pages/en/learn/modules/how-to-use-streams.md @@ -1,6 +1,7 @@ --- title: How to use Streams layout: learn +authors: mcollina, ceres6, simoneb, codyzu --- # How To Use Streams @@ -553,6 +554,7 @@ await pipeline( Be aware that fetch body is a `ReadableStream` and therefore a [`TextDecoderStream`][] is needed to work with chunks as strings. +This work is derived from the content published by [Matteo Collina][] in [Platformatic's Blog][] [`Stream`]: https://nodejs.org/api/stream.html [`Buffer`]: https://nodejs.org/api/buffer.html [`TypedArray`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypedArray @@ -593,3 +595,5 @@ Be aware that fetch body is a `ReadableStream` and therefore a [`Tex [`fs.WriteStream`]: https://nodejs.org/api/fs.html#class-fswritestream [`process.stdout`]: https://nodejs.org/api/process.html#processstdout [`process.stderr`]: https://nodejs.org/api/process.html#processstderr +[Matteo Collina]: https://github.com/mcollina +[Platformatic's Blog]: https://blog.platformatic.dev/a-guide-to-reading-and-writing-nodejs-streams From 8f76b2a86d3b50dbc98c7549f53ea19d73515b37 Mon Sep 17 00:00:00 2001 From: Carlos Espa <43477095+Ceres6@users.noreply.github.com> Date: Sun, 17 Nov 2024 10:27:41 +0100 Subject: [PATCH 3/5] Apply suggestions from code review Co-authored-by: Aviv Keller Signed-off-by: Carlos Espa <43477095+Ceres6@users.noreply.github.com> --- .../en/learn/modules/how-to-use-streams.md | 107 +++++++++--------- 1 file changed, 54 insertions(+), 53 deletions(-) diff --git a/apps/site/pages/en/learn/modules/how-to-use-streams.md b/apps/site/pages/en/learn/modules/how-to-use-streams.md index 6b220204216bb..ca64901086058 100644 --- a/apps/site/pages/en/learn/modules/how-to-use-streams.md +++ b/apps/site/pages/en/learn/modules/how-to-use-streams.md @@ -6,17 +6,17 @@ authors: mcollina, ceres6, simoneb, codyzu # How To Use Streams -Working with large amounts of data in Node.js applications can be a double-edged sword. The ability to handle massive amounts of data is extremely handy, but can also lead to performance bottlenecks and memory exhaustion. Traditionally, developers tackled this challenge by reading the entire dataset into memory at once. 
This approach, while intuitive for smaller datasets, becomes inefficient and resource-intensive for large data (e.g. files, network requests…). +Working with large amounts of data in Node.js applications can be a double-edged sword. The ability to handle massive amounts of data is extremely handy but can also lead to performance bottlenecks and memory exhaustion. Traditionally, developers tackled this challenge by reading the entire dataset into memory at once. This approach, while intuitive for smaller datasets, becomes inefficient and resource-intensive for large data (e.g., files, network requests…). -This is where Node.js streams come in. Streams offer a fundamentally different approach, allowing you to process data incrementally and optimize memory usage. By handling data in manageable chunks, streams empower you to build scalable applications that can efficiently tackle even the most daunting datasets. As popularly quoted, “streams are arrays over time”. +This is where Node.js streams come in. Streams offer a fundamentally different approach, allowing you to process data incrementally and optimize memory usage. By handling data in manageable chunks, streams empower you to build scalable applications that can efficiently tackle even the most daunting datasets. As popularly quoted, “streams are arrays over time.” -In this guide we give an overview of the Stream concept, history and API as well as some recommendations on how to use and operate them. +In this guide, we give an overview of the Stream concept, history, and API as well as some recommendations on how to use and operate them. ## What are Node.js Streams? -Node.js streams offer a powerful abstraction for managing data flow in your applications. They excel at processing large datasets like reading or writing from files and network requests without compromising performance. +Node.js streams offer a powerful abstraction for managing data flow in your applications. They excel at processing large datasets, such as reading or writing from files and network requests, without compromising performance. -This approach differs from loading the entire dataset into memory at once. Streams process data in chunks, significantly reducing memory usage. All streams in Node.js inherit from the [`EventEmitter`][] class, allowing them to emit events at various stages of data processing. These streams can be readable, writable, or both, providing flexibility for different data handling scenarios. +This approach differs from loading the entire dataset into memory at once. Streams process data in chunks, significantly reducing memory usage. All streams in Node.js inherit from the [`EventEmitter`][] class, allowing them to emit events at various stages of data processing. These streams can be readable, writable, or both, providing flexibility for different data-handling scenarios. ### Event-Driven Architecture @@ -29,26 +29,26 @@ They achieve this by emitting events at key stages. These events include signals Streams provide three key advantages over other data-handling methods: - **Memory Efficiency**: Streams process data incrementally, consuming and processing data in chunks rather than loading the entire dataset into memory. This is a major advantage when dealing with large datasets, as it significantly reduces memory usage and prevents memory-related performance issues. -- **Improved response time**: Streams allow for immediate data processing. 
When a chunk of data arrives, it can be processed without waiting for the entire payload or dataset to be received. This reduces latency and improves your application's overall responsiveness. -- **Scalability for real-time processing**: By handling data in chunks, Node.js streams can efficiently handle large amounts of data with limited resources. This scalability makes streams ideal for applications that process high volumes of data in real time. +- **Improved Response Time**: Streams allow for immediate data processing. When a chunk of data arrives, it can be processed without waiting for the entire payload or dataset to be received. This reduces latency and improves your application's overall responsiveness. +- **Scalability for Real-Time Processing**: By handling data in chunks, Node.js streams can efficiently handle large amounts of data with limited resources. This scalability makes streams ideal for applications that process high volumes of data in real time. These advantages make streams a powerful tool for building high-performance, scalable Node.js applications, particularly when working with large datasets or real-time data processing. ### Note on performance -If your application already has all the data readily available in memory, using streams might add unnecessary overhead, complexity and slow down your application. +If your application already has all the data readily available in memory, using streams might add unnecessary overhead, complexity, and slow down your application. ## Stream history -This section is just a reference of the history of streams in Node.js. Unless you’re working with a codebase written for a Node.js version prior to 0.11.5 (2013), you will rarely encounter older versions of the streams API, but the terms might still be in use. +This section is a reference of the history of streams in Node.js. Unless you’re working with a codebase written for a Node.js version prior to 0.11.5 (2013), you will rarely encounter older versions of the streams API, but the terms might still be in use. ### Streams 0 -The first version of streams was released at the same time as Node.js. Although there wasn't a Stream class yet, different modules used the concept and implemented the `read`/`write` functions. `util.pump()` function was available to control the flow of data between streams. +The first version of streams was released at the same time as Node.js. Although there wasn't a Stream class yet, different modules used the concept and implemented the `read`/`write` functions. The `util.pump()` function was available to control the flow of data between streams. ### Streams 1 (Classic) -With the release of Node v0.4.0 in 2011 the Stream class was introduced as well as the `pipe()` method. +With the release of Node v0.4.0 in 2011, the Stream class was introduced, as well as the `pipe()` method. ### Streams 2 @@ -56,13 +56,13 @@ In 2012, with the release of Node v0.10.0, Streams 2 were unveiled. This update ### Streams 3 -In 2013, Streams 3 were released with Node v0.11.5, to address the problem of a stream having both a `data` and `readable` event handlers. This removed the need to choose between 'current' and 'old' modes. Streams3 is the current version of streams in Node.js. +In 2013, Streams 3 were released with Node v0.11.5, to address the problem of a stream having both a `data` and `readable` event handlers. This removed the need to choose between 'current' and 'old' modes. Streams 3 is the current version of streams in Node.js. 
## Stream types ### Readable -[`Readable`][] is the class that we use to sequentially read a source of data. Typical examples of `Readable` streams in Node.js API are [`fs.ReadStream` ][] when reading files, [`http.IncomingMessage` ][] when reading HTTP requests, and [`process.stdin` ][] when reading from the standard input +[`Readable`][] is the class that we use to sequentially read a source of data. Typical examples of `Readable` streams in Node.js API are [`fs.ReadStream` ][] when reading files, [`http.IncomingMessage` ][] when reading HTTP requests, and [`process.stdin` ][] when reading from the standard input. #### Key Methods and Events @@ -71,10 +71,10 @@ A readable stream operates with several core methods and events that allow fine - **[`on('data')`][]**: This event is triggered whenever data is available from the stream. It is very fast, as the stream pushes data as quickly as it can handle, making it suitable for high-throughput scenarios. - **[`on('end')`][]**: Emitted when there is no more data to read from the stream. It signifies the completion of data delivery. This event is only fired when all the data from the stream has been consumed. - **[`on('readable')`][]**: This event is triggered when there is data available to read from the stream or when the end of the stream has been reached. It allows for more controlled data reading when needed. -- **[`on('close')`][]**: This event is emitted when the stream and its underlying resources have been closed and indicates no more events will be emitted. -- **[`on('error')`][]**: This event can be emitted at any point signalling that there was an error processing. A handler of this event can be used to avoid uncaught exceptions +- **[`on('close')`][]**: This event is emitted when the stream and its underlying resources have been closed and indicates that no more events will be emitted. +- **[`on('error')`][]**: This event can be emitted at any point, signaling that there was an error processing. A handler for this event can be used to avoid uncaught exceptions. -A demonstration of the use of this events can be seen in the following sections +A demonstration of the use of these events can be seen in the following sections. #### Basic Readable Stream @@ -121,9 +121,9 @@ stream.on('readable', () => { stream.on('end', () => console.log('>> end event')); ``` -Here, the readable event is used to pull data from the stream as needed manually. The loop inside the readable event handler continues to read data from the stream buffer until it returns `null`, indicating that the buffer is temporarily empty or the stream has ended. Setting `highWaterMark` to 1 keeps the buffer size small, triggering the readable event more frequently and allowing more granular control over data flow. +Here, the readable event is used to pull data from the stream as needed manually. The loop inside the readable event handler continues to read data from the stream buffer until it returns `null`, indicating that the buffer is temporarily empty or the stream has ended. Setting `highWaterMark` to 1 keeps the buffer size small, triggering the readable event more frequently and allowing more granular control over the data flow. -With the previous code you’ll get an output like +With the previous code, you’ll get an output like ```bash >> readable event: 1 @@ -137,11 +137,11 @@ With the previous code you’ll get an output like >> end event ``` -Let’s try to digest that. 
When we attach the `on('readable')` event it does a first call to `read()` because that is what might triggers the emission of a `readable` event. After the emission of said event we call `read` on the first iteration of the `while` loop, that’s why we get the two first smileys in one row. After that we keep calling `read` until `null` is pushed. Each call to read programs the emission of a new `readable` event, but as we are in “flow” mode (i.e. using the `readable` event) the emission is programmed on `nextTick` that’s why we get them all at the end, when the synchronous code of the loop is finished. +Let’s try to digest that. When we attach the `on('readable')` event, it makes a first call to `read()` because that is what might trigger the emission of a `readable` event. After the emission of said event, we call `read` on the first iteration of the `while` loop. That’s why we get the first two smileys in one row. After that, we keep calling `read` until `null` is pushed. Each call to `read` programs the emission of a new `readable` event, but as we are in “flow” mode (i.e., using the `readable` event), the emission is scheduled for the `nextTick`. That’s why we get them all at the end, when the synchronous code of the loop is finished. -NOTE: You can try to run the code with `NODE_DEBUG=stream` to see the that `emitReadable` is triggered after each `push`. +NOTE: You can try to run the code with `NODE_DEBUG=stream` to see that `emitReadable` is triggered after each `push`. -If we want to see readable events called before each smiley we can wrap `push` into a `setImmediate` or `process.nextTick` like this: +If we want to see readable events called before each smiley, we can wrap `push` into a `setImmediate` or `process.nextTick` like this: ```js class MyStream extends Readable { @@ -176,7 +176,7 @@ And we’ll get: ### Writable -[`Writable`][] streams are useful for creating files, uploading data, or any task that involves sequentially outputting data. While readable streams provide the source of data, writable streams in Node.js act as the destination for your data. Typical examples of writable streams in Node.js API are [`fs.WriteStream` ][], [`process.stdout` ][] and [`process.stderr` ][] +[`Writable`][] streams are useful for creating files, uploading data, or any task that involves sequentially outputting data. While readable streams provide the source of data, writable streams in Node.js act as the destination for your data. Typical examples of writable streams in the Node.js API are [`fs.WriteStream` ][], [`process.stdout` ][], and [`process.stderr` ][]. #### Key Methods and Events in Writable Streams @@ -246,7 +246,7 @@ WORLD Duplex streams implement all the methods and events described in Readable and Writable Streams. -A good example of duplex stream is the `Socket` class in the `net` module +A good example of a duplex stream is the `Socket` class in the `net` module: ```cjs const net = require('node:net'); @@ -271,7 +271,7 @@ server.listen(8080, () => { }); ``` -The previous code will open a TCP socket in port 8080 and send `Hello from server!` to any connecting client and log any data received +The previous code will open a TCP socket on port 8080, send `Hello from server!` to any connecting client, and log any data received. ```cjs const net = require('node:net'); @@ -291,21 +291,21 @@ client.on('end', () => { }); ``` -The previous code will connect to the TCP socket and send a `Hello from client` and log any received data. 
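
Because a socket is readable and writable at the same time, an echo server is a compact way to see both sides of a duplex stream working together. The following is a minimal sketch; the port number is an arbitrary assumption:

```cjs
const net = require('node:net');

// Echo server: whatever is read from the socket is written straight back
const echoServer = net.createServer(socket => {
  socket.on('data', data => {
    socket.write(data);
  });
});

echoServer.listen(8081, () => {
  console.log('Echo server listening on port 8081');
});
```
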
+The previous code will connect to the TCP socket, send a `Hello from client` message, and log any received data. ### Transform -[`Transform`][] streams are duplex streams where the output is computed based on the input. As the name suggests, they are usually used between a readable and a writable stream to transform the data as it passes through. +[`Transform`][] streams are duplex streams, where the output is computed based on the input. As the name suggests, they are usually used between a readable and a writable stream to transform the data as it passes through. #### Key Methods and Events in Transform Streams -Apart from all the methods and events in Duplex Streams there is +Apart from all the methods and events in Duplex Streams, there is: -- **[`_transform`][]**: This function is called internally to handle the flow of data between the readable and the writable parts. This MUST NOT be called by application code. +- **[`_transform`][]**: This function is called internally to handle the flow of data between the readable and writable parts. This MUST NOT be called by application code. #### Creating a Transform Stream -To create a new transform stream we can pass an `options` object to the `Transform` constructor including a `transform` function which handles how the output data is computed from the input data using the `push` method. +To create a new transform stream, we can pass an `options` object to the `Transform` constructor, including a `transform` function that handles how the output data is computed from the input data using the `push` method. ```cjs const { Transform } = require('node:stream'); @@ -322,13 +322,13 @@ This stream will take any input and output it in uppercase. ## How to operate with streams -When working with streams, we usually want to perform read from a source and write to a destination, possibly needing some transformation of the data in between. The following sections will cover different ways to do so. +When working with streams, we usually want to read from a source and write to a destination, possibly needing some transformation of the data in between. The following sections will cover different ways to do so. -### .pipe() +### `.pipe()` -The [`.pipe()`][] method concatenates one readable stream to a writable (or transform) stream. Although this seems a simple way to achieve our goal, it delegates all error handling to the programmer making it difficult to get it right. +The [`.pipe()`][] method concatenates one readable stream to a writable (or transform) stream. Although this seems like a simple way to achieve our goal, it delegates all error handling to the programmer, making it difficult to get it right. -The following example shows a pipe trying to output the current file in upper case to the console. +The following example shows a pipe trying to output the current file in uppercase to the console. ```cjs const fs = require('node:fs'); @@ -368,7 +368,7 @@ writeStream.on('close', () => { }); ``` -After writing 10 characters `upper` will return an error in the callback, which will cause the stream to close. However, the other streams won’t be notified resulting in memory leaks. The output will be: +After writing 10 characters, `upper` will return an error in the callback, which will cause the stream to close. However, the other streams won’t be notified, resulting in memory leaks. The output will be: ```bash CONST FS = @@ -376,11 +376,11 @@ Error in transform stream: BOOM! 
Transform stream closed ``` -### pipeline() +### `pipeline()` -To avoid the pitfalls low level complexity of the `.pipe()` method, in most cases it is recommended to use [`pipeline()`][] method. This method is a safer and more robust way to pipe streams together, handling errors and cleanup automatically. +To avoid the pitfalls and low-level complexity of the `.pipe()` method, in most cases, it is recommended to use the [`pipeline()`][] method. This method is a safer and more robust way to pipe streams together, handling errors and cleanup automatically. -The following example demonstrates how using`pipeline()`prevents the pitfalls of the previous example: +The following example demonstrates how using `pipeline()` prevents the pitfalls of the previous example: ```cjs const fs = require('node:fs'); @@ -421,7 +421,7 @@ pipeline(readStream, upper, writeStream, err => { }); ``` -In this case all streams will be closed with the following output: +In this case, all streams will be closed with the following output: ```bash CONST FS = @@ -431,13 +431,13 @@ Pipeline error: BOOM! Readable stream closed ``` -The [`pipeline()`][] method also has an [`async pipeline()`][] version which doesn’t accept a callback but instead returns a promise which is rejected if the pipeline fails. +The [`pipeline()`][] method also has an [`async pipeline()`][] version, which doesn’t accept a callback but instead returns a promise that is rejected if the pipeline fails. ### Async Iterators -Async Iterators are recommended as the standard way of interfacing with the Streams API. Compared to all the streams primitives on the Web and Node.js, Async iterators are easier to understand and to use, contributing to fewer bugs and more maintainable code. In recent versions of Node.js, async iterators have emerged as a more elegant and readable way to interact with streams. Building upon the foundation of events, async iterators provide a higher-level abstraction that simplifies stream consumption. +Async iterators are recommended as the standard way of interfacing with the Streams API. Compared to all the stream primitives in both the Web and Node.js, async iterators are easier to understand and use, contributing to fewer bugs and more maintainable code. In recent versions of Node.js, async iterators have emerged as a more elegant and readable way to interact with streams. Building upon the foundation of events, async iterators provide a higher-level abstraction that simplifies stream consumption. -In Node.js, all readable streams are asynchronous iterables. This means you can use the for await...of syntax to loop through the stream's data as it becomes available, handling each piece of data with the efficiency and simplicity of asynchronous code. +In Node.js, all readable streams are asynchronous iterables. This means you can use the `for await...of` syntax to loop through the stream's data as it becomes available, handling each piece of data with the efficiency and simplicity of asynchronous code. #### Benefits of Using Async Iterators with Streams @@ -466,11 +466,11 @@ await pipeline( ); ``` -This code achieves the same as the previous examples, without the need to define a new transform stream. The error from previous examples has been removed for the sake of brevity. The async version of pipeline has been used. It should be wrapped in a `try..catch` block to handle possible errors. +This code achieves the same result as the previous examples, without the need to define a new transform stream. 
The error from the previous examples has been removed for the sake of brevity. The async version of the pipeline has been used, and it should be wrapped in a `try...catch` block to handle possible errors. ### Object mode -By default streams can work with strings, [`Buffer`], [`TypedArray`], or [`DataView`], if an arbitrary value different than those (e.g. an object) is pushed into a stream a `TypeError` will be thrown. However, it is possible to work with objects by setting the `objectMode` option to `true`. This allows the stream to work with any JavaScript value, with the exception of `null`, which is used to signal the end of the stream. This means you can `push` and `read` any value in a readable stream and `write` any value in a writable stream. +By default, streams can work with strings, [`Buffer`][], [`TypedArray`][], or [`DataView`][]. If an arbitrary value different from these (e.g., an object) is pushed into a stream, a `TypeError` will be thrown. However, it is possible to work with objects by setting the `objectMode` option to `true`. This allows the stream to work with any JavaScript value, except for `null`, which is used to signal the end of the stream. This means you can `push` and `read` any value in a readable stream, and `write` any value in a writable stream. ```cjs const { Readable } = require('stream'); @@ -484,29 +484,29 @@ const readable = Readable({ }); ``` -When working in object mode it is important to remember that the `highWaterMark` option is the number of objects, not bytes. +When working in object mode, it is important to remember that the `highWaterMark` option refers to the number of objects, not bytes. ### Backpressure -When using streams it is important to make sure the producer doesn't overwhelm the consumer. For this the backpressure mechanism is used in all streams in the Node.js API and implementors are responsible to keep that behaviour. +When using streams, it is important to make sure the producer doesn't overwhelm the consumer. For this, the backpressure mechanism is used in all streams in the Node.js API, and implementors are responsible for maintaining that behavior. -In any scenario where the data buffer has exceeded the [`highWaterMark`](https://nodejs.org/api/stream.html#stream_buffering) or the write queue is currently busy, [`.write()`](https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback) will return `false`. +In any scenario where the data buffer has exceeded the [`highWaterMark`](https://nodejs.org/api/stream.html#stream_buffering) or the write queue is currently busy, [`write()`](https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback) will return `false`. -When a `false` value is returned, the backpressure system kicks in. It will pause the incoming [`Readable`](https://nodejs.org/api/stream.html#stream_readable_streams) stream from sending any data and wait until the consumer is ready again. Once the data buffer is emptied, a [`'drain'`](https://nodejs.org/api/stream.html#stream_event_drain) event will be emitted and resume the incoming data flow. +When a `false` value is returned, the backpressure system kicks in. It will pause the incoming [`Readable`](https://nodejs.org/api/stream.html#stream_readable_streams) stream from sending any data and wait until the consumer is ready again. Once the data buffer is emptied, a [`'drain'`](https://nodejs.org/api/stream.html#stream_event_drain) event will be emitted to resume the incoming data flow. 
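
As a minimal sketch of honoring that contract by hand (the file name and the number of writes are arbitrary choices), a producer can stop calling `write()` as soon as it returns `false` and resume after the `'drain'` event:

```mjs
import fs from 'node:fs';
import { once } from 'node:events';

const destination = fs.createWriteStream('output.txt');

for (let i = 0; i < 1_000_000; i++) {
  const canContinue = destination.write(`line ${i}\n`);
  if (!canContinue) {
    // The internal buffer is full: pause producing until it drains
    await once(destination, 'drain');
  }
}

destination.end();
```
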
For a deeper understanding of backpressure, check the [`backpressure guide`][]. ## Streams vs Web streams -The stream concept is not exclusive to Node.js. In fact Node.js has a different implementation of the stream concept called [`Web Streams`][]. That implements the [`WHATWG Streams Standard`][]. Although the concepts behind them are similar, it is important to be aware that they have different APIs and are not directly compatible. +The stream concept is not exclusive to Node.js. In fact, Node.js has a different implementation of the stream concept called [`Web Streams`][], which implements the [`WHATWG Streams Standard`][]. Although the concepts behind them are similar, it is important to be aware that they have different APIs and are not directly compatible. -[`Web Streams`][] implement [`ReadableStream`][], [`WritableStream`][] and [`TransformStream`][] classes, homologous to Node.js [`Readable`][], [`Writable`][] and [`Transform`][] streams. +[`Web Streams`][] implement the [`ReadableStream`][], [`WritableStream`][], and [`TransformStream`][] classes, which are homologous to Node.js's [`Readable`][], [`Writable`][], and [`Transform`][] streams. ### Interoperability of streams and Web Streams -Node.js provides utility functions to convert to/from Web Streams and Node.js streams. This functions are implemented as `toWeb` and `fromWeb` methods in each stream class. +Node.js provides utility functions to convert to/from Web Streams and Node.js streams. These functions are implemented as `toWeb` and `fromWeb` methods in each stream class. -The following example in the [`Duplex`][] class covers how to work with both readable and writable streams converted to Web Streams: +The following example in the [`Duplex`][] class demonstrates how to work with both readable and writable streams converted to Web Streams: ```cjs const { Duplex } = require('node:stream'); @@ -552,9 +552,10 @@ await pipeline( ); ``` -Be aware that fetch body is a `ReadableStream` and therefore a [`TextDecoderStream`][] is needed to work with chunks as strings. +Be aware that the fetch body is a `ReadableStream`, and therefore a [`TextDecoderStream`][] is needed to work with chunks as strings. + +This work is derived from content published by [Matteo Collina][] in [Platformatic's Blog][]. -This work is derived from the content published by [Matteo Collina][] in [Platformatic's Blog][] [`Stream`]: https://nodejs.org/api/stream.html [`Buffer`]: https://nodejs.org/api/buffer.html [`TypedArray`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypedArray From 0c3afaa6d29c482f7b91cea3a6089e52a42f1967 Mon Sep 17 00:00:00 2001 From: Carlos Espa Date: Sun, 17 Nov 2024 11:03:49 +0100 Subject: [PATCH 4/5] fixup! 
Apply suggestions from code review --- .../en/learn/modules/how-to-use-streams.md | 290 +++++++++++++++++- packages/i18n/locales/en.json | 4 +- 2 files changed, 286 insertions(+), 8 deletions(-) diff --git a/apps/site/pages/en/learn/modules/how-to-use-streams.md b/apps/site/pages/en/learn/modules/how-to-use-streams.md index ca64901086058..2dca4746165a4 100644 --- a/apps/site/pages/en/learn/modules/how-to-use-streams.md +++ b/apps/site/pages/en/learn/modules/how-to-use-streams.md @@ -100,13 +100,48 @@ stream.on('data', chunk => { }); ``` +```mjs +import { Readable } from 'node:stream'; + +class MyStream extends Readable { + #count = 0; + _read(size) { + this.push(':-)'); + if (++this.#count === 5) { + this.push(null); + } + } +} + +const stream = new MyStream(); + +stream.on('data', chunk => { + console.log(chunk.toString()); +}); +``` + In this code, the `MyStream` class extends Readable and overrides the [`_read`][] method to push a string ":-)" to the internal buffer. After pushing the string five times, it signals the end of the stream by pushing `null`. The [`on('data')`][] event handler logs each chunk to the console as it is received. #### Advanced Control with the readable Event For even finer control over data flow, the readable event can be used. This event is more complex but provides better performance for certain applications by allowing explicit control over when data is read from the stream: -```js +```cjs +const stream = new MyStream({ + highWaterMark: 1, +}); + +stream.on('readable', () => { + console.count('>> readable event'); + let chunk; + while ((chunk = stream.read()) !== null) { + console.log(chunk.toString()); // Process the chunk + } +}); +stream.on('end', () => console.log('>> end event')); +``` + +```mjs const stream = new MyStream({ highWaterMark: 1, }); @@ -143,7 +178,21 @@ NOTE: You can try to run the code with `NODE_DEBUG=stream` to see that `emitRead If we want to see readable events called before each smiley, we can wrap `push` into a `setImmediate` or `process.nextTick` like this: -```js +```cjs +class MyStream extends Readable { + #count = 0; + _read(size) { + setImmediate(() => { + this.push(':-)'); + if (++this.#count === 5) { + return this.push(null); + } + }); + } +} +``` + +```mjs class MyStream extends Readable { #count = 0; _read(size) { @@ -187,6 +236,32 @@ And we’ll get: Here's an example of creating a writable stream that converts all incoming data to uppercase before writing it to the standard output: +```cjs +const { Writable } = require('node:stream'); +const { once } = require('node:events'); + +class MyStream extends Writable { + constructor() { + super({ highWaterMark: 10 /* 10 bytes */ }); + } + _write(data, encode, cb) { + process.stdout.write(data.toString().toUpperCase() + '\n', cb); + } +} +const stream = new MyStream(); + +// Top-level await is not available in CommonJS, so the writes are wrapped in an async IIFE. +(async () => { + for (let i = 0; i < 10; i++) { + const waitDrain = !stream.write('hello'); + + if (waitDrain) { + console.log('>> wait drain'); + await once(stream, 'drain'); + } + } + + stream.end('world'); +})(); +``` + ```mjs import { Writable } from 'node:stream'; import { once } from 'node:events'; @@ -271,6 +346,29 @@ server.listen(8080, () => { }); ``` + +```mjs +import net from 'node:net'; + +// Create a TCP server +const server = net.createServer(socket => { + socket.write('Hello from server!\n'); + + socket.on('data', data => { + console.log(`Client says: ${data.toString()}`); + }); + + // Handle client disconnection + socket.on('end', () => { + console.log('Client disconnected'); + }); +}); + +// Start the server on port 8080
+server.listen(8080, () => { + console.log('Server listening on port 8080'); +}); +``` + The previous code will open a TCP socket on port 8080, send `Hello from server!` to any connecting client, and log any data received. ```cjs @@ -291,6 +389,24 @@ client.on('end', () => { }); ``` +```mjs +import net from 'node:net'; + +// Connect to the server at localhost:8080 +const client = net.createConnection({ port: 8080 }, () => { + client.write('Hello from client!\n'); +}); + +client.on('data', data => { + console.log(`Server says: ${data.toString()}`); +}); + +// Handle the server closing the connection +client.on('end', () => { + console.log('Disconnected from server'); +}); +``` + The previous code will connect to the TCP socket, send a `Hello from client` message, and log any received data. ### Transform @@ -318,6 +434,17 @@ const upper = new Transform({ }); ``` +```mjs +import { Transform } from 'node:stream'; + +const upper = new Transform({ + transform: function (data, enc, cb) { + this.push(data.toString().toUpperCase()); + cb(); + }, +}); +``` + This stream will take any input and output it in uppercase. ## How to operate with streams @@ -368,6 +495,46 @@ writeStream.on('close', () => { }); ``` +```mjs +import fs from 'node:fs'; +import { Transform } from 'node:stream'; + +let errorCount = 0; +const upper = new Transform({ + transform: function (data, enc, cb) { + if (errorCount === 10) { + return cb(new Error('BOOM!')); + } + errorCount++; + this.push(data.toString().toUpperCase()); + cb(); + }, +}); + +const readStream = fs.createReadStream(import.meta.filename, { + highWaterMark: 1, +}); +const writeStream = process.stdout; + +readStream.pipe(upper).pipe(writeStream); + +readStream.on('close', () => { + console.log('Readable stream closed'); +}); + +upper.on('close', () => { + console.log('Transform stream closed'); +}); + +upper.on('error', err => { + console.error('\nError in transform stream:', err.message); +}); + +writeStream.on('close', () => { + console.log('Writable stream closed'); +}); +``` + After writing 10 characters, `upper` will return an error in the callback, which will cause the stream to close. However, the other streams won’t be notified, resulting in memory leaks.
The output will be: ```bash @@ -421,6 +588,47 @@ pipeline(readStream, upper, writeStream, err => { }); ``` +```mjs +import fs from 'node:fs'; +import { Transform, pipeline } from 'node:stream'; + +let errorCount = 0; +const upper = new Transform({ + transform: function (data, enc, cb) { + if (errorCount === 10) { + return cb(new Error('BOOM!')); + } + errorCount++; + this.push(data.toString().toUpperCase()); + cb(); + }, +}); + +const readStream = fs.createReadStream(import.meta.filename, { + highWaterMark: 1, +}); +const writeStream = process.stdout; + +readStream.on('close', () => { + console.log('Readable stream closed'); +}); + +upper.on('close', () => { + console.log('\nTransform stream closed'); +}); + +writeStream.on('close', () => { + console.log('Writable stream closed'); +}); + +pipeline(readStream, upper, writeStream, err => { + if (err) { + return console.error('Pipeline error:', err.message); + } + console.log('Pipeline succeeded'); +}); +``` + In this case, all streams will be closed with the following output: ```bash @@ -451,6 +659,21 @@ Async iterators offer a more modern and often more readable way to work with rea Here's an example demonstrating the use of async iterators with a readable stream: +```cjs +const fs = require('node:fs'); +const { pipeline } = require('node:stream/promises'); + +// `import.meta` and top-level `await` are not available in CommonJS, so use `__filename` +// and handle the returned promise explicitly. +pipeline( + fs.createReadStream(__filename), + async function* (source) { + for await (let chunk of source) { + yield chunk.toString().toUpperCase(); + } + }, + process.stdout +).catch(console.error); +``` + ```mjs import fs from 'fs'; import { pipeline } from 'stream/promises'; @@ -473,7 +696,19 @@ This code achieves the same result as the previous examples, without the need to By default, streams can work with strings, [`Buffer`][], [`TypedArray`][], or [`DataView`][]. If an arbitrary value different from these (e.g., an object) is pushed into a stream, a `TypeError` will be thrown. However, it is possible to work with objects by setting the `objectMode` option to `true`. This allows the stream to work with any JavaScript value, except for `null`, which is used to signal the end of the stream. This means you can `push` and `read` any value in a readable stream, and `write` any value in a writable stream. ```cjs -const { Readable } = require('stream'); +const { Readable } = require('node:stream'); + +const readable = Readable({ + objectMode: true, + read() { + this.push({ hello: 'world' }); + this.push(null); + }, +}); +``` + +```mjs +import { Readable } from 'node:stream'; const readable = Readable({ objectMode: true, @@ -490,15 +725,15 @@ When working in object mode, it is important to remember that the `highWaterMark When using streams, it is important to make sure the producer doesn't overwhelm the consumer. For this, the backpressure mechanism is used in all streams in the Node.js API, and implementors are responsible for maintaining that behavior. -In any scenario where the data buffer has exceeded the [`highWaterMark`](https://nodejs.org/api/stream.html#stream_buffering) or the write queue is currently busy, [`write()`](https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback) will return `false`. +In any scenario where the data buffer has exceeded the [`highWaterMark`][] or the write queue is currently busy, [`.write()`][] will return `false`. -When a `false` value is returned, the backpressure system kicks in.
It will pause the incoming [`Readable`](https://nodejs.org/api/stream.html#stream_readable_streams) stream from sending any data and wait until the consumer is ready again. Once the data buffer is emptied, a [`'drain'`](https://nodejs.org/api/stream.html#stream_event_drain) event will be emitted to resume the incoming data flow. +When a `false` value is returned, the backpressure system kicks in. It will pause the incoming [`Readable`][] stream from sending any data and wait until the consumer is ready again. Once the data buffer is emptied, a [`'drain'`][] event will be emitted to resume the incoming data flow. For a deeper understanding of backpressure, check the [`backpressure guide`][]. ## Streams vs Web streams -The stream concept is not exclusive to Node.js. In fact, Node.js has a different implementation of the stream concept called [`Web Streams`][], which implements the [`WHATWG Streams Standard`][]. Although the concepts behind them are similar, it is important to be aware that they have different APIs and are not directly compatible. +The stream concept is not exclusive to Node.js. In fact, Node.js has a different implementation of the stream concept called [`Web Streams`][], which implements the [`WHATWG Streams Standard`][]. Although the concepts behind them are similar, it is important to be aware that they have different APIs and are not directly compatible. [`Web Streams`][] implement the [`ReadableStream`][], [`WritableStream`][], and [`TransformStream`][] classes, which are homologous to Node.js's [`Readable`][], [`Writable`][], and [`Transform`][] streams. @@ -533,8 +768,50 @@ readable }); ``` +```mjs +import { Duplex } from 'node:stream'; + +const duplex = Duplex({ + read() { + this.push('world'); + this.push(null); + }, + write(chunk, encoding, callback) { + console.log('writable', chunk); + callback(); + }, +}); + +const { readable, writable } = Duplex.toWeb(duplex); +writable.getWriter().write('hello'); + +readable + .getReader() + .read() + .then(result => { + console.log('readable', result.value); + }); +``` + The helper functions are useful if you need to return a Web Stream from a Node.js module or vice versa. For regular consumption of streams, async iterators enable seamless interaction with both Node.js and Web Streams. 
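Before the async-iterator example that follows, here is a minimal sketch of those conversion helpers (the `file.txt` name and the specific `Readable.toWeb`/`Writable.fromWeb` calls are chosen purely for illustration):

```mjs
import fs from 'node:fs';
import { Readable, Writable } from 'node:stream';

// Node.js Readable -> WHATWG ReadableStream, e.g. to hand over to a Web API.
const webReadable = Readable.toWeb(fs.createReadStream('file.txt'));

// WHATWG WritableStream -> Node.js Writable, e.g. to keep using Node.js-style write()/end().
const nodeWritable = Writable.fromWeb(
  new WritableStream({
    write(chunk) {
      console.log('received:', chunk.toString());
    },
  })
);

nodeWritable.write('hello');
nodeWritable.end();
```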
+```cjs +const { pipeline } = require('node:stream/promises'); + +// Top-level await is not available in CommonJS, so the calls are wrapped in an async IIFE. +(async () => { + const { body } = await fetch('https://nodejs.org/api/stream.html'); + + await pipeline( + body, + new TextDecoderStream(), + async function* (source) { + for await (const chunk of source) { + yield chunk.toString().toUpperCase(); + } + }, + process.stdout + ); +})(); +``` + ```mjs import { pipeline } from 'node:stream/promises'; @@ -578,6 +855,7 @@ This work is derived from content published by [Matteo Collina][] in [Platformat [`.write()`]: https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback [`_write`]: https://nodejs.org/api/stream.html#writable_writechunk-encoding-callback [`.end()`]: https://nodejs.org/api/stream.html#writableendchunk-encoding-callback +[`'drain'`]: https://nodejs.org/api/stream.html#stream_event_drain [`_transform`]: https://nodejs.org/api/stream.html#transform_transformchunk-encoding-callback [`Readable.from()`]: https://nodejs.org/api/stream.html#streamreadablefromiterable-options [`highWaterMark`]: https://nodejs.org/api/stream.html#stream_buffering diff --git a/packages/i18n/locales/en.json b/packages/i18n/locales/en.json index 22adafa2d7bcc..c1802c2ad3d56 100644 --- a/packages/i18n/locales/en.json +++ b/packages/i18n/locales/en.json @@ -93,8 +93,8 @@ "publishingNodeApiModules": "How to publish a Node-API package", "anatomyOfAnHttpTransaction": "Anatomy of an HTTP Transaction", "abiStability": "ABI Stability", - "backpressuringInStreams": "Backpressuring in Streams", - "howToUseStreams": "How to use streams" + "howToUseStreams": "How to use streams", + "backpressuringInStreams": "Backpressuring in Streams" } }, "diagnostics": { From 6d9c6b4140cf979ebd80db7d55516aa3a137b2cb Mon Sep 17 00:00:00 2001 From: Carlos Espa Date: Sun, 17 Nov 2024 12:13:43 +0100 Subject: [PATCH 5/5] fixup! fixup! Apply suggestions from code review --- apps/site/navigation.json | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/site/navigation.json b/apps/site/navigation.json index 3a21dca8b27ec..4c00d08ce0a05 100644 --- a/apps/site/navigation.json +++ b/apps/site/navigation.json @@ -325,13 +325,13 @@ "link": "/learn/modules/abi-stability", "label": "components.navigation.learn.modules.links.abiStability" }, - "backpressuringInStreams": { - "link": "/learn/modules/backpressuring-in-streams", - "label": "components.navigation.learn.modules.links.backpressuringInStreams" - }, "howToUseStreams": { "link": "/learn/modules/how-to-use-streams", "label": "components.navigation.learn.modules.links.howToUseStreams" + }, + "backpressuringInStreams": { + "link": "/learn/modules/backpressuring-in-streams", + "label": "components.navigation.learn.modules.links.backpressuringInStreams" } } },