Skip to content

Commit

Permalink
Reset System Database (#197)
Browse files Browse the repository at this point in the history
A new command for programmatically resetting the DBOS system database.
This is primarily useful for testing a DBOS app, to reset the internal
state of DBOS between tests.
  • Loading branch information
kraftp authored Feb 3, 2025
1 parent b200aaf commit bc5f2cd
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 50 deletions.
18 changes: 17 additions & 1 deletion dbos/_dbos.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
)
from ._roles import default_required_roles, required_roles
from ._scheduler import ScheduledWorkflow, scheduled
from ._sys_db import WorkflowStatusString
from ._sys_db import WorkflowStatusString, reset_system_database
from ._tracer import dbos_tracer

if TYPE_CHECKING:
Expand Down Expand Up @@ -409,6 +409,22 @@ def _launch(self) -> None:
dbos_logger.error(f"DBOS failed to launch: {traceback.format_exc()}")
raise

@classmethod
def reset_system_database(cls) -> None:
"""
Destroy the DBOS system database. Useful for resetting the state of DBOS between tests.
This is a destructive operation and should only be used in a test environment.
More information on testing DBOS apps: https://docs.dbos.dev/python/tutorials/testing
"""
if _dbos_global_instance is not None:
_dbos_global_instance._reset_system_database()

def _reset_system_database(self) -> None:
assert (
not self._launched
), "The system database cannot be reset after DBOS is launched. Resetting the system database is a destructive operation that should only be used in a test environment."
reset_system_database(self.config)

def _destroy(self) -> None:
self._initialized = False
for event in self.stop_events:
Expand Down
43 changes: 43 additions & 0 deletions dbos/_sys_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,3 +1265,46 @@ def remove_from_queue(self, workflow_id: str, queue: "Queue") -> None:
.where(SystemSchema.workflow_queue.c.workflow_uuid == workflow_id)
.values(completed_at_epoch_ms=int(time.time() * 1000))
)


def reset_system_database(config: ConfigFile) -> None:
sysdb_name = (
config["database"]["sys_db_name"]
if "sys_db_name" in config["database"] and config["database"]["sys_db_name"]
else config["database"]["app_db_name"] + SystemSchema.sysdb_suffix
)
postgres_db_url = sa.URL.create(
"postgresql+psycopg",
username=config["database"]["username"],
password=config["database"]["password"],
host=config["database"]["hostname"],
port=config["database"]["port"],
database="postgres",
)
try:
# Connect to postgres default database
engine = sa.create_engine(postgres_db_url)

with engine.connect() as conn:
# Set autocommit required for database dropping
conn.execution_options(isolation_level="AUTOCOMMIT")

# Terminate existing connections
conn.execute(
sa.text(
"""
SELECT pg_terminate_backend(pg_stat_activity.pid)
FROM pg_stat_activity
WHERE pg_stat_activity.datname = :db_name
AND pid <> pg_backend_pid()
"""
),
{"db_name": sysdb_name},
)

# Drop the database
conn.execute(sa.text(f"DROP DATABASE IF EXISTS {sysdb_name}"))

