Skip to content

Commit

Permalink
Merge pull request #270 from cloudant/264-nano-stream-nocallback
Browse files Browse the repository at this point in the history
264 nano stream nocallback
  • Loading branch information
ricellis authored Mar 14, 2019
2 parents 0ab0195 + 755be39 commit 9bbd159
Show file tree
Hide file tree
Showing 4 changed files with 900 additions and 603 deletions.
4 changes: 2 additions & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# 2.4.0
# UNRELEASED

- [NEW] Added request timeout option. Set via env var `COUCH_REQUEST_TIMEOUT`,
as CLI option `--request-timeout`, or programmatically via
Expand All @@ -7,7 +7,7 @@
URL validation error messages.
- [IMPROVED] Documentation, help text and log warnings for invalid options in
"shallow" mode.
- [UPGRADED] Increased nodejs-cloudant dependency to 3.x.x.
- [UPGRADED] Moved nodejs-cloudant dependency to 4.x.x.

# 2.3.1 (2018-06-15)

Expand Down
56 changes: 42 additions & 14 deletions includes/writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,57 @@ module.exports = function(db, bufferSize, parallelism, ee) {
}

// Stream the payload through a zip stream to the server
var payloadStream = new stream.PassThrough();
const payloadStream = new stream.PassThrough();
payloadStream.end(Buffer.from(JSON.stringify(payload), 'utf8'));
var zipstream = zlib.createGzip();
const zipstream = zlib.createGzip();

// Class for streaming _bulk_docs responses into memory.
// In general the response is [] or a small error/reason JSON object,
// so it is OK to have this in memory.
// Writable stream that accumulates a _bulk_docs HTTP response body in
// memory. Responses are normally `[]` or a small error/reason JSON
// object, so buffering the whole body is acceptable here.
class ResponseWriteable extends stream.Writable {
  constructor(options) {
    super(options);
    this.chunks = [];
  }

  // Collect each incoming chunk of the response body.
  _write(chunk, encoding, callback) {
    this.chunks.push(chunk);
    callback();
  }

  // Parse the accumulated response body as JSON.
  asJson() {
    const body = Buffer.concat(this.chunks).toString();
    return JSON.parse(body);
  }
}

if (!didError) {
var req = db.server.request({
var response;
const responseBody = new ResponseWriteable();
const req = db.server.request({
db: db.config.db,
path: '_bulk_docs',
method: 'POST',
headers: { 'content-encoding': 'gzip' },
stream: true
}, function(err) {
err = error.convertResponseError(err);
if (err) {
debug(`Error writing docs ${err.name} ${err.message}`);
cb(err, payload);
} else {
written += payload.docs.length;
writer.emit('restored', { documents: payload.docs.length, total: written });
cb();
}
});
})
.on('response', function(resp) {
response = resp;
})
.on('end', function() {
if (response.statusCode >= 400) {
const err = error.convertResponseError(Object.assign({}, response, responseBody.asJson()));
debug(`Error writing docs ${err.name} ${err.message}`);
cb(err, payload);
} else {
written += payload.docs.length;
writer.emit('restored', { documents: payload.docs.length, total: written });
cb();
}
});
// Pipe the payload into the request object to POST to _bulk_docs
payloadStream.pipe(zipstream).pipe(req);
// Pipe the request object's response into our responseBody writable
req.pipe(responseBody);
}
}, parallelism);

Expand Down
Loading

0 comments on commit 9bbd159

Please sign in to comment.