Skip to content

Commit

Permalink
Add table_kwargs context manager to make pandas/Dask support dialect
Browse files Browse the repository at this point in the history
Unlock SQLAlchemy ORM's `__table_args__` on the pandas/Dask `to_sql()`
interface, in order to support CrateDB's special SQL DDL options.
  • Loading branch information
amotl committed Jun 24, 2024
1 parent 5e39bbf commit 3a6b827
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
## Unreleased
- Added/reactivated documentation as `sqlalchemy-cratedb`
- Added re-usable patches and polyfills from application adapters
- Added `table_kwargs` context manager to make pandas/Dask support
CrateDB dialect table options.

## 2024/06/13 0.37.0
- Added support for CrateDB's [FLOAT_VECTOR] data type and its accompanying
Expand Down
30 changes: 30 additions & 0 deletions docs/support.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,36 @@ df.to_sql(
)
```


(support-table-kwargs)=
## Context Manager `table_kwargs`

:::{rubric} Background
:::
CrateDB's special SQL DDL options to support [](inv:crate-reference#partitioned-tables),
[](inv:crate-reference#ddl-sharding), or [](inv:crate-reference#ddl-replication)
sometimes can't be configured easily when SQLAlchemy is wrapped into a 3rd-party
framework like pandas or Dask.

:::{rubric} Utility
:::
The `table_kwargs` utility is a context manager that is able to forward CrateDB's
dialect-specific table creation options to the `sa.Table()` constructor call sites
at runtime.

:::{rubric} Synopsis
:::
Using a context manager incantation like outlined below will render a
`PARTITIONED BY ("time")` SQL clause, without touching the call site of
`sa.Table(...)`.
```python
from sqlalchemy_cratedb.support import table_kwargs

