diff --git a/index.d.ts b/index.d.ts
index e5f8504..082818d 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -53,6 +53,37 @@ export namespace gzip {
 
 }
 
+export namespace xz {
+
+  function compressFile(source: sourceType, dest: destType, opts?: any): Promise
+
+  function uncompress(source: sourceType, dest: destType, opts?: any): Promise
+
+  function decompress(source: sourceType, dest: destType, opts?: any): Promise
+
+  export class FileStream extends ReadStream {
+
+    constructor(opts?: {
+      preset?: number,
+      threads?: number,
+      source?: sourceType
+    });
+
+  }
+
+  export class UncompressStream extends WriteStream {
+
+    constructor(opts?: {
+      source?: sourceType
+    });
+
+    on(event: string, listener: (...args: any[]) => void): this
+    on(event: 'error', listener: (err: Error) => void): this
+
+  }
+
+}
+
 export namespace tar {
 
   function compressFile(source: sourceType, dest: destType, opts?: any): Promise
diff --git a/index.js b/index.js
index b104632..259e10b 100644
--- a/index.js
+++ b/index.js
@@ -4,3 +4,4 @@ exports.zip = require('./lib/zip');
 exports.gzip = require('./lib/gzip');
 exports.tar = require('./lib/tar');
 exports.tgz = require('./lib/tgz');
+exports.xz = require('./lib/xz');
diff --git a/lib/xz/file_stream.js b/lib/xz/file_stream.js
new file mode 100644
index 0000000..df4226c
--- /dev/null
+++ b/lib/xz/file_stream.js
@@ -0,0 +1,42 @@
+'use strict';
+
+const fs = require('fs');
+const lzma = require('lzma-native');
+const utils = require('../utils');
+const streamifier = require('streamifier');
+
+class XzFileStream extends lzma.Compressor {
+  constructor(opts) {
+    opts = opts || {};
+    const lzmaOpts = {
+      preset: opts.preset || 6,
+      threads: opts.threads || 0,
+    };
+    super(lzmaOpts);
+
+    const sourceType = utils.sourceType(opts.source);
+
+    if (sourceType === 'file') {
+      const stream = fs.createReadStream(opts.source, opts.fs);
+      stream.on('error', err => this.emit('error', err));
+      stream.pipe(this);
+      return;
+    }
+
+    if (sourceType === 'buffer') {
+      const stream = streamifier.createReadStream(opts.source, opts.streamifier);
+      stream.on('error', err => this.emit('error', err));
+      stream.pipe(this);
+      return;
+    }
+
+    if (sourceType === 'stream') {
+      opts.source.on('error', err => this.emit('error', err));
+      opts.source.pipe(this);
+    }
+
+    // else undefined: do nothing
+  }
+}
+
+module.exports = XzFileStream;
diff --git a/lib/xz/index.js b/lib/xz/index.js
new file mode 100644
index 0000000..d888f57
--- /dev/null
+++ b/lib/xz/index.js
@@ -0,0 +1,11 @@
+'use strict';
+
+const utils = require('../utils');
+const XzFileStream = require('./file_stream');
+const XzUncompressStream = require('./uncompress_stream');
+
+exports.FileStream = XzFileStream;
+exports.UncompressStream = XzUncompressStream;
+exports.compressFile = utils.makeFileProcessFn(XzFileStream);
+exports.uncompress = utils.makeFileProcessFn(XzUncompressStream);
+exports.decompress = utils.makeFileProcessFn(XzUncompressStream);
diff --git a/lib/xz/uncompress_stream.js b/lib/xz/uncompress_stream.js
new file mode 100644
index 0000000..ebae1e4
--- /dev/null
+++ b/lib/xz/uncompress_stream.js
@@ -0,0 +1,38 @@
+'use strict';
+
+const fs = require('fs');
+const lzma = require('lzma-native');
+const utils = require('../utils');
+const streamifier = require('streamifier');
+
+class XzUncompressStream extends lzma.Decompressor {
+  constructor(opts) {
+    opts = opts || {};
+    super();
+
+    const sourceType = utils.sourceType(opts.source);
+
+    if (sourceType === 'file') {
+      const stream = fs.createReadStream(opts.source, opts.fs);
+      stream.on('error', err => this.emit('error', err));
+      stream.pipe(this);
+      return;
+    }
+
+    if (sourceType === 'buffer') {
+      const stream = streamifier.createReadStream(opts.source, opts.streamifier);
+      stream.on('error', err => this.emit('error', err));
+      stream.pipe(this);
+      return;
+    }
+
+    if (sourceType === 'stream') {
+      opts.source.on('error', err => this.emit('error', err));
+      opts.source.pipe(this);
+    }
+
+    // else: waiting to be piped
+  }
+}
+
+module.exports = XzUncompressStream;
diff --git a/package.json b/package.json
index 96b6c0e..24da30e 100644
--- a/package.json
+++ b/package.json
@@ -43,6 +43,7 @@
     "flushwritable": "^1.0.0",
     "get-ready": "^1.0.0",
     "iconv-lite": "^0.5.0",
+    "lzma-native": "^8.0.6",
     "streamifier": "^0.1.1",
     "tar-stream": "^1.5.2",
     "yazl": "^2.4.2"
diff --git a/test/xz/file_stream.test.js b/test/xz/file_stream.test.js
new file mode 100644
index 0000000..757376d
--- /dev/null
+++ b/test/xz/file_stream.test.js
@@ -0,0 +1,113 @@
+const fs = require('fs');
+const os = require('os');
+const path = require('path');
+const uuid = require('uuid');
+const { pipeline: pump } = require('stream');
+const compressing = require('../..');
+const assert = require('assert');
+
+describe('test/xz/file_stream.test.js', () => {
+  it('should be a transform stream', done => {
+    const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+    const sourceStream = fs.createReadStream(sourceFile);
+    const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz');
+    // console.log('destFile', destFile);
+    const xzStream = new compressing.xz.FileStream();
+    const destStream = fs.createWriteStream(destFile);
+    pump(sourceStream, xzStream, destStream, err => {
+      assert(!err);
+      assert(fs.existsSync(destFile));
+      done();
+    });
+  });
+
+  it('should compress according to file path', done => {
+    const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+    const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz');
+    // console.log('destFile', destFile);
+    const xzStream = new compressing.xz.FileStream({ source: sourceFile });
+    const destStream = fs.createWriteStream(destFile);
+    pump(xzStream, destStream, err => {
+      assert(!err);
+      assert(fs.existsSync(destFile));
+      done();
+    });
+  });
+
+  it('should compress file into Buffer', async () => {
+    const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+    const xzStream = new compressing.xz.FileStream({ source: sourceFile });
+    const xzChunks = [];
+    for await (const chunk of xzStream) {
+      xzChunks.push(chunk);
+    }
+
+    const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz');
+    await fs.promises.writeFile(destFile, Buffer.concat(xzChunks));
+    // console.log(destFile);
+  });
+
+  it('should compress buffer', done => {
+    const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+    const sourceBuffer = fs.readFileSync(sourceFile);
+    const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz');
+    // console.log('destFile', destFile);
+    const destStream = fs.createWriteStream(destFile);
+    const xzStream = new compressing.xz.FileStream({ source: sourceBuffer });
+    pump(xzStream, destStream, err => {
+      assert(!err);
+      assert(fs.existsSync(destFile));
+      done();
+    });
+
+  });
+
+  it('should compress stream', done => {
+    const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+    const sourceStream = fs.createReadStream(sourceFile);
+    const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz');
+    // console.log('destFile', destFile);
+    const destStream = fs.createWriteStream(destFile);
+    const xzStream = new compressing.xz.FileStream({ source: sourceStream });
+    pump(xzStream, destStream, err => {
+      assert(!err);
+      assert(fs.existsSync(destFile));
+      done();
+    });
+  });
+
+  it('should compress with custom preset', done => {
+    const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+    const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz');
+    const xzStream = new compressing.xz.FileStream({
+      source: sourceFile,
+      preset: 6,
+    });
+    const destStream = fs.createWriteStream(destFile);
+    pump(xzStream, destStream, err => {
+      assert(!err);
+      assert(fs.existsSync(destFile));
+      done();
+    });
+  });
+
+  it('should emit error if sourceFile does not exist', done => {
+    const sourceFile = 'file-not-exist';
+    const xzStream = new compressing.xz.FileStream({ source: sourceFile });
+    xzStream.on('error', err => {
+      assert(err);
+      done();
+    });
+  });
+
+  it('should emit error if sourceStream emit error', done => {
+    const sourceFile = 'file-not-exist';
+    const sourceStream = fs.createReadStream(sourceFile);
+    const xzStream = new compressing.xz.FileStream({ source: sourceStream });
+    xzStream.on('error', err => {
+      assert(err && err.code === 'ENOENT');
+      done();
+    });
+  });
+
+});
diff --git a/test/xz/index.test.js b/test/xz/index.test.js
new file mode 100644
index 0000000..6c3488a
--- /dev/null
+++ b/test/xz/index.test.js
@@ -0,0 +1,151 @@
+'use strict';
+
+const fs = require('fs');
+const os = require('os');
+const path = require('path');
+const uuid = require('uuid');
+const compressing = require('../..');
+const assert = require('assert');
+const isWindows = os.platform() === 'win32';
+
+describe('test/xz/index.test.js', () => {
+  describe('xz.compressFile()', () => {
+    it('xz.compressFile(file, stream)', async () => {
+      const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+      const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz');
+      // console.log('destFile', destFile);
+      const fileStream = fs.createWriteStream(destFile);
+      await compressing.xz.compressFile(sourceFile, fileStream);
+      assert(fs.existsSync(destFile));
+    });
+
+    it('xz.compressFile(file, destStream) should error if destStream emit error', async () => {
+      const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+      const destFile = path.join(os.tmpdir(), uuid.v4() + '.xz');
+      const fileStream = fs.createWriteStream(destFile);
+      setImmediate(() => fileStream.emit('error', new Error('xx')));
+
+      let err;
+      try {
+        await compressing.xz.compressFile(sourceFile, fileStream);
+      } catch (e) {
+        err = e;
+      }
+      assert(err && err.message === 'xx');
+    });
+
+    it('xz.compressFile(buffer, stream)', async () => {
+      const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+      const sourceBuffer = fs.readFileSync(sourceFile);
+      const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz');
+      // console.log('destFile', destFile);
+      const fileStream = fs.createWriteStream(destFile);
+      await compressing.xz.compressFile(sourceBuffer, fileStream);
+      assert(fs.existsSync(destFile));
+    });
+
+    it('xz.compressFile(sourceStream, destStream)', async () => {
+      const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+      const sourceStream = fs.createReadStream(sourceFile);
+      const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz');
+      // console.log('destFile', destFile);
+      const fileStream = fs.createWriteStream(destFile);
+      await compressing.xz.compressFile(sourceStream, fileStream);
+      assert(fs.existsSync(destFile));
+    });
+
+    it('xz.compressFile(file, stream) with custom preset', async () => {
+      const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+      const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz');
+      const fileStream = fs.createWriteStream(destFile);
+      await compressing.xz.compressFile(sourceFile, fileStream, { preset: 9 });
+      assert(fs.existsSync(destFile));
+    });
+  });
+
+  describe('xz.uncompress()', () => {
+    let compressedFile;
+
+    before(async () => {
+      // Create a compressed file for testing
+      const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+      compressedFile = path.join(os.tmpdir(), 'test-xx.log.xz');
+      const fileStream = fs.createWriteStream(compressedFile);
+      await compressing.xz.compressFile(sourceFile, fileStream);
+    });
+
+    it('xz.uncompress(sourceFile, destStream)', async () => {
+      const originalFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+      const destFile = path.join(os.tmpdir(), uuid.v4() + '.log');
+      const fileStream = fs.createWriteStream(destFile);
+      await compressing.xz.uncompress(compressedFile, fileStream);
+      assert(fs.existsSync(destFile));
+      if (!isWindows) {
+        // EOL not equal to linux
+        assert(fs.readFileSync(destFile, 'utf8') === fs.readFileSync(originalFile, 'utf8'));
+      }
+    });
+
+    it('xz.uncompress(sourceStream, destStream)', async () => {
+      const sourceStream = fs.createReadStream(compressedFile);
+      const originalFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+      const destFile = path.join(os.tmpdir(), uuid.v4() + '.log');
+      const fileStream = fs.createWriteStream(destFile);
+      await compressing.xz.uncompress(sourceStream, fileStream);
+      assert(fs.existsSync(destFile));
+      if (!isWindows) {
+        // EOL not equal to linux
+        assert(fs.readFileSync(destFile, 'utf8') === fs.readFileSync(originalFile, 'utf8'));
+      }
+    });
+
+    it('xz.uncompress(sourceStream, destFile)', async () => {
+      const sourceStream = fs.createReadStream(compressedFile);
+      const originalFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+      const destFile = path.join(os.tmpdir(), uuid.v4() + '.log');
+      await compressing.xz.uncompress(sourceStream, destFile);
+      assert(fs.existsSync(destFile));
+      if (!isWindows) {
+        // EOL not equal to linux
+        assert(fs.readFileSync(destFile, 'utf8') === fs.readFileSync(originalFile, 'utf8'));
+      }
+    });
+
+    it('xz.uncompress(sourceFile, destFile)', async () => {
+      const originalFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+      const destFile = path.join(os.tmpdir(), uuid.v4() + '.log');
+      await compressing.xz.uncompress(compressedFile, destFile);
+      assert(fs.existsSync(destFile));
+      if (!isWindows) {
+        // EOL not equal to linux
+        assert(fs.readFileSync(destFile, 'utf8') === fs.readFileSync(originalFile, 'utf8'));
+      }
+    });
+
+    it('xz.uncompress(buffer, destFile)', async () => {
+      const sourceBuffer = fs.readFileSync(compressedFile);
+      const originalFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+      const destFile = path.join(os.tmpdir(), uuid.v4() + '.log');
+      await compressing.xz.uncompress(sourceBuffer, destFile);
+      assert(fs.existsSync(destFile));
+      if (!isWindows) {
+        // EOL not equal to linux
+        assert(fs.readFileSync(destFile, 'utf8') === fs.readFileSync(originalFile, 'utf8'));
+      }
+    });
+
+    it('xz.uncompress should error if destStream emit error', async () => {
+      const destFile = path.join(os.tmpdir(), uuid.v4() + '.log');
+      const fileStream = fs.createWriteStream(destFile);
+      setImmediate(() => fileStream.emit('error', new Error('write error')));
+
+      let err;
+      try {
+        await compressing.xz.uncompress(compressedFile, fileStream);
+      } catch (e) {
+        err = e;
+      }
+      assert(err && err.message === 'write error');
+    });
+  });
+});
diff --git a/test/xz/uncompress_stream.test.js b/test/xz/uncompress_stream.test.js
new file mode 100644
index 0000000..ce1090c
--- /dev/null
+++ b/test/xz/uncompress_stream.test.js
@@ -0,0 +1,150 @@
+const fs = require('fs');
+const mm = require('mm');
+const os = require('os');
+const uuid = require('uuid');
+const path = require('path');
+const assert = require('assert');
+const { pipeline: pump } = require('stream');
+const streamifier = require('streamifier');
+const compressing = require('../..');
+
+const originalFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+const isWindows = os.platform() === 'win32';
+
+// Helper function to create XZ compressed file for testing
+function createXzFile() {
+  const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
+  const destFile = path.join(os.tmpdir(), 'test-xx.log.xz');
+
+  return new Promise((resolve, reject) => {
+    const sourceStream = fs.createReadStream(sourceFile);
+    const xzStream = new compressing.xz.FileStream();
+    const destStream = fs.createWriteStream(destFile);
+
+    pump(sourceStream, xzStream, destStream, err => {
+      if (err) reject(err);
+      else resolve(destFile);
+    });
+  });
+}
+
+describe('test/xz/uncompress_stream.test.js', () => {
+  let sourceFile;
+
+  before(async () => {
+    sourceFile = await createXzFile();
+  });
+
+  afterEach(mm.restore);
+
+  it('should be transform stream', done => {
+    const destFile = path.join(os.tmpdir(), uuid.v4() + '.log');
+
+    const sourceStream = fs.createReadStream(sourceFile);
+    const uncompressStream = new compressing.xz.UncompressStream();
+    const destStream = fs.createWriteStream(destFile);
+    pump(sourceStream, uncompressStream, destStream, err => {
+      assert(!err);
+      assert(fs.existsSync(destFile));
+      const originalFileBuffer = fs.readFileSync(originalFile);
+      const destFileBuffer = fs.readFileSync(destFile);
+      assert.equal(destFileBuffer.length, originalFileBuffer.length);
+      if (!isWindows) {
+        // EOL not equal to linux
+        assert.equal(destFileBuffer.toString('utf8'), originalFileBuffer.toString('utf8'));
+      }
+      done();
+    });
+  });
+
+  it('should uncompress according to file path', done => {
+    const destFile = path.join(os.tmpdir(), uuid.v4() + '.log');
+
+    const uncompressStream = new compressing.xz.UncompressStream({ source: sourceFile });
+    const destStream = fs.createWriteStream(destFile);
+    pump(uncompressStream, destStream, err => {
+      assert(!err);
+      assert(fs.existsSync(destFile));
+      const originalFileBuffer = fs.readFileSync(originalFile);
+      const destFileBuffer = fs.readFileSync(destFile);
+      assert.equal(destFileBuffer.length, originalFileBuffer.length);
+      if (!isWindows) {
+        assert.equal(destFileBuffer.toString('utf8'), originalFileBuffer.toString('utf8'));
+      }
+      done();
+    });
+  });
+
+  it('should uncompress buffer', done => {
+    const sourceBuffer = fs.readFileSync(sourceFile);
+    const destFile = path.join(os.tmpdir(), uuid.v4() + '.log');
+
+    const destStream = fs.createWriteStream(destFile);
+    const uncompressStream = new compressing.xz.UncompressStream({ source: sourceBuffer });
+    pump(uncompressStream, destStream, err => {
+      assert(!err);
+      assert(fs.existsSync(destFile));
+      const originalFileBuffer = fs.readFileSync(originalFile);
+      const destFileBuffer = fs.readFileSync(destFile);
+      assert.equal(destFileBuffer.length, originalFileBuffer.length);
+      if (!isWindows) {
+        assert.equal(destFileBuffer.toString('utf8'), originalFileBuffer.toString('utf8'));
+      }
+      done();
+    });
+  });
+
+  it('should uncompress stream', done => {
+    const sourceStream = fs.createReadStream(sourceFile);
+    const destFile = path.join(os.tmpdir(), uuid.v4() + '.log');
+
+    const destStream = fs.createWriteStream(destFile);
+    const uncompressStream = new compressing.xz.UncompressStream({ source: sourceStream });
+    pump(uncompressStream, destStream, err => {
+      assert(!err);
+      assert(fs.existsSync(destFile));
+      const originalFileBuffer = fs.readFileSync(originalFile);
+      const destFileBuffer = fs.readFileSync(destFile);
+      assert.equal(destFileBuffer.length, originalFileBuffer.length);
+      if (!isWindows) {
+        assert.equal(destFileBuffer.toString('utf8'), originalFileBuffer.toString('utf8'));
+      }
+      done();
+    });
+  });
+
+  it('should emit error if sourceFile does not exist', done => {
+    const sourceFile = 'file-not-exist';
+    const uncompressStream = new compressing.xz.UncompressStream({ source: sourceFile });
+    uncompressStream.on('error', err => {
+      assert(err);
+      done();
+    });
+  });
+
+  it('should emit error if sourceStream emit error', done => {
+    const sourceFile = 'file-not-exist';
+    const sourceStream = fs.createReadStream(sourceFile);
+    const uncompressStream = new compressing.xz.UncompressStream({ source: sourceStream });
+    uncompressStream.on('error', err => {
+      assert(err && err.code === 'ENOENT');
+      done();
+    });
+  });
+
+  it('should emit error if stream created by streamifier.createReadStream emit error', done => {
+    const original = streamifier.createReadStream;
+    mm(streamifier, 'createReadStream', function() {
+      const result = original.apply(streamifier, arguments);
+      setImmediate(() => result.emit('error', 'mockError'));
+      return result;
+    });
+    const sourceBuffer = fs.readFileSync(sourceFile);
+    const uncompressStream = new compressing.xz.UncompressStream({ source: sourceBuffer });
+    uncompressStream.on('error', err => {
+      assert(err === 'mockError');
+      done();
+    });
+  });
+
+});
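
Usage sketch (not part of the patch): a minimal example of the xz API this diff adds, mirroring the existing gzip API. It assumes the package is consumed as `compressing`, that an `input.log` file exists in the working directory, and that Node provides `stream/promises` (Node 15+); the file names here are placeholders for illustration only.

'use strict';

const fs = require('fs');
const { pipeline } = require('stream/promises');
const compressing = require('compressing');

async function main() {
  // Promise helpers from lib/xz/index.js: the optional third argument is
  // forwarded to XzFileStream, so `preset` (0-9) and `threads` map to the
  // lzma-native compressor options.
  await compressing.xz.compressFile('input.log', 'input.log.xz', { preset: 6 });

  // `uncompress` and `decompress` are aliases; both wrap XzUncompressStream.
  await compressing.xz.uncompress('input.log.xz', 'input.restored.log');

  // Stream form: FileStream is a transform stream (plain bytes in, .xz out),
  // so it can also be piped explicitly.
  await pipeline(
    fs.createReadStream('input.log'),
    new compressing.xz.FileStream(),
    fs.createWriteStream('piped.log.xz')
  );
}

main().catch(err => {
  console.error(err);
  process.exitCode = 1;
});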