diff --git a/CHANGES.md b/CHANGES.md index 31cee5d..cd63a9e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,9 @@ - Added `CrateIdentifierPreparer`, in order to quote reserved words like `object` properly, for example when used as column names. - Fixed `CrateDialect.get_pk_constraint` to return `list` instead of `set` type +- Added re-usable patches and polyfills from application adapters. + New utilities: `patch_autoincrement_timestamp`, `refresh_after_dml`, + `check_uniqueness_factory` ## 2024/06/13 0.37.0 - Added support for CrateDB's [FLOAT_VECTOR] data type and its accompanying diff --git a/docs/index-all.rst b/docs/index-all.rst index b2b35a1..9df21d5 100644 --- a/docs/index-all.rst +++ b/docs/index-all.rst @@ -18,3 +18,4 @@ CrateDB SQLAlchemy dialect -- all pages advanced-querying inspection-reflection dataframe + support diff --git a/docs/index.rst b/docs/index.rst index f4c3677..8aec310 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -135,7 +135,7 @@ Load results into `pandas`_ DataFrame. print(df) -Data types +Data Types ========== The :ref:`DB API driver ` and the SQLAlchemy dialect @@ -150,6 +150,20 @@ extension types ` documentation pages. data-types +Support Utilities +================= + +The package bundles a few support and utility functions that try to fill a few +gaps you will observe when working with CrateDB, when compared with other +databases. +Due to its distributed nature, CrateDB's behavior and features differ from those +found in other RDBMS systems. + +.. toctree:: + :maxdepth: 2 + + support + .. _examples: .. _by-example: diff --git a/docs/overview.rst b/docs/overview.rst index 070898b..06c48eb 100644 --- a/docs/overview.rst +++ b/docs/overview.rst @@ -1,9 +1,9 @@ .. _overview: .. _using-sqlalchemy: -======== -Overview -======== +================= +Features Overview +================= .. rubric:: Table of contents @@ -300,15 +300,28 @@ would translate into the following declarative model: >>> log.id ... +.. 
_auto-generated-identifiers: -Auto-generated primary key +Auto-generated identifiers .......................... +CrateDB does not provide traditional sequences or ``SERIAL`` data type support, +which enable automatically assigning incremental values when inserting records. +However, it offers server-side support by providing an SQL function to generate +random identifiers of ``STRING`` type, and client-side support for generating +``INTEGER``-based identifiers, when using the SQLAlchemy dialect. + +.. _gen_random_text_uuid: + +``gen_random_text_uuid`` +~~~~~~~~~~~~~~~~~~~~~~~~ + CrateDB 4.5.0 added the :ref:`gen_random_text_uuid() ` scalar function, which can also be used within an SQL DDL statement, in order to automatically assign random identifiers to newly inserted records on the server side. In this spirit, it is suitable to be used as a ``PRIMARY KEY`` constraint for SQLAlchemy. +It works on SQLAlchemy-defined columns of type ``sa.String``. A table schema like this @@ -334,6 +347,32 @@ would translate into the following declarative model: >>> item.id ... +.. _timestamp-autoincrement: + +Timestamp-based Autoincrement +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +By using SQLAlchemy's ``sa.func.now()``, you can assign automatically generated +identifiers to SQLAlchemy columns of types ``sa.BigInteger``, ``sa.DateTime``, +and ``sa.String``. + +This emulates autoincrement / sequential ID behavior for designated columns, based +on assigning timestamps on record insertion. + + >>> class Item(Base): + ... id = sa.Column("id", sa.BigInteger, default=func.now(), primary_key=True) + ... name = sa.Column("name", sa.String) + + >>> item = Item(name="Foobar") + >>> session.add(item) + >>> session.commit() + >>> item.id + ... + +There is a support utility which emulates autoincrement / sequential ID +behavior for designated columns, based on assigning timestamps on record +insertion. See :ref:`support-autoincrement`. + .. 
_using-extension-types: diff --git a/docs/support.md b/docs/support.md new file mode 100644 index 0000000..da366b0 --- /dev/null +++ b/docs/support.md @@ -0,0 +1,213 @@ +(support-features)= +(support-utilities)= +# Support Features + +The package bundles a few support and utility functions that try to fill a few +gaps you will observe when working with CrateDB, a distributed OLAP database, +since it lacks certain features, usually found in traditional OLTP databases. + +A few of the features outlined below are referred to as [polyfills], and +emulate a few functionalities, for example, to satisfy compatibility issues on +downstream frameworks or test suites. You can use them at your disposal, but +you should know what you are doing, as some of them can seriously impact +performance. + +Other features include efficiency support utilities for 3rd-party frameworks, +which can be used to increase performance, mostly on INSERT operations. + + +(support-insert-bulk)= +## Bulk Support for pandas and Dask + +:::{rubric} Background +::: +CrateDB's [](inv:crate-reference#http-bulk-ops) interface enables efficient +INSERT, UPDATE, and DELETE operations for batches of data. It enables +bulk operations, which are executed as single calls on the database server. + +:::{rubric} Utility +::: +The `insert_bulk` utility provides efficient bulk data transfers when using +dataframe libraries like pandas and Dask. {ref}`dataframe` dedicates a whole +page to corresponding topics, about choosing the right chunk sizes, concurrency +settings, and beyond. + +:::{rubric} Synopsis +::: +Use `method=insert_bulk` on pandas' or Dask's `to_sql()` method. +```python +import sqlalchemy as sa +from sqlalchemy_cratedb.support import insert_bulk +from pueblo.testing.pandas import makeTimeDataFrame + +# Create a pandas DataFrame, and connect to CrateDB. +df = makeTimeDataFrame(nper=42, freq="S") +engine = sa.create_engine("crate://") + +# Insert content of DataFrame using batches of records. 
+df.to_sql( + name="testdrive", + con=engine, + if_exists="replace", + index=False, + method=insert_bulk, +) +``` + +(support-autoincrement)= +## Synthetic Autoincrement using Timestamps + +:::{rubric} Background +::: +CrateDB does not provide traditional sequences or `SERIAL` data type support, +which enable automatically assigning incremental values when inserting records. + + +:::{rubric} Utility +::: +- The `patch_autoincrement_timestamp` utility emulates autoincrement / + sequential ID behavior for designated columns, based on assigning timestamps + on record insertion. +- It will simply assign `sa.func.now()` as a column `default` on the ORM model + column. +- It works on the SQLAlchemy column types `sa.BigInteger`, `sa.DateTime`, + and `sa.String`. +- You can use it if adjusting ORM models for your database adapter is not + an option. + +:::{rubric} Synopsis +::: +After activating the patch, you can use `autoincrement=True` on column definitions. +```python +import sqlalchemy as sa +from sqlalchemy.orm import declarative_base +from sqlalchemy_cratedb.support import patch_autoincrement_timestamp + +# Enable patch. +patch_autoincrement_timestamp() + +# Define database schema. +Base = declarative_base() + +class FooBar(Base): + id = sa.Column(sa.DateTime, primary_key=True, autoincrement=True) +``` + +:::{warning} +CrateDB's [`TIMESTAMP`](inv:crate-reference#type-timestamp) data type provides +milliseconds granularity. This has to be considered when evaluating collision +safety in high-traffic environments. +::: + + +(support-synthetic-refresh)= +## Synthetic Table REFRESH after DML + +:::{rubric} Background +::: +CrateDB is [eventually consistent]. Data written with a former statement is +not guaranteed to be fetched with the next following select statement for the +affected rows. + +Data written to CrateDB is flushed periodically, the refresh interval is +1000 milliseconds by default, and can be changed. 
More details can be found in +the reference documentation about [table refreshing](inv:crate-reference#refresh_data). + +There are situations where stronger consistency is required, for example when +needing to satisfy test suites of 3rd party frameworks, which usually do not +take such special behavior of CrateDB into consideration. + +:::{rubric} Utility +::: +- The `refresh_after_dml` utility will configure an SQLAlchemy engine or session + to automatically invoke `REFRESH TABLE` statements after each DML + operation (INSERT, UPDATE, DELETE). +- Only relevant (dirty) entities / tables will be considered to be refreshed. + +:::{rubric} Synopsis +::: +```python +import sqlalchemy as sa +from sqlalchemy_cratedb.support import refresh_after_dml + +engine = sa.create_engine("crate://") +refresh_after_dml(engine) +``` + +```python +import sqlalchemy as sa +from sqlalchemy.orm import sessionmaker +from sqlalchemy_cratedb.support import refresh_after_dml + +engine = sa.create_engine("crate://") +session = sessionmaker(bind=engine)() +refresh_after_dml(session) +``` + +:::{warning} +Refreshing the table after each DML operation can cause serious performance +degradations, and should only be used on low-volume, low-traffic data, +when applicable, and if you know what you are doing. +::: + + +(support-unique)= +## Synthetic UNIQUE Constraints + +:::{rubric} Background +::: +CrateDB does not provide `UNIQUE` constraints in DDL statements. Because of its +distributed nature, supporting such a feature natively would cause expensive +database cluster operations, negating many benefits of using database clusters +in the first place. + +:::{rubric} Utility +::: +- The `check_uniqueness_factory` utility emulates "unique constraints" + functionality by querying the table for unique values before invoking + SQL `INSERT` operations. +- It uses SQLAlchemy [](inv:sa#orm_event_toplevel), more specifically + the [before_insert] mapper event.
+- When the uniqueness constraint is violated, the adapter will raise a + corresponding exception. + ```python + IntegrityError: DuplicateKeyException in table 'foobar' on constraint 'name' + ``` + +:::{rubric} Synopsis +::: +```python +import sqlalchemy as sa +from sqlalchemy.orm import declarative_base +from sqlalchemy.event import listen +from sqlalchemy_cratedb.support import check_uniqueness_factory + +# Define database schema. +Base = declarative_base() + +class FooBar(Base): + id = sa.Column(sa.String, primary_key=True) + name = sa.Column(sa.String) + +# Add synthetic UNIQUE constraint on `name` column. +listen(FooBar, "before_insert", check_uniqueness_factory(FooBar, "name")) +``` + +[before_insert]: https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.MapperEvents.before_insert + +:::{note} +This feature will only work well if table data is consistent, which can be +ensured by invoking a `REFRESH TABLE` statement after any DML operation. +For conveniently enabling "always refresh", please refer to the documentation +section about [](#support-synthetic-refresh). +::: + +:::{warning} +Querying the table before each INSERT operation can cause serious performance +degradations, and should only be used on low-volume, low-traffic data, +when applicable, and if you know what you are doing. 
+::: + + +[eventually consistent]: https://en.wikipedia.org/wiki/Eventual_consistency +[polyfills]: https://en.wikipedia.org/wiki/Polyfill_(programming) diff --git a/src/sqlalchemy_cratedb/support/__init__.py b/src/sqlalchemy_cratedb/support/__init__.py new file mode 100644 index 0000000..c40dbbd --- /dev/null +++ b/src/sqlalchemy_cratedb/support/__init__.py @@ -0,0 +1,13 @@ +from sqlalchemy_cratedb.support.pandas import insert_bulk +from sqlalchemy_cratedb.support.polyfill import check_uniqueness_factory, refresh_after_dml, \ + patch_autoincrement_timestamp +from sqlalchemy_cratedb.support.util import refresh_table, refresh_dirty + +__all__ = [ + check_uniqueness_factory, + insert_bulk, + patch_autoincrement_timestamp, + refresh_after_dml, + refresh_dirty, + refresh_table, +] diff --git a/src/sqlalchemy_cratedb/support.py b/src/sqlalchemy_cratedb/support/pandas.py similarity index 100% rename from src/sqlalchemy_cratedb/support.py rename to src/sqlalchemy_cratedb/support/pandas.py diff --git a/src/sqlalchemy_cratedb/support/polyfill.py b/src/sqlalchemy_cratedb/support/polyfill.py new file mode 100644 index 0000000..230af0d --- /dev/null +++ b/src/sqlalchemy_cratedb/support/polyfill.py @@ -0,0 +1,128 @@ +import sqlalchemy as sa +from sqlalchemy.event import listen +import typing as t + +from sqlalchemy_cratedb.support.util import refresh_dirty, refresh_table + + +def patch_autoincrement_timestamp(): + """ + Configure SQLAlchemy model columns with an alternative to `autoincrement=True`. + Use the current timestamp instead. + + This is used by CrateDB's MLflow adapter. + + TODO: Maybe enable through a dialect parameter `crate_polyfill_autoincrement` or such. 
+ """ + import sqlalchemy.sql.schema as schema + + init_dist = schema.Column.__init__ + + def __init__(self, *args, **kwargs): + if "autoincrement" in kwargs: + del kwargs["autoincrement"] + if "default" not in kwargs: + kwargs["default"] = sa.func.now() + init_dist(self, *args, **kwargs) + + schema.Column.__init__ = __init__ # type: ignore[method-assign] + + +def check_uniqueness_factory(sa_entity, *attribute_names): + """ + Run a manual column value uniqueness check on a table, and raise an IntegrityError if applicable. + + CrateDB does not support the UNIQUE constraint on columns. This attempts to emulate it. + + https://github.com/crate/sqlalchemy-cratedb/issues/76 + + This is used by CrateDB's MLflow adapter. + + TODO: Maybe enable through a dialect parameter `crate_polyfill_unique` or such. + """ + + # Synthesize a canonical "name" for the constraint, + # composed of all column names involved. + constraint_name: str = "-".join(attribute_names) + + def check_uniqueness(mapper, connection, target): + from sqlalchemy.exc import IntegrityError + + if isinstance(target, sa_entity): + # TODO: How to use `session.query(SqlExperiment)` here? + stmt = mapper.selectable.select() + for attribute_name in attribute_names: + stmt = stmt.filter(getattr(sa_entity, attribute_name) == getattr(target, attribute_name)) + stmt = stmt.compile(bind=connection.engine) + results = connection.execute(stmt) + if results.rowcount > 0: + raise IntegrityError( + statement=stmt, + params=[], + orig=Exception( + f"DuplicateKeyException in table '{target.__tablename__}' " f"on constraint '{constraint_name}'" + ), + ) + + return check_uniqueness + + +def refresh_after_dml_session(session: sa.orm.Session): + """ + Run `REFRESH TABLE` after each DML operation (INSERT, UPDATE, DELETE). + + CrateDB is eventually consistent, i.e. write operations are not flushed to + disk immediately, so readers may see stale data. In a traditional OLTP-like + application, this is not applicable. 
+ + This SQLAlchemy extension makes sure that data is synchronized after each + operation manipulating data. + + > `after_{insert,update,delete}` events only apply to the session flush operation + > and do not apply to the ORM DML operations described at ORM-Enabled INSERT, + > UPDATE, and DELETE statements. To intercept ORM DML events, use + > `SessionEvents.do_orm_execute().` + > -- https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.MapperEvents.after_insert + + > Intercept statement executions that occur on behalf of an ORM Session object. + > -- https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.SessionEvents.do_orm_execute + + > Execute after flush has completed, but before commit has been called. + > -- https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.SessionEvents.after_flush + + This is used by CrateDB's LangChain adapter. + + TODO: Maybe enable through a dialect parameter `crate_dml_refresh` or such. + """ # noqa: E501 + listen(session, "after_flush", refresh_dirty) + + +def refresh_after_dml_engine(engine: sa.engine.Engine): + """ + Run `REFRESH TABLE` after each DML operation (INSERT, UPDATE, DELETE). + + This is used by CrateDB's Singer/Meltano and `rdflib-sqlalchemy` adapters. + """ + def receive_after_execute( + conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result + ): + if isinstance(clauseelement, (sa.sql.Insert, sa.sql.Update, sa.sql.Delete)): + if not isinstance(clauseelement.table, sa.sql.Join): + full_table_name = f'"{clauseelement.table.name}"' + if clauseelement.table.schema is not None: + full_table_name = f'"{clauseelement.table.schema}".' + full_table_name + refresh_table(conn, full_table_name) + + sa.event.listen(engine, "after_execute", receive_after_execute) + + +def refresh_after_dml(engine_or_session: t.Union[sa.engine.Engine, sa.orm.Session]): + """ + Run `REFRESH TABLE` after each DML operation (INSERT, UPDATE, DELETE). 
+ """ + if isinstance(engine_or_session, sa.engine.Engine): + refresh_after_dml_engine(engine_or_session) + elif isinstance(engine_or_session, (sa.orm.Session, sa.orm.scoping.scoped_session)): + refresh_after_dml_session(engine_or_session) + else: + raise TypeError(f"Unknown type: {type(engine_or_session)}") diff --git a/src/sqlalchemy_cratedb/support/util.py b/src/sqlalchemy_cratedb/support/util.py new file mode 100644 index 0000000..1defc93 --- /dev/null +++ b/src/sqlalchemy_cratedb/support/util.py @@ -0,0 +1,34 @@ +import itertools +import typing as t + +import sqlalchemy as sa + +if t.TYPE_CHECKING: + try: + from sqlalchemy.orm import DeclarativeBase + except ImportError: + pass + + +def refresh_table(connection, target: t.Union[str, "DeclarativeBase"]): + """ + Invoke a `REFRESH TABLE` statement. + """ + if hasattr(target, "__tablename__"): + sql = f"REFRESH TABLE {target.__tablename__}" + else: + sql = f"REFRESH TABLE {target}" + connection.execute(sa.text(sql)) + + +def refresh_dirty(session, flush_context=None): + """ + Invoke a `REFRESH TABLE` statement on each table entity flagged as "dirty". + + SQLAlchemy event handler for the 'after_flush' event, + invoking `REFRESH TABLE` on each table which has been modified. 
+ """ + dirty_entities = itertools.chain(session.new, session.dirty, session.deleted) + dirty_classes = {entity.__class__ for entity in dirty_entities} + for class_ in dirty_classes: + refresh_table(session, class_) diff --git a/tests/integration.py b/tests/integration.py index 5e262fc..80d155e 100644 --- a/tests/integration.py +++ b/tests/integration.py @@ -124,6 +124,7 @@ def drop_tables(): "DROP TABLE IF EXISTS archived_tasks", "DROP TABLE IF EXISTS characters", "DROP TABLE IF EXISTS cities", + "DROP TABLE IF EXISTS foobar", "DROP TABLE IF EXISTS locations", "DROP BLOB TABLE IF EXISTS myfiles", "DROP TABLE IF EXISTS search", diff --git a/tests/test_schema.py b/tests/test_schema.py index 83eb348..fa9a476 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -10,9 +10,10 @@ def test_correct_schema(cratedb_service): database = cratedb_service.database tablename = f'"{TESTDRIVE_DATA_SCHEMA}"."foobar"' - inspector: sa.Inspector = sa.inspect(database.engine) + database.run_sql(f"DROP TABLE IF EXISTS {tablename}") database.run_sql(f"CREATE TABLE {tablename} AS SELECT 1") + inspector: sa.Inspector = sa.inspect(database.engine) assert TESTDRIVE_DATA_SCHEMA in inspector.get_schema_names() table_names = inspector.get_table_names(schema=TESTDRIVE_DATA_SCHEMA) diff --git a/tests/test_support_polyfill.py b/tests/test_support_polyfill.py new file mode 100644 index 0000000..d495fee --- /dev/null +++ b/tests/test_support_polyfill.py @@ -0,0 +1,137 @@ +import datetime as dt + +import pytest +import sqlalchemy as sa +from sqlalchemy.event import listen +from sqlalchemy.exc import IntegrityError +from sqlalchemy.orm import sessionmaker + +from sqlalchemy_cratedb import SA_VERSION, SA_1_4 + +try: + from sqlalchemy.orm import declarative_base +except ImportError: + from sqlalchemy.ext.declarative import declarative_base + +from sqlalchemy_cratedb.support import check_uniqueness_factory, patch_autoincrement_timestamp, refresh_after_dml + + +@pytest.mark.skipif(SA_VERSION < 
SA_1_4, reason="Test case not supported on SQLAlchemy 1.3 and earlier") +def test_autoincrement_timestamp(cratedb_service): + """ + Validate autoincrement columns using `sa.DateTime` columns. + + https://github.com/crate/sqlalchemy-cratedb/issues/77 + """ + patch_autoincrement_timestamp() + + engine = cratedb_service.database.engine + session = sessionmaker(bind=engine)() + Base = declarative_base() + + # Define DDL. + class FooBar(Base): + __tablename__ = 'foobar' + id = sa.Column(sa.String, primary_key=True) + date = sa.Column(sa.DateTime, autoincrement=True) + number = sa.Column(sa.BigInteger, autoincrement=True) + string = sa.Column(sa.String, autoincrement=True) + + Base.metadata.drop_all(engine, checkfirst=True) + Base.metadata.create_all(engine, checkfirst=True) + + # Insert record. + foo_item = FooBar(id="foo") + session.add(foo_item) + session.commit() + session.execute(sa.text("REFRESH TABLE foobar")) + + # Query record. + result = session.execute(sa.select(FooBar.date, FooBar.number, FooBar.string)).mappings().first() + + # Compare outcome. + assert result["date"].year == dt.datetime.now().year + assert result["number"] >= 1718846016235 + assert result["string"] >= "1718846016235" + + +@pytest.mark.skipif(SA_VERSION < SA_1_4, reason="Feature not supported on SQLAlchemy 1.3 and earlier") +def test_check_uniqueness_factory(cratedb_service): + """ + Validate basic synthetic UNIQUE constraints. + + https://github.com/crate/sqlalchemy-cratedb/issues/76 + """ + + engine = cratedb_service.database.engine + session = sessionmaker(bind=engine)() + Base = declarative_base() + + # Define DDL. + class FooBar(Base): + __tablename__ = 'foobar' + id = sa.Column(sa.String, primary_key=True) + name = sa.Column(sa.String) + + # Add synthetic UNIQUE constraint on `name` column. 
+ listen(FooBar, "before_insert", check_uniqueness_factory(FooBar, "name")) + + Base.metadata.drop_all(engine, checkfirst=True) + Base.metadata.create_all(engine, checkfirst=True) + + # Insert baseline record. + foo_item = FooBar(id="foo", name="foo") + session.add(foo_item) + session.commit() + session.execute(sa.text("REFRESH TABLE foobar")) + + # Insert second record, violating the uniqueness constraint. + bar_item = FooBar(id="bar", name="foo") + session.add(bar_item) + with pytest.raises(IntegrityError) as ex: + session.commit() + assert ex.match("DuplicateKeyException in table 'foobar' on constraint 'name'") + + +@pytest.mark.skipif(SA_VERSION < SA_1_4, reason="Feature not supported on SQLAlchemy 1.3 and earlier") +@pytest.mark.parametrize("mode", ["engine", "session"]) +def test_refresh_after_dml(cratedb_service, mode): + """ + Validate automatic `REFRESH TABLE` issuing works well. + + https://github.com/crate/sqlalchemy-cratedb/issues/83 + """ + engine = cratedb_service.database.engine + session = sessionmaker(bind=engine)() + Base = declarative_base() + + # Enable automatic refresh. + if mode == "engine": + refresh_after_dml(engine) + elif mode == "session": + refresh_after_dml(session) + else: + raise ValueError(f"Unable to enable automatic refresh with mode: {mode}") + + # Define DDL. + class FooBar(Base): + __tablename__ = 'foobar' + id = sa.Column(sa.String, primary_key=True) + + Base.metadata.drop_all(engine, checkfirst=True) + Base.metadata.create_all(engine, checkfirst=True) + + # Insert baseline record. + foo_item = FooBar(id="foo") + session.add(foo_item) + session.commit() + + # Query record. + query = session.query(FooBar.id) + result = query.first() + + # Sanity checks. + assert result is not None, "Database result is empty. Most probably, `REFRESH TABLE` wasn't issued." + + # Compare outcome. 
+ assert result[0] == "foo" diff --git a/tests/vector_test.py b/tests/vector_test.py index 245ed30..6a564d7 100644 --- a/tests/vector_test.py +++ b/tests/vector_test.py @@ -215,13 +215,13 @@ def test_float_vector_as_generic(): assert fv.python_type is list -def test_float_vector_integration(): +def test_float_vector_integration(cratedb_service): """ An integration test for `FLOAT_VECTOR` and `KNN_SEARCH`. """ np = pytest.importorskip("numpy") - engine = sa.create_engine(f"crate://") + engine = cratedb_service.database.engine session = sessionmaker(bind=engine)() Base = declarative_base()