From ddbf0480eb9ff0d10dfeb4261c6f8c549aa6e054 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 18 Nov 2024 08:09:24 +1100 Subject: [PATCH 1/8] feat(pg-dd): add command for dumping data --- .../stateless/stacks/pg-dd/.env.example | 8 + .../stateless/stacks/pg-dd/.gitignore | 2 + lib/workload/stateless/stacks/pg-dd/Makefile | 13 + lib/workload/stateless/stacks/pg-dd/README.md | 3 + .../stateless/stacks/pg-dd/pg_dd/__init__.py | 0 .../stateless/stacks/pg-dd/pg_dd/local.py | 14 + .../stateless/stacks/pg-dd/pg_dd/pg_dd.py | 119 ++++++ .../stateless/stacks/pg-dd/poetry.lock | 348 ++++++++++++++++++ .../stateless/stacks/pg-dd/pyproject.toml | 25 ++ 9 files changed, 532 insertions(+) create mode 100644 lib/workload/stateless/stacks/pg-dd/.env.example create mode 100644 lib/workload/stateless/stacks/pg-dd/.gitignore create mode 100644 lib/workload/stateless/stacks/pg-dd/Makefile create mode 100644 lib/workload/stateless/stacks/pg-dd/README.md create mode 100644 lib/workload/stateless/stacks/pg-dd/pg_dd/__init__.py create mode 100644 lib/workload/stateless/stacks/pg-dd/pg_dd/local.py create mode 100644 lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py create mode 100644 lib/workload/stateless/stacks/pg-dd/poetry.lock create mode 100644 lib/workload/stateless/stacks/pg-dd/pyproject.toml diff --git a/lib/workload/stateless/stacks/pg-dd/.env.example b/lib/workload/stateless/stacks/pg-dd/.env.example new file mode 100644 index 000000000..16755c421 --- /dev/null +++ b/lib/workload/stateless/stacks/pg-dd/.env.example @@ -0,0 +1,8 @@ +PG_DD_URL=postgresql://orcabus:orcabus@0.0.0.0:5432/orcabus# pragma: allowlist secret +PG_DD_DIR=data + +PG_DD_DATABASE_METADATA_MANAGER=metadata_manager +PG_DD_DATABASE_SEQUENCE_RUN_MANAGER=sequence_run_manager +PG_DD_DATABASE_WORKFLOW_MANAGER=workflow_manager +PG_DD_DATABASE_FILEMANAGER=filemanager +PG_DD_DATABASE_FILEMANAGER_SQL="select * from s3_object" \ No newline at end of file diff --git a/lib/workload/stateless/stacks/pg-dd/.gitignore b/lib/workload/stateless/stacks/pg-dd/.gitignore new file mode 100644 index 000000000..d70ad070a --- /dev/null +++ b/lib/workload/stateless/stacks/pg-dd/.gitignore @@ -0,0 +1,2 @@ +.env +data \ No newline at end of file diff --git a/lib/workload/stateless/stacks/pg-dd/Makefile b/lib/workload/stateless/stacks/pg-dd/Makefile new file mode 100644 index 000000000..17ddf8fc0 --- /dev/null +++ b/lib/workload/stateless/stacks/pg-dd/Makefile @@ -0,0 +1,13 @@ +.PHONY: * + +install: + @poetry update + +lint: install + @poetry run ruff format . + +check: lint + @poetry run ruff check . + +local: install + @poetry run local \ No newline at end of file diff --git a/lib/workload/stateless/stacks/pg-dd/README.md b/lib/workload/stateless/stacks/pg-dd/README.md new file mode 100644 index 000000000..d82d23050 --- /dev/null +++ b/lib/workload/stateless/stacks/pg-dd/README.md @@ -0,0 +1,3 @@ +# Pg-dd + +Postgres data dump - a service that dumps (like `dd`) orcabus postgres databases to S3. 
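For context on the `.env.example` above: each `PG_DD_DATABASE_<NAME>` variable names a database to dump, and an optional `PG_DD_DATABASE_<NAME>_SQL` variable supplies comma-separated SQL statements for it. A minimal sketch of the mapping that `PgDD.read_databases()` (introduced in `pg_dd/pg_dd.py` below) should produce from that example file; the dict here is illustrative only and simply mirrors the env file:

```python
# Hedged sketch: the parsed form of the .env.example variables above, keyed by
# the <NAME> portion of each PG_DD_DATABASE_<NAME> variable. A "sql" entry only
# appears when a matching PG_DD_DATABASE_<NAME>_SQL variable is set.
expected = {
    "METADATA_MANAGER": {"database": "metadata_manager"},
    "SEQUENCE_RUN_MANAGER": {"database": "sequence_run_manager"},
    "WORKFLOW_MANAGER": {"database": "workflow_manager"},
    "FILEMANAGER": {
        "database": "filemanager",
        "sql": ["select * from s3_object"],
    },
}
```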
diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/__init__.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/local.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/local.py new file mode 100644 index 000000000..872129ab1 --- /dev/null +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/local.py @@ -0,0 +1,14 @@ +import os + +from pg_dd.pg_dd import PgDDS3, PgDDLocal + + +def main(): + if os.getenv("PG_DD_BUCKET"): + PgDDS3().write_to_bucket() + else: + PgDDLocal().write_to_dir() + + +if __name__ == "__main__": + main() diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py new file mode 100644 index 000000000..4c306b78b --- /dev/null +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py @@ -0,0 +1,119 @@ +import os +from typing import Dict, List, Any + +import boto3 +import psycopg +from psycopg import sql +from mypy_boto3_s3 import S3ServiceResource + +from dotenv import load_dotenv + +load_dotenv() + + +class PgDD: + def __init__(self): + self.databases = self.read_databases() + + @staticmethod + def read_databases() -> Dict[str, Dict[str, Any]]: + prefix = "PG_DD_DATABASE_" + sql_prefix = "_SQL" + variables = {} + for key, value in os.environ.items(): + if key[: len(prefix)] == prefix: + database = key[len(prefix) :] + suffix = database[-len(sql_prefix) :] + database = database.removesuffix(sql_prefix) + variables.setdefault(database, {}) + + if suffix == sql_prefix: + variables[database]["sql"] = [s.strip() for s in value.split(",")] + else: + variables[database]["database"] = database.lower() + + return variables + + @staticmethod + def copy_tables_to_csv( + cur: psycopg.cursor.Cursor, tables: List[str] + ) -> Dict[str, str]: + csvs = {} + for table in tables: + rows = [] + copy: psycopg.Copy + with cur.copy( + sql.SQL( + """ + copy {} to stdout with (format csv, header); + """ + ).format(sql.Identifier(table)) + ) as copy: + for row in copy: + rows += [row.tobytes().decode("utf-8")] + + csvs[table] = "".join(rows) + + return csvs + + def csvs_for_tables(self) -> Dict[str, Dict[str, str]]: + database_url = os.getenv("PG_DD_URL") + databases = {} + for entry in self.databases.values(): + url = database_url.rsplit("/", 1)[0] + url = f"{url}/{entry['database']}" + + conn: psycopg.connection.Connection + with psycopg.connect(url) as conn: + with conn.cursor() as cur: + cur.execute( + """ + select table_name from information_schema.tables + where table_schema='public'; + """ + ) + tables = [name[0] for name in cur.fetchall()] + + with conn.cursor() as cur: + databases[entry["database"]] = self.copy_tables_to_csv(cur, tables) + + return databases + + +class PgDDLocal(PgDD): + def __init__(self): + super().__init__() + self.out = os.getenv("PG_DD_DIR") + + def write_to_dir(self) -> None: + for database, tables in self.csvs_for_tables().items(): + output_dir = f"{self.out}/{database}" + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + for table, value in tables.items(): + with open(f"{self.out}/{database}/{table}.csv", "w") as f: + f.write(value) + + +class PgDDS3(PgDD): + def __init__(self): + super().__init__() + self.bucket = os.getenv("PG_DD_BUCKET") + self.prefix = os.getenv("PG_DD_PREFIX") + + def write_to_bucket(self) -> None: + s3: S3ServiceResource = boto3.resource("s3") + for database, tables in self.csvs_for_tables().items(): + for table, value in tables.items(): + key = 
f"{database}/{table}.csv" + + if self.prefix: + key = f"{self.prefix}/{key}" + + s3_object = s3.Object(self.bucket, key) + s3_object.put(Body=value) + + +def handler(): + PgDDS3().write_to_bucket() diff --git a/lib/workload/stateless/stacks/pg-dd/poetry.lock b/lib/workload/stateless/stacks/pg-dd/poetry.lock new file mode 100644 index 000000000..dff49a018 --- /dev/null +++ b/lib/workload/stateless/stacks/pg-dd/poetry.lock @@ -0,0 +1,348 @@ +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. + +[[package]] +name = "boto3" +version = "1.35.63" +description = "The AWS SDK for Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "boto3-1.35.63-py3-none-any.whl", hash = "sha256:d0f938d4f6f392b6ffc5e75fff14a42e5bbb5228675a0367c8af55398abadbec"}, + {file = "boto3-1.35.63.tar.gz", hash = "sha256:deb593d9a0fb240deb4c43e4da8e6626d7c36be7b2fd2fe28f49d44d395b7de0"}, +] + +[package.dependencies] +botocore = ">=1.35.63,<1.36.0" +jmespath = ">=0.7.1,<2.0.0" +s3transfer = ">=0.10.0,<0.11.0" + +[package.extras] +crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] + +[[package]] +name = "botocore" +version = "1.35.63" +description = "Low-level, data-driven core of boto 3." +optional = false +python-versions = ">=3.8" +files = [ + {file = "botocore-1.35.63-py3-none-any.whl", hash = "sha256:0ca1200694a4c0a3fa846795d8e8a08404c214e21195eb9e010c4b8a4ca78a4a"}, + {file = "botocore-1.35.63.tar.gz", hash = "sha256:2b8196bab0a997d206c3d490b52e779ef47dffb68c57c685443f77293aca1589"}, +] + +[package.dependencies] +jmespath = ">=0.7.1,<2.0.0" +python-dateutil = ">=2.1,<3.0.0" +urllib3 = {version = ">=1.25.4,<2.2.0 || >2.2.0,<3", markers = "python_version >= \"3.10\""} + +[package.extras] +crt = ["awscrt (==0.22.0)"] + +[[package]] +name = "jmespath" +version = "1.0.1" +description = "JSON Matching Expressions" +optional = false +python-versions = ">=3.7" +files = [ + {file = "jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980"}, + {file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"}, +] + +[[package]] +name = "mypy" +version = "1.13.0" +description = "Optional static typing for Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "mypy-1.13.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:6607e0f1dd1fb7f0aca14d936d13fd19eba5e17e1cd2a14f808fa5f8f6d8f60a"}, + {file = "mypy-1.13.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8a21be69bd26fa81b1f80a61ee7ab05b076c674d9b18fb56239d72e21d9f4c80"}, + {file = "mypy-1.13.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:7b2353a44d2179846a096e25691d54d59904559f4232519d420d64da6828a3a7"}, + {file = "mypy-1.13.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:0730d1c6a2739d4511dc4253f8274cdd140c55c32dfb0a4cf8b7a43f40abfa6f"}, + {file = "mypy-1.13.0-cp310-cp310-win_amd64.whl", hash = "sha256:c5fc54dbb712ff5e5a0fca797e6e0aa25726c7e72c6a5850cfd2adbc1eb0a372"}, + {file = "mypy-1.13.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:581665e6f3a8a9078f28d5502f4c334c0c8d802ef55ea0e7276a6e409bc0d82d"}, + {file = "mypy-1.13.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:3ddb5b9bf82e05cc9a627e84707b528e5c7caaa1c55c69e175abb15a761cec2d"}, + {file = "mypy-1.13.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:20c7ee0bc0d5a9595c46f38beb04201f2620065a93755704e141fcac9f59db2b"}, + 
{file = "mypy-1.13.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:3790ded76f0b34bc9c8ba4def8f919dd6a46db0f5a6610fb994fe8efdd447f73"}, + {file = "mypy-1.13.0-cp311-cp311-win_amd64.whl", hash = "sha256:51f869f4b6b538229c1d1bcc1dd7d119817206e2bc54e8e374b3dfa202defcca"}, + {file = "mypy-1.13.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:5c7051a3461ae84dfb5dd15eff5094640c61c5f22257c8b766794e6dd85e72d5"}, + {file = "mypy-1.13.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:39bb21c69a5d6342f4ce526e4584bc5c197fd20a60d14a8624d8743fffb9472e"}, + {file = "mypy-1.13.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:164f28cb9d6367439031f4c81e84d3ccaa1e19232d9d05d37cb0bd880d3f93c2"}, + {file = "mypy-1.13.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:a4c1bfcdbce96ff5d96fc9b08e3831acb30dc44ab02671eca5953eadad07d6d0"}, + {file = "mypy-1.13.0-cp312-cp312-win_amd64.whl", hash = "sha256:a0affb3a79a256b4183ba09811e3577c5163ed06685e4d4b46429a271ba174d2"}, + {file = "mypy-1.13.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:a7b44178c9760ce1a43f544e595d35ed61ac2c3de306599fa59b38a6048e1aa7"}, + {file = "mypy-1.13.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:5d5092efb8516d08440e36626f0153b5006d4088c1d663d88bf79625af3d1d62"}, + {file = "mypy-1.13.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:de2904956dac40ced10931ac967ae63c5089bd498542194b436eb097a9f77bc8"}, + {file = "mypy-1.13.0-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:7bfd8836970d33c2105562650656b6846149374dc8ed77d98424b40b09340ba7"}, + {file = "mypy-1.13.0-cp313-cp313-win_amd64.whl", hash = "sha256:9f73dba9ec77acb86457a8fc04b5239822df0c14a082564737833d2963677dbc"}, + {file = "mypy-1.13.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:100fac22ce82925f676a734af0db922ecfea991e1d7ec0ceb1e115ebe501301a"}, + {file = "mypy-1.13.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7bcb0bb7f42a978bb323a7c88f1081d1b5dee77ca86f4100735a6f541299d8fb"}, + {file = "mypy-1.13.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bde31fc887c213e223bbfc34328070996061b0833b0a4cfec53745ed61f3519b"}, + {file = "mypy-1.13.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:07de989f89786f62b937851295ed62e51774722e5444a27cecca993fc3f9cd74"}, + {file = "mypy-1.13.0-cp38-cp38-win_amd64.whl", hash = "sha256:4bde84334fbe19bad704b3f5b78c4abd35ff1026f8ba72b29de70dda0916beb6"}, + {file = "mypy-1.13.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:0246bcb1b5de7f08f2826451abd947bf656945209b140d16ed317f65a17dc7dc"}, + {file = "mypy-1.13.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:7f5b7deae912cf8b77e990b9280f170381fdfbddf61b4ef80927edd813163732"}, + {file = "mypy-1.13.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:7029881ec6ffb8bc233a4fa364736789582c738217b133f1b55967115288a2bc"}, + {file = "mypy-1.13.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:3e38b980e5681f28f033f3be86b099a247b13c491f14bb8b1e1e134d23bb599d"}, + {file = "mypy-1.13.0-cp39-cp39-win_amd64.whl", hash = "sha256:a6789be98a2017c912ae6ccb77ea553bbaf13d27605d2ca20a76dfbced631b24"}, + {file = "mypy-1.13.0-py3-none-any.whl", hash = "sha256:9c250883f9fd81d212e0952c92dbfcc96fc237f4b7c92f56ac81fd48460b3e5a"}, + {file = "mypy-1.13.0.tar.gz", hash = "sha256:0291a61b6fbf3e6673e3405cfcc0e7650bebc7939659fdca2702958038bd835e"}, +] + +[package.dependencies] 
+mypy-extensions = ">=1.0.0" +typing-extensions = ">=4.6.0" + +[package.extras] +dmypy = ["psutil (>=4.0)"] +faster-cache = ["orjson"] +install-types = ["pip"] +mypyc = ["setuptools (>=50)"] +reports = ["lxml"] + +[[package]] +name = "mypy-boto3-s3" +version = "1.35.61" +description = "Type annotations for boto3.S3 1.35.61 service generated with mypy-boto3-builder 8.2.1" +optional = false +python-versions = ">=3.8" +files = [ + {file = "mypy_boto3_s3-1.35.61-py3-none-any.whl", hash = "sha256:4cfc410a02a302935876f0d1ae3f0738bf540acd686168790fb0c5986a085f1e"}, + {file = "mypy_boto3_s3-1.35.61.tar.gz", hash = "sha256:6965fe6c5f3d8362ac7895663540844ed29c885c92a62233c29a1c031ca28c90"}, +] + +[[package]] +name = "mypy-extensions" +version = "1.0.0" +description = "Type system extensions for programs checked with the mypy type checker." +optional = false +python-versions = ">=3.5" +files = [ + {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"}, + {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, +] + +[[package]] +name = "psycopg" +version = "3.2.3" +description = "PostgreSQL database adapter for Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "psycopg-3.2.3-py3-none-any.whl", hash = "sha256:644d3973fe26908c73d4be746074f6e5224b03c1101d302d9a53bf565ad64907"}, + {file = "psycopg-3.2.3.tar.gz", hash = "sha256:a5764f67c27bec8bfac85764d23c534af2c27b893550377e37ce59c12aac47a2"}, +] + +[package.dependencies] +psycopg-binary = {version = "3.2.3", optional = true, markers = "implementation_name != \"pypy\" and extra == \"binary\""} +typing-extensions = {version = ">=4.6", markers = "python_version < \"3.13\""} +tzdata = {version = "*", markers = "sys_platform == \"win32\""} + +[package.extras] +binary = ["psycopg-binary (==3.2.3)"] +c = ["psycopg-c (==3.2.3)"] +dev = ["ast-comments (>=1.1.2)", "black (>=24.1.0)", "codespell (>=2.2)", "dnspython (>=2.1)", "flake8 (>=4.0)", "mypy (>=1.11)", "types-setuptools (>=57.4)", "wheel (>=0.37)"] +docs = ["Sphinx (>=5.0)", "furo (==2022.6.21)", "sphinx-autobuild (>=2021.3.14)", "sphinx-autodoc-typehints (>=1.12)"] +pool = ["psycopg-pool"] +test = ["anyio (>=4.0)", "mypy (>=1.11)", "pproxy (>=2.7)", "pytest (>=6.2.5)", "pytest-cov (>=3.0)", "pytest-randomly (>=3.5)"] + +[[package]] +name = "psycopg-binary" +version = "3.2.3" +description = "PostgreSQL database adapter for Python -- C optimisation distribution" +optional = false +python-versions = ">=3.8" +files = [ + {file = "psycopg_binary-3.2.3-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:965455eac8547f32b3181d5ec9ad8b9be500c10fe06193543efaaebe3e4ce70c"}, + {file = "psycopg_binary-3.2.3-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:71adcc8bc80a65b776510bc39992edf942ace35b153ed7a9c6c573a6849ce308"}, + {file = "psycopg_binary-3.2.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f73adc05452fb85e7a12ed3f69c81540a8875960739082e6ea5e28c373a30774"}, + {file = "psycopg_binary-3.2.3-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e8630943143c6d6ca9aefc88bbe5e76c90553f4e1a3b2dc339e67dc34aa86f7e"}, + {file = "psycopg_binary-3.2.3-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:3bffb61e198a91f712cc3d7f2d176a697cb05b284b2ad150fb8edb308eba9002"}, + {file = "psycopg_binary-3.2.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:dc4fa2240c9fceddaa815a58f29212826fafe43ce80ff666d38c4a03fb036955"}, + {file = "psycopg_binary-3.2.3-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:192a5f8496e6e1243fdd9ac20e117e667c0712f148c5f9343483b84435854c78"}, + {file = "psycopg_binary-3.2.3-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:64dc6e9ec64f592f19dc01a784e87267a64a743d34f68488924251253da3c818"}, + {file = "psycopg_binary-3.2.3-cp310-cp310-musllinux_1_2_ppc64le.whl", hash = "sha256:79498df398970abcee3d326edd1d4655de7d77aa9aecd578154f8af35ce7bbd2"}, + {file = "psycopg_binary-3.2.3-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:949551752930d5e478817e0b49956350d866b26578ced0042a61967e3fcccdea"}, + {file = "psycopg_binary-3.2.3-cp310-cp310-win_amd64.whl", hash = "sha256:80a2337e2dfb26950894c8301358961430a0304f7bfe729d34cc036474e9c9b1"}, + {file = "psycopg_binary-3.2.3-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:6d8f2144e0d5808c2e2aed40fbebe13869cd00c2ae745aca4b3b16a435edb056"}, + {file = "psycopg_binary-3.2.3-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:94253be2b57ef2fea7ffe08996067aabf56a1eb9648342c9e3bad9e10c46e045"}, + {file = "psycopg_binary-3.2.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fda0162b0dbfa5eaed6cdc708179fa27e148cb8490c7d62e5cf30713909658ea"}, + {file = "psycopg_binary-3.2.3-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2c0419cdad8c70eaeb3116bb28e7b42d546f91baf5179d7556f230d40942dc78"}, + {file = "psycopg_binary-3.2.3-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:74fbf5dd3ef09beafd3557631e282f00f8af4e7a78fbfce8ab06d9cd5a789aae"}, + {file = "psycopg_binary-3.2.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7d784f614e4d53050cbe8abf2ae9d1aaacf8ed31ce57b42ce3bf2a48a66c3a5c"}, + {file = "psycopg_binary-3.2.3-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:4e76ce2475ed4885fe13b8254058be710ec0de74ebd8ef8224cf44a9a3358e5f"}, + {file = "psycopg_binary-3.2.3-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:5938b257b04c851c2d1e6cb2f8c18318f06017f35be9a5fe761ee1e2e344dfb7"}, + {file = "psycopg_binary-3.2.3-cp311-cp311-musllinux_1_2_ppc64le.whl", hash = "sha256:257c4aea6f70a9aef39b2a77d0658a41bf05c243e2bf41895eb02220ac6306f3"}, + {file = "psycopg_binary-3.2.3-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:06b5cc915e57621eebf2393f4173793ed7e3387295f07fed93ed3fb6a6ccf585"}, + {file = "psycopg_binary-3.2.3-cp311-cp311-win_amd64.whl", hash = "sha256:09baa041856b35598d335b1a74e19a49da8500acedf78164600694c0ba8ce21b"}, + {file = "psycopg_binary-3.2.3-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:48f8ca6ee8939bab760225b2ab82934d54330eec10afe4394a92d3f2a0c37dd6"}, + {file = "psycopg_binary-3.2.3-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:5361ea13c241d4f0ec3f95e0bf976c15e2e451e9cc7ef2e5ccfc9d170b197a40"}, + {file = "psycopg_binary-3.2.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cb987f14af7da7c24f803111dbc7392f5070fd350146af3345103f76ea82e339"}, + {file = "psycopg_binary-3.2.3-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0463a11b1cace5a6aeffaf167920707b912b8986a9c7920341c75e3686277920"}, + {file = "psycopg_binary-3.2.3-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8b7be9a6c06518967b641fb15032b1ed682fd3b0443f64078899c61034a0bca6"}, + {file = "psycopg_binary-3.2.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:64a607e630d9f4b2797f641884e52b9f8e239d35943f51bef817a384ec1678fe"}, + {file = "psycopg_binary-3.2.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:fa33ead69ed133210d96af0c63448b1385df48b9c0247eda735c5896b9e6dbbf"}, + {file = "psycopg_binary-3.2.3-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:1f8b0d0e99d8e19923e6e07379fa00570be5182c201a8c0b5aaa9a4d4a4ea20b"}, + {file = "psycopg_binary-3.2.3-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:709447bd7203b0b2debab1acec23123eb80b386f6c29e7604a5d4326a11e5bd6"}, + {file = "psycopg_binary-3.2.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:5e37d5027e297a627da3551a1e962316d0f88ee4ada74c768f6c9234e26346d9"}, + {file = "psycopg_binary-3.2.3-cp312-cp312-win_amd64.whl", hash = "sha256:261f0031ee6074765096a19b27ed0f75498a8338c3dcd7f4f0d831e38adf12d1"}, + {file = "psycopg_binary-3.2.3-cp313-cp313-macosx_12_0_x86_64.whl", hash = "sha256:41fdec0182efac66b27478ac15ef54c9ebcecf0e26ed467eb7d6f262a913318b"}, + {file = "psycopg_binary-3.2.3-cp313-cp313-macosx_14_0_arm64.whl", hash = "sha256:07d019a786eb020c0f984691aa1b994cb79430061065a694cf6f94056c603d26"}, + {file = "psycopg_binary-3.2.3-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4c57615791a337378fe5381143259a6c432cdcbb1d3e6428bfb7ce59fff3fb5c"}, + {file = "psycopg_binary-3.2.3-cp313-cp313-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e8eb9a4e394926b93ad919cad1b0a918e9b4c846609e8c1cfb6b743683f64da0"}, + {file = "psycopg_binary-3.2.3-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:5905729668ef1418bd36fbe876322dcb0f90b46811bba96d505af89e6fbdce2f"}, + {file = "psycopg_binary-3.2.3-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fd65774ed7d65101b314808b6893e1a75b7664f680c3ef18d2e5c84d570fa393"}, + {file = "psycopg_binary-3.2.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:700679c02f9348a0d0a2adcd33a0275717cd0d0aee9d4482b47d935023629505"}, + {file = "psycopg_binary-3.2.3-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:96334bb64d054e36fed346c50c4190bad9d7c586376204f50bede21a913bf942"}, + {file = "psycopg_binary-3.2.3-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:9099e443d4cc24ac6872e6a05f93205ba1a231b1a8917317b07c9ef2b955f1f4"}, + {file = "psycopg_binary-3.2.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:1985ab05e9abebfbdf3163a16ebb37fbc5d49aff2bf5b3d7375ff0920bbb54cd"}, + {file = "psycopg_binary-3.2.3-cp313-cp313-win_amd64.whl", hash = "sha256:e90352d7b610b4693fad0feea48549d4315d10f1eba5605421c92bb834e90170"}, + {file = "psycopg_binary-3.2.3-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:69320f05de8cdf4077ecd7fefdec223890eea232af0d58f2530cbda2871244a0"}, + {file = "psycopg_binary-3.2.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4926ea5c46da30bec4a85907aa3f7e4ea6313145b2aa9469fdb861798daf1502"}, + {file = "psycopg_binary-3.2.3-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c64c4cd0d50d5b2288ab1bcb26c7126c772bbdebdfadcd77225a77df01c4a57e"}, + {file = "psycopg_binary-3.2.3-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:05a1bdce30356e70a05428928717765f4a9229999421013f41338d9680d03a63"}, + {file = "psycopg_binary-3.2.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7ad357e426b0ea5c3043b8ec905546fa44b734bf11d33b3da3959f6e4447d350"}, + {file = "psycopg_binary-3.2.3-cp38-cp38-musllinux_1_2_aarch64.whl", hash = 
"sha256:967b47a0fd237aa17c2748fdb7425015c394a6fb57cdad1562e46a6eb070f96d"}, + {file = "psycopg_binary-3.2.3-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:71db8896b942770ed7ab4efa59b22eee5203be2dfdee3c5258d60e57605d688c"}, + {file = "psycopg_binary-3.2.3-cp38-cp38-musllinux_1_2_ppc64le.whl", hash = "sha256:2773f850a778575dd7158a6dd072f7925b67f3ba305e2003538e8831fec77a1d"}, + {file = "psycopg_binary-3.2.3-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:aeddf7b3b3f6e24ccf7d0edfe2d94094ea76b40e831c16eff5230e040ce3b76b"}, + {file = "psycopg_binary-3.2.3-cp38-cp38-win_amd64.whl", hash = "sha256:824c867a38521d61d62b60aca7db7ca013a2b479e428a0db47d25d8ca5067410"}, + {file = "psycopg_binary-3.2.3-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:9994f7db390c17fc2bd4c09dca722fd792ff8a49bb3bdace0c50a83f22f1767d"}, + {file = "psycopg_binary-3.2.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1303bf8347d6be7ad26d1362af2c38b3a90b8293e8d56244296488ee8591058e"}, + {file = "psycopg_binary-3.2.3-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:842da42a63ecb32612bb7f5b9e9f8617eab9bc23bd58679a441f4150fcc51c96"}, + {file = "psycopg_binary-3.2.3-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2bb342a01c76f38a12432848e6013c57eb630103e7556cf79b705b53814c3949"}, + {file = "psycopg_binary-3.2.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fd40af959173ea0d087b6b232b855cfeaa6738f47cb2a0fd10a7f4fa8b74293f"}, + {file = "psycopg_binary-3.2.3-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:9b60b465773a52c7d4705b0a751f7f1cdccf81dd12aee3b921b31a6e76b07b0e"}, + {file = "psycopg_binary-3.2.3-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:fc6d87a1c44df8d493ef44988a3ded751e284e02cdf785f746c2d357e99782a6"}, + {file = "psycopg_binary-3.2.3-cp39-cp39-musllinux_1_2_ppc64le.whl", hash = "sha256:f0b018e37608c3bfc6039a1dc4eb461e89334465a19916be0153c757a78ea426"}, + {file = "psycopg_binary-3.2.3-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:2a29f5294b0b6360bfda69653697eff70aaf2908f58d1073b0acd6f6ab5b5a4f"}, + {file = "psycopg_binary-3.2.3-cp39-cp39-win_amd64.whl", hash = "sha256:e56b1fd529e5dde2d1452a7d72907b37ed1b4f07fdced5d8fb1e963acfff6749"}, +] + +[[package]] +name = "python-dateutil" +version = "2.9.0.post0" +description = "Extensions to the standard Python datetime module" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" +files = [ + {file = "python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3"}, + {file = "python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427"}, +] + +[package.dependencies] +six = ">=1.5" + +[[package]] +name = "python-dotenv" +version = "1.0.1" +description = "Read key-value pairs from a .env file and set them as environment variables" +optional = false +python-versions = ">=3.8" +files = [ + {file = "python-dotenv-1.0.1.tar.gz", hash = "sha256:e324ee90a023d808f1959c46bcbc04446a10ced277783dc6ee09987c37ec10ca"}, + {file = "python_dotenv-1.0.1-py3-none-any.whl", hash = "sha256:f7b63ef50f1b690dddf550d03497b66d609393b40b564ed0d674909a68ebf16a"}, +] + +[package.extras] +cli = ["click (>=5.0)"] + +[[package]] +name = "ruff" +version = "0.7.4" +description = "An extremely fast Python linter and code formatter, written in Rust." 
+optional = false +python-versions = ">=3.7" +files = [ + {file = "ruff-0.7.4-py3-none-linux_armv6l.whl", hash = "sha256:a4919925e7684a3f18e18243cd6bea7cfb8e968a6eaa8437971f681b7ec51478"}, + {file = "ruff-0.7.4-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:cfb365c135b830778dda8c04fb7d4280ed0b984e1aec27f574445231e20d6c63"}, + {file = "ruff-0.7.4-py3-none-macosx_11_0_arm64.whl", hash = "sha256:63a569b36bc66fbadec5beaa539dd81e0527cb258b94e29e0531ce41bacc1f20"}, + {file = "ruff-0.7.4-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0d06218747d361d06fd2fdac734e7fa92df36df93035db3dc2ad7aa9852cb109"}, + {file = "ruff-0.7.4-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:e0cea28d0944f74ebc33e9f934238f15c758841f9f5edd180b5315c203293452"}, + {file = "ruff-0.7.4-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:80094ecd4793c68b2571b128f91754d60f692d64bc0d7272ec9197fdd09bf9ea"}, + {file = "ruff-0.7.4-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:997512325c6620d1c4c2b15db49ef59543ef9cd0f4aa8065ec2ae5103cedc7e7"}, + {file = "ruff-0.7.4-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:00b4cf3a6b5fad6d1a66e7574d78956bbd09abfd6c8a997798f01f5da3d46a05"}, + {file = "ruff-0.7.4-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:7dbdc7d8274e1422722933d1edddfdc65b4336abf0b16dfcb9dedd6e6a517d06"}, + {file = "ruff-0.7.4-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0e92dfb5f00eaedb1501b2f906ccabfd67b2355bdf117fea9719fc99ac2145bc"}, + {file = "ruff-0.7.4-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:3bd726099f277d735dc38900b6a8d6cf070f80828877941983a57bca1cd92172"}, + {file = "ruff-0.7.4-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:2e32829c429dd081ee5ba39aef436603e5b22335c3d3fff013cd585806a6486a"}, + {file = "ruff-0.7.4-py3-none-musllinux_1_2_i686.whl", hash = "sha256:662a63b4971807623f6f90c1fb664613f67cc182dc4d991471c23c541fee62dd"}, + {file = "ruff-0.7.4-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:876f5e09eaae3eb76814c1d3b68879891d6fde4824c015d48e7a7da4cf066a3a"}, + {file = "ruff-0.7.4-py3-none-win32.whl", hash = "sha256:75c53f54904be42dd52a548728a5b572344b50d9b2873d13a3f8c5e3b91f5cac"}, + {file = "ruff-0.7.4-py3-none-win_amd64.whl", hash = "sha256:745775c7b39f914238ed1f1b0bebed0b9155a17cd8bc0b08d3c87e4703b990d6"}, + {file = "ruff-0.7.4-py3-none-win_arm64.whl", hash = "sha256:11bff065102c3ae9d3ea4dc9ecdfe5a5171349cdd0787c1fc64761212fc9cf1f"}, + {file = "ruff-0.7.4.tar.gz", hash = "sha256:cd12e35031f5af6b9b93715d8c4f40360070b2041f81273d0527683d5708fce2"}, +] + +[[package]] +name = "s3transfer" +version = "0.10.3" +description = "An Amazon S3 Transfer Manager" +optional = false +python-versions = ">=3.8" +files = [ + {file = "s3transfer-0.10.3-py3-none-any.whl", hash = "sha256:263ed587a5803c6c708d3ce44dc4dfedaab4c1a32e8329bab818933d79ddcf5d"}, + {file = "s3transfer-0.10.3.tar.gz", hash = "sha256:4f50ed74ab84d474ce614475e0b8d5047ff080810aac5d01ea25231cfc944b0c"}, +] + +[package.dependencies] +botocore = ">=1.33.2,<2.0a.0" + +[package.extras] +crt = ["botocore[crt] (>=1.33.2,<2.0a.0)"] + +[[package]] +name = "six" +version = "1.16.0" +description = "Python 2 and 3 compatibility utilities" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" +files = [ + {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"}, + {file = "six-1.16.0.tar.gz", 
hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
+]
+
+[[package]]
+name = "typing-extensions"
+version = "4.12.2"
+description = "Backported and Experimental Type Hints for Python 3.8+"
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"},
+    {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"},
+]
+
+[[package]]
+name = "tzdata"
+version = "2024.2"
+description = "Provider of IANA time zone data"
+optional = false
+python-versions = ">=2"
+files = [
+    {file = "tzdata-2024.2-py2.py3-none-any.whl", hash = "sha256:a48093786cdcde33cad18c2555e8532f34422074448fbc874186f0abd79565cd"},
+    {file = "tzdata-2024.2.tar.gz", hash = "sha256:7d85cc416e9382e69095b7bdf4afd9e3880418a2413feec7069d533d6b4e31cc"},
+]
+
+[[package]]
+name = "urllib3"
+version = "2.2.3"
+description = "HTTP library with thread-safe connection pooling, file post, and more."
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "urllib3-2.2.3-py3-none-any.whl", hash = "sha256:ca899ca043dcb1bafa3e262d73aa25c465bfb49e0bd9dd5d59f1d0acba2f8fac"},
+    {file = "urllib3-2.2.3.tar.gz", hash = "sha256:e7d814a81dad81e6caf2ec9fdedb284ecc9c73076b62654547cc64ccdcae26e9"},
+]
+
+[package.extras]
+brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"]
+h2 = ["h2 (>=4,<5)"]
+socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
+zstd = ["zstandard (>=0.18.0)"]
+
+[metadata]
+lock-version = "2.0"
+python-versions = "^3.12"
+content-hash = "f03df58d71fc68afa3a117ecf4609fb7315b3c6ad61f8b6bd782dd4468bb31d4"
diff --git a/lib/workload/stateless/stacks/pg-dd/pyproject.toml b/lib/workload/stateless/stacks/pg-dd/pyproject.toml
new file mode 100644
index 000000000..945b56864
--- /dev/null
+++ b/lib/workload/stateless/stacks/pg-dd/pyproject.toml
@@ -0,0 +1,25 @@
+[tool.poetry]
+name = "postgres-data-dump"
+version = "0.1.0"
+description = "A service to dump postgres databases to S3."
+authors = ["Marko malenic "] +readme = "README.md" +packages = [{ include = "pg_dd" }] + +[tool.poetry.dependencies] +python = "^3.12" +boto3 = "^1" +psycopg = { version = "^3", extras = ["binary"] } +python-dotenv = "^1" + +[tool.poetry.group.dev.dependencies] +mypy = "^1" +mypy-boto3-s3 = "^1" +ruff = "^0.7" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry.scripts] +local = "pg_dd.local:main" From d337dc6aef8cf7dcab1a5fe3b64d334f99b7a5b4 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 18 Nov 2024 12:15:57 +1100 Subject: [PATCH 2/8] feat(pg-dd): deploy Lambda function --- config/config.ts | 2 + config/stacks/pgDD.ts | 25 ++++ .../stateless/stacks/pg-dd/.dockerignore | 8 ++ .../stateless/stacks/pg-dd/.env.example | 2 +- .../stateless/stacks/pg-dd/.gitignore | 3 +- lib/workload/stateless/stacks/pg-dd/Makefile | 5 +- lib/workload/stateless/stacks/pg-dd/README.md | 46 ++++++- .../stateless/stacks/pg-dd/deploy/stack.ts | 115 ++++++++++++++++++ .../stateless/stacks/pg-dd/pg_dd/handler.py | 24 ++++ .../stateless/stacks/pg-dd/pg_dd/pg_dd.py | 65 +++++++--- .../stateless/stacks/pg-dd/poetry.lock | 20 ++- .../stateless/stacks/pg-dd/pyproject.toml | 3 +- .../statelessStackCollectionClass.ts | 10 ++ 13 files changed, 307 insertions(+), 21 deletions(-) create mode 100644 config/stacks/pgDD.ts create mode 100644 lib/workload/stateless/stacks/pg-dd/.dockerignore create mode 100644 lib/workload/stateless/stacks/pg-dd/deploy/stack.ts create mode 100644 lib/workload/stateless/stacks/pg-dd/pg_dd/handler.py diff --git a/config/config.ts b/config/config.ts index 691b0fcc2..f71c5a715 100644 --- a/config/config.ts +++ b/config/config.ts @@ -63,6 +63,7 @@ import { getOraCompressionIcav2PipelineTableStackProps, } from './stacks/oraCompressionPipelineManager'; import { getOraDecompressionManagerStackProps } from './stacks/oraDecompressionPipelineManager'; +import { getPgDDProps } from './stacks/pgDD'; interface EnvironmentConfig { name: string; @@ -130,6 +131,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null workflowManagerStackProps: getWorkflowManagerStackProps(stage), stackyMcStackFaceProps: getGlueStackProps(stage), fmAnnotatorProps: getFmAnnotatorProps(), + pgDDProps: getPgDDProps(stage), }, }; diff --git a/config/stacks/pgDD.ts b/config/stacks/pgDD.ts new file mode 100644 index 000000000..83f555f81 --- /dev/null +++ b/config/stacks/pgDD.ts @@ -0,0 +1,25 @@ +import { + accountIdAlias, + AppStage, + computeSecurityGroupName, + rdsMasterSecretName, + region, + vpcProps, +} from '../constants'; +import { PgDDStackProps } from '../../lib/workload/stateless/stacks/pg-dd/deploy/stack'; +import { getDataBucketStackProps } from './dataBucket'; + +export const getPgDDProps = (stage: AppStage): PgDDStackProps | undefined => { + const bucket = getDataBucketStackProps(stage); + if (bucket.bucketName === undefined) { + return undefined; + } else { + return { + bucket: bucket.bucketName, + prefix: 'pg-dd', + secretArn: `arn:aws:secretsmanager:${region}:${accountIdAlias.beta}:secret:${rdsMasterSecretName}`, // pragma: allowlist secret + lambdaSecurityGroupName: computeSecurityGroupName, + vpcProps, + }; + } +}; diff --git a/lib/workload/stateless/stacks/pg-dd/.dockerignore b/lib/workload/stateless/stacks/pg-dd/.dockerignore new file mode 100644 index 000000000..2863a8486 --- /dev/null +++ b/lib/workload/stateless/stacks/pg-dd/.dockerignore @@ -0,0 +1,8 @@ +deploy +.env +.env.example +.gitignore +Makefile +README.md +data 
+.ruff_cache
diff --git a/lib/workload/stateless/stacks/pg-dd/.env.example b/lib/workload/stateless/stacks/pg-dd/.env.example
index 16755c421..d91b99208 100644
--- a/lib/workload/stateless/stacks/pg-dd/.env.example
+++ b/lib/workload/stateless/stacks/pg-dd/.env.example
@@ -1,4 +1,4 @@
-PG_DD_URL=postgresql://orcabus:orcabus@0.0.0.0:5432/orcabus# pragma: allowlist secret
+PG_DD_URL=postgresql://orcabus:orcabus@0.0.0.0:5432# pragma: allowlist secret
 PG_DD_DIR=data
 
 PG_DD_DATABASE_METADATA_MANAGER=metadata_manager
diff --git a/lib/workload/stateless/stacks/pg-dd/.gitignore b/lib/workload/stateless/stacks/pg-dd/.gitignore
index d70ad070a..df6a7adb9 100644
--- a/lib/workload/stateless/stacks/pg-dd/.gitignore
+++ b/lib/workload/stateless/stacks/pg-dd/.gitignore
@@ -1,2 +1,3 @@
 .env
-data
\ No newline at end of file
+data
+.ruff_cache
\ No newline at end of file
diff --git a/lib/workload/stateless/stacks/pg-dd/Makefile b/lib/workload/stateless/stacks/pg-dd/Makefile
index 17ddf8fc0..abc4cd293 100644
--- a/lib/workload/stateless/stacks/pg-dd/Makefile
+++ b/lib/workload/stateless/stacks/pg-dd/Makefile
@@ -10,4 +10,7 @@ check: lint
 	@poetry run ruff check .
 
 local: install
-	@poetry run local
\ No newline at end of file
+	@poetry run local
+
+clean:
+	rm -rf data && rm -rf .ruff_cache
\ No newline at end of file
diff --git a/lib/workload/stateless/stacks/pg-dd/README.md b/lib/workload/stateless/stacks/pg-dd/README.md
index d82d23050..08f56e977 100644
--- a/lib/workload/stateless/stacks/pg-dd/README.md
+++ b/lib/workload/stateless/stacks/pg-dd/README.md
@@ -1,3 +1,47 @@
-# Pg-dd
+# Postgres data dump
 
 Postgres data dump - a service that dumps (like `dd`) orcabus postgres databases to S3.
+
+## Usage
+
+Call the deployed function to update the current dump:
+
+```sh
+aws lambda invoke --function-name orcabus-pg-dd response.json
+```
+
+This is set up to dump the metadata_manager, workflow_manager, sequence_run_manager, and the 10000 most recent
+rows of the filemanager database.
+
+## Configuration
+
+This function can be configured by setting the following environment variables; see [.env.example][env-example] for an example:
+
+| Name | Description | Type |
+|------|-------------|------|
+| `PG_DD_URL` | The database URL to dump databases from. | Postgres connection string |
+| `PG_DD_SECRET` | The secret name or ARN to fetch the database URL from. This is only used in the Lambda function, and overrides `PG_DD_URL`. | `string` |
+| `PG_DD_DATABASE_<DATABASE>` | The name of a database to dump records from, where `<DATABASE>` represents the target database. Specify this multiple times to dump from multiple databases. | `string` |
+| `PG_DD_DATABASE_<DATABASE>_SQL` | Custom SQL code to execute when dumping database records for `<DATABASE>`. This is optional, and by default all records from all tables are dumped. Specify this as a list of SQL statements, each of which generates a corresponding CSV file. | `string[]` or undefined |
+| `PG_DD_BUCKET` | The bucket to dump data to. This is required when deploying the Lambda function. | `string` or undefined |
+| `PG_DD_PREFIX` | The bucket prefix to use when writing to a bucket. This is optional. | `string` or undefined |
+| `PG_DD_DIR` | The local filesystem directory to dump data to when running this command locally. 
This is not used on the deployed Lambda function. | filesystem directory or undefined | + +## Local development + +This project uses [poetry] to manage dependencies. + +The pg-dd command can be run locally to dump data to a directory: + +``` +make local +``` + +Run the linter and formatter: + +``` +make check +``` + +[poetry]: https://python-poetry.org/ +[env-example]: .env.example diff --git a/lib/workload/stateless/stacks/pg-dd/deploy/stack.ts b/lib/workload/stateless/stacks/pg-dd/deploy/stack.ts new file mode 100644 index 000000000..67f6b254b --- /dev/null +++ b/lib/workload/stateless/stacks/pg-dd/deploy/stack.ts @@ -0,0 +1,115 @@ +import { Duration, Stack, StackProps } from 'aws-cdk-lib'; +import { Construct } from 'constructs'; +import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'; +import path from 'path'; +import { Architecture, Runtime } from 'aws-cdk-lib/aws-lambda'; +import { + ISecurityGroup, + IVpc, + SecurityGroup, + SubnetType, + Vpc, + VpcLookupOptions, +} from 'aws-cdk-lib/aws-ec2'; +import { NamedLambdaRole } from '../../../../components/named-lambda-role'; +import { ManagedPolicy, PolicyStatement, Role } from 'aws-cdk-lib/aws-iam'; +import { readFileSync } from 'fs'; + +/** + * Props for the PgDD stack. + */ +export type PgDDStackProps = { + /** + * The bucket to dump data to. + */ + bucket: string; + /** + * Secret to connect to database with. + */ + secretArn: string; + /** + * The key prefix when writing data. + */ + prefix?: string; + /** + * Props to lookup the VPC with. + */ + vpcProps: VpcLookupOptions; + /** + * Existing security group name to be attached on lambda. + */ + lambdaSecurityGroupName: string; +}; + +/** + * Deploy the PgDD stack. + */ +export class PgDDStack extends Stack { + private readonly vpc: IVpc; + private readonly securityGroup: ISecurityGroup; + private readonly role: Role; + + constructor(scope: Construct, id: string, props: StackProps & PgDDStackProps) { + super(scope, id, props); + + this.vpc = Vpc.fromLookup(this, 'MainVpc', props.vpcProps); + this.securityGroup = SecurityGroup.fromLookupByName( + this, + 'OrcaBusLambdaSecurityGroup', + props.lambdaSecurityGroupName, + this.vpc + ); + + this.role = new NamedLambdaRole(this, 'Role'); + this.role.addManagedPolicy( + ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaVPCAccessExecutionRole') + ); + this.role.addToPolicy( + new PolicyStatement({ + actions: ['s3:PutObject'], + resources: [`arn:aws:s3:::${props.bucket}`, `arn:aws:s3:::${props.bucket}/*`], + }) + ); + this.role.addToPolicy( + new PolicyStatement({ + actions: ['secretsmanager:GetSecretValue'], + resources: [`${props.secretArn}-*`], + }) + ); + + const securityGroup = new SecurityGroup(this, 'SecurityGroup', { + vpc: this.vpc, + allowAllOutbound: true, + description: 'Security group that allows the PgDD Lambda function to egress out.', + }); + + const entry = path.join(__dirname, '..'); + new PythonFunction(this, 'function', { + entry, + functionName: 'orcabus-pg-dd', + index: 'pg_dd/handler.py', + runtime: Runtime.PYTHON_3_12, + architecture: Architecture.ARM_64, + timeout: Duration.minutes(15), + memorySize: 1024, + vpc: this.vpc, + vpcSubnets: { + subnetType: SubnetType.PRIVATE_WITH_EGRESS, + }, + bundling: { + assetExcludes: [...readFileSync(path.join(entry, '.dockerignore'), 'utf-8').split('\n')], + }, + role: this.role, + securityGroups: [securityGroup, this.securityGroup], + environment: { + PG_DD_SECRET: props.secretArn, + PG_DD_BUCKET: props.bucket, + PG_DD_DATABASE_METADATA_MANAGER: 
'metadata_manager', + PG_DD_DATABASE_SEQUENCE_RUN_MANAGER: 'sequence_run_manager', + PG_DD_DATABASE_WORKFLOW_MANAGER: 'workflow_manager', + PG_DD_DATABASE_FILEMANAGER: 'filemanager', + ...(props.prefix && { PG_DD_PREFIX: props.prefix }), + }, + }); + } +} diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/handler.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/handler.py new file mode 100644 index 000000000..5ffc7bd9e --- /dev/null +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/handler.py @@ -0,0 +1,24 @@ +import json +import logging +import os +from types import SimpleNamespace + +from libumccr.aws import libsm + +from pg_dd.pg_dd import PgDDS3 + +logger = logging.getLogger(__name__) + +try: + secret_str = libsm.get_secret(os.getenv("PG_DD_SECRET")) + secret = json.loads(secret_str, object_hook=lambda d: SimpleNamespace(**d)) + os.environ["PG_DD_URL"] = ( + f"{secret.engine}://{secret.username}:{secret.password}@{secret.host}:{secret.port}" + ) +except Exception as e: + logger.error(f"retrieving database url from secrets manager: {e}") + raise e + + +def handler(_event, _context): + PgDDS3(logger=logger).write_to_bucket() diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py index 4c306b78b..f9e465bc5 100644 --- a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py @@ -1,3 +1,5 @@ +import gzip +import logging import os from typing import Dict, List, Any @@ -12,11 +14,20 @@ class PgDD: - def __init__(self): + """ + A class to dump postgres databases to CSV files. + """ + + def __init__(self, logger=logging.getLogger(__name__)): self.databases = self.read_databases() + self.logger = logger @staticmethod def read_databases() -> Dict[str, Dict[str, Any]]: + """ + Read the databases to dump from env variables. + """ + prefix = "PG_DD_DATABASE_" sql_prefix = "_SQL" variables = {} @@ -38,6 +49,10 @@ def read_databases() -> Dict[str, Dict[str, Any]]: def copy_tables_to_csv( cur: psycopg.cursor.Cursor, tables: List[str] ) -> Dict[str, str]: + """ + Get tables as a csv string. + """ + csvs = {} for table in tables: rows = [] @@ -57,11 +72,14 @@ def copy_tables_to_csv( return csvs def csvs_for_tables(self) -> Dict[str, Dict[str, str]]: + """ + Get csvs for all tables in all databases. + """ + database_url = os.getenv("PG_DD_URL") databases = {} for entry in self.databases.values(): - url = database_url.rsplit("/", 1)[0] - url = f"{url}/{entry['database']}" + url = f"{database_url}/{entry['database']}" conn: psycopg.connection.Connection with psycopg.connect(url) as conn: @@ -73,6 +91,7 @@ def csvs_for_tables(self) -> Dict[str, Dict[str, str]]: """ ) tables = [name[0] for name in cur.fetchall()] + self.logger.debug(f"fetched table names: {tables}") with conn.cursor() as cur: databases[entry["database"]] = self.copy_tables_to_csv(cur, tables) @@ -81,39 +100,55 @@ def csvs_for_tables(self) -> Dict[str, Dict[str, str]]: class PgDDLocal(PgDD): - def __init__(self): - super().__init__() + """ + Dump CSV files to a local directory. + """ + + def __init__(self, logger=logging.getLogger(__name__)): + super().__init__(logger=logger) self.out = os.getenv("PG_DD_DIR") def write_to_dir(self) -> None: + """ + Write the CSV files to the output directory. 
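+        Output files are gzip-compressed and written to "<PG_DD_DIR>/<database>/<table>.csv.gz".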
+ """ + for database, tables in self.csvs_for_tables().items(): output_dir = f"{self.out}/{database}" if not os.path.exists(output_dir): os.makedirs(output_dir) + self.logger.debug(f"writing to directory: {output_dir}") + for table, value in tables.items(): - with open(f"{self.out}/{database}/{table}.csv", "w") as f: - f.write(value) + with open(f"{self.out}/{database}/{table}.csv.gz", "wb") as f: + f.write(gzip.compress(str.encode(value))) class PgDDS3(PgDD): - def __init__(self): - super().__init__() + """ + Dump CSV files to an S3 bucket. + """ + + def __init__(self, logger=logging.getLogger(__name__)): + super().__init__(logger=logger) self.bucket = os.getenv("PG_DD_BUCKET") self.prefix = os.getenv("PG_DD_PREFIX") def write_to_bucket(self) -> None: + """ + Write the CSV files to the S3 bucket. + """ + s3: S3ServiceResource = boto3.resource("s3") for database, tables in self.csvs_for_tables().items(): for table, value in tables.items(): - key = f"{database}/{table}.csv" + key = f"{database}/{table}.csv.gz" if self.prefix: key = f"{self.prefix}/{key}" - s3_object = s3.Object(self.bucket, key) - s3_object.put(Body=value) - + self.logger.debug(f"writing to bucket with key: {key}") -def handler(): - PgDDS3().write_to_bucket() + s3_object = s3.Object(self.bucket, key) + s3_object.put(Body=gzip.compress(str.encode(value))) diff --git a/lib/workload/stateless/stacks/pg-dd/poetry.lock b/lib/workload/stateless/stacks/pg-dd/poetry.lock index dff49a018..fabde26b2 100644 --- a/lib/workload/stateless/stacks/pg-dd/poetry.lock +++ b/lib/workload/stateless/stacks/pg-dd/poetry.lock @@ -49,6 +49,24 @@ files = [ {file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"}, ] +[[package]] +name = "libumccr" +version = "0.4.1" +description = "UMCCR Reusable Python modules" +optional = false +python-versions = ">=3.9" +files = [ + {file = "libumccr-0.4.1-py3-none-any.whl", hash = "sha256:20e06ec4b99ff6d68f9e21b72da10c16e33933314e20f53abe2983d03f3a080e"}, + {file = "libumccr-0.4.1.tar.gz", hash = "sha256:0c6963bbd164eaf799acdb8cda28ba5b5a49bf06a552978901b49802da4fc5ec"}, +] + +[package.extras] +all = ["Django", "boto3", "botocore", "cachetools", "google-auth", "gspread", "gspread-pandas", "pandas", "requests"] +aws = ["boto3", "botocore", "cachetools"] +dev = ["build", "detect-secrets", "pdoc3", "pipdeptree", "pre-commit", "setuptools", "twine", "wheel"] +libgdrive = ["cachetools", "google-auth", "gspread", "gspread-pandas", "pandas", "requests"] +test = ["awscli-local", "cachetools", "flake8", "mockito", "nose2", "pytest", "pytest-cov", "tox"] + [[package]] name = "mypy" version = "1.13.0" @@ -345,4 +363,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "f03df58d71fc68afa3a117ecf4609fb7315b3c6ad61f8b6bd782dd4468bb31d4" +content-hash = "b35be5685a82a2219a11fda89d02d5868fe1f311462c6e32297961021b7a2e89" diff --git a/lib/workload/stateless/stacks/pg-dd/pyproject.toml b/lib/workload/stateless/stacks/pg-dd/pyproject.toml index 945b56864..a3545aab1 100644 --- a/lib/workload/stateless/stacks/pg-dd/pyproject.toml +++ b/lib/workload/stateless/stacks/pg-dd/pyproject.toml @@ -11,10 +11,11 @@ python = "^3.12" boto3 = "^1" psycopg = { version = "^3", extras = ["binary"] } python-dotenv = "^1" +libumccr = "^0.4" +mypy-boto3-s3 = "^1" [tool.poetry.group.dev.dependencies] mypy = "^1" -mypy-boto3-s3 = "^1" ruff = "^0.7" [build-system] diff --git a/lib/workload/stateless/statelessStackCollectionClass.ts 
b/lib/workload/stateless/statelessStackCollectionClass.ts index 01fa31e18..d10167501 100644 --- a/lib/workload/stateless/statelessStackCollectionClass.ts +++ b/lib/workload/stateless/statelessStackCollectionClass.ts @@ -79,6 +79,7 @@ import { OraDecompressionManagerStack, OraDecompressionManagerStackProps, } from './stacks/ora-decompression-manager/deploy'; +import { PgDDStack, PgDDStackProps } from './stacks/pg-dd/deploy/stack'; export interface StatelessStackCollectionProps { metadataManagerStackProps: MetadataManagerStackProps; @@ -104,6 +105,7 @@ export interface StatelessStackCollectionProps { workflowManagerStackProps: WorkflowManagerStackProps; stackyMcStackFaceProps: GlueStackProps; fmAnnotatorProps: FMAnnotatorConfigurableProps; + pgDDProps?: PgDDStackProps; } export class StatelessStackCollection { @@ -131,6 +133,7 @@ export class StatelessStackCollection { readonly workflowManagerStack: Stack; readonly stackyMcStackFaceStack: Stack; readonly fmAnnotator: Stack; + readonly pgDDStack: Stack; constructor( scope: Construct, @@ -309,6 +312,13 @@ export class StatelessStackCollection { ...statelessConfiguration.fmAnnotatorProps, domainName: fileManagerStack.domainName, }); + + if (statelessConfiguration.pgDDProps) { + this.pgDDStack = new PgDDStack(scope, 'PgDDStack', { + ...this.createTemplateProps(env, 'PgDDStack'), + ...statelessConfiguration.pgDDProps, + }); + } } /** From 3395e7f4717b27aaa85b2f04c9bb5ae6ec7803b7 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Tue, 19 Nov 2024 13:51:04 +1100 Subject: [PATCH 3/8] feat(pg-dd): add subcommands for loading, dumping, uploading and downloading --- .../stateless/stacks/pg-dd/.dockerignore | 1 - .../stateless/stacks/pg-dd/.env.example | 3 +- .../stateless/stacks/pg-dd/.gitignore | 3 +- .../stateless/stacks/pg-dd/Dockerfile | 15 ++ lib/workload/stateless/stacks/pg-dd/Makefile | 4 +- lib/workload/stateless/stacks/pg-dd/README.md | 21 +- .../stateless/stacks/pg-dd/deploy/stack.ts | 3 +- .../stateless/stacks/pg-dd/pg_dd/cli.py | 43 ++++ .../stateless/stacks/pg-dd/pg_dd/local.py | 14 -- .../stateless/stacks/pg-dd/pg_dd/pg_dd.py | 231 +++++++++++++----- .../stateless/stacks/pg-dd/poetry.lock | 41 +++- .../stateless/stacks/pg-dd/pyproject.toml | 3 +- 12 files changed, 281 insertions(+), 101 deletions(-) create mode 100644 lib/workload/stateless/stacks/pg-dd/Dockerfile create mode 100644 lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py delete mode 100644 lib/workload/stateless/stacks/pg-dd/pg_dd/local.py diff --git a/lib/workload/stateless/stacks/pg-dd/.dockerignore b/lib/workload/stateless/stacks/pg-dd/.dockerignore index 2863a8486..dd833bbf9 100644 --- a/lib/workload/stateless/stacks/pg-dd/.dockerignore +++ b/lib/workload/stateless/stacks/pg-dd/.dockerignore @@ -2,7 +2,6 @@ deploy .env .env.example .gitignore -Makefile README.md data .ruff_cache diff --git a/lib/workload/stateless/stacks/pg-dd/.env.example b/lib/workload/stateless/stacks/pg-dd/.env.example index d91b99208..4c98ec346 100644 --- a/lib/workload/stateless/stacks/pg-dd/.env.example +++ b/lib/workload/stateless/stacks/pg-dd/.env.example @@ -5,4 +5,5 @@ PG_DD_DATABASE_METADATA_MANAGER=metadata_manager PG_DD_DATABASE_SEQUENCE_RUN_MANAGER=sequence_run_manager PG_DD_DATABASE_WORKFLOW_MANAGER=workflow_manager PG_DD_DATABASE_FILEMANAGER=filemanager -PG_DD_DATABASE_FILEMANAGER_SQL="select * from s3_object" \ No newline at end of file +PG_DD_DATABASE_FILEMANAGER_SQL_DUMP="select * from s3_object order by sequencer limit 10000" +PG_DD_DATABASE_FILEMANAGER_SQL_LOAD="s3_object" \ No newline 
at end of file
diff --git a/lib/workload/stateless/stacks/pg-dd/.gitignore b/lib/workload/stateless/stacks/pg-dd/.gitignore
index df6a7adb9..7e63edf22 100644
--- a/lib/workload/stateless/stacks/pg-dd/.gitignore
+++ b/lib/workload/stateless/stacks/pg-dd/.gitignore
@@ -1,3 +1,4 @@
 .env
 data
-.ruff_cache
\ No newline at end of file
+.ruff_cache
+response.json
\ No newline at end of file
diff --git a/lib/workload/stateless/stacks/pg-dd/Dockerfile b/lib/workload/stateless/stacks/pg-dd/Dockerfile
new file mode 100644
index 000000000..6de2f8c3d
--- /dev/null
+++ b/lib/workload/stateless/stacks/pg-dd/Dockerfile
@@ -0,0 +1,15 @@
+# This Dockerfile is intended to be used as part of a Docker Compose setup.
+# When running this microservice from the Docker Compose root, this Dockerfile
+# will build the image, install dependencies, and run the CLI.
+
+FROM public.ecr.aws/docker/library/python:3.12
+
+ARG POETRY_VERSION=1.8
+RUN pip install "poetry==${POETRY_VERSION}"
+
+WORKDIR /app
+
+COPY . .
+RUN poetry install --no-root
+
+ENTRYPOINT ["/bin/bash", "-c", "make cli"]
\ No newline at end of file
diff --git a/lib/workload/stateless/stacks/pg-dd/Makefile b/lib/workload/stateless/stacks/pg-dd/Makefile
index abc4cd293..d5693b49b 100644
--- a/lib/workload/stateless/stacks/pg-dd/Makefile
+++ b/lib/workload/stateless/stacks/pg-dd/Makefile
@@ -9,8 +9,8 @@ lint: install
 check: lint
 	@poetry run ruff check .
 
-local: install
-	@poetry run local
+cli: install
+	@poetry run cli
 
 clean:
 	rm -rf data && rm -rf .ruff_cache
\ No newline at end of file
diff --git a/lib/workload/stateless/stacks/pg-dd/README.md b/lib/workload/stateless/stacks/pg-dd/README.md
index 08f56e977..2c25176ae 100644
--- a/lib/workload/stateless/stacks/pg-dd/README.md
+++ b/lib/workload/stateless/stacks/pg-dd/README.md
@@ -17,15 +17,16 @@
 This function can be configured by setting the following environment variables; see [.env.example][env-example] for an example:
 
-| Name | Description | Type |
+| Name | Description | Type |
+|------|-------------|------|
+| `PG_DD_URL` | The database URL to dump databases from. | Postgres connection string |
+| `PG_DD_SECRET` | The secret name or ARN to fetch the database URL from. This is only used in the Lambda function, and overrides `PG_DD_URL`. | `string` |
+| `PG_DD_DATABASE_<DATABASE>` | The name of a database to dump records from, where `<DATABASE>` represents the target database. Specify this multiple times to dump from multiple databases. | `string` |
+| `PG_DD_DATABASE_<DATABASE>_SQL_DUMP` | Custom SQL code to execute when dumping database records for `<DATABASE>`. This is optional, and by default all records from all tables are dumped. Specify this as a list of SQL statements, each of which generates a corresponding CSV file. | `string[]` or undefined |
+| `PG_DD_DATABASE_<DATABASE>_SQL_LOAD` | The name of the table to load into for `<DATABASE>`. This is required if loading data after dumping with `PG_DD_DATABASE_<DATABASE>_SQL_DUMP`, to specify the table to load data into. | `string[]` or undefined |
+| `PG_DD_BUCKET` | The bucket to dump data to. This is required when deploying the Lambda function. | `string` or undefined |
+| `PG_DD_PREFIX` | The bucket prefix to use when writing to a bucket. This is optional. | `string` or undefined |
+| `PG_DD_DIR` | The local filesystem directory to dump data to when running this command locally. This is not used on the deployed Lambda function. 
| filesystem directory or undefined | +| Name | Description | Type | +|-------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------| +| `PG_DD_URL` | The database URL to dump databases from. | Postgres connection string | +| `PG_DD_SECRET` | The secret name or ARN to fetch the database URL from. This is only used in the Lambda function, and overrides `PG_DD_URL`. | `string` | +| `PG_DD_DATABASE_` | A name of the database to dump records from where `` represents the target database. Specify this multiple times to use dump from multiple databases. | `string` | +| `PG_DD_DATABASE__SQL_DUMP` | Custom SQL code to execute when dumping database records for ``. This is optional, and by default all records from all tables are dumped. Specify this is a list of SQL statements to generate a corresponding CSV file. | `string[]` or undefined | +| `PG_DD_DATABASE__SQL_LOAD` | The name of the table to load into for ``. This is required if loading data after dumping with `` to specify the table to load data into. | `string[]` or undefined | +| `PG_DD_BUCKET` | The bucket to dump data to. This is required when deploying the Lambda function. | `string` or undefined | +| `PG_DD_PREFIX` | The bucket prefix to use when writing to a bucket. This is optional. | `string` or undefined | +| `PG_DD_DIR` | The local filesystem directory to dump data to when running this command locally. This is not used on the deployed Lambda function. | filesystem directory or undefined | ## Local development @@ -34,7 +35,7 @@ This project uses [poetry] to manage dependencies. The pg-dd command can be run locally to dump data to a directory: ``` -make local +make cli ``` Run the linter and formatter: diff --git a/lib/workload/stateless/stacks/pg-dd/deploy/stack.ts b/lib/workload/stateless/stacks/pg-dd/deploy/stack.ts index 67f6b254b..a5a924d70 100644 --- a/lib/workload/stateless/stacks/pg-dd/deploy/stack.ts +++ b/lib/workload/stateless/stacks/pg-dd/deploy/stack.ts @@ -90,7 +90,7 @@ export class PgDDStack extends Stack { index: 'pg_dd/handler.py', runtime: Runtime.PYTHON_3_12, architecture: Architecture.ARM_64, - timeout: Duration.minutes(15), + timeout: Duration.minutes(5), memorySize: 1024, vpc: this.vpc, vpcSubnets: { @@ -108,6 +108,7 @@ export class PgDDStack extends Stack { PG_DD_DATABASE_SEQUENCE_RUN_MANAGER: 'sequence_run_manager', PG_DD_DATABASE_WORKFLOW_MANAGER: 'workflow_manager', PG_DD_DATABASE_FILEMANAGER: 'filemanager', + PG_DD_DATABASE_FILEMANAGER_SQL: 'select * from s3_object order by sequencer limit 10000', ...(props.prefix && { PG_DD_PREFIX: props.prefix }), }, }); diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py new file mode 100644 index 000000000..4ab9ce209 --- /dev/null +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py @@ -0,0 +1,43 @@ +from pg_dd.pg_dd import PgDDLocal, PgDDS3 +import click + + +@click.group() +def cli(): + pass + + +@cli.command() +def download(): + """ + Download S3 CSV dumps to the local directory. + """ + PgDDS3().download_local() + + +@cli.command() +def upload(): + """ + Uploads local CSV dumps to S3. + """ + PgDDS3().write_to_bucket() + + +@cli.command() +def dump(): + """ + Dump from the local database to CSV files. 
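+
+    Uses the PG_DD_URL and PG_DD_DIR environment variables, writing one
+    gzipped CSV per table under <PG_DD_DIR>/<database>/<table>.csv.gz.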
+ """ + PgDDLocal().write_to_dir() + + +@cli.command() +def load(): + """ + Load local CSV files into the database + """ + PgDDLocal().load_to_database() + + +if __name__ == "__main__": + cli() diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/local.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/local.py deleted file mode 100644 index 872129ab1..000000000 --- a/lib/workload/stateless/stacks/pg-dd/pg_dd/local.py +++ /dev/null @@ -1,14 +0,0 @@ -import os - -from pg_dd.pg_dd import PgDDS3, PgDDLocal - - -def main(): - if os.getenv("PG_DD_BUCKET"): - PgDDS3().write_to_bucket() - else: - PgDDLocal().write_to_dir() - - -if __name__ == "__main__": - main() diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py index f9e465bc5..c8c601f3c 100644 --- a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py @@ -1,7 +1,7 @@ import gzip import logging import os -from typing import Dict, List, Any +from typing import Dict, List, Any, Tuple, LiteralString import boto3 import psycopg @@ -19,9 +19,87 @@ class PgDD: """ def __init__(self, logger=logging.getLogger(__name__)): + self.url = os.getenv("PG_DD_URL") self.databases = self.read_databases() self.logger = logger + def csvs_for_tables(self) -> Dict[str, Dict[str, str]]: + """ + Get csvs for all tables in all databases. + """ + + databases = {} + for entry in self.databases.values(): + url = f"{self.url}/{entry['database']}" + + conn: psycopg.connection.Connection + with psycopg.connect(url) as conn: + if entry.get("sql_dump") is not None: + tables = [(i, e, True) for i, e in enumerate(entry["sql_dump"])] + else: + with conn.cursor() as cur: + cur.execute( + """ + select table_name from information_schema.tables + where table_schema='public'; + """ + ) + tables = [(name[0], name[0], False) for name in cur.fetchall()] + self.logger.info(f"fetched table names: {tables}") + + with conn.cursor() as cur: + databases[entry["database"]] = self.copy_tables_to_csv(cur, tables) + + return databases + + def load_table(self, table, data, conn, only_non_empty=True): + """ + Load a table with the CSV data. + """ + + with conn.cursor() as cur: + if only_non_empty: + exists = cur.execute( + sql.SQL( + """ + select exists( + select from pg_tables where tablename = '{}' + ); + """ + ).format(sql.SQL(table)) + ).fetchone()[0] + has_records = cur.execute( + sql.SQL( + """ + select exists(select * from {}) + """ + ).format(sql.SQL(table)) + ).fetchone()[0] + + if not exists or has_records: + return + + with cur.copy( + sql.SQL( + """ + copy {} from stdin with (format csv, header); + """ + ).format(sql.Identifier(table)), + ) as copy: + copy.write(data) + + def target_files(self) -> List[Tuple[str, str, str, str]]: + """ + Get the target files for all directories. 
+ """ + + files = [] + for database, tables in self.csvs_for_tables().items(): + for table, value in tables.items(): + file = f"{database}/{table}.csv.gz" + files += [(database, table, file, value)] + return files + @staticmethod def read_databases() -> Dict[str, Dict[str, Any]]: """ @@ -29,17 +107,28 @@ def read_databases() -> Dict[str, Dict[str, Any]]: """ prefix = "PG_DD_DATABASE_" - sql_prefix = "_SQL" + sql_dump_prefix = "_SQL_DUMP" + sql_load_prefix = "_SQL_LOAD" variables = {} for key, value in os.environ.items(): if key[: len(prefix)] == prefix: database = key[len(prefix) :] - suffix = database[-len(sql_prefix) :] - database = database.removesuffix(sql_prefix) + suffix_dump = database[-len(sql_dump_prefix) :] + suffix_load = database[-len(sql_load_prefix) :] + + database = database.removesuffix(sql_dump_prefix).removesuffix( + sql_load_prefix + ) variables.setdefault(database, {}) - if suffix == sql_prefix: - variables[database]["sql"] = [s.strip() for s in value.split(",")] + if suffix_dump == sql_dump_prefix: + variables[database]["sql_dump"] = [ + s.strip() for s in value.split(",") + ] + elif suffix_load == sql_load_prefix: + variables[database]["sql_load"] = [ + s.strip() for s in value.split(",") + ] else: variables[database]["database"] = database.lower() @@ -47,57 +136,38 @@ def read_databases() -> Dict[str, Dict[str, Any]]: @staticmethod def copy_tables_to_csv( - cur: psycopg.cursor.Cursor, tables: List[str] + cur: psycopg.cursor.Cursor, tables: List[Tuple[str, str, bool]] ) -> Dict[str, str]: """ Get tables as a csv string. """ csvs = {} - for table in tables: - rows = [] - copy: psycopg.Copy - with cur.copy( - sql.SQL( + for name, table, is_statement in tables: + if is_statement: + statement = sql.SQL( + """ + copy ({}) to stdout with (format csv, header); + """ + ) + else: + statement = sql.SQL( """ copy {} to stdout with (format csv, header); """ - ).format(sql.Identifier(table)) - ) as copy: + ) + + rows = [] + copy: psycopg.Copy + table: LiteralString = table + with cur.copy(statement.format(sql.SQL(table))) as copy: for row in copy: rows += [row.tobytes().decode("utf-8")] - csvs[table] = "".join(rows) + csvs[name] = "".join(rows) return csvs - def csvs_for_tables(self) -> Dict[str, Dict[str, str]]: - """ - Get csvs for all tables in all databases. - """ - - database_url = os.getenv("PG_DD_URL") - databases = {} - for entry in self.databases.values(): - url = f"{database_url}/{entry['database']}" - - conn: psycopg.connection.Connection - with psycopg.connect(url) as conn: - with conn.cursor() as cur: - cur.execute( - """ - select table_name from information_schema.tables - where table_schema='public'; - """ - ) - tables = [name[0] for name in cur.fetchall()] - self.logger.debug(f"fetched table names: {tables}") - - with conn.cursor() as cur: - databases[entry["database"]] = self.copy_tables_to_csv(cur, tables) - - return databases - class PgDDLocal(PgDD): """ @@ -107,48 +177,85 @@ class PgDDLocal(PgDD): def __init__(self, logger=logging.getLogger(__name__)): super().__init__(logger=logger) self.out = os.getenv("PG_DD_DIR") + self.bucket = os.getenv("PG_DD_BUCKET") + self.prefix = os.getenv("PG_DD_PREFIX") + self.s3: S3ServiceResource = boto3.resource("s3") - def write_to_dir(self) -> None: + def write_to_dir(self): """ Write the CSV files to the output directory. 
""" - for database, tables in self.csvs_for_tables().items(): - output_dir = f"{self.out}/{database}" - if not os.path.exists(output_dir): - os.makedirs(output_dir) + for _, _, f, value in self.target_files(): + file = f"{self.out}/{f}" + os.makedirs(file.rsplit("/", 1)[0], exist_ok=True) + self.logger.info(f"writing to file: {f}") - self.logger.debug(f"writing to directory: {output_dir}") + with open(file, "wb") as f: + f.write(gzip.compress(str.encode(value))) - for table, value in tables.items(): - with open(f"{self.out}/{database}/{table}.csv.gz", "wb") as f: - f.write(gzip.compress(str.encode(value))) + def load_to_database(self): + """ + Download from S3 CSV files to load. + """ + + def load_files(): + for root, _, files in os.walk(f"{self.out}/{database}"): + for file in files: + with open(f"{root}/{file}", "rb") as f: + table = file.removesuffix(".csv.gz") + load = self.databases[database.upper()].get("sql_load") + if load is not None: + table = load[int(table)] + + self.load_table( + table, gzip.decompress(f.read()).decode("utf-8"), conn + ) + + for _, dirs, _ in os.walk(self.out): + for database in dirs: + conn: psycopg.connection.Connection + with psycopg.connect(f"{self.url}/{database}") as conn: + conn.set_deferrable(True) + load_files() + conn.commit() class PgDDS3(PgDD): """ - Dump CSV files to an S3 bucket. + Commands related to running this inside a Lambda function. """ def __init__(self, logger=logging.getLogger(__name__)): super().__init__(logger=logger) self.bucket = os.getenv("PG_DD_BUCKET") self.prefix = os.getenv("PG_DD_PREFIX") + self.dir = os.getenv("PG_DD_DIR") + self.s3: S3ServiceResource = boto3.resource("s3") - def write_to_bucket(self) -> None: + def write_to_bucket(self): """ Write the CSV files to the S3 bucket. """ - s3: S3ServiceResource = boto3.resource("s3") - for database, tables in self.csvs_for_tables().items(): - for table, value in tables.items(): - key = f"{database}/{table}.csv.gz" + for _, _, key, value in self.target_files(): + if self.prefix: + key = f"{self.prefix}/{key}" + + self.logger.info(f"writing to bucket with key: {key}") + + s3_object = self.s3.Object(self.bucket, key) + s3_object.put(Body=gzip.compress(str.encode(value))) - if self.prefix: - key = f"{self.prefix}/{key}" + def download_local(self): + """ + Download from S3 CSV files to load. 
+ """ - self.logger.debug(f"writing to bucket with key: {key}") + for _, _, f, _ in self.target_files(): + self.logger.info(f"target file: {f}") + file = f"{self.dir}/{f}" + os.makedirs(file.rsplit("/", 1)[0], exist_ok=True) - s3_object = s3.Object(self.bucket, key) - s3_object.put(Body=gzip.compress(str.encode(value))) + s3_object = self.s3.Object(self.bucket, f"{self.prefix}/{f}") + s3_object.download_file(f"{self.dir}/{f}") diff --git a/lib/workload/stateless/stacks/pg-dd/poetry.lock b/lib/workload/stateless/stacks/pg-dd/poetry.lock index fabde26b2..26b9fc095 100644 --- a/lib/workload/stateless/stacks/pg-dd/poetry.lock +++ b/lib/workload/stateless/stacks/pg-dd/poetry.lock @@ -2,17 +2,17 @@ [[package]] name = "boto3" -version = "1.35.63" +version = "1.35.64" description = "The AWS SDK for Python" optional = false python-versions = ">=3.8" files = [ - {file = "boto3-1.35.63-py3-none-any.whl", hash = "sha256:d0f938d4f6f392b6ffc5e75fff14a42e5bbb5228675a0367c8af55398abadbec"}, - {file = "boto3-1.35.63.tar.gz", hash = "sha256:deb593d9a0fb240deb4c43e4da8e6626d7c36be7b2fd2fe28f49d44d395b7de0"}, + {file = "boto3-1.35.64-py3-none-any.whl", hash = "sha256:cdacf03fc750caa3aa0dbf6158166def9922c9d67b4160999ff8fc350662facc"}, + {file = "boto3-1.35.64.tar.gz", hash = "sha256:bc3fc12b41fa2c91e51ab140f74fb1544408a2b1e00f88a4c2369a66d18ddf20"}, ] [package.dependencies] -botocore = ">=1.35.63,<1.36.0" +botocore = ">=1.35.64,<1.36.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.10.0,<0.11.0" @@ -21,13 +21,13 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.35.63" +version = "1.35.64" description = "Low-level, data-driven core of boto 3." optional = false python-versions = ">=3.8" files = [ - {file = "botocore-1.35.63-py3-none-any.whl", hash = "sha256:0ca1200694a4c0a3fa846795d8e8a08404c214e21195eb9e010c4b8a4ca78a4a"}, - {file = "botocore-1.35.63.tar.gz", hash = "sha256:2b8196bab0a997d206c3d490b52e779ef47dffb68c57c685443f77293aca1589"}, + {file = "botocore-1.35.64-py3-none-any.whl", hash = "sha256:bbd96bf7f442b1d5e35b36f501076e4a588c83d8d84a1952e9ee1d767e5efb3e"}, + {file = "botocore-1.35.64.tar.gz", hash = "sha256:2f95c83f31c9e38a66995c88810fc638c829790e125032ba00ab081a2cf48cb9"}, ] [package.dependencies] @@ -38,6 +38,31 @@ urllib3 = {version = ">=1.25.4,<2.2.0 || >2.2.0,<3", markers = "python_version > [package.extras] crt = ["awscrt (==0.22.0)"] +[[package]] +name = "click" +version = "8.1.7" +description = "Composable command line interface toolkit" +optional = false +python-versions = ">=3.7" +files = [ + {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"}, + {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + +[[package]] +name = "colorama" +version = "0.4.6" +description = "Cross-platform colored terminal text." 
+optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +files = [ + {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, + {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, +] + [[package]] name = "jmespath" version = "1.0.1" @@ -363,4 +388,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "b35be5685a82a2219a11fda89d02d5868fe1f311462c6e32297961021b7a2e89" +content-hash = "7b6452562763ec8f1bcf43ad6f8cfaae7702c8190d90d8ec142eceb5df7294df" diff --git a/lib/workload/stateless/stacks/pg-dd/pyproject.toml b/lib/workload/stateless/stacks/pg-dd/pyproject.toml index a3545aab1..d62b09c2e 100644 --- a/lib/workload/stateless/stacks/pg-dd/pyproject.toml +++ b/lib/workload/stateless/stacks/pg-dd/pyproject.toml @@ -13,6 +13,7 @@ psycopg = { version = "^3", extras = ["binary"] } python-dotenv = "^1" libumccr = "^0.4" mypy-boto3-s3 = "^1" +click = "^8" [tool.poetry.group.dev.dependencies] mypy = "^1" @@ -23,4 +24,4 @@ requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.poetry.scripts] -local = "pg_dd.local:main" +cli = "pg_dd.cli:cli" From 69421a1af9e5bdf79faa146cf845a4063ffa37b9 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Wed, 20 Nov 2024 10:51:02 +1100 Subject: [PATCH 4/8] feat(pg-dd): link up with top-level compose and Makefile --- Makefile | 24 +++++++++-- compose.yml | 35 +++++++++++++++ .../stateless/stacks/pg-dd/.env.example | 6 ++- .../stateless/stacks/pg-dd/Dockerfile | 27 +++++++++--- lib/workload/stateless/stacks/pg-dd/Makefile | 4 +- lib/workload/stateless/stacks/pg-dd/README.md | 16 ++++--- .../stateless/stacks/pg-dd/pg_dd/cli.py | 38 ++++++++++++---- .../stateless/stacks/pg-dd/pg_dd/handler.py | 1 + .../stateless/stacks/pg-dd/pg_dd/pg_dd.py | 43 +++++++++++++------ .../stateless/stacks/pg-dd/poetry.lock | 14 +++--- 10 files changed, 162 insertions(+), 46 deletions(-) diff --git a/Makefile b/Makefile index c41b99e54..0ae96bfe1 100644 --- a/Makefile +++ b/Makefile @@ -24,14 +24,30 @@ start-all-service: docker compose up --wait -d db # Insert all dump data in before running servers - @(cd lib/workload/stateless/stacks/metadata-manager && $(MAKE) s3-load) - @(cd lib/workload/stateless/stacks/sequence-run-manager && $(MAKE) s3-load) - @(cd lib/workload/stateless/stacks/workflow-manager && $(MAKE) s3-load) - @(cd lib/workload/stateless/stacks/filemanager && $(MAKE) s3-load) + @(cd lib/workload/stateless/stacks/metadata-manager && $(MAKE) reset-db) + @(cd lib/workload/stateless/stacks/sequence-run-manager && $(MAKE) reset-db) + @(cd lib/workload/stateless/stacks/workflow-manager && $(MAKE) reset-db) + @(cd lib/workload/stateless/stacks/filemanager && $(MAKE) reset-db) # Running the rest of the ยต-service server docker compose up --wait -d --build +# Commands for pg-dd +dump: PG_DD_COMMAND=dump +dump: pg-dd + +upload: PG_DD_COMMAND=upload +upload: pg-dd + +download: PG_DD_COMMAND=download +download: pg-dd + +load: PG_DD_COMMAND=load +load: pg-dd + +pg-dd: + @PG_DD_COMMAND=$(COMMAND) docker compose up pg-dd + stop-all-service: docker compose down diff --git a/compose.yml b/compose.yml index c0b7bcc02..868a9d90c 100644 --- a/compose.yml +++ b/compose.yml @@ -89,3 +89,38 @@ services: interval: 10s timeout: 2s retries: 5 + + # Load test data into the database. 
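+  # The PG_DD_COMMAND variable selects the CLI subcommand to run
+  # (dump, upload, download or load), defaulting to load, e.g.
+  # `PG_DD_COMMAND=dump docker compose up pg-dd`.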
+ pg-dd: + build: + context: ./lib/workload/stateless/stacks/pg-dd + dockerfile: Dockerfile + volumes: + # Store the dumps to the local filesystem. + - ./lib/workload/stateless/stacks/pg-dd/data:/app/data + depends_on: + # Depends on migration from all services, so they must be started first. + - db + - metadata-manager + - workflow-manager + - sequence-run-manager + - filemanager + command: ${PG_DD_COMMAND:-load} + environment: + - PG_DD_URL=postgresql://orcabus:orcabus@db:5432 + - PG_DD_DIR=data + - PG_DD_BUCKET=orcabus-test-data-843407916570-ap-southeast-2 + - PG_DD_PREFIX=pg-dd + + - PG_DD_DATABASE_METADATA_MANAGER=metadata_manager + - PG_DD_DATABASE_SEQUENCE_RUN_MANAGER=sequence_run_manager + - PG_DD_DATABASE_WORKFLOW_MANAGER=workflow_manager + - PG_DD_DATABASE_FILEMANAGER=filemanager + - PG_DD_DATABASE_FILEMANAGER_SQL_DUMP=select * from s3_object order by sequencer limit 10000 + - PG_DD_DATABASE_FILEMANAGER_SQL_LOAD=s3_object + + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-access_key_id} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-secret_access_key} + - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION:-ap-southeast-2} + - AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN:-session_token} + restart: no diff --git a/lib/workload/stateless/stacks/pg-dd/.env.example b/lib/workload/stateless/stacks/pg-dd/.env.example index 4c98ec346..a70d51a12 100644 --- a/lib/workload/stateless/stacks/pg-dd/.env.example +++ b/lib/workload/stateless/stacks/pg-dd/.env.example @@ -1,9 +1,11 @@ PG_DD_URL=postgresql://orcabus:orcabus@0.0.0.0:5432# pragma: allowlist secret PG_DD_DIR=data +PG_DD_BUCKET=bucket +PG_DD_PREFIX=pg-dd PG_DD_DATABASE_METADATA_MANAGER=metadata_manager PG_DD_DATABASE_SEQUENCE_RUN_MANAGER=sequence_run_manager PG_DD_DATABASE_WORKFLOW_MANAGER=workflow_manager PG_DD_DATABASE_FILEMANAGER=filemanager -PG_DD_DATABASE_FILEMANAGER_SQL_DUMP="select * from s3_object order by sequencer limit 10000" -PG_DD_DATABASE_FILEMANAGER_SQL_LOAD="s3_object" \ No newline at end of file +PG_DD_DATABASE_FILEMANAGER_SQL_DUMP='select * from s3_object order by sequencer limit 10000' +PG_DD_DATABASE_FILEMANAGER_SQL_LOAD=s3_object \ No newline at end of file diff --git a/lib/workload/stateless/stacks/pg-dd/Dockerfile b/lib/workload/stateless/stacks/pg-dd/Dockerfile index 6de2f8c3d..64eed8b04 100644 --- a/lib/workload/stateless/stacks/pg-dd/Dockerfile +++ b/lib/workload/stateless/stacks/pg-dd/Dockerfile @@ -2,14 +2,31 @@ # When running this microservice from the Docker Compose root, this Dockerfile # will build the image, install dependencies, and start the server -FROM public.ecr.aws/docker/library/python:3.12 +FROM public.ecr.aws/docker/library/python:3.12-slim AS builder -ARG POETRY_VERSION=1.8 -RUN pip install "poetry==${POETRY_VERSION}" +ENV POETRY_HOME=/opt/poetry +ENV POETRY_NO_INTERACTION=1 +ENV POETRY_VIRTUALENVS_IN_PROJECT=1 +ENV POETRY_VIRTUALENVS_CREATE=1 +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 +ENV POETRY_CACHE_DIR=/opt/.cache + +RUN pip install poetry + +WORKDIR /app + +COPY pyproject.toml poetry.lock /app/ +RUN poetry install --no-root && rm -rf $POETRY_CACHE_DIR + +FROM public.ecr.aws/docker/library/python:3.12-slim AS runtime WORKDIR /app +ENV VIRTUAL_ENV=/app/.venv +ENV PATH="/app/.venv/bin:$PATH" + +COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV} COPY . . 
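+# The runtime stage reuses the builder's in-project virtualenv, so the
+# dependencies are already installed and no extra install step is needed.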
-RUN poetry install --no-root -ENTRYPOINT ["/bin/bash", "-c", "make", "local"] \ No newline at end of file +ENTRYPOINT ["python", "-m", "pg_dd.cli"] \ No newline at end of file diff --git a/lib/workload/stateless/stacks/pg-dd/Makefile b/lib/workload/stateless/stacks/pg-dd/Makefile index d5693b49b..feb033d0b 100644 --- a/lib/workload/stateless/stacks/pg-dd/Makefile +++ b/lib/workload/stateless/stacks/pg-dd/Makefile @@ -1,5 +1,7 @@ .PHONY: * +COMMAND ?= "load --exists-ok" + install: @poetry update @@ -10,7 +12,7 @@ check: lint @poetry run ruff check . cli: install - @poetry run cli + @poetry run cli $(COMMAND) clean: rm -rf data && rm -rf .ruff_cache \ No newline at end of file diff --git a/lib/workload/stateless/stacks/pg-dd/README.md b/lib/workload/stateless/stacks/pg-dd/README.md index 2c25176ae..0e4c1f6f3 100644 --- a/lib/workload/stateless/stacks/pg-dd/README.md +++ b/lib/workload/stateless/stacks/pg-dd/README.md @@ -13,6 +13,16 @@ aws lambda invoke --function-name orcabus-pg-dd response.json This is setup to dump the metadata_manager, workflow_manager, sequence_run_manager, and 10000 of the most recent rows of the filemanager database. +This command can also be run locally, for example: + +``` +make cli COMMAND="--help" +``` + +The `Dockerfile` is setup to launch with the top-level `Makefile`, which also contains commands for running the CLI. +By default `start-all-service` will run a `load` on the local database, downloading any dumps from S3 along the way. +AWS credentials must be present in the shell for this to work. + ## Configuration This function can be configured by setting the following environment variables, see [.env.example][env-example] for an example: @@ -32,12 +42,6 @@ This function can be configured by setting the following environment variables, This project uses [poetry] to manage dependencies. -The pg-dd command can be run locally to dump data to a directory: - -``` -make cli -``` - Run the linter and formatter: ``` diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py index 4ab9ce209..0b8669eae 100644 --- a/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py @@ -1,6 +1,11 @@ +import logging + from pg_dd.pg_dd import PgDDLocal, PgDDS3 import click +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + @click.group() def cli(): @@ -8,11 +13,17 @@ def cli(): @cli.command() -def download(): +@click.option( + "--exists-ok/--no-exists-ok", + default=True, + help="If the file already exists, do not download it.", +) +def download(exists_ok): """ Download S3 CSV dumps to the local directory. + :return: """ - PgDDS3().download_local() + PgDDS3(logger=logger).download_local(exists_ok) @cli.command() @@ -20,23 +31,34 @@ def upload(): """ Uploads local CSV dumps to S3. """ - PgDDS3().write_to_bucket() + PgDDS3(logger=logger).write_to_bucket() @cli.command() -def dump(): +@click.option( + "--database", help="Specify the database to dump, dumps all databases by default." +) +def dump(database): """ Dump from the local database to CSV files. """ - PgDDLocal().write_to_dir() + PgDDLocal(logger=logger).write_to_dir(database) @cli.command() -def load(): +@click.option( + "--download-exists-ok/--no-download-exists-ok", + default=True, + help="Download the CSV files from S3 if they are not already in the local directory.", +) +def load(download_exists_ok): """ - Load local CSV files into the database + Load local CSV files into the database. 
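+
+    Uses the PG_DD_BUCKET and PG_DD_PREFIX environment variables for the
+    target location.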
""" - PgDDLocal().load_to_database() + if download_exists_ok: + PgDDS3(logger=logger).download_local(download_exists_ok) + + PgDDLocal(logger=logger).load_to_database() if __name__ == "__main__": diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/handler.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/handler.py index 5ffc7bd9e..7506b11df 100644 --- a/lib/workload/stateless/stacks/pg-dd/pg_dd/handler.py +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/handler.py @@ -8,6 +8,7 @@ from pg_dd.pg_dd import PgDDS3 logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) try: secret_str = libsm.get_secret(os.getenv("PG_DD_SECRET")) diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py index c8c601f3c..86295772c 100644 --- a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py @@ -18,19 +18,23 @@ class PgDD: A class to dump postgres databases to CSV files. """ - def __init__(self, logger=logging.getLogger(__name__)): + def __init__(self, logger: logging.Logger = logging.getLogger(__name__)): self.url = os.getenv("PG_DD_URL") self.databases = self.read_databases() self.logger = logger - def csvs_for_tables(self) -> Dict[str, Dict[str, str]]: + def csvs_for_tables(self, db: str = None) -> Dict[str, Dict[str, str]]: """ Get csvs for all tables in all databases. """ databases = {} for entry in self.databases.values(): - url = f"{self.url}/{entry['database']}" + database = entry["database"] + if db is not None and db != database: + continue + + url = f"{self.url}/{database}" conn: psycopg.connection.Connection with psycopg.connect(url) as conn: @@ -52,7 +56,13 @@ def csvs_for_tables(self) -> Dict[str, Dict[str, str]]: return databases - def load_table(self, table, data, conn, only_non_empty=True): + def load_table( + self, + table: str, + data: str, + conn: psycopg.connection.Connection, + only_non_empty: bool = True, + ): """ Load a table with the CSV data. """ @@ -88,13 +98,13 @@ def load_table(self, table, data, conn, only_non_empty=True): ) as copy: copy.write(data) - def target_files(self) -> List[Tuple[str, str, str, str]]: + def target_files(self, db: str = None) -> List[Tuple[str, str, str, str]]: """ Get the target files for all directories. """ files = [] - for database, tables in self.csvs_for_tables().items(): + for database, tables in self.csvs_for_tables(db).items(): for table, value in tables.items(): file = f"{database}/{table}.csv.gz" files += [(database, table, file, value)] @@ -174,19 +184,19 @@ class PgDDLocal(PgDD): Dump CSV files to a local directory. """ - def __init__(self, logger=logging.getLogger(__name__)): + def __init__(self, logger: logging.Logger = logging.getLogger(__name__)): super().__init__(logger=logger) self.out = os.getenv("PG_DD_DIR") self.bucket = os.getenv("PG_DD_BUCKET") self.prefix = os.getenv("PG_DD_PREFIX") self.s3: S3ServiceResource = boto3.resource("s3") - def write_to_dir(self): + def write_to_dir(self, db: str = None): """ Write the CSV files to the output directory. 
""" - for _, _, f, value in self.target_files(): + for _, _, f, value in self.target_files(db): file = f"{self.out}/{f}" os.makedirs(file.rsplit("/", 1)[0], exist_ok=True) self.logger.info(f"writing to file: {f}") @@ -215,7 +225,10 @@ def load_files(): for _, dirs, _ in os.walk(self.out): for database in dirs: conn: psycopg.connection.Connection - with psycopg.connect(f"{self.url}/{database}") as conn: + url = f"{self.url}/{database}" + with psycopg.connect(url) as conn: + self.logger.info(f"connecting to: {url}") + conn.set_deferrable(True) load_files() conn.commit() @@ -226,7 +239,7 @@ class PgDDS3(PgDD): Commands related to running this inside a Lambda function. """ - def __init__(self, logger=logging.getLogger(__name__)): + def __init__(self, logger: logging.Logger = logging.getLogger(__name__)): super().__init__(logger=logger) self.bucket = os.getenv("PG_DD_BUCKET") self.prefix = os.getenv("PG_DD_PREFIX") @@ -247,7 +260,7 @@ def write_to_bucket(self): s3_object = self.s3.Object(self.bucket, key) s3_object.put(Body=gzip.compress(str.encode(value))) - def download_local(self): + def download_local(self, exists_ok: bool = True): """ Download from S3 CSV files to load. """ @@ -257,5 +270,9 @@ def download_local(self): file = f"{self.dir}/{f}" os.makedirs(file.rsplit("/", 1)[0], exist_ok=True) + if exists_ok and os.path.exists(file): + self.logger.info(f"file already exists: {f}") + continue + s3_object = self.s3.Object(self.bucket, f"{self.prefix}/{f}") - s3_object.download_file(f"{self.dir}/{f}") + s3_object.download_file(file) diff --git a/lib/workload/stateless/stacks/pg-dd/poetry.lock b/lib/workload/stateless/stacks/pg-dd/poetry.lock index 26b9fc095..34b561dd5 100644 --- a/lib/workload/stateless/stacks/pg-dd/poetry.lock +++ b/lib/workload/stateless/stacks/pg-dd/poetry.lock @@ -2,17 +2,17 @@ [[package]] name = "boto3" -version = "1.35.64" +version = "1.35.65" description = "The AWS SDK for Python" optional = false python-versions = ">=3.8" files = [ - {file = "boto3-1.35.64-py3-none-any.whl", hash = "sha256:cdacf03fc750caa3aa0dbf6158166def9922c9d67b4160999ff8fc350662facc"}, - {file = "boto3-1.35.64.tar.gz", hash = "sha256:bc3fc12b41fa2c91e51ab140f74fb1544408a2b1e00f88a4c2369a66d18ddf20"}, + {file = "boto3-1.35.65-py3-none-any.whl", hash = "sha256:acbca38322b66516450f959c7874826267d431becdc2b080e331e56c2ebbe507"}, + {file = "boto3-1.35.65.tar.gz", hash = "sha256:f6c266b4124b92b1603727bf1ed1917e0b74a899bd0e326f151d80c3eaed27a1"}, ] [package.dependencies] -botocore = ">=1.35.64,<1.36.0" +botocore = ">=1.35.65,<1.36.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.10.0,<0.11.0" @@ -21,13 +21,13 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.35.64" +version = "1.35.65" description = "Low-level, data-driven core of boto 3." 
optional = false python-versions = ">=3.8" files = [ - {file = "botocore-1.35.64-py3-none-any.whl", hash = "sha256:bbd96bf7f442b1d5e35b36f501076e4a588c83d8d84a1952e9ee1d767e5efb3e"}, - {file = "botocore-1.35.64.tar.gz", hash = "sha256:2f95c83f31c9e38a66995c88810fc638c829790e125032ba00ab081a2cf48cb9"}, + {file = "botocore-1.35.65-py3-none-any.whl", hash = "sha256:8fcaa82ab2338f412e1494449c4c57f9ca785623fb0303f6be5b279c4d27522c"}, + {file = "botocore-1.35.65.tar.gz", hash = "sha256:46652f732f2b2fb395fffcc33cacb288d05ea283047c9a996fb59d6849464919"}, ] [package.dependencies] From 87ded9f5b9b710064886b17d670646a604ddf39c Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Wed, 20 Nov 2024 13:40:56 +1100 Subject: [PATCH 5/8] fix(pg-dd): update filemanager Makefile --- .../stateless/stacks/filemanager/Makefile | 9 +++--- .../stateless/stacks/pg-dd/.env.example | 4 +-- .../stateless/stacks/pg-dd/.gitignore | 2 +- .../stateless/stacks/pg-dd/Dockerfile | 2 +- lib/workload/stateless/stacks/pg-dd/Makefile | 2 +- lib/workload/stateless/stacks/pg-dd/README.md | 10 ++++-- .../stateless/stacks/pg-dd/pg_dd/cli.py | 18 ++++++++--- .../stateless/stacks/pg-dd/pg_dd/pg_dd.py | 31 ++++++++++++------- 8 files changed, 49 insertions(+), 29 deletions(-) diff --git a/lib/workload/stateless/stacks/filemanager/Makefile b/lib/workload/stateless/stacks/filemanager/Makefile index e1cdca44c..ab37e7b1f 100644 --- a/lib/workload/stateless/stacks/filemanager/Makefile +++ b/lib/workload/stateless/stacks/filemanager/Makefile @@ -73,18 +73,17 @@ restore: @docker compose exec -T postgres pg_restore -U filemanager -d filemanager ## Targets related to top-level database management and S3. -apply-schema: - @for file in database/migrations/*; do \ - docker exec -e PGPASSWORD=orcabus -it orcabus_db psql -h $(FILEMANAGER_DATABASE_HOST) -U orcabus -d filemanager -c "$$(cat $$file)"; \ - done reset-db: @docker exec -e PGPASSWORD=orcabus -it orcabus_db psql -h $(FILEMANAGER_DATABASE_HOST) -U orcabus -d orcabus -c "DROP DATABASE IF EXISTS filemanager;" && \ docker exec -e PGPASSWORD=orcabus -it orcabus_db psql -h $(FILEMANAGER_DATABASE_HOST) -U orcabus -d orcabus -c "CREATE DATABASE filemanager;" + for file in database/migrations/*; do \ + docker exec -e PGPASSWORD=orcabus -it orcabus_db psql -h $(FILEMANAGER_DATABASE_HOST) -U orcabus -d filemanager -c "$$(cat $$file)"; \ + done s3-dump-upload: @aws s3 cp data/fm_s3_objects_100000.csv.gz s3://orcabus-test-data-843407916570-ap-southeast-2/file-manager/fm_s3_objects_100000.csv.gz s3-dump-download: @aws s3 cp s3://orcabus-test-data-843407916570-ap-southeast-2/file-manager/fm_s3_objects_100000.csv.gz data/fm_s3_objects_100000.csv.gz -db-load-data: reset-db apply-schema +db-load-data: reset-db @gunzip -c data/fm_s3_objects_100000.csv.gz | \ docker exec -i orcabus_db psql -U orcabus -d filemanager -c "copy s3_object from stdin with (format csv, header);" s3-dump-download-if-not-exists: diff --git a/lib/workload/stateless/stacks/pg-dd/.env.example b/lib/workload/stateless/stacks/pg-dd/.env.example index a70d51a12..49a3a57fc 100644 --- a/lib/workload/stateless/stacks/pg-dd/.env.example +++ b/lib/workload/stateless/stacks/pg-dd/.env.example @@ -1,6 +1,6 @@ PG_DD_URL=postgresql://orcabus:orcabus@0.0.0.0:5432# pragma: allowlist secret PG_DD_DIR=data -PG_DD_BUCKET=bucket +PG_DD_BUCKET=orcabus-test-data-843407916570-ap-southeast-2 PG_DD_PREFIX=pg-dd PG_DD_DATABASE_METADATA_MANAGER=metadata_manager @@ -8,4 +8,4 @@ PG_DD_DATABASE_SEQUENCE_RUN_MANAGER=sequence_run_manager 
PG_DD_DATABASE_WORKFLOW_MANAGER=workflow_manager PG_DD_DATABASE_FILEMANAGER=filemanager PG_DD_DATABASE_FILEMANAGER_SQL_DUMP='select * from s3_object order by sequencer limit 10000' -PG_DD_DATABASE_FILEMANAGER_SQL_LOAD=s3_object \ No newline at end of file +PG_DD_DATABASE_FILEMANAGER_SQL_LOAD=s3_object diff --git a/lib/workload/stateless/stacks/pg-dd/.gitignore b/lib/workload/stateless/stacks/pg-dd/.gitignore index 7e63edf22..4618a6988 100644 --- a/lib/workload/stateless/stacks/pg-dd/.gitignore +++ b/lib/workload/stateless/stacks/pg-dd/.gitignore @@ -1,4 +1,4 @@ .env data .ruff_cache -response.json \ No newline at end of file +response.json diff --git a/lib/workload/stateless/stacks/pg-dd/Dockerfile b/lib/workload/stateless/stacks/pg-dd/Dockerfile index 64eed8b04..e42aba04e 100644 --- a/lib/workload/stateless/stacks/pg-dd/Dockerfile +++ b/lib/workload/stateless/stacks/pg-dd/Dockerfile @@ -29,4 +29,4 @@ ENV PATH="/app/.venv/bin:$PATH" COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV} COPY . . -ENTRYPOINT ["python", "-m", "pg_dd.cli"] \ No newline at end of file +ENTRYPOINT ["python", "-m", "pg_dd.cli"] diff --git a/lib/workload/stateless/stacks/pg-dd/Makefile b/lib/workload/stateless/stacks/pg-dd/Makefile index feb033d0b..79e353217 100644 --- a/lib/workload/stateless/stacks/pg-dd/Makefile +++ b/lib/workload/stateless/stacks/pg-dd/Makefile @@ -15,4 +15,4 @@ cli: install @poetry run cli $(COMMAND) clean: - rm -rf data && rm -rf .ruff_cache \ No newline at end of file + rm -rf data && rm -rf .ruff_cache diff --git a/lib/workload/stateless/stacks/pg-dd/README.md b/lib/workload/stateless/stacks/pg-dd/README.md index 0e4c1f6f3..b384c0c47 100644 --- a/lib/workload/stateless/stacks/pg-dd/README.md +++ b/lib/workload/stateless/stacks/pg-dd/README.md @@ -13,12 +13,18 @@ aws lambda invoke --function-name orcabus-pg-dd response.json This is setup to dump the metadata_manager, workflow_manager, sequence_run_manager, and 10000 of the most recent rows of the filemanager database. -This command can also be run locally, for example: +This command can be run locally using make, or by running `poetry` directly: -``` +```sh make cli COMMAND="--help" ``` +For example, to dump and upload a specific database to s3: + +```sh +poetry run cli dump --database metadata_manager && poetry run cli upload +``` + The `Dockerfile` is setup to launch with the top-level `Makefile`, which also contains commands for running the CLI. By default `start-all-service` will run a `load` on the local database, downloading any dumps from S3 along the way. AWS credentials must be present in the shell for this to work. diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py index 0b8669eae..b9ff86c2e 100644 --- a/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py @@ -21,17 +21,20 @@ def cli(): def download(exists_ok): """ Download S3 CSV dumps to the local directory. - :return: """ PgDDS3(logger=logger).download_local(exists_ok) @cli.command() -def upload(): +@click.option( + "--database", + help="Specify the database to upload, uploads all databases by default.", +) +def upload(database): """ Uploads local CSV dumps to S3. 
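+
+    With --database, only the dumps for that database are uploaded.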
""" - PgDDS3(logger=logger).write_to_bucket() + PgDDS3(logger=logger).write_to_bucket(database) @cli.command() @@ -51,14 +54,19 @@ def dump(database): default=True, help="Download the CSV files from S3 if they are not already in the local directory.", ) -def load(download_exists_ok): +@click.option( + "--only-empty/--no-only-empty", + default=True, + help="Only load into tables that are empty and exist in the database.", +) +def load(download_exists_ok, only_empty): """ Load local CSV files into the database. """ if download_exists_ok: PgDDS3(logger=logger).download_local(download_exists_ok) - PgDDLocal(logger=logger).load_to_database() + PgDDLocal(logger=logger).load_to_database(only_empty) if __name__ == "__main__": diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py index 86295772c..df1c2e55c 100644 --- a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py @@ -15,7 +15,7 @@ class PgDD: """ - A class to dump postgres databases to CSV files. + A class to manage dumping/loading CSV files to a Postgres database. """ def __init__(self, logger: logging.Logger = logging.getLogger(__name__)): @@ -61,14 +61,14 @@ def load_table( table: str, data: str, conn: psycopg.connection.Connection, - only_non_empty: bool = True, + only_empty: bool = True, ): """ Load a table with the CSV data. """ with conn.cursor() as cur: - if only_non_empty: + if only_empty: exists = cur.execute( sql.SQL( """ @@ -181,7 +181,7 @@ def copy_tables_to_csv( class PgDDLocal(PgDD): """ - Dump CSV files to a local directory. + Commands related to dumping/loading CSV files to a local directory. """ def __init__(self, logger: logging.Logger = logging.getLogger(__name__)): @@ -201,10 +201,10 @@ def write_to_dir(self, db: str = None): os.makedirs(file.rsplit("/", 1)[0], exist_ok=True) self.logger.info(f"writing to file: {f}") - with open(file, "wb") as f: - f.write(gzip.compress(str.encode(value))) + with open(file, "wb") as file: + file.write(gzip.compress(str.encode(value))) - def load_to_database(self): + def load_to_database(self, only_empty: bool = True): """ Download from S3 CSV files to load. """ @@ -219,7 +219,10 @@ def load_files(): table = load[int(table)] self.load_table( - table, gzip.decompress(f.read()).decode("utf-8"), conn + table, + gzip.decompress(f.read()).decode("utf-8"), + conn, + only_empty, ) for _, dirs, _ in os.walk(self.out): @@ -236,7 +239,7 @@ def load_files(): class PgDDS3(PgDD): """ - Commands related to running this inside a Lambda function. + Commands related to dumping/loading from S3. """ def __init__(self, logger: logging.Logger = logging.getLogger(__name__)): @@ -246,12 +249,12 @@ def __init__(self, logger: logging.Logger = logging.getLogger(__name__)): self.dir = os.getenv("PG_DD_DIR") self.s3: S3ServiceResource = boto3.resource("s3") - def write_to_bucket(self): + def write_to_bucket(self, db: str = None): """ Write the CSV files to the S3 bucket. 
""" - for _, _, key, value in self.target_files(): + for _, _, key, value in self.target_files(db): if self.prefix: key = f"{self.prefix}/{key}" @@ -275,4 +278,8 @@ def download_local(self, exists_ok: bool = True): continue s3_object = self.s3.Object(self.bucket, f"{self.prefix}/{f}") - s3_object.download_file(file) + try: + s3_object.download_file(file) + except Exception as e: + self.logger.info(f"could not find file: {e}") + continue From f7abe03ed72f9149ce91f90985637699f0d9b7b0 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Thu, 21 Nov 2024 16:12:56 +1100 Subject: [PATCH 6/8] fix(pg-dd): simplify Dockerfile to use poetry directly and update python version [skip ci] --- .../stateless/stacks/pg-dd/Dockerfile | 26 +++---------------- .../stateless/stacks/pg-dd/poetry.lock | 25 +++++++++--------- .../stateless/stacks/pg-dd/pyproject.toml | 2 +- 3 files changed, 17 insertions(+), 36 deletions(-) diff --git a/lib/workload/stateless/stacks/pg-dd/Dockerfile b/lib/workload/stateless/stacks/pg-dd/Dockerfile index e42aba04e..b511ec487 100644 --- a/lib/workload/stateless/stacks/pg-dd/Dockerfile +++ b/lib/workload/stateless/stacks/pg-dd/Dockerfile @@ -2,31 +2,13 @@ # When running this microservice from the Docker Compose root, this Dockerfile # will build the image, install dependencies, and start the server -FROM public.ecr.aws/docker/library/python:3.12-slim AS builder +FROM public.ecr.aws/docker/library/python:3.13 -ENV POETRY_HOME=/opt/poetry -ENV POETRY_NO_INTERACTION=1 -ENV POETRY_VIRTUALENVS_IN_PROJECT=1 -ENV POETRY_VIRTUALENVS_CREATE=1 -ENV PYTHONDONTWRITEBYTECODE=1 -ENV PYTHONUNBUFFERED=1 -ENV POETRY_CACHE_DIR=/opt/.cache - -RUN pip install poetry - -WORKDIR /app - -COPY pyproject.toml poetry.lock /app/ -RUN poetry install --no-root && rm -rf $POETRY_CACHE_DIR - -FROM public.ecr.aws/docker/library/python:3.12-slim AS runtime +RUN pip3 install poetry WORKDIR /app -ENV VIRTUAL_ENV=/app/.venv -ENV PATH="/app/.venv/bin:$PATH" - -COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV} COPY . . +RUN poetry install --no-interaction -ENTRYPOINT ["python", "-m", "pg_dd.cli"] +ENTRYPOINT ["poetry", "run", "cli"] diff --git a/lib/workload/stateless/stacks/pg-dd/poetry.lock b/lib/workload/stateless/stacks/pg-dd/poetry.lock index 34b561dd5..d10f581f3 100644 --- a/lib/workload/stateless/stacks/pg-dd/poetry.lock +++ b/lib/workload/stateless/stacks/pg-dd/poetry.lock @@ -2,17 +2,17 @@ [[package]] name = "boto3" -version = "1.35.65" +version = "1.35.66" description = "The AWS SDK for Python" optional = false python-versions = ">=3.8" files = [ - {file = "boto3-1.35.65-py3-none-any.whl", hash = "sha256:acbca38322b66516450f959c7874826267d431becdc2b080e331e56c2ebbe507"}, - {file = "boto3-1.35.65.tar.gz", hash = "sha256:f6c266b4124b92b1603727bf1ed1917e0b74a899bd0e326f151d80c3eaed27a1"}, + {file = "boto3-1.35.66-py3-none-any.whl", hash = "sha256:09a610f8cf4d3c22d4ca69c1f89079e3a1c82805ce94fa0eb4ecdd4d2ba6c4bc"}, + {file = "boto3-1.35.66.tar.gz", hash = "sha256:c392b9168b65e9c23483eaccb5b68d1f960232d7f967a1e00a045ba065ce050d"}, ] [package.dependencies] -botocore = ">=1.35.65,<1.36.0" +botocore = ">=1.35.66,<1.36.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.10.0,<0.11.0" @@ -21,13 +21,13 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.35.65" +version = "1.35.66" description = "Low-level, data-driven core of boto 3." 
optional = false python-versions = ">=3.8" files = [ - {file = "botocore-1.35.65-py3-none-any.whl", hash = "sha256:8fcaa82ab2338f412e1494449c4c57f9ca785623fb0303f6be5b279c4d27522c"}, - {file = "botocore-1.35.65.tar.gz", hash = "sha256:46652f732f2b2fb395fffcc33cacb288d05ea283047c9a996fb59d6849464919"}, + {file = "botocore-1.35.66-py3-none-any.whl", hash = "sha256:d0683e9c18bb6852f768da268086c3749d925332a664db0dd1459cfa7e96e475"}, + {file = "botocore-1.35.66.tar.gz", hash = "sha256:51f43220315f384959f02ea3266740db4d421592dd87576c18824e424b349fdb"}, ] [package.dependencies] @@ -179,7 +179,6 @@ files = [ [package.dependencies] psycopg-binary = {version = "3.2.3", optional = true, markers = "implementation_name != \"pypy\" and extra == \"binary\""} -typing-extensions = {version = ">=4.6", markers = "python_version < \"3.13\""} tzdata = {version = "*", markers = "sys_platform == \"win32\""} [package.extras] @@ -320,13 +319,13 @@ files = [ [[package]] name = "s3transfer" -version = "0.10.3" +version = "0.10.4" description = "An Amazon S3 Transfer Manager" optional = false python-versions = ">=3.8" files = [ - {file = "s3transfer-0.10.3-py3-none-any.whl", hash = "sha256:263ed587a5803c6c708d3ce44dc4dfedaab4c1a32e8329bab818933d79ddcf5d"}, - {file = "s3transfer-0.10.3.tar.gz", hash = "sha256:4f50ed74ab84d474ce614475e0b8d5047ff080810aac5d01ea25231cfc944b0c"}, + {file = "s3transfer-0.10.4-py3-none-any.whl", hash = "sha256:244a76a24355363a68164241438de1b72f8781664920260c48465896b712a41e"}, + {file = "s3transfer-0.10.4.tar.gz", hash = "sha256:29edc09801743c21eb5ecbc617a152df41d3c287f67b615f73e5f750583666a7"}, ] [package.dependencies] @@ -387,5 +386,5 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.0" -python-versions = "^3.12" -content-hash = "7b6452562763ec8f1bcf43ad6f8cfaae7702c8190d90d8ec142eceb5df7294df" +python-versions = "^3.13" +content-hash = "494ad924c9cf531d9b2d699aca8c8777ac36b8bb21de963548fdd0dd4baaa217" diff --git a/lib/workload/stateless/stacks/pg-dd/pyproject.toml b/lib/workload/stateless/stacks/pg-dd/pyproject.toml index d62b09c2e..057ece265 100644 --- a/lib/workload/stateless/stacks/pg-dd/pyproject.toml +++ b/lib/workload/stateless/stacks/pg-dd/pyproject.toml @@ -7,7 +7,7 @@ readme = "README.md" packages = [{ include = "pg_dd" }] [tool.poetry.dependencies] -python = "^3.12" +python = "^3.13" boto3 = "^1" psycopg = { version = "^3", extras = ["binary"] } python-dotenv = "^1" From c2a8bdb9abc77be0fadc7e7f20182674c3361b16 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Thu, 21 Nov 2024 17:00:51 +1100 Subject: [PATCH 7/8] fix(pg-dd): re-add --no-root and make upload/download only work on the local directory --- .../stateless/stacks/pg-dd/Dockerfile | 2 +- .../stateless/stacks/pg-dd/pg_dd/cli.py | 10 ++++- .../stateless/stacks/pg-dd/pg_dd/pg_dd.py | 40 +++++++++++-------- 3 files changed, 33 insertions(+), 19 deletions(-) diff --git a/lib/workload/stateless/stacks/pg-dd/Dockerfile b/lib/workload/stateless/stacks/pg-dd/Dockerfile index b511ec487..9228fc354 100644 --- a/lib/workload/stateless/stacks/pg-dd/Dockerfile +++ b/lib/workload/stateless/stacks/pg-dd/Dockerfile @@ -9,6 +9,6 @@ RUN pip3 install poetry WORKDIR /app COPY . . 
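+# --no-root skips installing the pg-dd package itself; the sources copied
+# above are presumably picked up from the working directory when
+# `poetry run cli` executes.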
-RUN poetry install --no-interaction +RUN poetry install --no-interaction --no-root ENTRYPOINT ["poetry", "run", "cli"] diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py index b9ff86c2e..64d441101 100644 --- a/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py @@ -30,10 +30,18 @@ def download(exists_ok): "--database", help="Specify the database to upload, uploads all databases by default.", ) -def upload(database): +@click.option( + "--dump-db/--no-dump-db", + default=False, + help="Dump from the database first before uploading.", +) +def upload(database, dump_db): """ Uploads local CSV dumps to S3. """ + if dump_db: + PgDDLocal(logger=logger).write_to_dir(database) + PgDDS3(logger=logger).write_to_bucket(database) diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py index df1c2e55c..51032fb59 100644 --- a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py @@ -254,32 +254,38 @@ def write_to_bucket(self, db: str = None): Write the CSV files to the S3 bucket. """ - for _, _, key, value in self.target_files(db): - if self.prefix: - key = f"{self.prefix}/{key}" + for root, dirs, files in os.walk(self.dir): + for file in files: + file = os.path.join(root, file) + key = file.removeprefix(self.dir).removeprefix("/") - self.logger.info(f"writing to bucket with key: {key}") + if key == "" or (db is not None and not key.startswith(db)): + continue - s3_object = self.s3.Object(self.bucket, key) - s3_object.put(Body=gzip.compress(str.encode(value))) + if self.prefix: + key = f"{self.prefix}/{key}" + + self.logger.info(f"writing to bucket with key: {key}") + + s3_object = self.s3.Object(self.bucket, key) + with open(file, "rb") as f: + s3_object.put(Body=gzip.compress(f.read())) def download_local(self, exists_ok: bool = True): """ Download from S3 CSV files to load. 
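+
+        Lists the objects under PG_DD_PREFIX and mirrors them into the local
+        directory, skipping files that already exist when exists_ok is set.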
""" - for _, _, f, _ in self.target_files(): - self.logger.info(f"target file: {f}") - file = f"{self.dir}/{f}" - os.makedirs(file.rsplit("/", 1)[0], exist_ok=True) + objects = self.s3.Bucket(self.bucket).objects.filter(Prefix=self.prefix) + for obj in objects: + split = obj.key.rsplit("/", 2) + directory = f"{self.dir}/{split[-2]}" + os.makedirs(directory, exist_ok=True) + file = f"{directory}/{split[-1]}" if exists_ok and os.path.exists(file): - self.logger.info(f"file already exists: {f}") + self.logger.info(f"file already exists: {file}") continue - s3_object = self.s3.Object(self.bucket, f"{self.prefix}/{f}") - try: - s3_object.download_file(file) - except Exception as e: - self.logger.info(f"could not find file: {e}") - continue + s3_object = self.s3.Object(self.bucket, obj.key) + s3_object.download_file(file) From ae393eb2186bf77498333808d9ad832e7b0d4281 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Tue, 26 Nov 2024 16:59:27 +1100 Subject: [PATCH 8/8] fix(pg-dd): double compressed files --- .../stateless/stacks/pg-dd/.env.example | 2 +- .../stateless/stacks/pg-dd/pg_dd/pg_dd.py | 6 +++-- .../stateless/stacks/pg-dd/poetry.lock | 22 +++++++++---------- .../stateless/stacks/pg-dd/pyproject.toml | 2 +- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/lib/workload/stateless/stacks/pg-dd/.env.example b/lib/workload/stateless/stacks/pg-dd/.env.example index 49a3a57fc..e674b8e70 100644 --- a/lib/workload/stateless/stacks/pg-dd/.env.example +++ b/lib/workload/stateless/stacks/pg-dd/.env.example @@ -1,4 +1,4 @@ -PG_DD_URL=postgresql://orcabus:orcabus@0.0.0.0:5432# pragma: allowlist secret +PG_DD_URL=postgresql://orcabus:orcabus@0.0.0.0:5432 # pragma: allowlist secret PG_DD_DIR=data PG_DD_BUCKET=orcabus-test-data-843407916570-ap-southeast-2 PG_DD_PREFIX=pg-dd diff --git a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py index 51032fb59..d5c8fb621 100644 --- a/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py +++ b/lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py @@ -287,5 +287,7 @@ def download_local(self, exists_ok: bool = True): self.logger.info(f"file already exists: {file}") continue - s3_object = self.s3.Object(self.bucket, obj.key) - s3_object.download_file(file) + s3_object = self.s3.Object(self.bucket, obj.key).get() + data = gzip.decompress(s3_object["Body"].read()) + with open(file, "wb") as f: + f.write(data) diff --git a/lib/workload/stateless/stacks/pg-dd/poetry.lock b/lib/workload/stateless/stacks/pg-dd/poetry.lock index d10f581f3..fa87ea2f4 100644 --- a/lib/workload/stateless/stacks/pg-dd/poetry.lock +++ b/lib/workload/stateless/stacks/pg-dd/poetry.lock @@ -2,17 +2,17 @@ [[package]] name = "boto3" -version = "1.35.66" +version = "1.35.69" description = "The AWS SDK for Python" optional = false python-versions = ">=3.8" files = [ - {file = "boto3-1.35.66-py3-none-any.whl", hash = "sha256:09a610f8cf4d3c22d4ca69c1f89079e3a1c82805ce94fa0eb4ecdd4d2ba6c4bc"}, - {file = "boto3-1.35.66.tar.gz", hash = "sha256:c392b9168b65e9c23483eaccb5b68d1f960232d7f967a1e00a045ba065ce050d"}, + {file = "boto3-1.35.69-py3-none-any.whl", hash = "sha256:20945912130cca1505f45819cd9b7183a0e376e91a1221a0b1f50c80d35fd7e2"}, + {file = "boto3-1.35.69.tar.gz", hash = "sha256:40db86c7732a310b282f595251995ecafcbd62009a57e47a22683862e570cc7a"}, ] [package.dependencies] -botocore = ">=1.35.66,<1.36.0" +botocore = ">=1.35.69,<1.36.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.10.0,<0.11.0" @@ -21,13 +21,13 @@ crt = ["botocore[crt] 
(>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.35.66" +version = "1.35.69" description = "Low-level, data-driven core of boto 3." optional = false python-versions = ">=3.8" files = [ - {file = "botocore-1.35.66-py3-none-any.whl", hash = "sha256:d0683e9c18bb6852f768da268086c3749d925332a664db0dd1459cfa7e96e475"}, - {file = "botocore-1.35.66.tar.gz", hash = "sha256:51f43220315f384959f02ea3266740db4d421592dd87576c18824e424b349fdb"}, + {file = "botocore-1.35.69-py3-none-any.whl", hash = "sha256:cad8d9305f873404eee4b197d84e60a40975d43cbe1ab63abe893420ddfe6e3c"}, + {file = "botocore-1.35.69.tar.gz", hash = "sha256:f9f23dd76fb247d9b0e8d411d2995e6f847fc451c026f1e58e300f815b0b36eb"}, ] [package.dependencies] @@ -146,13 +146,13 @@ reports = ["lxml"] [[package]] name = "mypy-boto3-s3" -version = "1.35.61" -description = "Type annotations for boto3.S3 1.35.61 service generated with mypy-boto3-builder 8.2.1" +version = "1.35.69" +description = "Type annotations for boto3 S3 1.35.69 service generated with mypy-boto3-builder 8.3.1" optional = false python-versions = ">=3.8" files = [ - {file = "mypy_boto3_s3-1.35.61-py3-none-any.whl", hash = "sha256:4cfc410a02a302935876f0d1ae3f0738bf540acd686168790fb0c5986a085f1e"}, - {file = "mypy_boto3_s3-1.35.61.tar.gz", hash = "sha256:6965fe6c5f3d8362ac7895663540844ed29c885c92a62233c29a1c031ca28c90"}, + {file = "mypy_boto3_s3-1.35.69-py3-none-any.whl", hash = "sha256:11a34259983e09d67e4d3a322fd47904a006bbfff19984e4e36a77e30f2014bb"}, + {file = "mypy_boto3_s3-1.35.69.tar.gz", hash = "sha256:97f7944a84a4a49282825bef1483a25680dcdce75da6017745d709d2cf2aa1c0"}, ] [[package]] diff --git a/lib/workload/stateless/stacks/pg-dd/pyproject.toml b/lib/workload/stateless/stacks/pg-dd/pyproject.toml index 057ece265..0f41f94af 100644 --- a/lib/workload/stateless/stacks/pg-dd/pyproject.toml +++ b/lib/workload/stateless/stacks/pg-dd/pyproject.toml @@ -1,5 +1,5 @@ [tool.poetry] -name = "postgres-data-dump" +name = "pg-dd" version = "0.1.0" description = "A serivce to dump postgres databases to S3." authors = ["Marko malenic "]