with table_kwargs(crate_partitioned_by="time"):
return df.to_sql(...)
```


(support-autoincrement)=
## Synthetic Autoincrement using Timestamps

Expand Down
3 changes: 2 additions & 1 deletion src/sqlalchemy_cratedb/support/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from sqlalchemy_cratedb.support.pandas import insert_bulk
from sqlalchemy_cratedb.support.pandas import insert_bulk, table_kwargs
from sqlalchemy_cratedb.support.polyfill import check_uniqueness_factory, refresh_after_dml, \
patch_autoincrement_timestamp
from sqlalchemy_cratedb.support.util import refresh_table, refresh_dirty
Expand All @@ -10,4 +10,5 @@
refresh_after_dml,
refresh_dirty,
refresh_table,
table_kwargs,
]
49 changes: 49 additions & 0 deletions src/sqlalchemy_cratedb/support/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@
# However, if you have executed another commercial license agreement
# with Crate these terms will supersede the license and you may use the
# software solely pursuant to the terms of the relevant commercial agreement.
from contextlib import contextmanager
from typing import Any
from unittest.mock import patch

import logging

import sqlalchemy as sa

from sqlalchemy_cratedb import SA_VERSION, SA_2_0

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -60,3 +67,45 @@ def insert_bulk(pd_table, conn, keys, data_iter):
cursor = conn._dbapi_connection.cursor()
cursor.execute(sql=sql, bulk_parameters=data)
cursor.close()


@contextmanager
def table_kwargs(**kwargs):
"""
Context manager for adding SQLAlchemy dialect-specific table options at runtime.
In certain cases where SQLAlchemy orchestration is implemented within a
framework, like at this spot [1] in pandas' `SQLTable._create_table_setup`,
it is not easily possible to forward SQLAlchemy dialect options at table
creation time.
In order to augment the SQL DDL statement to make it honor database-specific
dialect options, the only way to work around the unfortunate situation is by
monkey-patching the call to `sa.Table()` at runtime, relaying additional
dialect options through corresponding keyword arguments in their original
`<dialect>_<kwarg>` format [2].
[1] https://github.com/pandas-dev/pandas/blob/v2.2.2/pandas/io/sql.py#L1282-L1285
[2] https://docs.sqlalchemy.org/en/20/core/foundation.html#sqlalchemy.sql.base.DialectKWArgs.dialect_kwargs
"""

if SA_VERSION < SA_2_0:
_init_dist = sa.sql.schema.Table._init

Check warning on line 93 in src/sqlalchemy_cratedb/support/pandas.py

View check run for this annotation

Codecov / codecov/patch

src/sqlalchemy_cratedb/support/pandas.py#L93

Added line #L93 was not covered by tests

def _init(self, name, metadata, *args, **kwargs_effective):
kwargs_effective.update(kwargs)
return _init_dist(self, name, metadata, *args, **kwargs_effective)

Check warning on line 97 in src/sqlalchemy_cratedb/support/pandas.py

View check run for this annotation

Codecov / codecov/patch

src/sqlalchemy_cratedb/support/pandas.py#L95-L97

Added lines #L95 - L97 were not covered by tests

with patch("sqlalchemy.sql.schema.Table._init", _init):
yield

Check warning on line 100 in src/sqlalchemy_cratedb/support/pandas.py

View check run for this annotation

Codecov / codecov/patch

src/sqlalchemy_cratedb/support/pandas.py#L99-L100

Added lines #L99 - L100 were not covered by tests

else:
new_dist = sa.sql.schema.Table._new

def _new(cls, *args: Any, **kw: Any) -> Any:
kw.update(kwargs)
table = new_dist(cls, *args, **kw)
return table

with patch("sqlalchemy.sql.schema.Table._new", _new):
yield
73 changes: 73 additions & 0 deletions tests/test_support_pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import re
import sys

import pytest
import sqlalchemy as sa
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.orm import sessionmaker

from pueblo.testing.pandas import makeTimeDataFrame

from sqlalchemy_cratedb import SA_VERSION, SA_1_4
from sqlalchemy_cratedb.support.pandas import table_kwargs

TABLE_NAME = "foobar"
INSERT_RECORDS = 42

# Create dataframe, to be used as input data.
df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
df["time"] = df.index


@pytest.mark.skipif(sys.version_info < (3, 8), reason="Feature not supported on Python 3.7 and earlier")
@pytest.mark.skipif(SA_VERSION < SA_1_4, reason="Feature not supported on SQLAlchemy 1.3 and earlier")
def test_table_kwargs_partitioned_by(cratedb_service):
"""
Validate adding CrateDB dialect table option `PARTITIONED BY` at runtime.
"""

engine = cratedb_service.database.engine
session = sessionmaker(bind=engine)()

# Insert records from pandas dataframe.
with table_kwargs(crate_partitioned_by="time"):
df.to_sql(
TABLE_NAME,
engine.connect(),
if_exists="replace",
index=False,
)

# Synchronize writes.
cratedb_service.database.refresh_table(TABLE_NAME)

# Inquire table cardinality.
metadata = sa.MetaData()
query = sa.select(sa.func.count()).select_from(sa.Table(TABLE_NAME, metadata))
results = session.execute(query)
count = results.scalar()

# Compare outcome.
assert count == INSERT_RECORDS

# Validate SQL DDL.
ddl = cratedb_service.database.run_sql(f"SHOW CREATE TABLE {TABLE_NAME}")
assert 'PARTITIONED BY ("time")' in ddl[0][0]


@pytest.mark.skipif(sys.version_info < (3, 8), reason="Feature not supported on Python 3.7 and earlier")
@pytest.mark.skipif(SA_VERSION < SA_1_4, reason="Feature not supported on SQLAlchemy 1.3 and earlier")
def test_table_kwargs_unknown(cratedb_service):
"""
Validate behaviour when adding an unknown CrateDB dialect table option.
"""
engine = cratedb_service.database.engine
with table_kwargs(crate_unknown_option="bazqux"):
with pytest.raises(ProgrammingError) as ex:
df.to_sql(
TABLE_NAME,
engine.connect(),
if_exists="replace",
index=False,
)
assert ex.match(re.escape("ColumnUnknownException[Column bazqux unknown]"))

0 comments on commit 3a6b827

Please sign in to comment.