diff --git a/lib/dynamo-backup.js b/lib/dynamo-backup.js index a81e469..160664f 100644 --- a/lib/dynamo-backup.js +++ b/lib/dynamo-backup.js @@ -159,38 +159,59 @@ DynamoBackup.prototype._copyTable = function (tableName, itemsReceived, callback return callback(err); } - var readPercentage = self.readPercentage; - var limit = Math.max((data.Table.ProvisionedThroughput.ReadCapacityUnits * readPercentage) | 0, 1); + const Table = data.Table; + var maxCapacityUnits = Math.max(Math.floor(Table.ProvisionedThroughput.ReadCapacityUnits * self.readPercentage), 1); + var averageItemSize = Table.TableSizeBytes / Table.ItemCount; - self._streamItems(tableName, null, limit, itemsReceived, callback); + self._streamItems(tableName, null, 0, maxCapacityUnits, averageItemSize, itemsReceived, callback); }); }; -DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, limit, itemsReceived, callback) { +DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, remainingCapacityUnits, maxCapacityUnits, averageItemSize, itemsReceived, callback) { var self = this; var ddb = new AWS.DynamoDB(); - var params = { - Limit: limit, - ReturnConsumedCapacity: 'NONE', - TableName: tableName - }; - if (startKey) { - params.ExclusiveStartKey = startKey; - } - ddb.scan(params, function (err, data) { - if (err) { - return callback(err); - } + var startOfSecond = new Date().getTime(); + + var usableCapacity = maxCapacityUnits + remainingCapacityUnits; + if (usableCapacity > 0) { + var limit = Math.max(Math.floor(((8 * 1024) / averageItemSize) * usableCapacity), 1); + var params = { + Limit: limit, + ReturnConsumedCapacity: 'TOTAL', + TableName: tableName + }; - if (data.Items.length > 0) { - itemsReceived(data.Items); + if (startKey) { + params.ExclusiveStartKey = startKey; } - if (!data.LastEvaluatedKey || _.keys(data.LastEvaluatedKey).length === 0) { - return callback(); - } - self._streamItems(tableName, data.LastEvaluatedKey, limit, itemsReceived, callback); - }); + ddb.scan(params, function (error, data) { + if (error) { + callback(error); + return; + } + + if (data.Items.length > 0) { + itemsReceived(data.Items); + } + + if (!data.LastEvaluatedKey || _.keys(data.LastEvaluatedKey).length === 0) { + callback(); + return; + } + + var nextRemainingCapacity = usableCapacity - data.ConsumedCapacity.CapacityUnits; + var timeout = Math.max(1000 - (new Date().getTime() - startOfSecond), 0); + setTimeout(function () { + self._streamItems(tableName, data.LastEvaluatedKey, nextRemainingCapacity, maxCapacityUnits, averageItemSize, itemsReceived, callback); + }, timeout); + }); + } else { + // Wait until we have capacity again + setTimeout(() => { + self._streamItems(tableName, startKey, usableCapacity, maxCapacityUnits, averageItemSize, itemsReceived, callback); + }, 1000); + } }; DynamoBackup.prototype._fetchTables = function (lastTable, tables, callback) { @@ -259,4 +280,4 @@ DynamoBackup.prototype._getDataPipelineAttributeValueKey = function (type) { } }; -module.exports = DynamoBackup; \ No newline at end of file +module.exports = DynamoBackup;