From d8e0677e12410e8f039640d83dbf83b31d3b2fc2 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 1 Sep 2024 22:00:21 +0200 Subject: [PATCH] DynamoDB: Improve `to_sql()` to accept list of records, for bulk support --- CHANGES.md | 1 + src/commons_codec/model.py | 3 +++ src/commons_codec/transform/dynamodb.py | 12 ++++++++---- tests/transform/test_dynamodb_full.py | 12 +++++++----- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 77c0c96..81bd6c0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,6 +5,7 @@ `quote_relation_name` from `sqlalchemy-cratedb` package. - DynamoDB: Add special decoding for varied lists, storing them into a separate `OBJECT(IGNORED)` column in CrateDB +- DynamoDB: Improve `to_sql()` to accept list of records ## 2024/08/27 v0.0.13 - DMS/DynamoDB: Use parameterized SQL WHERE clauses instead of inlining values diff --git a/src/commons_codec/model.py b/src/commons_codec/model.py index f731f56..965553d 100644 --- a/src/commons_codec/model.py +++ b/src/commons_codec/model.py @@ -139,3 +139,6 @@ class DualRecord: typed: t.Dict[str, t.Any] untyped: t.Dict[str, t.Any] + + def to_dict(self): + return {"typed": self.typed, "untyped": self.untyped} diff --git a/src/commons_codec/transform/dynamodb.py b/src/commons_codec/transform/dynamodb.py index 6f613bb..e8e769e 100644 --- a/src/commons_codec/transform/dynamodb.py +++ b/src/commons_codec/transform/dynamodb.py @@ -23,6 +23,8 @@ # Inhibit Rounded Exceptions DYNAMODB_CONTEXT.traps[decimal.Rounded] = False +RecordType = t.Dict[str, t.Any] + class CrateDBTypeDeserializer(TypeDeserializer): def _deserialize_n(self, value): @@ -132,13 +134,15 @@ def decode_record(self, item: t.Dict[str, t.Any]) -> DualRecord: class DynamoDBFullLoadTranslator(DynamoTranslatorBase): - def to_sql(self, record: t.Dict[str, t.Any]) -> SQLOperation: + def to_sql(self, data: t.Union[RecordType, t.List[RecordType]]) -> SQLOperation: """ - Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record. + Produce INSERT SQL operations (SQL statement and parameters) from DynamoDB record(s). """ - dual_record = self.decode_record(record) sql = f"INSERT INTO {self.table_name} ({self.TYPED_COLUMN}, {self.UNTYPED_COLUMN}) VALUES (:typed, :untyped);" - return SQLOperation(sql, {"typed": dual_record.typed, "untyped": dual_record.untyped}) + if not isinstance(data, list): + data = [data] + parameters = [self.decode_record(record).to_dict() for record in data] + return SQLOperation(sql, parameters) class DynamoDBCDCTranslator(DynamoTranslatorBase): diff --git a/tests/transform/test_dynamodb_full.py b/tests/transform/test_dynamodb_full.py index a962c9e..1f71970 100644 --- a/tests/transform/test_dynamodb_full.py +++ b/tests/transform/test_dynamodb_full.py @@ -89,10 +89,12 @@ def test_to_sql_operation(): """ assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_IN) == SQLOperation( statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);", - parameters={ - "typed": RECORD_OUT_DATA, - "untyped": RECORD_OUT_AUX, - }, + parameters=[ + { + "typed": RECORD_OUT_DATA, + "untyped": RECORD_OUT_AUX, + } + ], ) @@ -103,7 +105,7 @@ def test_to_sql_cratedb(caplog, cratedb): # Compute CrateDB operation (SQL+parameters) from DynamoDB record. translator = DynamoDBFullLoadTranslator(table_name="from.dynamodb") - operation = translator.to_sql(record=RECORD_IN) + operation = translator.to_sql(RECORD_IN) # Insert into CrateDB. cratedb.database.run_sql(translator.sql_ddl)