
Merge pull request #2 from RedbirdHQ/adjust-read-capacity-by-wanted-time
Adjust read capacity by wanted time
oh14 authored Dec 4, 2017
2 parents a64fe6b + e4e63f8 commit b01e466
Showing 4 changed files with 204 additions and 38 deletions.
25 changes: 20 additions & 5 deletions bin/dynamo-backup-to-s3
@@ -21,13 +21,15 @@ program
.option('-b, --bucket <name>', 'S3 bucket to store backups')
.option('-a, --acl <acl>', 'The canned ACL to apply to the object')
.option('-s, --stop-on-failure', 'stop the backup process when a table fails to back up', parseBool, true)
.option('-c, --consistent-read', 'Enable "strongly consistent" reads, which use 2x more read capacity than "eventually consistent" reads')
.option('-r, --read-percentage <decimal>', 'specify the percentage of DynamoDB read capacity to use while backing up. default .25 (25%)', parseFloat, .25)
.option('-m, --max-minutes-per-table <float>', 'The maximum time in minutes we want to backup one table (±10%)', parseFloat)
.option('-x, --excluded-tables <list>', 'exclude these tables from backup', list)
.option('-i, --included-tables <list>', 'only backup these tables', list)
.option('-p, --backup-path <name>', 'backup path to store table dumps in. default is DynamoDB-backup-YYYY-MM-DD-HH-mm-ss')
.option('-e, --base64-encode-binary', 'encode binary fields in base64 before exporting')
.option('-d, --save-data-pipeline-format', 'save in a format compatible with the AWS Data Pipeline import. Defaults to false (save as exported by DynamoDB)')
.option('--server-side-encryption', 'enable server side encryption on S3. either AES256 or aws:kms (disabled by default)')
.option('--server-side-encryption <sse>', 'enable server side encryption on S3. either AES256 or aws:kms (disabled by default)')
.option('--aws-key <key>', 'AWS access key. Will use AWS_ACCESS_KEY_ID env var if --aws-key not set')
.option('--aws-secret <secret>', 'AWS secret key. Will use AWS_SECRET_ACCESS_KEY env var if --aws-secret not set')
.option('--aws-region <region>', 'AWS region. Will use AWS_DEFAULT_REGION env var if --aws-region not set')
@@ -47,19 +49,24 @@ var dynamoBackup = new DynamoBackup({
excludedTables: program.excludedTables,
includedTables: program.includedTables,
readPercentage: program.readPercentage,
consistentRead: program.consistentRead,
maxMinutesPerTable: program.maxMinutesPerTable,
stopOnFailure: program.stopOnFailure,
base64Binary: program.base64EncodeBinary,
saveDataPipelineFormat: program.saveDataPipelineFormat,
serverSideEncryption: program.serverSideEncryption,
});

dynamoBackup.on('error', function(data) {
console.log('Error backing up ' + data.tableName);
console.log(data.error);
dynamoBackup.on('error', function(tableName, error) {
console.log('Error backing up %s', tableName);
console.log(error);
if (program.stopOnFailure) {
process.exit(-1);
}
});
dynamoBackup.on('warning', function(tableName, message) {
console.log('Warning on %s: %s', tableName, message);
});

dynamoBackup.on('start-backup', function(tableName) {
runTimes[tableName] = moment();
@@ -75,4 +82,12 @@ dynamoBackup.on('end-backup', function(tableName) {
dynamoBackup.backupAllTables(function() {
console.log('Finished backing up DynamoDB');
process.exit(0);
});
});

process.on('SIGINT', function() {
console.log('Restoring original capacities before exit');
dynamoBackup.restoreOriginalCapacities(function(){
console.log('Original capacities restored');
process.exit();
});
});
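For context, below is a minimal sketch of driving the same backup programmatically with the options this change introduces. It assumes the package's main module exports the DynamoBackup class from lib/dynamo-backup.js; the bucket, table name, and numbers are made-up examples, and AWS credentials/region are assumed to come from the usual environment variables.

// Illustrative sketch only (hypothetical bucket/table names).
var DynamoBackup = require('dynamo-backup-to-s3');

var backup = new DynamoBackup({
    bucket: 'my-backup-bucket',   // hypothetical S3 bucket
    includedTables: ['Users'],    // hypothetical table
    readPercentage: 0.5,          // use up to 50% of provisioned read capacity
    consistentRead: false,        // eventually consistent: 8KB per read capacity unit
    maxMinutesPerTable: 30        // aim to finish each table within ~30 minutes (±10%)
});

// Updated event signatures: (tableName, error) and (tableName, message).
backup.on('error', function (tableName, error) {
    console.log('Error backing up %s', tableName, error);
});
backup.on('warning', function (tableName, message) {
    console.log('Warning on %s: %s', tableName, message);
});

backup.backupAllTables(function () {
    console.log('Finished backing up DynamoDB');
});

// On an early exit, capacities raised for the backup can be put back first,
// mirroring the SIGINT handler added above.
process.on('SIGINT', function () {
    backup.restoreOriginalCapacities(function () {
        process.exit();
    });
});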
2 changes: 1 addition & 1 deletion bin/dynamo-restore-from-s3
@@ -16,7 +16,7 @@ program
.option('-rc, --readcapacity <units>', 'Read Units for new table (when finished). Default is 5.')
.option('-wc, --writecapacity <units>', 'Write Units for new table (when finished). Default is --concurrency.')
.option('-sf, --stop-on-failure', 'Stop process when the same batch fails to restore 3 times. Defaults to false.', true)
.option('--server-side-encryption', 'enable server side encryption on S3. either AES256 or aws:kms (disabled by default)')
.option('--server-side-encryption <sse>', 'enable server side encryption on S3. either AES256 or aws:kms (disabled by default)')
.option('--aws-key <key>', 'AWS access key. Will use AWS_ACCESS_KEY_ID env var if --aws-key not set')
.option('--aws-secret <secret>', 'AWS secret key. Will use AWS_SECRET_ACCESS_KEY env var if --aws-secret not set')
.option('--aws-region <region>', 'AWS region. Will use AWS_DEFAULT_REGION env var if --aws-region not set')
213 changes: 182 additions & 31 deletions lib/dynamo-backup.js
@@ -11,12 +11,25 @@ var Uploader = require('s3-streaming-upload').Uploader;

var ReadableStream = require('./readable-stream');

const READ_CAPACITY_SIZE = 4096; // 4KB
const SINGLE_SCAN_MAX_SIZE = 1048576; // 1MB
const ESTIMATED_1MB_WRITE_TIME = 0.035; // ~35ms (in seconds) to write a 1MB response (estimate)
const RESPONSE_PARSING_AVG_TIME = 0.3; // in milliseconds; average time spent parsing a response result
const STREAM_HIGH_WATERMARK = 16384; // 16KB stream highWaterMark used for buffering

function DynamoBackup(options) {
var params = {};
options = options || {};
this.excludedTables = options.excludedTables || [];
this.includedTables = options.includedTables;
this.tablesToBackup = [];
this.consistentRead = !!options.consistentRead; // strongly consistent
this.consistentRatio = this.consistentRead ? 1 : 0.5; // eventually consistent consumes 1/2 of a read capacity unit.
this.singleCapacitySize = READ_CAPACITY_SIZE / this.consistentRatio; // 4KB for consistent, 8KB for eventually consistent
this.maxSingleScanReadCapacity = SINGLE_SCAN_MAX_SIZE / this.singleCapacitySize; // 1MB per scan page
this.readPercentage = options.readPercentage || .25;
this.maxSecondsPerTable = 60 * (options.maxMinutesPerTable || Infinity);
this.tablesCapacities = {};
this.backupPath = options.backupPath;
this.bucket = options.bucket;
this.stopOnFailure = options.stopOnFailure || false;
@@ -84,10 +97,7 @@ DynamoBackup.prototype.backupTable = function (tableName, backupPath, callback)
self.emit('start-backup', tableName, startTime);
upload.send(function (err) {
if (err) {
self.emit('error', {
table: tableName,
err: err
});
self.emit('error', tableName, err);
}
var endTime = moment.utc();
var backupDuration = new dateRange(startTime, endTime);
@@ -118,10 +128,7 @@ DynamoBackup.prototype.backupTable = function (tableName, backupPath, callback)
function (err) {
stream.end();
if (err) {
self.emit('error', {
table: tableName,
err: err
});
self.emit('error', tableName, err);
}
}
);
@@ -138,6 +145,7 @@ DynamoBackup.prototype.backupAllTables = function (callback) {
var includedTables = self.includedTables || tables;
tables = _.difference(tables, self.excludedTables);
tables = _.intersection(tables, includedTables);
self.tablesToBackup = tables;

async.each(tables,
function (tableName, done) {
@@ -164,43 +172,125 @@ DynamoBackup.prototype._getBackupPath = function () {
DynamoBackup.prototype._copyTable = function (tableName, itemsReceived, callback) {
var self = this;
var ddb = new AWS.DynamoDB();
var startDescTime = new Date().getTime();
ddb.describeTable({ TableName: tableName }, function (err, data) {
if (err) {
return callback(err);
}
var descTime = new Date().getTime() - startDescTime - RESPONSE_PARSING_AVG_TIME;
var descSize = this.httpResponse.stream.socket.bytesRead;
var speed = 1000 * descSize / descTime;

const Table = data.Table;
var originalReadCapacity = Table.ProvisionedThroughput.ReadCapacityUnits;
var originalWriteCapacity = Table.ProvisionedThroughput.WriteCapacityUnits; // needed for future update
var readPercentage = self.readPercentage;
var limit = Math.max((data.Table.ProvisionedThroughput.ReadCapacityUnits * readPercentage) | 0, 1);
var maxCapacityUnits = Math.max(Math.floor(originalReadCapacity * readPercentage), 1);
var tableCapacityConsumption = Table.TableSizeBytes / self.singleCapacitySize;
var averageItemSize = Math.ceil(Table.TableSizeBytes / Table.ItemCount);

var minimumNbScans = Math.ceil(Table.TableSizeBytes / SINGLE_SCAN_MAX_SIZE); // 1 scan can't return more than 1MB of data
var estimatedWriteTime = minimumNbScans * ESTIMATED_1MB_WRITE_TIME * self.tablesToBackup.length; // with tables concurrency

// estimate the backup duration with the current read capacity
var nbSecondsToBackup = Math.ceil(tableCapacityConsumption / maxCapacityUnits) + estimatedWriteTime;

if( nbSecondsToBackup <= self.maxSecondsPerTable ) {
self._streamItems(tableName, null, 0, maxCapacityUnits, averageItemSize, itemsReceived, callback);
return;
}

var estimatedFetchRatio = speed / STREAM_HIGH_WATERMARK; // estimated fetch ratio: >1 means faster than expected, <1 slower
var maxSingleScanReadCapacity = self.maxSingleScanReadCapacity * estimatedFetchRatio;

self._streamItems(tableName, null, limit, itemsReceived, callback);
// Calculate the best read capacity for the wanted backup time
var newReadCapacity = tableCapacityConsumption / (self.maxSecondsPerTable - estimatedWriteTime);
if( newReadCapacity > maxSingleScanReadCapacity ) {
newReadCapacity = maxSingleScanReadCapacity;
nbSecondsToBackup = Math.ceil(tableCapacityConsumption / newReadCapacity) + estimatedWriteTime;
self.emit('warning', tableName, "Can't backup in wanted duration, estimated backup duration: " + Math.ceil(nbSecondsToBackup) + " seconds (±10%).");
}

// update new capacity and scan
newReadCapacity = Math.ceil(newReadCapacity / readPercentage);
self._updateTableCapacities(tableName, newReadCapacity, originalWriteCapacity, function(err, data){
if (err) {
if( err.code != 'ValidationException' && err.code != 'LimitExceededException' ) {
return callback(err);
}
self.emit('warning', tableName, "Can't increase the read capacity value from " + originalReadCapacity + " to " + newReadCapacity);
}
// save old and new capacity, to restore after backup
self.tablesCapacities[tableName] = {
oldRead: originalReadCapacity,
newRead: newReadCapacity,
oldWrite: originalWriteCapacity,
newWrite: originalWriteCapacity,
};
var maxCapacityUnits = Math.max(Math.floor(newReadCapacity * readPercentage), 1);
self._streamItems(tableName, null, 0, maxCapacityUnits, averageItemSize, itemsReceived, function(err) {
self._updateTableOriginalCapacities(tableName, function(){
callback(err); // err is parent error
});
});
});
});
};

DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, limit, itemsReceived, callback) {
DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, remainingCapacityUnits, maxCapacityUnits, averageItemSize, itemsReceived, callback) {
var self = this;
var ddb = new AWS.DynamoDB();
var params = {
Limit: limit,
ReturnConsumedCapacity: 'NONE',
TableName: tableName
};
if (startKey) {
params.ExclusiveStartKey = startKey;
}
ddb.scan(params, function (err, data) {
if (err) {
return callback(err);
}
var startOfSecond = new Date().getTime();

var usableCapacity = maxCapacityUnits + remainingCapacityUnits;
if (usableCapacity > 0) {
var limit = Math.max(Math.floor((self.singleCapacitySize / averageItemSize) * usableCapacity), 1);
var params = {
Limit: limit,
ConsistentRead: self.consistentRead,
ReturnConsumedCapacity: 'TOTAL',
TableName: tableName
};

if (data.Items.length > 0) {
itemsReceived(data.Items);
if (startKey) {
params.ExclusiveStartKey = startKey;
}

if (!data.LastEvaluatedKey || _.keys(data.LastEvaluatedKey).length === 0) {
return callback();
}
self._streamItems(tableName, data.LastEvaluatedKey, limit, itemsReceived, callback);
});
ddb.scan(params, function (error, data) {
if (error) {
callback(error);
return;
}

if (data.Items.length > 0) {
itemsReceived(data.Items);
}


if (!data.LastEvaluatedKey || _.keys(data.LastEvaluatedKey).length === 0) {
callback();
return;
}

var nextRemainingCapacity = usableCapacity - data.ConsumedCapacity.CapacityUnits;
var duration = new Date().getTime() - startOfSecond;
var timeout = 1000 - duration;
if( timeout < 0 || (maxCapacityUnits*duration/1000) < nextRemainingCapacity) {
// the scan overran the one-second window, or more capacity is left over than the elapsed time accounts for
nextRemainingCapacity -= maxCapacityUnits*timeout/1000;
timeout = 0;
}

setTimeout(function () {
self._streamItems(tableName, data.LastEvaluatedKey, Math.floor(nextRemainingCapacity), maxCapacityUnits, averageItemSize, itemsReceived, callback);
}, timeout);
});
} else {
// Wait until we have capacity again
setTimeout(() => {
self._streamItems(tableName, startKey, usableCapacity, maxCapacityUnits, averageItemSize, itemsReceived, callback);
}, 1000);
}
};

DynamoBackup.prototype._fetchTables = function (lastTable, tables, callback) {
@@ -269,4 +359,65 @@ DynamoBackup.prototype._getDataPipelineAttributeValueKey = function (type) {
}
};

module.exports = DynamoBackup;
DynamoBackup.prototype._updateTableCapacities = function(tableName, newReadCapacity, newWriteCapacity, callback) {
var params = {
TableName: tableName,
ProvisionedThroughput: {
ReadCapacityUnits: newReadCapacity,
WriteCapacityUnits: newWriteCapacity
}
};

var ddb = new AWS.DynamoDB();
ddb.updateTable(params, callback);
};

DynamoBackup.prototype._updateTableOriginalCapacities = function(tableName, callback) {
var self = this;
if( !self.tablesCapacities[tableName] ) {
return callback && callback();
}
var originalReadCapacity = self.tablesCapacities[tableName].oldRead;
var currentReadCapacity = self.tablesCapacities[tableName].newRead;
var diffReadApplied = currentReadCapacity - originalReadCapacity;

var originalWriteCapacity = self.tablesCapacities[tableName].oldWrite;
var currentWriteCapacity = self.tablesCapacities[tableName].newWrite;
var diffWriteApplied = currentWriteCapacity - originalWriteCapacity;

var ddb = new AWS.DynamoDB();
ddb.describeTable({ TableName: tableName }, function (err, data) {
if (!err) {
currentReadCapacity = data.Table.ProvisionedThroughput.ReadCapacityUnits;
currentWriteCapacity = data.Table.ProvisionedThroughput.WriteCapacityUnits;
}
var newReadCapacity = Math.max(currentReadCapacity - diffReadApplied, originalReadCapacity);
var newWriteCapacity = Math.max(currentWriteCapacity - diffWriteApplied, originalWriteCapacity);

if( newReadCapacity == currentReadCapacity && newWriteCapacity == currentWriteCapacity ) {
return callback && callback();
}

self._updateTableCapacities(tableName, newReadCapacity, newWriteCapacity, function(err, data){
if(err && err.code == 'LimitExceededException') {
self.emit('warning', tableName, "Can't decrease the read capacity value " + currentReadCapacity + " to " + newReadCapacity);
}
return callback && callback();
});
});
};

DynamoBackup.prototype.restoreOriginalCapacities = function(callback) {
var self = this;
var tables = Object.keys(this.tablesCapacities);
if( 0 == tables.length ) {
return callback && callback();
}
this._updateTableOriginalCapacities(tables[0], function(){
delete self.tablesCapacities[tables[0]];
return self.restoreOriginalCapacities(callback);
});
};

module.exports = DynamoBackup;
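
To make the capacity estimation above concrete, here is a rough worked example that mirrors the formulas in _copyTable; all figures (table size, provisioned capacity, target duration) are assumptions for illustration, not values taken from this change.

// Worked example with assumed numbers, following the formulas in _copyTable above.
var READ_CAPACITY_SIZE = 4096;                 // 4KB
var SINGLE_SCAN_MAX_SIZE = 1048576;            // 1MB
var ESTIMATED_1MB_WRITE_TIME = 0.035;          // seconds per 1MB response

var tableSizeBytes = 2 * 1024 * 1024 * 1024;   // assumed 2GB table
var provisionedRead = 100;                     // assumed provisioned read capacity units
var readPercentage = 0.25;                     // default --read-percentage
var nbTables = 1;                              // only one table being backed up
var consistentRatio = 0.5;                     // eventually consistent reads

var singleCapacitySize = READ_CAPACITY_SIZE / consistentRatio;                    // 8192 bytes per unit
var maxCapacityUnits = Math.max(Math.floor(provisionedRead * readPercentage), 1); // 25 units/second
var tableCapacityConsumption = tableSizeBytes / singleCapacitySize;               // 262144 units total
var minimumNbScans = Math.ceil(tableSizeBytes / SINGLE_SCAN_MAX_SIZE);            // 2048 scan pages
var estimatedWriteTime = minimumNbScans * ESTIMATED_1MB_WRITE_TIME * nbTables;    // ~72 seconds

// ceil(262144 / 25) + 72 ≈ 10558 seconds (~2.9 hours) at the current capacity.
var nbSecondsToBackup = Math.ceil(tableCapacityConsumption / maxCapacityUnits) + estimatedWriteTime;

// With --max-minutes-per-table 60 (3600 seconds), the capacity the scan itself needs:
var maxSecondsPerTable = 3600;
var newReadCapacity = tableCapacityConsumption / (maxSecondsPerTable - estimatedWriteTime); // ~74 units

// _copyTable then divides by readPercentage before calling updateTable, so the table
// would be raised to ceil(74.3 / 0.25) = 298 read capacity units for the backup
// (subject to the single-scan cap derived from the measured describeTable speed).
console.log(nbSecondsToBackup, Math.ceil(newReadCapacity / readPercentage));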

2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "dynamo-backup-to-s3",
"version": "0.6.2",
"version": "0.6.3",
"author": "Dylan Lingelbach (https://github.com/dylanlingelbach)",
"license": "MIT",
"contributors": [
