Skip to content

Commit

Permalink
DynamoDB CDC: Add ctk load table interface for processing CDC events
Browse files Browse the repository at this point in the history
In contrast to the Lambda-based processor implementation, this one is
a standalone implementation that can be used optimally in any Python
environment, managed or not.
  • Loading branch information
amotl committed Sep 13, 2024
1 parent 78c6637 commit 337261b
Show file tree
Hide file tree
Showing 19 changed files with 577 additions and 236 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/dynamodb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ jobs:
os: ["ubuntu-latest"]
# TODO: yarl, dependency of influxio, is currently not available on Python 3.12.
# https://github.com/aio-libs/yarl/pull/942
python-version: ["3.8", "3.11"]
localstack-version: ["3.7"]
python-version: ["3.9", "3.11"]
localstack-version: ["3.6"]

env:
OS: ${{ matrix.os }}
Expand Down Expand Up @@ -78,7 +78,7 @@ jobs:
pip install "setuptools>=64" --upgrade
# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[dynamodb,test,develop]
pip install --use-pep517 --prefer-binary --editable=.[dynamodb,kinesis,test,develop]
- name: Run linter and software tests
run: |
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://`
- MongoDB: Optionally filter server collection using MongoDB query expression
- MongoDB: Improve error handling wrt. bulk operations vs. usability
- DynamoDB CDC: Add `ctk load table` interface for processing CDC events

## 2024/09/10 v0.0.22
- MongoDB: Rename columns with leading underscores to use double leading underscores
Expand Down
9 changes: 9 additions & 0 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,18 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
logger.error("Data loading failed or incomplete")
return False

elif source_url_obj.scheme.startswith("kinesis"):
if "+cdc" in source_url_obj.scheme:
from cratedb_toolkit.io.kinesis.api import kinesis_relay

return kinesis_relay(str(source_url_obj), target_url)
else:
raise NotImplementedError("Loading full data via Kinesis not implemented yet")

elif source_url_obj.scheme in ["file+bson", "http+bson", "https+bson", "mongodb", "mongodb+srv"]:
if "+cdc" in source_url_obj.scheme:
source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "")

from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)
Expand Down
Empty file.
83 changes: 83 additions & 0 deletions cratedb_toolkit/io/kinesis/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import asyncio
import typing as t

import boto3
from aiobotocore.session import AioSession
from kinesis import Consumer, JsonProcessor, Producer
from yarl import URL


class KinesisAdapter:
    """
    Access a Kinesis stream addressed by URL, for consuming and producing items.

    The URL is interpreted like this:

    - User and password are used as AWS access key and secret key.
    - The `region` query parameter selects the AWS region.
    - The path, without leading slashes, is the stream name.
    - Any host other than `aws` (case-insensitive) is used as a custom
      HTTP endpoint; otherwise no endpoint URL is configured.
    """

    def __init__(self, kinesis_url: URL):
        access_key = kinesis_url.user
        secret_key = kinesis_url.password
        region_name = kinesis_url.query.get("region")

        # Asynchronous session, handed to the `Consumer` and `Producer` objects.
        self.async_session = AioSession()
        self.async_session.set_credentials(access_key=access_key, secret_key=secret_key)

        # Synchronous session, backing `self.kinesis_client`.
        self.session = boto3.Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region_name,
        )

        self.endpoint_url = None
        host = kinesis_url.host
        if host and host.lower() != "aws":
            # The port is `None` when the URL does not carry one. Leave it out
            # in that case, instead of rendering the invalid `http://host:None`.
            port = kinesis_url.port
            self.endpoint_url = f"http://{host}" if port is None else f"http://{host}:{port}"

        self.kinesis_url = kinesis_url
        self.region_name = region_name
        self.stream_name = self.kinesis_url.path.lstrip("/")
        self.kinesis_client = self.session.client("kinesis", endpoint_url=self.endpoint_url)

    def consumer_factory(self, **kwargs):
        """
        Create a `Consumer` for the configured stream, decoding items with `JsonProcessor`.

        :param kwargs: Additional keyword arguments, passed through to `Consumer`.
        """
        return Consumer(
            stream_name=self.stream_name,
            session=self.async_session,
            endpoint_url=self.endpoint_url,
            region_name=self.region_name,
            processor=JsonProcessor(),
            **kwargs,
        )

    def _open_consumer(self):
        """
        Create the consumer shared by `_consume_forever` and `_consume_once`.
        """
        # TODO: Make configurable.
        return self.consumer_factory(
            create_stream=True,
            iterator_type="TRIM_HORIZON",
            sleep_time_no_records=0.2,
        )

    def consume_forever(self, handler: t.Callable):
        """
        Blocking entry point: Run `_consume_forever` on a fresh event loop.
        """
        asyncio.run(self._consume_forever(handler))

    def consume_once(self, handler: t.Callable):
        """
        Blocking entry point: Run `_consume_once` on a fresh event loop.
        """
        asyncio.run(self._consume_once(handler))

    async def _consume_forever(self, handler: t.Callable):
        """
        Consume items from a Kinesis stream.

        Invokes `handler` with each item. The outer loop starts a new
        iteration pass whenever the consumer's iterator finishes, so this
        coroutine only ends by exception or cancellation.
        """
        async with self._open_consumer() as consumer:
            while True:
                async for item in consumer:
                    handler(item)

    async def _consume_once(self, handler: t.Callable):
        """
        Consume items from a Kinesis stream, in a single iteration pass.

        Invokes `handler` with each item, and returns as soon as the
        consumer's iterator finishes.
        """
        async with self._open_consumer() as consumer:
            async for item in consumer:
                handler(item)

    def produce(self, data: t.Dict[str, t.Any]):
        """
        Blocking entry point: Submit a single item to the stream.
        """
        asyncio.run(self._produce(data))

    async def _produce(self, data: t.Dict[str, t.Any]):
        """
        Submit a single item to the stream, using a short-lived `Producer`.
        """
        # Put item onto queue to be flushed via `put_records()`.
        async with Producer(
            stream_name=self.stream_name,
            session=self.async_session,
            endpoint_url=self.endpoint_url,
            region_name=self.region_name,
            # TODO: Make configurable.
            create_stream=True,
            buffer_time=0.01,
        ) as producer:
            await producer.put(data)
