Skip to content

Commit

Permalink
Add night reports to mocking
Browse files Browse the repository at this point in the history
Also remove redundant clean-up method.
  • Loading branch information
ugyballoons committed Jul 23, 2024
1 parent bf3938d commit e65483f
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 63 deletions.
1 change: 0 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ async def mocked_app(
app = create_app()
async with LifespanManager(app):
yield app, mocker
mocker.cleanup()


@pytest_asyncio.fixture(scope="function")
Expand Down
143 changes: 81 additions & 62 deletions tests/mockdata.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
import json
import logging
import random
from datetime import date, timedelta
from datetime import date
from itertools import chain
from pathlib import Path

import boto3
from botocore.exceptions import ClientError
from lsst.ts.rubintv.models.models import Channel, Event, get_current_day_obs
from lsst.ts.rubintv.models.models import (
Channel,
Event,
NightReportData,
get_current_day_obs,
)
from lsst.ts.rubintv.models.models_helpers import find_first
from lsst.ts.rubintv.models.models_init import Camera, Location

today = get_current_day_obs()
the_past = today - timedelta(days=100)
_metadata = {f"col{n}": "dummy" for n in range(1, 6)}
md_json = json.dumps(_metadata)


class RubinDataMocker:
FIRST_SEQ = 0

def __init__(
self,
locations: list[Location],
day_obs: date = today,
day_obs: date = get_current_day_obs(),
s3_required: bool = False,
) -> None:
"""
Expand Down Expand Up @@ -52,19 +52,26 @@ def __init__(
self.empty_channel: dict[str, str] = {}
self.events: dict[str, list[Event]] = {}
self.metadata: dict[str, dict[str, str]] = {}
self.mock_up_data()

def cleanup(self) -> None:
print("Started cleaning...")
self.delete_buckets()
self.last_seq = {}
self._locations = []
self.s3_required = False
self.location_channels = {}
self.seq_objs = {}
self.events = {}
self.metadata = {}
print("Finished cleaning...")
metadata = {f"col{n}": "dummy" for n in range(1, 6)}
self.md_json = json.dumps(metadata)

self.nr_text = r"""
This is line one.
This is line two.
This spacing is intentional.
"""

self.nr_qa_plots = {
"Link to RubinTV githib": "https://github.com/lsst-ts/rubintv",
"Link to LSST": "https://www.lsst.org",
}

self.nr_groups = ["coverage", "elana", "test"]
self.nr_metadata: dict[str, dict[str, str]] = {}
self.nr_plots: dict[str, list[NightReportData]] = {}

self.mock_up_data()

def delete_buckets(self) -> None:
if self.s3_required:
Expand Down Expand Up @@ -159,6 +166,7 @@ def add_seq_objs(self, location: Location, camera: Camera) -> None:
for index in range(start, start + iterations):
seq_num = f"{index:06}"

# randomly add "final" sequence for per-day channels
if channel.per_day and index == start + iterations - 1:
seq_num = random.choice((seq_num, "final"))

Expand All @@ -167,7 +175,10 @@ def add_seq_objs(self, location: Location, camera: Camera) -> None:
)

channel_data.append(event_obj)
self.last_seq[loc_cam_chan] = index
self.last_seq[loc_cam_chan] = index
if seq_num == "final":
print(f"final written for {loc_cam_chan}")
continue

# store the objects for testing against
if loc_cam in self.seq_objs:
Expand All @@ -177,39 +188,69 @@ def add_seq_objs(self, location: Location, camera: Camera) -> None:
self.seq_objs[loc_cam] = channel_data
self.events[loc_cam] = self.dicts_to_events(channel_data)

# TODO: Write the below functions to add to the mocked state
def add_camera_metadata(self, location: Location, camera: Camera) -> dict[str, str]:
"""
Mock metadata for a given camera at a specified location.
def mock_per_day_event(self, location: Location, camera: Camera) -> None:
# channel_data: list[dict[str, str]] = []
loc_cam = f"{location.name}/{camera.name}"
Parameters
----------
location : Location
The location for which the metadata is being mocked.
camera : Camera
The camera for which the metadata is being mocked.
for channel in camera.channels:
loc_cam_chan = f"{loc_cam}/{channel.name}"
start = self.last_seq.get(loc_cam_chan, self.FIRST_SEQ)
start
# TODO: Finish this function
Returns
-------
dict[str, str]
A dictionary containing mocked metadata.
"""

def mock_night_report_metadata(self) -> None:
pass
key = f"{camera.name}/{self.day_obs}/metadata.json"
if self.s3_required:
self.upload_fileobj(self.md_json, location.bucket_name, key)
metadata = {key: self.md_json}
return metadata

def mock_night_report_plots(self) -> None:
pass
def mock_night_report_metadata(self, location: Location, camera: Camera) -> None:
key = f"{camera.name}/{self.day_obs}/night_report/md.json"
bucket_name = location.bucket_name
text_dict = {}
for idx in range(1, 4):
text_dict[f"text_{idx}"] = self.nr_text
if self.s3_required:
text_json = json.dumps(text_dict)
self.upload_fileobj(text_json, bucket_name, key)
self.nr_metadata[f"{location.name}/{camera.name}"] = text_dict

def generate_event(
self, bucket_name: str, camera_name: str, channel_name: str, seq_num: str
) -> dict[str, str]:
day_obs = self.day_obs
key = f"{camera_name}/{day_obs}/{channel_name}/{seq_num}/mocked_event.jpg"
def mock_night_report_plots(self, location: Location, camera: Camera) -> None:
bucket_name = location.bucket_name
loc_cam = f"{location.name}/{camera.name}"
self.nr_plots[loc_cam] = []
for group in self.nr_groups:
for idx in range(1, 2):
key = f"{camera.name}/{self.day_obs}/night_report/{group}/{group}_test-img0{idx}.jpg"
hash = self.upload_image(bucket_name, key, "assets/testcard_f.jpg")
self.nr_plots[loc_cam].append(NightReportData(key=key, hash=hash))

def upload_image(self, bucket_name: str, key: str, img_rel_path: str) -> str:
hash: str | None = None
if self.s3_required:
if self.upload_file(
Path(__file__).parent / "assets/testcard_f.jpg",
Path(__file__).parent / img_rel_path,
bucket_name,
key,
):
hash = self.get_obj_hash(bucket_name, key)
if hash is None:
hash = str(random.getrandbits(128))
return hash

def generate_event(
self, bucket_name: str, camera_name: str, channel_name: str, seq_num: str
) -> dict[str, str]:
day_obs = self.day_obs
key = f"{camera_name}/{day_obs}/{channel_name}/{seq_num}/mocked_event.jpg"
hash = self.upload_image(bucket_name, key, "assets/testcard_f.jpg")
return {"key": key, "hash": hash}

def dicts_to_events(self, channel_dicts: list[dict[str, str]]) -> list[Event]:
Expand Down Expand Up @@ -251,28 +292,6 @@ async def get_mocked_seq_events(self, location: Location) -> list[Event]:
seq_chan_events = [e for e in events if e.channel_name in seq_chan_names]
return seq_chan_events

def add_camera_metadata(self, location: Location, camera: Camera) -> dict[str, str]:
"""
Mock metadata for a given camera at a specified location.
Parameters
----------
location : Location
The location for which the metadata is being mocked.
camera : Camera
The camera for which the metadata is being mocked.
Returns
-------
dict[str, str]
A dictionary containing mocked metadata.
"""
key = f"{camera.name}/{today}/metadata.json"
metadata = {key: md_json}
if self.s3_required:
self.upload_fileobj(md_json, location.bucket_name, key)
return metadata

def upload_file(self, file_name: Path | str, bucket_name: str, key: str) -> bool:
"""Upload a file to an S3 bucket.
Expand Down

0 comments on commit e65483f

Please sign in to comment.