Skip to content

Commit

Permalink
Separate filename and s3key
Browse files Browse the repository at this point in the history
  • Loading branch information
tukiains committed Nov 25, 2024
1 parent d0d9b8d commit d3ce691
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/processing/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def _submit_file(filename: Path, row: dict) -> str:
bucket = "cloudnet-product-volatile" if row["volatile"] else "cloudnet-product"
if row["legacy"]:
bucket = f"{bucket}/legacy"
ss_url = f"{STORAGE_SERVICE_URL}/{bucket}/{row['filename']}"
ss_url = f"{STORAGE_SERVICE_URL}/{bucket}/{row['uuid']}/{row['filename']}"
ss_body = filename.read_bytes()
ss_headers = {"Content-MD5": utils.md5sum(filename, is_base64=True)}
ss_res = requests.put(
Expand Down
1 change: 1 addition & 0 deletions src/processing/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def process_instrument(processor: Processor, params: InstrumentParams, directory
processor.upload_file(
params,
new_file,
std_uuid.UUID(uuid.product),
filename,
volatile,
patch,
Expand Down
7 changes: 5 additions & 2 deletions src/processing/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def freeze(processor: Processor, params: ProcessParams, directory: Path) -> None
if params.product.experimental:
raise utils.SkipTaskError("Product is experimental")
logging.info(f"Freezing product: {metadata['uuid']}")
s3key = (
filename = (
f"legacy/{metadata['filename']}" if metadata["legacy"] else metadata["filename"]
)
if metadata["pid"]:
Expand All @@ -63,7 +63,9 @@ def freeze(processor: Processor, params: ProcessParams, directory: Path) -> None
full_path, pid=existing_pid
)
if uuid.UUID(file_uuid) != uuid.UUID(metadata["uuid"]):
msg = f"File {s3key} UUID mismatch (DB: {metadata['uuid']}, File: {file_uuid})"
msg = (
f"File {filename} UUID mismatch (DB: {metadata['uuid']}, File: {file_uuid})"
)
raise ValueError(msg)
if metadata["volatile"] and metadata["pid"]:
msg = f"Removing volatile status of {url}"
Expand All @@ -73,6 +75,7 @@ def freeze(processor: Processor, params: ProcessParams, directory: Path) -> None
msg = f"Minting PID {pid} to URL {url}"
logging.info(msg)

s3key = f"{file_uuid}/{filename}"
response_data = processor.storage_api.upload_product(
full_path, s3key, volatile=volatile
)
Expand Down
16 changes: 9 additions & 7 deletions src/processing/model.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
import uuid
import uuid as std_uuid
from pathlib import Path

from processing import nc_header_augmenter, utils
Expand All @@ -21,7 +21,7 @@ def process_model(processor: Processor, params: ModelParams, directory: Path):
if not file_meta["volatile"]:
logging.warning("Stable model file found. Replacing...")
volatile = False
product_uuid = uuid.UUID(file_meta["uuid"])
product_uuid = std_uuid.UUID(file_meta["uuid"])
filename = file_meta["filename"]
else:
product_uuid = _generate_uuid()
Expand All @@ -38,7 +38,9 @@ def process_model(processor: Processor, params: ModelParams, directory: Path):
volatile_pid = file_meta["pid"]
processor.pid_utils.add_pid_to_file(output_path, pid=volatile_pid)

processor.upload_file(params, output_path, filename, volatile, patch=True)
processor.upload_file(
params, output_path, product_uuid, filename, volatile, patch=True
)
if "hidden" in params.site.types:
logging.info("Skipping plotting for hidden site")
else:
Expand All @@ -52,8 +54,8 @@ def process_model(processor: Processor, params: ModelParams, directory: Path):
raise SkipTaskError(err.message) from err


def _generate_uuid() -> uuid.UUID:
return uuid.uuid4()
def _generate_uuid() -> std_uuid.UUID:
return std_uuid.uuid4()


def _generate_filename(params: ModelParams) -> str:
Expand All @@ -66,7 +68,7 @@ def _generate_filename(params: ModelParams) -> str:


def _harmonize_model(
params: ModelParams, input_path: Path, output_path: Path, uuid: uuid.UUID
params: ModelParams, input_path: Path, output_path: Path, uuid: std_uuid.UUID
):
data = {
"site_name": params.site.id,
Expand All @@ -80,7 +82,7 @@ def _harmonize_model(
nc_header_augmenter.harmonize_model_file(data)


def _print_info(file_uuid: uuid.UUID, qc_result: str | None = None) -> None:
def _print_info(file_uuid: std_uuid.UUID, qc_result: str | None = None) -> None:
link = utils.build_file_landing_page_url(str(file_uuid))
qc_str = f" QC: {qc_result.upper()}" if qc_result is not None else ""
logging.info(f"Updated model: {link}{qc_str}")
5 changes: 4 additions & 1 deletion src/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,17 +298,20 @@ def upload_file(
self,
params: ProcessParams,
full_path: Path,
s3key: str,
uuid: uuid.UUID,
filename: str,
volatile: bool,
patch: bool,
):
s3key = f"{uuid}/{filename}"
file_info = self.storage_api.upload_product(full_path, s3key, volatile)
payload = utils.create_product_put_payload(
full_path,
file_info,
volatile,
site=params.site.id,
)
payload["filename"] = filename
if isinstance(params, ModelParams) and "evaluation" not in params.product.type:
payload["model"] = params.model_id
elif isinstance(params, InstrumentParams):
Expand Down
8 changes: 6 additions & 2 deletions src/processing/product.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ def process_me(processor: Processor, params: ModelParams, directory: Path):
nc.file_uuid = existing_product["uuid"]
uuid.product = existing_product["uuid"]

processor.upload_file(params, new_file, filename, volatile, patch)
processor.upload_file(
params, new_file, std_uuid.UUID(uuid.product), filename, volatile, patch
)
processor.create_and_upload_l3_images(
new_file,
params.product.id,
Expand Down Expand Up @@ -121,7 +123,9 @@ def process_product(processor: Processor, params: ProductParams, directory: Path
nc.file_uuid = existing_product["uuid"]
uuid.product = existing_product["uuid"]

processor.upload_file(params, new_file, filename, volatile, patch)
processor.upload_file(
params, new_file, std_uuid.UUID(uuid.product), filename, volatile, patch
)
processor.create_and_upload_images(
new_file,
params.product.id,
Expand Down
5 changes: 3 additions & 2 deletions src/processing/storage_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(self, config: Config, session: requests.Session):
def upload_product(
self, full_path: PathLike | str, s3key: str, volatile: bool
) -> dict:
"""Upload a processed Cloudnet file."""
"""Upload a processed Cloudnet file to correct bucket."""
bucket = _get_product_bucket(volatile)
headers = self._get_headers(full_path)
url = f"{self._url}/{bucket}/{s3key}"
Expand Down Expand Up @@ -78,13 +78,14 @@ def download_products(
)

def delete_volatile_product(self, s3key: str) -> requests.Response:
"""Delete a volatile product."""
"""Delete a volatile product from volatile bucket."""
bucket = _get_product_bucket(volatile=True)
url = f"{self._url}/{bucket}/{s3key}"
res = self.session.delete(url, auth=self._auth)
return res

def upload_image(self, full_path: str | PathLike, s3key: str) -> None:
"""Upload an image to the Cloudnet image bucket."""
url = f"{self._url}/cloudnet-img/{s3key}"
headers = self._get_headers(full_path)
self._put(url, full_path, headers=headers)
Expand Down

0 comments on commit d3ce691

Please sign in to comment.