Skip to content

Commit

Permalink
Add test for asymmetric backpressure (#43)
Browse files Browse the repository at this point in the history
* Add test for asymmetric backpressure

* Internalize NullWritable
  • Loading branch information
dmurvihill authored Sep 12, 2023
1 parent 6848bbf commit bf82cb6
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const fs = require('fs')
const path = require('path')
const { test } = require('tape')
const stream1 = require('stream')
const from = require('from2')
const crypto = require('crypto')
const sink = require('flush-write-stream')
Expand Down Expand Up @@ -720,6 +721,55 @@ test('big file', function (t) {
setTimeout(pipe.bind(null, stream.clone(), 2), 1000)
})

test('waits for slowest clone', function (t) {
t.plan(5)

let streamOut = 0
let cloneOut = 0
const stream = cloneable(fs.createReadStream(path.join(__dirname, 'big')))
.on('data', (chunk, encoding) => { streamOut += chunk.length })
const clone = stream.clone()
.on('data', (chunk, encoding) => { cloneOut += chunk.length })

class NullWritable extends stream1.Writable {
_write (_chunk, _encoding, callback) {
callback()
}

_writev (_chunks, callback) {
callback()
}
}

const sink1 = new NullWritable()
const sink2 = new NullWritable()
sink2.cork()

function pipe (s, d, num) {
s.on('end', function () {
t.pass('end for ' + num)
})
s.pipe(d)
.on('finish', function () {
t.pass('finish for ' + num)
})
}

pipe(stream, sink1, 0)
pipe(clone, sink2, 1)

let firstDelta, secondDelta

setTimeout(function () {
firstDelta = streamOut - cloneOut
setTimeout(function () {
secondDelta = streamOut - cloneOut
t.equal(firstDelta, secondDelta)
sink2.uncork()
}, 100)
}, 100)
})

test('pipeline error', function (t) {
t.plan(1)

Expand Down

0 comments on commit bf82cb6

Please sign in to comment.