diff --git a/.github/workflows/dynamodb.yml b/.github/workflows/dynamodb.yml index d01b2225..db807ea4 100644 --- a/.github/workflows/dynamodb.yml +++ b/.github/workflows/dynamodb.yml @@ -40,7 +40,7 @@ jobs: os: ["ubuntu-latest"] # TODO: yarl, dependency of influxio, is currently not available on Python 3.12. # https://github.com/aio-libs/yarl/pull/942 - python-version: ["3.8", "3.11"] + python-version: ["3.9", "3.11"] localstack-version: ["3.6"] env: @@ -76,7 +76,7 @@ jobs: pip install "setuptools>=64" --upgrade # Install package in editable mode. - pip install --use-pep517 --prefer-binary --editable=.[dynamodb,test,develop] + pip install --use-pep517 --prefer-binary --editable=.[dynamodb,kinesis,test,develop] - name: Run linter and software tests run: | diff --git a/cratedb_toolkit/io/kinesis/adapter.py b/cratedb_toolkit/io/kinesis/adapter.py index 9b29f071..f6f709a4 100644 --- a/cratedb_toolkit/io/kinesis/adapter.py +++ b/cratedb_toolkit/io/kinesis/adapter.py @@ -1,6 +1,7 @@ import asyncio import typing as t +import boto3 from aiobotocore.session import AioSession from kinesis import Consumer, JsonProcessor, Producer from yarl import URL @@ -8,37 +9,63 @@ class KinesisAdapter: def __init__(self, kinesis_url: URL): - self.session = AioSession() - self.session.set_credentials(access_key=kinesis_url.user, secret_key=kinesis_url.password) + self.async_session = AioSession() + self.async_session.set_credentials(access_key=kinesis_url.user, secret_key=kinesis_url.password) + + self.session = boto3.Session( + aws_access_key_id=kinesis_url.user, + aws_secret_access_key=kinesis_url.password, + region_name=kinesis_url.query.get("region"), + ) + self.endpoint_url = None if kinesis_url.host and kinesis_url.host.lower() != "aws": self.endpoint_url = f"http://{kinesis_url.host}:{kinesis_url.port}" self.kinesis_url = kinesis_url self.region_name = kinesis_url.query.get("region") self.stream_name = self.kinesis_url.path.lstrip("/") + self.kinesis_client = self.session.client("kinesis", endpoint_url=self.endpoint_url) + + def consumer_factory(self, **kwargs): + return Consumer( + stream_name=self.stream_name, + session=self.async_session, + endpoint_url=self.endpoint_url, + region_name=self.region_name, + processor=JsonProcessor(), + **kwargs, + ) def consume_forever(self, handler: t.Callable): asyncio.run(self._consume_forever(handler)) + def consume_once(self, handler: t.Callable): + asyncio.run(self._consume_once(handler)) + async def _consume_forever(self, handler: t.Callable): """ Consume items from a Kinesis stream. """ - async with Consumer( - stream_name=self.stream_name, - session=self.session, - endpoint_url=self.endpoint_url, - region_name=self.region_name, + async with self.consumer_factory( # TODO: Make configurable. create_stream=True, iterator_type="TRIM_HORIZON", sleep_time_no_records=0.2, - processor=JsonProcessor(), ) as consumer: while True: async for item in consumer: handler(item) + async def _consume_once(self, handler: t.Callable): + async with self.consumer_factory( + # TODO: Make configurable. + create_stream=True, + iterator_type="TRIM_HORIZON", + sleep_time_no_records=0.2, + ) as consumer: + async for item in consumer: + handler(item) + def produce(self, data: t.Dict[str, t.Any]): asyncio.run(self._produce(data)) @@ -46,7 +73,7 @@ async def _produce(self, data: t.Dict[str, t.Any]): # Put item onto queue to be flushed via `put_records()`. async with Producer( stream_name=self.stream_name, - session=self.session, + session=self.async_session, endpoint_url=self.endpoint_url, region_name=self.region_name, # TODO: Make configurable. diff --git a/cratedb_toolkit/io/kinesis/relay.py b/cratedb_toolkit/io/kinesis/relay.py index 4060cf66..e2204156 100644 --- a/cratedb_toolkit/io/kinesis/relay.py +++ b/cratedb_toolkit/io/kinesis/relay.py @@ -42,7 +42,7 @@ def __init__( self.connection: sa.Connection self.progress_bar: tqdm - def start(self): + def start(self, once: bool = False): """ Read events from Kinesis stream, convert to SQL statements, and submit to CrateDB. """ @@ -57,9 +57,12 @@ def start(self): # https://github.com/tqdm/tqdm#redirecting-logging self.progress_bar = tqdm() with logging_redirect_tqdm(): - self.kinesis_adapter.consume_forever(self.process) + if once: + self.kinesis_adapter.consume_once(self.process_event) + else: + self.kinesis_adapter.consume_forever(self.process_event) - def process(self, event): + def process_event(self, event): try: record = json.loads(base64.b64decode(event["kinesis"]["data"]).decode("utf-8")) operation = self.translator.to_sql(record) diff --git a/pyproject.toml b/pyproject.toml index cae73840..5e1696eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -155,6 +155,7 @@ io = [ "sqlalchemy>=2", ] kinesis = [ + "aiobotocore<2.15", "async-kinesis<1.2", "commons-codec>=0.0.12", "lorrystream[carabas]", diff --git a/tests/io/dynamodb/conftest.py b/tests/io/dynamodb/conftest.py index 8806c508..c21d0e86 100644 --- a/tests/io/dynamodb/conftest.py +++ b/tests/io/dynamodb/conftest.py @@ -29,7 +29,7 @@ def setup(self): from cratedb_toolkit.testing.testcontainers.localstack import LocalStackContainerWithKeepalive self.container = LocalStackContainerWithKeepalive() - self.container.with_services("dynamodb") + self.container.with_services("dynamodb", "kinesis") self.container.start() def finalize(self): @@ -44,10 +44,14 @@ def reset(self): for database_name in RESET_TABLES: self.client.drop_database(database_name) - def get_connection_url(self): + def get_connection_url_dynamodb(self): url = URL(self.container.get_url()) return f"dynamodb://LSIAQAAAAAAVNCBMPNSG:dummy@{url.host}:{url.port}" + def get_connection_url_kinesis_dynamodb_cdc(self): + url = URL(self.container.get_url()) + return f"kinesis+dynamodb+cdc://LSIAQAAAAAAVNCBMPNSG:dummy@{url.host}:{url.port}" + @pytest.fixture(scope="session") def dynamodb_service(): @@ -71,4 +75,4 @@ def dynamodb(dynamodb_service): @pytest.fixture(scope="session") def dynamodb_test_manager(dynamodb_service): - return DynamoDBTestManager(dynamodb_service.get_connection_url()) + return DynamoDBTestManager(dynamodb_service.get_connection_url_dynamodb()) diff --git a/tests/io/dynamodb/test_cli.py b/tests/io/dynamodb/test_cli.py index 80d87d63..45f7e2fe 100644 --- a/tests/io/dynamodb/test_cli.py +++ b/tests/io/dynamodb/test_cli.py @@ -10,8 +10,8 @@ def test_dynamodb_load_table(caplog, cratedb, dynamodb, dynamodb_test_manager): """ CLI test: Invoke `ctk load table` for DynamoDB. """ + dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/ProductCatalog?region=us-east-1" cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" - dynamodb_url = f"{dynamodb.get_connection_url()}/ProductCatalog?region=us-east-1" # Populate source database with sample dataset. dynamodb_test_manager.load_product_catalog() diff --git a/tests/io/dynamodb/test_copy.py b/tests/io/dynamodb/test_copy.py index b758d59a..69165a3a 100644 --- a/tests/io/dynamodb/test_copy.py +++ b/tests/io/dynamodb/test_copy.py @@ -24,8 +24,8 @@ def test_dynamodb_copy_list_of_objects(caplog, cratedb, dynamodb, dynamodb_test_ """ # Define source and target URLs. + dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1" cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" - dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" # Populate source database with data. dynamodb_test_manager.load_records(table_name="demo", records=[RECORD_UTM]) diff --git a/tests/io/dynamodb/test_relay.py b/tests/io/dynamodb/test_relay.py new file mode 100644 index 00000000..de3c8403 --- /dev/null +++ b/tests/io/dynamodb/test_relay.py @@ -0,0 +1,61 @@ +import time + +import botocore +import pytest + +from cratedb_toolkit.io.kinesis.relay import KinesisRelay +from tests.io.test_processor import DYNAMODB_CDC_INSERT_NESTED, DYNAMODB_CDC_MODIFY_NESTED, wrap_kinesis + +pytestmark = pytest.mark.kinesis + +pytest.importorskip("commons_codec", reason="Only works with commons-codec installed") +pytest.importorskip("kinesis", reason="Only works with async-kinesis installed") + +from commons_codec.transform.dynamodb import DynamoDBCDCTranslator # noqa: E402 + + +def test_kinesis_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): + """ + Roughly verify that the AWS DynamoDB CDC processing works as expected. + """ + + # Define source and target URLs. + kinesis_url = f"{dynamodb.get_connection_url_kinesis_dynamodb_cdc()}/demo?region=us-east-1" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Define target table name. + table_name = '"testdrive"."demo"' + + # Create target table. + cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl) + + # Define two CDC events: INSERT and UPDATE. + events = [ + wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED), + wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED), + ] + + # Initialize table loader. + table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url) + + # Delete stream for blank canvas. + try: + table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo") + except botocore.exceptions.ClientError as error: + if error.response["Error"]["Code"] != "ResourceNotFoundException": + raise + + # TODO: LocalStack needs a while when deleting the Stream. + time.sleep(0.4) + + # Populate source database with data. + for event in events: + table_loader.kinesis_adapter.produce(event) + + # Run transfer command, consuming once not forever. + table_loader.start(once=True) + + # Verify data in target database. + assert cratedb.database.count_records(table_name) == 1 + results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True) # noqa: S608 + assert results[0]["data"]["list_of_objects"] == [{"foo": "bar"}, {"baz": "qux"}] diff --git a/tests/io/test_processor.py b/tests/io/test_processor.py index 6de1bad0..3439380b 100644 --- a/tests/io/test_processor.py +++ b/tests/io/test_processor.py @@ -133,8 +133,6 @@ def test_processor_kinesis_dynamodb_insert_update(cratedb, reset_handler, mocker from cratedb_toolkit.io.processor.kinesis_lambda import handler # Define two CDC events: INSERT and UPDATE. - # They have to be conveyed separately because CrateDB needs a - # `REFRESH TABLE` operation between them. event = { "Records": [ wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED), @@ -162,6 +160,6 @@ def wrap_kinesis(data): "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "kinesis": { "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", - "data": base64.b64encode(json.dumps(data).encode("utf-8")), + "data": base64.b64encode(json.dumps(data).encode("utf-8")).decode(), }, }