-
Notifications
You must be signed in to change notification settings - Fork 34
feat: add XZ compression and decompression support #115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
'use strict'; | ||
|
||
const fs = require('fs'); | ||
const lzma = require('lzma-native'); | ||
const utils = require('../utils'); | ||
const streamifier = require('streamifier'); | ||
|
||
class XzFileStream extends lzma.Compressor { | ||
constructor(opts) { | ||
opts = opts || {}; | ||
const lzmaOpts = { | ||
preset: opts.preset || 6, | ||
threads: opts.threads || 0, | ||
}; | ||
super(lzmaOpts); | ||
|
||
const sourceType = utils.sourceType(opts.source); | ||
|
||
if (sourceType === 'file') { | ||
const stream = fs.createReadStream(opts.source, opts.fs); | ||
stream.on('error', err => this.emit('error', err)); | ||
stream.pipe(this); | ||
return; | ||
} | ||
|
||
if (sourceType === 'buffer') { | ||
const stream = streamifier.createReadStream(opts.source, opts.streamifier); | ||
stream.on('error', err => this.emit('error', err)); | ||
stream.pipe(this); | ||
return; | ||
} | ||
|
||
if (sourceType === 'stream') { | ||
opts.source.on('error', err => this.emit('error', err)); | ||
opts.source.pipe(this); | ||
} | ||
Comment on lines
+17
to
+36
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logic for handling different source types ( |
||
|
||
// else undefined: do nothing | ||
} | ||
} | ||
|
||
module.exports = XzFileStream; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
'use strict'; | ||
|
||
const utils = require('../utils'); | ||
const XzFileStream = require('./file_stream'); | ||
const XzUncompressStream = require('./uncompress_stream'); | ||
|
||
exports.FileStream = XzFileStream; | ||
exports.UncompressStream = XzUncompressStream; | ||
exports.compressFile = utils.makeFileProcessFn(XzFileStream); | ||
exports.uncompress = utils.makeFileProcessFn(XzUncompressStream); | ||
exports.decompress = utils.makeFileProcessFn(XzUncompressStream); |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,38 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
'use strict'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const fs = require('fs'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const lzma = require('lzma-native'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const utils = require('../utils'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const streamifier = require('streamifier'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
class XzUncompressStream extends lzma.Decompressor { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
constructor(opts) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
opts = opts || {}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
super(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const sourceType = utils.sourceType(opts.source); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (sourceType === 'file') { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const stream = fs.createReadStream(opts.source, opts.fs); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
stream.on('error', err => this.emit('error', err)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
stream.pipe(this); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (sourceType === 'buffer') { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const stream = streamifier.createReadStream(opts.source, opts.streamifier); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
stream.on('error', err => this.emit('error', err)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
stream.pipe(this); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (sourceType === 'stream') { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
opts.source.on('error', err => this.emit('error', err)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
opts.source.pipe(this); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+13
to
+32
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Comment on lines
+15
to
+32
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Prefer destroying on upstream errors to ensure teardown and proper pipeline propagation Forwarding errors with this.emit('error', err) works, but using this.destroy(err) tears down the decompressor and guarantees pipeline() propagates the error cleanly and promptly. Apply across all three source types for consistency. if (sourceType === 'file') {
const stream = fs.createReadStream(opts.source, opts.fs);
- stream.on('error', err => this.emit('error', err));
+ stream.on('error', err => this.destroy(err));
stream.pipe(this);
return;
}
if (sourceType === 'buffer') {
const stream = streamifier.createReadStream(opts.source, opts.streamifier);
- stream.on('error', err => this.emit('error', err));
+ stream.on('error', err => this.destroy(err));
stream.pipe(this);
return;
}
if (sourceType === 'stream') {
- opts.source.on('error', err => this.emit('error', err));
+ opts.source.on('error', err => this.destroy(err));
opts.source.pipe(this);
+ return;
} 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// else: waiting to be piped | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
module.exports = XzUncompressStream; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
const fs = require('fs'); | ||
const os = require('os'); | ||
const path = require('path'); | ||
const uuid = require('uuid'); | ||
const { pipeline: pump } = require('stream'); | ||
const compressing = require('../..'); | ||
const assert = require('assert'); | ||
|
||
describe('test/xz/file_stream.test.js', () => { | ||
it('should be a transform stream', done => { | ||
const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log'); | ||
const sourceStream = fs.createReadStream(sourceFile); | ||
const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz'); | ||
// console.log('destFile', destFile); | ||
fengmk2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const xzStream = new compressing.xz.FileStream(); | ||
const destStream = fs.createWriteStream(destFile); | ||
pump(sourceStream, xzStream, destStream, err => { | ||
assert(!err); | ||
assert(fs.existsSync(destFile)); | ||
done(); | ||
}); | ||
}); | ||
Comment on lines
+10
to
+22
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test uses the it('should be a transform stream', async () => {
const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log');
const sourceStream = fs.createReadStream(sourceFile);
const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz');
// console.log('destFile', destFile);
const xzStream = new compressing.xz.FileStream();
const destStream = fs.createWriteStream(destFile);
await new Promise((resolve, reject) => {
pump(sourceStream, xzStream, destStream, err => {
if (err) return reject(err);
resolve();
});
});
assert(fs.existsSync(destFile));
}); |
||
|
||
it('should compress according to file path', done => { | ||
const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log'); | ||
const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz'); | ||
// console.log('destFile', destFile); | ||
const xzStream = new compressing.xz.FileStream({ source: sourceFile }); | ||
const destStream = fs.createWriteStream(destFile); | ||
pump(xzStream, destStream, err => { | ||
assert(!err); | ||
assert(fs.existsSync(destFile)); | ||
done(); | ||
}); | ||
}); | ||
|
||
it('should compress file into Buffer', async () => { | ||
const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log'); | ||
const xzStream = new compressing.xz.FileStream({ source: sourceFile }); | ||
const xzChunks = []; | ||
for await (const chunk of xzStream) { | ||
xzChunks.push(chunk); | ||
} | ||
|
||
const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz'); | ||
await fs.promises.writeFile(destFile, Buffer.concat(xzChunks)); | ||
// console.log(destFile); | ||
}); | ||
|
||
it('should compress buffer', done => { | ||
const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log'); | ||
const sourceBuffer = fs.readFileSync(sourceFile); | ||
const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz'); | ||
// console.log('destFile', destFile); | ||
const destStream = fs.createWriteStream(destFile); | ||
const xzStream = new compressing.xz.FileStream({ source: sourceBuffer }); | ||
pump(xzStream, destStream, err => { | ||
assert(!err); | ||
assert(fs.existsSync(destFile)); | ||
done(); | ||
}); | ||
|
||
}); | ||
|
||
it('should compress stream', done => { | ||
const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log'); | ||
const sourceStream = fs.createReadStream(sourceFile); | ||
const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz'); | ||
// console.log('destFile', destFile); | ||
const destStream = fs.createWriteStream(destFile); | ||
const xzStream = new compressing.xz.FileStream({ source: sourceStream }); | ||
pump(xzStream, destStream, err => { | ||
assert(!err); | ||
assert(fs.existsSync(destFile)); | ||
done(); | ||
}); | ||
}); | ||
|
||
it('should compress with custom level', done => { | ||
const sourceFile = path.join(__dirname, '..', 'fixtures', 'xx.log'); | ||
const destFile = path.join(os.tmpdir(), uuid.v4() + '.log.xz'); | ||
const xzStream = new compressing.xz.FileStream({ | ||
source: sourceFile, | ||
level: 6, | ||
}); | ||
const destStream = fs.createWriteStream(destFile); | ||
pump(xzStream, destStream, err => { | ||
assert(!err); | ||
assert(fs.existsSync(destFile)); | ||
done(); | ||
}); | ||
}); | ||
|
||
it('should emit error if sourceFile does not exit', done => { | ||
const sourceFile = 'file-not-exist'; | ||
const xzStream = new compressing.xz.FileStream({ source: sourceFile }); | ||
xzStream.on('error', err => { | ||
assert(err); | ||
done(); | ||
}); | ||
}); | ||
|
||
it('should emit error if sourceStream emit error', done => { | ||
const sourceFile = 'file-not-exist'; | ||
const sourceStream = fs.createReadStream(sourceFile); | ||
const xzStream = new compressing.xz.FileStream({ source: sourceStream }); | ||
xzStream.on('error', err => { | ||
assert(err && err.code === 'ENOENT'); | ||
done(); | ||
}); | ||
}); | ||
|
||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For better type safety, the
opts
parameter for these functions should be more specific thanany
.compressFile
acceptspreset
andthreads
options, whileuncompress
anddecompress
don't have specific options. Please consider defining the types for these options inline to improve developer experience and prevent potential bugs.