diff --git a/index.js b/index.js index d2a81e4..9ff4943 100644 --- a/index.js +++ b/index.js @@ -22,42 +22,40 @@ module.exports = function DiskStore(options) { var log = options.log || function _noOpLog() {}; - var adapter = { - - rm: function(fd, cb) { - return fsx.unlink(fd, function(err) { - // Ignore "doesn't exist" errors - if (err && (typeof err !== 'object' || err.code !== 'ENOENT')) { - return cb(err); - } else return cb(); - }); - }, - - ls: function(dirpath, cb) { - return fsx.readdir(dirpath, function (err, files){ - if (err) return cb(err); - files = _.reduce(_.isArray(files)?files:[], function (m, filename){ - var fd = path.join(dirpath,filename); - m.push(fd); - return m; - }, []); - cb(null, files); - }); - }, + var adapter = {}; + adapter.rm = function(fd, cb) { + return fsx.unlink(fd, function(err) { + // Ignore "doesn't exist" errors + if (err && (typeof err !== 'object' || err.code !== 'ENOENT')) { + return cb(err); + } else return cb(); + }); + }; - read: function(fd, cb) { - if (cb) { - return fsx.readFile(fd, cb); - } else { - return fsx.createReadStream(fd); - } - }, + adapter.ls = function(dirpath, cb) { + return fsx.readdir(dirpath, function (err, files){ + if (err) return cb(err); + files = _.reduce(_.isArray(files)?files:[], function (m, filename){ + var fd = path.join(dirpath,filename); + m.push(fd); + return m; + }, []); + cb(null, files); + }); + }; - receive: function (options){ - return r_buildDiskReceiverStream(options); + adapter.read = function(fd, cb) { + if (cb) { + return fsx.readFile(fd, cb); + } else { + return fsx.createReadStream(fd); } }; + adapter.receive = function(options) { + return r_buildDiskReceiverStream(_.extend(options, {adapter: adapter})); + }; + return adapter; }; diff --git a/package.json b/package.json index 6ee637d..0b6b8d1 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,6 @@ "dependencies": { "fs-extra": "~0.8.1", "lodash": "~2.4.1", - "node-uuid": "~1.4.1" + "digest-stream": "~0.2.2" } } diff --git a/standalone/build-disk-receiver-stream.js b/standalone/build-disk-receiver-stream.js index 1e3cc72..7e232b6 100644 --- a/standalone/build-disk-receiver-stream.js +++ b/standalone/build-disk-receiver-stream.js @@ -57,9 +57,15 @@ module.exports = function buildDiskReceiverStream(options) { maxBytes: 15000000, // By default, upload files to `./.tmp/uploads` (relative to cwd) - dirname: '.tmp/uploads' + dirname: '.tmp/uploads', + + // hash method: md5 | sha1 | null + hash: null }); + if(options.hash) { + var DigestStream = require('digest-stream'); + } var receiver__ = WritableStream({ objectMode: true }); @@ -118,13 +124,25 @@ module.exports = function buildDiskReceiverStream(options) { done(err); }); - var __progress__ = r_buildProgressStream(options, __newFile, receiver__, outs__); + if(options.hash) { + var __hash__ = DigestStream(options.hash, 'hex', function(hash, length) { + __newFile.extra = __newFile.extra || {}; + __newFile.extra[options.hash] = hash; + }); + } + else { + var __hash__ = null; + } + + var __progress__ = r_buildProgressStream(options, __newFile, receiver__, outs__, __hash__); // Finally pipe the progress THROUGH the progress stream // and out to disk. - __newFile - .pipe(__progress__) - .pipe(outs__); + var fileStream = __newFile.pipe(__progress__); + + if(__hash__) fileStream = fileStream.pipe(__hash__); + + fileStream.pipe(outs__); }); diff --git a/standalone/build-progress-stream.js b/standalone/build-progress-stream.js index 679002c..f6d7e3f 100644 --- a/standalone/build-progress-stream.js +++ b/standalone/build-progress-stream.js @@ -15,8 +15,9 @@ var TransformStream = require('stream').Transform; * @param {[type]} outs__ [description] * @return {[type]} [description] */ -module.exports = function buildProgressStream (options, __newFile, receiver__, outs__) { +module.exports = function buildProgressStream (options, __newFile, receiver__, outs__, __hash__) { options = options || {}; + var adapter = options.adapter; var log = options.log || function noOpLog(){}; // Generate a progress stream and unique id for this file @@ -41,7 +42,7 @@ module.exports = function buildProgressStream (options, __newFile, receiver__, o // progress and determine whether we're within quota this.emit('progress', { id: localID, - fd: __newFile._skipperFD, + fd: __newFile.fd, name: __newFile.name, written: writtenSoFar, total: guessedTotal, @@ -81,7 +82,7 @@ module.exports = function buildProgressStream (options, __newFile, receiver__, o } else { currentFileProgress = { id: localID, - fd: __newFile._skipperFD, + fd: __newFile.fd, name: __newFile.filename, written: milestone.written, total: milestone.total, @@ -96,7 +97,7 @@ module.exports = function buildProgressStream (options, __newFile, receiver__, o // Recalculate `totalBytesWritten` so far for this receiver instance // (across ALL OF ITS FILES) // using the sum of all bytes written to each file in `receiver__._files` - totalBytesWritten = _.reduce(receiver__._files, function(memo, status) { + var totalBytesWritten = _.reduce(receiver__._files, function(memo, status) { memo += status.written; return memo; }, 0); @@ -120,13 +121,15 @@ module.exports = function buildProgressStream (options, __newFile, receiver__, o // Stop listening for progress events __progress__.removeAllListeners('progress'); // Unpipe the progress stream, which feeds the disk stream, so we don't keep dumping to disk - __progress__.unpipe(); + setImmediate(function() { + __progress__.unpipe(); + }); // Clean up any files we've already written (function gc(err) { // Garbage-collects the bytes that were already written for this file. // (called when a read or write error occurs) - log('************** Garbage collecting file `' + __newFile.filename + '` located @ ' + fd + '...'); - adapter.rm(fd, function(gcErr) { + log('************** Garbage collecting file `' + __newFile.filename + '` located @ ' + __newFile.fd + '...'); + adapter.rm(__newFile.fd, function(gcErr) { if (gcErr) return outs__.emit('E_EXCEEDS_UPLOAD_LIMIT',[err].concat([gcErr])); return outs__.emit('E_EXCEEDS_UPLOAD_LIMIT',err); });