Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

MongoDB: Fix and verify Zyp transformations #240

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


## Unreleased
- MongoDB: Fix and verify Zyp transformations

## 2024/08/21 v0.0.18
- Dependencies: Unpin commons-codec, to always use the latest version
Expand Down
6 changes: 4 additions & 2 deletions cratedb_toolkit/io/mongodb/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def apply_type_overrides(self, database_name: str, collection_name: str, collect
for rule in transformation.schema.rules:
pointer = JsonPointer(f"/document{rule.pointer}/types")
type_stats = pointer.resolve(collection_schema)
type_stats[rule.type] = 1e10
type_stats[rule.type] = {"count": int(9e10)}

def apply_transformations(self, database_name: str, collection_name: str, data: t.Dict[str, t.Any]):
if not self.active:
Expand All @@ -46,4 +46,6 @@ def apply_transformations(self, database_name: str, collection_name: str, data:
transformation: CollectionTransformation = self.project.get(address)
except KeyError:
return data
return transformation.bucket.apply(data)
if transformation.bucket:
return transformation.bucket.apply(data)
return data
10 changes: 8 additions & 2 deletions cratedb_toolkit/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

from boltons.urlutils import URL

from cratedb_toolkit.util.database import DatabaseAdapter, decode_database_table


@dataclasses.dataclass
class DatabaseAddress:
Expand Down Expand Up @@ -68,6 +66,8 @@ def decode(self) -> t.Tuple[URL, "TableAddress"]:
"""
Decode database and table names, and sanitize database URI.
"""
from cratedb_toolkit.util.database import decode_database_table

database, table = decode_database_table(self.dburi)
uri = deepcopy(self.uri)
uri.path = ""
Expand All @@ -88,8 +88,14 @@ def fullname(self):
"""
Return a fully-qualified quoted table identifier.
"""
from cratedb_toolkit.util import DatabaseAdapter

return DatabaseAdapter.quote_relation_name(f"{self.schema}.{self.table}")

@classmethod
def from_string(cls, table_name_full: str) -> "TableAddress":
    """
    Build a table address from a dotted table name.

    The string is split on `.` and the resulting parts are passed
    positionally, in order, to the constructor.

    NOTE(review): presumably the input has the shape `schema.table`,
    mirroring what `fullname` emits -- the field order is defined on the
    dataclass itself, confirm there.
    """
    # Instantiate through `cls` instead of the hard-coded class name,
    # so that subclasses get an instance of their own type.
    return cls(*table_name_full.split("."))


@dataclasses.dataclass
class ClusterInformation:
Expand Down
9 changes: 9 additions & 0 deletions cratedb_toolkit/util/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from sqlalchemy.sql.elements import AsBoolean
from sqlalchemy_cratedb.dialect import CrateDialect

from cratedb_toolkit.model import TableAddress
from cratedb_toolkit.util.data import str_contains

try:
Expand Down Expand Up @@ -349,6 +350,14 @@ def import_csv_dask(
parallel=True,
)

def describe_table_columns(self, table_name: str):
    """
    Reflect the columns of a table, as reported by the SQLAlchemy inspector.

    `table_name` is decoded with `TableAddress.from_string`; its `schema` and
    `table` parts are handed to `Inspector.get_columns`, whose result is
    returned unchanged.
    """
    engine_inspector = sa.inspect(self.engine)
    address = TableAddress.from_string(table_name)
    # The cast only narrows the type for the type checker; it is a no-op at runtime.
    bare_table_name = t.cast(str, address.table)
    return engine_inspector.get_columns(table_name=bare_table_name, schema=address.schema)


def sa_is_empty(thing):
"""
Expand Down
17 changes: 17 additions & 0 deletions examples/zyp/zyp-int64-to-timestamp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Zyp transformation defining a schema override on a top-level column.

# Timestamps in Unixtime/Epoch format are sometimes stored as
# int64 / BIGINT. This transformation defines an adjustment to
# make the target schema use a native datetime/timestamp column.
---
meta:
type: zyp-project
version: 1
collections:
- address:
container: testdrive
name: demo
schema:
rules:
- pointer: /timestamp
type: DATETIME
File renamed without changes.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ kinesis = [
"lorrystream[carabas]",
]
mongodb = [
"commons-codec[mongodb,zyp]",
"commons-codec[mongodb,zyp]>=0.0.4",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<5,>=3.10.1",
Expand Down
52 changes: 52 additions & 0 deletions tests/io/mongodb/test_transformation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from pathlib import Path

import pytest
from sqlalchemy import TIMESTAMP

from tests.conftest import check_sqlalchemy2

# Apply the `mongodb` marker to every test in this module.
pytestmark = pytest.mark.mongodb

# Skip the whole module when the optional dependencies are not installed.
pymongo = pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed")
pytest.importorskip("rich", reason="Skipping tests because rich is not installed")

# Imported only after the `importorskip` guards above, hence the E402 suppression.
from cratedb_toolkit.io.mongodb.api import mongodb_copy  # noqa: E402


@pytest.fixture(scope="module", autouse=True)
def check_prerequisites():
    """
    Run the SQLAlchemy 2.x prerequisite check for this module.

    Being an autouse fixture with module scope, it applies to every
    test of this module without being requested explicitly.
    """
    check_sqlalchemy2()


def test_mongodb_copy_transform_timestamp(caplog, cratedb, mongodb):
    """
    Verify MongoDB -> CrateDB data transfer with transformation.

    The Zyp project file `examples/zyp/zyp-int64-to-timestamp.yaml` is
    passed as transformation. The test verifies both the transferred value
    and that the `timestamp` column in the target table is reflected as
    `TIMESTAMP`.
    """
    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
    mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo"

    # Populate source database.
    client: pymongo.MongoClient = mongodb.get_connection_client()
    testdrive = client.get_database("testdrive")
    demo = testdrive.create_collection("demo")
    demo.insert_one({"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1563051934000})

    # Run transfer command.
    mongodb_copy(
        mongodb_url,
        cratedb_url,
        transformation=Path("examples/zyp/zyp-int64-to-timestamp.yaml"),
    )

    # Verify data in target database.
    cratedb.database.refresh_table("testdrive.demo")
    results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True)
    assert results[0]["timestamp"] == 1563051934000

    # Verify schema in target database.
    # Look the column up by name instead of by position, so the assertion
    # does not depend on the order in which columns are reflected.
    columns = cratedb.database.describe_table_columns("testdrive.demo")
    column_types = {column["name"]: column["type"] for column in columns}
    assert isinstance(column_types["timestamp"], TIMESTAMP)