Replace callbacks with promises
SemenchenkoVitaliy committed Jun 18, 2019
1 parent 8ea0e2c commit 7d19dc7
Showing 7 changed files with 425 additions and 628 deletions.
129 changes: 47 additions & 82 deletions lib/filestorage.js
@@ -1,6 +1,6 @@
 'use strict';
 
-const fs = require('fs');
+const fs = require('./fs');
 const path = require('path');
 const common = require('@metarhia/common');
 const utils = require('./utils');
@@ -12,141 +12,106 @@ const getFilepath = Symbol('getFilepath');
 class FileStorage {
   // Create new FileStorage
   // options - <Object>
-  //   dir - <string>, data storage directory, which should be created
+  //   path - <string>, data storage directory, which should be created
   //     before FileStorage is used
   //   minCompressSize - <number>, minimal file size
   //     to be compressed, default = 1024
   constructor(options) {
-    this.dir = path.resolve(options.dir);
+    this.path = path.resolve(options.path);
     this.minCompressSize = options.minCompressSize || MIN_COMPRESS_SIZE;
   }
 
   // Write file to storage
-  // id - <common.Uint64>, id of file
+  // id - <common.Uint64> | <number> | <BigInt>, id of file
   // data - <string> | <Buffer> | <Uint8Array>, data to be written
   // opts - <Object>
   //   checksum - <string>, checksum type
   //   dedupHash - <string>, second checksum type
-  // cb - <Function>, callback
-  //   err - <Error>
-  //   stats - <Object>
-  //     checksum - <string>, data checksum
-  //     dedupHash - <string>, second data checksum
-  //     size - <number>, data size
+  // Returns: <Object>, stats
+  //   checksum - <string>, data checksum
+  //   dedupHash - <string>, second data checksum
+  //   size - <number>, data size
   // Throws: <TypeError>, if `opts.checksum` or `opts.dedupHash` is incorrect
-  write(id, data, opts, cb) {
+  async write(id, data, opts) {
     const file = this[getFilepath](id);
-    common.mkdirp(path.dirname(file), err => {
-      if (err) {
-        cb(err);
-        return;
-      }
-      fs.writeFile(file, data, err => {
-        if (err) {
-          cb(err);
-          return;
-        }
-        const stats = utils.getDataStats(data, opts.checksum, opts.dedupHash);
-        cb(null, stats);
-      });
-    });
+    await utils.mkdirRecursive(path.dirname(file));
+    await fs.writeFile(file, data);
+
+    return utils.getDataStats(data, opts.checksum, opts.dedupHash);
   }
 
   // Update file in the storage
-  // id - <common.Uint64>, id of file
+  // id - <common.Uint64> | <number> | <BigInt>, id of file
   // data - <string> | <Buffer> | <Uint8Array>, data to be written
   // opts - <Object>
   //   checksum - <string>, checksum type
   //   dedupHash - <string>, second checksum type
-  // cb - <Function>, callback
-  //   err - <Error>
-  //   stats - <Object>
-  //     checksum - <string>, data checksum
-  //     dedupHash - <string>, second data checksum
-  //     size - <number>, data size
-  //     originalSize - <number>, size of original file
+  // Returns: <Object>, stats
+  //   checksum - <string>, data checksum
+  //   dedupHash - <string>, second data checksum
+  //   size - <number>, data size
+  //   originalSize - <number>, size of original file
   // Throws: <TypeError>, if `opts.checksum` or `opts.dedupHash` is incorrect
-  update(id, data, opts, cb) {
+  async update(id, data, opts) {
     const file = this[getFilepath](id);
-    fs.stat(file, (err, fstats) => {
-      if (err) {
-        cb(err);
-        return;
-      }
-      fs.writeFile(file, data, err => {
-        if (err) {
-          cb(err);
-          return;
-        }
-        const stats = utils.getDataStats(data, opts.checksum, opts.dedupHash);
-        stats.originalSize = fstats.size;
-        cb(null, stats);
-      });
-    });
+    const fstats = await fs.stat(file);
+    await fs.writeFile(file, data);
+
+    const stats = utils.getDataStats(data, opts.checksum, opts.dedupHash);
+    stats.originalSize = fstats.size;
+    return stats;
   }
 
   // Get information about file
-  // id - <common.Uint64>, id of file
-  // cb - <Function>, callback
-  //   err - <Error>
-  //   stats - <fs.Stats>
-  stat(id, cb) {
+  // id - <common.Uint64> | <number> | <BigInt>, id of file
+  // Returns: <fs.Stats>
+  async stat(id) {
     const file = this[getFilepath](id);
-    fs.stat(file, cb);
+    return fs.stat(file);
   }
 
   // Read file from storage
-  // id - <common.Uint64>, id of file
+  // id - <common.Uint64> | <number> | <BigInt>, id of file
   // opts - <Object>
   //   encoding - <string>
   //   compression - <string>
-  // cb - <Function>, callback
-  //   err - <Error>
-  //   data - <Buffer> | <string>
-  read(id, opts, cb) {
+  // Returns: <Buffer> | <string>, data
+  async read(id, opts) {
     const file = this[getFilepath](id);
-    if (opts.compression) utils.uncompress(file, opts, cb);
-    else fs.readFile(file, opts.encoding, cb);
+    if (opts.compression) return utils.uncompress(file, opts);
+    return fs.readFile(file, opts.encoding);
   }
 
   // Delete file from storage
-  // id - <common.Uint64>, id of file
-  // cb - <Function>, callback
-  //   err - <Error>
-  rm(id, cb) {
+  // id - <common.Uint64> | <number> | <BigInt>, id of file
+  async rm(id) {
     const file = this[getFilepath](id);
-    fs.unlink(file, cb);
+    await fs.unlink(file);
   }
 
   // Compress file in storage
-  // id - <common.Uint64>, id of file
+  // id - <common.Uint64> | <number> | <BigInt>, id of file
   // compression - <string>, compression type
-  // cb - <Function>, callback
-  //   err - <Error>
-  //   compressed - <boolean>, whether file was compressed
+  // Returns: <boolean>, whether file was compressed
   // Throws: <TypeError>, if compression is incorrect
-  compress(id, compression, cb) {
+  async compress(id, compression) {
     const file = this[getFilepath](id);
-    utils.compress(file, this.minCompressSize, compression, cb);
+    return utils.compress(file, this.minCompressSize, compression);
   }
 
   [getFilepath](id) {
-    return utils.getFilepath(this.dir, common.idToPath(id));
+    return utils.getFilepath(this.path, common.idToPath(id));
   }
 }
 
 // Create new FileStorage and root directory if it doesn't exist
 // options - <Object>
 //   path - <string>, data storage directory
 //   minCompressSize - <number>, minimal file size to be compressed
-// cb - <Function>, callback
-//   err - <Error>
-//   storage - <FileStorage>
-const create = (options, cb) => {
-  common.mkdirp(path.resolve(options.dir), err => {
-    if (err) cb(err);
-    else cb(null, new FileStorage(options));
-  });
+// Returns: <FileStorage>, instance
+const create = async options => {
+  await utils.mkdirRecursive(path.resolve(options.path));
+  return new FileStorage(options);
 };
 
 module.exports = { FileStorage, create };
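
With this change every FileStorage method returns a promise. A minimal consumer sketch, assuming the package is required as `filestorage` and that 'CRC32'/'SHA256' are valid hasher keys; the id and paths here are illustrative, not taken from the diff:

'use strict';

const filestorage = require('filestorage'); // assumed package name

(async () => {
  // create() now resolves with the FileStorage instance that the old
  // version passed to its callback.
  const storage = await filestorage.create({ path: './data' });

  // write() resolves with the stats object ({ checksum, dedupHash, size }).
  const stats = await storage.write(1, 'some data', {
    checksum: 'CRC32',
    dedupHash: 'SHA256',
  });
  console.log(stats.size);

  // Errors now arrive as rejections instead of `err` callback arguments.
  const data = await storage.read(1, { encoding: 'utf8' });
  console.log(data);

  await storage.rm(1);
})().catch(console.error);
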
30 changes: 30 additions & 0 deletions lib/fs.js
@@ -0,0 +1,30 @@
+'use strict';
+
+// TODO: this file should be removed and `fs.promises` used instead
+// when support for Node.js 8 is dropped
+
+const fs = require('fs');
+const { promisify } = require('util');
+
+const { iter } = require('@metarhia/common');
+
+const list = [
+  'readFile',
+  'writeFile',
+  'unlink',
+  'rename',
+  'stat',
+  'access',
+  'mkdir',
+  'rmdir',
+  'readdir',
+];
+
+if (process.version.slice(1).split('.')[0] >= 10) {
+  module.exports = fs.promises;
+} else {
+  module.exports = iter(list).collectWith(
+    {},
+    (obj, name) => (obj[name] = promisify(fs[name]))
+  );
+}
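
On Node.js 10+ the shim simply re-exports `fs.promises`; on Node.js 8 it builds an equivalent object of promisified functions. The same fallback can be expressed with plain `Array#reduce`, shown here only to illustrate what `iter(list).collectWith()` produces:

'use strict';

const fs = require('fs');
const { promisify } = require('util');

// A sketch of the fallback branch without @metarhia/common: build an
// object mapping each name to a promisified fs function.
const list = ['readFile', 'writeFile', 'unlink', 'rename', 'stat'];
const fsPromises = list.reduce((obj, name) => {
  obj[name] = promisify(fs[name]);
  return obj;
}, {});

// Behaves like a subset of fs.promises:
fsPromises.stat(__filename).then(stats => console.log(stats.size));
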
114 changes: 51 additions & 63 deletions lib/utils.js
@@ -1,11 +1,12 @@
 'use strict';
 
-const fs = require('fs');
+const fs = require('./fs');
 const path = require('path');
 const crypto = require('crypto');
 const { crc32 } = require('crc');
+const util = require('util');
 const common = require('@metarhia/common');
 const { zip, gzip } = require('compressing');
 const metasync = require('metasync');
 
 const CHECKSUM = 'CRC32';
 const DEDUP_HASH = 'SHA256';
@@ -28,32 +29,6 @@ const FS_EXT = 'f';

 const getFilepath = (...parts) => `${path.join(...parts)}.${FS_EXT}`;
 
-const rmdirp = (dir, cb) => {
-  fs.stat(dir, (err, stats) => {
-    if (err) {
-      if (err.code === 'ENOENT') cb(null);
-      else cb(err);
-      return;
-    }
-
-    if (stats.isDirectory()) {
-      fs.readdir(dir, (err, files) => {
-        if (err) {
-          cb(err);
-          return;
-        }
-        files = files.map(file => path.join(dir, file));
-        metasync.each(files, rmdirp, err => {
-          if (err) cb(err);
-          else fs.rmdir(dir, cb);
-        });
-      });
-    } else {
-      fs.unlink(dir, cb);
-    }
-  });
-};
-
 const computeHash = (data, checksum) => {
   const hasher = hashers[checksum];
   if (!hasher) {
@@ -68,45 +43,57 @@ const getDataStats = (data, checksum, dedupHash) => ({
   size: Buffer.byteLength(data),
 });
 
-const compress = (file, minCompressSize, compression, cb) => {
-  fs.stat(file, (err, stats) => {
-    if (err || stats.size <= minCompressSize) {
-      cb(err, false);
-      return;
-    }
-    const filec = file + 'z';
-    const compressor = compressors[compression];
-    if (!compressor) {
-      throw new Error(`Unknown compression type ${compression} specified`);
-    }
-    compressor
-      .compressFile(file, filec)
-      .then(() =>
-        fs.rename(filec, file, err => {
-          if (err) cb(err, false);
-          else cb(null, true);
-        })
-      )
-      .catch(err => cb(err, false));
-  });
-};
+const mkdirRecursive = util.promisify(common.mkdirp);
+
+const rmRecursive = async dir => {
+  try {
+    await fs.access(dir);
+  } catch (err) {
+    if (err.code === 'ENOENT') return;
+    throw new Error(`Cannot access directory '${dir}': ${err.message}`);
+  }
+
+  const files = await fs.readdir(dir);
+
+  for (let file of files) {
+    file = path.join(dir, file);
+    const stats = await fs.stat(file);
+
+    if (stats.isDirectory()) await rmRecursive(file);
+    else await fs.unlink(file);
+  }
+
+  await fs.rmdir(dir);
+};
 
-const uncompress = (file, opts, cb) => {
-  fs.access(file, err => {
-    if (err) {
-      cb(err);
-      return;
-    }
-    const compressor = compressors[opts.compression];
-    if (!compressor) {
-      throw new Error(`Unknown compression type ${opts.compression} specified`);
-    }
+const compress = async (file, minCompressSize, compression) => {
+  const compressor = compressors[compression];
+  if (!compressor) {
+    throw new Error(`Unknown compression type ${compression} specified`);
+  }
+
+  const stats = await fs.stat(file);
+  if (stats.size <= minCompressSize) return false;
+
+  const filec = file + 'z';
+  await compressor.compressFile(file, filec);
+  await fs.rename(filec, file);
+  return true;
+};
+
+const uncompress = async (file, opts) => {
+  const compressor = compressors[opts.compression];
+  if (!compressor) {
+    throw new Error(`Unknown compression type ${opts.compression} specified`);
+  }
+
+  return new Promise((res, rej) => {
     const buffers = [];
     new compressor.UncompressStream({ source: file })
-      .on('error', cb)
+      .on('error', rej)
       .on('finish', () => {
-        if (opts.encoding) cb(null, buffers.join());
-        else cb(null, Buffer.concat(buffers));
+        if (opts.encoding) res(buffers.join(''));
+        else res(Buffer.concat(buffers));
       })
       .on('entry', (header, stream, next) => {
         if (opts.encoding) stream.setEncoding(opts.encoding);
@@ -120,7 +107,8 @@ module.exports = {
   getFilepath,
   computeHash,
   getDataStats,
-  rmdirp,
+  mkdirRecursive,
+  rmRecursive,
   compress,
   uncompress,
 };
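
A usage sketch for the reworked helpers. The compression key 'GZIP' is an assumption based on the `{ zip, gzip }` imports and the uppercase style of the CHECKSUM/DEDUP_HASH constants; the paths are illustrative:

'use strict';

const fs = require('fs');
const utils = require('./lib/utils'); // illustrative path

(async () => {
  // mkdirRecursive() is common.mkdirp wrapped with util.promisify.
  await utils.mkdirRecursive('./tmp/a');
  fs.writeFileSync('./tmp/a/1.f', 'x'.repeat(2048));

  // compress() resolves with true only when the file is larger than
  // minCompressSize; 'GZIP' as the compressor key is assumed here.
  const compressed = await utils.compress('./tmp/a/1.f', 1024, 'GZIP');
  console.log(compressed); // true

  // rmRecursive() replaces the callback-based rmdirp() and resolves
  // without error when the directory does not exist (ENOENT).
  await utils.rmRecursive('./tmp');
})().catch(console.error);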