Skip to content

Commit

Permalink
Add table_kwargs context manager to make pandas/Dask support dialect
Browse files Browse the repository at this point in the history
Unlock SQLAlchemy ORM's `__table_args__` on the pandas/Dask `to_sql()`
interface, in order to support CrateDB's special SQL DDL options.
  • Loading branch information
amotl committed Jun 23, 2024
1 parent 5e39bbf commit b9af9da
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
## Unreleased
- Added/reactivated documentation as `sqlalchemy-cratedb`
- Added re-usable patches and polyfills from application adapters
- Added ``table_kwargs`` context manager to make pandas/Dask support
CrateDB dialect table options.

## 2024/06/13 0.37.0
- Added support for CrateDB's [FLOAT_VECTOR] data type and its accompanying
Expand Down
30 changes: 30 additions & 0 deletions docs/support.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,36 @@ df.to_sql(
)
```


(support-table-kwargs)=
## Context Manager `table_kwargs`

:::{rubric} Background
:::
CrateDB's special SQL DDL options to support [](inv:crate-reference#partitioned-tables),
[](inv:crate-reference#ddl-sharding), or [](inv:crate-reference#ddl-replication)
sometimes can't be configured easily when SQLAlchemy is wrapped into a 3rd-party
framework like pandas or Dask.

:::{rubric} Utility
:::
The `table_kwargs` utility is a context manager that is able to forward CrateDB's
dialect-specific table creation options to the `sa.Table()` constructor call sites
at runtime.

:::{rubric} Synopsis
:::
Using a context manager incantation like outlined below will render a
`PARTITIONED BY ("time")` SQL clause, without touching the call site of
`sa.Table(...)`.
```python
from sqlalchemy_cratedb.support import table_kwargs

with table_kwargs(crate_partitioned_by="time"):
return df.to_sql(...)
```


(support-autoincrement)=
## Synthetic Autoincrement using Timestamps

Expand Down
3 changes: 2 additions & 1 deletion src/sqlalchemy_cratedb/support/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from sqlalchemy_cratedb.support.pandas import insert_bulk
from sqlalchemy_cratedb.support.pandas import insert_bulk, table_kwargs
from sqlalchemy_cratedb.support.polyfill import check_uniqueness_factory, refresh_after_dml, \
patch_autoincrement_timestamp
from sqlalchemy_cratedb.support.util import refresh_table, refresh_dirty
Expand All @@ -10,4 +10,5 @@
refresh_after_dml,
refresh_dirty,
refresh_table,
table_kwargs,
]
34 changes: 34 additions & 0 deletions src/sqlalchemy_cratedb/support/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
# However, if you have executed another commercial license agreement
# with Crate these terms will supersede the license and you may use the
# software solely pursuant to the terms of the relevant commercial agreement.
from contextlib import contextmanager
from unittest.mock import patch

import logging

import sqlalchemy as sa


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -60,3 +65,32 @@ def insert_bulk(pd_table, conn, keys, data_iter):
cursor = conn._dbapi_connection.cursor()
cursor.execute(sql=sql, bulk_parameters=data)
cursor.close()


@contextmanager
def table_kwargs(**kwargs):
"""
Context manager for adding SQLAlchemy dialect-specific table options at runtime.
In certain cases where SQLAlchemy orchestration is implemented within a
framework, like at this spot [1] in pandas' `SQLTable._create_table_setup`,
it is not easily possible to forward SQLAlchemy dialect options at table
creation time.
In order to augment the SQL DDL statement to make it honor database-specific
dialect options, the only way to work around the unfortunate situation is by
monkey-patching the call to `sa.Table()` at runtime, relaying additional
dialect options through corresponding keyword arguments in their original
`<dialect>_<kwarg>` format [2].
[1] https://github.com/pandas-dev/pandas/blob/v2.2.2/pandas/io/sql.py#L1282-L1285
[2] https://docs.sqlalchemy.org/en/20/core/foundation.html#sqlalchemy.sql.base.DialectKWArgs.dialect_kwargs
"""
_init_dist = sa.sql.schema.Table._init

def _init(self, name, metadata, *args, **kwargs_effective):
kwargs_effective.update(kwargs)
return _init_dist(self, name, metadata, *args, **kwargs_effective)

with patch("sqlalchemy.sql.schema.Table._init", _init):
yield
67 changes: 67 additions & 0 deletions tests/test_support_pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import re

import pytest
import sqlalchemy as sa
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.orm import sessionmaker

from pueblo.testing.pandas import makeTimeDataFrame

from sqlalchemy_cratedb.support.pandas import table_kwargs

TABLE_NAME = "foobar"
INSERT_RECORDS = 42

# Create dataframe, to be used as input data.
df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
df["time"] = df.index


def test_table_kwargs_partitioned_by(cratedb_service):
"""
Validate adding CrateDB dialect table option `PARTITIONED BY` at runtime.
"""

engine = cratedb_service.database.engine
session = sessionmaker(bind=engine)()

# Insert records from pandas dataframe.
with table_kwargs(crate_partitioned_by="time"):
df.to_sql(
TABLE_NAME,
con=engine,
if_exists="replace",
index=False,
)

# Synchronize writes.
cratedb_service.database.refresh_table(TABLE_NAME)

# Inquire table cardinality.
metadata = sa.MetaData()
query = sa.select([sa.func.count()]).select_from(sa.Table(TABLE_NAME, metadata))
results = session.execute(query)
count = results.scalar()

# Compare outcome.
assert count == INSERT_RECORDS

# Validate SQL DDL.
ddl = cratedb_service.database.run_sql(f"SHOW CREATE TABLE {TABLE_NAME}")
assert 'PARTITIONED BY ("time")' in ddl[0][0]


def test_table_kwargs_unknown(cratedb_service):
"""
Validate behaviour when adding an unknown CrateDB dialect table option.
"""
engine = cratedb_service.database.engine
with table_kwargs(crate_unknown_option="bazqux"):
with pytest.raises(ProgrammingError) as ex:
df.to_sql(
TABLE_NAME,
con=engine,
if_exists="replace",
index=False,
)
assert ex.match(re.escape("ColumnUnknownException[Column bazqux unknown]"))

0 comments on commit b9af9da

Please sign in to comment.