Skip to content

Commit

Permalink
DynamoDB: Software integration test for standalone CDC
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Aug 29, 2024
1 parent a7c703d commit 092da37
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dynamodb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ jobs:
pip install "setuptools>=64" --upgrade
# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[dynamodb,test,develop]
pip install --use-pep517 --prefer-binary --editable=.[dynamodb,kinesis,test,develop]
- name: Run linter and software tests
run: |
Expand Down
45 changes: 36 additions & 9 deletions cratedb_toolkit/io/kinesis/adapter.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,79 @@
import asyncio
import typing as t

import boto3
from aiobotocore.session import AioSession
from kinesis import Consumer, JsonProcessor, Producer
from yarl import URL


class KinesisAdapter:
def __init__(self, kinesis_url: URL):
self.session = AioSession()
self.session.set_credentials(access_key=kinesis_url.user, secret_key=kinesis_url.password)
self.async_session = AioSession()
self.async_session.set_credentials(access_key=kinesis_url.user, secret_key=kinesis_url.password)

self.session = boto3.Session(
aws_access_key_id=kinesis_url.user,
aws_secret_access_key=kinesis_url.password,
region_name=kinesis_url.query.get("region"),
)

self.endpoint_url = None
if kinesis_url.host and kinesis_url.host.lower() != "aws":
self.endpoint_url = f"http://{kinesis_url.host}:{kinesis_url.port}"
self.kinesis_url = kinesis_url
self.region_name = kinesis_url.query.get("region")
self.stream_name = self.kinesis_url.path.lstrip("/")
self.kinesis_client = self.session.client("kinesis", endpoint_url=self.endpoint_url)

def consumer_factory(self, **kwargs):
return Consumer(
stream_name=self.stream_name,
session=self.async_session,
endpoint_url=self.endpoint_url,
region_name=self.region_name,
processor=JsonProcessor(),
**kwargs,
)

def consume_forever(self, handler: t.Callable):
asyncio.run(self._consume_forever(handler))

def consume_once(self, handler: t.Callable):
asyncio.run(self._consume_once(handler))

