From 4082f424760d75cc0ec3ca129c58830c5c603eaa Mon Sep 17 00:00:00 2001 From: Matthew Williams Date: Wed, 7 Aug 2019 11:34:11 -0400 Subject: [PATCH 1/4] add support for PAY_PER_REQUEST BillingMode --- dynamorm/exceptions.py | 4 ++ dynamorm/table.py | 140 +++++++++++++++++++++++------------------ 2 files changed, 84 insertions(+), 60 deletions(-) diff --git a/dynamorm/exceptions.py b/dynamorm/exceptions.py index 4cb840e..c551ce0 100644 --- a/dynamorm/exceptions.py +++ b/dynamorm/exceptions.py @@ -43,6 +43,10 @@ class MissingTableAttribute(DynamoTableException): """A required attribute is missing""" +class InvalidTableAttribute(DynamoTableException): + """An attribute has an invalid value""" + + class InvalidSchemaField(DynamoTableException): """A field provided does not exist in the schema""" diff --git a/dynamorm/table.py b/dynamorm/table.py index de3cd06..ba05ffc 100644 --- a/dynamorm/table.py +++ b/dynamorm/table.py @@ -4,25 +4,27 @@ The attributes you define on your inner ``Table`` class map to underlying boto data structures. This mapping is expressed through the following data model: -========= ======== ==== =========== -Attribute Required Type Description -========= ======== ==== =========== -name True str The name of the table, as stored in Dynamo. +========= ======== ==== =========== +Attribute Required Type Description +========= ======== ==== =========== +name True str The name of the table, as stored in Dynamo. -hash_key True str The name of the field to use as the hash key. +hash_key True str The name of the field to use as the hash key. It must exist in the schema. -range_key False str The name of the field to use as the range_key, if one is used. +range_key False str The name of the field to use as the range_key, if one is used. It must exist in the schema. -read True int The provisioned read throughput. +read Cond int The provisioned read throughput. Required for 'PROVISIONED' billing_mode (default). -write True int The provisioned write throughput. +write Cond int The provisioned write throughput. Required for 'PROVISIONED' billing_mode (default). -stream False str The stream view type, either None or one of: - 'NEW_IMAGE'|'OLD_IMAGE'|'NEW_AND_OLD_IMAGES'|'KEYS_ONLY' +billing_mode True str The billing mode. One of: 'PROVISIONED'|'PAY_PER_REQUEST' -========= ======== ==== =========== +stream False str The stream view type, either None or one of: + 'NEW_IMAGE'|'OLD_IMAGE'|'NEW_AND_OLD_IMAGES'|'KEYS_ONLY' + +========= ======== ==== =========== Indexes @@ -72,6 +74,7 @@ class DynamoCommon3(object): range_key = None read = None write = None + billing_mode = 'PROVISIONED' def __init__(self): for attr in self.REQUIRED_ATTRS: @@ -107,7 +110,13 @@ def as_schema(name, key_type): @property def provisioned_throughput(self): """Return an appropriate ProvisionedThroughput, based on our attributes""" - return {"ReadCapacityUnits": self.read, "WriteCapacityUnits": self.write} + if self.billing_mode != 'PROVISIONED': + return None + + return { + 'ReadCapacityUnits': self.read, + 'WriteCapacityUnits': self.write + } class DynamoIndex3(DynamoCommon3): @@ -127,6 +136,7 @@ def lookup_by_type(cls, index_type): def __init__(self, table, schema): self.table = table self.schema = schema + self.billing_mode = table.billing_mode super(DynamoIndex3, self).__init__() @@ -167,7 +177,8 @@ class DynamoGlobalIndex3(DynamoIndex3): @property def index_args(self): args = super(DynamoGlobalIndex3, self).index_args - args["ProvisionedThroughput"] = self.provisioned_throughput + if self.billing_mode == 'PROVISIONED': + args['ProvisionedThroughput'] = self.provisioned_throughput return args @@ -344,23 +355,28 @@ def create_table(self, wait=True): :param bool wait: If set to True, the default, this call will block until the table is created """ - if not self.read or not self.write: - raise MissingTableAttribute( - "The read/write attributes are required to create a table" - ) + if self.billing_mode not in ('PROVISIONED', 'PAY_PER_REQUEST'): + raise InvalidTableAttribute("valid values for billing_mode are: PROVISIONED|PAY_PER_REQUEST") - index_args = collections.defaultdict(list) + if self.billing_mode == 'PROVISIONED' and (not self.read or not self.write): + raise MissingTableAttribute("The read/write attributes are required to create " + "a table when billing_mode is 'PROVISIONED'") + + extra_args = collections.defaultdict(list) for index in six.itervalues(self.indexes): - index_args[index.ARG_KEY].append(index.index_args) + extra_args[index.ARG_KEY].append(index.index_args) + + if self.billing_mode == 'PROVISIONED': + extra_args['ProvisionedThroughput'] = self.provisioned_throughput log.info("Creating table %s", self.name) table = self.resource.create_table( TableName=self.name, KeySchema=self.key_schema, AttributeDefinitions=self.attribute_definitions, - ProvisionedThroughput=self.provisioned_throughput, StreamSpecification=self.stream_specification, - **index_args + BillingMode=self.billing_mode, + **extra_args ) if wait: log.info("Waiting for table creation...") @@ -446,23 +462,34 @@ def do_update(**kwargs): wait_for_active() - # check if we're going to change our capacity - if (self.read and self.write) and ( - self.read != table.provisioned_throughput["ReadCapacityUnits"] - or self.write != table.provisioned_throughput["WriteCapacityUnits"] - ): + billing_args = {} - log.info( - "Updating capacity on table %s (%s -> %s)", - self.name, - dict( - (k, v) - for k, v in six.iteritems(table.provisioned_throughput) - if k.endswith("Units") - ), - self.provisioned_throughput, - ) - do_update(ProvisionedThroughput=self.provisioned_throughput) + # check if we're going to change our billing mode + current_billing_mode = table.billing_mode_summary['BillingMode'] + if self.billing_mode != current_billing_mode: + log.info("Updating billing mode on table %s (%s -> %s)", + self.name, + current_billing_mode, + self.billing_mode) + billing_args['BillingMode'] = self.billing_mode + + # check if we're going to change our capacity + if (self.billing_mode == 'PROVISIONED' and self.read and self.write) and \ + (self.read != table.provisioned_throughput['ReadCapacityUnits'] or + self.write != table.provisioned_throughput['WriteCapacityUnits']): + + log.info("Updating capacity on table %s (%s -> %s)", + self.name, + dict( + (k, v) + for k, v in six.iteritems(table.provisioned_throughput) + if k.endswith('Units') + ), + self.provisioned_throughput) + billing_args['ProvisionedThroughput'] = self.provisioned_throughput + + if billing_args: + do_update(**billing_args) return self.update_table() # check if we're going to modify the stream @@ -502,29 +529,22 @@ def do_update(**kwargs): for index in six.itervalues(self.indexes): if index.name in existing_indexes: - current_capacity = existing_indexes[index.name]["ProvisionedThroughput"] - if (index.read and index.write) and ( - index.read != current_capacity["ReadCapacityUnits"] - or index.write != current_capacity["WriteCapacityUnits"] - ): - - log.info( - "Updating capacity on global secondary index %s on table %s (%s)", - index.name, - self.name, - index.provisioned_throughput, - ) - - do_update( - GlobalSecondaryIndexUpdates=[ - { - "Update": { - "IndexName": index["IndexName"], - "ProvisionedThroughput": index.provisioned_throughput, - } - } - ] - ) + current_capacity = existing_indexes[index.name]['ProvisionedThroughput'] + update_args = {} + + if (index.billing_mode == 'PROVISIONED' and index.read and index.write) and \ + (index.read != current_capacity['ReadCapacityUnits'] or + index.write != current_capacity['WriteCapacityUnits']): + + log.info("Updating capacity on global secondary index %s on table %s (%s)", index.name, self.name, + index.provisioned_throughput) + + do_update(GlobalSecondaryIndexUpdates=[{ + 'Update': { + 'IndexName': index.name, + 'ProvisionedThroughput': index.provisioned_throughput + } + }]) return self.update_table() else: # create the index From 94b95a1d10954aaffe497ed911275c17ee2aa865 Mon Sep 17 00:00:00 2001 From: Evan Borgstrom Date: Tue, 1 Oct 2019 10:01:52 +0800 Subject: [PATCH 2/4] Reformat with black --- dynamorm/table.py | 109 +++++++++++++++++++++++++++------------------- 1 file changed, 63 insertions(+), 46 deletions(-) diff --git a/dynamorm/table.py b/dynamorm/table.py index ba05ffc..81256ea 100644 --- a/dynamorm/table.py +++ b/dynamorm/table.py @@ -74,7 +74,7 @@ class DynamoCommon3(object): range_key = None read = None write = None - billing_mode = 'PROVISIONED' + billing_mode = "PROVISIONED" def __init__(self): for attr in self.REQUIRED_ATTRS: @@ -110,13 +110,10 @@ def as_schema(name, key_type): @property def provisioned_throughput(self): """Return an appropriate ProvisionedThroughput, based on our attributes""" - if self.billing_mode != 'PROVISIONED': + if self.billing_mode != "PROVISIONED": return None - return { - 'ReadCapacityUnits': self.read, - 'WriteCapacityUnits': self.write - } + return {"ReadCapacityUnits": self.read, "WriteCapacityUnits": self.write} class DynamoIndex3(DynamoCommon3): @@ -177,8 +174,8 @@ class DynamoGlobalIndex3(DynamoIndex3): @property def index_args(self): args = super(DynamoGlobalIndex3, self).index_args - if self.billing_mode == 'PROVISIONED': - args['ProvisionedThroughput'] = self.provisioned_throughput + if self.billing_mode == "PROVISIONED": + args["ProvisionedThroughput"] = self.provisioned_throughput return args @@ -355,19 +352,23 @@ def create_table(self, wait=True): :param bool wait: If set to True, the default, this call will block until the table is created """ - if self.billing_mode not in ('PROVISIONED', 'PAY_PER_REQUEST'): - raise InvalidTableAttribute("valid values for billing_mode are: PROVISIONED|PAY_PER_REQUEST") + if self.billing_mode not in ("PROVISIONED", "PAY_PER_REQUEST"): + raise InvalidTableAttribute( + "valid values for billing_mode are: PROVISIONED|PAY_PER_REQUEST" + ) - if self.billing_mode == 'PROVISIONED' and (not self.read or not self.write): - raise MissingTableAttribute("The read/write attributes are required to create " - "a table when billing_mode is 'PROVISIONED'") + if self.billing_mode == "PROVISIONED" and (not self.read or not self.write): + raise MissingTableAttribute( + "The read/write attributes are required to create " + "a table when billing_mode is 'PROVISIONED'" + ) extra_args = collections.defaultdict(list) for index in six.itervalues(self.indexes): extra_args[index.ARG_KEY].append(index.index_args) - if self.billing_mode == 'PROVISIONED': - extra_args['ProvisionedThroughput'] = self.provisioned_throughput + if self.billing_mode == "PROVISIONED": + extra_args["ProvisionedThroughput"] = self.provisioned_throughput log.info("Creating table %s", self.name) table = self.resource.create_table( @@ -465,28 +466,33 @@ def do_update(**kwargs): billing_args = {} # check if we're going to change our billing mode - current_billing_mode = table.billing_mode_summary['BillingMode'] + current_billing_mode = table.billing_mode_summary["BillingMode"] if self.billing_mode != current_billing_mode: - log.info("Updating billing mode on table %s (%s -> %s)", - self.name, - current_billing_mode, - self.billing_mode) - billing_args['BillingMode'] = self.billing_mode + log.info( + "Updating billing mode on table %s (%s -> %s)", + self.name, + current_billing_mode, + self.billing_mode, + ) + billing_args["BillingMode"] = self.billing_mode # check if we're going to change our capacity - if (self.billing_mode == 'PROVISIONED' and self.read and self.write) and \ - (self.read != table.provisioned_throughput['ReadCapacityUnits'] or - self.write != table.provisioned_throughput['WriteCapacityUnits']): - - log.info("Updating capacity on table %s (%s -> %s)", - self.name, - dict( - (k, v) - for k, v in six.iteritems(table.provisioned_throughput) - if k.endswith('Units') - ), - self.provisioned_throughput) - billing_args['ProvisionedThroughput'] = self.provisioned_throughput + if (self.billing_mode == "PROVISIONED" and self.read and self.write) and ( + self.read != table.provisioned_throughput["ReadCapacityUnits"] + or self.write != table.provisioned_throughput["WriteCapacityUnits"] + ): + + log.info( + "Updating capacity on table %s (%s -> %s)", + self.name, + dict( + (k, v) + for k, v in six.iteritems(table.provisioned_throughput) + if k.endswith("Units") + ), + self.provisioned_throughput, + ) + billing_args["ProvisionedThroughput"] = self.provisioned_throughput if billing_args: do_update(**billing_args) @@ -529,22 +535,33 @@ def do_update(**kwargs): for index in six.itervalues(self.indexes): if index.name in existing_indexes: - current_capacity = existing_indexes[index.name]['ProvisionedThroughput'] + current_capacity = existing_indexes[index.name]["ProvisionedThroughput"] update_args = {} - if (index.billing_mode == 'PROVISIONED' and index.read and index.write) and \ - (index.read != current_capacity['ReadCapacityUnits'] or - index.write != current_capacity['WriteCapacityUnits']): + if ( + index.billing_mode == "PROVISIONED" and index.read and index.write + ) and ( + index.read != current_capacity["ReadCapacityUnits"] + or index.write != current_capacity["WriteCapacityUnits"] + ): - log.info("Updating capacity on global secondary index %s on table %s (%s)", index.name, self.name, - index.provisioned_throughput) + log.info( + "Updating capacity on global secondary index %s on table %s (%s)", + index.name, + self.name, + index.provisioned_throughput, + ) - do_update(GlobalSecondaryIndexUpdates=[{ - 'Update': { - 'IndexName': index.name, - 'ProvisionedThroughput': index.provisioned_throughput - } - }]) + do_update( + GlobalSecondaryIndexUpdates=[ + { + "Update": { + "IndexName": index.name, + "ProvisionedThroughput": index.provisioned_throughput, + } + } + ] + ) return self.update_table() else: # create the index From 416584491bcac005952c2334cf49d3cec6f02fda Mon Sep 17 00:00:00 2001 From: Evan Borgstrom Date: Tue, 1 Oct 2019 10:08:35 +0800 Subject: [PATCH 3/4] Bump version, add changelog entry --- CHANGELOG.rst | 6 ++++++ dynamorm/table.py | 1 + setup.py | 4 ++-- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 4ad620e..b3e64fe 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,9 @@ +0.9.13 - 2019.10.01 +################### + +* Add support for PAY_PER_REQUEST billing mode +* Bump minimum version of boto3 to 1.9.54 + 0.9.12 - 2019.09.30 ################### diff --git a/dynamorm/table.py b/dynamorm/table.py index 81256ea..d534abe 100644 --- a/dynamorm/table.py +++ b/dynamorm/table.py @@ -54,6 +54,7 @@ from boto3.dynamodb.conditions import Key, Attr from dynamorm.exceptions import ( + InvalidTableAttribute, MissingTableAttribute, TableNotActive, InvalidSchemaField, diff --git a/setup.py b/setup.py index 284f7fa..0c9c72d 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name="dynamorm", - version="0.9.12", + version="0.9.13", description="DynamORM is a Python object & relation mapping library for Amazon's DynamoDB service.", long_description=long_description, author="Evan Borgstrom", @@ -13,7 +13,7 @@ url="https://github.com/NerdWalletOSS/DynamORM", license="Apache License Version 2.0", python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4", - install_requires=["blinker>=1.4,<2.0", "boto3>=1.3,<2.0", "six"], + install_requires=["blinker>=1.4,<2.0", "boto3>=1.9.54,<2.0", "six"], extras_require={ "marshmallow": ["marshmallow>=2.15.1,<4"], "schematics": ["schematics>=2.1.0,<3"], From 6a753c5eb9c71bf621c1464359e5a44ef8e3731e Mon Sep 17 00:00:00 2001 From: Evan Borgstrom Date: Tue, 1 Oct 2019 10:19:34 +0800 Subject: [PATCH 4/4] Refactor and add tests --- dynamorm/table.py | 28 ++++++++++++---------------- tests/test_model.py | 22 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/dynamorm/table.py b/dynamorm/table.py index d534abe..d3271df 100644 --- a/dynamorm/table.py +++ b/dynamorm/table.py @@ -111,9 +111,6 @@ def as_schema(name, key_type): @property def provisioned_throughput(self): """Return an appropriate ProvisionedThroughput, based on our attributes""" - if self.billing_mode != "PROVISIONED": - return None - return {"ReadCapacityUnits": self.read, "WriteCapacityUnits": self.write} @@ -134,7 +131,6 @@ def lookup_by_type(cls, index_type): def __init__(self, table, schema): self.table = table self.schema = schema - self.billing_mode = table.billing_mode super(DynamoIndex3, self).__init__() @@ -175,7 +171,7 @@ class DynamoGlobalIndex3(DynamoIndex3): @property def index_args(self): args = super(DynamoGlobalIndex3, self).index_args - if self.billing_mode == "PROVISIONED": + if self.table.billing_mode == "PROVISIONED": args["ProvisionedThroughput"] = self.provisioned_throughput return args @@ -197,6 +193,17 @@ def __init__(self, schema, indexes=None): super(DynamoTable3, self).__init__() + if self.billing_mode not in ("PROVISIONED", "PAY_PER_REQUEST"): + raise InvalidTableAttribute( + "valid values for billing_mode are: PROVISIONED|PAY_PER_REQUEST" + ) + + if self.billing_mode == "PROVISIONED" and (not self.read or not self.write): + raise MissingTableAttribute( + "The read/write attributes are required to create " + "a table when billing_mode is 'PROVISIONED'" + ) + self.indexes = {} if indexes: for name, klass in six.iteritems(indexes): @@ -353,17 +360,6 @@ def create_table(self, wait=True): :param bool wait: If set to True, the default, this call will block until the table is created """ - if self.billing_mode not in ("PROVISIONED", "PAY_PER_REQUEST"): - raise InvalidTableAttribute( - "valid values for billing_mode are: PROVISIONED|PAY_PER_REQUEST" - ) - - if self.billing_mode == "PROVISIONED" and (not self.read or not self.write): - raise MissingTableAttribute( - "The read/write attributes are required to create " - "a table when billing_mode is 'PROVISIONED'" - ) - extra_args = collections.defaultdict(list) for index in six.itervalues(self.indexes): extra_args[index.ARG_KEY].append(index.index_args) diff --git a/tests/test_model.py b/tests/test_model.py index 3ee00c5..e7a7124 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -7,6 +7,7 @@ DynaModelException, HashKeyExists, InvalidSchemaField, + InvalidTableAttribute, MissingTableAttribute, ValidationError, ) @@ -79,10 +80,31 @@ class Child(Parent): def test_table_validation(): """Defining a model with missing table attributes should raise exceptions""" with pytest.raises(MissingTableAttribute): + # Missing hash_key + class Model(DynaModel): + class Table: + name = "table" + + class Schema: + foo = String(required=True) + + with pytest.raises(MissingTableAttribute): + # Missing read/write + class Model(DynaModel): + class Table: + name = "table" + hash_key = "foo" + class Schema: + foo = String(required=True) + + with pytest.raises(InvalidTableAttribute): + # Invalid billing mode class Model(DynaModel): class Table: name = "table" + hash_key = "foo" + billing_mode = "FOO" class Schema: foo = String(required=True)