Skip to content

Commit

Permalink
DynamoDB: Add pagination support for full-load table loader
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 2, 2024
1 parent f89086e commit a063a5b
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 64 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@


## Unreleased
- DynamoDB: Add special decoding for varied lists.
Store them into a separate `OBJECT(IGNORED)` column in CrateDB.
- DynamoDB: Add pagination support for `full-load` table loader

## 2024/08/27 v0.0.20
- DMS/DynamoDB: Fix table name quoting within CDC processor handler
Expand Down
39 changes: 35 additions & 4 deletions cratedb_toolkit/io/dynamodb/adapter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import logging
import typing as t

Check warning on line 2 in cratedb_toolkit/io/dynamodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/adapter.py#L1-L2

Added lines #L1 - L2 were not covered by tests

import boto3
from yarl import URL

logger = logging.getLogger(__name__)

Check warning on line 7 in cratedb_toolkit/io/dynamodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/adapter.py#L7

Added line #L7 was not covered by tests


class DynamoDBAdapter:
def __init__(self, dynamodb_url: URL, echo: bool = False):
def __init__(self, dynamodb_url: URL):

Check warning on line 11 in cratedb_toolkit/io/dynamodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/adapter.py#L11

Added line #L11 was not covered by tests
self.session = boto3.Session(
aws_access_key_id=dynamodb_url.user,
aws_secret_access_key=dynamodb_url.password,
Expand All @@ -15,11 +20,37 @@ def __init__(self, dynamodb_url: URL, echo: bool = False):
self.dynamodb_resource = self.session.resource("dynamodb", endpoint_url=endpoint_url)
self.dynamodb_client = self.session.client("dynamodb", endpoint_url=endpoint_url)

def scan(self, table_name: str):
def scan(

Check warning on line 23 in cratedb_toolkit/io/dynamodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/adapter.py#L23

Added line #L23 was not covered by tests
self,
table_name: str,
page_size: int = 1000,
consistent_read: bool = False,
on_error: t.Literal["log", "raise"] = "log",
) -> t.Generator[t.Dict, None, None]:
"""
Return all items from DynamoDB table.
Fetch and generate all items from a DynamoDB table, with pagination.
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination
"""
return self.dynamodb_client.scan(TableName=table_name)
key = None
while True:
try:
scan_kwargs = {"TableName": table_name, "ConsistentRead": consistent_read, "Limit": page_size}
if key is not None:
scan_kwargs.update({"ExclusiveStartKey": key})
response = self.dynamodb_client.scan(**scan_kwargs)
yield response
key = response.get("LastEvaluatedKey", None)
if key is None:
break
except Exception as ex:
if on_error == "log":
logger.exception("Error reading DynamoDB table")
elif on_error == "raise":
raise

Check warning on line 50 in cratedb_toolkit/io/dynamodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/adapter.py#L35-L50

Added lines #L35 - L50 were not covered by tests
else:
raise ValueError(f"Unknown 'on_error' value: {on_error}") from ex
break

Check warning on line 53 in cratedb_toolkit/io/dynamodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/adapter.py#L52-L53

Added lines #L52 - L53 were not covered by tests

def count_records(self, table_name: str):
table = self.dynamodb_resource.Table(table_name)
Expand Down
39 changes: 25 additions & 14 deletions cratedb_toolkit/io/dynamodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.data import asbool

Check warning on line 12 in cratedb_toolkit/io/dynamodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/copy.py#L12

Added line #L12 was not covered by tests

logger = logging.getLogger(__name__)

Expand All @@ -23,6 +24,7 @@ def __init__(
dynamodb_url: str,
cratedb_url: str,
progress: bool = False,
debug: bool = True,
):
cratedb_address = DatabaseAddress.from_string(cratedb_url)
cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
Expand All @@ -36,38 +38,47 @@ def __init__(
self.translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table)

self.progress = progress
self.debug = debug

Check warning on line 41 in cratedb_toolkit/io/dynamodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/copy.py#L41

Added line #L41 was not covered by tests

self.page_size: int = int(self.dynamodb_url.query.get("page-size", 1000))
self.consistent_read: bool = asbool(self.dynamodb_url.query.get("consistent-read", False))

Check warning on line 44 in cratedb_toolkit/io/dynamodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/copy.py#L43-L44

Added lines #L43 - L44 were not covered by tests

def start(self):
"""
Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB.
"""
records_in = self.dynamodb_adapter.count_records(self.dynamodb_table)
logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}")
logger_on_error = logger.warning
if self.debug:
logger_on_error = logger.exception

