From e65483fd6c7002dc031653c6d9ef4ecb00e64038 Mon Sep 17 00:00:00 2001 From: ugyballoons Date: Tue, 23 Jul 2024 12:03:22 +0100 Subject: [PATCH] Add night reports to mocking Also remove redundant clean-up method. --- tests/conftest.py | 1 - tests/mockdata.py | 143 ++++++++++++++++++++++++++-------------------- 2 files changed, 81 insertions(+), 63 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 47cd5cc7..e99c0467 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -40,7 +40,6 @@ async def mocked_app( app = create_app() async with LifespanManager(app): yield app, mocker - mocker.cleanup() @pytest_asyncio.fixture(scope="function") diff --git a/tests/mockdata.py b/tests/mockdata.py index 4f09f790..e3d3889d 100644 --- a/tests/mockdata.py +++ b/tests/mockdata.py @@ -1,21 +1,21 @@ import json import logging import random -from datetime import date, timedelta +from datetime import date from itertools import chain from pathlib import Path import boto3 from botocore.exceptions import ClientError -from lsst.ts.rubintv.models.models import Channel, Event, get_current_day_obs +from lsst.ts.rubintv.models.models import ( + Channel, + Event, + NightReportData, + get_current_day_obs, +) from lsst.ts.rubintv.models.models_helpers import find_first from lsst.ts.rubintv.models.models_init import Camera, Location -today = get_current_day_obs() -the_past = today - timedelta(days=100) -_metadata = {f"col{n}": "dummy" for n in range(1, 6)} -md_json = json.dumps(_metadata) - class RubinDataMocker: FIRST_SEQ = 0 @@ -23,7 +23,7 @@ class RubinDataMocker: def __init__( self, locations: list[Location], - day_obs: date = today, + day_obs: date = get_current_day_obs(), s3_required: bool = False, ) -> None: """ @@ -52,19 +52,26 @@ def __init__( self.empty_channel: dict[str, str] = {} self.events: dict[str, list[Event]] = {} self.metadata: dict[str, dict[str, str]] = {} - self.mock_up_data() - def cleanup(self) -> None: - print("Started cleaning...") - self.delete_buckets() - self.last_seq = {} - self._locations = [] - self.s3_required = False - self.location_channels = {} - self.seq_objs = {} - self.events = {} - self.metadata = {} - print("Finished cleaning...") + metadata = {f"col{n}": "dummy" for n in range(1, 6)} + self.md_json = json.dumps(metadata) + + self.nr_text = r""" + This is line one. + This is line two. + This spacing is intentional. + """ + + self.nr_qa_plots = { + "Link to RubinTV githib": "https://github.com/lsst-ts/rubintv", + "Link to LSST": "https://www.lsst.org", + } + + self.nr_groups = ["coverage", "elana", "test"] + self.nr_metadata: dict[str, dict[str, str]] = {} + self.nr_plots: dict[str, list[NightReportData]] = {} + + self.mock_up_data() def delete_buckets(self) -> None: if self.s3_required: @@ -159,6 +166,7 @@ def add_seq_objs(self, location: Location, camera: Camera) -> None: for index in range(start, start + iterations): seq_num = f"{index:06}" + # randomly add "final" sequence for per-day channels if channel.per_day and index == start + iterations - 1: seq_num = random.choice((seq_num, "final")) @@ -167,7 +175,10 @@ def add_seq_objs(self, location: Location, camera: Camera) -> None: ) channel_data.append(event_obj) - self.last_seq[loc_cam_chan] = index + self.last_seq[loc_cam_chan] = index + if seq_num == "final": + print(f"final written for {loc_cam_chan}") + continue # store the objects for testing against if loc_cam in self.seq_objs: @@ -177,39 +188,69 @@ def add_seq_objs(self, location: Location, camera: Camera) -> None: self.seq_objs[loc_cam] = channel_data self.events[loc_cam] = self.dicts_to_events(channel_data) - # TODO: Write the below functions to add to the mocked state + def add_camera_metadata(self, location: Location, camera: Camera) -> dict[str, str]: + """ + Mock metadata for a given camera at a specified location. - def mock_per_day_event(self, location: Location, camera: Camera) -> None: - # channel_data: list[dict[str, str]] = [] - loc_cam = f"{location.name}/{camera.name}" + Parameters + ---------- + location : Location + The location for which the metadata is being mocked. + camera : Camera + The camera for which the metadata is being mocked. - for channel in camera.channels: - loc_cam_chan = f"{loc_cam}/{channel.name}" - start = self.last_seq.get(loc_cam_chan, self.FIRST_SEQ) - start - # TODO: Finish this function + Returns + ------- + dict[str, str] + A dictionary containing mocked metadata. + """ - def mock_night_report_metadata(self) -> None: - pass + key = f"{camera.name}/{self.day_obs}/metadata.json" + if self.s3_required: + self.upload_fileobj(self.md_json, location.bucket_name, key) + metadata = {key: self.md_json} + return metadata - def mock_night_report_plots(self) -> None: - pass + def mock_night_report_metadata(self, location: Location, camera: Camera) -> None: + key = f"{camera.name}/{self.day_obs}/night_report/md.json" + bucket_name = location.bucket_name + text_dict = {} + for idx in range(1, 4): + text_dict[f"text_{idx}"] = self.nr_text + if self.s3_required: + text_json = json.dumps(text_dict) + self.upload_fileobj(text_json, bucket_name, key) + self.nr_metadata[f"{location.name}/{camera.name}"] = text_dict - def generate_event( - self, bucket_name: str, camera_name: str, channel_name: str, seq_num: str - ) -> dict[str, str]: - day_obs = self.day_obs - key = f"{camera_name}/{day_obs}/{channel_name}/{seq_num}/mocked_event.jpg" + def mock_night_report_plots(self, location: Location, camera: Camera) -> None: + bucket_name = location.bucket_name + loc_cam = f"{location.name}/{camera.name}" + self.nr_plots[loc_cam] = [] + for group in self.nr_groups: + for idx in range(1, 2): + key = f"{camera.name}/{self.day_obs}/night_report/{group}/{group}_test-img0{idx}.jpg" + hash = self.upload_image(bucket_name, key, "assets/testcard_f.jpg") + self.nr_plots[loc_cam].append(NightReportData(key=key, hash=hash)) + + def upload_image(self, bucket_name: str, key: str, img_rel_path: str) -> str: hash: str | None = None if self.s3_required: if self.upload_file( - Path(__file__).parent / "assets/testcard_f.jpg", + Path(__file__).parent / img_rel_path, bucket_name, key, ): hash = self.get_obj_hash(bucket_name, key) if hash is None: hash = str(random.getrandbits(128)) + return hash + + def generate_event( + self, bucket_name: str, camera_name: str, channel_name: str, seq_num: str + ) -> dict[str, str]: + day_obs = self.day_obs + key = f"{camera_name}/{day_obs}/{channel_name}/{seq_num}/mocked_event.jpg" + hash = self.upload_image(bucket_name, key, "assets/testcard_f.jpg") return {"key": key, "hash": hash} def dicts_to_events(self, channel_dicts: list[dict[str, str]]) -> list[Event]: @@ -251,28 +292,6 @@ async def get_mocked_seq_events(self, location: Location) -> list[Event]: seq_chan_events = [e for e in events if e.channel_name in seq_chan_names] return seq_chan_events - def add_camera_metadata(self, location: Location, camera: Camera) -> dict[str, str]: - """ - Mock metadata for a given camera at a specified location. - - Parameters - ---------- - location : Location - The location for which the metadata is being mocked. - camera : Camera - The camera for which the metadata is being mocked. - - Returns - ------- - dict[str, str] - A dictionary containing mocked metadata. - """ - key = f"{camera.name}/{today}/metadata.json" - metadata = {key: md_json} - if self.s3_required: - self.upload_fileobj(md_json, location.bucket_name, key) - return metadata - def upload_file(self, file_name: Path | str, bucket_name: str, key: str) -> bool: """Upload a file to an S3 bucket.