From bc5f2cd96d7a2959e414608218ac83b4aff60a44 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Mon, 3 Feb 2025 13:21:02 -0600 Subject: [PATCH] Reset System Database (#197) A new command for programmatically resetting the DBOS system database. This is primarily useful for testing a DBOS app, to reset the internal state of DBOS between tests. --- dbos/_dbos.py | 18 +++++++++++- dbos/_sys_db.py | 43 ++++++++++++++++++++++++++++ dbos/cli/cli.py | 51 ++-------------------------------- tests/test_package.py | 34 ++++++++++++++++++++++- tests/test_schema_migration.py | 40 ++++++++++++++++++++++++++ 5 files changed, 136 insertions(+), 50 deletions(-) diff --git a/dbos/_dbos.py b/dbos/_dbos.py index 0ab79254..b80de62f 100644 --- a/dbos/_dbos.py +++ b/dbos/_dbos.py @@ -56,7 +56,7 @@ ) from ._roles import default_required_roles, required_roles from ._scheduler import ScheduledWorkflow, scheduled -from ._sys_db import WorkflowStatusString +from ._sys_db import WorkflowStatusString, reset_system_database from ._tracer import dbos_tracer if TYPE_CHECKING: @@ -409,6 +409,22 @@ def _launch(self) -> None: dbos_logger.error(f"DBOS failed to launch: {traceback.format_exc()}") raise + @classmethod + def reset_system_database(cls) -> None: + """ + Destroy the DBOS system database. Useful for resetting the state of DBOS between tests. + This is a destructive operation and should only be used in a test environment. + More information on testing DBOS apps: https://docs.dbos.dev/python/tutorials/testing + """ + if _dbos_global_instance is not None: + _dbos_global_instance._reset_system_database() + + def _reset_system_database(self) -> None: + assert ( + not self._launched + ), "The system database cannot be reset after DBOS is launched. Resetting the system database is a destructive operation that should only be used in a test environment." + reset_system_database(self.config) + def _destroy(self) -> None: self._initialized = False for event in self.stop_events: diff --git a/dbos/_sys_db.py b/dbos/_sys_db.py index f648d70f..365c9bfb 100644 --- a/dbos/_sys_db.py +++ b/dbos/_sys_db.py @@ -1265,3 +1265,46 @@ def remove_from_queue(self, workflow_id: str, queue: "Queue") -> None: .where(SystemSchema.workflow_queue.c.workflow_uuid == workflow_id) .values(completed_at_epoch_ms=int(time.time() * 1000)) ) + + +def reset_system_database(config: ConfigFile) -> None: + sysdb_name = ( + config["database"]["sys_db_name"] + if "sys_db_name" in config["database"] and config["database"]["sys_db_name"] + else config["database"]["app_db_name"] + SystemSchema.sysdb_suffix + ) + postgres_db_url = sa.URL.create( + "postgresql+psycopg", + username=config["database"]["username"], + password=config["database"]["password"], + host=config["database"]["hostname"], + port=config["database"]["port"], + database="postgres", + ) + try: + # Connect to postgres default database + engine = sa.create_engine(postgres_db_url) + + with engine.connect() as conn: + # Set autocommit required for database dropping + conn.execution_options(isolation_level="AUTOCOMMIT") + + # Terminate existing connections + conn.execute( + sa.text( + """ + SELECT pg_terminate_backend(pg_stat_activity.pid) + FROM pg_stat_activity + WHERE pg_stat_activity.datname = :db_name + AND pid <> pg_backend_pid() + """ + ), + {"db_name": sysdb_name}, + ) + + # Drop the database + conn.execute(sa.text(f"DROP DATABASE IF EXISTS {sysdb_name}")) + + except sa.exc.SQLAlchemyError as e: + dbos_logger.error(f"Error resetting system database: {str(e)}") + raise e diff --git a/dbos/cli/cli.py b/dbos/cli/cli.py index a44f9a14..3769fa28 100644 --- a/dbos/cli/cli.py +++ b/dbos/cli/cli.py @@ -18,8 +18,7 @@ from .. import load_config from .._app_db import ApplicationDatabase from .._dbos_config import _is_valid_app_name -from .._schemas.system_database import SystemSchema -from .._sys_db import SystemDatabase +from .._sys_db import SystemDatabase, reset_system_database from .._workflow_commands import _cancel_workflow, _get_workflow, _list_workflows from ..cli._github_init import create_template_from_github from ._template_init import copy_template, get_project_name, get_templates_directory @@ -224,56 +223,12 @@ def reset( typer.echo("Operation cancelled.") raise typer.Exit() config = load_config() - sysdb_name = ( - config["database"]["sys_db_name"] - if "sys_db_name" in config["database"] and config["database"]["sys_db_name"] - else config["database"]["app_db_name"] + SystemSchema.sysdb_suffix - ) - postgres_db_url = sa.URL.create( - "postgresql+psycopg", - username=config["database"]["username"], - password=config["database"]["password"], - host=config["database"]["hostname"], - port=config["database"]["port"], - database="postgres", - ) try: - # Connect to postgres default database - engine = sa.create_engine(postgres_db_url) - - with engine.connect() as conn: - # Set autocommit required for database dropping - conn.execution_options(isolation_level="AUTOCOMMIT") - - # Terminate existing connections - conn.execute( - sa.text( - """ - SELECT pg_terminate_backend(pg_stat_activity.pid) - FROM pg_stat_activity - WHERE pg_stat_activity.datname = :db_name - AND pid <> pg_backend_pid() - """ - ), - {"db_name": sysdb_name}, - ) - - # Drop the database - conn.execute(sa.text(f"DROP DATABASE IF EXISTS {sysdb_name}")) - + reset_system_database(config) except sa.exc.SQLAlchemyError as e: - typer.echo(f"Error dropping database: {str(e)}") + typer.echo(f"Error resetting system database: {str(e)}") return - sys_db = None - try: - sys_db = SystemDatabase(config) - except Exception as e: - typer.echo(f"DBOS system schema migration failed: {e}") - finally: - if sys_db: - sys_db.destroy() - @workflow.command(help="List workflows for your application") def list( diff --git a/tests/test_package.py b/tests/test_package.py index 26387826..14164e66 100644 --- a/tests/test_package.py +++ b/tests/test_package.py @@ -50,7 +50,7 @@ def test_package(build_wheel: str, postgres_db_engine: sa.Engine) -> None: # initalize the app with dbos scaffolding subprocess.check_call( - ["dbos", "init", template_name, "--template", "dbos-db-starter"], + ["dbos", "init", template_name, "--template", template_name], cwd=temp_path, env=venv, ) @@ -115,3 +115,35 @@ def test_init_config() -> None: actual_yaml = yaml.safe_load(f) assert actual_yaml == expected_yaml + + +def test_reset(postgres_db_engine: sa.Engine) -> None: + app_name = "reset-app" + sysdb_name = "reset_app_dbos_sys" + with tempfile.TemporaryDirectory() as temp_path: + subprocess.check_call( + ["dbos", "init", app_name, "--template", "dbos-db-starter"], + cwd=temp_path, + ) + + # Create a system database and verify it exists + subprocess.check_call(["dbos", "migrate"], cwd=temp_path) + with postgres_db_engine.connect() as c: + c.execution_options(isolation_level="AUTOCOMMIT") + result = c.execute( + sa.text( + f"SELECT COUNT(*) FROM pg_database WHERE datname = '{sysdb_name}'" + ) + ).scalar() + assert result == 1 + + # Call reset and verify it's destroyed + subprocess.check_call(["dbos", "reset", "-y"], cwd=temp_path) + with postgres_db_engine.connect() as c: + c.execution_options(isolation_level="AUTOCOMMIT") + result = c.execute( + sa.text( + f"SELECT COUNT(*) FROM pg_database WHERE datname = '{sysdb_name}'" + ) + ).scalar() + assert result == 0 diff --git a/tests/test_schema_migration.py b/tests/test_schema_migration.py index a5b9658b..2dfae14f 100644 --- a/tests/test_schema_migration.py +++ b/tests/test_schema_migration.py @@ -102,3 +102,43 @@ def rollback_system_db(sysdb_url: str) -> None: ) alembic_cfg.set_main_option("sqlalchemy.url", escaped_conn_string) command.downgrade(alembic_cfg, "base") # Rollback all migrations + + +def test_reset(config: ConfigFile, postgres_db_engine: sa.Engine) -> None: + DBOS.destroy() + dbos = DBOS(config=config) + DBOS.launch() + + # Make sure the system database exists + with dbos._sys_db.engine.connect() as c: + sql = SystemSchema.workflow_status.select() + result = c.execute(sql) + assert result.fetchall() == [] + + DBOS.destroy() + dbos = DBOS(config=config) + DBOS.reset_system_database() + + sysdb_name = ( + config["database"]["sys_db_name"] + if "sys_db_name" in config["database"] and config["database"]["sys_db_name"] + else config["database"]["app_db_name"] + SystemSchema.sysdb_suffix + ) + with postgres_db_engine.connect() as c: + c.execution_options(isolation_level="AUTOCOMMIT") + count: int = c.execute( + sa.text(f"SELECT COUNT(*) FROM pg_database WHERE datname = '{sysdb_name}'") + ).scalar_one() + assert count == 0 + + DBOS.launch() + + # Make sure the system database is recreated + with dbos._sys_db.engine.connect() as c: + sql = SystemSchema.workflow_status.select() + result = c.execute(sql) + assert result.fetchall() == [] + + # Verify that resetting after launch throws + with pytest.raises(AssertionError): + DBOS.reset_system_database()