diff --git a/CHANGES.md b/CHANGES.md index 58ae83a..fd58d78 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,8 @@ ## Unreleased - Added/reactivated documentation as `sqlalchemy-cratedb` - Added re-usable patches and polyfills from application adapters +- Added `table_kwargs` context manager to enable pandas/Dask to support + CrateDB dialect table options. ## 2024/06/13 0.37.0 - Added support for CrateDB's [FLOAT_VECTOR] data type and its accompanying diff --git a/docs/support.md b/docs/support.md index dd949b9..ef45713 100644 --- a/docs/support.md +++ b/docs/support.md @@ -54,6 +54,36 @@ df.to_sql( ) ``` + +(support-table-kwargs)= +## Context Manager `table_kwargs` + +:::{rubric} Background +::: +CrateDB's special SQL DDL options to support [](inv:crate-reference#partitioned-tables), +[](inv:crate-reference#ddl-sharding), or [](inv:crate-reference#ddl-replication) +sometimes can't be configured easily when SQLAlchemy is wrapped into a 3rd-party +framework like pandas or Dask. + +:::{rubric} Utility +::: +The `table_kwargs` utility is a context manager that is able to forward CrateDB's +dialect-specific table creation options to the `sa.Table()` constructor call sites +at runtime. + +:::{rubric} Synopsis +::: +Using a context manager incantation like outlined below will render a +`PARTITIONED BY ("time")` SQL clause, without touching the call site of +`sa.Table(...)`. +```python +from sqlalchemy_cratedb.support import table_kwargs + +with table_kwargs(crate_partitioned_by="time"): + return df.to_sql(...) +``` + + (support-autoincrement)= ## Synthetic Autoincrement using Timestamps diff --git a/src/sqlalchemy_cratedb/support/__init__.py b/src/sqlalchemy_cratedb/support/__init__.py index c40dbbd..d140d60 100644 --- a/src/sqlalchemy_cratedb/support/__init__.py +++ b/src/sqlalchemy_cratedb/support/__init__.py @@ -1,4 +1,4 @@ -from sqlalchemy_cratedb.support.pandas import insert_bulk +from sqlalchemy_cratedb.support.pandas import insert_bulk, table_kwargs from sqlalchemy_cratedb.support.polyfill import check_uniqueness_factory, refresh_after_dml, \ patch_autoincrement_timestamp from sqlalchemy_cratedb.support.util import refresh_table, refresh_dirty @@ -10,4 +10,5 @@ refresh_after_dml, refresh_dirty, refresh_table, + table_kwargs, ] diff --git a/src/sqlalchemy_cratedb/support/pandas.py b/src/sqlalchemy_cratedb/support/pandas.py index 23c4471..90c24ed 100644 --- a/src/sqlalchemy_cratedb/support/pandas.py +++ b/src/sqlalchemy_cratedb/support/pandas.py @@ -18,8 +18,15 @@ # However, if you have executed another commercial license agreement # with Crate these terms will supersede the license and you may use the # software solely pursuant to the terms of the relevant commercial agreement. +from contextlib import contextmanager +from typing import Any +from unittest.mock import patch + import logging +import sqlalchemy as sa + +from sqlalchemy_cratedb import SA_VERSION, SA_2_0 logger = logging.getLogger(__name__) @@ -60,3 +67,45 @@ def insert_bulk(pd_table, conn, keys, data_iter): cursor = conn._dbapi_connection.cursor() cursor.execute(sql=sql, bulk_parameters=data) cursor.close() + + +@contextmanager +def table_kwargs(**kwargs): + """ + Context manager for adding SQLAlchemy dialect-specific table options at runtime. + + In certain cases where SQLAlchemy orchestration is implemented within a + framework, like at this spot [1] in pandas' `SQLTable._create_table_setup`, + it is not easily possible to forward SQLAlchemy dialect options at table + creation time. + + In order to augment the SQL DDL statement to make it honor database-specific + dialect options, the only way to work around the unfortunate situation is by + monkey-patching the call to `sa.Table()` at runtime, relaying additional + dialect options through corresponding keyword arguments in their original + `_` format [2]. + + [1] https://github.com/pandas-dev/pandas/blob/v2.2.2/pandas/io/sql.py#L1282-L1285 + [2] https://docs.sqlalchemy.org/en/20/core/foundation.html#sqlalchemy.sql.base.DialectKWArgs.dialect_kwargs + """ + + if SA_VERSION < SA_2_0: + _init_dist = sa.sql.schema.Table._init + + def _init(self, name, metadata, *args, **kwargs_effective): + kwargs_effective.update(kwargs) + return _init_dist(self, name, metadata, *args, **kwargs_effective) + + with patch("sqlalchemy.sql.schema.Table._init", _init): + yield + + else: + new_dist = sa.sql.schema.Table._new + + def _new(cls, *args: Any, **kw: Any) -> Any: + kw.update(kwargs) + table = new_dist(cls, *args, **kw) + return table + + with patch("sqlalchemy.sql.schema.Table._new", _new): + yield diff --git a/tests/test_support_pandas.py b/tests/test_support_pandas.py new file mode 100644 index 0000000..1fa43a1 --- /dev/null +++ b/tests/test_support_pandas.py @@ -0,0 +1,74 @@ +import re +import sys + +import pytest +import sqlalchemy as sa +from sqlalchemy.exc import ProgrammingError +from sqlalchemy.orm import sessionmaker + +from pueblo.testing.pandas import makeTimeDataFrame + +from sqlalchemy_cratedb import SA_VERSION, SA_2_0 +from sqlalchemy_cratedb.support.pandas import table_kwargs + +TABLE_NAME = "foobar" +INSERT_RECORDS = 42 + +# Create dataframe, to be used as input data. +df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S") +df["time"] = df.index + + +@pytest.mark.skipif(sys.version_info < (3, 8), reason="Feature not supported on Python 3.7 and earlier") +@pytest.mark.skipif(SA_VERSION < SA_2_0, reason="Feature not supported on SQLAlchemy 1.4 and earlier") +def test_table_kwargs_partitioned_by(cratedb_service): + """ + Validate adding CrateDB dialect table option `PARTITIONED BY` at runtime. + """ + + engine = cratedb_service.database.engine + session = sessionmaker(bind=engine)() + + # Insert records from pandas dataframe. + with table_kwargs(crate_partitioned_by="time"): + df.to_sql( + TABLE_NAME, + engine, + if_exists="replace", + index=False, + ) + + # Synchronize writes. + cratedb_service.database.refresh_table(TABLE_NAME) + + # Inquire table cardinality. + metadata = sa.MetaData() + query = sa.select(sa.func.count()).select_from(sa.Table(TABLE_NAME, metadata)) + results = session.execute(query) + count = results.scalar() + + # Compare outcome. + assert count == INSERT_RECORDS + + # Validate SQL DDL. + ddl = cratedb_service.database.run_sql(f"SHOW CREATE TABLE {TABLE_NAME}") + assert 'PARTITIONED BY ("time")' in ddl[0][0] + + +@pytest.mark.skipif(sys.version_info < (3, 8), reason="Feature not supported on Python 3.7 and earlier") +@pytest.mark.skipif(SA_VERSION < SA_2_0, reason="Feature not supported on SQLAlchemy 1.4 and earlier") +def test_table_kwargs_unknown(cratedb_service): + """ + Validate behaviour when adding an unknown CrateDB dialect table option. + """ + engine = cratedb_service.database.engine + with table_kwargs(crate_unknown_option="'bazqux'"): + with pytest.raises(ProgrammingError) as ex: + df.to_sql( + TABLE_NAME, + engine, + if_exists="replace", + index=False, + ) + assert ex.match(re.escape('SQLParseException[Invalid property "unknown_option" ' + 'passed to [ALTER | CREATE] TABLE statement]'))