diff --git a/mail/apps.py b/mail/apps.py index 2bab89d2..e88654e9 100644 --- a/mail/apps.py +++ b/mail/apps.py @@ -16,9 +16,9 @@ def initialize_background_tasks(cls, **kwargs): LICENCE_DATA_TASK_QUEUE, MANAGE_INBOX_TASK_QUEUE, manage_inbox, - schedule_licence_usage_figures_for_lite_api, send_licence_data_to_hmrc, ) + from mail.celery_tasks import send_licence_usage_figures_to_lite_api Task.objects.filter(queue=MANAGE_INBOX_TASK_QUEUE).delete() Task.objects.filter(queue=LICENCE_DATA_TASK_QUEUE).delete() @@ -33,7 +33,7 @@ def initialize_background_tasks(cls, **kwargs): usage_updates_not_sent_to_lite = UsageData.objects.filter(has_lite_data=True, lite_sent_at__isnull=True) for obj in usage_updates_not_sent_to_lite: - schedule_licence_usage_figures_for_lite_api(str(obj.id)) + send_licence_usage_figures_to_lite_api.delay(str(obj.id)) def ready(self): post_migrate.connect(self.initialize_background_tasks, sender=self) diff --git a/mail/celery_tasks.py b/mail/celery_tasks.py new file mode 100644 index 00000000..4f12f702 --- /dev/null +++ b/mail/celery_tasks.py @@ -0,0 +1,141 @@ +import urllib.parse +from typing import List, MutableMapping, Tuple +from mail.enums import ReceptionStatusEnum + +from celery import shared_task +from celery.utils.log import get_task_logger +from django.conf import settings +from django.utils import timezone +from rest_framework.status import HTTP_207_MULTI_STATUS, HTTP_208_ALREADY_REPORTED + +from mail import requests as mail_requests +from mail.models import LicenceIdMapping, UsageData +from mail.libraries.usage_data_decomposition import build_json_payload_from_data_blocks, split_edi_data_by_id + + +logger = get_task_logger(__name__) + + +# Send Usage Figures to LITE API +def get_lite_api_url(): + """The URL for the licence usage callback, from the LITE_API_URL setting. + + If the configured URL has no path, use `/licences/hmrc-integration/`. + """ + url = settings.LITE_API_URL + components = urllib.parse.urlparse(url) + + if components.path in ("", "/"): + components = components._replace(path="/licences/hmrc-integration/") + url = urllib.parse.urlunparse(components) + + return url + + +def parse_response(response) -> Tuple[MutableMapping, List[str], List[str]]: + response = response.json() + licences = response["licences"] + + accepted_licences = [ + LicenceIdMapping.objects.get(lite_id=licence.get("id")).reference + for licence in licences["accepted"] + if licence.get("id") + ] + rejected_licences = [ + LicenceIdMapping.objects.get(lite_id=licence.get("id")).reference + for licence in licences["rejected"] + if licence.get("id") + ] + + return response, accepted_licences, rejected_licences + + +def save_response(lite_usage_data: UsageData, accepted_licences, rejected_licences, response): + lite_usage_data.lite_accepted_licences = accepted_licences + lite_usage_data.lite_rejected_licences = rejected_licences + lite_usage_data.lite_sent_at = timezone.now() + lite_usage_data.lite_response = response + + if not lite_usage_data.has_spire_data: + lite_usage_data.mail.status = ReceptionStatusEnum.REPLY_RECEIVED + lite_usage_data.mail.save() + + lite_usage_data.save() + + +def _handle_exception(message, lite_usage_data_id): + error_message = f"Failed to send LITE UsageData [{lite_usage_data_id}] to LITE API -> {message} " + raise Exception(error_message) + + +MAX_ATTEMPTS = 3 + + +@shared_task( + autoretry_for=(Exception,), + max_retries=MAX_ATTEMPTS, + retry_backoff=True, +) +def send_licence_usage_figures_to_lite_api(lite_usage_data_id): + """Sends HMRC Usage figure updates to LITE""" + + logger.info("Preparing LITE UsageData [%s] for LITE API", lite_usage_data_id) + + try: + lite_usage_data = UsageData.objects.get(id=lite_usage_data_id) + licences = UsageData.licence_ids + except UsageData.DoesNotExist: # noqa + _handle_exception( + f"LITE UsageData [{lite_usage_data_id}] does not exist.", + lite_usage_data_id, + ) + + # Extract usage details of Licences issued from LITE + _, data = split_edi_data_by_id(lite_usage_data.mail.edi_data, lite_usage_data) + payload = build_json_payload_from_data_blocks(data) + + # We only process usage data for active licences so below error is unlikely + if len(payload["licences"]) == 0: + logger.error("Licences is blank in payload for %s", lite_usage_data, exc_info=True) + return + + payload["usage_data_id"] = lite_usage_data_id + lite_api_url = get_lite_api_url() + logger.info("Sending LITE UsageData [%s] figures for Licences [%s] to LITE API", lite_usage_data_id, licences) + + try: + lite_usage_data.lite_payload = payload + lite_usage_data.save() + + response = mail_requests.put( + lite_api_url, + lite_usage_data.lite_payload, + hawk_credentials=settings.HAWK_LITE_HMRC_INTEGRATION_CREDENTIALS, + timeout=settings.LITE_API_REQUEST_TIMEOUT, + ) + except Exception as exc: # noqa + _handle_exception( + f"An unexpected error occurred when sending LITE UsageData [{lite_usage_data_id}] to LITE API -> " + f"{type(exc).__name__}: {exc}", + lite_usage_data_id, + ) + + if response.status_code not in [HTTP_207_MULTI_STATUS, HTTP_208_ALREADY_REPORTED]: + _handle_exception( + f"An unexpected response was received when sending LITE UsageData [{lite_usage_data_id}] to " + f"LITE API -> status=[{response.status_code}], message=[{response.text}]", + lite_usage_data_id, + ) + + if response.status_code == HTTP_207_MULTI_STATUS: + try: + response, accepted_licences, rejected_licences = parse_response(response) + except Exception as exc: # noqa + _handle_exception( + f"An unexpected error occurred when parsing the response for LITE UsageData " + f"[{lite_usage_data_id}] -> {type(exc).__name__}: {exc}", + lite_usage_data_id, + ) + save_response(lite_usage_data, accepted_licences, rejected_licences, response) + + logger.info("Successfully sent LITE UsageData [%s] to LITE API", lite_usage_data_id) diff --git a/mail/models.py b/mail/models.py index 8c854cf7..cfb7b58b 100644 --- a/mail/models.py +++ b/mail/models.py @@ -145,9 +145,9 @@ def get_licence_ids(self): @staticmethod def send_usage_updates_to_lite(id): - from mail.tasks import schedule_licence_usage_figures_for_lite_api + from mail.celery_tasks import send_licence_usage_figures_to_lite_api - schedule_licence_usage_figures_for_lite_api(str(id)) + send_licence_usage_figures_to_lite_api.delay(str(id)) class LicencePayload(models.Model): diff --git a/mail/tasks.py b/mail/tasks.py index f28e6a7d..fa7d91b8 100644 --- a/mail/tasks.py +++ b/mail/tasks.py @@ -1,26 +1,18 @@ import logging import os -import urllib.parse -from datetime import timedelta from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText -from typing import List, MutableMapping, Tuple from background_task import background -from background_task.models import Task from django.conf import settings from django.db import transaction -from django.utils import timezone -from rest_framework.status import HTTP_207_MULTI_STATUS, HTTP_208_ALREADY_REPORTED -from mail import requests as mail_requests from mail.enums import ChiefSystemEnum, ReceptionStatusEnum, SourceEnum from mail.libraries.builders import build_licence_data_mail from mail.libraries.data_processors import build_request_mail_message_dto from mail.libraries.lite_to_edifact_converter import EdifactValidationError from mail.libraries.routing_controller import check_and_route_emails, send, update_mail -from mail.libraries.usage_data_decomposition import build_json_payload_from_data_blocks, split_edi_data_by_id -from mail.models import LicenceIdMapping, LicencePayload, Mail, UsageData +from mail.models import LicencePayload, Mail from mail.servers import smtp_send logger = logging.getLogger(__name__) @@ -29,178 +21,9 @@ MANAGE_INBOX_TASK_QUEUE = "manage_inbox_queue" NOTIFY_USERS_TASK_QUEUE = "notify_users_queue" LICENCE_DATA_TASK_QUEUE = "licences_updates_queue" -USAGE_FIGURES_QUEUE = "usage_figures_queue" TASK_BACK_OFF = 3600 # Time, in seconds, to wait before scheduling a new task (used after MAX_ATTEMPTS is reached) -# Send Usage Figures to LITE API -def get_lite_api_url(): - """The URL for the licence usage callback, from the LITE_API_URL setting. - - If the configured URL has no path, use `/licences/hmrc-integration/`. - """ - url = settings.LITE_API_URL - components = urllib.parse.urlparse(url) - - if components.path in ("", "/"): - components = components._replace(path="/licences/hmrc-integration/") - url = urllib.parse.urlunparse(components) - - return url - - -@background(queue=USAGE_FIGURES_QUEUE, schedule=0) -def send_licence_usage_figures_to_lite_api(lite_usage_data_id): - """Sends HMRC Usage figure updates to LITE""" - - logger.info("Preparing LITE UsageData [%s] for LITE API", lite_usage_data_id) - - try: - lite_usage_data = UsageData.objects.get(id=lite_usage_data_id) - licences = UsageData.licence_ids - except UsageData.DoesNotExist: # noqa - _handle_exception( - f"LITE UsageData [{lite_usage_data_id}] does not exist.", - lite_usage_data_id, - ) - return - - # Extract usage details of Licences issued from LITE - _, data = split_edi_data_by_id(lite_usage_data.mail.edi_data, lite_usage_data) - payload = build_json_payload_from_data_blocks(data) - - # We only process usage data for active licences so below error is unlikely - if len(payload["licences"]) == 0: - logger.error("Licences is blank in payload for %s", lite_usage_data, exc_info=True) - return - - payload["usage_data_id"] = lite_usage_data_id - lite_api_url = get_lite_api_url() - logger.info("Sending LITE UsageData [%s] figures for Licences [%s] to LITE API", lite_usage_data_id, licences) - - try: - lite_usage_data.lite_payload = payload - lite_usage_data.save() - - response = mail_requests.put( - lite_api_url, - lite_usage_data.lite_payload, - hawk_credentials=settings.HAWK_LITE_HMRC_INTEGRATION_CREDENTIALS, - timeout=settings.LITE_API_REQUEST_TIMEOUT, - ) - except Exception as exc: # noqa - _handle_exception( - f"An unexpected error occurred when sending LITE UsageData [{lite_usage_data_id}] to LITE API -> " - f"{type(exc).__name__}: {exc}", - lite_usage_data_id, - ) - return - - if response.status_code not in [HTTP_207_MULTI_STATUS, HTTP_208_ALREADY_REPORTED]: - _handle_exception( - f"An unexpected response was received when sending LITE UsageData [{lite_usage_data_id}] to " - f"LITE API -> status=[{response.status_code}], message=[{response.text}]", - lite_usage_data_id, - ) - return - - if response.status_code == HTTP_207_MULTI_STATUS: - try: - response, accepted_licences, rejected_licences = parse_response(response) - except Exception as exc: # noqa - _handle_exception( - f"An unexpected error occurred when parsing the response for LITE UsageData " - f"[{lite_usage_data_id}] -> {type(exc).__name__}: {exc}", - lite_usage_data_id, - ) - return - save_response(lite_usage_data, accepted_licences, rejected_licences, response) - - logger.info("Successfully sent LITE UsageData [%s] to LITE API", lite_usage_data_id) - - -def schedule_licence_usage_figures_for_lite_api(lite_usage_data_id): - logger.info("Scheduling UsageData '%s' for LITE API", lite_usage_data_id) - task = Task.objects.filter(queue=USAGE_FIGURES_QUEUE, task_params=f'[["{lite_usage_data_id}"], {{}}]') - - if task.exists(): - logger.info("UsageData '%s' has already been scheduled", lite_usage_data_id) - else: - send_licence_usage_figures_to_lite_api(lite_usage_data_id) - logger.info("UsageData '%s' has been scheduled", lite_usage_data_id) - - -def parse_response(response) -> Tuple[MutableMapping, List[str], List[str]]: - response = response.json() - licences = response["licences"] - - accepted_licences = [ - LicenceIdMapping.objects.get(lite_id=licence.get("id")).reference - for licence in licences["accepted"] - if licence.get("id") - ] - rejected_licences = [ - LicenceIdMapping.objects.get(lite_id=licence.get("id")).reference - for licence in licences["rejected"] - if licence.get("id") - ] - - return response, accepted_licences, rejected_licences - - -def save_response(lite_usage_data: UsageData, accepted_licences, rejected_licences, response): - lite_usage_data.lite_accepted_licences = accepted_licences - lite_usage_data.lite_rejected_licences = rejected_licences - lite_usage_data.lite_sent_at = timezone.now() - lite_usage_data.lite_response = response - - if not lite_usage_data.has_spire_data: - lite_usage_data.mail.status = ReceptionStatusEnum.REPLY_RECEIVED - lite_usage_data.mail.save() - - lite_usage_data.save() - - -def schedule_max_tried_task_as_new_task(lite_usage_data_id): - """ - Used to schedule a max-tried task as a new task (starting from attempts=0); - Abstracted from 'send_licence_usage_figures_to_lite_api' to enable unit testing of a recursive operation - """ - - logger.warning( - "Maximum attempts of %s for LITE UsageData [%s] has been reached", settings.MAX_ATTEMPTS, lite_usage_data_id - ) - - schedule_datetime = timezone.now() + timedelta(seconds=TASK_BACK_OFF) - logger.info( - "Scheduling new task for LITE UsageData [%s] to commence at [%s]", lite_usage_data_id, schedule_datetime - ) - send_licence_usage_figures_to_lite_api(lite_usage_data_id, schedule=TASK_BACK_OFF) # noqa - - -def _handle_exception(message, lite_usage_data_id): - error_message = f"Failed to send LITE UsageData [{lite_usage_data_id}] to LITE API -> {message} " - - try: - task = Task.objects.get(queue=USAGE_FIGURES_QUEUE, task_params=f'[["{lite_usage_data_id}"], {{}}]') - except Task.DoesNotExist: - logger.error("No task was found for UsageData [%s]", lite_usage_data_id) - else: - # Get the task's current attempt number by retrieving the previous attempts and adding 1 - current_attempt = task.attempts + 1 - - # Schedule a new task if the current task has been attempted MAX_ATTEMPTS times; - # HMRC Integration tasks need to be resilient and keep retrying post-failure indefinitely. - # This logic will make MAX_ATTEMPTS attempts to send licence changes according to the Django Background Task - # Runner scheduling, then wait TASK_BACK_OFF seconds before starting the process again. - if current_attempt >= settings.MAX_ATTEMPTS: - schedule_max_tried_task_as_new_task(lite_usage_data_id) - - # Raise an exception - # this will cause the task to be marked as 'Failed' and retried if there are retry attempts left - raise Exception(error_message) - - @background(queue=LICENCE_DATA_TASK_QUEUE, schedule=0) def send_licence_data_to_hmrc(): """Sends LITE (or ICMS) licence updates to HMRC diff --git a/mail/tests/conftest.py b/mail/tests/conftest.py new file mode 100644 index 00000000..e804cf5e --- /dev/null +++ b/mail/tests/conftest.py @@ -0,0 +1,6 @@ +import pytest +import os + +@pytest.fixture(autouse=True) +def celery_sync(settings): + settings.CELERY_TASK_ALWAYS_EAGER = True diff --git a/mail/tests/test_celery_tasks.py b/mail/tests/test_celery_tasks.py new file mode 100644 index 00000000..7e36babe --- /dev/null +++ b/mail/tests/test_celery_tasks.py @@ -0,0 +1,24 @@ +from django.test import TestCase + +from mail.celery_tasks import get_lite_api_url + +class GetLiteAPIUrlTests(TestCase): + def test_get_url_with_no_path(self): + with self.settings(LITE_API_URL="https://example.com"): + result = get_lite_api_url() + + self.assertEqual(result, "https://example.com/licences/hmrc-integration/") + + def test_get_url_with_root_path(self): + with self.settings(LITE_API_URL="https://example.com/"): + result = get_lite_api_url() + + self.assertEqual(result, "https://example.com/licences/hmrc-integration/") + + def test_get_url_with_path_from_setting(self): + with self.settings(LITE_API_URL="https://example.com/foo"): + result = get_lite_api_url() + + self.assertEqual(result, "https://example.com/foo") + + diff --git a/mail/tests/test_send_licence_usage_figures_to_lite_api.py b/mail/tests/test_send_licence_usage_figures_to_lite_api.py index efed5243..7a48bcd5 100644 --- a/mail/tests/test_send_licence_usage_figures_to_lite_api.py +++ b/mail/tests/test_send_licence_usage_figures_to_lite_api.py @@ -6,7 +6,7 @@ from rest_framework.status import HTTP_207_MULTI_STATUS, HTTP_208_ALREADY_REPORTED, HTTP_400_BAD_REQUEST from mail.models import GoodIdMapping, LicenceIdMapping, LicencePayload, Mail, UsageData -from mail.tasks import schedule_max_tried_task_as_new_task, send_licence_usage_figures_to_lite_api +from mail.celery_tasks import send_licence_usage_figures_to_lite_api from mail.tests.libraries.client import LiteHMRCTestClient @@ -64,7 +64,7 @@ def setUp(self): spire_run_number=0, ) - @mock.patch("mail.tasks.mail_requests.put") + @mock.patch("mail.celery_tasks.mail_requests.put") def test_schedule_usages_for_lite_api_207_ok(self, put_request): put_request.return_value = MockResponse( json={ @@ -97,7 +97,7 @@ def test_schedule_usages_for_lite_api_207_ok(self, put_request): status_code=HTTP_207_MULTI_STATUS, ) - send_licence_usage_figures_to_lite_api.now(str(self.usage_data.id)) + send_licence_usage_figures_to_lite_api.delay(str(self.usage_data.id)) self.usage_data.refresh_from_db() put_request.assert_called_with( @@ -111,14 +111,14 @@ def test_schedule_usages_for_lite_api_207_ok(self, put_request): self.assertEqual(self.usage_data.lite_accepted_licences, ["GBSIEL/2020/0000008/P"]) self.assertEqual(self.usage_data.lite_rejected_licences, ["GBSIEL/2020/0000009/P"]) - @mock.patch("mail.tasks.mail_requests.put") + @mock.patch("mail.celery_tasks.mail_requests.put") def test_schedule_usages_for_lite_api_208_ok(self, put_request): original_sent_at = self.usage_data.lite_sent_at original_accepted_licences = self.usage_data.lite_accepted_licences original_rejected_licences = self.usage_data.lite_rejected_licences put_request.return_value = MockResponse(status_code=HTTP_208_ALREADY_REPORTED) - send_licence_usage_figures_to_lite_api.now(str(self.usage_data.id)) + send_licence_usage_figures_to_lite_api.delay(str(self.usage_data.id)) self.usage_data.refresh_from_db() put_request.assert_called_with( @@ -132,12 +132,11 @@ def test_schedule_usages_for_lite_api_208_ok(self, put_request): self.assertEqual(self.usage_data.lite_accepted_licences, original_accepted_licences) self.assertEqual(self.usage_data.lite_rejected_licences, original_rejected_licences) - @mock.patch("mail.tasks.mail_requests.put") + @mock.patch("mail.celery_tasks.mail_requests.put") def test_schedule_usages_for_lite_api_400_bad_request(self, put_request): put_request.return_value = MockResponse(status_code=HTTP_400_BAD_REQUEST) - with self.assertRaises(Exception) as error: - send_licence_usage_figures_to_lite_api.now(str(self.usage_data.id)) + send_licence_usage_figures_to_lite_api.delay(str(self.usage_data.id)) self.usage_data.refresh_from_db() put_request.assert_called_with( @@ -149,38 +148,7 @@ def test_schedule_usages_for_lite_api_400_bad_request(self, put_request): self.usage_data.refresh_from_db() self.assertIsNone(self.usage_data.lite_sent_at) - @mock.patch("mail.tasks.schedule_max_tried_task_as_new_task") - @mock.patch("mail.tasks.Task.objects.get") - @mock.patch("mail.tasks.mail_requests.put") - def test_schedule_usages_for_lite_api_max_tried_task(self, put_request, get_task, schedule_new_task): - put_request.return_value = MockResponse(status_code=HTTP_400_BAD_REQUEST) - get_task.return_value = MockTask(settings.MAX_ATTEMPTS - 1) - schedule_new_task.return_value = None - - with self.assertRaises(Exception) as error: - send_licence_usage_figures_to_lite_api.now(str(self.usage_data.id)) - - self.usage_data.refresh_from_db() - - put_request.assert_called_with( - f"{settings.LITE_API_URL}/licences/hmrc-integration/", - self.usage_data.lite_payload, - hawk_credentials=settings.HAWK_LITE_HMRC_INTEGRATION_CREDENTIALS, - timeout=settings.LITE_API_REQUEST_TIMEOUT, - ) - schedule_new_task.assert_called_with(str(self.usage_data.id)) - self.usage_data.refresh_from_db() - self.assertIsNone(self.usage_data.lite_sent_at) - - @mock.patch("mail.tasks.send_licence_usage_figures_to_lite_api") - def test_schedule_new_task(self, send_licence_usage_figures): - send_licence_usage_figures.return_value = None - - schedule_max_tried_task_as_new_task(str(self.usage_data.id)) - - send_licence_usage_figures.assert_called_with(str(self.usage_data.id), schedule=mock.ANY) - - @mock.patch("mail.tasks.mail_requests.put") + @mock.patch("mail.celery_tasks.mail_requests.put") def test_licence_usage_ignore_licence_completion(self, put_request): """ Test that ensures that licenceUsage transaction that has a completion date @@ -266,7 +234,7 @@ def test_licence_usage_ignore_licence_completion(self, put_request): status_code=HTTP_207_MULTI_STATUS, ) - send_licence_usage_figures_to_lite_api.now(str(usage_data.id)) + send_licence_usage_figures_to_lite_api.delay(str(usage_data.id)) usage_data.refresh_from_db() put_request.assert_called_with( diff --git a/mail/tests/test_tasks.py b/mail/tests/test_tasks.py index e6868372..28be5081 100644 --- a/mail/tests/test_tasks.py +++ b/mail/tests/test_tasks.py @@ -3,27 +3,7 @@ from django.test import TestCase -from mail.tasks import get_lite_api_url, notify_users_of_rejected_mail - - -class GetLiteAPIUrlTests(TestCase): - def test_get_url_with_no_path(self): - with self.settings(LITE_API_URL="https://example.com"): - result = get_lite_api_url() - - self.assertEqual(result, "https://example.com/licences/hmrc-integration/") - - def test_get_url_with_root_path(self): - with self.settings(LITE_API_URL="https://example.com/"): - result = get_lite_api_url() - - self.assertEqual(result, "https://example.com/licences/hmrc-integration/") - - def test_get_url_with_path_from_setting(self): - with self.settings(LITE_API_URL="https://example.com/foo"): - result = get_lite_api_url() - - self.assertEqual(result, "https://example.com/foo") +from mail.tasks import notify_users_of_rejected_mail class NotifyUsersOfRejectedMailTests(TestCase):