except sa.exc.SQLAlchemyError as e:
dbos_logger.error(f"Error resetting system database: {str(e)}")
raise e
51 changes: 3 additions & 48 deletions dbos/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
from .. import load_config
from .._app_db import ApplicationDatabase
from .._dbos_config import _is_valid_app_name
from .._schemas.system_database import SystemSchema
from .._sys_db import SystemDatabase
from .._sys_db import SystemDatabase, reset_system_database
from .._workflow_commands import _cancel_workflow, _get_workflow, _list_workflows
from ..cli._github_init import create_template_from_github
from ._template_init import copy_template, get_project_name, get_templates_directory
Expand Down Expand Up @@ -224,56 +223,12 @@ def reset(
typer.echo("Operation cancelled.")
raise typer.Exit()
config = load_config()
sysdb_name = (
config["database"]["sys_db_name"]
if "sys_db_name" in config["database"] and config["database"]["sys_db_name"]
else config["database"]["app_db_name"] + SystemSchema.sysdb_suffix
)
postgres_db_url = sa.URL.create(
"postgresql+psycopg",
username=config["database"]["username"],
password=config["database"]["password"],
host=config["database"]["hostname"],
port=config["database"]["port"],
database="postgres",
)
try:
# Connect to postgres default database
engine = sa.create_engine(postgres_db_url)

with engine.connect() as conn:
# Set autocommit required for database dropping
conn.execution_options(isolation_level="AUTOCOMMIT")

# Terminate existing connections
conn.execute(
sa.text(
"""
SELECT pg_terminate_backend(pg_stat_activity.pid)
FROM pg_stat_activity
WHERE pg_stat_activity.datname = :db_name
AND pid <> pg_backend_pid()
"""
),
{"db_name": sysdb_name},
)

# Drop the database
conn.execute(sa.text(f"DROP DATABASE IF EXISTS {sysdb_name}"))

reset_system_database(config)
except sa.exc.SQLAlchemyError as e:
typer.echo(f"Error dropping database: {str(e)}")
typer.echo(f"Error resetting system database: {str(e)}")
return

sys_db = None
try:
sys_db = SystemDatabase(config)
except Exception as e:
typer.echo(f"DBOS system schema migration failed: {e}")
finally:
if sys_db:
sys_db.destroy()


@workflow.command(help="List workflows for your application")
def list(
Expand Down
34 changes: 33 additions & 1 deletion tests/test_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_package(build_wheel: str, postgres_db_engine: sa.Engine) -> None:

# initalize the app with dbos scaffolding
subprocess.check_call(
["dbos", "init", template_name, "--template", "dbos-db-starter"],
["dbos", "init", template_name, "--template", template_name],
cwd=temp_path,
env=venv,
)
Expand Down Expand Up @@ -115,3 +115,35 @@ def test_init_config() -> None:
actual_yaml = yaml.safe_load(f)

assert actual_yaml == expected_yaml


def test_reset(postgres_db_engine: sa.Engine) -> None:
app_name = "reset-app"
sysdb_name = "reset_app_dbos_sys"
with tempfile.TemporaryDirectory() as temp_path:
subprocess.check_call(
["dbos", "init", app_name, "--template", "dbos-db-starter"],
cwd=temp_path,
)

# Create a system database and verify it exists
subprocess.check_call(["dbos", "migrate"], cwd=temp_path)
with postgres_db_engine.connect() as c:
c.execution_options(isolation_level="AUTOCOMMIT")
result = c.execute(
sa.text(
f"SELECT COUNT(*) FROM pg_database WHERE datname = '{sysdb_name}'"
)
).scalar()
assert result == 1

# Call reset and verify it's destroyed
subprocess.check_call(["dbos", "reset", "-y"], cwd=temp_path)
with postgres_db_engine.connect() as c:
c.execution_options(isolation_level="AUTOCOMMIT")
result = c.execute(
sa.text(
f"SELECT COUNT(*) FROM pg_database WHERE datname = '{sysdb_name}'"
)
).scalar()
assert result == 0
40 changes: 40 additions & 0 deletions tests/test_schema_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,43 @@ def rollback_system_db(sysdb_url: str) -> None:
)
alembic_cfg.set_main_option("sqlalchemy.url", escaped_conn_string)
command.downgrade(alembic_cfg, "base") # Rollback all migrations


def test_reset(config: ConfigFile, postgres_db_engine: sa.Engine) -> None:
DBOS.destroy()
dbos = DBOS(config=config)
DBOS.launch()

# Make sure the system database exists
with dbos._sys_db.engine.connect() as c:
sql = SystemSchema.workflow_status.select()
result = c.execute(sql)
assert result.fetchall() == []

DBOS.destroy()
dbos = DBOS(config=config)
DBOS.reset_system_database()

sysdb_name = (
config["database"]["sys_db_name"]
if "sys_db_name" in config["database"] and config["database"]["sys_db_name"]
else config["database"]["app_db_name"] + SystemSchema.sysdb_suffix
)
with postgres_db_engine.connect() as c:
c.execution_options(isolation_level="AUTOCOMMIT")
count: int = c.execute(
sa.text(f"SELECT COUNT(*) FROM pg_database WHERE datname = '{sysdb_name}'")
).scalar_one()
assert count == 0

DBOS.launch()

# Make sure the system database is recreated
with dbos._sys_db.engine.connect() as c:
sql = SystemSchema.workflow_status.select()
result = c.execute(sql)
assert result.fetchall() == []

# Verify that resetting after launch throws
with pytest.raises(AssertionError):
DBOS.reset_system_database()

0 comments on commit bc5f2cd

Please sign in to comment.