feat(pg-dd): link up with top-level compose and Makefile
mmalenic committed Nov 19, 2024
1 parent 3395e7f commit 69421a1
Showing 10 changed files with 162 additions and 46 deletions.
24 changes: 20 additions & 4 deletions Makefile
@@ -24,14 +24,30 @@ start-all-service:
docker compose up --wait -d db

# Insert all dump data before running the servers
@(cd lib/workload/stateless/stacks/metadata-manager && $(MAKE) s3-load)
@(cd lib/workload/stateless/stacks/sequence-run-manager && $(MAKE) s3-load)
@(cd lib/workload/stateless/stacks/workflow-manager && $(MAKE) s3-load)
@(cd lib/workload/stateless/stacks/filemanager && $(MAKE) s3-load)
@(cd lib/workload/stateless/stacks/metadata-manager && $(MAKE) reset-db)
@(cd lib/workload/stateless/stacks/sequence-run-manager && $(MAKE) reset-db)
@(cd lib/workload/stateless/stacks/workflow-manager && $(MAKE) reset-db)
@(cd lib/workload/stateless/stacks/filemanager && $(MAKE) reset-db)

# Run the rest of the µ-service servers
docker compose up --wait -d --build

# Commands for pg-dd
dump: PG_DD_COMMAND=dump
dump: pg-dd

upload: PG_DD_COMMAND=upload
upload: pg-dd

download: PG_DD_COMMAND=download
download: pg-dd

load: PG_DD_COMMAND=load
load: pg-dd

pg-dd:
@PG_DD_COMMAND=$(PG_DD_COMMAND) docker compose up pg-dd

stop-all-service:
docker compose down

35 changes: 35 additions & 0 deletions compose.yml
@@ -89,3 +89,38 @@ services:
interval: 10s
timeout: 2s
retries: 5

# Load test data into the database.
pg-dd:
build:
context: ./lib/workload/stateless/stacks/pg-dd
dockerfile: Dockerfile
volumes:
# Store the dumps on the local filesystem.
- ./lib/workload/stateless/stacks/pg-dd/data:/app/data
depends_on:
# Depends on migrations from all services, so they must be started first.
- db
- metadata-manager
- workflow-manager
- sequence-run-manager
- filemanager
command: ${PG_DD_COMMAND:-load}
environment:
- PG_DD_URL=postgresql://orcabus:orcabus@db:5432
- PG_DD_DIR=data
- PG_DD_BUCKET=orcabus-test-data-843407916570-ap-southeast-2
- PG_DD_PREFIX=pg-dd

- PG_DD_DATABASE_METADATA_MANAGER=metadata_manager
- PG_DD_DATABASE_SEQUENCE_RUN_MANAGER=sequence_run_manager
- PG_DD_DATABASE_WORKFLOW_MANAGER=workflow_manager
- PG_DD_DATABASE_FILEMANAGER=filemanager
- PG_DD_DATABASE_FILEMANAGER_SQL_DUMP=select * from s3_object order by sequencer limit 10000
- PG_DD_DATABASE_FILEMANAGER_SQL_LOAD=s3_object

- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-access_key_id}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-secret_access_key}
- AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION:-ap-southeast-2}
- AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN:-session_token}
restart: "no"
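
As a rough sketch of invoking this service on its own (the variable substitution mirrors the `${PG_DD_COMMAND:-load}` default above):

```
# Override the default `load` command for a one-off dump.
PG_DD_COMMAND=dump docker compose up pg-dd

# Or rely on the default and simply bring the service up.
docker compose up pg-dd
```
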
6 changes: 4 additions & 2 deletions lib/workload/stateless/stacks/pg-dd/.env.example
@@ -1,9 +1,11 @@
PG_DD_URL=postgresql://orcabus:[email protected]:5432# pragma: allowlist secret
PG_DD_DIR=data
PG_DD_BUCKET=bucket
PG_DD_PREFIX=pg-dd

PG_DD_DATABASE_METADATA_MANAGER=metadata_manager
PG_DD_DATABASE_SEQUENCE_RUN_MANAGER=sequence_run_manager
PG_DD_DATABASE_WORKFLOW_MANAGER=workflow_manager
PG_DD_DATABASE_FILEMANAGER=filemanager
PG_DD_DATABASE_FILEMANAGER_SQL_DUMP="select * from s3_object order by sequencer limit 10000"
PG_DD_DATABASE_FILEMANAGER_SQL_LOAD="s3_object"
PG_DD_DATABASE_FILEMANAGER_SQL_DUMP='select * from s3_object order by sequencer limit 10000'
PG_DD_DATABASE_FILEMANAGER_SQL_LOAD=s3_object
27 changes: 22 additions & 5 deletions lib/workload/stateless/stacks/pg-dd/Dockerfile
@@ -2,14 +2,31 @@
# When running this microservice from the Docker Compose root, this Dockerfile
# will build the image, install dependencies, and start the server

FROM public.ecr.aws/docker/library/python:3.12
FROM public.ecr.aws/docker/library/python:3.12-slim AS builder

ARG POETRY_VERSION=1.8
RUN pip install "poetry==${POETRY_VERSION}"
ENV POETRY_HOME=/opt/poetry
ENV POETRY_NO_INTERACTION=1
ENV POETRY_VIRTUALENVS_IN_PROJECT=1
ENV POETRY_VIRTUALENVS_CREATE=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV POETRY_CACHE_DIR=/opt/.cache

RUN pip install poetry

WORKDIR /app

COPY pyproject.toml poetry.lock /app/
RUN poetry install --no-root && rm -rf $POETRY_CACHE_DIR

FROM public.ecr.aws/docker/library/python:3.12-slim AS runtime

WORKDIR /app

ENV VIRTUAL_ENV=/app/.venv
ENV PATH="/app/.venv/bin:$PATH"

COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
COPY . .
RUN poetry install --no-root

ENTRYPOINT ["/bin/bash", "-c", "make", "local"]
ENTRYPOINT ["python", "-m", "pg_dd.cli"]
4 changes: 3 additions & 1 deletion lib/workload/stateless/stacks/pg-dd/Makefile
@@ -1,5 +1,7 @@
.PHONY: *

COMMAND ?= load --download-exists-ok

install:
@poetry update

@@ -10,7 +12,7 @@ check: lint
@poetry run ruff check .

cli: install
@poetry run cli
@poetry run cli $(COMMAND)

clean:
rm -rf data && rm -rf .ruff_cache
16 changes: 10 additions & 6 deletions lib/workload/stateless/stacks/pg-dd/README.md
@@ -13,6 +13,16 @@ aws lambda invoke --function-name orcabus-pg-dd response.json
This is set up to dump the metadata_manager, workflow_manager, and sequence_run_manager databases, and the 10000 most
recent rows of the filemanager database.
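
The filemanager limit comes from a per-database SQL override. As a sketch, the relevant variables (mirroring the values set on the compose service in this commit) are:

```
PG_DD_DATABASE_FILEMANAGER=filemanager
PG_DD_DATABASE_FILEMANAGER_SQL_DUMP='select * from s3_object order by sequencer limit 10000'
PG_DD_DATABASE_FILEMANAGER_SQL_LOAD=s3_object
```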

This command can also be run locally, for example:

```
make cli COMMAND="--help"
```

The `Dockerfile` is set up to launch with the top-level `Makefile`, which also contains commands for running the CLI.
By default, `start-all-service` runs a `load` against the local database, downloading any dumps from S3 along the way.
AWS credentials must be present in the shell for this to work.
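
For example, a typical local round trip using the new top-level targets might look like this (a sketch; it assumes AWS credentials are exported in the shell and is run from the repository root):

```
make download   # fetch existing dumps from S3 into lib/workload/stateless/stacks/pg-dd/data
make load       # load the downloaded CSVs into the local database
make dump       # dump the local databases back out to CSV
make upload     # push the local dumps up to S3
```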

## Configuration

This function can be configured by setting the following environment variables; see [.env.example][env-example] for an example:
@@ -32,12 +42,6 @@ This function can be configured by setting the following environment variables,

This project uses [poetry] to manage dependencies.

The pg-dd command can be run locally to dump data to a directory:

```
make cli
```

Run the linter and formatter:

```
38 changes: 30 additions & 8 deletions lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py
@@ -1,42 +1,64 @@
import logging

from pg_dd.pg_dd import PgDDLocal, PgDDS3
import click

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


@click.group()
def cli():
pass


@cli.command()
def download():
@click.option(
"--exists-ok/--no-exists-ok",
default=True,
help="If the file already exists, do not download it.",
)
def download(exists_ok):
"""
Download S3 CSV dumps to the local directory.
:return:
"""
PgDDS3().download_local()
PgDDS3(logger=logger).download_local(exists_ok)


@cli.command()
def upload():
"""
Uploads local CSV dumps to S3.
"""
PgDDS3().write_to_bucket()
PgDDS3(logger=logger).write_to_bucket()


@cli.command()
def dump():
@click.option(
"--database", help="Specify the database to dump, dumps all databases by default."
)
def dump(database):
"""
Dump from the local database to CSV files.
"""
PgDDLocal().write_to_dir()
PgDDLocal(logger=logger).write_to_dir(database)


@cli.command()
def load():
@click.option(
"--download-exists-ok/--no-download-exists-ok",
default=True,
help="Download the CSV files from S3 if they are not already in the local directory.",
)
def load(download_exists_ok):
"""
Load local CSV files into the database
Load local CSV files into the database.
"""
PgDDLocal().load_to_database()
if download_exists_ok:
PgDDS3(logger=logger).download_local(download_exists_ok)

PgDDLocal(logger=logger).load_to_database()


if __name__ == "__main__":
1 change: 1 addition & 0 deletions lib/workload/stateless/stacks/pg-dd/pg_dd/handler.py
@@ -8,6 +8,7 @@
from pg_dd.pg_dd import PgDDS3

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

try:
secret_str = libsm.get_secret(os.getenv("PG_DD_SECRET"))
43 changes: 30 additions & 13 deletions lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py
@@ -18,19 +18,23 @@ class PgDD:
A class to dump postgres databases to CSV files.
"""

def __init__(self, logger=logging.getLogger(__name__)):
def __init__(self, logger: logging.Logger = logging.getLogger(__name__)):
self.url = os.getenv("PG_DD_URL")
self.databases = self.read_databases()
self.logger = logger

def csvs_for_tables(self) -> Dict[str, Dict[str, str]]:
def csvs_for_tables(self, db: str = None) -> Dict[str, Dict[str, str]]:
"""
Get csvs for all tables in all databases.
"""

databases = {}
for entry in self.databases.values():
url = f"{self.url}/{entry['database']}"
database = entry["database"]
if db is not None and db != database:
continue

url = f"{self.url}/{database}"

conn: psycopg.connection.Connection
with psycopg.connect(url) as conn:
@@ -52,7 +56,13 @@ def csvs_for_tables(self) -> Dict[str, Dict[str, str]]:

return databases

def load_table(self, table, data, conn, only_non_empty=True):
def load_table(
self,
table: str,
data: str,
conn: psycopg.connection.Connection,
only_non_empty: bool = True,
):
"""
Load a table with the CSV data.
"""
@@ -88,13 +98,13 @@ def load_table(self, table, data, conn, only_non_empty=True):
) as copy:
copy.write(data)

def target_files(self) -> List[Tuple[str, str, str, str]]:
def target_files(self, db: str = None) -> List[Tuple[str, str, str, str]]:
"""
Get the target files for all directories.
"""

files = []
for database, tables in self.csvs_for_tables().items():
for database, tables in self.csvs_for_tables(db).items():
for table, value in tables.items():
file = f"{database}/{table}.csv.gz"
files += [(database, table, file, value)]
@@ -174,19 +184,19 @@ class PgDDLocal(PgDD):
Dump CSV files to a local directory.
"""

def __init__(self, logger=logging.getLogger(__name__)):
def __init__(self, logger: logging.Logger = logging.getLogger(__name__)):
super().__init__(logger=logger)
self.out = os.getenv("PG_DD_DIR")
self.bucket = os.getenv("PG_DD_BUCKET")
self.prefix = os.getenv("PG_DD_PREFIX")
self.s3: S3ServiceResource = boto3.resource("s3")

def write_to_dir(self):
def write_to_dir(self, db: str = None):
"""
Write the CSV files to the output directory.
"""

for _, _, f, value in self.target_files():
for _, _, f, value in self.target_files(db):
file = f"{self.out}/{f}"
os.makedirs(file.rsplit("/", 1)[0], exist_ok=True)
self.logger.info(f"writing to file: {f}")
@@ -215,7 +225,10 @@ def load_files():
for _, dirs, _ in os.walk(self.out):
for database in dirs:
conn: psycopg.connection.Connection
with psycopg.connect(f"{self.url}/{database}") as conn:
url = f"{self.url}/{database}"
with psycopg.connect(url) as conn:
self.logger.info(f"connecting to: {url}")

conn.set_deferrable(True)
load_files()
conn.commit()
@@ -226,7 +239,7 @@ class PgDDS3(PgDD):
Commands related to running this inside a Lambda function.
"""

def __init__(self, logger=logging.getLogger(__name__)):
def __init__(self, logger: logging.Logger = logging.getLogger(__name__)):
super().__init__(logger=logger)
self.bucket = os.getenv("PG_DD_BUCKET")
self.prefix = os.getenv("PG_DD_PREFIX")
@@ -247,7 +260,7 @@ def write_to_bucket(self):
s3_object = self.s3.Object(self.bucket, key)
s3_object.put(Body=gzip.compress(str.encode(value)))

def download_local(self):
def download_local(self, exists_ok: bool = True):
"""
Download CSV files from S3 to load.
"""
Expand All @@ -257,5 +270,9 @@ def download_local(self):
file = f"{self.dir}/{f}"
os.makedirs(file.rsplit("/", 1)[0], exist_ok=True)

if exists_ok and os.path.exists(file):
self.logger.info(f"file already exists: {f}")
continue

s3_object = self.s3.Object(self.bucket, f"{self.prefix}/{f}")
s3_object.download_file(f"{self.dir}/{f}")
s3_object.download_file(file)