Adjust read capacity by wanted time #2

Merged · 4 commits · Dec 4, 2017
25 changes: 20 additions & 5 deletions bin/dynamo-backup-to-s3
@@ -21,13 +21,15 @@ program
.option('-b, --bucket <name>', 'S3 bucket to store backups')
.option('-a, --acl <acl>', 'The canned ACL to apply to the object')
.option('-s, --stop-on-failure', 'stop the backup if a table fails to back up', parseBool, true)
.option('-c, --consistent-read', 'Enable "strongly consistent" reads, which use 2x more read capacity than "eventually consistent" reads')
.option('-r, --read-percentage <decimal>', 'specify the percentage of Dynamo read capacity to use while backing up. default .25 (25%)', parseFloat, .25)
.option('-m, --max-minutes-per-table <float>', 'The maximum time in minutes we want to spend backing up one table (±10%)', parseFloat)
.option('-x, --excluded-tables <list>', 'exclude these tables from backup', list)
.option('-i, --included-tables <list>', 'only backup these tables', list)
.option('-p, --backup-path <name>', 'backup path to store table dumps in. default is DynamoDB-backup-YYYY-MM-DD-HH-mm-ss')
.option('-e, --base64-encode-binary', 'encode binary fields in base64 before exporting')
.option('-d, --save-data-pipeline-format', 'save in format compatible with the AWS Data Pipeline import. Defaults to false (save as exported by DynamoDB)')
.option('--server-side-encryption', 'enable server side encryption on S3. either AES256 or aws:kms (disabled by default)')
.option('--server-side-encryption <sse>', 'enable server side encryption on S3. either AES256 or aws:kms (disabled by default)')
.option('--aws-key <key>', 'AWS access key. Will use AWS_ACCESS_KEY_ID env var if --aws-key not set')
.option('--aws-secret <secret>', 'AWS secret key. Will use AWS_SECRET_ACCESS_KEY env var if --aws-secret not set')
.option('--aws-region <region>', 'AWS region. Will use AWS_DEFAULT_REGION env var if --aws-region not set')
@@ -47,19 +49,24 @@ var dynamoBackup = new DynamoBackup({
excludedTables: program.excludedTables,
includedTables: program.includedTables,
readPercentage: program.readPercentage,
consistentRead: program.consistentRead,
maxMinutesPerTable: program.maxMinutesPerTable,
stopOnFailure: program.stopOnFailure,
base64Binary: program.base64EncodeBinary,
saveDataPipelineFormat: program.saveDataPipelineFormat,
serverSideEncryption: program.serverSideEncryption,
});

dynamoBackup.on('error', function(data) {
console.log('Error backing up ' + data.tableName);
console.log(data.error);
dynamoBackup.on('error', function(tableName, error) {
console.log('Error backing up %s', tableName);
console.log(error);
if (program.stopOnFailure) {
process.exit(-1);
}
});
dynamoBackup.on('warning', function(tableName, message) {
console.log('Warning on %s: %s', tableName, message);
});

dynamoBackup.on('start-backup', function(tableName) {
runTimes[tableName] = moment();
@@ -75,4 +82,12 @@ dynamoBackup.on('end-backup', function(tableName) {
dynamoBackup.backupAllTables(function() {
console.log('Finished backing up DynamoDB');
process.exit(0);
});
});

process.on('SIGINT', function() {
console.log('Restoring original capacities before exit');
dynamoBackup.restoreOriginalCapacities(function(){
console.log('Original capacities restored');
process.exit();
});
});
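
For reference, a minimal programmatic sketch of the same wiring as the CLI script above, assuming the package's main module exports DynamoBackup (as lib/dynamo-backup.js does below); the bucket name and option values are hypothetical:

var DynamoBackup = require('dynamo-backup-to-s3'); // assumes the package main exports DynamoBackup

var backup = new DynamoBackup({
    bucket: 'my-backup-bucket',   // hypothetical bucket
    readPercentage: 0.5,          // use up to 50% of provisioned read capacity
    consistentRead: false,        // eventually consistent: one capacity unit reads 8KB
    maxMinutesPerTable: 30        // target: finish each table within ~30 minutes (±10%)
});
// Roughly equivalent CLI: dynamo-backup-to-s3 -b my-backup-bucket -r 0.5 -m 30

// This PR changes the 'error' event signature to (tableName, error)
// and adds a 'warning' event with (tableName, message).
backup.on('error', function (tableName, error) {
    console.log('Error backing up %s', tableName);
    console.log(error);
});
backup.on('warning', function (tableName, message) {
    console.log('Warning on %s: %s', tableName, message);
});

backup.backupAllTables(function () {
    console.log('Finished backing up DynamoDB');
});
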
2 changes: 1 addition & 1 deletion bin/dynamo-restore-from-s3
@@ -16,7 +16,7 @@ program
.option('-rc, --readcapacity <units>', 'Read Units for new table (when finished). Default is 5.')
.option('-wc, --writecapacity <units>', 'Write Units for new table (when finished). Default is --concurrency.')
.option('-sf, --stop-on-failure', 'Stop process when the same batch fails to restore 3 times. Defaults to false.', true)
.option('--server-side-encryption', 'enable server side encryption on S3. either AES256 or aws:kms (disabled by default)')
.option('--server-side-encryption <sse>', 'enable server side encryption on S3. either AES256 or aws:kms (disabled by default)')
.option('--aws-key <key>', 'AWS access key. Will use AWS_ACCESS_KEY_ID env var if --aws-key not set')
.option('--aws-secret <secret>', 'AWS secret key. Will use AWS_SECRET_ACCESS_KEY env var if --aws-secret not set')
.option('--aws-region <region>', 'AWS region. Will use AWS_DEFAULT_REGION env var if --aws-region not set')
213 changes: 182 additions & 31 deletions lib/dynamo-backup.js
@@ -11,12 +11,25 @@ var Uploader = require('s3-streaming-upload').Uploader;

var ReadableStream = require('./readable-stream');

const READ_CAPACITY_SIZE = 4096; // 4KB
const SINGLE_SCAN_MAX_SIZE = 1048576; // 1MB
const ESTIMATED_1MB_WRITE_TIME = 0.035; // in seconds; ~35ms to write a 1MB response (estimate)
const RESPONSE_PARSING_AVG_TIME = 0.3; // in milliseconds; average time to parse a response result
const STREAM_HIGH_WATERMARK = 16384; // 16KB stream highWaterMark for buffering

function DynamoBackup(options) {
var params = {};
options = options || {};
this.excludedTables = options.excludedTables || [];
this.includedTables = options.includedTables;
this.tablesToBackup = [];
this.consistentRead = !!options.consistentRead; // strongly consistent
this.consistentRatio = this.consistentRead ? 1 : 0.5; // eventually consistent reads consume 1/2 of a read capacity unit.
this.singleCapacitySize = READ_CAPACITY_SIZE / this.consistentRatio; // 4KB for consistent, 8KB for eventually consistent
this.maxSingleScanReadCapacity = SINGLE_SCAN_MAX_SIZE / this.singleCapacitySize; // capacity units consumed by one full 1MB scan page
this.readPercentage = options.readPercentage || .25;
this.maxSecondsPerTable = 60 * (options.maxMinutesPerTable || Infinity);
this.tablesCapacities = {};
this.backupPath = options.backupPath;
this.bucket = options.bucket;
this.stopOnFailure = options.stopOnFailure || false;
@@ -84,10 +97,7 @@ DynamoBackup.prototype.backupTable = function (tableName, backupPath, callback)
self.emit('start-backup', tableName, startTime);
upload.send(function (err) {
if (err) {
self.emit('error', {
table: tableName,
err: err
});
self.emit('error', tableName, err);
}
var endTime = moment.utc();
var backupDuration = new dateRange(startTime, endTime);
@@ -118,10 +128,7 @@ DynamoBackup.prototype.backupTable = function (tableName, backupPath, callback)
function (err) {
stream.end();
if (err) {
self.emit('error', {
table: tableName,
err: err
});
self.emit('error', tableName, err);
}
}
);
Expand All @@ -138,6 +145,7 @@ DynamoBackup.prototype.backupAllTables = function (callback) {
var includedTables = self.includedTables || tables;
tables = _.difference(tables, self.excludedTables);
tables = _.intersection(tables, includedTables);
self.tablesToBackup = tables;

async.each(tables,
function (tableName, done) {
@@ -164,43 +172,125 @@ DynamoBackup.prototype._getBackupPath = function () {
DynamoBackup.prototype._copyTable = function (tableName, itemsReceived, callback) {
var self = this;
var ddb = new AWS.DynamoDB();
var startDescTime = new Date().getTime();
ddb.describeTable({ TableName: tableName }, function (err, data) {
if (err) {
return callback(err);
}
var descTime = new Date().getTime() - startDescTime - RESPONSE_PARSING_AVG_TIME;
var descSize = this.httpResponse.stream.socket.bytesRead;
var speed = 1000 * descSize / descTime;

const Table = data.Table;
var originalReadCapacity = Table.ProvisionedThroughput.ReadCapacityUnits;
var originalWriteCapacity = Table.ProvisionedThroughput.WriteCapacityUnits; // needed for future update
var readPercentage = self.readPercentage;
var limit = Math.max((data.Table.ProvisionedThroughput.ReadCapacityUnits * readPercentage) | 0, 1);
var maxCapacityUnits = Math.max(Math.floor(originalReadCapacity * readPercentage), 1);
var tableCapacityConsumption = Table.TableSizeBytes / self.singleCapacitySize;
var averageItemSize = Math.ceil(Table.TableSizeBytes / Table.ItemCount);

var minimumNbScans = Math.ceil(Table.TableSizeBytes / SINGLE_SCAN_MAX_SIZE); // 1 scan can't return more than 1MB of data
var estimatedWriteTime = minimumNbScans * ESTIMATED_1MB_WRITE_TIME * self.tablesToBackup.length; // scaled by the number of tables backed up concurrently

// estimate the backup duration with the current read capacity
var nbSecondsToBackup = Math.ceil(tableCapacityConsumption / maxCapacityUnits) + estimatedWriteTime;

if( nbSecondsToBackup <= self.maxSecondsPerTable ) {
self._streamItems(tableName, null, 0, maxCapacityUnits, averageItemSize, itemsReceived, callback);
return;
}

var estimatedFetchRatio = speed / STREAM_HIGH_WATERMARK; // estimated ratio: >1 means fetching faster than the stream buffer, <1 slower
var maxSingleScanReadCapacity = self.maxSingleScanReadCapacity * estimatedFetchRatio;

self._streamItems(tableName, null, limit, itemsReceived, callback);
// Calculate the best read capacity for the wanted backup duration
var newReadCapacity = tableCapacityConsumption / (self.maxSecondsPerTable - estimatedWriteTime);
if( newReadCapacity > maxSingleScanReadCapacity ) {
newReadCapacity = maxSingleScanReadCapacity;
nbSecondsToBackup = Math.ceil(tableCapacityConsumption / newReadCapacity) + estimatedWriteTime;
self.emit('warning', tableName, "Can't backup in wanted duration, estimated backup duration: " + Math.ceil(nbSecondsToBackup) + " seconds (±10%).");
}

// update new capacity and scan
newReadCapacity = Math.ceil(newReadCapacity / readPercentage);
self._updateTableCapacities(tableName, newReadCapacity, originalWriteCapacity, function(err, data){
if (err) {
if( err.code != 'ValidationException' && err.code != 'LimitExceededException' ) {
return callback(err);
}
self.emit('warning', tableName, "Can't increase the read capacity value from " + originalReadCapacity + " to " + newReadCapacity);
}
// save old and new capacity, to restore after backup
self.tablesCapacities[tableName] = {
oldRead: originalReadCapacity,
newRead: newReadCapacity,
oldWrite: originalWriteCapacity,
newWrite: originalWriteCapacity,
};
var maxCapacityUnits = Math.max(Math.floor(newReadCapacity * readPercentage), 1);
self._streamItems(tableName, null, 0, maxCapacityUnits, averageItemSize, itemsReceived, function(err) {
self._updateTableOriginalCapacities(tableName, function(){
callback(err); // err is parent error
});
});
});
});
};
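
To make the sizing logic in _copyTable concrete, a worked example with hypothetical table numbers (the write-time estimate is omitted for brevity):

// Hypothetical table: 1GB of data, 100 provisioned RCU, 25% usage allowed,
// eventually consistent reads (one capacity unit covers 8KB).
var singleCapacitySize = 4096 / 0.5;                            // 8192 bytes per unit
var tableCapacityConsumption = 1073741824 / singleCapacitySize; // 131072 units for a full scan
var maxCapacityUnits = Math.max(Math.floor(100 * 0.25), 1);     // 25 units/sec

// At the current capacity a full scan needs ~131072 / 25 = 5243 seconds (~87 min).
// To finish within maxSecondsPerTable = 600 (10 minutes), _copyTable solves for
// the needed rate and scales it back up by readPercentage:
var newReadCapacity = Math.ceil((tableCapacityConsumption / 600) / 0.25); // 874 RCU
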

DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, limit, itemsReceived, callback) {
DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, remainingCapacityUnits, maxCapacityUnits, averageItemSize, itemsReceived, callback) {
var self = this;
var ddb = new AWS.DynamoDB();
var params = {
Limit: limit,
ReturnConsumedCapacity: 'NONE',
TableName: tableName
};
if (startKey) {
params.ExclusiveStartKey = startKey;
}
ddb.scan(params, function (err, data) {
if (err) {
return callback(err);
}
var startOfSecond = new Date().getTime();

var usableCapacity = maxCapacityUnits + remainingCapacityUnits;
if (usableCapacity > 0) {
var limit = Math.max(Math.floor((self.singleCapacitySize / averageItemSize) * usableCapacity), 1);
var params = {
Limit: limit,
ConsistentRead: self.consistentRead,
ReturnConsumedCapacity: 'TOTAL',
TableName: tableName
};

if (data.Items.length > 0) {
itemsReceived(data.Items);
if (startKey) {
params.ExclusiveStartKey = startKey;
}

if (!data.LastEvaluatedKey || _.keys(data.LastEvaluatedKey).length === 0) {
return callback();
}
self._streamItems(tableName, data.LastEvaluatedKey, limit, itemsReceived, callback);
});
ddb.scan(params, function (error, data) {
if (error) {
callback(error);
return;
}

if (data.Items.length > 0) {
itemsReceived(data.Items);
}

if (!data.LastEvaluatedKey || _.keys(data.LastEvaluatedKey).length === 0) {
callback();
return;
}

var nextRemainingCapacity = usableCapacity - data.ConsumedCapacity.CapacityUnits;
var duration = new Date().getTime() - startOfSecond;
var timeout = 1000 - duration;
if( timeout < 0 || (maxCapacityUnits*duration/1000) < nextRemainingCapacity) {
// the second is already fully consumed, or enough capacity remains to continue immediately
nextRemainingCapacity -= maxCapacityUnits*timeout/1000;
timeout = 0;
}

setTimeout(function () {
self._streamItems(tableName, data.LastEvaluatedKey, Math.floor(nextRemainingCapacity), maxCapacityUnits, averageItemSize, itemsReceived, callback);
}, timeout);
});
} else {
// Wait until we have capacity again
setTimeout(() => {
self._streamItems(tableName, startKey, usableCapacity, maxCapacityUnits, averageItemSize, itemsReceived, callback);
}, 1000);
}
};
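
The scan loop above is effectively a one-second token bucket: each window grants maxCapacityUnits, unused units carry over, and the scan's ConsumedCapacity is debited. A simplified, self-contained sketch of that accounting (doScan is a hypothetical stand-in for ddb.scan):

// Simplified sketch of the per-second throttle in _streamItems.
// doScan(usableUnits, cb) is hypothetical; cb receives (consumedUnits, done).
function throttledLoop(maxCapacityUnits, remainingUnits, doScan) {
    var usable = maxCapacityUnits + remainingUnits;
    if (usable <= 0) {
        // Over budget: wait a full second to earn maxCapacityUnits back.
        return setTimeout(function () {
            throttledLoop(maxCapacityUnits, usable, doScan);
        }, 1000);
    }
    var start = Date.now();
    doScan(usable, function (consumedUnits, done) {
        if (done) return;
        var next = usable - consumedUnits;
        var duration = Date.now() - start;
        var timeout = 1000 - duration;
        // Skip the sleep when the second is already spent (negative timeout
        // credits the extra elapsed time), or when enough unused capacity
        // remains to keep scanning immediately.
        if (timeout < 0 || (maxCapacityUnits * duration / 1000) < next) {
            next -= maxCapacityUnits * timeout / 1000;
            timeout = 0;
        }
        setTimeout(function () {
            throttledLoop(maxCapacityUnits, Math.floor(next), doScan);
        }, timeout);
    });
}
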

DynamoBackup.prototype._fetchTables = function (lastTable, tables, callback) {
Expand Down Expand Up @@ -269,4 +359,65 @@ DynamoBackup.prototype._getDataPipelineAttributeValueKey = function (type) {
}
};

module.exports = DynamoBackup;
DynamoBackup.prototype._updateTableCapacities = function(tableName, newReadCapacity, newWriteCapacity, callback) {
var params = {
TableName: tableName,
ProvisionedThroughput: {
ReadCapacityUnits: newReadCapacity,
WriteCapacityUnits: newWriteCapacity
}
};

var ddb = new AWS.DynamoDB();
ddb.updateTable(params, callback);
};

DynamoBackup.prototype._updateTableOriginalCapacities = function(tableName, callback) {
var self = this;
if( !self.tablesCapacities[tableName] ) {
return callback && callback();
}
var originalReadCapacity = self.tablesCapacities[tableName].oldRead;
var currentReadCapacity = self.tablesCapacities[tableName].newRead;
var diffReadApplied = currentReadCapacity - originalReadCapacity;

var originalWriteCapacity = self.tablesCapacities[tableName].oldWrite;
var currentWriteCapacity = self.tablesCapacities[tableName].newWrite;
var diffWriteApplied = currentWriteCapacity - originalWriteCapacity;

var ddb = new AWS.DynamoDB();
ddb.describeTable({ TableName: tableName }, function (err, data) {
if (!err) {
currentReadCapacity = data.Table.ProvisionedThroughput.ReadCapacityUnits;
currentWriteCapacity = data.Table.ProvisionedThroughput.WriteCapacityUnits;
}
var newReadCapacity = Math.max(currentReadCapacity - diffReadApplied, originalReadCapacity);
var newWriteCapacity = Math.max(currentWriteCapacity - diffWriteApplied, originalWriteCapacity);

if( newReadCapacity == currentReadCapacity && newWriteCapacity == currentWriteCapacity ) {
return callback && callback();
}

self._updateTableCapacities(tableName, newReadCapacity, newWriteCapacity, function(err, data){
if(err && err.code == 'LimitExceededException') {
self.emit('warning', tableName, "Can't decrease the read capacity value from " + currentReadCapacity + " to " + newReadCapacity);
}
return callback && callback();
});
});
};
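
Note that the restore subtracts the delta this tool applied rather than blindly writing back the old value, so capacity changes made externally during the backup survive. A worked example with hypothetical numbers:

// Hypothetical numbers for the restore math above.
var oldRead = 100, raisedRead = 874;        // capacities saved in tablesCapacities
var diffReadApplied = raisedRead - oldRead; // 774 units added by the backup

// Suppose someone else raised the table to 1000 RCU during the backup:
var currentRead = 1000;                     // from DescribeTable at restore time
var restoredRead = Math.max(currentRead - diffReadApplied, oldRead); // 226 RCU

// The external +126 increase is preserved instead of being clobbered back to 100.
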

DynamoBackup.prototype.restoreOriginalCapacities = function(callback) {
var self = this;
var tables = Object.keys(this.tablesCapacities);
if( 0 == tables.length ) {
return callback && callback();
}
this._updateTableOriginalCapacities(tables[0], function(){
delete self.tablesCapacities[tables[0]];
return self.restoreOriginalCapacities(callback);
});
};

module.exports = DynamoBackup;

2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "dynamo-backup-to-s3",
"version": "0.6.2",
"version": "0.6.3",
"author": "Dylan Lingelbach (https://github.com/dylanlingelbach)",
"license": "MIT",
"contributors": [