From bdd5515c676c2c52c63c7267f149fbfde6d7ed89 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Sat, 21 Oct 2023 18:33:22 +0200 Subject: [PATCH] add stream backpressure test --- test/stream.js | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/test/stream.js b/test/stream.js index 91d92a2..97f0175 100644 --- a/test/stream.js +++ b/test/stream.js @@ -1,5 +1,6 @@ const test = require('brittle') const b4a = require('b4a') +const { Readable } = require('streamx') const proxy = require('./helpers/proxy') const UDX = require('../') const { makeTwoStreams } = require('./helpers') @@ -820,3 +821,46 @@ test('write to unconnected stream', async function (t) { socket.close() }) }) + +test('backpressures stream', async function (t) { + t.plan(2) + + const u = new UDX() + + const send = 512 * 1024 * 1024 + + let sent = 0 + let recv = 0 + + const socket = u.createSocket() + socket.bind(0) + + const a = u.createStream(1) + const b = u.createStream(2) + + a.connect(socket, 2, socket.address().port) + b.connect(socket, 1, socket.address().port) + + const rs = new Readable({ + read (cb) { + sent += 65536 + this.push(Buffer.alloc(65536)) + if (sent === send) this.push(null) + cb(null) + } + }) + + rs.pipe(a) + + b.resume() + b.on('data', function (data) { + recv += data.byteLength + }) + b.on('end', function () { + t.is(recv, send) + t.ok(send > 0, 'sanity check, sent ' + send + ' bytes') + + b.end() + socket.close() + }) +})