diff --git a/lib/workload/stateless/stacks/pg-dd/.dockerignore b/lib/workload/stateless/stacks/pg-dd/.dockerignore index 2863a8486..dd833bbf9 100644 --- a/lib/workload/stateless/stacks/pg-dd/.dockerignore +++ b/lib/workload/stateless/stacks/pg-dd/.dockerignore @@ -2,7 +2,6 @@ deploy .env .env.example .gitignore -Makefile README.md data .ruff_cache diff --git a/lib/workload/stateless/stacks/pg-dd/.env.example b/lib/workload/stateless/stacks/pg-dd/.env.example index d91b99208..4c98ec346 100644 --- a/lib/workload/stateless/stacks/pg-dd/.env.example +++ b/lib/workload/stateless/stacks/pg-dd/.env.example @@ -5,4 +5,5 @@ PG_DD_DATABASE_METADATA_MANAGER=metadata_manager PG_DD_DATABASE_SEQUENCE_RUN_MANAGER=sequence_run_manager PG_DD_DATABASE_WORKFLOW_MANAGER=workflow_manager PG_DD_DATABASE_FILEMANAGER=filemanager -PG_DD_DATABASE_FILEMANAGER_SQL="select * from s3_object" \ No newline at end of file +PG_DD_DATABASE_FILEMANAGER_SQL_DUMP="select * from s3_object order by sequencer limit 10000" +PG_DD_DATABASE_FILEMANAGER_SQL_LOAD="s3_object" \ No newline at end of file diff --git a/lib/workload/stateless/stacks/pg-dd/.gitignore b/lib/workload/stateless/stacks/pg-dd/.gitignore index df6a7adb9..7e63edf22 100644 --- a/lib/workload/stateless/stacks/pg-dd/.gitignore +++ b/lib/workload/stateless/stacks/pg-dd/.gitignore @@ -1,3 +1,4 @@ .env data -.ruff_cache \ No newline at end of file +.ruff_cache +response.json \ No newline at end of file diff --git a/lib/workload/stateless/stacks/pg-dd/Dockerfile b/lib/workload/stateless/stacks/pg-dd/Dockerfile new file mode 100644 index 000000000..6de2f8c3d --- /dev/null +++ b/lib/workload/stateless/stacks/pg-dd/Dockerfile @@ -0,0 +1,15 @@ +# This Dockerfile is intended to be used as part of a Docker Compose setup. +# When running this microservice from the Docker Compose root, this Dockerfile +# will build the image, install dependencies, and start the server + +FROM public.ecr.aws/docker/library/python:3.12 + +ARG POETRY_VERSION=1.8 +RUN pip install "poetry==${POETRY_VERSION}" + +WORKDIR /app + +COPY . . +RUN poetry install --no-root + +ENTRYPOINT ["/bin/bash", "-c", "make", "local"] \ No newline at end of file diff --git a/lib/workload/stateless/stacks/pg-dd/Makefile b/lib/workload/stateless/stacks/pg-dd/Makefile index abc4cd293..d5693b49b 100644 --- a/lib/workload/stateless/stacks/pg-dd/Makefile +++ b/lib/workload/stateless/stacks/pg-dd/Makefile @@ -9,8 +9,8 @@ lint: install check: lint @poetry run ruff check . -local: install - @poetry run local +cli: install + @poetry run cli clean: rm -rf data && rm -rf .ruff_cache \ No newline at end of file diff --git a/lib/workload/stateless/stacks/pg-dd/README.md b/lib/workload/stateless/stacks/pg-dd/README.md index 08f56e977..2c25176ae 100644 --- a/lib/workload/stateless/stacks/pg-dd/README.md +++ b/lib/workload/stateless/stacks/pg-dd/README.md @@ -17,15 +17,16 @@ rows of the filemanager database. This function can be configured by setting the following environment variables, see [.env.example][env-example] for an example: -| Name | Description | Type | -|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------| -| `PG_DD_URL` | The database URL to dump databases from. | Postgres connection string | -| `PG_DD_SECRET` | The secret name or ARN to fetch the database URL from. 
This is only used in the Lambda function, and overrides `PG_DD_URL`. | `string` | -| `PG_DD_DATABASE_` | A name of the database to dump records from where `` represents the target database. Specify this multiple times to use dump from multiple databases. | `string` | -| `PG_DD_DATABASE__SQL` | Custom SQL code to execute when dumping database records for ``. This is optional, and by default all records from all tables are dumped. Specify this is a list of SQL statements to generate a corresponding CSV file. | `string[]` or undefined | -| `PG_DD_BUCKET` | The bucket to dump data to. This is required when deploying the Lambda function. | `string` or undefined | -| `PG_DD_PREFIX` | The bucket prefix to use when writing to a bucket. This is optional. | `string` or undefined | -| `PG_DD_DIR` | The local filesystem directory to dump data to when running this command locally. This is not used on the deployed Lambda function. | filesystem directory or undefined | +| Name | Description | Type | +|-------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------| +| `PG_DD_URL` | The database URL to dump databases from. | Postgres connection string | +| `PG_DD_SECRET` | The secret name or ARN to fetch the database URL from. This is only used in the Lambda function, and overrides `PG_DD_URL`. | `string` | +| `PG_DD_DATABASE_` | A name of the database to dump records from where `` represents the target database. Specify this multiple times to use dump from multiple databases. | `string` | +| `PG_DD_DATABASE__SQL_DUMP` | Custom SQL code to execute when dumping database records for ``. This is optional, and by default all records from all tables are dumped. Specify this is a list of SQL statements to generate a corresponding CSV file. | `string[]` or undefined | +| `PG_DD_DATABASE__SQL_LOAD` | The name of the table to load into for ``. This is required if loading data after dumping with `` to specify the table to load data into. | `string[]` or undefined | +| `PG_DD_BUCKET` | The bucket to dump data to. This is required when deploying the Lambda function. | `string` or undefined | +| `PG_DD_PREFIX` | The bucket prefix to use when writing to a bucket. This is optional. | `string` or undefined | +| `PG_DD_DIR` | The local filesystem directory to dump data to when running this command locally. This is not used on the deployed Lambda function. | filesystem directory or undefined | ## Local development @@ -34,7 +35,7 @@ This project uses [poetry] to manage dependencies. 
The pg-dd command can be run locally to dump data to a directory: ``` -make local +make cli ``` Run the linter and formatter: diff --git a/lib/workload/stateless/stacks/pg-dd/deploy/stack.ts b/lib/workload/stateless/stacks/pg-dd/deploy/stack.ts index 67f6b254b..a5a924d70 100644 --- a/lib/workload/stateless/stacks/pg-dd/deploy/stack.ts +++ b/lib/workload/stateless/stacks/pg-dd/deploy/stack.ts @@ -90,7 +90,7 @@ export class PgDDStack extends Stack { index: 'pg_dd/handler.py', runtime: Runtime.PYTHON_3_12, architecture: Architecture.ARM_64, - timeout: Duration.minutes(15), + timeout: Duration.minutes(5), memorySize: 1024, vpc: this.vpc, vpcSubnets: { @@ -108,6 +108,7 @@ export class PgDDStack extends Stack { PG_DD_DATABASE_SEQUENCE_RUN_MANAGER: 'sequence_run_manager', PG_DD_DATABASE_WORKFLOW_MANAGER: 'workflow_manager', PG_DD_DATABASE_FILEMANAGER: 'filemanager', + PG_DD_DATABASE_FILEMANAGER_SQL: 'select * from s3_object order by sequencer limit 10000', ...(props.prefix && { PG_DD_PREFIX: props.prefix }), }, }); diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py new file mode 100644 index 000000000..4ab9ce209 --- /dev/null +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py @@ -0,0 +1,43 @@ +from pg_dd.pg_dd import PgDDLocal, PgDDS3 +import click + + +@click.group() +def cli(): + pass + + +@cli.command() +def download(): + """ + Download S3 CSV dumps to the local directory. + """ + PgDDS3().download_local() + + +@cli.command() +def upload(): + """ + Uploads local CSV dumps to S3. + """ + PgDDS3().write_to_bucket() + + +@cli.command() +def dump(): + """ + Dump from the local database to CSV files. + """ + PgDDLocal().write_to_dir() + + +@cli.command() +def load(): + """ + Load local CSV files into the database + """ + PgDDLocal().load_to_database() + + +if __name__ == "__main__": + cli() diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/local.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/local.py deleted file mode 100644 index 872129ab1..000000000 --- a/lib/workload/stateless/stacks/pg-dd/pg_dd/local.py +++ /dev/null @@ -1,14 +0,0 @@ -import os - -from pg_dd.pg_dd import PgDDS3, PgDDLocal - - -def main(): - if os.getenv("PG_DD_BUCKET"): - PgDDS3().write_to_bucket() - else: - PgDDLocal().write_to_dir() - - -if __name__ == "__main__": - main() diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py index f9e465bc5..c8c601f3c 100644 --- a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py @@ -1,7 +1,7 @@ import gzip import logging import os -from typing import Dict, List, Any +from typing import Dict, List, Any, Tuple, LiteralString import boto3 import psycopg @@ -19,9 +19,87 @@ class PgDD: """ def __init__(self, logger=logging.getLogger(__name__)): + self.url = os.getenv("PG_DD_URL") self.databases = self.read_databases() self.logger = logger + def csvs_for_tables(self) -> Dict[str, Dict[str, str]]: + """ + Get csvs for all tables in all databases. 
+ """ + + databases = {} + for entry in self.databases.values(): + url = f"{self.url}/{entry['database']}" + + conn: psycopg.connection.Connection + with psycopg.connect(url) as conn: + if entry.get("sql_dump") is not None: + tables = [(i, e, True) for i, e in enumerate(entry["sql_dump"])] + else: + with conn.cursor() as cur: + cur.execute( + """ + select table_name from information_schema.tables + where table_schema='public'; + """ + ) + tables = [(name[0], name[0], False) for name in cur.fetchall()] + self.logger.info(f"fetched table names: {tables}") + + with conn.cursor() as cur: + databases[entry["database"]] = self.copy_tables_to_csv(cur, tables) + + return databases + + def load_table(self, table, data, conn, only_non_empty=True): + """ + Load a table with the CSV data. + """ + + with conn.cursor() as cur: + if only_non_empty: + exists = cur.execute( + sql.SQL( + """ + select exists( + select from pg_tables where tablename = '{}' + ); + """ + ).format(sql.SQL(table)) + ).fetchone()[0] + has_records = cur.execute( + sql.SQL( + """ + select exists(select * from {}) + """ + ).format(sql.SQL(table)) + ).fetchone()[0] + + if not exists or has_records: + return + + with cur.copy( + sql.SQL( + """ + copy {} from stdin with (format csv, header); + """ + ).format(sql.Identifier(table)), + ) as copy: + copy.write(data) + + def target_files(self) -> List[Tuple[str, str, str, str]]: + """ + Get the target files for all directories. + """ + + files = [] + for database, tables in self.csvs_for_tables().items(): + for table, value in tables.items(): + file = f"{database}/{table}.csv.gz" + files += [(database, table, file, value)] + return files + @staticmethod def read_databases() -> Dict[str, Dict[str, Any]]: """ @@ -29,17 +107,28 @@ def read_databases() -> Dict[str, Dict[str, Any]]: """ prefix = "PG_DD_DATABASE_" - sql_prefix = "_SQL" + sql_dump_prefix = "_SQL_DUMP" + sql_load_prefix = "_SQL_LOAD" variables = {} for key, value in os.environ.items(): if key[: len(prefix)] == prefix: database = key[len(prefix) :] - suffix = database[-len(sql_prefix) :] - database = database.removesuffix(sql_prefix) + suffix_dump = database[-len(sql_dump_prefix) :] + suffix_load = database[-len(sql_load_prefix) :] + + database = database.removesuffix(sql_dump_prefix).removesuffix( + sql_load_prefix + ) variables.setdefault(database, {}) - if suffix == sql_prefix: - variables[database]["sql"] = [s.strip() for s in value.split(",")] + if suffix_dump == sql_dump_prefix: + variables[database]["sql_dump"] = [ + s.strip() for s in value.split(",") + ] + elif suffix_load == sql_load_prefix: + variables[database]["sql_load"] = [ + s.strip() for s in value.split(",") + ] else: variables[database]["database"] = database.lower() @@ -47,57 +136,38 @@ def read_databases() -> Dict[str, Dict[str, Any]]: @staticmethod def copy_tables_to_csv( - cur: psycopg.cursor.Cursor, tables: List[str] + cur: psycopg.cursor.Cursor, tables: List[Tuple[str, str, bool]] ) -> Dict[str, str]: """ Get tables as a csv string. 
""" csvs = {} - for table in tables: - rows = [] - copy: psycopg.Copy - with cur.copy( - sql.SQL( + for name, table, is_statement in tables: + if is_statement: + statement = sql.SQL( + """ + copy ({}) to stdout with (format csv, header); + """ + ) + else: + statement = sql.SQL( """ copy {} to stdout with (format csv, header); """ - ).format(sql.Identifier(table)) - ) as copy: + ) + + rows = [] + copy: psycopg.Copy + table: LiteralString = table + with cur.copy(statement.format(sql.SQL(table))) as copy: for row in copy: rows += [row.tobytes().decode("utf-8")] - csvs[table] = "".join(rows) + csvs[name] = "".join(rows) return csvs - def csvs_for_tables(self) -> Dict[str, Dict[str, str]]: - """ - Get csvs for all tables in all databases. - """ - - database_url = os.getenv("PG_DD_URL") - databases = {} - for entry in self.databases.values(): - url = f"{database_url}/{entry['database']}" - - conn: psycopg.connection.Connection - with psycopg.connect(url) as conn: - with conn.cursor() as cur: - cur.execute( - """ - select table_name from information_schema.tables - where table_schema='public'; - """ - ) - tables = [name[0] for name in cur.fetchall()] - self.logger.debug(f"fetched table names: {tables}") - - with conn.cursor() as cur: - databases[entry["database"]] = self.copy_tables_to_csv(cur, tables) - - return databases - class PgDDLocal(PgDD): """ @@ -107,48 +177,85 @@ class PgDDLocal(PgDD): def __init__(self, logger=logging.getLogger(__name__)): super().__init__(logger=logger) self.out = os.getenv("PG_DD_DIR") + self.bucket = os.getenv("PG_DD_BUCKET") + self.prefix = os.getenv("PG_DD_PREFIX") + self.s3: S3ServiceResource = boto3.resource("s3") - def write_to_dir(self) -> None: + def write_to_dir(self): """ Write the CSV files to the output directory. """ - for database, tables in self.csvs_for_tables().items(): - output_dir = f"{self.out}/{database}" - if not os.path.exists(output_dir): - os.makedirs(output_dir) + for _, _, f, value in self.target_files(): + file = f"{self.out}/{f}" + os.makedirs(file.rsplit("/", 1)[0], exist_ok=True) + self.logger.info(f"writing to file: {f}") - self.logger.debug(f"writing to directory: {output_dir}") + with open(file, "wb") as f: + f.write(gzip.compress(str.encode(value))) - for table, value in tables.items(): - with open(f"{self.out}/{database}/{table}.csv.gz", "wb") as f: - f.write(gzip.compress(str.encode(value))) + def load_to_database(self): + """ + Download from S3 CSV files to load. + """ + + def load_files(): + for root, _, files in os.walk(f"{self.out}/{database}"): + for file in files: + with open(f"{root}/{file}", "rb") as f: + table = file.removesuffix(".csv.gz") + load = self.databases[database.upper()].get("sql_load") + if load is not None: + table = load[int(table)] + + self.load_table( + table, gzip.decompress(f.read()).decode("utf-8"), conn + ) + + for _, dirs, _ in os.walk(self.out): + for database in dirs: + conn: psycopg.connection.Connection + with psycopg.connect(f"{self.url}/{database}") as conn: + conn.set_deferrable(True) + load_files() + conn.commit() class PgDDS3(PgDD): """ - Dump CSV files to an S3 bucket. + Commands related to running this inside a Lambda function. """ def __init__(self, logger=logging.getLogger(__name__)): super().__init__(logger=logger) self.bucket = os.getenv("PG_DD_BUCKET") self.prefix = os.getenv("PG_DD_PREFIX") + self.dir = os.getenv("PG_DD_DIR") + self.s3: S3ServiceResource = boto3.resource("s3") - def write_to_bucket(self) -> None: + def write_to_bucket(self): """ Write the CSV files to the S3 bucket. 
""" - s3: S3ServiceResource = boto3.resource("s3") - for database, tables in self.csvs_for_tables().items(): - for table, value in tables.items(): - key = f"{database}/{table}.csv.gz" + for _, _, key, value in self.target_files(): + if self.prefix: + key = f"{self.prefix}/{key}" + + self.logger.info(f"writing to bucket with key: {key}") + + s3_object = self.s3.Object(self.bucket, key) + s3_object.put(Body=gzip.compress(str.encode(value))) - if self.prefix: - key = f"{self.prefix}/{key}" + def download_local(self): + """ + Download from S3 CSV files to load. + """ - self.logger.debug(f"writing to bucket with key: {key}") + for _, _, f, _ in self.target_files(): + self.logger.info(f"target file: {f}") + file = f"{self.dir}/{f}" + os.makedirs(file.rsplit("/", 1)[0], exist_ok=True) - s3_object = s3.Object(self.bucket, key) - s3_object.put(Body=gzip.compress(str.encode(value))) + s3_object = self.s3.Object(self.bucket, f"{self.prefix}/{f}") + s3_object.download_file(f"{self.dir}/{f}") diff --git a/lib/workload/stateless/stacks/pg-dd/poetry.lock b/lib/workload/stateless/stacks/pg-dd/poetry.lock index fabde26b2..26b9fc095 100644 --- a/lib/workload/stateless/stacks/pg-dd/poetry.lock +++ b/lib/workload/stateless/stacks/pg-dd/poetry.lock @@ -2,17 +2,17 @@ [[package]] name = "boto3" -version = "1.35.63" +version = "1.35.64" description = "The AWS SDK for Python" optional = false python-versions = ">=3.8" files = [ - {file = "boto3-1.35.63-py3-none-any.whl", hash = "sha256:d0f938d4f6f392b6ffc5e75fff14a42e5bbb5228675a0367c8af55398abadbec"}, - {file = "boto3-1.35.63.tar.gz", hash = "sha256:deb593d9a0fb240deb4c43e4da8e6626d7c36be7b2fd2fe28f49d44d395b7de0"}, + {file = "boto3-1.35.64-py3-none-any.whl", hash = "sha256:cdacf03fc750caa3aa0dbf6158166def9922c9d67b4160999ff8fc350662facc"}, + {file = "boto3-1.35.64.tar.gz", hash = "sha256:bc3fc12b41fa2c91e51ab140f74fb1544408a2b1e00f88a4c2369a66d18ddf20"}, ] [package.dependencies] -botocore = ">=1.35.63,<1.36.0" +botocore = ">=1.35.64,<1.36.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.10.0,<0.11.0" @@ -21,13 +21,13 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.35.63" +version = "1.35.64" description = "Low-level, data-driven core of boto 3." 
optional = false python-versions = ">=3.8" files = [ - {file = "botocore-1.35.63-py3-none-any.whl", hash = "sha256:0ca1200694a4c0a3fa846795d8e8a08404c214e21195eb9e010c4b8a4ca78a4a"}, - {file = "botocore-1.35.63.tar.gz", hash = "sha256:2b8196bab0a997d206c3d490b52e779ef47dffb68c57c685443f77293aca1589"}, + {file = "botocore-1.35.64-py3-none-any.whl", hash = "sha256:bbd96bf7f442b1d5e35b36f501076e4a588c83d8d84a1952e9ee1d767e5efb3e"}, + {file = "botocore-1.35.64.tar.gz", hash = "sha256:2f95c83f31c9e38a66995c88810fc638c829790e125032ba00ab081a2cf48cb9"}, ] [package.dependencies] @@ -38,6 +38,31 @@ urllib3 = {version = ">=1.25.4,<2.2.0 || >2.2.0,<3", markers = "python_version > [package.extras] crt = ["awscrt (==0.22.0)"] +[[package]] +name = "click" +version = "8.1.7" +description = "Composable command line interface toolkit" +optional = false +python-versions = ">=3.7" +files = [ + {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"}, + {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + +[[package]] +name = "colorama" +version = "0.4.6" +description = "Cross-platform colored terminal text." +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +files = [ + {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, + {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, +] + [[package]] name = "jmespath" version = "1.0.1" @@ -363,4 +388,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "b35be5685a82a2219a11fda89d02d5868fe1f311462c6e32297961021b7a2e89" +content-hash = "7b6452562763ec8f1bcf43ad6f8cfaae7702c8190d90d8ec142eceb5df7294df" diff --git a/lib/workload/stateless/stacks/pg-dd/pyproject.toml b/lib/workload/stateless/stacks/pg-dd/pyproject.toml index a3545aab1..d62b09c2e 100644 --- a/lib/workload/stateless/stacks/pg-dd/pyproject.toml +++ b/lib/workload/stateless/stacks/pg-dd/pyproject.toml @@ -13,6 +13,7 @@ psycopg = { version = "^3", extras = ["binary"] } python-dotenv = "^1" libumccr = "^0.4" mypy-boto3-s3 = "^1" +click = "^8" [tool.poetry.group.dev.dependencies] mypy = "^1" @@ -23,4 +24,4 @@ requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.poetry.scripts] -local = "pg_dd.local:main" +cli = "pg_dd.cli:cli"
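
As a quick reference for the renamed configuration variables, here is a minimal sketch of how the `_SQL_DUMP`/`_SQL_LOAD` pair from `.env.example` is picked up by `PgDD.read_databases()` in `pg_dd/pg_dd.py`. It assumes only the filemanager variables are set; any other `PG_DD_DATABASE_*` variables in the environment would add further entries to the result.

```python
import os

from pg_dd.pg_dd import PgDD

# Values mirroring .env.example: each statement in _SQL_DUMP produces one CSV,
# and _SQL_LOAD names the table each of those CSVs is loaded back into.
os.environ["PG_DD_DATABASE_FILEMANAGER"] = "filemanager"
os.environ["PG_DD_DATABASE_FILEMANAGER_SQL_DUMP"] = (
    "select * from s3_object order by sequencer limit 10000"
)
os.environ["PG_DD_DATABASE_FILEMANAGER_SQL_LOAD"] = "s3_object"

# Roughly (assuming no other PG_DD_DATABASE_* variables are set):
# {"FILEMANAGER": {"database": "filemanager",
#                  "sql_dump": ["select * from s3_object order by sequencer limit 10000"],
#                  "sql_load": ["s3_object"]}}
print(PgDD.read_databases())
```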
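
With the Poetry script renamed from `local` to `cli`, the Click group in `pg_dd/cli.py` exposes `dump`, `load`, `upload`, and `download` subcommands: for example, `poetry run cli dump` writes gzipped CSVs under `PG_DD_DIR`, and `poetry run cli upload` pushes them to `PG_DD_BUCKET` under `PG_DD_PREFIX`. `make cli` wraps the same entry point after installing dependencies.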