diff --git a/rundetection/ingestion/extracts.py b/rundetection/ingestion/extracts.py index 477c9b1..26c28aa 100644 --- a/rundetection/ingestion/extracts.py +++ b/rundetection/ingestion/extracts.py @@ -52,8 +52,18 @@ def osiris_extract(job_request: JobRequest, dataset: Any) -> JobRequest: freq_6 = float(dataset.get("selog").get("freq6").get("value_log").get("value")[0]) freq_10 = float(dataset.get("selog").get("freq10").get("value_log").get("value")[0]) + + # Accounting for floating point errors + max_value = max(freq_6, freq_10) + difference = abs(freq_6 - freq_10) + if difference > max_value * 0.01: + raise ReductionMetadataError( + "Frequency 6 and 10 are not within 1% of each other. Osiris reduction is not possible" + ) if freq_6 != freq_10: - raise ReductionMetadataError("Frequency 6 and 10 do not match. Osiris reduction is not possible") + freq_6 = round(freq_6) + freq_10 = round(freq_10) + job_request.additional_values["freq6"] = freq_6 job_request.additional_values["freq10"] = freq_10 diff --git a/rundetection/run_detection.py b/rundetection/run_detection.py index 129164a..fbf50fb 100644 --- a/rundetection/run_detection.py +++ b/rundetection/run_detection.py @@ -16,6 +16,7 @@ from pika import BlockingConnection, ConnectionParameters, PlainCredentials # type: ignore from pika.adapters.blocking_connection import BlockingChannel # type: ignore +from rundetection.exceptions import ReductionMetadataError from rundetection.ingestion.ingest import ingest from rundetection.job_requests import JobRequest from rundetection.specifications import InstrumentSpecification @@ -105,6 +106,9 @@ def process_messages(channel: BlockingChannel, notification_queue: SimpleQueue[J process_message(body.decode(), notification_queue) logger.info("Acking message %s", method_frame.delivery_tag) channel.basic_ack(method_frame.delivery_tag) + except ReductionMetadataError as exc: + logger.exception("Problem with metadata, cannot reduce, skipping message", exc_info=exc) + channel.basic_ack(method_frame.delivery_tag) # pylint: disable = broad-exception-caught except AttributeError: # If the message frame or body is missing attributes required e.g. the delivery tag pass diff --git a/test/ingestion/test_extracts.py b/test/ingestion/test_extracts.py index fe392fc..faed658 100644 --- a/test/ingestion/test_extracts.py +++ b/test/ingestion/test_extracts.py @@ -192,6 +192,35 @@ def test_osiris_extract(_, job_request): assert job_request.additional_values["cycle_string"] == "some string" +@patch("rundetection.ingestion.extracts.get_cycle_string_from_path", return_value="some string") +def test_osiris_extract_non_matching_freqs_within_error_boundary(_, job_request): + """Test Osiris extract""" + dataset = { + "selog": { + "phase6": {"value": (1221.0,)}, + "phase10": {"value": (1221.0,)}, + "freq6": {"value_log": {"value": (6.0,)}}, + "freq10": {"value_log": {"value": (5.999,)}}, + }, + "instrument": { + "dae": { + "time_channels_1": {"time_of_flight": (10.0, 100.0)}, + "time_channels_2": {"time_of_flight": (12.1, 121.0)}, + } + }, + } + osiris_extract(job_request, dataset) + assert job_request.additional_values["freq10"] == 6 + assert job_request.additional_values["freq6"] == 6 + assert job_request.additional_values["tcb_detector_min"] == 10.0 + assert job_request.additional_values["tcb_detector_max"] == 100.0 + assert job_request.additional_values["tcb_monitor_min"] == 12.1 + assert job_request.additional_values["tcb_monitor_max"] == 121.0 + assert job_request.additional_values["phase6"] == 1221.0 + assert job_request.additional_values["phase10"] == 1221.0 + assert job_request.additional_values["cycle_string"] == "some string" + + @patch("rundetection.ingestion.extracts.get_cycle_string_from_path", return_value="some string") def test_osiris_extract_raises_on_bad_frequencies(job_request): """Test correct exception raised when freq6 and freq10 do not match""" diff --git a/test/test_run_detection.py b/test/test_run_detection.py index d35ad44..cc44d3c 100644 --- a/test/test_run_detection.py +++ b/test/test_run_detection.py @@ -13,6 +13,7 @@ import pytest +from rundetection.exceptions import ReductionMetadataError from rundetection.ingestion.ingest import JobRequest from rundetection.run_detection import ( process_message, @@ -124,7 +125,7 @@ def test_process_messages(mock_process): @patch("rundetection.run_detection.process_message") -def test_process_messages_raises_still_acks(mock_process): +def test_process_messages_raises_exception_nacks(mock_process): """ Test messages are still acked after exception in processing :param mock_process: Mock Process messages function @@ -144,6 +145,27 @@ def test_process_messages_raises_still_acks(mock_process): channel.basic_nack.assert_called_once_with(method_frame.delivery_tag) +@patch("rundetection.run_detection.process_message") +def test_process_messages_raises_metadataerror_still_acks(mock_process): + """ + Test messages are still acked after exception in processing + :param mock_process: Mock Process messages function + :return: None + """ + channel = MagicMock() + method_frame = MagicMock() + body = "message_body".encode() + channel.consume.return_value = [(method_frame, None, body)] + notification_queue = SimpleQueue() + mock_process.side_effect = ReductionMetadataError + + process_messages(channel, notification_queue) + + channel.consume.assert_called_once() + mock_process.assert_called_once_with(body.decode(), notification_queue) + channel.basic_ack.assert_called_once_with(method_frame.delivery_tag) + + @patch("rundetection.run_detection.process_message") def test_process_messages_does_not_ack_attribute_error(_): """