Skip to content

Commit

Permalink
Finish test for yesterday collection
Browse files Browse the repository at this point in the history
  • Loading branch information
ugyballoons committed Aug 28, 2024
1 parent 3b11abd commit cf43247
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 28 deletions.
22 changes: 11 additions & 11 deletions python/lsst/ts/rubintv/background/currentpoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,10 @@ async def check_for_empty_per_day_channels(self) -> None:

async def poll_buckets_for_todays_data(self, test_day: str = "") -> None:
while True:
data_for_today_found = False
try:
if self._current_day_obs != get_current_day_obs():
await self.check_for_empty_per_day_channels()
await self.clear_todays_data()
data_for_today_found = False
day_obs = self._current_day_obs = get_current_day_obs()

for location in self.locations:
Expand All @@ -102,7 +100,6 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None:

objects = await client.async_list_objects(prefix)
if objects:
data_for_today_found = True
loc_cam = self._get_loc_cam(location.name, camera)
objects = await self.sieve_out_metadata(
objects, prefix, location, camera
Expand All @@ -112,14 +109,7 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None:
)
await self.process_channel_objects(objects, loc_cam, camera)

# Only look for yesterday's missing per day data if nothing
# yet found for today.

# TODO: This is too broad- check for data from each camera
# separately

if not data_for_today_found:
await self.poll_for_yesterdays_per_day(location)
await self.poll_for_yesterdays_per_day(location)

self.completed_first_poll = True

Expand Down Expand Up @@ -179,6 +169,16 @@ async def process_channel_objects(
)
await notify_ws_clients("camera", MessageType.CAMERA_TABLE, loc_cam, table)

# clear all relevant prefixes from the store looking for
# yesterday's per day updates

loc = loc_cam.split("/")[0]
prefixes = self._yesterday_prefixes.get(loc, [])
new_prefixes = [
prefix for prefix in prefixes if not prefix.startswith(camera.name)
]
self._yesterday_prefixes[loc] = new_prefixes

async def update_channel_events(
self, events: list[Event], loc_cam: str, camera: Camera
) -> None:
Expand Down
32 changes: 24 additions & 8 deletions tests/background/currentpoller_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

import pytest
from lsst.ts.rubintv.background.currentpoller import CurrentPoller
from lsst.ts.rubintv.models.models import Camera, Location, get_current_day_obs
from lsst.ts.rubintv.models.models import (
Camera,
Location,
ServiceMessageTypes,
get_current_day_obs,
)
from lsst.ts.rubintv.models.models_helpers import find_first
from lsst.ts.rubintv.models.models_init import ModelsInitiator

Expand Down Expand Up @@ -227,7 +232,7 @@ async def test_pick_up_yesterdays_movie(
with (
patch(
"lsst.ts.rubintv.background.currentpoller.get_current_day_obs",
return_value=day_obs.isoformat(),
return_value=day_obs,
) as mock_day_obs,
):
assert mock_day_obs
Expand All @@ -238,26 +243,37 @@ async def test_pick_up_yesterdays_movie(
mocked.delete_channel_events(location, camera, channel)
await current_poller.poll_buckets_for_todays_data()

print(current_poller._yesterday_prefixes)
# rollover day obs
yesterday = day_obs
day_obs = day_obs + timedelta(days=1)
# rubin_data_mocker.day_obs = day_obs

with (
patch(
"lsst.ts.rubintv.background.currentpoller.get_current_day_obs",
return_value=day_obs.isoformat(),
return_value=day_obs,
) as mock_day_obs,
patch(
"lsst.ts.rubintv.background.currentpoller.notify_ws_clients",
new_callable=AsyncMock,
) as mock_notify,
):
await current_poller.poll_buckets_for_todays_data()

# add movie data
# add movie data (arbitrary number of objs)
mocked.add_seq_objs_for_channel(location, camera, channel, 3)
await current_poller.poll_buckets_for_todays_data()

print(await current_poller.get_current_per_day_data(location.name, camera))
print(yesterday)
# assert that notification was made with the new event
# from yesterday.
service_msg = ServiceMessageTypes.CAMERA_PD_BACKDATED
loc_cam = f"{location.name}/{camera.name}"
events = mocked.get_mocked_events(location, camera, channel)
assert events is not []
last_event = max(events)
assert last_event
assert last_event.day_obs == yesterday.isoformat()
payload = {channel.name: last_event.__dict__}
mock_notify.assert_called_once_with("camera", service_msg, loc_cam, payload)


def get_test_camera_and_location() -> tuple[Camera, Location]:
Expand Down
18 changes: 9 additions & 9 deletions tests/mockdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,11 @@ def dicts_to_events(self, channel_dicts: list[dict[str, str]]) -> list[Event]:
events = [Event(**cd) for cd in channel_dicts]
return events

async def get_mocked_seq_events(self, location: Location) -> list[Event]:
def get_mocked_events(
self, location: Location, camera: Camera, channel: Channel
) -> list[Event]:
"""
Asynchronously retrieve sequence events for a given location.
Retrieve events for a given location.
Parameters
----------
Expand All @@ -232,13 +234,11 @@ async def get_mocked_seq_events(self, location: Location) -> list[Event]:
list[Event]
A list of Event objects representing sequence events.
"""
events = self.events.get(location.name)
if events is None:
return []
channels = self.location_channels[location.name]
seq_chan_names = [c for c in channels if not c.per_day]
seq_chan_events = [e for e in events if e.channel_name in seq_chan_names]
return seq_chan_events
loc_cam = f"{location.name}/{camera.name}"
events = [
e for e in self.events.get(loc_cam, []) if e.channel_name in channel.name
]
return events

def add_camera_metadata(self, location: Location, camera: Camera) -> dict[str, str]:
"""
Expand Down

0 comments on commit cf43247

Please sign in to comment.