From f715d68612812c03c65c95c203d917ab9558c312 Mon Sep 17 00:00:00 2001 From: Neeraj Sirdeshmukh Date: Fri, 28 Jul 2023 17:16:10 -0500 Subject: [PATCH 1/9] rebased onto main --- Dockerfile | 3 +- bin/run-migration-job.py | 119 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 bin/run-migration-job.py diff --git a/Dockerfile b/Dockerfile index c03017b..6c58256 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,7 +23,7 @@ RUN set -x && \ trap "rm -rf '$tmp'" EXIT && \ cd "$tmp" && \ curl -fsSL https://github.com/theory/pgtap/archive/refs/tags/v${PGTAP_VERSION}.tar.gz \ - -o pgtap.tar.gz && \ + -o pgtap.tar.gz && \ tar -xzf pgtap.tar.gz --strip-components 1 && \ make install @@ -36,4 +36,5 @@ ENV PATH=/opt/swoop/db/swoop-db-venv/bin:$PATH RUN mkdir -p /opt/swoop/db/scripts COPY bin/db-initialization.py /opt/swoop/db/scripts/db-initialization.py +COPY bin/run-migration-job.py /opt/swoop/db/scripts/run-migration-job.py ENV PATH=/opt/swoop/db/scripts:$PATH diff --git a/bin/run-migration-job.py b/bin/run-migration-job.py new file mode 100644 index 0000000..5254fdd --- /dev/null +++ b/bin/run-migration-job.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python +""" +This script will either migrate or rollback the swoop database to a specified +migration version. The migration version, action type (of either migrate/rollback), +and an override parameter are all specified in the values.yml from the Swoop-DB helm +chart. + +It requires the following environment variables be set: + + - PGHOST: + Postgres K8s service name which will be used to connect to the database + - PGUSER: + name of the user (role) that will perform the migrations + - PGPORT: + port number of the database container + - PGDATABASE: name of the database which will be used for migrations + - Any additional libpq-supported connection parameters + (https://www.postgresql.org/docs/current/libpq-envars.html) +""" +import argparse +import asyncio +import os +import sys + +from buildpg import V, render, funcs + +from swoop.db import SwoopDB + + +APPLICATION_ROLES = ["user_api", "user_caboose", "user_conductor"] + + +def stderr(*args, **kwargs) -> None: + kwargs["file"] = sys.stderr + print(*args, **kwargs) + + +def check_positive(value) -> int: + ivalue = int(value) + if ivalue <= 0: + raise argparse.ArgumentTypeError( + "%s is an invalid migration version number. Only \ + positive values are supported." + % value + ) + return ivalue + + +async def run_migrations() -> None: + dbname: str = os.environ["PGDATABASE"] + + parser = argparse.ArgumentParser() + + parser.add_argument( + "--action", + choices=["migrate", "rollback"], + help="An action to take, either migrate or rollback", + ) + parser.add_argument( + "--version", + type=check_positive, + help="The migration version to which to migrate/rollback the database", + ) + + parser.add_argument( + "--wait_override", + action="store_true", + default=False, + help="Override option to skip waiting for active connections to close. If specified,\ + it is true, and if not specified it is false.", + ) + + args = parser.parse_args() + + action = args.action + version = args.version + override = args.wait_override + + swoop_db = SwoopDB() + + if action == "migrate": + direction = "up" + elif action == "rollback": + direction = "down" + else: + direction = None + + if override: + stderr(f"Applying migrate/rollback on database {dbname} to version {version}") + await swoop_db.migrate(target=version, direction=direction, database=dbname) + else: + async with swoop_db.get_db_connection(database="") as conn: + # Wait for all active connections from user roles to be closed + active = True + roles_list = V("usename") == funcs.any(APPLICATION_ROLES) + while active: + q, p = render( + "SELECT * FROM pg_stat_activity WHERE datname = :db AND :un_clause", + db=dbname, + un_clause=roles_list, + ) + + records = await conn.fetch(q, *p) + if len(records) == 0: + active = False + + if action == "migrate": + stderr(f"Migrating database {dbname} to version {version}") + elif action == "rollback": + stderr(f"Rolling back database {dbname} to version {version}") + else: + stderr( + f"Applying migrate/rollback on database {dbname} to version {version}" + ) + await swoop_db.migrate(target=version, direction=direction, database=dbname) + + +if __name__ == "__main__": + asyncio.run(run_migrations()) From ffdfd020dc4a971ba9cf3a87f9892af048400106 Mon Sep 17 00:00:00 2001 From: Neeraj Sirdeshmukh Date: Fri, 28 Jul 2023 17:39:20 -0500 Subject: [PATCH 2/9] updates --- bin/run-migration-job.py | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/bin/run-migration-job.py b/bin/run-migration-job.py index 5254fdd..763ab93 100644 --- a/bin/run-migration-job.py +++ b/bin/run-migration-job.py @@ -22,11 +22,10 @@ import os import sys -from buildpg import V, render, funcs +from buildpg import V, funcs, render from swoop.db import SwoopDB - APPLICATION_ROLES = ["user_api", "user_caboose", "user_conductor"] @@ -66,8 +65,8 @@ async def run_migrations() -> None: "--wait_override", action="store_true", default=False, - help="Override option to skip waiting for active connections to close. If specified,\ - it is true, and if not specified it is false.", + help="Override option to skip waiting for active connections to close. \ + If specified, it is true, and if not specified it is false.", ) args = parser.parse_args() @@ -78,16 +77,9 @@ async def run_migrations() -> None: swoop_db = SwoopDB() - if action == "migrate": - direction = "up" - elif action == "rollback": - direction = "down" - else: - direction = None - if override: stderr(f"Applying migrate/rollback on database {dbname} to version {version}") - await swoop_db.migrate(target=version, direction=direction, database=dbname) + await swoop_db.migrate(target=version, database=dbname) else: async with swoop_db.get_db_connection(database="") as conn: # Wait for all active connections from user roles to be closed @@ -110,9 +102,10 @@ async def run_migrations() -> None: stderr(f"Rolling back database {dbname} to version {version}") else: stderr( - f"Applying migrate/rollback on database {dbname} to version {version}" + f"Applying migrate/rollback on database {dbname} to \ + version {version}" ) - await swoop_db.migrate(target=version, direction=direction, database=dbname) + await swoop_db.migrate(target=version, database=dbname) if __name__ == "__main__": From 7d2d4dbf158ac18dab6ecd59e1204aa7d8861a81 Mon Sep 17 00:00:00 2001 From: Neeraj Sirdeshmukh Date: Sat, 29 Jul 2023 19:35:14 -0500 Subject: [PATCH 3/9] update wait_override variable handling --- bin/run-migration-job.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/bin/run-migration-job.py b/bin/run-migration-job.py index 763ab93..e75f0d7 100644 --- a/bin/run-migration-job.py +++ b/bin/run-migration-job.py @@ -63,8 +63,7 @@ async def run_migrations() -> None: parser.add_argument( "--wait_override", - action="store_true", - default=False, + choices=["true", "false"], help="Override option to skip waiting for active connections to close. \ If specified, it is true, and if not specified it is false.", ) @@ -77,7 +76,7 @@ async def run_migrations() -> None: swoop_db = SwoopDB() - if override: + if override == "true": stderr(f"Applying migrate/rollback on database {dbname} to version {version}") await swoop_db.migrate(target=version, database=dbname) else: From a488535f4457718f3ebb272fa84868d912374a78 Mon Sep 17 00:00:00 2001 From: Neeraj Sirdeshmukh Date: Sat, 29 Jul 2023 13:49:34 -0500 Subject: [PATCH 4/9] Update Dockerfile --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 6c58256..abfd2bd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,7 +23,7 @@ RUN set -x && \ trap "rm -rf '$tmp'" EXIT && \ cd "$tmp" && \ curl -fsSL https://github.com/theory/pgtap/archive/refs/tags/v${PGTAP_VERSION}.tar.gz \ - -o pgtap.tar.gz && \ + -o pgtap.tar.gz && \ tar -xzf pgtap.tar.gz --strip-components 1 && \ make install From 4a931a1e39352ebce595bf87ff078b3ace9ee718 Mon Sep 17 00:00:00 2001 From: Neeraj Sirdeshmukh Date: Wed, 2 Aug 2023 15:59:55 -0500 Subject: [PATCH 5/9] updates to migration job --- bin/db-initialization.py | 6 +- bin/run-migration-job.py | 126 ++++++++++++++++++--------------------- 2 files changed, 61 insertions(+), 71 deletions(-) diff --git a/bin/db-initialization.py b/bin/db-initialization.py index 6c28563..84252c9 100755 --- a/bin/db-initialization.py +++ b/bin/db-initialization.py @@ -10,8 +10,8 @@ swoop-caboose role username and password - CONDUCTOR_ROLE_USER and CONDUCTOR_ROLE_PASS: swoop-conductor role username and password - - MIGRATION_ROLE_USER and MIGRATION_ROLE_PASS: - username and password for migration role + - OWNER_ROLE_USER and OWNER_ROLE_PASS: + username and password for owner role - Any additional libpq-supported connection parameters (https://www.postgresql.org/docs/current/libpq-envars.html) """ @@ -24,8 +24,6 @@ from swoop.db import SwoopDB -OWNER_ROLE_NAME = "swoop" -OWNER_ROLE = "OWNER_ROLE" APPLICATION_ROLES: list[str] = [ "API_ROLE", "CABOOSE_ROLE", diff --git a/bin/run-migration-job.py b/bin/run-migration-job.py index e75f0d7..69ace92 100644 --- a/bin/run-migration-job.py +++ b/bin/run-migration-job.py @@ -14,97 +14,89 @@ - PGPORT: port number of the database container - PGDATABASE: name of the database which will be used for migrations + - ACTION: name of the action performed, either migrate or rollback + - VERSION: the migration version to which to migrate/rollback the database + - NO_WAIT: override option to skip waiting for active connections to close. - Any additional libpq-supported connection parameters (https://www.postgresql.org/docs/current/libpq-envars.html) """ -import argparse import asyncio import os import sys - -from buildpg import V, funcs, render +import time from swoop.db import SwoopDB -APPLICATION_ROLES = ["user_api", "user_caboose", "user_conductor"] - def stderr(*args, **kwargs) -> None: kwargs["file"] = sys.stderr print(*args, **kwargs) -def check_positive(value) -> int: - ivalue = int(value) - if ivalue <= 0: - raise argparse.ArgumentTypeError( - "%s is an invalid migration version number. Only \ - positive values are supported." - % value - ) - return ivalue +def strtobool(val) -> bool: + """Convert a string representation of truth to true or false. + True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values + are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if + 'val' is anything else. + """ + val = val.lower() + if val in ("y", "yes", "t", "true", "on", "1"): + return True + elif val in ("n", "no", "f", "false", "off", "0"): + return False + else: + raise ValueError("invalid truth value %r" % (val,)) async def run_migrations() -> None: dbname: str = os.environ["PGDATABASE"] - parser = argparse.ArgumentParser() - - parser.add_argument( - "--action", - choices=["migrate", "rollback"], - help="An action to take, either migrate or rollback", - ) - parser.add_argument( - "--version", - type=check_positive, - help="The migration version to which to migrate/rollback the database", - ) - - parser.add_argument( - "--wait_override", - choices=["true", "false"], - help="Override option to skip waiting for active connections to close. \ - If specified, it is true, and if not specified it is false.", - ) - - args = parser.parse_args() - - action = args.action - version = args.version - override = args.wait_override + action = os.environ["ACTION"] + version = int(os.environ["VERSION"]) + no_wait = strtobool(os.environ["NO_WAIT"]) swoop_db = SwoopDB() - if override == "true": - stderr(f"Applying migrate/rollback on database {dbname} to version {version}") - await swoop_db.migrate(target=version, database=dbname) - else: - async with swoop_db.get_db_connection(database="") as conn: - # Wait for all active connections from user roles to be closed - active = True - roles_list = V("usename") == funcs.any(APPLICATION_ROLES) - while active: - q, p = render( - "SELECT * FROM pg_stat_activity WHERE datname = :db AND :un_clause", - db=dbname, - un_clause=roles_list, - ) - - records = await conn.fetch(q, *p) - if len(records) == 0: - active = False - - if action == "migrate": - stderr(f"Migrating database {dbname} to version {version}") - elif action == "rollback": - stderr(f"Rolling back database {dbname} to version {version}") - else: - stderr( - f"Applying migrate/rollback on database {dbname} to \ - version {version}" + async with swoop_db.get_db_connection() as conn: + # Wait for all active connections from user roles to be closed + active_sessions = not no_wait + while active_sessions: + active_sessions = await conn.fetchval( + """ + SELECT EXISTS( + SELECT * FROM pg_stat_activity + WHERE + datname = current_database() + AND usename != current_user ) - await swoop_db.migrate(target=version, database=dbname) + """, + ) + if active_sessions: + time.sleep(2) + + current_version = await swoop_db.get_current_version(conn=conn) + version_compatible = True + + if action == "rollback": + stderr(f"Rolling back database {dbname} to version {version}") + direction = "down" + if current_version is not None: + version_compatible = version <= current_version + else: + stderr(f"Migrating database {dbname} to version {version}") + direction = "up" + if current_version is not None: + version_compatible = version >= current_version + + if version_compatible: + await swoop_db.migrate( + target=version, direction=direction, database=dbname, conn=conn + ) + else: + stderr( + f"The current version of the database is incompatible with the desired" + "target version {version} and action {action}" + ) if __name__ == "__main__": From bd1a4aa006434a7c87c1fd73824415425918a02f Mon Sep 17 00:00:00 2001 From: Neeraj Sirdeshmukh Date: Wed, 2 Aug 2023 16:15:32 -0500 Subject: [PATCH 6/9] fix ruff --- bin/run-migration-job.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/run-migration-job.py b/bin/run-migration-job.py index 69ace92..29018d2 100644 --- a/bin/run-migration-job.py +++ b/bin/run-migration-job.py @@ -45,7 +45,7 @@ def strtobool(val) -> bool: elif val in ("n", "no", "f", "false", "off", "0"): return False else: - raise ValueError("invalid truth value %r" % (val,)) + raise ValueError(f"invalid truth value {val!r}") async def run_migrations() -> None: @@ -94,8 +94,8 @@ async def run_migrations() -> None: ) else: stderr( - f"The current version of the database is incompatible with the desired" - "target version {version} and action {action}" + "The current version of the database is incompatible with the desired" + f"target version {version} and action {action}" ) From 6065a5f2887708ed0434c0dd55698e985391d0df Mon Sep 17 00:00:00 2001 From: Neeraj Sirdeshmukh Date: Wed, 2 Aug 2023 22:52:55 -0500 Subject: [PATCH 7/9] updates --- bin/run-migration-job.py | 36 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/bin/run-migration-job.py b/bin/run-migration-job.py index 29018d2..6a10f46 100644 --- a/bin/run-migration-job.py +++ b/bin/run-migration-job.py @@ -45,19 +45,26 @@ def strtobool(val) -> bool: elif val in ("n", "no", "f", "false", "off", "0"): return False else: - raise ValueError(f"invalid truth value {val!r}") + raise ValueError(f"invalid boolean value {val!r}") + + +def int_or_none(val): + return int(val) if val else None async def run_migrations() -> None: dbname: str = os.environ["PGDATABASE"] - action = os.environ["ACTION"] - version = int(os.environ["VERSION"]) - no_wait = strtobool(os.environ["NO_WAIT"]) + rollback = strtobool(os.environ.get("ROLLBACK", "false")) + version = int_or_none(os.environ.get("VERSION")) + no_wait = strtobool(os.environ.get("NO_WAIT", "false")) swoop_db = SwoopDB() async with swoop_db.get_db_connection() as conn: + current_version = await swoop_db.get_current_version(conn=conn) + if current_version == version: + return # Wait for all active connections from user roles to be closed active_sessions = not no_wait while active_sessions: @@ -74,29 +81,16 @@ async def run_migrations() -> None: if active_sessions: time.sleep(2) - current_version = await swoop_db.get_current_version(conn=conn) - version_compatible = True - - if action == "rollback": + if rollback: stderr(f"Rolling back database {dbname} to version {version}") direction = "down" - if current_version is not None: - version_compatible = version <= current_version else: stderr(f"Migrating database {dbname} to version {version}") direction = "up" - if current_version is not None: - version_compatible = version >= current_version - if version_compatible: - await swoop_db.migrate( - target=version, direction=direction, database=dbname, conn=conn - ) - else: - stderr( - "The current version of the database is incompatible with the desired" - f"target version {version} and action {action}" - ) + await swoop_db.migrate( + target=version, direction=direction, database=dbname, conn=conn + ) if __name__ == "__main__": From 4e000da39f61f129c03310dd9792d577b1bacdde Mon Sep 17 00:00:00 2001 From: Neeraj Sirdeshmukh Date: Thu, 3 Aug 2023 10:35:07 -0500 Subject: [PATCH 8/9] update docstring --- bin/run-migration-job.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/bin/run-migration-job.py b/bin/run-migration-job.py index 6a10f46..0401537 100644 --- a/bin/run-migration-job.py +++ b/bin/run-migration-job.py @@ -1,24 +1,24 @@ #!/usr/bin/env python """ -This script will either migrate or rollback the swoop database to a specified -migration version. The migration version, action type (of either migrate/rollback), -and an override parameter are all specified in the values.yml from the Swoop-DB helm -chart. - -It requires the following environment variables be set: - - - PGHOST: - Postgres K8s service name which will be used to connect to the database - - PGUSER: - name of the user (role) that will perform the migrations - - PGPORT: - port number of the database container - - PGDATABASE: name of the database which will be used for migrations - - ACTION: name of the action performed, either migrate or rollback +This script automates applying migrations to a swoop database as created by +./db-initialization.py. By default it will migrate the database forward to +the most-recent schema version after waiting for all swoop application +connections to be closed, but a few environment variables can be used to +change that default behavior: + + - ROLLBACK: boolean flag required when target version is less than current - VERSION: the migration version to which to migrate/rollback the database - - NO_WAIT: override option to skip waiting for active connections to close. - - Any additional libpq-supported connection parameters - (https://www.postgresql.org/docs/current/libpq-envars.html) + - NO_WAIT: override option to skip waiting for active connections to close + +The script uses standard libpq-supported connection environment variables +(https://www.postgresql.org/docs/current/libpq-envars.html), so specify these +as required to connect to the database. Common vars include: + + - PGHOST: hostname or IP address of the postgres cluster host + - PGPORT: port number of the postgres server + - PGUSER: name of the user (role) that will perform the migrations + - PGPASSWORD: password of the user (role) + - PGDATABASE: name of the database onto which to apply migrations """ import asyncio import os From f99b36ac73420f553516fc347b8205f958147a8a Mon Sep 17 00:00:00 2001 From: Neeraj Sirdeshmukh Date: Thu, 3 Aug 2023 11:24:21 -0500 Subject: [PATCH 9/9] remove dbname variable --- bin/run-migration-job.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/bin/run-migration-job.py b/bin/run-migration-job.py index 0401537..0f2e5f6 100644 --- a/bin/run-migration-job.py +++ b/bin/run-migration-job.py @@ -53,8 +53,6 @@ def int_or_none(val): async def run_migrations() -> None: - dbname: str = os.environ["PGDATABASE"] - rollback = strtobool(os.environ.get("ROLLBACK", "false")) version = int_or_none(os.environ.get("VERSION")) no_wait = strtobool(os.environ.get("NO_WAIT", "false")) @@ -82,15 +80,13 @@ async def run_migrations() -> None: time.sleep(2) if rollback: - stderr(f"Rolling back database {dbname} to version {version}") + stderr(f"Rolling back database to version {version}") direction = "down" else: - stderr(f"Migrating database {dbname} to version {version}") + stderr(f"Migrating database to version {version}") direction = "up" - await swoop_db.migrate( - target=version, direction=direction, database=dbname, conn=conn - ) + await swoop_db.migrate(target=version, direction=direction, conn=conn) if __name__ == "__main__":