Simple function for looping your Node.js stream with C-like for loop features:
break
andcontinue
If all you want to do is consume a Node.js stream in total, then probably the easiest way is async iteration.
However if you only want to consume it partly, there are some limitations. Using break
will result in closing the reading stream.
Imagine the user is writing to stdin
of your program and you want to reply to their 'Hello" with 'Hi!' and then just leave the stream for consumption to the other part of your application.
for await (const chunk of process.stdin) {
if(chunk.toString() === 'Hello\n') {
console.log('Hi!');
break;
}
}
process.stdin.on('data', (chunk) => {
// This will never run :(
});
instead you could just use loopStream
:
await loopStream(process.stdin, (chunk) => {
if(chunk.toString() === 'Hello\n') {
console.log('Hi!');
return { action: 'break' };
}
return { action: 'continue' };
})
process.stdin.on('data', (chunk) => {
// This will work as expected!
});
There are some more use cases for this little function. Let's suppose you want to parse incoming HTTP headers and then read the body using different listeners.
async function readHttpHeaders(stream: Readable): Promise<Record<string, string>> {
/* Assuming the structure of the request is
Header1: Value1\r\n
Header2: Value2\r\n
\r\n
[Body]
*/
return loopStream(stream, '', (chunk, acc) => {
acc += chunk;
const headEndSeqIndex = acc.indexOf("\r\n\r\n");
// if we didn't get to the end of headers section of HTTP request, just continue reading the headers
if (headEndSeqIndex === -1) {
return { action: "continue", acc };
}
const rawHeaders = acc.slice(0, headEndSeqIndex);
// when we know we already got to the end of headers we have to make sure we didn't read a part of HTTP body
const bodyBeginning = acc.slice(headEndSeqIndex + "\r\n\r\n".length);
// if so, we want to return it as 'unconsumedData' so it can be unshifted into the original stream
return {
action: "break",
acc: rawHeaders,
unconsumedData: bodyBeginning
};
});
}
// Consume just the headers part of the HTTP request
const headers = await readHttpHeaders(request);
// and then we just read the remaining stream to get the body
let body = '';
for await (const bodyChunk of request)
body += bodyChunk;
npm install loop-stream
stream
- Readable streamiter
- IterateWithoutState- Returns - Promise that resolves either when
{ action: 'break' }
is returned or when the stream has ended.
- chunk - Chunk of stream data obtained using
stream.read()
- Returns -
{ action: 'continue'; }
or{ action: 'break'; unconsumedData?: any; };
Consumes chunk of data.
Example:
await loopStream(process.stdin, (chunk) => {
if(chunk === 'Hi\n') {
console.log('Hello');
return { action: 'break' };
}
return { action: 'continue' };
});
stream
- Readable streaminitialAcc
- initial value of the accumulatoriter
- IterateWithState- Returns - Promise that resolves either when
{ action: 'break' }
is returned or when the stream has ended.
It's a bit like arr.reduce()
for streams. You can accumulate chunks from streams into some value the will be later returned from loopStream.
- chunk - Chunk of stream data obtained using
stream.read()
- acc - Accumulator returned from the previous iteration. In the first iteration it's
initialAcc
. - Returns -
{ action: 'continue'; acc: Accumulator; }
or{ action: 'break'; unconsumedData?: any; acc: Accumulator; };
Consumes chunk of data and accumulates processed stream.
Example:
const stream = new PassThrough({ objectMode: true });
[2, 1, 3, -1, 3].forEach(num => stream.write(num));
const sum = await loopStream(stream, 0, (chunk, acc: number) => {
if(chunk === -1) {
return { action: 'break', acc };
}
return { action: 'continue', acc: acc + chunk };
});
// sum: 6