Check warning on line 54 in cratedb_toolkit/io/dynamodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/copy.py#L52-L54

Added lines #L52 - L54 were not covered by tests
with self.cratedb_adapter.engine.connect() as connection:
if not self.cratedb_adapter.table_exists(self.cratedb_table):
connection.execute(sa.text(self.translator.sql_ddl))
connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
result = self.dynamodb_adapter.scan(table_name=self.dynamodb_table)
records_out = 0
for operation in self.items_to_operations(result["Items"]):
for result in self.dynamodb_adapter.scan(

Check warning on line 63 in cratedb_toolkit/io/dynamodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/copy.py#L63

Added line #L63 was not covered by tests
table_name=self.dynamodb_table,
consistent_read=self.consistent_read,
page_size=self.page_size,
):
result_size = len(result["Items"])
try:
operation = self.translator.to_sql(result["Items"])
except Exception as ex:
logger_on_error(f"Transforming query failed: {ex}")
continue

Check warning on line 73 in cratedb_toolkit/io/dynamodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/copy.py#L68-L73

Added lines #L68 - L73 were not covered by tests
try:
connection.execute(sa.text(operation.statement), operation.parameters)
records_out += 1
except sa.exc.ProgrammingError as ex:
logger.warning(f"Running query failed: {ex}")
progress_bar.update()
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(f"Executing query failed: {ex}")

Check warning on line 79 in cratedb_toolkit/io/dynamodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/copy.py#L76-L79

Added lines #L76 - L79 were not covered by tests
progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out < records_in:
if records_out == 0:

Check warning on line 83 in cratedb_toolkit/io/dynamodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/copy.py#L83

Added line #L83 was not covered by tests
logger.warning("No data has been copied")

