fix(pg-dd): update filemanager Makefile
mmalenic committed Nov 20, 2024
1 parent 69421a1 commit 05d3737
Showing 8 changed files with 46 additions and 26 deletions.
9 changes: 4 additions & 5 deletions lib/workload/stateless/stacks/filemanager/Makefile
@@ -73,18 +73,17 @@ restore:
@docker compose exec -T postgres pg_restore -U filemanager -d filemanager

## Targets related to top-level database management and S3.
apply-schema:
@for file in database/migrations/*; do \
docker exec -e PGPASSWORD=orcabus -it orcabus_db psql -h $(FILEMANAGER_DATABASE_HOST) -U orcabus -d filemanager -c "$$(cat $$file)"; \
done
reset-db:
@docker exec -e PGPASSWORD=orcabus -it orcabus_db psql -h $(FILEMANAGER_DATABASE_HOST) -U orcabus -d orcabus -c "DROP DATABASE IF EXISTS filemanager;" && \
docker exec -e PGPASSWORD=orcabus -it orcabus_db psql -h $(FILEMANAGER_DATABASE_HOST) -U orcabus -d orcabus -c "CREATE DATABASE filemanager;"
for file in database/migrations/*; do \
docker exec -e PGPASSWORD=orcabus -it orcabus_db psql -h $(FILEMANAGER_DATABASE_HOST) -U orcabus -d filemanager -c "$$(cat $$file)"; \
done
s3-dump-upload:
@aws s3 cp data/fm_s3_objects_100000.csv.gz s3://orcabus-test-data-843407916570-ap-southeast-2/file-manager/fm_s3_objects_100000.csv.gz
s3-dump-download:
@aws s3 cp s3://orcabus-test-data-843407916570-ap-southeast-2/file-manager/fm_s3_objects_100000.csv.gz data/fm_s3_objects_100000.csv.gz
db-load-data: reset-db apply-schema
db-load-data: reset-db
@gunzip -c data/fm_s3_objects_100000.csv.gz | \
docker exec -i orcabus_db psql -U orcabus -d filemanager -c "copy s3_object from stdin with (format csv, header);"
s3-dump-download-if-not-exists:
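As a usage note, the targets above chain together for a local refresh; a minimal sketch, assuming it is run from the filemanager stack directory with the compose stack (and the `orcabus_db` container) already up:

```sh
# fetch the CSV dump from S3 only if it is not already present locally
make s3-dump-download-if-not-exists
# drop and recreate the filemanager database, re-apply the migrations
# (reset-db now applies them itself), then copy the dump into s3_object
make db-load-data
```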
4 changes: 2 additions & 2 deletions lib/workload/stateless/stacks/pg-dd/.env.example
@@ -1,11 +1,11 @@
PG_DD_URL=postgresql://orcabus:[email protected]:5432# pragma: allowlist secret
PG_DD_DIR=data
PG_DD_BUCKET=bucket
PG_DD_BUCKET=orcabus-test-data-843407916570-ap-southeast-2
PG_DD_PREFIX=pg-dd

PG_DD_DATABASE_METADATA_MANAGER=metadata_manager
PG_DD_DATABASE_SEQUENCE_RUN_MANAGER=sequence_run_manager
PG_DD_DATABASE_WORKFLOW_MANAGER=workflow_manager
PG_DD_DATABASE_FILEMANAGER=filemanager
PG_DD_DATABASE_FILEMANAGER_SQL_DUMP='select * from s3_object order by sequencer limit 10000'
PG_DD_DATABASE_FILEMANAGER_SQL_LOAD=s3_object
PG_DD_DATABASE_FILEMANAGER_SQL_LOAD=s3_object
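The variables appear to follow a per-database naming scheme: `PG_DD_DATABASE_<NAME>` selects a database, with optional `_SQL_DUMP` and `_SQL_LOAD` suffixes controlling the dump query and the load target table. A hypothetical entry for an additional database might look like this (names purely illustrative, not from the repository):

```sh
# hypothetical example only: dump the 100 newest rows of some_table from an
# imaginary example_manager database, and load them back into some_table
PG_DD_DATABASE_EXAMPLE_MANAGER=example_manager
PG_DD_DATABASE_EXAMPLE_MANAGER_SQL_DUMP='select * from some_table order by id limit 100'
PG_DD_DATABASE_EXAMPLE_MANAGER_SQL_LOAD=some_table
```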
2 changes: 1 addition & 1 deletion lib/workload/stateless/stacks/pg-dd/.gitignore
@@ -1,4 +1,4 @@
.env
data
.ruff_cache
response.json
response.json
2 changes: 1 addition & 1 deletion lib/workload/stateless/stacks/pg-dd/Dockerfile
@@ -29,4 +29,4 @@ ENV PATH="/app/.venv/bin:$PATH"
COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
COPY . .

ENTRYPOINT ["python", "-m", "pg_dd.cli"]
ENTRYPOINT ["python", "-m", "pg_dd.cli"]
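Since the entrypoint is the `pg_dd.cli` module, the image can be driven directly; a sketch, where the image tag and the use of `.env` for configuration are assumptions:

```sh
# build the image and run a one-off dump of the filemanager database
docker build -t pg-dd .
docker run --rm --env-file .env pg-dd dump --database filemanager
```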
2 changes: 1 addition & 1 deletion lib/workload/stateless/stacks/pg-dd/Makefile
@@ -15,4 +15,4 @@ cli: install
@poetry run cli $(COMMAND)

clean:
rm -rf data && rm -rf .ruff_cache
rm -rf data && rm -rf .ruff_cache
10 changes: 8 additions & 2 deletions lib/workload/stateless/stacks/pg-dd/README.md
@@ -13,12 +13,18 @@ aws lambda invoke --function-name orcabus-pg-dd response.json
This is set up to dump the metadata_manager, workflow_manager, and sequence_run_manager databases, and the 10000
most recent rows of the filemanager database.
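The 10000-row limit appears to come from the filemanager dump query configured through the environment, as in `.env.example` above:

```sh
PG_DD_DATABASE_FILEMANAGER_SQL_DUMP='select * from s3_object order by sequencer limit 10000'
```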

This command can also be run locally, for example:
This command can be run locally using make, or by running `poetry` directly:

```
```sh
make cli COMMAND="--help"
```

For example, to dump and upload a specific database to S3:

```sh
poetry run cli dump --database metadata_manager && poetry run cli upload
```

The `Dockerfile` is set up to launch with the top-level `Makefile`, which also contains commands for running the CLI.
By default, `start-all-service` will run a `load` on the local database, downloading any dumps from S3 along the way.
AWS credentials must be present in the shell for this to work.
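In practice that means something like the following from the repository root, assuming valid AWS credentials are already exported in the shell:

```sh
# start the local stack; pg-dd will download the dumps from S3 and load them
make start-all-service
```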
18 changes: 13 additions & 5 deletions lib/workload/stateless/stacks/pg-dd/pg_dd/cli.py
@@ -21,17 +21,20 @@ def cli():
def download(exists_ok):
"""
Download S3 CSV dumps to the local directory.
:return:
"""
PgDDS3(logger=logger).download_local(exists_ok)


@cli.command()
def upload():
@click.option(
"--database",
help="Specify the database to upload, uploads all databases by default.",
)
def upload(database):
"""
Uploads local CSV dumps to S3.
"""
PgDDS3(logger=logger).write_to_bucket()
PgDDS3(logger=logger).write_to_bucket(database)


@cli.command()
@@ -51,14 +51,19 @@ def dump(database):
default=True,
help="Download the CSV files from S3 if they are not already in the local directory.",
)
def load(download_exists_ok):
@click.option(
"--only-empty/--no-only-empty",
default=True,
help="Only load into tables that are empty and exist in the database.",
)
def load(download_exists_ok, only_empty):
"""
Load local CSV files into the database.
"""
if download_exists_ok:
PgDDS3(logger=logger).download_local(download_exists_ok)

PgDDLocal(logger=logger).load_to_database()
PgDDLocal(logger=logger).load_to_database(only_empty)


if __name__ == "__main__":
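Taken together with the new `--database` option on `upload` and the `--only-empty/--no-only-empty` flag on `load`, a typical invocation might look like this (a sketch using the flag names from the options above, not an exhaustive reference):

```sh
# dump and upload only the filemanager database
poetry run cli dump --database filemanager
poetry run cli upload --database filemanager

# load the local dumps, writing into tables even if they already contain rows
poetry run cli load --no-only-empty
```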
25 changes: 16 additions & 9 deletions lib/workload/stateless/stacks/pg-dd/pg_dd/pg_dd.py
@@ -61,14 +61,14 @@ def load_table(
table: str,
data: str,
conn: psycopg.connection.Connection,
only_non_empty: bool = True,
only_empty: bool = True,
):
"""
Load a table with the CSV data.
"""

with conn.cursor() as cur:
if only_non_empty:
if only_empty:
exists = cur.execute(
sql.SQL(
"""
@@ -201,10 +201,10 @@ def write_to_dir(self, db: str = None):
os.makedirs(file.rsplit("/", 1)[0], exist_ok=True)
self.logger.info(f"writing to file: {f}")

with open(file, "wb") as f:
f.write(gzip.compress(str.encode(value)))
with open(file, "wb") as file:
file.write(gzip.compress(str.encode(value)))

def load_to_database(self):
def load_to_database(self, only_empty: bool = True):
"""
Load the local CSV files into the database.
"""
@@ -219,7 +219,10 @@ def load_files():
table = load[int(table)]

self.load_table(
table, gzip.decompress(f.read()).decode("utf-8"), conn
table,
gzip.decompress(f.read()).decode("utf-8"),
conn,
only_empty,
)

for _, dirs, _ in os.walk(self.out):
@@ -246,12 +249,12 @@ def __init__(self, logger: logging.Logger = logging.getLogger(__name__)):
self.dir = os.getenv("PG_DD_DIR")
self.s3: S3ServiceResource = boto3.resource("s3")

def write_to_bucket(self):
def write_to_bucket(self, db: str = None):
"""
Write the CSV files to the S3 bucket.
"""

for _, _, key, value in self.target_files():
for _, _, key, value in self.target_files(db):
if self.prefix:
key = f"{self.prefix}/{key}"

@@ -275,4 +278,8 @@ def download_local(self, exists_ok: bool = True):
continue

s3_object = self.s3.Object(self.bucket, f"{self.prefix}/{f}")
s3_object.download_file(file)
try:
s3_object.download_file(file)
except Exception as e:
self.logger.info(f"could not find file: {e}")
continue
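For completeness, a minimal sketch of driving these classes outside the CLI; the import path and the reliance on the `PG_DD_*` environment variables from `.env.example` are assumptions:

```python
import logging

# assumed import path, mirroring the pg_dd.cli entrypoint
from pg_dd.pg_dd import PgDDLocal, PgDDS3

logger = logging.getLogger(__name__)

# pull any missing CSV dumps down from S3, keeping files that already exist
PgDDS3(logger=logger).download_local(exists_ok=True)

# load the local dumps, only into tables that are empty and exist (the default)
PgDDLocal(logger=logger).load_to_database(only_empty=True)
```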