6 changes: 6 additions & 0 deletions cratedb_toolkit/io/kinesis/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from cratedb_toolkit.io.kinesis.relay import KinesisRelay


def kinesis_relay(source_url, target_url):
    """
    Relay events from a Kinesis stream into CrateDB.

    Creates a `KinesisRelay` for the given source and target addresses,
    and starts it right away.

    :param source_url: Address of the Kinesis stream to read from.
    :param target_url: Address of the CrateDB table to write to.
    """
    KinesisRelay(source_url, target_url).start()
82 changes: 82 additions & 0 deletions cratedb_toolkit/io/kinesis/relay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import base64
import json
import logging

import sqlalchemy as sa
from commons_codec.transform.dynamodb import DynamoDBCDCTranslator
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from yarl import URL

from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)


class KinesisRelay:
    """
    Relay events from Kinesis into CrateDB table.

    Events read from the stream are converted to SQL operations by a
    translator, and executed on a single database connection.
    """

    def __init__(
        self,
        kinesis_url: str,
        cratedb_url: str,
    ):
        """
        :param kinesis_url: Address of the Kinesis stream. Its scheme must
            include `dynamodb+cdc`, the only payload flavor supported here.
        :param cratedb_url: Address of the target table in CrateDB.
        :raises NotImplementedError: When the URL scheme selects an unsupported flavor.
        """
        cratedb_address = DatabaseAddress.from_string(cratedb_url)
        cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
        cratedb_table = cratedb_table_address.fullname

        self.kinesis_url = URL(kinesis_url)
        self.kinesis_adapter = KinesisAdapter(self.kinesis_url)
        self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
        self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)

        if "dynamodb+cdc" in self.kinesis_url.scheme:
            self.translator = DynamoDBCDCTranslator(table_name=self.cratedb_table)
        else:
            raise NotImplementedError(f"Data processing not implemented for {self.kinesis_url}")

        # Both attributes are assigned by `start()`.
        self.connection: sa.Connection
        self.progress_bar: tqdm

    def start(self, once: bool = False):
        """
        Read events from Kinesis stream, convert to SQL statements, and submit to CrateDB.

        Creates the target table from the translator's DDL statement when it
        does not exist yet. The database connection and the progress bar are
        closed when consuming ends, whether it returns or raises.

        :param once: When true, do a single consumer pass via `consume_once`,
            otherwise keep consuming via `consume_forever`.
        """
        logger.info(f"Source: Kinesis stream={self.kinesis_adapter.stream_name} count=unknown")
        self.connection = self.cratedb_adapter.engine.connect()
        try:
            if not self.cratedb_adapter.table_exists(self.cratedb_table):
                self.connection.execute(sa.text(self.translator.sql_ddl))
                self.connection.commit()
            records_target = self.cratedb_adapter.count_records(self.cratedb_table)
            logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
            # Harmonize logging and progress bar.
            # https://github.com/tqdm/tqdm#redirecting-logging
            self.progress_bar = tqdm()
            try:
                with logging_redirect_tqdm():
                    if once:
                        self.kinesis_adapter.consume_once(self.process_event)
                    else:
                        self.kinesis_adapter.consume_forever(self.process_event)
            finally:
                self.progress_bar.close()
        finally:
            # Do not leak the connection, neither on regular return nor on error.
            self.connection.close()

    def process_event(self, event):
        """
        Decode a single Kinesis event, and apply it to the CrateDB table.

        Events which can not be decoded or translated are logged and skipped.
        Statements rejected with `ProgrammingError` are logged as warnings,
        and still advance the progress bar.

        :param event: Mapping with a base64-encoded JSON document at
            `event["kinesis"]["data"]`.
        """
        try:
            record = json.loads(base64.b64decode(event["kinesis"]["data"]).decode("utf-8"))
            operation = self.translator.to_sql(record)
        except Exception:
            logger.exception("Decoding Kinesis event failed")
            return
        try:
            # Process record.
            self.connection.execute(sa.text(operation.statement), operation.parameters)

            # Processing alternating CDC events requires write synchronization.
            self.connection.execute(sa.text(f"REFRESH TABLE {self.cratedb_table}"))

            self.connection.commit()
        except sa.exc.ProgrammingError as ex:
            logger.warning(f"Running query failed: {ex}")
        self.progress_bar.update()
Loading

0 comments on commit 337261b

Please sign in to comment.