Skip to content

Commit

Permalink
Osiris quick fixes (#266)
Browse files Browse the repository at this point in the history
* Ack for metadata_errors and round and check frequencies

* Add missing arg name
  • Loading branch information
keiranjprice101 committed May 3, 2024
1 parent 92587a2 commit aed51f9
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 2 deletions.
12 changes: 11 additions & 1 deletion rundetection/ingestion/extracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,18 @@ def osiris_extract(job_request: JobRequest, dataset: Any) -> JobRequest:

freq_6 = float(dataset.get("selog").get("freq6").get("value_log").get("value")[0])
freq_10 = float(dataset.get("selog").get("freq10").get("value_log").get("value")[0])

# Accounting for floating point errors
max_value = max(freq_6, freq_10)
difference = abs(freq_6 - freq_10)
if difference > max_value * 0.01:
raise ReductionMetadataError(
"Frequency 6 and 10 are not within 1% of each other. Osiris reduction is not possible"
)
if freq_6 != freq_10:
raise ReductionMetadataError("Frequency 6 and 10 do not match. Osiris reduction is not possible")
freq_6 = round(freq_6)
freq_10 = round(freq_10)

job_request.additional_values["freq6"] = freq_6
job_request.additional_values["freq10"] = freq_10

Expand Down
4 changes: 4 additions & 0 deletions rundetection/run_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from pika import BlockingConnection, ConnectionParameters, PlainCredentials # type: ignore
from pika.adapters.blocking_connection import BlockingChannel # type: ignore

from rundetection.exceptions import ReductionMetadataError
from rundetection.ingestion.ingest import ingest
from rundetection.job_requests import JobRequest
from rundetection.specifications import InstrumentSpecification
Expand Down Expand Up @@ -105,6 +106,9 @@ def process_messages(channel: BlockingChannel, notification_queue: SimpleQueue[J
process_message(body.decode(), notification_queue)
logger.info("Acking message %s", method_frame.delivery_tag)
channel.basic_ack(method_frame.delivery_tag)
except ReductionMetadataError as exc:
logger.exception("Problem with metadata, cannot reduce, skipping message", exc_info=exc)
channel.basic_ack(method_frame.delivery_tag)
# pylint: disable = broad-exception-caught
except AttributeError: # If the message frame or body is missing attributes required e.g. the delivery tag
pass
Expand Down
29 changes: 29 additions & 0 deletions test/ingestion/test_extracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,35 @@ def test_osiris_extract(_, job_request):
assert job_request.additional_values["cycle_string"] == "some string"


@patch("rundetection.ingestion.extracts.get_cycle_string_from_path", return_value="some string")
def test_osiris_extract_non_matching_freqs_within_error_boundary(_, job_request):
"""Test Osiris extract"""
dataset = {
"selog": {
"phase6": {"value": (1221.0,)},
"phase10": {"value": (1221.0,)},
"freq6": {"value_log": {"value": (6.0,)}},
"freq10": {"value_log": {"value": (5.999,)}},
},
"instrument": {
"dae": {
"time_channels_1": {"time_of_flight": (10.0, 100.0)},
"time_channels_2": {"time_of_flight": (12.1, 121.0)},
}
},
}
osiris_extract(job_request, dataset)
assert job_request.additional_values["freq10"] == 6
assert job_request.additional_values["freq6"] == 6
assert job_request.additional_values["tcb_detector_min"] == 10.0
assert job_request.additional_values["tcb_detector_max"] == 100.0
assert job_request.additional_values["tcb_monitor_min"] == 12.1
assert job_request.additional_values["tcb_monitor_max"] == 121.0
assert job_request.additional_values["phase6"] == 1221.0
assert job_request.additional_values["phase10"] == 1221.0
assert job_request.additional_values["cycle_string"] == "some string"


@patch("rundetection.ingestion.extracts.get_cycle_string_from_path", return_value="some string")
def test_osiris_extract_raises_on_bad_frequencies(job_request):
"""Test correct exception raised when freq6 and freq10 do not match"""
Expand Down
24 changes: 23 additions & 1 deletion test/test_run_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import pytest

from rundetection.exceptions import ReductionMetadataError
from rundetection.ingestion.ingest import JobRequest
from rundetection.run_detection import (
process_message,
Expand Down Expand Up @@ -124,7 +125,7 @@ def test_process_messages(mock_process):


@patch("rundetection.run_detection.process_message")
def test_process_messages_raises_still_acks(mock_process):
def test_process_messages_raises_exception_nacks(mock_process):
"""
Test messages are still acked after exception in processing
:param mock_process: Mock Process messages function
Expand All @@ -144,6 +145,27 @@ def test_process_messages_raises_still_acks(mock_process):
channel.basic_nack.assert_called_once_with(method_frame.delivery_tag)


@patch("rundetection.run_detection.process_message")
def test_process_messages_raises_metadataerror_still_acks(mock_process):
"""
Test messages are still acked after exception in processing
:param mock_process: Mock Process messages function
:return: None
"""
channel = MagicMock()
method_frame = MagicMock()
body = "message_body".encode()
channel.consume.return_value = [(method_frame, None, body)]
notification_queue = SimpleQueue()
mock_process.side_effect = ReductionMetadataError

process_messages(channel, notification_queue)

channel.consume.assert_called_once()
mock_process.assert_called_once_with(body.decode(), notification_queue)
channel.basic_ack.assert_called_once_with(method_frame.delivery_tag)


@patch("rundetection.run_detection.process_message")
def test_process_messages_does_not_ack_attribute_error(_):
"""
Expand Down

0 comments on commit aed51f9

Please sign in to comment.