async def _consume_forever(self, handler: t.Callable):
"""
Consume items from a Kinesis stream.
"""
async with Consumer(
stream_name=self.stream_name,
session=self.session,
endpoint_url=self.endpoint_url,
region_name=self.region_name,
async with self.consumer_factory(
# TODO: Make configurable.
create_stream=True,
iterator_type="TRIM_HORIZON",
sleep_time_no_records=0.2,
processor=JsonProcessor(),
) as consumer:
while True:
async for item in consumer:
handler(item)

async def _consume_once(self, handler: t.Callable):
async with self.consumer_factory(
# TODO: Make configurable.
create_stream=True,
iterator_type="TRIM_HORIZON",
sleep_time_no_records=0.2,
) as consumer:
async for item in consumer:
handler(item)

def produce(self, data: t.Dict[str, t.Any]):
asyncio.run(self._produce(data))

async def _produce(self, data: t.Dict[str, t.Any]):
# Put item onto queue to be flushed via `put_records()`.
async with Producer(
stream_name=self.stream_name,
session=self.session,
session=self.async_session,
endpoint_url=self.endpoint_url,
region_name=self.region_name,
# TODO: Make configurable.
Expand Down
9 changes: 6 additions & 3 deletions cratedb_toolkit/io/kinesis/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(
self.connection: sa.Connection
self.progress_bar: tqdm

def start(self):
def start(self, once: bool = False):
"""
Read events from Kinesis stream, convert to SQL statements, and submit to CrateDB.
"""
Expand All @@ -57,9 +57,12 @@ def start(self):
# https://github.com/tqdm/tqdm#redirecting-logging
self.progress_bar = tqdm()
with logging_redirect_tqdm():
self.kinesis_adapter.consume_forever(self.process)
if once:
self.kinesis_adapter.consume_once(self.process_event)
else:
self.kinesis_adapter.consume_forever(self.process_event)

def process(self, event):
def process_event(self, event):
try:
record = json.loads(base64.b64decode(event["kinesis"]["data"]).decode("utf-8"))
operation = self.translator.to_sql(record)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ io = [
"sqlalchemy>=2",
]
kinesis = [
"aiobotocore<2.15",
"async-kinesis<1.2",
"commons-codec>=0.0.12",
"lorrystream[carabas]",
Expand Down
10 changes: 7 additions & 3 deletions tests/io/dynamodb/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def setup(self):
from cratedb_toolkit.testing.testcontainers.localstack import LocalStackContainerWithKeepalive

self.container = LocalStackContainerWithKeepalive()
self.container.with_services("dynamodb")
self.container.with_services("dynamodb", "kinesis")
self.container.start()

def finalize(self):
Expand All @@ -44,10 +44,14 @@ def reset(self):
for database_name in RESET_TABLES:
self.client.drop_database(database_name)

def get_connection_url(self):
def get_connection_url_dynamodb(self):
url = URL(self.container.get_url())
return f"dynamodb://LSIAQAAAAAAVNCBMPNSG:dummy@{url.host}:{url.port}"

def get_connection_url_kinesis_dynamodb_cdc(self):
url = URL(self.container.get_url())
return f"kinesis+dynamodb+cdc://LSIAQAAAAAAVNCBMPNSG:dummy@{url.host}:{url.port}"


@pytest.fixture(scope="session")
def dynamodb_service():
Expand All @@ -71,4 +75,4 @@ def dynamodb(dynamodb_service):

@pytest.fixture(scope="session")
def dynamodb_test_manager(dynamodb_service):
return DynamoDBTestManager(dynamodb_service.get_connection_url())
return DynamoDBTestManager(dynamodb_service.get_connection_url_dynamodb())
2 changes: 1 addition & 1 deletion tests/io/dynamodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ def test_dynamodb_copy_list_of_objects(caplog, cratedb, dynamodb, dynamodb_test_
"""

# Define source and target URLs.
dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1"

# Populate source database with data.
dynamodb_test_manager.load_records(table_name="demo", records=[RECORD_UTM])
Expand Down
61 changes: 61 additions & 0 deletions tests/io/dynamodb/test_relay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import time

import botocore
import pytest

from cratedb_toolkit.io.kinesis.relay import KinesisRelay
from tests.io.test_processor import DYNAMODB_CDC_INSERT_NESTED, DYNAMODB_CDC_MODIFY_NESTED, wrap_kinesis

pytestmark = pytest.mark.kinesis

pytest.importorskip("commons_codec", reason="Only works with commons-codec installed")
pytest.importorskip("kinesis", reason="Only works with async-kinesis installed")

from commons_codec.transform.dynamodb import DynamoDBCDCTranslator # noqa: E402


def test_kinesis_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):
"""
Roughly verify that the AWS DynamoDB CDC processing works as expected.
"""

# Define source and target URLs.
kinesis_url = f"{dynamodb.get_connection_url_kinesis_dynamodb_cdc()}/demo?region=us-east-1"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Define target table name.
table_name = '"testdrive"."demo"'

# Create target table.
cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl)

# Define two CDC events: INSERT and UPDATE.
events = [
wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED),
wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED),
]

# Initialize table loader.
table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url)

# Delete stream for blank canvas.
try:
table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo")
except botocore.exceptions.ClientError as error:
if error.response["Error"]["Code"] != "ResourceNotFoundException":
raise

# TODO: LocalStack needs a while when deleting the Stream.
time.sleep(0.4)

# Populate source database with data.
for event in events:
table_loader.kinesis_adapter.produce(event)

# Run transfer command, consuming once not forever.
table_loader.start(once=True)

# Verify data in target database.
assert cratedb.database.count_records(table_name) == 1
results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True) # noqa: S608
assert results[0]["data"]["list_of_objects"] == [{"foo": "bar"}, {"baz": "qux"}]
4 changes: 1 addition & 3 deletions tests/io/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ def test_processor_kinesis_dynamodb_insert_update(cratedb, reset_handler, mocker
from cratedb_toolkit.io.processor.kinesis_lambda import handler

# Define two CDC events: INSERT and UPDATE.
# They have to be conveyed separately because CrateDB needs a
# `REFRESH TABLE` operation between them.
event = {
"Records": [
wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED),
Expand Down Expand Up @@ -162,6 +160,6 @@ def wrap_kinesis(data):
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"kinesis": {
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": base64.b64encode(json.dumps(data).encode("utf-8")),
"data": base64.b64encode(json.dumps(data).encode("utf-8")).decode(),
},
}

0 comments on commit 092da37

Please sign in to comment.