From 4297f5d07c5cbc6bc84573f8da36ac4c9b2abea3 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 3 Sep 2024 13:18:22 +0200 Subject: [PATCH] MongoDB: Accept Zyp Treatments, and ingress pagination / egress batching - Use `--transformation` option for applying special treatments. Certain fields should be stored as lists, some need to be ignored for now, others need to be treated manually, etc. - Use pagination on source collection, for creating batches towards CrateDB. --- CHANGES.md | 3 + cratedb_toolkit/api/main.py | 5 +- cratedb_toolkit/io/mongodb/copy.py | 78 ++++++++++++---- cratedb_toolkit/io/mongodb/export.py | 114 +++++++++++------------- cratedb_toolkit/io/mongodb/model.py | 3 + cratedb_toolkit/io/mongodb/util.py | 4 +- cratedb_toolkit/util/config.py | 55 ++++++++++++ examples/zyp/zyp-treatment-all.yaml | 24 +++++ examples/zyp/zyp-treatment-ignore.yaml | 11 +++ pyproject.toml | 4 +- tests/io/mongodb/conftest.py | 6 ++ tests/io/mongodb/test_cli.py | 65 ++++++++++++-- tests/io/mongodb/test_export.py | 107 ++++++++++++++++++++++ tests/io/mongodb/test_extract.py | 11 +-- tests/io/mongodb/test_integration.py | 4 +- tests/io/mongodb/test_transformation.py | 82 +++++++++++++++-- tests/io/mongodb/test_util.py | 2 +- 17 files changed, 467 insertions(+), 111 deletions(-) create mode 100644 cratedb_toolkit/io/mongodb/model.py create mode 100644 cratedb_toolkit/util/config.py create mode 100644 examples/zyp/zyp-treatment-all.yaml create mode 100644 examples/zyp/zyp-treatment-ignore.yaml create mode 100644 tests/io/mongodb/test_export.py diff --git a/CHANGES.md b/CHANGES.md index 5c43831e..84f29ad9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -12,6 +12,9 @@ This means relevant column definitions will not be included into the SQL DDL. - MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy. 
- MongoDB: Sanitize lists of varying objects +- MongoDB: Add `--transformation` option for applying special treatments to certain items +  on real-world data +- MongoDB: Use pagination on source collection, for creating batches towards CrateDB ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 01a3d029..4706a132 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -37,7 +37,10 @@ def __post_init__(self): logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id}") def load_table( - self, resource: InputOutputResource, target: t.Optional[TableAddress] = None, transformation: Path = None + self, + resource: InputOutputResource, + target: t.Optional[TableAddress] = None, + transformation: Path = None, ): """ Load data into a database table on CrateDB Cloud. diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index a735a1bc..bd9c3ff9 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -8,8 +8,12 @@ from commons_codec.model import SQLOperation from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB from tqdm import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm +from yarl import URL +from zyp.model.collection import CollectionAddress -from cratedb_toolkit.io.mongodb.export import extract_value +from cratedb_toolkit.io.mongodb.export import CrateDBConverter +from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.model import DatabaseAddress from cratedb_toolkit.util import DatabaseAdapter @@ -18,8 +22,13 @@ class MongoDBFullLoadTranslator(MongoDBCDCTranslatorCrateDB): - def __init__(self, table_name: str, tm: TransformationManager = None): + """ + Translate a MongoDB document into a CrateDB document. 
+ """ + + def __init__(self, table_name: str, converter: CrateDBConverter, tm: TransformationManager = None): super().__init__(table_name=table_name) + self.converter = converter self.tm = tm @staticmethod @@ -31,18 +40,22 @@ def get_document_key(record: t.Dict[str, t.Any]) -> str: """ return record["_id"] - def to_sql(self, document: t.Dict[str, t.Any]) -> SQLOperation: + def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperation: """ - Produce CrateDB INSERT SQL statement from MongoDB document. + Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents. """ + if not isinstance(data, list): + data = [data] # Define SQL INSERT statement. sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);" - # Converge MongoDB document to SQL parameters. - record = extract_value(self.decode_bson(document)) - oid: str = self.get_document_key(record) - parameters = {"oid": oid, "record": record} + # Converge multiple MongoDB documents into SQL parameters for `executemany` operation. 
+ parameters: t.List[DocumentDict] = [] + for document in data: + record = self.converter.convert(self.decode_bson(document)) + oid: str = self.get_document_key(record) + parameters.append({"oid": oid, "record": record}) return SQLOperation(sql, parameters) @@ -60,6 +73,7 @@ def __init__( cratedb_url: str, tm: t.Union[TransformationManager, None], mongodb_limit: int = 0, + on_error: t.Literal["ignore", "raise"] = "ignore", progress: bool = False, debug: bool = True, ): @@ -67,6 +81,7 @@ def __init__( cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() cratedb_table = cratedb_table_address.fullname + self.mongodb_uri = URL(mongodb_url) self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient( mongodb_url, document_class=RawBSONDocument, @@ -76,11 +91,20 @@ def __init__( self.mongodb_limit = mongodb_limit self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False) self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) - self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, tm=tm) + # Transformation machinery. + transformation = None + if tm: + transformation = tm.project.get(CollectionAddress(container=mongodb_database, name=mongodb_collection)) + self.converter = CrateDBConverter(transformation=transformation) + self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, converter=self.converter, tm=tm) + + self.on_error = on_error self.progress = progress self.debug = debug + self.batch_size: int = int(self.mongodb_uri.query.get("batch-size", 250)) + def start(self): """ Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB. 
@@ -90,29 +114,49 @@ def start(self): logger_on_error = logger.warning if self.debug: logger_on_error = logger.exception - with self.cratedb_adapter.engine.connect() as connection: + with self.cratedb_adapter.engine.connect() as connection, logging_redirect_tqdm(): if not self.cratedb_adapter.table_exists(self.cratedb_table): connection.execute(sa.text(self.translator.sql_ddl)) connection.commit() records_target = self.cratedb_adapter.count_records(self.cratedb_table) logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}") progress_bar = tqdm(total=records_in) - records_out = 0 + records_out: int = 0 - for document in self.mongodb_collection.find().limit(self.mongodb_limit): + skip: int = 0 + while True: + progress_bar.set_description("ACQUIRE") + # Acquire batch of documents, and convert to SQL operation. + documents = self.mongodb_collection.find().skip(skip).limit(self.batch_size).batch_size(self.batch_size) try: - operation = self.translator.to_sql(document) - logger.debug("SQL operation: %s", operation) + operation = self.translator.to_sql(list(documents)) except Exception as ex: - logger_on_error(f"Transforming query failed: {ex}") - continue + logger_on_error(f"Computing query failed: {ex}") + if self.on_error == "raise": + raise + break + + # When input data is exhausted, stop processing. + progress_bar.set_description("CHECK") + if not operation.parameters: + break + + # Submit operation to CrateDB. + progress_bar.set_description("SUBMIT") try: result = connection.execute(sa.text(operation.statement), operation.parameters) result_size = result.rowcount + if result_size < 0: + raise ValueError("Unable to insert one or more records") records_out += result_size progress_bar.update(n=result_size) except Exception as ex: - logger_on_error(f"Executing query failed: {ex}") + logger_on_error(f"Executing operation failed: {ex}\nOperation:\n{operation}") + if self.on_error == "raise": + raise + + # Next page. 
+ skip += self.batch_size progress_bar.close() connection.commit() diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index 2e5d711b..dbeae37c 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -25,8 +25,8 @@ """ import base64 -import builtins import calendar +import logging import typing as t from uuid import UUID @@ -34,11 +34,16 @@ import dateutil.parser as dateparser import orjson as json import pymongo.collection +from attr import Factory from attrs import define +from zyp.model.collection import CollectionTransformation +from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.io.mongodb.util import sanitize_field_names +logger = logging.getLogger(__name__) + def date_converter(value): if isinstance(value, int): @@ -60,81 +65,64 @@ def timestamp_converter(value): } -def extract_value(value, parent_type=None): - """ - Decode MongoDB Extended JSON. 
- - - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/ - - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ - """ - if isinstance(value, dict): - if len(value) == 1: - if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]: - decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"]))) - return extract_value(decoded, parent_type) - for k, v in value.items(): - if k.startswith("$"): - return extract_value(v, k.lstrip("$")) - return {k.lstrip("$"): extract_value(v, parent_type) for (k, v) in value.items()} - if isinstance(value, list): - if value and isinstance(value[0], dict): - lovos = ListOfVaryingObjectsSanitizer(value) - lovos.apply() - - return [extract_value(v, parent_type) for v in value] - if parent_type: - converter = type_converter.get(parent_type) - if converter: - return converter(value) - return value - - @define -class ListOfVaryingObjectsSanitizer: - """ - CrateDB can not store lists of varying objects, so normalize them. - """ - - data: t.List[t.Dict[str, t.Any]] - - def apply(self): - self.apply_rules(self.get_rules(self.type_stats())) +class CrateDBConverter: + transformation: CollectionTransformation = Factory(CollectionTransformation) + + def convert(self, data: DocumentDict) -> t.Dict[str, t.Any]: + """ + Decode MongoDB Extended JSON, considering CrateDB specifics. + """ + return self.extract_value(data) + + def extract_value(self, value: t.Any, parent_type: t.Optional[str] = None) -> t.Any: + """ + Decode MongoDB Extended JSON. + + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/ + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ + """ + if isinstance(value, dict): + # Custom adjustments to compensate shape anomalies in source data. 
+ self.apply_special_treatments(value) + if len(value) == 1: + if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]: + decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"]))) + return self.extract_value(decoded, parent_type) + for k, v in value.items(): + if k.startswith("$"): + return self.extract_value(v, k.lstrip("$")) + return {k.lstrip("$"): self.extract_value(v, parent_type) for (k, v) in value.items()} + if isinstance(value, list): + return [self.extract_value(v, parent_type) for v in value] + if parent_type: + converter = type_converter.get(parent_type) + if converter: + return converter(value) + return value - def type_stats(self) -> t.Dict[str, t.List[str]]: - types: t.Dict[str, t.List[str]] = {} - for item in self.data: - for key, value in item.items(): - types.setdefault(key, []).append(type(value).__name__) - return types + def apply_special_treatments(self, value: t.Any): + """ + Apply special treatments to value that can't be described otherwise up until now. + # Ignore certain items including anomalies that are not resolved, yet. - def get_rules(self, all_types): - rules = [] - for name, types in all_types.items(): - if len(types) > 1: - rules.append({"name": name, "converter": self.get_best_converter(types)}) - return rules + TODO: Needs an integration test feeding two records instead of just one. + """ - def apply_rules(self, rules): - for item in self.data: - for rule in rules: - name = rule["name"] - if name in item: - item[name] = rule["converter"](item[name]) + if self.transformation is None or self.transformation.treatment is None: + return None - @staticmethod - def get_best_converter(types: t.List[str]) -> t.Callable: - if "str" in types: - return builtins.str - return lambda x: x + return self.transformation.treatment.apply(value) def convert(d): """ Decode MongoDB Extended JSON, considering CrateDB specifics. 
""" + converter = CrateDBConverter() newdict = {} for k, v in sanitize_field_names(d).items(): - newdict[k] = extract_value(v) + newdict[k] = converter.convert(v) return newdict diff --git a/cratedb_toolkit/io/mongodb/model.py b/cratedb_toolkit/io/mongodb/model.py new file mode 100644 index 00000000..37e62548 --- /dev/null +++ b/cratedb_toolkit/io/mongodb/model.py @@ -0,0 +1,3 @@ +import typing as t + +DocumentDict = t.Dict[str, t.Any] diff --git a/cratedb_toolkit/io/mongodb/util.py b/cratedb_toolkit/io/mongodb/util.py index 2e5d0f6b..529e5f6b 100644 --- a/cratedb_toolkit/io/mongodb/util.py +++ b/cratedb_toolkit/io/mongodb/util.py @@ -1,6 +1,6 @@ import re -import typing as t +from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.util.data_dict import OrderedDictX @@ -26,7 +26,7 @@ def parse_input_numbers(s: str): return options -def sanitize_field_names(data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: +def sanitize_field_names(data: DocumentDict) -> DocumentDict: """ Rename top-level column names with single leading underscores to double leading underscores. CrateDB does not accept singe leading underscores, like `_id`. 
diff --git a/cratedb_toolkit/util/config.py b/cratedb_toolkit/util/config.py new file mode 100644 index 00000000..0e2114dd --- /dev/null +++ b/cratedb_toolkit/util/config.py @@ -0,0 +1,55 @@ +import typing as t +from collections import OrderedDict + +import attr +from attrs import define +from cattrs.preconf.json import make_converter as make_json_converter +from cattrs.preconf.pyyaml import make_converter as make_yaml_converter + + +@define +class Metadata: + version: t.Union[int, None] = None + type: t.Union[str, None] = None + + +@define +class Dumpable: + meta: t.Union[Metadata, None] = None + + def to_dict(self) -> t.Dict[str, t.Any]: + return attr.asdict(self, dict_factory=OrderedDict, filter=no_privates_no_nulls_no_empties) + + def to_json(self) -> str: + converter = make_json_converter(dict_factory=OrderedDict) + return converter.dumps(self.to_dict()) + + def to_yaml(self) -> str: + converter = make_yaml_converter(dict_factory=OrderedDict) + return converter.dumps(self.to_dict()) + + @classmethod + def from_dict(cls, data: t.Dict[str, t.Any]): + return cls(**data) + + @classmethod + def from_json(cls, json_str: str): + converter = make_json_converter(dict_factory=OrderedDict) + return converter.loads(json_str, cls) + + @classmethod + def from_yaml(cls, yaml_str: str): + converter = make_yaml_converter(dict_factory=OrderedDict) + return converter.loads(yaml_str, cls) + + +def no_privates_no_nulls_no_empties(key, value) -> bool: + """ + A filter for `attr.asdict`, to suppress private attributes. + """ + is_private = key.name.startswith("_") + is_null = value is None + is_empty = value == [] + if is_private or is_null or is_empty: + return False + return True diff --git a/examples/zyp/zyp-treatment-all.yaml b/examples/zyp/zyp-treatment-all.yaml new file mode 100644 index 00000000..cc353c76 --- /dev/null +++ b/examples/zyp/zyp-treatment-all.yaml @@ -0,0 +1,24 @@ +# Zyp transformation defining a few special treatments to be applied to inbound data. 
+--- +meta: + type: zyp-project + version: 1 +collections: +- address: + container: testdrive + name: demo + treatment: + ignore_complex_lists: false + normalize_complex_lists: true + ignore_field: + - "ignore_toplevel" + - "ignore_nested" + convert_list: + - "to_list" + convert_string: + - "to_string" + convert_dict: + - name: "to_dict_scalar" + wrapper_name: "id" + - name: "user" + wrapper_name: "id" diff --git a/examples/zyp/zyp-treatment-ignore.yaml b/examples/zyp/zyp-treatment-ignore.yaml new file mode 100644 index 00000000..6f0b6364 --- /dev/null +++ b/examples/zyp/zyp-treatment-ignore.yaml @@ -0,0 +1,11 @@ +# Zyp transformation defining a few special treatments to be applied to inbound data. +--- +meta: + type: zyp-project + version: 1 +collections: +- address: + container: testdrive + name: demo + treatment: + ignore_complex_lists: true diff --git a/pyproject.toml b/pyproject.toml index cf6b515c..a57abb83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,7 +83,9 @@ dynamic = [ "version", ] dependencies = [ + "attrs<25", "boltons<25", + "cattrs<24", "click<9", "click-aliases<2,>=1.0.4", "colorama<1", @@ -159,7 +161,7 @@ kinesis = [ "lorrystream[carabas]>=0.0.6", ] mongodb = [ - "commons-codec[mongodb,zyp]>=0.0.14", + "commons-codec[mongodb,zyp]>=0.0.15", "cratedb-toolkit[io]", "orjson<4,>=3.3.1", "pymongo<5,>=3.10.1", diff --git a/tests/io/mongodb/conftest.py b/tests/io/mongodb/conftest.py index 2361000e..419b98e6 100644 --- a/tests/io/mongodb/conftest.py +++ b/tests/io/mongodb/conftest.py @@ -7,6 +7,12 @@ logger = logging.getLogger(__name__) +pytest.importorskip("bson", reason="Skipping tests because bson is not installed") +pytest.importorskip("bsonjs", reason="Skipping tests because bsonjs is not installed") +pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") +pytest.importorskip("rich", reason="Skipping tests because rich is not installed") + + # Define databases to be deleted before running each test case. 
RESET_DATABASES = [ "testdrive", diff --git a/tests/io/mongodb/test_cli.py b/tests/io/mongodb/test_cli.py index a97b7243..9cdda9e5 100644 --- a/tests/io/mongodb/test_cli.py +++ b/tests/io/mongodb/test_cli.py @@ -1,22 +1,23 @@ import os +from copy import deepcopy +from pathlib import Path from unittest import mock from uuid import UUID +import bson import dateutil +import pymongo import pytest import sqlparse from click.testing import CliRunner from pueblo.testing.dataframe import DataFrameFactory +from toolz import dissoc from cratedb_toolkit.cli import cli from tests.conftest import check_sqlalchemy2 pytestmark = pytest.mark.mongodb -bson = pytest.importorskip("bson", reason="Skipping tests because bson is not installed") -pymongo = pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") -pytest.importorskip("rich", reason="Skipping tests because rich is not installed") - @pytest.fixture(scope="module", autouse=True) def check_prerequisites(): @@ -76,6 +77,14 @@ def test_version(): }, }, } +DOCUMENT_OUT_NO_COMPLEX_LISTS = deepcopy(DOCUMENT_OUT) +DOCUMENT_OUT_NO_COMPLEX_LISTS["data"]["value"] = dissoc( + DOCUMENT_OUT["data"]["value"], + "list_object_symmetric", + "list_object_varying_string", + "list_object_varying_date", +) + DOCUMENT_DDL = """ CREATE TABLE IF NOT EXISTS "testdrive"."demo" ( "oid" TEXT, @@ -114,12 +123,12 @@ def test_mongodb_load_table_basic(caplog, cratedb, mongodb): assert cratedb.database.count_records("testdrive.demo") == 42 -def test_mongodb_load_table_real(caplog, cratedb, mongodb): +def test_mongodb_load_table_complex_lists_normalize(caplog, cratedb, mongodb): """ CLI test: Invoke `ctk load table` for MongoDB. """ - cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Populate source database. 
client: pymongo.MongoClient = mongodb.get_connection_client() @@ -127,11 +136,13 @@ def test_mongodb_load_table_real(caplog, cratedb, mongodb): demo = testdrive.create_collection("demo") demo.insert_many([DOCUMENT_IN]) + transformation = Path("examples/zyp/zyp-treatment-all.yaml") + # Run transfer command. runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url}) result = runner.invoke( cli, - args=f"load table {mongodb_url}", + args=f"load table {mongodb_url} --transformation={transformation}", catch_exceptions=False, ) assert result.exit_code == 0 @@ -150,3 +161,43 @@ def test_mongodb_load_table_real(caplog, cratedb, mongodb): sql = results[0][0] sql = sqlparse.format(sql) assert sql.startswith(DOCUMENT_DDL) + + +def test_mongodb_load_table_complex_lists_ignore(caplog, cratedb, mongodb): + """ + CLI test: Invoke `ctk load table` for MongoDB, with special parameter to ignore complex lists. + """ + mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Populate source database. + client: pymongo.MongoClient = mongodb.get_connection_client() + testdrive = client.get_database("testdrive") + demo = testdrive.create_collection("demo") + demo.insert_many([DOCUMENT_IN]) + + transformation = Path("examples/zyp/zyp-treatment-ignore.yaml") + + # Run transfer command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url}) + result = runner.invoke( + cli, + args=f"load table {mongodb_url} --transformation={transformation}", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify metadata in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 1 + + # Verify content in target database. 
+ results = cratedb.database.run_sql("SELECT * FROM testdrive.demo", records=True) + assert results[0] == DOCUMENT_OUT_NO_COMPLEX_LISTS + + # Verify schema in target database. + results = cratedb.database.run_sql("SHOW CREATE TABLE testdrive.demo") + sql = results[0][0] + sql = sqlparse.format(sql) + assert sql.startswith(DOCUMENT_DDL) diff --git a/tests/io/mongodb/test_export.py b/tests/io/mongodb/test_export.py new file mode 100644 index 00000000..46666a08 --- /dev/null +++ b/tests/io/mongodb/test_export.py @@ -0,0 +1,107 @@ +import pytest +from zyp.model.collection import CollectionTransformation +from zyp.model.treatment import Treatment + +from cratedb_toolkit.io.mongodb.export import CrateDBConverter + +pytestmark = pytest.mark.mongodb + + +def test_convert_basic(): + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "list": [ + {"id": "foo", "value": "something"}, + {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, + ], + }, + } + + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "date": 1443004362000, + "id": 42, + "list": [ + {"id": "foo", "value": "something"}, + {"id": "bar", "value": 1443090762000}, + ], + }, + } + converter = CrateDBConverter() + assert converter.convert(data_in) == data_out + + +def test_convert_with_treatment_ignore_complex_list(): + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "some_complex_list": [ + {"id": "foo", "value": "something"}, + {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, + ], + }, + } + + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "date": 1443004362000, + "id": 42, + }, + } + + treatment = Treatment(ignore_complex_lists=True) + converter = CrateDBConverter(transformation=CollectionTransformation(treatment=treatment)) + assert converter.convert(data_in) == data_out + + 
+def test_convert_with_treatment_all_options(): + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "ignore_toplevel": 42, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "ignore_nested": 42, + }, + "to_list": 42, + "to_string": 42, + "to_dict_scalar": 42, + "to_dict_list": [{"user": 42}], + } + + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "date": 1443004362000, + "id": 42, + }, + "to_list": [42], + "to_string": "42", + "to_dict_scalar": {"id": 42}, + "to_dict_list": [{"user": {"id": 42}}], + } + treatment = Treatment( + ignore_complex_lists=False, + ignore_field=["ignore_toplevel", "ignore_nested"], + convert_list=["to_list"], + convert_string=["to_string"], + convert_dict=[ + {"name": "to_dict_scalar", "wrapper_name": "id"}, + {"name": "user", "wrapper_name": "id"}, + ], + ) + converter = CrateDBConverter(transformation=CollectionTransformation(treatment=treatment)) + assert converter.convert(data_in) == data_out diff --git a/tests/io/mongodb/test_extract.py b/tests/io/mongodb/test_extract.py index c5cf5a85..63fe1b04 100644 --- a/tests/io/mongodb/test_extract.py +++ b/tests/io/mongodb/test_extract.py @@ -3,18 +3,13 @@ import unittest from collections import OrderedDict -import pytest - -pytestmark = pytest.mark.mongodb - -pytest.importorskip("bson", reason="Skipping tests because bson is not installed") -pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") -pytest.importorskip("rich", reason="Skipping tests because rich is not installed") - import bson +import pytest from cratedb_toolkit.io.mongodb import extract +pytestmark = pytest.mark.mongodb + class TestExtractTypes(unittest.TestCase): def test_primitive_types(self): diff --git a/tests/io/mongodb/test_integration.py b/tests/io/mongodb/test_integration.py index 7574e7d3..57526efc 100644 --- a/tests/io/mongodb/test_integration.py +++ b/tests/io/mongodb/test_integration.py @@ -4,15 +4,13 @@ import unittest 
from unittest import mock +import pymongo import pytest from tests.conftest import check_sqlalchemy2 pytestmark = pytest.mark.mongodb -pymongo = pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") -pytest.importorskip("rich", reason="Skipping tests because rich is not installed") - @pytest.fixture(scope="module", autouse=True) def check_prerequisites(): diff --git a/tests/io/mongodb/test_transformation.py b/tests/io/mongodb/test_transformation.py index cabf0c29..ae063f15 100644 --- a/tests/io/mongodb/test_transformation.py +++ b/tests/io/mongodb/test_transformation.py @@ -1,17 +1,14 @@ from pathlib import Path +from unittest import mock +import pymongo import pytest +from cratedb_toolkit.io.mongodb.api import mongodb_copy from tests.conftest import check_sqlalchemy2 pytestmark = pytest.mark.mongodb -pymongo = pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") -pytest.importorskip("bsonjs", reason="Skipping tests because bsonjs is not installed") -pytest.importorskip("rich", reason="Skipping tests because rich is not installed") - -from cratedb_toolkit.io.mongodb.api import mongodb_copy # noqa: E402 - @pytest.fixture(scope="module", autouse=True) def check_prerequisites(): @@ -21,7 +18,6 @@ def check_prerequisites(): check_sqlalchemy2() -@pytest.mark.skip("Wishful thinking with single column strategy") def test_mongodb_copy_transform_timestamp(caplog, cratedb, mongodb): """ Verify MongoDB -> CrateDB data transfer with transformation. @@ -52,4 +48,74 @@ def test_mongodb_copy_transform_timestamp(caplog, cratedb, mongodb): "SELECT pg_typeof(data['timestamp']) AS type FROM testdrive.demo;", records=True ) timestamp_type = type_result[0]["type"] - assert timestamp_type == "TIMESTAMP WITH TIME ZONE" + assert timestamp_type == "bigint" + + # FIXME: Only works with a defined schema. 
+ # assert timestamp_type == "TIMESTAMP WITH TIME ZONE" # noqa: ERA001 + + +def test_mongodb_copy_treatment_all(caplog, cratedb, mongodb): + """ + Verify MongoDB -> CrateDB data transfer with Zyp Treatment transformation. + """ + + data_in = { + "ignore_toplevel": 42, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "ignore_nested": 42, + }, + "to_list": 42, + "to_string": 42, + "to_dict_scalar": 42, + "to_dict_list": [{"user": 42}], + } + + data_out = { + "oid": mock.ANY, + "data": { + "_id": mock.ANY, + "value": { + "date": 1443004362000, + "id": 42, + }, + "to_list": [42], + "to_string": "42", + "to_dict_scalar": {"id": 42}, + "to_dict_list": [{"user": {"id": 42}}], + }, + } + + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo" + + # Populate source database. + client: pymongo.MongoClient = mongodb.get_connection_client() + testdrive = client.get_database("testdrive") + demo = testdrive.create_collection("demo") + demo.insert_one(data_in) + + # Run transfer command. + mongodb_copy( + mongodb_url, + cratedb_url, + transformation=Path("examples/zyp/zyp-treatment-all.yaml"), + ) + + # Verify data in target database. + cratedb.database.refresh_table("testdrive.demo") + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) + assert results[0] == data_out + return + assert results[0]["data"]["timestamp"] == 1563051934000 + + # Verify schema in target database. + type_result = cratedb.database.run_sql( + "SELECT pg_typeof(data['timestamp']) AS type FROM testdrive.demo;", records=True + ) + timestamp_type = type_result[0]["type"] + assert timestamp_type == "bigint" + + # FIXME: Only works with a defined schema. 
+ # assert timestamp_type == "TIMESTAMP WITH TIME ZONE" # noqa: ERA001 diff --git a/tests/io/mongodb/test_util.py b/tests/io/mongodb/test_util.py index 0146d0cf..dc27cdb1 100644 --- a/tests/io/mongodb/test_util.py +++ b/tests/io/mongodb/test_util.py @@ -2,7 +2,7 @@ import pytest -from cratedb_toolkit.io.mongodb.util import parse_input_numbers, sanitize_field_names +from cratedb_toolkit.io.mongodb.util import parse_input_numbers, sanitize_field_names # noqa: E402 pytestmark = pytest.mark.mongodb