def items_to_operations(self, items):
"""
Convert data for record items to INSERT statements.
"""
for item in items:
yield self.translator.to_sql(item)
19 changes: 19 additions & 0 deletions doc/io/dynamodb/loader.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ ctk shell --command "SELECT * FROM testdrive.demo;"
ctk show table "testdrive.demo"
```

## Options

### `page-size`
The source URL accepts the `page-size` option to configure DynamoDB
[pagination]. The default value is `1000`.
```shell
ctk load table .../ProductCatalog?region=us-east-1&page-size=5000
```

### `consistent-read`
The source URL accepts the `consistent-read` option to configure DynamoDB
[read consistency]. The default value is `false`.
```shell
ctk load table .../ProductCatalog?region=us-east-1&consistent-read=true
```


## Variants

### CrateDB Cloud
Expand Down Expand Up @@ -66,3 +83,5 @@ docker run \

[Credentials for accessing LocalStack AWS API]: https://docs.localstack.cloud/references/credentials/
[Get started with DynamoDB on LocalStack]: https://docs.localstack.cloud/user-guide/aws/dynamodb/
[pagination]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination
[read consistency]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ReadConsistency
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ docs = [
]
dynamodb = [
"boto3",
"commons-codec>=0.0.12",
"commons-codec @ git+https://github.com/crate/commons-codec.git@dynamodb-full-load-batch",
]
full = [
"cratedb-toolkit[cfr,cloud,datasets,io,service]",
Expand All @@ -155,11 +155,11 @@ io = [
"sqlalchemy>=2",
]
kinesis = [
"commons-codec>=0.0.12",
"lorrystream[carabas]",
"commons-codec @ git+https://github.com/crate/commons-codec.git@dynamodb-full-load-batch",
"lorrystream[carabas]>=0.0.6",
]
mongodb = [
"commons-codec[mongodb,zyp]>=0.0.12",
"commons-codec[mongodb,zyp] @ git+https://github.com/crate/commons-codec.git@dynamodb-full-load-batch",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<5,>=3.10.1",
Expand Down
42 changes: 42 additions & 0 deletions tests/io/dynamodb/test_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import pytest
from botocore.exceptions import ParamValidationError
from yarl import URL

from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter

pytestmark = pytest.mark.dynamodb


RECORD = {
"Id": {"N": "101"},
}


def test_adapter_scan_success(dynamodb):
dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1"
adapter = DynamoDBAdapter(URL(dynamodb_url))
adapter.scan("foo")


def test_adapter_scan_failure_consistent_read(dynamodb):
"""
Check supplying invalid parameters to `DynamoDBAdapter` fails as expected.
"""
dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1"
adapter = DynamoDBAdapter(URL(dynamodb_url))

with pytest.raises(ParamValidationError) as ex:
next(adapter.scan("demo", consistent_read=-42, on_error="raise"))
assert ex.match("Parameter validation failed:\nInvalid type for parameter ConsistentRead, value: -42.*")


def test_adapter_scan_failure_page_size(dynamodb):
"""
Check supplying invalid parameters to `DynamoDBAdapter` fails as expected.
"""
dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1"
adapter = DynamoDBAdapter(URL(dynamodb_url))

with pytest.raises(ParamValidationError) as ex:
next(adapter.scan("demo", page_size=-1, on_error="raise"))
assert ex.match("Parameter validation failed:\nInvalid value for parameter Limit, value: -1, valid min value: 1")
48 changes: 6 additions & 42 deletions tests/io/dynamodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,22 @@
pytestmark = pytest.mark.dynamodb


RECORD_UTM = {
RECORD = {
"Id": {"N": "101"},
"utmTags": {
"L": [
{
"M": {
"date": {"S": "2024-08-28T20:05:42.603Z"},
"utm_adgroup": {"L": [{"S": ""}, {"S": ""}]},
"utm_campaign": {"S": "34374686341"},
"utm_medium": {"S": "foobar"},
"utm_source": {"S": "google"},
}
}
]
},
"location": {
"M": {
"coordinates": {"L": [{"S": ""}]},
"meetingPoint": {"S": "At the end of the tunnel"},
"address": {"S": "Salzbergwerk Berchtesgaden"},
},
},
}


def test_dynamodb_copy(caplog, cratedb, dynamodb, dynamodb_test_manager):
def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager):
"""
CLI test: Invoke `ctk load table` for DynamoDB.
Verify `DynamoDBFullLoad` works as expected.
"""

# Define source and target URLs.
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Populate source database with data.
dynamodb_test_manager.load_records(table_name="demo", records=[RECORD_UTM])
dynamodb_test_manager.load_records(table_name="demo", records=[RECORD])

# Run transfer command.
table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url)
Expand All @@ -52,20 +32,4 @@ def test_dynamodb_copy(caplog, cratedb, dynamodb, dynamodb_test_manager):
assert cratedb.database.count_records("testdrive.demo") == 1

results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) # noqa: S608
assert results[0]["data"] == {
"Id": 101.0,
"utmTags": [
{
"date": "2024-08-28T20:05:42.603Z",
"utm_adgroup": ["", ""],
"utm_campaign": "34374686341",
"utm_medium": "foobar",
"utm_source": "google",
}
],
"location": {
"coordinates": [""],
"meetingPoint": "At the end of the tunnel",
"address": "Salzbergwerk Berchtesgaden",
},
}
assert results[0]["data"] == {"Id": 101.0}

0 comments on commit a063a5b

Please sign in to comment.