diff --git a/lib/data/briefcase.js b/lib/data/briefcase.js index f2ebf29c8..2720253a9 100644 --- a/lib/data/briefcase.js +++ b/lib/data/briefcase.js @@ -7,7 +7,7 @@ // including this file, may be copied, modified, propagated, or distributed // except according to the terms contained in the LICENSE file. -const { Transform, pipeline } = require('stream'); +const { Transform } = require('stream'); const hparser = require('htmlparser2'); const { identity, last } = require('ramda'); const csv = require('csv-stringify'); @@ -164,8 +164,8 @@ const processRow = (xml, instanceId, fields, header, selectValues) => new Promis } }, { xmlMode: true, decodeEntities: true }); - if (typeof xml.pipe === 'function') { - pipeline(xml, parser, rejectIfError(reject)); + if (xml instanceof PartialPipe) { + xml.with(parser).pipeline(rejectIfError(reject)); } else { parser.write(xml); parser.end(); diff --git a/lib/data/client-audits.js b/lib/data/client-audits.js index afc49a722..38088d7dc 100644 --- a/lib/data/client-audits.js +++ b/lib/data/client-audits.js @@ -7,10 +7,11 @@ // including this file, may be copied, modified, propagated, or distributed // except according to the terms contained in the LICENSE file. -const { Transform, pipeline } = require('stream'); +const { Transform } = require('stream'); const parse = require('csv-parse'); const csv = require('csv-stringify'); const sanitize = require('sanitize-filename'); +const { PartialPipe } = require('../util/stream'); const { zipPart } = require('../util/zip'); const headers = [ 'event', 'node', 'start', 'end', 'latitude', 'longitude', 'accuracy', 'old-value', 'new-value' ]; @@ -54,8 +55,8 @@ const parseClientAudits = (input) => { return new Promise((pass, fail) => { // feed either the stream or the buffer into the parser now. - if (input.pipe != null) { - pipeline(input, parser, (err) => { if (err != null) fail(err); }); + if (input instanceof PartialPipe) { + input.with(parser).pipeline((err) => { if (err != null) fail(err); }); } else { parser.write(input); parser.end(); diff --git a/lib/task/fs.js b/lib/task/fs.js index 7012f49a5..3b3599fde 100644 --- a/lib/task/fs.js +++ b/lib/task/fs.js @@ -19,6 +19,7 @@ const yauzl = require('yauzl'); const { task } = require('./task'); const Problem = require('../util/problem'); const { generateLocalCipherer, getLocalDecipherer } = require('../util/crypto'); +const { PartialPipe } = require('../util/stream'); // given a directory containing files, a path to a tmpfile, and keyinfo data, @@ -31,7 +32,6 @@ const { generateLocalCipherer, getLocalDecipherer } = require('../util/crypto'); const encryptToArchive = (directory, tmpFilePath, keys) => { const outStream = createWriteStream(tmpFilePath); const zipStream = archiver('zip', { zlib: { level: 9 } }); - zipStream.pipe(outStream); // create a cipher-generator for use below. const [ localkey, cipherer ] = generateLocalCipherer(keys); @@ -39,6 +39,8 @@ const encryptToArchive = (directory, tmpFilePath, keys) => { // call up all files in the directory. return task.promisify(readdir)(directory).then((files) => new Promise((resolve, reject) => { + PartialPipe.of(zipStream, outStream).pipeline(reject); + // stream each file into the zip, encrypting on the way in. clean up each // plaintext file as soon as we're done with them. // TODO: copypasted for now to lib/resources/backup @@ -48,7 +50,7 @@ const encryptToArchive = (directory, tmpFilePath, keys) => { local.ivs[basename(file)] = iv.toString('base64'); const readStream = createReadStream(filePath); - zipStream.append(readStream.pipe(cipher), { name: file }); + zipStream.append(PartialPipe.of(readStream, cipher).pipeline(reject), { name: file }); readStream.on('end', () => unlinkSync(filePath)); // sync to ensure completion. } @@ -138,7 +140,7 @@ const decryptFromArchive = (archivePath, directory, passphrase = '') => if (completed === entries.length) resolve(); }); decipher.on('error', () => reject(Problem.user.undecryptable())); - inStream.pipe(decipher).pipe(outStream); + PartialPipe.of(inStream, decipher, outStream).pipeline(reject); }); }); }, reject); diff --git a/lib/util/crypto.js b/lib/util/crypto.js index 03dbe9b7c..c38152d1e 100644 --- a/lib/util/crypto.js +++ b/lib/util/crypto.js @@ -15,6 +15,7 @@ const digest = require('digest-stream'); const { createHash, randomBytes, generateKeyPair, pbkdf2, createPrivateKey, createPublicKey, createCipheriv, createDecipheriv, publicEncrypt, privateDecrypt } = require('crypto'); const { RSA_NO_PADDING } = require('crypto').constants; const { Transform } = require('stream'); +const { PartialPipe } = require('./stream'); const { unpadPkcs1OaepMgf1Sha256 } = require('./quarantine/oaep'); const { unpadPkcs7 } = require('./quarantine/pkcs7'); const { promisify } = require('util'); @@ -261,9 +262,9 @@ const streamSubmissionCleartext = (key, iv, input) => { }); if (typeof input.pipe === 'function') { - return input.pipe(decipher).pipe(transform); + return PartialPipe.of(input, decipher, transform); } else { - const result = decipher.pipe(transform); + const result = PartialPipe.of(decipher, transform); decipher.write(input); decipher.end(); return result;