From ba364587c8b590837533474c3efda108e461cee0 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Wed, 18 Dec 2019 06:18:08 -0800 Subject: [PATCH 1/4] Remove unnecessary requirement --- requirements-to-freeze.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements-to-freeze.txt b/requirements-to-freeze.txt index 8f5bd59..9c08eb5 100644 --- a/requirements-to-freeze.txt +++ b/requirements-to-freeze.txt @@ -10,7 +10,6 @@ pytest-pythonpath==0.7.3 # MotoServer requirements flask -netifaces aiohttp # Functional requirements From b5b14d552788952434938163334921b922e67e11 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sun, 5 Jan 2020 06:34:37 -0800 Subject: [PATCH 2/4] #13 - Stream data to new tables --- Makefile | 2 +- requirements-to-freeze.txt | 2 +- src/migrator/steps/AddIndexStep.py | 6 ++ src/migrator/steps/CreateTableStep.py | 1 - src/migrator/utilities/AwsUtilities.py | 44 ++++++++- src/migrator/utilities/DynamoDButilities.py | 9 ++ .../add_index/table_copy_items_v1.py | 3 +- .../add_index/table_copy_items_v2.py | 8 +- .../add_index/table_stream_items_v1.py | 3 +- .../add_index/table_stream_items_v2.py | 2 +- .../add_index/table_stream_items_v3.py | 2 +- tests/migrator/steps/test_add_index.py | 39 ++++++-- .../migrator/utilities/test_aws_utilities.py | 93 ++++++++++++++++++- 13 files changed, 188 insertions(+), 26 deletions(-) diff --git a/Makefile b/Makefile index c28159e..f72b219 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ venv_lint: venv/bin/activate venv_test: venv/bin/activate . venv/bin/activate ; \ - venv/bin/pytest -s + venv/bin/pytest -sv lint: flake8 diff --git a/requirements-to-freeze.txt b/requirements-to-freeze.txt index 9c08eb5..37a9a49 100644 --- a/requirements-to-freeze.txt +++ b/requirements-to-freeze.txt @@ -13,5 +13,5 @@ flask aiohttp # Functional requirements -moto==1.3.15.dev142 +moto==1.3.15.dev172 tenacity==5.1.4 \ No newline at end of file diff --git a/src/migrator/steps/AddIndexStep.py b/src/migrator/steps/AddIndexStep.py index b8eb2ff..18aaea8 100644 --- a/src/migrator/steps/AddIndexStep.py +++ b/src/migrator/steps/AddIndexStep.py @@ -27,6 +27,10 @@ def execute(self): base_name = self._get_base_name(previous_table_name) new_table_name = f"{base_name}_V{self._version}" previous_table = self.aws_utils.describe_table(previous_table_name)['Table'] + if 'StreamSpecification' not in previous_table: + # Alter table to add stream + self.aws_utils.update_table(DynamoDButilities.get_stream_props(previous_table_name)) + previous_table = self.aws_utils.describe_table(previous_table_name)['Table'] new_table = DynamoDButilities.get_table_creation_details(previous_table, new_table_name, local_indexes=self._properties['LocalSecondaryIndexes'], attr_definitions=self._properties['AttributeDefinitions']) @@ -40,6 +44,8 @@ def execute(self): # Create stream self.aws_utils.create_event_source_mapping(stream_arn=previous_table['LatestStreamArn'], function_arn=func['FunctionArn']) + # Update existing data + self.aws_utils.update_data(previous_table_name, key_schema=previous_table['KeySchema']) return created_table def _get_base_name(self, name): diff --git a/src/migrator/steps/CreateTableStep.py b/src/migrator/steps/CreateTableStep.py index 2c633ff..8c35925 100644 --- a/src/migrator/steps/CreateTableStep.py +++ b/src/migrator/steps/CreateTableStep.py @@ -10,5 +10,4 @@ def __init__(self, identifier, version, properties): self.aws_utils = AwsUtilities(self._identifier, version=self._version) def execute(self): - self._properties['StreamSpecification'] = 
{'StreamEnabled': True, 'StreamViewType': 'NEW_AND_OLD_IMAGES'} return self.aws_utils.create_table_if_not_exists(self._properties) diff --git a/src/migrator/utilities/AwsUtilities.py b/src/migrator/utilities/AwsUtilities.py index 0a906dd..21428ba 100644 --- a/src/migrator/utilities/AwsUtilities.py +++ b/src/migrator/utilities/AwsUtilities.py @@ -1,10 +1,12 @@ import boto3 import logging +from boto3.dynamodb.conditions import AttributeNotExists from migrator.utilities.DynamoDButilities import DynamoDButilities from migrator.utilities.IAMutilities import lambda_stream_assume_role, lambda_stream_policy from migrator.utilities.LambdaUtilities import get_zipfile from migrator.utilities.Utilities import logger, metadata_table_name, metadata_table_properties from tenacity import before_sleep_log, retry, wait_exponential, stop_after_attempt, RetryError +from datetime import datetime from time import sleep from uuid import uuid4 @@ -13,6 +15,7 @@ _iam = boto3.client('iam') _sts = boto3.client('sts') +_dynamodb.meta. class AwsHistory: @@ -146,16 +149,51 @@ def create_table(self, properties): def _create_table(self, properties, keep_history=True): _dynamodb.create_table(**properties) + created_table = self.wait_for_table(properties) + logger.info(f"Successfully created table {properties['TableName']}") + if keep_history: + self._history.created_table(properties['TableName']) + return created_table + + def wait_for_table(self, properties): status = 'CREATING' while status != 'ACTIVE': created_table = self.describe_table(properties['TableName'])['Table'] status = created_table['TableStatus'] sleep(1) - logger.info(f"Successfully created table {properties['TableName']}") - if keep_history: - self._history.created_table(properties['TableName']) return created_table + def update_table(self, properties): + def _update_table(): + logger.info(f"Updating table: {properties}") + logger.info(f"Current table: {self.describe_table(properties['TableName'])}") + _dynamodb.update_table(**properties) + updated_table = self.wait_for_table(properties) + logger.info(f"Successfully updated table {properties['TableName']}") + return updated_table + try: + return self.retry(_update_table, ()) + except RetryError as e: + self._history.rollback() + e.reraise() + + def update_data(self, table_name, key_schema): + unique_attr = f"{str(uuid4())}_migration" + self._history._ddb_utils.add_attr_name(unique_attr) + items = _dynamodb.scan(TableName=table_name, Limit=200)['Items'] + logger.warning(f"Items in {table_name}: {items}") + for item in items: + key = {key['AttributeName']: { + ([*item[key['AttributeName']]][0]): item[key['AttributeName']][[*item[key['AttributeName']]][0]]} for + key in key_schema} + _dynamodb.update_item(TableName=table_name, + Key=key, + UpdateExpression="set #attr = :val", + ExpressionAttributeNames={'#attr': unique_attr}, + ExpressionAttributeValues={':val': {'S': str(datetime.today())}}) + logger.debug(f"Successfully updated data in {table_name}") + logger.warning(f"Items in {table_name}: {_dynamodb.scan(TableName=table_name)['Items']}") + def describe_table(self, table_name): return _dynamodb.describe_table(TableName=table_name) diff --git a/src/migrator/utilities/DynamoDButilities.py b/src/migrator/utilities/DynamoDButilities.py index bb27a0f..27aefa3 100644 --- a/src/migrator/utilities/DynamoDButilities.py +++ b/src/migrator/utilities/DynamoDButilities.py @@ -19,6 +19,12 @@ def __init__(self, identifier, version): self._identifier = identifier self._version = str(version) + @staticmethod + def 
get_stream_props(table_name): + return {'TableName': table_name, + 'StreamSpecification': {'StreamEnabled': True, + 'StreamViewType': 'NEW_AND_OLD_IMAGES'}} + @staticmethod def get_table_creation_details(existing_table: dict, new_table_name: str, local_indexes: [dict], attr_definitions: [dict]): @@ -66,6 +72,9 @@ def add_mapping(self, uuid): def add_function(self, arn): self._set_operation("ADD", "functions", arn) + def add_attr_name(self, attr_name): + self._set_operation("ADD", "attribute_name", attr_name) + def remove_table(self, name): self._set_operation("DELETE", "tables", name) diff --git a/tests/migration_scripts/add_index/table_copy_items_v1.py b/tests/migration_scripts/add_index/table_copy_items_v1.py index ac491d0..73e9ef8 100644 --- a/tests/migration_scripts/add_index/table_copy_items_v1.py +++ b/tests/migration_scripts/add_index/table_copy_items_v1.py @@ -1,9 +1,10 @@ #!/usr/bin/python from migrator.dynamodb_migrator import Migrator +from uuid import uuid4 -table_name = 'customers' +table_name = str(uuid4()) migrator = Migrator(identifier="copy_items") diff --git a/tests/migration_scripts/add_index/table_copy_items_v2.py b/tests/migration_scripts/add_index/table_copy_items_v2.py index 8149185..9d8915c 100644 --- a/tests/migration_scripts/add_index/table_copy_items_v2.py +++ b/tests/migration_scripts/add_index/table_copy_items_v2.py @@ -1,9 +1,9 @@ #!/usr/bin/python from migrator.dynamodb_migrator import Migrator +from migration_scripts.add_index.table_copy_items_v1 import table_name -table_name = 'customers' migrator = Migrator(identifier="copy_items") @@ -15,8 +15,7 @@ {'AttributeName': 'last_name', 'KeyType': 'RANGE'}], BillingMode='PAY_PER_REQUEST') def v1(created_table): - assert created_table['TableName'] == table_name - assert created_table['TableStatus'] == 'ACTIVE' + pass @migrator.version(2) @@ -26,5 +25,4 @@ def v1(created_table): {'AttributeName': 'postcode', 'KeyType': 'RANGE'}], 'Projection': {'ProjectionType': 'ALL'}}]) def v2(created_table): - assert created_table['TableName'] == f"{table_name}_V2" - assert created_table['TableStatus'] == 'ACTIVE' + pass diff --git a/tests/migration_scripts/add_index/table_stream_items_v1.py b/tests/migration_scripts/add_index/table_stream_items_v1.py index 9dae51f..50ae5d7 100644 --- a/tests/migration_scripts/add_index/table_stream_items_v1.py +++ b/tests/migration_scripts/add_index/table_stream_items_v1.py @@ -1,9 +1,10 @@ #!/usr/bin/python from migrator.dynamodb_migrator import Migrator +from uuid import uuid4 -table_name = 'customers' +table_name = str(uuid4()) migrator = Migrator(identifier="table_stream_test") diff --git a/tests/migration_scripts/add_index/table_stream_items_v2.py b/tests/migration_scripts/add_index/table_stream_items_v2.py index 567c512..7079a48 100644 --- a/tests/migration_scripts/add_index/table_stream_items_v2.py +++ b/tests/migration_scripts/add_index/table_stream_items_v2.py @@ -1,9 +1,9 @@ #!/usr/bin/python from migrator.dynamodb_migrator import Migrator +from migration_scripts.add_index.table_stream_items_v1 import table_name -table_name = 'customers' migrator = Migrator(identifier="table_stream_test") diff --git a/tests/migration_scripts/add_index/table_stream_items_v3.py b/tests/migration_scripts/add_index/table_stream_items_v3.py index 07f4b00..a7ada95 100644 --- a/tests/migration_scripts/add_index/table_stream_items_v3.py +++ b/tests/migration_scripts/add_index/table_stream_items_v3.py @@ -1,9 +1,9 @@ #!/usr/bin/python from migrator.dynamodb_migrator import Migrator +from 
migration_scripts.add_index.table_stream_items_v1 import table_name -table_name = 'customers' migrator = Migrator(identifier="table_stream_test") diff --git a/tests/migrator/steps/test_add_index.py b/tests/migrator/steps/test_add_index.py index 56117c1..1f7f97a 100644 --- a/tests/migrator/steps/test_add_index.py +++ b/tests/migrator/steps/test_add_index.py @@ -1,6 +1,7 @@ import importlib import logging import os +from datetime import datetime from time import sleep from random import randint from tenacity import before_sleep_log, retry, stop_after_attempt, wait_exponential @@ -51,18 +52,19 @@ def test_add_index_script__assert_data_is_send_through_for_multiple_versions(dyn import migrator.utilities.AwsUtilities importlib.reload(migrator.utilities.AwsUtilities) import migration_scripts.add_index.table_stream_items_v1 # noqa + from migration_scripts.add_index.table_stream_items_v1 import table_name try: # Create index, and verify data is transferred import migration_scripts.add_index.table_stream_items_v2 # noqa if not CONNECT_TO_AWS: # update Lambda when mocking, to point to local MOTO server update_dynamodb_host_in_lambda(dynamodb, lmbda, version='2') - insert_and_verify_random_data(dynamodb, 'customers', 'customers_V2') + insert_and_verify_random_data(dynamodb, table_name, f'{table_name}_V2') # Create another index, and verify data is transferred again import migration_scripts.add_index.table_stream_items_v3 # noqa if not CONNECT_TO_AWS: update_dynamodb_host_in_lambda(dynamodb, lmbda, version='3') - insert_and_verify_random_data(dynamodb, 'customers_V2', 'customers_V3') + insert_and_verify_random_data(dynamodb, f'{table_name}_V2', f'{table_name}_V3') finally: delete_created_services(dynamodb=dynamodb, iam=iam, lmbda=lmbda) @@ -70,15 +72,15 @@ def test_add_index_script__assert_data_is_send_through_for_multiple_versions(dyn @aws_integration_test def test_add_index_script__assert_existing_data_is_replicated(dynamodb, lmbda, iam): import migration_scripts.add_index.table_copy_items_v1 # noqa - insert_random_data(dynamodb, 'customers') + from migration_scripts.add_index.table_copy_items_v1 import table_name + insert_random_data(dynamodb, table_name) # Update table import migration_scripts.add_index.table_copy_items_v2 # noqa # # Assert the new table has the items created in the first table try: - reverify_random_data(dynamodb, 'customers_V2') + reverify_random_data(dynamodb, f'{table_name}_V2') finally: - # delete_created_services(dynamodb=dynamodb, iam=iam, lmbda=lmbda) @@ -94,17 +96,31 @@ def insert_random_data(dynamodb, table_name): sleep(10) +def update_random_data(dynamodb, table_name): + items = dynamodb.scan(TableName=table_name)['Items'] + items = dynamodb.scan(TableName=metadata_table_name)['Items'] + unique_attr = items[0]['2']['M']['attribute_name']['SS'][0] + for cust_nr in customer_nrs: + key = {'customer_nr': {'S': cust_nr}, 'last_name': {'S': 'Smith'}} + dynamodb.update_item(TableName=table_name, + Key=key, + UpdateExpression="set #attr = :val", + ExpressionAttributeNames={'#attr': unique_attr}, + ExpressionAttributeValues={':val': {'S': str(datetime.today())}}) + sleep(10) + + def verify_random_data(dynamodb, table_name): items = dynamodb.scan(TableName=table_name)['Items'] assert len(items) == len(customer_nrs) assert [item['customer_nr']['S'] for item in items].sort() == customer_nrs.sort() - indexed_items = dynamodb.scan(TableName='customers_V2', IndexName='postcode_index')['Items'] + indexed_items = dynamodb.scan(TableName=f'{table_name}', 
IndexName='postcode_index')['Items'] assert len(indexed_items) == len(customer_nrs) -@retry(wait=wait_exponential(multiplier=1, min=2, max=2), stop=stop_after_attempt(2), before_sleep=before_sleep_log(logger, logging.DEBUG)) +@retry(wait=wait_exponential(multiplier=1, min=2), stop=stop_after_attempt(5), before_sleep=before_sleep_log(logger, logging.DEBUG)) def reverify_random_data(dynamodb, table_name): - verify_random_data(table_name) + verify_random_data(dynamodb, table_name) @retry(wait=wait_exponential(multiplier=1, min=2, max=120), stop=stop_after_attempt(5), before_sleep=before_sleep_log(logger, logging.DEBUG)) @@ -174,16 +190,21 @@ def update_dynamodb_host_in_lambda(dynamodb, lmbda, version): new_code = update_boto_client_endpoints(existing_code, str(os.environ['dynamodb_mock_endpoint_url'])) res = lmbda.update_function_code(FunctionName=created_function_name, ZipFile=zip(new_code)) lmbda.update_event_source_mapping(UUID=created_items['mappings']['SS'][0], FunctionName=res['FunctionArn']) + sleep(10) def delete_created_services(dynamodb, iam, lmbda): metadata = dynamodb.scan(TableName=metadata_table_name)['Items'][0] created_items = metadata['2']['M'] + tables = metadata['1']['M']['tables']['SS'] + tables.extend(created_items['tables']['SS']) delete(created_items, iam, lmbda) if '3' in metadata: created_items = dynamodb.scan(TableName=metadata_table_name)['Items'][0]['3']['M'] delete(created_items, iam, lmbda) - delete_tables(dynamodb, [metadata_table_name, 'customers', 'customers_V2', 'customers_V3']) + tables.extend(created_items['tables']['SS']) + tables.append(metadata_table_name) + delete_tables(dynamodb, tables) def delete(created_items, iam, lmbda): diff --git a/tests/migrator/utilities/test_aws_utilities.py b/tests/migrator/utilities/test_aws_utilities.py index 78afc5c..9eb8cb1 100644 --- a/tests/migrator/utilities/test_aws_utilities.py +++ b/tests/migrator/utilities/test_aws_utilities.py @@ -1,6 +1,9 @@ from botocore.exceptions import ClientError +from datetime import datetime +from dateutil.parser import parse from mock_wrapper import mock_aws from migrator.utilities.AwsUtilities import AwsUtilities +from migrator.utilities.DynamoDButilities import DynamoDButilities from migrator.utilities.IAMutilities import lambda_stream_assume_role, lambda_stream_policy from migrator.utilities.Utilities import metadata_table_name from time import sleep @@ -12,8 +15,7 @@ table_properties = {'AttributeDefinitions': [{'AttributeName': 'identifier', 'AttributeType': 'S'}], 'TableName': table_name, 'KeySchema': [{'AttributeName': 'identifier', 'KeyType': 'HASH'}], - 'BillingMode': 'PAY_PER_REQUEST', - 'StreamSpecification': {'StreamEnabled': True, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}} + 'BillingMode': 'PAY_PER_REQUEST'} @mock_aws @@ -74,6 +76,7 @@ def test_rollback_when_creating_two_tables(dynamodb, lmbda, iam): # create something using AwsUtils identifier = str(uuid4()) aws_util = AwsUtilities(identifier=identifier, version='1') + created_table = aws_util.update_table(DynamoDButilities.get_stream_props(table_name)) aws_util.create_iam_items(created_table, created_table) # Sanity check that the policy now exists expected_iam_policy = get_recorded(dynamodb, identifier, "policies")[0] @@ -325,6 +328,92 @@ def test_aws_util_create_table_if_not_exists(dynamodb, lmbda, iam): delete_tables(dynamodb, [metadata_table_name, table_name]) +@mock_aws +def test_dynamodb_table_can_be_updated(dynamodb, *_): + # + # Initialize + util = AwsUtilities(identifier=str(uuid4()), version='1') + # + # Create 
first table, and verify it exists + util.create_table_if_not_exists(table_properties) + assert 'StreamSpecification' not in dynamodb.describe_table(TableName=table_name)['Table'] + # + # Update table with stream specification + util.update_table(DynamoDButilities.get_stream_props(table_name)) + # + # Assert it exists + assert 'StreamSpecification' in dynamodb.describe_table(TableName=table_name)['Table'] + assert dynamodb.describe_table(TableName=table_name)['Table']['StreamSpecification'] == {'StreamEnabled': True, + 'StreamViewType': 'NEW_AND_OLD_IMAGES'} + # + # Cleanup + delete_tables(dynamodb, [metadata_table_name, table_name]) + + +@mock_aws +def test_dynamodb_table_update_can_be_reverted(dynamodb, *_): + # + # Initialize + util = AwsUtilities(identifier=str(uuid4()), version='1') + # + # Update table without name + util.create_table_if_not_exists(table_properties) + try: + util.update_table({'StreamSpecification': {'StreamEnabled': True}}) + assert False, "Updating a table without name should fail" + except: # noqa: E722 + pass # Expect exception + # + # Verify table has been deleted (i.e., rollback is working) + assert table_name not in dynamodb.list_tables()['TableNames'] + # + # Cleanup + delete_tables(dynamodb, [metadata_table_name]) + + +@mock_aws +def test_update_data(dynamodb, *_): + # + # Create table + util = AwsUtilities(identifier=str(uuid4()), version='1') + table = util.create_table_if_not_exists(table_properties) + # + # Add some data + nr_of_items = 300 + for _ in range(0, nr_of_items): + dynamodb.put_item(TableName=table_name, Item={'identifier': {'S': str(uuid4())}}) + # + # Update data + util.update_data(table_name, key_schema=table['KeySchema']) + # + # Update_key will be recorded + metadata = dynamodb.scan(TableName=metadata_table_name)['Items'][0] + update_key = metadata['1']['M']['attribute_name']['SS'][0] + # + # Verify data is updated + items = [item for item in dynamodb.scan(TableName=table_name)['Items'] if update_key in item] + assert len(items) == nr_of_items + assert all([parse(item[update_key]['S']) for item in items]) + # + # Clean up + delete_tables(dynamodb, [metadata_table_name, table_name]) + + +@mock_aws +def test_update_data__only_updates_non_updated(): + pass + + +@mock_aws +def test_update_data__can_be_called_twice(): + pass + + +@mock_aws +def test_update_data__only_return_key_attributes(): + pass + + def get_metadata(dynamodb, identifier, version): return dynamodb.get_item(TableName=metadata_table_name, Key={'identifier': {'S': identifier}})['Item'][version] From de45caaaedbfd29607b36b605f7aef1bd0c4181e Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Mon, 2 Mar 2020 12:31:59 +0000 Subject: [PATCH 3/4] #13 - Update items in the existing table --- .travis.yml | 3 +- requirements-to-freeze.txt | 2 +- setup.cfg | 3 +- src/migrator/steps/AddIndexStep.py | 4 +- src/migrator/utilities/AwsUtilities.py | 155 ++++++++++-------- src/migrator/utilities/DynamoDButilities.py | 7 +- src/migrator/utilities/IAMutilities.py | 8 +- src/migrator/utilities/LambdaUtilities.py | 22 ++- tests/conftest.py | 21 +++ tests/migrator/steps/test_add_index.py | 28 +++- .../migrator/utilities/test_aws_utilities.py | 117 +++++++------ 11 files changed, 237 insertions(+), 133 deletions(-) create mode 100644 tests/conftest.py diff --git a/.travis.yml b/.travis.yml index 5aa56f0..d615d39 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,8 +4,9 @@ sudo: true python: - 3.6 - 3.7 +- 3.8 env: -- CONNECT_TO_AWS=NO +- CONNECT_TO_AWS=FALSE before_install: - sudo apt-get install -y 
ruby-coveralls install: diff --git a/requirements-to-freeze.txt b/requirements-to-freeze.txt index 37a9a49..8fdb7e8 100644 --- a/requirements-to-freeze.txt +++ b/requirements-to-freeze.txt @@ -13,5 +13,5 @@ flask aiohttp # Functional requirements -moto==1.3.15.dev172 +moto==1.3.15.dev414 tenacity==5.1.4 \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 4c4346a..7d06b05 100755 --- a/setup.cfg +++ b/setup.cfg @@ -12,7 +12,8 @@ addopts = --cov-branch env = # OPTIONS: TRUE | FALSE - # YES: Runs each test in-memory first, and then runs it again against AWS + # FALSE: Runs each test in-memory only + # TRUE: Runs each test against AWS D:CONNECT_TO_AWS=FALSE [pycodestyle] diff --git a/src/migrator/steps/AddIndexStep.py b/src/migrator/steps/AddIndexStep.py index 18aaea8..de45f2b 100644 --- a/src/migrator/steps/AddIndexStep.py +++ b/src/migrator/steps/AddIndexStep.py @@ -40,7 +40,9 @@ def execute(self): # Create Role created_policy, created_role = self.aws_utils.create_iam_items(created_table, previous_table) # Create Lambda - func = self.aws_utils.create_aws_lambda(created_role, created_table['TableName']) + func = self.aws_utils.create_aws_lambda(created_role, + old_table=previous_table_name, + new_table=created_table['TableName']) # Create stream self.aws_utils.create_event_source_mapping(stream_arn=previous_table['LatestStreamArn'], function_arn=func['FunctionArn']) diff --git a/src/migrator/utilities/AwsUtilities.py b/src/migrator/utilities/AwsUtilities.py index 21428ba..ef72f10 100644 --- a/src/migrator/utilities/AwsUtilities.py +++ b/src/migrator/utilities/AwsUtilities.py @@ -1,6 +1,5 @@ import boto3 import logging -from boto3.dynamodb.conditions import AttributeNotExists from migrator.utilities.DynamoDButilities import DynamoDButilities from migrator.utilities.IAMutilities import lambda_stream_assume_role, lambda_stream_policy from migrator.utilities.LambdaUtilities import get_zipfile @@ -10,12 +9,6 @@ from time import sleep from uuid import uuid4 -_dynamodb = boto3.client('dynamodb') -_lambda = boto3.client('lambda') -_iam = boto3.client('iam') -_sts = boto3.client('sts') - -_dynamodb.meta. 
class AwsHistory: @@ -28,6 +21,9 @@ def __init__(self, identifier, version): self._identifier = identifier self._version = version self._ddb_utils = DynamoDButilities(identifier, version) + self._dynamodb = boto3.client('dynamodb') + self._lambda = boto3.client('lambda') + self._iam = boto3.client('iam') def created_table(self, name): self._ddb_utils.add_table(name) @@ -53,48 +49,47 @@ def rollback(self): functions = self._ddb_utils.get_created_functions() logger.warning(f"Deleting: Tables: {tables}") for name in tables: - _dynamodb.delete_table(TableName=name) + self._dynamodb.delete_table(TableName=name) # Wait for table to be deleted while True: try: - _dynamodb.describe_table(TableName=name) + self._dynamodb.describe_table(TableName=name) sleep(1) - except _dynamodb.exceptions.ResourceNotFoundException: + except self._dynamodb.exceptions.ResourceNotFoundException: self._ddb_utils.remove_table(name) break logger.warning(f"Deleting: Roles: {roles}") for arn in roles: name = arn[arn.rindex('/') + 1:] - attached_policies = [policy['PolicyArn'] for policy in _iam.list_attached_role_policies(RoleName=name)['AttachedPolicies']] + attached_policies = [policy['PolicyArn'] for policy in self._iam.list_attached_role_policies(RoleName=name)['AttachedPolicies']] # Detach any policies first for policy_arn in attached_policies: if policy_arn in policies: - _iam.detach_role_policy(RoleName=name, PolicyArn=policy_arn) + self._iam.detach_role_policy(RoleName=name, PolicyArn=policy_arn) # Then delete those policies logger.warning(f"Deleting: Policies: {policies}") for arn in policies: - _iam.delete_policy(PolicyArn=arn) + self._iam.delete_policy(PolicyArn=arn) self._ddb_utils.remove_policy(arn) # And the roles logger.warning(f"Deleting: Roles: {roles}") for arn in roles: name = arn[arn.rindex('/') + 1:] - _iam.delete_role(RoleName=name) + self._iam.delete_role(RoleName=name) self._ddb_utils.remove_role(arn) logger.warning(f"Deleting: EventSourceMappings: {mappings}") for uuid in mappings: - _lambda.delete_event_source_mapping(UUID=uuid) + self._lambda.delete_event_source_mapping(UUID=uuid) self._ddb_utils.remove_mapping(name) logger.warning(f"Deleting: Functions: {functions}") for name in functions: - _lambda.delete_function(FunctionName=name) + self._lambda.delete_function(FunctionName=name) self._ddb_utils.remove_function(name) class AwsUtilities: _account_id = None - ResourceNotFoundException = _dynamodb.exceptions.ResourceNotFoundException @retry(wait=AwsHistory.exponential, before_sleep=AwsHistory.sleep_action, stop=AwsHistory.stop) def retry(self, func, args): @@ -106,24 +101,29 @@ def __init__(self, identifier, version): self._identifier = identifier self._version = str(version) self._history = AwsHistory(self._identifier, self._version) + self._dynamodb = boto3.client('dynamodb') + self._lambda = boto3.client('lambda') + self._iam = boto3.client('iam') + self._sts = boto3.client('sts') initial_map = {'M': {}} try: self.describe_table(table_name=metadata_table_name) logger.debug(f"Metadata table '{metadata_table_name}' already exists") - item = _dynamodb.get_item(TableName=metadata_table_name, Key={'identifier': {'S': identifier}})['Item'] + item = self._dynamodb.get_item(TableName=metadata_table_name, Key={'identifier': {'S': identifier}})['Item'] if str(version) not in item: - _dynamodb.update_item(TableName=metadata_table_name, - Key={'identifier': {'S': self._identifier}}, - UpdateExpression="set #attr = :val", - ExpressionAttributeNames={'#attr': str(self._version)}, - 
ExpressionAttributeValues={':val': initial_map}) - except _dynamodb.exceptions.ResourceNotFoundException: + self._dynamodb.update_item(TableName=metadata_table_name, + Key={'identifier': {'S': self._identifier}}, + UpdateExpression="set #attr = :val", + ExpressionAttributeNames={'#attr': str(self._version)}, + ExpressionAttributeValues={':val': initial_map}) + except self._dynamodb.exceptions.ResourceNotFoundException: logger.debug(f"Metadata table '{metadata_table_name}' does not exist yet") self._create_table(metadata_table_properties, keep_history=False) # Add version information - _dynamodb.put_item(TableName=metadata_table_name, Item={'identifier': {'S': self._identifier}, - self._version: initial_map}) + self._dynamodb.put_item(TableName=metadata_table_name, + Item={'identifier': {'S': self._identifier}, + self._version: initial_map}) def get_region(self): return self._region @@ -131,7 +131,7 @@ def get_region(self): def create_table_if_not_exists(self, properties): try: return self.describe_table(properties['TableName'])['Table'] - except _dynamodb.exceptions.ResourceNotFoundException: + except self._dynamodb.exceptions.ResourceNotFoundException: return self.create_table(properties) def create_table(self, properties): @@ -148,7 +148,7 @@ def create_table(self, properties): e.reraise() def _create_table(self, properties, keep_history=True): - _dynamodb.create_table(**properties) + self._dynamodb.create_table(**properties) created_table = self.wait_for_table(properties) logger.info(f"Successfully created table {properties['TableName']}") if keep_history: @@ -167,7 +167,7 @@ def update_table(self, properties): def _update_table(): logger.info(f"Updating table: {properties}") logger.info(f"Current table: {self.describe_table(properties['TableName'])}") - _dynamodb.update_table(**properties) + self._dynamodb.update_table(**properties) updated_table = self.wait_for_table(properties) logger.info(f"Successfully updated table {properties['TableName']}") return updated_table @@ -178,28 +178,48 @@ def _update_table(): e.reraise() def update_data(self, table_name, key_schema): - unique_attr = f"{str(uuid4())}_migration" + unique_attr = self._get_or_create_unique_attr() + items, last_eval_key = self.get_items_without_attr(table_name, unique_attr) + while items: + for item in items: + key = {key['AttributeName']: { + ([*item[key['AttributeName']]][0]): item[key['AttributeName']][[*item[key['AttributeName']]][0]]} for + key in key_schema} + self._dynamodb.update_item(TableName=table_name, + Key=key, + UpdateExpression="set #attr = :val", + ExpressionAttributeNames={'#attr': unique_attr}, + ExpressionAttributeValues={':val': {'S': str(datetime.today())}}) + logger.debug(f"Successfully updated {len(items)} items in {table_name}") + if last_eval_key: + items, last_eval_key = self.get_items_without_attr(table_name, unique_attr, last_eval_key) + else: + items = None + if not items: + logger.debug(f"Finished updating items in {table_name}") + + def _get_or_create_unique_attr(self): + unique_attr = self._history._ddb_utils.get_created_attr() + unique_attr = f"{str(uuid4())}_migration" if not unique_attr else unique_attr[0] self._history._ddb_utils.add_attr_name(unique_attr) - items = _dynamodb.scan(TableName=table_name, Limit=200)['Items'] - logger.warning(f"Items in {table_name}: {items}") - for item in items: - key = {key['AttributeName']: { - ([*item[key['AttributeName']]][0]): item[key['AttributeName']][[*item[key['AttributeName']]][0]]} for - key in key_schema} - 
_dynamodb.update_item(TableName=table_name, - Key=key, - UpdateExpression="set #attr = :val", - ExpressionAttributeNames={'#attr': unique_attr}, - ExpressionAttributeValues={':val': {'S': str(datetime.today())}}) - logger.debug(f"Successfully updated data in {table_name}") - logger.warning(f"Items in {table_name}: {_dynamodb.scan(TableName=table_name)['Items']}") + return unique_attr + + def get_items_without_attr(self, table_name, unique_attr, last_evaluated = None): + kwargs = {'ExclusiveStartKey': last_evaluated} if last_evaluated else {} + scan = self._dynamodb.scan(TableName=table_name, + Limit=200, + FilterExpression='attribute_not_exists(#u_a)', + ExpressionAttributeNames={'#u_a': unique_attr}, + **kwargs) + return scan['Items'], scan['LastEvaluatedKey'] if 'LastEvaluatedKey' in scan else {} def describe_table(self, table_name): - return _dynamodb.describe_table(TableName=table_name) + return self._dynamodb.describe_table(TableName=table_name) def create_iam_items(self, created_table, previous_table): policy_document = lambda_stream_policy.substitute(region=self._region, - oldtable=previous_table['LatestStreamArn'], + oldtable=previous_table['TableArn'], + oldtablestream=previous_table['LatestStreamArn'], newtable=created_table['TableArn']) desc = ' created by dynamodb_migrator, migrating data from ' + previous_table['TableName'] + ' to ' + created_table['TableName'] created_policy = self.create_policy(desc, policy_document) @@ -211,10 +231,10 @@ def create_policy(self, desc, policy_document, policy_name=None): policy_name = policy_name or 'dynamodb_migrator_' + str(uuid4())[:4] def _create_policy(): - created_policy = _iam.create_policy(Path='/dynamodb_migrator/', - PolicyName=policy_name, - PolicyDocument=policy_document, - Description='Policy' + desc) + created_policy = self._iam.create_policy(Path='/dynamodb_migrator/', + PolicyName=policy_name, + PolicyDocument=policy_document, + Description='Policy' + desc) self._history.created_policy(created_policy['Policy']['Arn']) logger.info(f"Successfully created policy {policy_name}") return created_policy @@ -229,10 +249,10 @@ def create_role(self, desc, role_name=None): role_name = role_name or 'dynamodb_migrator_' + str(uuid4())[:4] def _create_role(): - created_role = _iam.create_role(Path='/dynamodb_migrator/', - RoleName=role_name, - AssumeRolePolicyDocument=lambda_stream_assume_role, - Description='Role' + desc) + created_role = self._iam.create_role(Path='/dynamodb_migrator/', + RoleName=role_name, + AssumeRolePolicyDocument=lambda_stream_assume_role, + Description='Role' + desc) self._history.created_role(created_role['Role']['Arn']) logger.info(f"Successfully created role {role_name}") return created_role @@ -250,21 +270,22 @@ def attach_policy_to_role(self, created_policy, created_role): :param created_role: """ def _attach_policy_to_role(): - _iam.attach_role_policy(PolicyArn=created_policy['Policy']['Arn'], RoleName=created_role['Role']['RoleName']) + self._iam.attach_role_policy(PolicyArn=created_policy['Policy']['Arn'], + RoleName=created_role['Role']['RoleName']) try: self.retry(_attach_policy_to_role, ()) except RetryError as e: self._history.rollback() e.reraise() - def create_aws_lambda(self, created_role, table_name, lambda_name = None): + def create_aws_lambda(self, created_role, old_table, new_table, lambda_name = None): def _create_aws_lambda(): - zipped_lambda_code = get_zipfile(table_name=table_name) + zipped_lambda_code = get_zipfile(old_table=old_table, new_table=new_table, 
unique_attr=self._get_or_create_unique_attr()) name = lambda_name or 'dynamodb_migrator_' + str(uuid4())[0:4] - func = _lambda.create_function(FunctionName=name, Runtime='python3.7', - Role=created_role['Role']['Arn'], - Handler='lambda_stream.copy', - Code={'ZipFile': zipped_lambda_code}) + func = self._lambda.create_function(FunctionName=name, Runtime='python3.7', + Role=created_role['Role']['Arn'], + Handler='lambda_stream.copy', + Code={'ZipFile': zipped_lambda_code}) self._history.created_function(func['FunctionArn']) logger.info(f"Successfully created function {name}") return func @@ -276,12 +297,12 @@ def _create_aws_lambda(): def create_event_source_mapping(self, stream_arn, function_arn): def _create_event_source_mapping(): - mapping = _lambda.create_event_source_mapping(EventSourceArn=stream_arn, FunctionName=function_arn, - Enabled=True, - BatchSize=10, MaximumBatchingWindowInSeconds=5, - StartingPosition='TRIM_HORIZON') + mapping = self._lambda.create_event_source_mapping(EventSourceArn=stream_arn, FunctionName=function_arn, + Enabled=True, + BatchSize=10, MaximumBatchingWindowInSeconds=5, + StartingPosition='TRIM_HORIZON') while mapping['State'] != 'Enabled': - mapping = _lambda.get_event_source_mapping(UUID=mapping['UUID']) + mapping = self._lambda.get_event_source_mapping(UUID=mapping['UUID']) sleep(1) self._history.created_mapping(mapping['UUID']) logger.info(f"Successfully created event_source_mapping {mapping['UUID']}") @@ -293,5 +314,5 @@ def _create_event_source_mapping(): e.reraise() def get_metadata_table(self): - return _dynamodb.get_item(TableName=metadata_table_name, - Key={'identifier': {'S': self._identifier}})['Item'] + return self._dynamodb.get_item(TableName=metadata_table_name, + Key={'identifier': {'S': self._identifier}})['Item'] diff --git a/src/migrator/utilities/DynamoDButilities.py b/src/migrator/utilities/DynamoDButilities.py index 27aefa3..cb0972b 100644 --- a/src/migrator/utilities/DynamoDButilities.py +++ b/src/migrator/utilities/DynamoDButilities.py @@ -1,3 +1,4 @@ +import boto3 from migrator.utilities.Utilities import metadata_table_name _accepted_table_properties = ['AttributeDefinitions', @@ -14,8 +15,7 @@ class DynamoDButilities: def __init__(self, identifier, version): - from migrator.utilities.AwsUtilities import _dynamodb - self._dynamodb = _dynamodb + self._dynamodb = boto3.client('dynamodb') self._identifier = identifier self._version = str(version) @@ -93,6 +93,9 @@ def remove_function(self, arn): def get_created_tables(self, version=None): return self.get_attr("tables", version=version) + def get_created_attr(self): + return self.get_attr("attribute_name") + def get_created_policies(self): return self.get_attr("policies") diff --git a/src/migrator/utilities/IAMutilities.py b/src/migrator/utilities/IAMutilities.py index 6e667d6..071ca2d 100644 --- a/src/migrator/utilities/IAMutilities.py +++ b/src/migrator/utilities/IAMutilities.py @@ -11,6 +11,12 @@ "dynamodb:UpdateItem" ], "Resource": "$newtable" + }, { + "Effect": "Allow", + "Action": [ + "dynamodb:UpdateItem" + ], + "Resource": "$oldtable" }, { "Effect": "Allow", "Action": [ @@ -19,7 +25,7 @@ "dynamodb:ListStreams", "dynamodb:GetRecords" ], - "Resource": "$oldtable" + "Resource": "$oldtablestream" }, { "Effect": "Allow", "Action": "logs:*", diff --git a/src/migrator/utilities/LambdaUtilities.py b/src/migrator/utilities/LambdaUtilities.py index 9d095b6..68f5c82 100644 --- a/src/migrator/utilities/LambdaUtilities.py +++ b/src/migrator/utilities/LambdaUtilities.py @@ -6,27 +6,39 @@ 
lambda_code = Template("""import boto3 import json +from datetime import datetime dynamodb = boto3.client('dynamodb') +old_table_name = "$oldtable" table_name = "$newtable" +unique_attr = "$uniqueattr" def copy(event, context): for record in event['Records']: + key = record['dynamodb']['Keys'] if record['eventName'] == 'REMOVE': response = dynamodb.delete_item(TableName=table_name, - Key=record['dynamodb']['Keys']) + Key=key) if record['eventName'] == 'INSERT' or record['eventName'] == 'MODIFY': - response = dynamodb.put_item(TableName=table_name, - Item=record['dynamodb']['NewImage']) + item = record['dynamodb']['NewImage'] + if unique_attr in item: + del item[unique_attr] + dynamodb.put_item(TableName=table_name, Item=item) + else: + dynamodb.update_item(TableName=old_table_name, + Key=key, + UpdateExpression="set #attr = :val", + ExpressionAttributeNames={'#attr': unique_attr}, + ExpressionAttributeValues={':val': {'S': str(datetime.today())}}) return { 'statusCode': 200 } """) -def get_zipfile(table_name): - lambda_content = lambda_code.substitute(newtable=table_name) +def get_zipfile(old_table, new_table, unique_attr): + lambda_content = lambda_code.substitute(oldtable=old_table, newtable=new_table, uniqueattr=unique_attr) return zip(lambda_content) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..34410c4 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,21 @@ +import pytest +from settings import CONNECT_TO_AWS +from mock_wrapper import log_target, dynamodb, patch_boto, get_moto_services, verify_everything_is_deleted + + +@pytest.fixture() +def dynamodb_server_mode(): + # Same behaviour as the #mock_server_mode decorator + # Used as a fixture, to ensure it plays nice with other fixtures (such as parametrize) + if CONNECT_TO_AWS: + log_target("AWS") + yield dynamodb + else: + log_target("MOCK SERVER MODE") + patch_boto() + moto_services = get_moto_services(['dynamodb', 'lambda', 'iam']) + + yield moto_services['dynamodb'] + verify_everything_is_deleted(dynamodb=moto_services['dynamodb'], + lmbda=moto_services['lambda'], + iam=moto_services['iam']) diff --git a/tests/migrator/steps/test_add_index.py b/tests/migrator/steps/test_add_index.py index 1f7f97a..572e357 100644 --- a/tests/migrator/steps/test_add_index.py +++ b/tests/migrator/steps/test_add_index.py @@ -45,6 +45,8 @@ def test_add_index_script__assert_metadata_table_is_updated(dynamodb, lmbda, iam # Moto does not allow mocked Lambda to access mocked DynamoDB - can only verify this in server mode # https://github.com/spulec/moto/issues/1317 +# Currently fails due to a bug in moto, where items are not removed from the stream +# https://github.com/spulec/moto/pull/2763 @mock_server_mode def test_add_index_script__assert_data_is_send_through_for_multiple_versions(dynamodb, lmbda, iam): if not CONNECT_TO_AWS: @@ -97,7 +99,6 @@ def insert_random_data(dynamodb, table_name): def update_random_data(dynamodb, table_name): - items = dynamodb.scan(TableName=table_name)['Items'] items = dynamodb.scan(TableName=metadata_table_name)['Items'] unique_attr = items[0]['2']['M']['attribute_name']['SS'][0] for cust_nr in customer_nrs: @@ -112,18 +113,24 @@ def update_random_data(dynamodb, table_name): def verify_random_data(dynamodb, table_name): items = dynamodb.scan(TableName=table_name)['Items'] - assert len(items) == len(customer_nrs) + logger.warning(f"Items in {table_name}: {items}") + assert len(items) == len(customer_nrs), f"Table {table_name} should have {len(customer_nrs)} items - has only {len(items)}" 
assert [item['customer_nr']['S'] for item in items].sort() == customer_nrs.sort()
     indexed_items = dynamodb.scan(TableName=f'{table_name}',
                                   IndexName='postcode_index')['Items']
-    assert len(indexed_items) == len(customer_nrs)
+    assert len(indexed_items) == len(customer_nrs), f"Index postcode_index should have {len(customer_nrs)} items - has only {len(indexed_items)}"
+    # Verify that the migration-attribute is not migrated
+    # We only needed it in the first table to keep track of what's migrated
+    metadata_items = dynamodb.scan(TableName=metadata_table_name)['Items']
+    unique_attr = metadata_items[0]['2']['M']['attribute_name']['SS'][0]
+    assert all(unique_attr not in item for item in items)


-@retry(wait=wait_exponential(multiplier=1, min=2), stop=stop_after_attempt(5), before_sleep=before_sleep_log(logger, logging.DEBUG))
+@retry(wait=wait_exponential(multiplier=1, min=2), stop=stop_after_attempt(10), before_sleep=before_sleep_log(logger, logging.DEBUG))
 def reverify_random_data(dynamodb, table_name):
     verify_random_data(dynamodb, table_name)


-@retry(wait=wait_exponential(multiplier=1, min=2, max=120), stop=stop_after_attempt(5), before_sleep=before_sleep_log(logger, logging.DEBUG))
+@retry(wait=wait_exponential(multiplier=1, min=2), stop=stop_after_attempt(3), before_sleep=before_sleep_log(logger, logging.DEBUG))
 def insert_and_verify_random_data(dynamodb, first_table, second_table):
     insert_random_data(dynamodb, first_table)
     verify_random_data(dynamodb, second_table)
@@ -154,11 +161,12 @@ def test_add_index_script__assert_existing_streams_still_exist(dynamodb, lmbda,
     table = dynamodb.describe_table(TableName='customers')['Table']
     policy_document = lambda_stream_policy.substitute(aws_utils.get_region(),
                                                       oldtable='*',
+                                                      oldtablestream='*',
                                                       newtable='*')
     created_policy = aws_utils.create_policy('test_add_index_script__assert_existing_streams_still_exist', policy_document)
     created_role = aws_utils.create_role(desc='test_add_index_script__assert_existing_streams_still_exist')
     aws_utils.attach_policy_to_role(created_policy, created_role)
-    func = aws_utils.create_aws_lambda(created_role, table_name='N/A')
+    func = aws_utils.create_aws_lambda(created_role, old_table='N/A', new_table='N/A')
     # Create stream
     mapping = aws_utils.create_event_source_mapping(stream_arn=table['LatestStreamArn'],
                                                     function_arn=func['FunctionArn'])
@@ -182,11 +190,17 @@ def test_add_index_script__assert_existing_streams_still_exist(dynamodb, lmbda,


 def update_dynamodb_host_in_lambda(dynamodb, lmbda, version):
+    old_version = str(int(version) - 1)
+    first_table = dynamodb.scan(TableName=metadata_table_name)['Items'][0][old_version]['M']['tables']['SS'][0]
     created_items = dynamodb.scan(TableName=metadata_table_name)['Items'][0][version]['M']
+    print(created_items)
+    unique_attr = created_items['attribute_name']['SS'][0]
     created_function_arn = created_items['functions']['SS'][0]
     created_function_name = created_function_arn[created_function_arn.rindex(':') + 1:]
     created_table = dynamodb.describe_table(TableName=created_items['tables']['SS'][0])
-    existing_code = lambda_code.substitute(newtable=created_table['Table']['TableName'])
+    existing_code = lambda_code.substitute(oldtable=first_table,
+                                           newtable=created_table['Table']['TableName'],
+                                           uniqueattr=unique_attr)
     new_code = update_boto_client_endpoints(existing_code, str(os.environ['dynamodb_mock_endpoint_url']))
     res = lmbda.update_function_code(FunctionName=created_function_name, ZipFile=zip(new_code))
     lmbda.update_event_source_mapping(UUID=created_items['mappings']['SS'][0],
FunctionName=res['FunctionArn']) diff --git a/tests/migrator/utilities/test_aws_utilities.py b/tests/migrator/utilities/test_aws_utilities.py index 9eb8cb1..f8b6a0f 100644 --- a/tests/migrator/utilities/test_aws_utilities.py +++ b/tests/migrator/utilities/test_aws_utilities.py @@ -1,5 +1,5 @@ +import pytest from botocore.exceptions import ClientError -from datetime import datetime from dateutil.parser import parse from mock_wrapper import mock_aws from migrator.utilities.AwsUtilities import AwsUtilities @@ -35,7 +35,7 @@ def test_table_can_be_created(dynamodb, _, __): @mock_aws def test_iam_policy_can_be_created_without_name(dynamodb, _, iam): aws_util = AwsUtilities(identifier=str(uuid4()), version='1') - policy_document = lambda_stream_policy.substitute(region='', oldtable='*', newtable='*') + policy_document = lambda_stream_policy.substitute(region='', oldtable='*', oldtablestream='*', newtable='*') policy = aws_util.create_policy(desc=str(uuid4()), policy_document=policy_document) # # Assert policy is created @@ -52,7 +52,7 @@ def test_iam_policy_can_be_created_without_name(dynamodb, _, iam): def test_iam_policy_can_be_created_with_name(dynamodb, _, iam): policy_name = str(uuid4()) aws_util = AwsUtilities(identifier=str(uuid4()), version='1') - policy_document = lambda_stream_policy.substitute(region='', oldtable='*', newtable='*') + policy_document = lambda_stream_policy.substitute(region='', oldtable='*', oldtablestream='*', newtable='*') policy = aws_util.create_policy(desc=str(uuid4()), policy_document=policy_document, policy_name=policy_name) # # Assert policy is created @@ -107,7 +107,7 @@ def test_rollback_when_creating_invalid_lambda(dynamodb, lmbda, iam): assert table_name in dynamodb.list_tables()['TableNames'] # Try to create Lambda, this time using AwsUtils try: - aws_util.create_aws_lambda(invalid_role, 'table_name') + aws_util.create_aws_lambda(invalid_role, old_table='example', new_table='example') assert False, "Creating AWS Lambda with an invalid role should fail" except ClientError as e: if "Value 'nonsense' at 'role' failed to satisfy constraint" in e.response['Error']['Message']: @@ -234,7 +234,7 @@ def get_policy_arns(): return sorted([policy['Arn'] for policy in iam.list_policies(Scope='Local', PathPrefix='/dynamodb_migrator/')['Policies']]) identifier = str(uuid4()) util = AwsUtilities(identifier=identifier, version='1') - policy_document = lambda_stream_policy.substitute(region='', oldtable='*', newtable='*') + policy_document = lambda_stream_policy.substitute(region='', oldtable='*', oldtablestream='*', newtable='*') policy1 = util.create_policy(desc=str(uuid4()), policy_document=policy_document)['Policy'] policy2 = util.create_policy(desc=str(uuid4()), policy_document=policy_document)['Policy'] # @@ -279,8 +279,8 @@ def get_existing_functions(): created_role = iam.create_role(RoleName=str(uuid4()), AssumeRolePolicyDocument=lambda_stream_assume_role) identifier = str(uuid4()) util = AwsUtilities(identifier=identifier, version='1') - fn1 = util.create_aws_lambda(created_role, table_name='ex')['FunctionArn'] - fn2 = util.create_aws_lambda(created_role, table_name='ex')['FunctionArn'] + fn1 = util.create_aws_lambda(created_role, old_table='example', new_table='example')['FunctionArn'] + fn2 = util.create_aws_lambda(created_role, old_table='example', new_table='example')['FunctionArn'] # # Assert functions are recorded assert get_existing_functions() == sorted([fn1, fn2]) @@ -371,47 +371,70 @@ def test_dynamodb_table_update_can_be_reverted(dynamodb, *_): 
delete_tables(dynamodb, [metadata_table_name]) -@mock_aws -def test_update_data(dynamodb, *_): - # - # Create table +@pytest.mark.parametrize(argnames="nr_of_items", + argvalues=[5, 100, 300, 1000], + ids=["test_with_5_items", "with_100_items", "with_300_items", "with_1000_items"]) +def test_update_data(nr_of_items, dynamodb_server_mode): util = AwsUtilities(identifier=str(uuid4()), version='1') - table = util.create_table_if_not_exists(table_properties) - # - # Add some data - nr_of_items = 300 - for _ in range(0, nr_of_items): - dynamodb.put_item(TableName=table_name, Item={'identifier': {'S': str(uuid4())}}) - # - # Update data - util.update_data(table_name, key_schema=table['KeySchema']) - # - # Update_key will be recorded - metadata = dynamodb.scan(TableName=metadata_table_name)['Items'][0] - update_key = metadata['1']['M']['attribute_name']['SS'][0] - # - # Verify data is updated - items = [item for item in dynamodb.scan(TableName=table_name)['Items'] if update_key in item] - assert len(items) == nr_of_items - assert all([parse(item[update_key]['S']) for item in items]) - # - # Clean up - delete_tables(dynamodb, [metadata_table_name, table_name]) - - -@mock_aws -def test_update_data__only_updates_non_updated(): - pass - - -@mock_aws -def test_update_data__can_be_called_twice(): - pass - - -@mock_aws -def test_update_data__only_return_key_attributes(): - pass + # Ensure we use the dynamodb pointing to server mode + original_dynamodb = util._dynamodb + util._dynamodb = dynamodb_server_mode + try: + # + # Create table + table = util.create_table_if_not_exists(table_properties) + # + # Add some data + for _ in range(0, nr_of_items): + util._dynamodb.put_item(TableName=table_name, Item={'identifier': {'S': str(uuid4())}}) + # + # Update data + util.update_data(table_name, key_schema=table['KeySchema']) + # + # Update_key will be recorded + metadata = util._dynamodb.scan(TableName=metadata_table_name)['Items'][0] + update_key = metadata['1']['M']['attribute_name']['SS'][0] + # + # Verify data is updated + items = [item for item in util._dynamodb.scan(TableName=table_name)['Items'] if update_key in item] + assert len(items) == nr_of_items + assert all([parse(item[update_key]['S']) for item in items]) + finally: + # + # Clean up + delete_tables(util._dynamodb, [metadata_table_name, table_name]) + util._dynamodb = original_dynamodb + + +def test_update_data__can_be_called_twice(dynamodb_server_mode): + nr_of_items = 5 + util = AwsUtilities(identifier=str(uuid4()), version='1') + # Ensure we use the dynamodb pointing to server mode + original_dynamodb = util._dynamodb + util._dynamodb = dynamodb_server_mode + try: + # + # Create table + table = util.create_table_if_not_exists(table_properties) + # + # Add some data + for _ in range(0, nr_of_items): + util._dynamodb.put_item(TableName=table_name, Item={'identifier': {'S': str(uuid4())}}) + # + # Update data + util.update_data(table_name, key_schema=table['KeySchema']) + # + # Update data again + util.update_data(table_name, key_schema=table['KeySchema']) + # + # Ensure only single unique key is used + metadata = util._dynamodb.scan(TableName=metadata_table_name)['Items'][0] + assert len(metadata['1']['M']['attribute_name']['SS']) == 1 + finally: + # + # Clean up + delete_tables(dynamodb_server_mode, [metadata_table_name, table_name]) + util._dynamodb = original_dynamodb def get_metadata(dynamodb, identifier, version): From b48bfb98657950f803627555e1ee5613323391dd Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Mon, 9 Mar 2020 12:10:10 +0000 
Subject: [PATCH 4/4] Update requirements --- requirements-to-freeze.txt | 2 +- tests/migrator/steps/test_add_index.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements-to-freeze.txt b/requirements-to-freeze.txt index 8fdb7e8..762f0fd 100644 --- a/requirements-to-freeze.txt +++ b/requirements-to-freeze.txt @@ -13,5 +13,5 @@ flask aiohttp # Functional requirements -moto==1.3.15.dev414 +moto==1.3.15.dev479 tenacity==5.1.4 \ No newline at end of file diff --git a/tests/migrator/steps/test_add_index.py b/tests/migrator/steps/test_add_index.py index 572e357..ed0db7f 100644 --- a/tests/migrator/steps/test_add_index.py +++ b/tests/migrator/steps/test_add_index.py @@ -193,7 +193,6 @@ def update_dynamodb_host_in_lambda(dynamodb, lmbda, version): old_version = str(int(version) - 1) first_table = dynamodb.scan(TableName=metadata_table_name)['Items'][0][old_version]['M']['tables']['SS'][0] created_items = dynamodb.scan(TableName=metadata_table_name)['Items'][0][version]['M'] - print(created_items) unique_attr = created_items['attribute_name']['SS'][0] created_function_arn = created_items['functions']['SS'][0] created_function_name = created_function_arn[created_function_arn.rindex(':') + 1:]
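---

PATCH 2 enables a DynamoDB stream on each pre-existing table through UpdateTable before wiring the table to a Lambda (see get_stream_props and the StreamSpecification check in AddIndexStep). A minimal sketch of that call with plain boto3, where the table name is illustrative rather than taken from the patches:

    import boto3

    dynamodb = boto3.client('dynamodb')

    # Enable a stream on an existing table; once the table returns to ACTIVE,
    # DescribeTable will report both StreamSpecification and LatestStreamArn
    dynamodb.update_table(TableName='customers',
                          StreamSpecification={'StreamEnabled': True,
                                               'StreamViewType': 'NEW_AND_OLD_IMAGES'})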
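PATCH 3 reworks update_data into a paginated scan-and-stamp loop: only items that do not yet carry the migration attribute are fetched, each one is stamped with a timestamp so it flows through the stream, and the loop continues as long as DynamoDB returns a LastEvaluatedKey. A sketch of the same pattern, assuming a hypothetical helper name (stamp_unmarked_items does not appear in the patches):

    import boto3
    from datetime import datetime

    dynamodb = boto3.client('dynamodb')

    def stamp_unmarked_items(table_name, key_schema, marker):
        kwargs = {}
        while True:
            # Only return items that have not been stamped yet
            page = dynamodb.scan(TableName=table_name,
                                 Limit=200,
                                 FilterExpression='attribute_not_exists(#m)',
                                 ExpressionAttributeNames={'#m': marker},
                                 **kwargs)
            for item in page['Items']:
                # The key is simply the KeySchema subset of the item, which is
                # all that the dense comprehension in update_data computes
                key = {k['AttributeName']: item[k['AttributeName']] for k in key_schema}
                dynamodb.update_item(TableName=table_name,
                                     Key=key,
                                     UpdateExpression='set #m = :val',
                                     ExpressionAttributeNames={'#m': marker},
                                     ExpressionAttributeValues={':val': {'S': str(datetime.today())}})
            if 'LastEvaluatedKey' not in page:
                break
            kwargs = {'ExclusiveStartKey': page['LastEvaluatedKey']}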
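The Lambda template in PATCH 3 consumes the standard DynamoDB Streams record shape and handles each item in two passes: a NewImage without the marker attribute triggers an UpdateItem against the old table (stamping the marker, which in turn produces a second MODIFY event), while a NewImage that carries the marker is stripped of it and copied to the new table. An illustrative second-pass record, with made-up key and attribute values:

    event = {'Records': [{
        'eventName': 'MODIFY',
        'dynamodb': {
            'Keys': {'customer_nr': {'S': '123'}, 'last_name': {'S': 'Smith'}},
            # The marker is present, so copy() deletes it from the NewImage and
            # writes the remaining item to the new table via put_item
            'NewImage': {'customer_nr': {'S': '123'},
                         'last_name': {'S': 'Smith'},
                         'abcd_migration': {'S': '2020-03-02 12:00:00'}},
        },
    }]}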