Skip to content

Commit

Permalink
fix(netstring): Early return during chunked write (merge #1790)
Browse files Browse the repository at this point in the history
  • Loading branch information
kriskowal authored Sep 26, 2023
2 parents aebbea2 + ae1d93b commit 33bc44b
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 6 deletions.
1 change: 1 addition & 0 deletions packages/netstring/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
},
"dependencies": {
"@endo/init": "^0.5.60",
"@endo/promise-kit": "^0.2.60",
"@endo/stream": "^0.3.29",
"ses": "^0.18.8"
},
Expand Down
41 changes: 41 additions & 0 deletions packages/netstring/test/test-netstring.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,47 @@ const chunkedWrite = async (t, opts) => {
test('chunked write', chunkedWrite);
test('chunked write (chunked)', chunkedWrite, { chunked: true });

test('concurrent chunked writes', async t => {
const { array, writer } = makeArrayWriter({ chunked: true });
const concurrentChunkedMessages = [
[],
[''],
['A'],
['hello', ' ', 'world'],
['Hello', ', ', 'World', '!\n'],
];
await Promise.all(
concurrentChunkedMessages.flatMap(strChunks => [
writer.next(strChunks.map(strChunk => encoder.encode(strChunk))),
writer.return(),
]),
);

t.deepEqual(
concurrentChunkedMessages.map(strChunks =>
encoder.encode(strChunks.join('')),
),
await read(makeNetstringReader(array)),
);
});

test('writer closes anywhere within chunk', async t => {
for (let count = 0; count < 4; count += 1) {
const pipe = makePipe();
const writer = makeNetstringWriter(pipe[1], { chunked: true });
for (let i = 0; i < count; i += 1) {
pipe[0].next();
}
// close the writer:
pipe[0].return();
// eslint-disable-next-line no-await-in-loop
const { done } = await writer.next(
['Hello, ', 'World!\n'].map(str => encoder.encode(str)),
);
t.assert(done);
}
});

const varyingMessages = async (t, opts) => {
const array = ['', 'A', 'hello'];

Expand Down
32 changes: 26 additions & 6 deletions packages/netstring/writer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// @ts-check
/// <reference types="ses"/>

import { makePromiseKit } from '@endo/promise-kit';

const COMMA_BUFFER = new Uint8Array([','.charCodeAt(0)]);

/** @param {number} length */
Expand Down Expand Up @@ -39,14 +41,32 @@ export const makeNetstringWriter = (output, { chunked = false } = {}) => {
const prefix = getLengthPrefixCharCodes(messageLength);

if (chunked) {
return Promise.all([
const ack = makePromiseKit();

const partsWritten = [
output.next(new Uint8Array(prefix)),
...messageChunks.map(async chunk => output.next(chunk)),
...messageChunks.map(chunk => output.next(chunk)),
output.next(COMMA_BUFFER),
]).then(([r1, r2, r3]) => ({
done: !!(r1.done || r2.done || r3.done),
value: undefined,
}));
];

// Resolve early if the output writer closes early.
for (const promise of partsWritten) {
promise.then(partWritten => {
if (partWritten.done) {
ack.resolve(partWritten);
}
});
}

Promise.all(partsWritten).then(results => {
// Redundant resolution is safe and clean.
ack.resolve({
done: results.some(({ done }) => done),
value: undefined,
});
}, ack.reject);

return ack.promise;
} else {
const buffer = new Uint8Array(prefix.length + messageLength + 1);
buffer.set(prefix, 0);
Expand Down

0 comments on commit 33bc44b

Please sign in to comment.