From d3ce6917212a50802cc2babceaeb7dbe328ee219 Mon Sep 17 00:00:00 2001 From: Simo Tukiainen Date: Wed, 30 Oct 2024 10:59:05 +0200 Subject: [PATCH] Separate filename and s3key --- src/processing/fetch.py | 2 +- src/processing/instrument.py | 1 + src/processing/jobs.py | 7 +++++-- src/processing/model.py | 16 +++++++++------- src/processing/processor.py | 5 ++++- src/processing/product.py | 8 ++++++-- src/processing/storage_api.py | 5 +++-- 7 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/processing/fetch.py b/src/processing/fetch.py index d007ba03..abe2d5c1 100644 --- a/src/processing/fetch.py +++ b/src/processing/fetch.py @@ -240,7 +240,7 @@ def _submit_file(filename: Path, row: dict) -> str: bucket = "cloudnet-product-volatile" if row["volatile"] else "cloudnet-product" if row["legacy"]: bucket = f"{bucket}/legacy" - ss_url = f"{STORAGE_SERVICE_URL}/{bucket}/{row['filename']}" + ss_url = f"{STORAGE_SERVICE_URL}/{bucket}/{row['uuid']}/{row['filename']}" ss_body = filename.read_bytes() ss_headers = {"Content-MD5": utils.md5sum(filename, is_base64=True)} ss_res = requests.put( diff --git a/src/processing/instrument.py b/src/processing/instrument.py index 4a2942db..ae014794 100644 --- a/src/processing/instrument.py +++ b/src/processing/instrument.py @@ -65,6 +65,7 @@ def process_instrument(processor: Processor, params: InstrumentParams, directory processor.upload_file( params, new_file, + std_uuid.UUID(uuid.product), filename, volatile, patch, diff --git a/src/processing/jobs.py b/src/processing/jobs.py index d75fa7e8..486de933 100644 --- a/src/processing/jobs.py +++ b/src/processing/jobs.py @@ -50,7 +50,7 @@ def freeze(processor: Processor, params: ProcessParams, directory: Path) -> None if params.product.experimental: raise utils.SkipTaskError("Product is experimental") logging.info(f"Freezing product: {metadata['uuid']}") - s3key = ( + filename = ( f"legacy/{metadata['filename']}" if metadata["legacy"] else metadata["filename"] ) if metadata["pid"]: @@ -63,7 +63,9 @@ def freeze(processor: Processor, params: ProcessParams, directory: Path) -> None full_path, pid=existing_pid ) if uuid.UUID(file_uuid) != uuid.UUID(metadata["uuid"]): - msg = f"File {s3key} UUID mismatch (DB: {metadata['uuid']}, File: {file_uuid})" + msg = ( + f"File {filename} UUID mismatch (DB: {metadata['uuid']}, File: {file_uuid})" + ) raise ValueError(msg) if metadata["volatile"] and metadata["pid"]: msg = f"Removing volatile status of {url}" @@ -73,6 +75,7 @@ def freeze(processor: Processor, params: ProcessParams, directory: Path) -> None msg = f"Minting PID {pid} to URL {url}" logging.info(msg) + s3key = f"{file_uuid}/{filename}" response_data = processor.storage_api.upload_product( full_path, s3key, volatile=volatile ) diff --git a/src/processing/model.py b/src/processing/model.py index 24f59d7d..da683bc3 100644 --- a/src/processing/model.py +++ b/src/processing/model.py @@ -1,5 +1,5 @@ import logging -import uuid +import uuid as std_uuid from pathlib import Path from processing import nc_header_augmenter, utils @@ -21,7 +21,7 @@ def process_model(processor: Processor, params: ModelParams, directory: Path): if not file_meta["volatile"]: logging.warning("Stable model file found. Replacing...") volatile = False - product_uuid = uuid.UUID(file_meta["uuid"]) + product_uuid = std_uuid.UUID(file_meta["uuid"]) filename = file_meta["filename"] else: product_uuid = _generate_uuid() @@ -38,7 +38,9 @@ def process_model(processor: Processor, params: ModelParams, directory: Path): volatile_pid = file_meta["pid"] processor.pid_utils.add_pid_to_file(output_path, pid=volatile_pid) - processor.upload_file(params, output_path, filename, volatile, patch=True) + processor.upload_file( + params, output_path, product_uuid, filename, volatile, patch=True + ) if "hidden" in params.site.types: logging.info("Skipping plotting for hidden site") else: @@ -52,8 +54,8 @@ def process_model(processor: Processor, params: ModelParams, directory: Path): raise SkipTaskError(err.message) from err -def _generate_uuid() -> uuid.UUID: - return uuid.uuid4() +def _generate_uuid() -> std_uuid.UUID: + return std_uuid.uuid4() def _generate_filename(params: ModelParams) -> str: @@ -66,7 +68,7 @@ def _generate_filename(params: ModelParams) -> str: def _harmonize_model( - params: ModelParams, input_path: Path, output_path: Path, uuid: uuid.UUID + params: ModelParams, input_path: Path, output_path: Path, uuid: std_uuid.UUID ): data = { "site_name": params.site.id, @@ -80,7 +82,7 @@ def _harmonize_model( nc_header_augmenter.harmonize_model_file(data) -def _print_info(file_uuid: uuid.UUID, qc_result: str | None = None) -> None: +def _print_info(file_uuid: std_uuid.UUID, qc_result: str | None = None) -> None: link = utils.build_file_landing_page_url(str(file_uuid)) qc_str = f" QC: {qc_result.upper()}" if qc_result is not None else "" logging.info(f"Updated model: {link}{qc_str}") diff --git a/src/processing/processor.py b/src/processing/processor.py index 915ae8a7..31f5c201 100644 --- a/src/processing/processor.py +++ b/src/processing/processor.py @@ -298,10 +298,12 @@ def upload_file( self, params: ProcessParams, full_path: Path, - s3key: str, + uuid: uuid.UUID, + filename: str, volatile: bool, patch: bool, ): + s3key = f"{uuid}/{filename}" file_info = self.storage_api.upload_product(full_path, s3key, volatile) payload = utils.create_product_put_payload( full_path, @@ -309,6 +311,7 @@ def upload_file( volatile, site=params.site.id, ) + payload["filename"] = filename if isinstance(params, ModelParams) and "evaluation" not in params.product.type: payload["model"] = params.model_id elif isinstance(params, InstrumentParams): diff --git a/src/processing/product.py b/src/processing/product.py index c7944f0e..dae0e623 100644 --- a/src/processing/product.py +++ b/src/processing/product.py @@ -58,7 +58,9 @@ def process_me(processor: Processor, params: ModelParams, directory: Path): nc.file_uuid = existing_product["uuid"] uuid.product = existing_product["uuid"] - processor.upload_file(params, new_file, filename, volatile, patch) + processor.upload_file( + params, new_file, std_uuid.UUID(uuid.product), filename, volatile, patch + ) processor.create_and_upload_l3_images( new_file, params.product.id, @@ -121,7 +123,9 @@ def process_product(processor: Processor, params: ProductParams, directory: Path nc.file_uuid = existing_product["uuid"] uuid.product = existing_product["uuid"] - processor.upload_file(params, new_file, filename, volatile, patch) + processor.upload_file( + params, new_file, std_uuid.UUID(uuid.product), filename, volatile, patch + ) processor.create_and_upload_images( new_file, params.product.id, diff --git a/src/processing/storage_api.py b/src/processing/storage_api.py index 8b2a29fc..9449903e 100644 --- a/src/processing/storage_api.py +++ b/src/processing/storage_api.py @@ -33,7 +33,7 @@ def __init__(self, config: Config, session: requests.Session): def upload_product( self, full_path: PathLike | str, s3key: str, volatile: bool ) -> dict: - """Upload a processed Cloudnet file.""" + """Upload a processed Cloudnet file to correct bucket.""" bucket = _get_product_bucket(volatile) headers = self._get_headers(full_path) url = f"{self._url}/{bucket}/{s3key}" @@ -78,13 +78,14 @@ def download_products( ) def delete_volatile_product(self, s3key: str) -> requests.Response: - """Delete a volatile product.""" + """Delete a volatile product from volatile bucket.""" bucket = _get_product_bucket(volatile=True) url = f"{self._url}/{bucket}/{s3key}" res = self.session.delete(url, auth=self._auth) return res def upload_image(self, full_path: str | PathLike, s3key: str) -> None: + """Upload an image to the Cloudnet image bucket.""" url = f"{self._url}/cloudnet-img/{s3key}" headers = self._get_headers(full_path) self._put(url, full_path, headers=headers)