From e2eea70f7c72fa687464751e6538630278f988e6 Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Tue, 26 Nov 2024 06:28:12 +0000 Subject: [PATCH 1/2] Stop using stream.pipe() stream.pipe() should rarely be used in practice because: 1. it can cause memory leaks, and 2. it does not propogate errors between streams PartialPipe.pipeline() explicitly notes this and provides mitigations for it. See: * https://nodejs.org/api/stream.html#readablepipedestination-options * https://stackoverflow.com/questions/58875655/whats-the-difference-between-pipe-and-pipeline-on-streams --- lib/data/briefcase.js | 6 +++--- lib/data/client-audits.js | 7 +++--- lib/task/fs.js | 8 ++++--- lib/util/crypto.js | 5 +++-- lib/util/stream.js | 2 +- test/integration/other/encryption.js | 4 ++-- test/unit/formats/odata.js | 32 ++++++++++++++-------------- test/unit/util/crypto.js | 4 ++-- test/unit/util/stream.js | 2 +- 9 files changed, 37 insertions(+), 33 deletions(-) diff --git a/lib/data/briefcase.js b/lib/data/briefcase.js index f2ebf29c8..2720253a9 100644 --- a/lib/data/briefcase.js +++ b/lib/data/briefcase.js @@ -7,7 +7,7 @@ // including this file, may be copied, modified, propagated, or distributed // except according to the terms contained in the LICENSE file. -const { Transform, pipeline } = require('stream'); +const { Transform } = require('stream'); const hparser = require('htmlparser2'); const { identity, last } = require('ramda'); const csv = require('csv-stringify'); @@ -164,8 +164,8 @@ const processRow = (xml, instanceId, fields, header, selectValues) => new Promis } }, { xmlMode: true, decodeEntities: true }); - if (typeof xml.pipe === 'function') { - pipeline(xml, parser, rejectIfError(reject)); + if (xml instanceof PartialPipe) { + xml.with(parser).pipeline(rejectIfError(reject)); } else { parser.write(xml); parser.end(); diff --git a/lib/data/client-audits.js b/lib/data/client-audits.js index afc49a722..38088d7dc 100644 --- a/lib/data/client-audits.js +++ b/lib/data/client-audits.js @@ -7,10 +7,11 @@ // including this file, may be copied, modified, propagated, or distributed // except according to the terms contained in the LICENSE file. -const { Transform, pipeline } = require('stream'); +const { Transform } = require('stream'); const parse = require('csv-parse'); const csv = require('csv-stringify'); const sanitize = require('sanitize-filename'); +const { PartialPipe } = require('../util/stream'); const { zipPart } = require('../util/zip'); const headers = [ 'event', 'node', 'start', 'end', 'latitude', 'longitude', 'accuracy', 'old-value', 'new-value' ]; @@ -54,8 +55,8 @@ const parseClientAudits = (input) => { return new Promise((pass, fail) => { // feed either the stream or the buffer into the parser now. - if (input.pipe != null) { - pipeline(input, parser, (err) => { if (err != null) fail(err); }); + if (input instanceof PartialPipe) { + input.with(parser).pipeline((err) => { if (err != null) fail(err); }); } else { parser.write(input); parser.end(); diff --git a/lib/task/fs.js b/lib/task/fs.js index 7012f49a5..3b3599fde 100644 --- a/lib/task/fs.js +++ b/lib/task/fs.js @@ -19,6 +19,7 @@ const yauzl = require('yauzl'); const { task } = require('./task'); const Problem = require('../util/problem'); const { generateLocalCipherer, getLocalDecipherer } = require('../util/crypto'); +const { PartialPipe } = require('../util/stream'); // given a directory containing files, a path to a tmpfile, and keyinfo data, @@ -31,7 +32,6 @@ const { generateLocalCipherer, getLocalDecipherer } = require('../util/crypto'); const encryptToArchive = (directory, tmpFilePath, keys) => { const outStream = createWriteStream(tmpFilePath); const zipStream = archiver('zip', { zlib: { level: 9 } }); - zipStream.pipe(outStream); // create a cipher-generator for use below. const [ localkey, cipherer ] = generateLocalCipherer(keys); @@ -39,6 +39,8 @@ const encryptToArchive = (directory, tmpFilePath, keys) => { // call up all files in the directory. return task.promisify(readdir)(directory).then((files) => new Promise((resolve, reject) => { + PartialPipe.of(zipStream, outStream).pipeline(reject); + // stream each file into the zip, encrypting on the way in. clean up each // plaintext file as soon as we're done with them. // TODO: copypasted for now to lib/resources/backup @@ -48,7 +50,7 @@ const encryptToArchive = (directory, tmpFilePath, keys) => { local.ivs[basename(file)] = iv.toString('base64'); const readStream = createReadStream(filePath); - zipStream.append(readStream.pipe(cipher), { name: file }); + zipStream.append(PartialPipe.of(readStream, cipher).pipeline(reject), { name: file }); readStream.on('end', () => unlinkSync(filePath)); // sync to ensure completion. } @@ -138,7 +140,7 @@ const decryptFromArchive = (archivePath, directory, passphrase = '') => if (completed === entries.length) resolve(); }); decipher.on('error', () => reject(Problem.user.undecryptable())); - inStream.pipe(decipher).pipe(outStream); + PartialPipe.of(inStream, decipher, outStream).pipeline(reject); }); }); }, reject); diff --git a/lib/util/crypto.js b/lib/util/crypto.js index 03dbe9b7c..c38152d1e 100644 --- a/lib/util/crypto.js +++ b/lib/util/crypto.js @@ -15,6 +15,7 @@ const digest = require('digest-stream'); const { createHash, randomBytes, generateKeyPair, pbkdf2, createPrivateKey, createPublicKey, createCipheriv, createDecipheriv, publicEncrypt, privateDecrypt } = require('crypto'); const { RSA_NO_PADDING } = require('crypto').constants; const { Transform } = require('stream'); +const { PartialPipe } = require('./stream'); const { unpadPkcs1OaepMgf1Sha256 } = require('./quarantine/oaep'); const { unpadPkcs7 } = require('./quarantine/pkcs7'); const { promisify } = require('util'); @@ -261,9 +262,9 @@ const streamSubmissionCleartext = (key, iv, input) => { }); if (typeof input.pipe === 'function') { - return input.pipe(decipher).pipe(transform); + return PartialPipe.of(input, decipher, transform); } else { - const result = decipher.pipe(transform); + const result = PartialPipe.of(decipher, transform); decipher.write(input); decipher.end(); return result; diff --git a/lib/util/stream.js b/lib/util/stream.js index bb6d74afc..956808312 100644 --- a/lib/util/stream.js +++ b/lib/util/stream.js @@ -103,7 +103,7 @@ class PartialPipe { // really only used for testing purposes, to masquerade as a normal piped stream. // do not use in general; use .with() and .pipeline() above instead. - pipe(out) { return reduce(((x, y) => x.pipe(y)), this.streams[0], this.streams.slice(1)).pipe(out); } + _pipe(out) { return reduce(((x, y) => x.pipe(y)), this.streams[0], this.streams.slice(1)).pipe(out); } /* slint-ignore:pipe */ } module.exports = { diff --git a/test/integration/other/encryption.js b/test/integration/other/encryption.js index 3da709ee2..2da442aa7 100644 --- a/test/integration/other/encryption.js +++ b/test/integration/other/encryption.js @@ -107,11 +107,11 @@ describe('managed encryption', () => { // verify no charlie: (decryptor(null, keys[2].id) === null).should.equal(true); - clearAlpha.pipe(toText((_, textAlpha) => { + clearAlpha._pipe(toText((_, textAlpha) => { textAlpha.should.equal(testData.instances.simple.one); // eslint-disable-next-line no-shadow - clearBeta.pipe(toText((_, textBeta) => { + clearBeta._pipe(toText((_, textBeta) => { textBeta.should.equal(testData.instances.simple.two); done(); })); diff --git a/test/unit/formats/odata.js b/test/unit/formats/odata.js index b00efbb2c..89cd66542 100644 --- a/test/unit/formats/odata.js +++ b/test/unit/formats/odata.js @@ -546,7 +546,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects([]); fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions', {}, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); resultObj['@odata.context'].should.equal('http://localhost:8989/simple.svc/$metadata#Submissions'); done(); @@ -557,7 +557,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects([]); fieldsFor(testData.forms.withrepeat) .then((fields) => rowStreamToOData(fields, 'Submissions.children.child', 'http://localhost:8989', '/withrepeat.svc/Submissions', {}, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); resultObj['@odata.context'].should.equal('http://localhost:8989/withrepeat.svc/$metadata#Submissions.children.child'); done(); @@ -570,7 +570,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects(instances(10)); fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$top=3&$skip=7', query, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); // eslint-disable-next-line no-undef should.not.exist(resultObj['@odata.nextLink']); @@ -583,7 +583,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects(instances(6)); // make it close to check the off-by-one. fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$top=3&$skip=2', query, inRows, 10, 8)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); resultObj['@odata.nextLink'].should.equal('http://localhost:8989/simple.svc/Submissions?%24top=3&%24skiptoken=01e30%3D'); done(); @@ -595,7 +595,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects(instances(6)); // make it close to check the off-by-one. fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$top=3&$skip=2&$wkt=true&$count=true', query, inRows, 10, 8)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); resultObj['@odata.nextLink'].should.equal('http://localhost:8989/simple.svc/Submissions?%24top=3&%24wkt=true&%24count=true&%24skiptoken=01e30%3D'); done(); @@ -607,7 +607,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects(instances(8)); // make it close to check the off-by-one. fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$count=true', query, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); resultObj['@odata.count'].should.equal(8); done(); @@ -619,7 +619,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects(instances(8)); // make it close to check the off-by-one. fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$top=1&$skip=1&$count=true', query, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); resultObj['@odata.count'].should.equal(8); done(); @@ -634,7 +634,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects([]); fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions', {}, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ value: [], '@odata.context': 'http://localhost:8989/simple.svc/$metadata#Submissions', @@ -650,7 +650,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions', {}, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/simple.svc/$metadata#Submissions', value: [ @@ -671,7 +671,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$top=2', query, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/simple.svc/$metadata#Submissions', value: [ @@ -693,7 +693,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$skip=2', query, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/simple.svc/$metadata#Submissions', value: [ @@ -714,7 +714,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.withrepeat) .then((fields) => rowStreamToOData(fields, 'Submissions.children.child', 'http://localhost:8989', '/withrepeat.svc/Submissions', {}, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/withrepeat.svc/$metadata#Submissions.children.child', value: [{ @@ -747,7 +747,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.withrepeat) .then((fields) => rowStreamToOData(fields, 'Submissions.children.child', 'http://localhost:8989', '/withrepeat.svc/Submissions.children.child?$top=2', query, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/withrepeat.svc/$metadata#Submissions.children.child', '@odata.nextLink': 'http://localhost:8989/withrepeat.svc/Submissions.children.child?%24top=2&%24skiptoken=01eyJyZXBlYXRJZCI6ImM3NmQwY2NjNmQ1ZGEyMzZiZTdiOTNiOTg1YTgwNDEzZDJlM2UxNzIifQ%3D%3D', @@ -776,7 +776,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.withrepeat) .then((fields) => rowStreamToOData(fields, 'Submissions.children.child', 'http://localhost:8989', '/withrepeat.svc/Submissions.children.child?$skip=2', query, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/withrepeat.svc/$metadata#Submissions.children.child', value: [{ @@ -804,7 +804,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.withrepeat) .then((fields) => rowStreamToOData(fields, 'Submissions.children.child', 'http://localhost:8989', '/withrepeat.svc/Submissions.children.child?$skip=1&$top=1', query, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/withrepeat.svc/$metadata#Submissions.children.child', '@odata.nextLink': 'http://localhost:8989/withrepeat.svc/Submissions.children.child?%24top=1&%24skiptoken=01eyJyZXBlYXRJZCI6ImM3NmQwY2NjNmQ1ZGEyMzZiZTdiOTNiOTg1YTgwNDEzZDJlM2UxNzIifQ%3D%3D', @@ -828,7 +828,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.withrepeat) .then((fields) => rowStreamToOData(fields, 'Submissions.children.child', 'http://localhost:8989', '/withrepeat.svc/Submissions.children.child?$skip=1&$top=1', query, inRows)) - .then((stream) => stream.pipe(streamTest.toText((_, result) => { + .then((stream) => stream._pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/withrepeat.svc/$metadata#Submissions.children.child', value: [{ diff --git a/test/unit/util/crypto.js b/test/unit/util/crypto.js index ed7f32c17..dfec937b7 100644 --- a/test/unit/util/crypto.js +++ b/test/unit/util/crypto.js @@ -221,7 +221,7 @@ describe('util/crypto', () => { chunks.push(ciphertext.subarray(328, 336)); streamSubmissionCleartext(aesKey, ivs(1), streamTest.fromChunks(chunks)) - .pipe(streamTest.toText((_, result) => { + ._pipe(streamTest.toText((_, result) => { result.should.equal(plaintext); done(); })); @@ -241,7 +241,7 @@ describe('util/crypto', () => { chunks.push(ciphertext.subarray(332, 336)); streamSubmissionCleartext(aesKey, ivs(1), streamTest.fromChunks(chunks)) - .pipe(streamTest.toText((_, result) => { + ._pipe(streamTest.toText((_, result) => { result.should.equal(plaintext); done(); })); diff --git a/test/unit/util/stream.js b/test/unit/util/stream.js index ccf25190a..6b9a92c4f 100644 --- a/test/unit/util/stream.js +++ b/test/unit/util/stream.js @@ -178,7 +178,7 @@ describe('stream utils', () => { it('should assemble a traditional .pipe() chain on pipe', (done) => PartialPipe.of( fromObjects([ 4, 8, 15, 16, 23, 42 ]), doubler(), doubler() - ).pipe(toObjects((e, result) => { + )._pipe(toObjects((e, result) => { result.should.eql([ 16, 32, 60, 64, 92, 168 ]); done(); }))); From 54f10cec71cab98f7060a11a7efded0abe0cc2d6 Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Tue, 26 Nov 2024 08:36:36 +0000 Subject: [PATCH 2/2] revert fn rename --- lib/util/stream.js | 2 +- test/integration/other/encryption.js | 4 ++-- test/unit/formats/odata.js | 32 ++++++++++++++-------------- test/unit/util/crypto.js | 4 ++-- test/unit/util/stream.js | 2 +- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/lib/util/stream.js b/lib/util/stream.js index 956808312..bb6d74afc 100644 --- a/lib/util/stream.js +++ b/lib/util/stream.js @@ -103,7 +103,7 @@ class PartialPipe { // really only used for testing purposes, to masquerade as a normal piped stream. // do not use in general; use .with() and .pipeline() above instead. - _pipe(out) { return reduce(((x, y) => x.pipe(y)), this.streams[0], this.streams.slice(1)).pipe(out); } /* slint-ignore:pipe */ + pipe(out) { return reduce(((x, y) => x.pipe(y)), this.streams[0], this.streams.slice(1)).pipe(out); } } module.exports = { diff --git a/test/integration/other/encryption.js b/test/integration/other/encryption.js index 2da442aa7..3da709ee2 100644 --- a/test/integration/other/encryption.js +++ b/test/integration/other/encryption.js @@ -107,11 +107,11 @@ describe('managed encryption', () => { // verify no charlie: (decryptor(null, keys[2].id) === null).should.equal(true); - clearAlpha._pipe(toText((_, textAlpha) => { + clearAlpha.pipe(toText((_, textAlpha) => { textAlpha.should.equal(testData.instances.simple.one); // eslint-disable-next-line no-shadow - clearBeta._pipe(toText((_, textBeta) => { + clearBeta.pipe(toText((_, textBeta) => { textBeta.should.equal(testData.instances.simple.two); done(); })); diff --git a/test/unit/formats/odata.js b/test/unit/formats/odata.js index 89cd66542..b00efbb2c 100644 --- a/test/unit/formats/odata.js +++ b/test/unit/formats/odata.js @@ -546,7 +546,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects([]); fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions', {}, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); resultObj['@odata.context'].should.equal('http://localhost:8989/simple.svc/$metadata#Submissions'); done(); @@ -557,7 +557,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects([]); fieldsFor(testData.forms.withrepeat) .then((fields) => rowStreamToOData(fields, 'Submissions.children.child', 'http://localhost:8989', '/withrepeat.svc/Submissions', {}, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); resultObj['@odata.context'].should.equal('http://localhost:8989/withrepeat.svc/$metadata#Submissions.children.child'); done(); @@ -570,7 +570,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects(instances(10)); fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$top=3&$skip=7', query, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); // eslint-disable-next-line no-undef should.not.exist(resultObj['@odata.nextLink']); @@ -583,7 +583,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects(instances(6)); // make it close to check the off-by-one. fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$top=3&$skip=2', query, inRows, 10, 8)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); resultObj['@odata.nextLink'].should.equal('http://localhost:8989/simple.svc/Submissions?%24top=3&%24skiptoken=01e30%3D'); done(); @@ -595,7 +595,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects(instances(6)); // make it close to check the off-by-one. fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$top=3&$skip=2&$wkt=true&$count=true', query, inRows, 10, 8)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); resultObj['@odata.nextLink'].should.equal('http://localhost:8989/simple.svc/Submissions?%24top=3&%24wkt=true&%24count=true&%24skiptoken=01e30%3D'); done(); @@ -607,7 +607,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects(instances(8)); // make it close to check the off-by-one. fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$count=true', query, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); resultObj['@odata.count'].should.equal(8); done(); @@ -619,7 +619,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects(instances(8)); // make it close to check the off-by-one. fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$top=1&$skip=1&$count=true', query, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { const resultObj = JSON.parse(result); resultObj['@odata.count'].should.equal(8); done(); @@ -634,7 +634,7 @@ describe('odata message composition', () => { const inRows = streamTest.fromObjects([]); fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions', {}, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ value: [], '@odata.context': 'http://localhost:8989/simple.svc/$metadata#Submissions', @@ -650,7 +650,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions', {}, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/simple.svc/$metadata#Submissions', value: [ @@ -671,7 +671,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$top=2', query, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/simple.svc/$metadata#Submissions', value: [ @@ -693,7 +693,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.simple) .then((fields) => rowStreamToOData(fields, 'Submissions', 'http://localhost:8989', '/simple.svc/Submissions?$skip=2', query, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/simple.svc/$metadata#Submissions', value: [ @@ -714,7 +714,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.withrepeat) .then((fields) => rowStreamToOData(fields, 'Submissions.children.child', 'http://localhost:8989', '/withrepeat.svc/Submissions', {}, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/withrepeat.svc/$metadata#Submissions.children.child', value: [{ @@ -747,7 +747,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.withrepeat) .then((fields) => rowStreamToOData(fields, 'Submissions.children.child', 'http://localhost:8989', '/withrepeat.svc/Submissions.children.child?$top=2', query, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/withrepeat.svc/$metadata#Submissions.children.child', '@odata.nextLink': 'http://localhost:8989/withrepeat.svc/Submissions.children.child?%24top=2&%24skiptoken=01eyJyZXBlYXRJZCI6ImM3NmQwY2NjNmQ1ZGEyMzZiZTdiOTNiOTg1YTgwNDEzZDJlM2UxNzIifQ%3D%3D', @@ -776,7 +776,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.withrepeat) .then((fields) => rowStreamToOData(fields, 'Submissions.children.child', 'http://localhost:8989', '/withrepeat.svc/Submissions.children.child?$skip=2', query, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/withrepeat.svc/$metadata#Submissions.children.child', value: [{ @@ -804,7 +804,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.withrepeat) .then((fields) => rowStreamToOData(fields, 'Submissions.children.child', 'http://localhost:8989', '/withrepeat.svc/Submissions.children.child?$skip=1&$top=1', query, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/withrepeat.svc/$metadata#Submissions.children.child', '@odata.nextLink': 'http://localhost:8989/withrepeat.svc/Submissions.children.child?%24top=1&%24skiptoken=01eyJyZXBlYXRJZCI6ImM3NmQwY2NjNmQ1ZGEyMzZiZTdiOTNiOTg1YTgwNDEzZDJlM2UxNzIifQ%3D%3D', @@ -828,7 +828,7 @@ describe('odata message composition', () => { ]); fieldsFor(testData.forms.withrepeat) .then((fields) => rowStreamToOData(fields, 'Submissions.children.child', 'http://localhost:8989', '/withrepeat.svc/Submissions.children.child?$skip=1&$top=1', query, inRows)) - .then((stream) => stream._pipe(streamTest.toText((_, result) => { + .then((stream) => stream.pipe(streamTest.toText((_, result) => { JSON.parse(result).should.eql({ '@odata.context': 'http://localhost:8989/withrepeat.svc/$metadata#Submissions.children.child', value: [{ diff --git a/test/unit/util/crypto.js b/test/unit/util/crypto.js index dfec937b7..ed7f32c17 100644 --- a/test/unit/util/crypto.js +++ b/test/unit/util/crypto.js @@ -221,7 +221,7 @@ describe('util/crypto', () => { chunks.push(ciphertext.subarray(328, 336)); streamSubmissionCleartext(aesKey, ivs(1), streamTest.fromChunks(chunks)) - ._pipe(streamTest.toText((_, result) => { + .pipe(streamTest.toText((_, result) => { result.should.equal(plaintext); done(); })); @@ -241,7 +241,7 @@ describe('util/crypto', () => { chunks.push(ciphertext.subarray(332, 336)); streamSubmissionCleartext(aesKey, ivs(1), streamTest.fromChunks(chunks)) - ._pipe(streamTest.toText((_, result) => { + .pipe(streamTest.toText((_, result) => { result.should.equal(plaintext); done(); })); diff --git a/test/unit/util/stream.js b/test/unit/util/stream.js index 6b9a92c4f..ccf25190a 100644 --- a/test/unit/util/stream.js +++ b/test/unit/util/stream.js @@ -178,7 +178,7 @@ describe('stream utils', () => { it('should assemble a traditional .pipe() chain on pipe', (done) => PartialPipe.of( fromObjects([ 4, 8, 15, 16, 23, 42 ]), doubler(), doubler() - )._pipe(toObjects((e, result) => { + ).pipe(toObjects((e, result) => { result.should.eql([ 16, 32, 60, 64, 92, 168 ]); done(); })));