diff --git a/docs/Makefile b/docs/Makefile index c4580d0..ac36d14 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -3,7 +3,7 @@ # You can set these variables from the command line, and also # from the environment for the first two. -SPHINXOPTS ?=-W -a -E +SPHINXOPTS ?= -a -E SPHINXBUILD ?= sphinx-build SOURCEDIR = source BUILDDIR = build diff --git a/docs/source/api.rst b/docs/source/api.rst index 140e81d..717cdfb 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -12,3 +12,17 @@ API .. autofunction:: sqlalchemy_postgresql_audit.uninstall_audit_triggers :noindex: + +Declarative API +=============== +An alternative API for the use and enablement of auditing functionality +can be used more directly on existing models/tables. + +.. autofunction:: sqlalchemy_postgresql_audit.audit_model + :noindex: + +.. autofunction:: sqlalchemy_postgresql_audit.create_audit_model + :noindex: + +.. autofunction:: sqlalchemy_postgresql_audit.create_audit_table + :noindex: diff --git a/setup.cfg b/setup.cfg index 494156a..ef9dbed 100644 --- a/setup.cfg +++ b/setup.cfg @@ -7,3 +7,15 @@ max-line-length = 100 [black] line-length = 100 + +[isort] +profile = black +known_first_party = sqlalchemy_postgresql_audit,tests +line_length = 100 +float_to_top=true +order_by_type = false +use_parentheses = true + +[tool:pytest] +doctest_optionflags = NORMALIZE_WHITESPACE IGNORE_EXCEPTION_DETAIL ELLIPSIS +addopts = --doctest-modules --ff --strict-markers diff --git a/src/sqlalchemy_postgresql_audit/__init__.py b/src/sqlalchemy_postgresql_audit/__init__.py index b6a08fb..d9038dd 100644 --- a/src/sqlalchemy_postgresql_audit/__init__.py +++ b/src/sqlalchemy_postgresql_audit/__init__.py @@ -5,8 +5,12 @@ "enable", "install_audit_triggers", "uninstall_audit_triggers", + "audit_model", + "create_audit_model", + "create_audit_table", ] -from .session import set_session_vars -from .plugin import enable +from .declarative import audit_model, create_audit_model, create_audit_table from .ddl import install_audit_triggers, uninstall_audit_triggers +from .plugin import enable +from .session import set_session_vars diff --git a/src/sqlalchemy_postgresql_audit/ddl.py b/src/sqlalchemy_postgresql_audit/ddl.py index e4a2584..843127f 100644 --- a/src/sqlalchemy_postgresql_audit/ddl.py +++ b/src/sqlalchemy_postgresql_audit/ddl.py @@ -24,9 +24,7 @@ def get_create_trigger_ddl( session_settings = session_settings or [] deletion_elements = ["'D'", "now()", "current_user"] - updation_elements = ["'U'", "now()", "current_user"] - insertion_elements = ["'I'", "now()", "current_user"] setting_map = { @@ -43,18 +41,24 @@ def get_create_trigger_ddl( else col.name ) - # We need to make sure to explicitly reference all elements in the procedure - column_elements.append(column_name) - # If this value is coming out of the target, then we want to explicitly reference the value - if col.name in target_columns: + if col.name == "audit_pk": + continue + + elif col.name in target_columns: deletion_elements.append("OLD.{}".format(column_name)) updation_elements.append("NEW.{}".format(column_name)) insertion_elements.append("NEW.{}".format(column_name)) + # We need to make sure to explicitly reference all elements in the procedure + column_elements.append(column_name) + # If it is not, it is either a default "audit_*" column # or it is one of our session settings values else: + # We need to make sure to explicitly reference all elements in the procedure + column_elements.append(column_name) + if col.name in ( "audit_operation", "audit_operation_timestamp", diff --git a/src/sqlalchemy_postgresql_audit/declarative.py b/src/sqlalchemy_postgresql_audit/declarative.py new file mode 100644 index 0000000..ba4c8d6 --- /dev/null +++ b/src/sqlalchemy_postgresql_audit/declarative.py @@ -0,0 +1,174 @@ +import uuid +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy import Column, text + +try: + from sqlalchemy.orm.decl_api import DeclarativeMeta +except ImportError: + from sqlalchemy.ext.declarative.api import DeclarativeMeta + +from sqlalchemy_postgresql_audit.event_listeners.sqlalchemy import ( + create_audit_table as create_raw_audit_table, +) + + +default_primary_key = Column( + "audit_pk", + UUID(as_uuid=True), + primary_key=True, + default=uuid.uuid4, + server_default=text("uuid_generate_v4()"), +) + + +def audit_model(_func=None, *, enabled=True, primary_key=default_primary_key, **spec): + """Decorate a model to automatically enable audit modeling. + + Arguments: + enabled: Defaults to true, enables auditing. + primary_key: Default to a uuid primary key. Can be disabled by using `None`. + + By default, automatically enables the auditing in addition to hooking + up the actual audit machinery. + + Additionally, leaves a reference to the audit model's own sqlachemy model + on the ``__audit_cls__`` attribute of the decorated class. + + Examples: + >>> from sqlalchemy import Column, types + >>> from sqlalchemy.ext.declarative import declarative_base + >>> from sqlalchemy_postgresql_audit import audit_model + + >>> Base = declarative_base() + + >>> @audit_model + ... class Foo(Base): + ... __tablename__ = 'foo' + ... id = Column(types.Integer(), primary_key=True) + + >>> Foo.__audit_cls__ + + + >>> @audit_model(enabled=False) + ... class Bar(Base): + ... __tablename__ = 'bar' + ... id = Column(types.Integer(), primary_key=True) + """ + + def decorated(model_cls): + model = create_audit_model( + model_cls, enabled=enabled, primary_key=primary_key, **spec + ) + if model: + model_cls.__audit_cls__ = model + + return model_cls + + if _func is None: + return decorated + return decorated(_func) + + +def create_audit_model( + model_cls, *, enabled=True, primary_key=default_primary_key, **spec +): + """Create an SQLAlchemy declarative Model class for the given `model_cls`. + + Arguments: + model_cls: The SQLAlchemy model being audited + enabled: Defaults to true, enables auditing. + primary_key: Default to a uuid primary key. Can be disabled by using `None`. + + Examples: + >>> from sqlalchemy import Column, types + >>> from sqlalchemy.ext.declarative import declarative_base + >>> from sqlalchemy_postgresql_audit import create_audit_model + + >>> Base = declarative_base() + + >>> class Foo(Base): + ... __tablename__ = 'foo' + ... id = Column(types.Integer(), primary_key=True) + + >>> class Bar(Base): + ... __tablename__ = 'bar' + ... id = Column(types.Integer(), primary_key=True) + + >>> class Baz(Base): + ... __tablename__ = 'baz' + ... id = Column(types.Integer(), primary_key=True) + + >>> AuditModel = create_audit_model(Foo) + >>> AuditModel3 = create_audit_model(Baz, primary_key=default_primary_key) + >>> create_audit_model(Bar, enabled=False) + """ + base_table = model_cls.__table__ + metadata = model_cls.metadata + + table = create_audit_table( + base_table, metadata, enabled=enabled, primary_key=primary_key, **spec + ) + if table is None: + return + + model_base = _find_model_base(model_cls) + + cls = type( + "{model_cls}Audit".format(model_cls=model_cls.__name__), + (model_base,), + {"__table__": table}, + ) + + return cls + + +def create_audit_table( + table, + metadata, + *, + enabled=True, + primary_key=default_primary_key, + ignore_columns=(), + **spec +): + """Create an audit SQLAlchemy ``Table`` for a given `Table` instance. + + Arguments: + table: The SQLAlchemy `Table` to audit. + metadata: The `SQLAlchemy` metadata on which to attach the table. + enabled: Defaults to true, enables auditing. + primary_key: Default to a uuid primary key. Can be disabled by using `None`. + spec: Optional auditing spec options. + + Examples: + >>> from sqlalchemy import MetaData, Table + >>> from sqlalchemy_postgresql_audit import create_audit_table + + >>> meta = MetaData() + + >>> foo_table = Table('foo', meta) + >>> audit_table1 = create_audit_table(foo_table, meta) + + >>> baz_table = Table('baz', meta) + >>> audit_table3 = create_audit_table(baz_table, meta, primary_key=None) + + >>> bar_table = Table('bar', meta) + >>> create_audit_table(bar_table, meta, enabled=False) + """ + existing_info = table.info + existing_info["audit.options"] = {"enabled": enabled, **spec} + + return create_raw_audit_table( + table, + metadata, + primary_key=primary_key, + ignore_columns=ignore_columns, + ) + + +def _find_model_base(model_cls): + for cls in model_cls.__mro__: + if isinstance(cls, DeclarativeMeta) and not hasattr(cls, "__mapper__"): + return cls + + raise ValueError("Invalid model, does not subclass a `DeclarativeMeta`.") diff --git a/src/sqlalchemy_postgresql_audit/event_listeners/__init__.py b/src/sqlalchemy_postgresql_audit/event_listeners/__init__.py index 1338327..de617ff 100644 --- a/src/sqlalchemy_postgresql_audit/event_listeners/__init__.py +++ b/src/sqlalchemy_postgresql_audit/event_listeners/__init__.py @@ -1,7 +1,6 @@ import threading -from sqlalchemy import Table -from sqlalchemy.events import event +from sqlalchemy import Table, event _event_listeners_enabled = False @@ -17,20 +16,19 @@ def enable_event_listeners(): def _enable_sqlalchemy_event_listeners(): - from sqlalchemy_postgresql_audit.event_listeners.sqlalchemy import ( - create_audit_table, - ) + from sqlalchemy_postgresql_audit.event_listeners.sqlalchemy import \ + create_audit_table event.listens_for(Table, "after_parent_attach")(create_audit_table) def _enable_alembic_event_listeners(): try: - from sqlalchemy_postgresql_audit.event_listeners.alembic import ( - compare_for_table, - ) from alembic.autogenerate.compare import comparators + from sqlalchemy_postgresql_audit.event_listeners.alembic import \ + compare_for_table + comparators.dispatch_for("table")(compare_for_table) except ImportError: pass diff --git a/src/sqlalchemy_postgresql_audit/event_listeners/sqlalchemy.py b/src/sqlalchemy_postgresql_audit/event_listeners/sqlalchemy.py index cb64a46..7a28c5b 100644 --- a/src/sqlalchemy_postgresql_audit/event_listeners/sqlalchemy.py +++ b/src/sqlalchemy_postgresql_audit/event_listeners/sqlalchemy.py @@ -18,7 +18,12 @@ ) -def create_audit_table(target, parent): +def create_audit_table( + target, + parent, + primary_key=None, + ignore_columns=(), +): """Create an audit table and generate procedure/trigger DDL. Naming conventions can be defined for a few of the named elements: @@ -83,23 +88,32 @@ def create_audit_table(target, parent): "schema": audit_spec["schema"] or "public", } - columns = [ - Column(col.name, col.type, nullable=True) for col in target.columns.values() - ] + column_elements = [] + if primary_key is not None: + column_elements.append(primary_key) + + column_elements.extend( + [ + Column("audit_operation", String(1), nullable=False), + Column("audit_operation_timestamp", DateTime, nullable=False), + Column("audit_current_user", String(64), nullable=False), + ] + ) + session_setting_columns = [col.copy() for col in audit_spec["session_settings"]] for col in session_setting_columns: col.name = "audit_{}".format(col.name) + column_elements.extend(session_setting_columns) - column_elements = session_setting_columns + columns + table_columns = [ + Column(col.name, col.type, nullable=True) + for col in target.columns.values() + if col.name not in ignore_columns + ] + column_elements.extend(table_columns) audit_table = Table( - audit_table_name, - target.metadata, - Column("audit_operation", String(1), nullable=False), - Column("audit_operation_timestamp", DateTime, nullable=False), - Column("audit_current_user", String(64), nullable=False), - *column_elements, - schema=audit_spec["schema"] + audit_table_name, target.metadata, *column_elements, schema=audit_spec["schema"] ) target.info["audit.create_ddl"] = get_create_trigger_ddl( @@ -119,3 +133,4 @@ def create_audit_table(target, parent): audit_table.info["audit.is_audit_table"] = True target.info["audit.is_audited"] = True + return audit_table diff --git a/tests/unit_tests/test_declarative.py b/tests/unit_tests/test_declarative.py new file mode 100644 index 0000000..f63c664 --- /dev/null +++ b/tests/unit_tests/test_declarative.py @@ -0,0 +1,44 @@ +from sqlalchemy import Column, Integer, MetaData, Table +from sqlalchemy.ext.declarative import declarative_base + +import sqlalchemy_postgresql_audit + + +def setup(): + sqlalchemy_postgresql_audit.enable() + + +def test_vanilla_model(): + Base = declarative_base() + metadata = Base.metadata + + @sqlalchemy_postgresql_audit.audit_model + class Model(Base): + __tablename__ = 'foo' + + id = Column("id", Integer, primary_key=True) + + audit_table = metadata.tables["foo_audit"] + + assert audit_table.info["audit.is_audit_table"] + assert Model.__table__.info["audit.is_audited"] + + +def test_model_with_info(): + Base = declarative_base() + metadata = Base.metadata + + @sqlalchemy_postgresql_audit.audit_model + class Model(Base): + __tablename__ = 'foo' + __table_args__ = { + 'info': {'example': 4} + } + + id = Column("id", Integer, primary_key=True) + + audit_table = metadata.tables["foo_audit"] + + assert audit_table.info["audit.is_audit_table"] + assert Model.__table__.info["audit.is_audited"] + assert 'example' in Model.__table__.info diff --git a/tox.ini b/tox.ini index 65ee417..00c7145 100644 --- a/tox.ini +++ b/tox.ini @@ -13,7 +13,7 @@ basepython = commands = pip install sqlalchemy-postgresql-audit[testing] - pytest --cov sqlalchemy_postgresql_audit --cov-report= -v tests {posargs:} + pytest --cov sqlalchemy_postgresql_audit --cov-report= -v src tests {posargs:} setenv = COVERAGE_FILE=tmp/.coverage.{envname}