What is the correct way to add a trigger on a SQLModel table after it has been created #1042
Replies: 1 comment
-
Apologies for the noise, the following works just fine (the transactions were not being explicitly committed 🙈) FTR, the following works just fine. from sqlmodel import SQLModel, Field, Session
from sqlalchemy import (
Column,
String,
Integer,
JSON,
TIMESTAMP,
text,
ForeignKey,
event,
)
from typing import Optional, Dict
from datetime import datetime
from geoalchemy2 import Geometry
from sqlalchemy.engine import Engine
from utils.constants import (
PROJECTS_OBSERVABILITY_SCHEMA,
PROJECTS_TABLE,
REGISTRIES_TABLE,
)
class Project(SQLModel, table=True):
__tablename__ = PROJECTS_TABLE
__table_args__ = {"schema": PROJECTS_OBSERVABILITY_SCHEMA}
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(
sa_column=Column(String(255), unique=True, nullable=False),
)
registry_id: Optional[int] = Field(
sa_column=Column(
Integer,
ForeignKey(f"{PROJECTS_OBSERVABILITY_SCHEMA}.{REGISTRIES_TABLE}.id"),
nullable=True,
)
)
created_at: datetime = Field(
default_factory=datetime.utcnow,
sa_column=Column(
TIMESTAMP(timezone=True), server_default=text("now()"), nullable=False
),
)
schema_name: str = Field(sa_column=Column(String(255), unique=True, nullable=False))
plots_fqn: str = Field(sa_column=Column(String(255), unique=True, nullable=False))
crediting_period_years: Optional[int] = Field(
sa_column=Column(Integer, nullable=True)
)
location: Optional[str] = Field(
sa_column=Column(Geometry(geometry_type="POINT", srid=4326), nullable=True)
)
exports_uri: Optional[str] = Field(sa_column=Column(String(255), nullable=True))
reports_uri: Optional[str] = Field(sa_column=Column(String(255), nullable=True))
registry_project_uri: Optional[str] = Field(
sa_column=Column(String(255), nullable=True)
)
registry_project_id: Optional[str] = Field(sa_column=Column(String, nullable=True))
meta: Optional[Dict] = Field(
default_factory=dict, sa_column=Column(JSON, nullable=True)
)
lifetime_years: Optional[int] = Field(sa_column=Column(Integer, nullable=True))
start_date: Optional[datetime] = Field(
sa_column=Column(TIMESTAMP(timezone=True), nullable=True)
)
end_date: Optional[datetime] = Field(
sa_column=Column(TIMESTAMP(timezone=True), nullable=True)
)
class Config:
arbitrary_types_allowed = True
validate_assignment = True
def create_functions(engine: Engine):
with Session(engine) as session:
session.exec(
text(f"""
CREATE OR REPLACE FUNCTION {PROJECTS_OBSERVABILITY_SCHEMA}.update_area_on_plots_change()
RETURNS TRIGGER AS $$
DECLARE
project_id INT;
area_calculation_epsg INT := 5348;
BEGIN
SELECT id INTO project_id
FROM {PROJECTS_OBSERVABILITY_SCHEMA}.{PROJECTS_TABLE}
WHERE plots_fqn = tg_table_schema || '.' || tg_table_name;
IF project_id IS NULL THEN
RAISE EXCEPTION 'Project ID not found for table %', tg_table_schema || '.' || tg_table_name;
END IF;
PERFORM {PROJECTS_OBSERVABILITY_SCHEMA}.calculate_project_area(project_id, area_calculation_epsg);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
""")
)
session.commit()
session.exec(
text(f"""
CREATE OR REPLACE FUNCTION {PROJECTS_OBSERVABILITY_SCHEMA}.add_area_update_trigger()
RETURNS TRIGGER AS $$
DECLARE
plot_table text;
check_table_exists text;
BEGIN
plot_table := NEW.plots_fqn;
EXECUTE format('SELECT to_regclass(%L)', plot_table) INTO check_table_exists;
IF check_table_exists IS NOT NULL THEN
RAISE notice 'Calling create trigger for (%)', check_table_exists;
EXECUTE format(
'CREATE TRIGGER trg_update_area_on_plots_change AFTER INSERT OR UPDATE OR DELETE ON %s EXECUTE FUNCTION {PROJECTS_OBSERVABILITY_SCHEMA}.update_area_on_plots_change();',
plot_table
);
ELSE
RAISE notice 'Table % does not exist. Trigger not created.', plot_table;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""")
)
session.commit()
def add_trigger_to_table(engine: Engine):
try:
with Session(engine) as session:
session.exec(
text(f"""
CREATE TRIGGER trg_add_area_update_trigger
AFTER INSERT ON {PROJECTS_OBSERVABILITY_SCHEMA}.{PROJECTS_TABLE}
FOR EACH ROW
EXECUTE FUNCTION {PROJECTS_OBSERVABILITY_SCHEMA}.add_area_update_trigger();
""")
)
session.commit()
except Exception as e:
if 'already exists' in str(e):
pass
else:
raise e
@event.listens_for(Project.metadata, "after_create")
def after_create(target, connection, **kw):
engine = connection.engine
create_functions(engine)
add_trigger_to_table(engine) |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
First Check
Commit to Help
Example Code
Description
Hi I'm probably missing something obvious as I'm starting my journey on SQLModel, I'm trying to ensure a query is executed after a table model is created in the database, in this case to add a trigger.
To do so, I'm trying to leverage SQLAlchemy event listeners but it's unfortunately not working as expected:
And inside
main.py
:From the Fast API logs I can see there's a rollback on the transaction:
By checking the database logs I can see an error indicating that the table where the trigger should be created does not exists, so I suspect there's a race condition and the SQLModel models are not created at the time that the trigger creation function runs.
Operating System
macOS
Operating System Details
No response
SQLModel Version
0.0.19
Python Version
3.12.4
Additional Context
No response
Beta Was this translation helpful? Give feedback.
All reactions