Skip to content

Commit

Permalink
Moves update_mail and mail builders inside celery task and make the t…
Browse files Browse the repository at this point in the history
…ask async
  • Loading branch information
seijihg committed Feb 11, 2024
1 parent f2ca1f7 commit c3b2e67
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 80 deletions.
73 changes: 43 additions & 30 deletions mail/celery_tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import time
import urllib.parse
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from smtplib import SMTPException
from typing import List, MutableMapping, Tuple
import time
from contextlib import contextmanager
from django.core.cache import cache

Expand All @@ -12,11 +10,12 @@
from django.conf import settings
from django.db import transaction
from django.utils import timezone
from mail.libraries.email_message_dto import EmailMessageDto
from rest_framework.status import HTTP_207_MULTI_STATUS, HTTP_208_ALREADY_REPORTED

from mail import requests as mail_requests
from mail.enums import ReceptionStatusEnum, SourceEnum
from mail.libraries.builders import build_licence_data_mail
from mail.libraries.builders import build_email_message, build_email_rejected_licence_message, build_licence_data_mail
from mail.libraries.data_processors import build_request_mail_message_dto
from mail.libraries.routing_controller import check_and_route_emails, send, update_mail
from mail.libraries.usage_data_decomposition import build_json_payload_from_data_blocks, split_edi_data_by_id
Expand Down Expand Up @@ -96,14 +95,7 @@ def notify_users_of_rejected_licences(mail_id, mail_response_subject):
logger.info("Notifying users of rejected licences found in mail with subject %s", mail_response_subject)

try:
multipart_msg = MIMEMultipart()
multipart_msg["From"] = settings.EMAIL_USER
multipart_msg["To"] = ",".join(settings.NOTIFY_USERS)
multipart_msg["Subject"] = "Licence rejected by HMRC"
body = MIMEText(f"Mail (Id: {mail_id}) with subject {mail_response_subject} has rejected licences")
multipart_msg.attach(body)

send_smtp_task(multipart_msg)
send_smtp_task.apply_async(kwargs={"mail_id": mail_id, "mail_response_subject": mail_response_subject})

except SMTPException:
logger.exception(
Expand Down Expand Up @@ -167,8 +159,7 @@ def send_licence_details_to_hmrc():
"Created licenceData mail with subject %s for licences [%s]", mail_dto.subject, licence_references
)

send(mail_dto)
update_mail(mail, mail_dto)
send(mail_dto, mail)

# Mark the payloads as processed
licences.update(is_processed=True)
Expand Down Expand Up @@ -281,33 +272,55 @@ def manage_inbox():


@contextmanager
def memcache_lock(lock_id, oid=None):
def cache_lock(lock_id):
timeout_at = time.monotonic() + LOCK_EXPIRE - 3
status = cache.add(lock_id, "locked", LOCK_EXPIRE)
# cache.add fails if the key already exists
status = cache.add(lock_id, "lock_acquired", LOCK_EXPIRE)
try:
yield status
finally:
if time.monotonic() < timeout_at and status:
cache.delete(lock_id)


@shared_task(bind=True, autoretry_for=(SMTPException,), max_retries=MAX_ATTEMPTS, retry_backoff=RETRY_BACKOFF)
def send_smtp_task(self, multipart_msg, mail=None, email_message_dto=None):
class SendSmtpFailureTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
mail_id = kwargs.get("mail_id", "Unknown")
message = f"""
Task failed permanently after all retries: send_smtp_task
Mail ID: {mail_id}
Exception: {exc}
Args: {args}
Kwargs: {kwargs}
Task ID: {task_id}
Exception Info: {einfo}
"""

# Log the final failure message
logger.critical(message)


@shared_task(bind=True, max_retries=MAX_ATTEMPTS, retry_backoff=3, base=SendSmtpFailureTask)
def send_smtp_task(self, mail_id=None, email_message_data=None, mail_response_subject=None):
global_lock_id = "global_send_email_lock"

with memcache_lock(global_lock_id) as acquired:
# From task notify_users_of_rejected_licences
if mail_id and mail_response_subject:
message = build_email_rejected_licence_message(mail_id, mail_response_subject)
else:
deserialized_email_message_dto = EmailMessageDto(**email_message_data)
message = build_email_message(deserialized_email_message_dto)

with cache_lock(global_lock_id) as acquired:
if acquired:
logger.info("Global lock acquired, sending email")
try:
smtp_send(multipart_msg)
logger.info("Successfully sent email.")
if mail and email_message_dto:
update_mail(mail, email_message_dto)
except SMTPException as e:
logger.error(f"Failed to send email: {e}")
raise
smtp_send(message)
logger.info(f"Mail with Response Subject:{message['Subject']}; successfully sent.")

# send_licence_details_to_hmrc and _collect_and_send -> update_mail were moved here if sucessfully sent
if mail_id and email_message_data:
mail = Mail.objects.filter(id=mail_id).first()
update_mail(mail, deserialized_email_message_dto)
else:
logger.info("Another send_smtp_task is currently in progress, will retry...")

retry_delay = RETRY_BACKOFF * (2**self.request.retries)
raise self.retry(countdown=retry_delay)
raise self.retry()
11 changes: 11 additions & 0 deletions mail/libraries/builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,14 @@ def _validate_dto(email_message_dto):

if email_message_dto.attachment is None:
raise TypeError("None file attachment received!")


def build_email_rejected_licence_message(mail_id, mail_response_subject):
multipart_msg = MIMEMultipart()
multipart_msg["From"] = settings.EMAIL_USER
multipart_msg["To"] = ",".join(settings.NOTIFY_USERS)
multipart_msg["Subject"] = "Licence rejected by HMRC"
body = MIMEText(f"Mail (Id: {mail_id}) with subject {mail_response_subject} has rejected licences")
multipart_msg.attach(body)

return multipart_msg
8 changes: 5 additions & 3 deletions mail/libraries/routing_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from mail.auth import BasicAuthentication, ModernAuthentication
from mail.enums import ExtractTypeEnum, MailReadStatuses, ReceptionStatusEnum, SourceEnum
from mail.libraries.builders import build_email_message
from mail.libraries.data_processors import (
lock_db_for_sending_transaction,
serialize_email_message,
Expand Down Expand Up @@ -174,8 +173,11 @@ def send(email_message_dto: EmailMessageDto, mail=None):
from mail.celery_tasks import send_smtp_task

logger.info("Preparing to send email")
message = build_email_message(email_message_dto)
send_smtp_task(message, mail, email_message_dto)
email_message_data = email_message_dto._asdict()
if mail:
send_smtp_task.apply_async(kwargs={"mail_id": mail.id, "email_message_data": email_message_data})
else:
send_smtp_task.apply_async(kwargs={"email_message_data": email_message_data})


def _collect_and_send(mail: Mail):
Expand Down
117 changes: 70 additions & 47 deletions mail/tests/test_celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import pytest
from django.test import TestCase, override_settings
from django.core.cache import cache
from celery.exceptions import Retry

from django.conf import settings
from mail.celery_tasks import manage_inbox, notify_users_of_rejected_licences
from celery.exceptions import Retry, MaxRetriesExceededError

import email.mime.multipart
from mail.celery_tasks import SendSmtpFailureTask, manage_inbox, notify_users_of_rejected_licences
from mail.libraries.email_message_dto import EmailMessageDto
from mail.celery_tasks import send_smtp_task
from mail.celery_tasks import get_lite_api_url
Expand Down Expand Up @@ -73,6 +73,32 @@ def test_get_url_with_path_from_setting(self):
self.assertEqual(result, "https://example.com/foo")


class NotifyUsersOfRejectedMailTests(TestCase):
@override_settings(EMAIL_USER="[email protected]", NOTIFY_USERS=["[email protected]"]) # /PS-IGNORE
@mock.patch("mail.celery_tasks.smtp_send")
def test_send_success(self, mock_send):
notify_users_of_rejected_licences("123", "CHIEF_SPIRE_licenceReply_202401180900_42557")

mock_send.assert_called_once()

self.assertEqual(len(mock_send.call_args_list), 1)
message = mock_send.call_args[0][0]
self.assertIsInstance(message, email.mime.multipart.MIMEMultipart)

expected_headers = {
"Content-Type": "multipart/mixed",
"MIME-Version": "1.0",
"From": "[email protected]", # /PS-IGNORE
"To": "[email protected]", # /PS-IGNORE
"Subject": "Licence rejected by HMRC",
}
self.assertDictEqual(dict(message), expected_headers)

text_payload = message.get_payload(0)
expected_body = "Mail (Id: 123) with subject CHIEF_SPIRE_licenceReply_202401180900_42557 has rejected licences"
self.assertEqual(text_payload.get_payload(), expected_body)


class SendEmailTaskTests(TestCase):
def setUp(self):
attachment = "30 \U0001d5c4\U0001d5c6/\U0001d5c1 \u5317\u4EB0"
Expand All @@ -95,19 +121,14 @@ def test_locking_prevents_multiple_executions(self, mock_cache, mock_smtp_send):
# Simulate the lock being released after the first task finishes
mock_cache.delete.return_value = None

try:
send_smtp_task(self.email_message_dto)
except Retry:
self.fail("First task execution should not raise Retry.")

with self.assertRaises(Retry):
send_smtp_task(self.email_message_dto)
email_message_data = self.email_message_dto._asdict()
send_smtp_task.apply_async(kwargs={"email_message_data": email_message_data})
send_smtp_task.apply_async(kwargs={"email_message_data": email_message_data})

# Assert smtp_send was called once due to locking
mock_smtp_send.assert_called_once()
# After locked and being released
self.assertEqual(mock_cache.add.call_count, 2)
mock_cache.delete.assert_called_once_with("global_send_email_lock")
# Failed and re-tried so in total 3 times
self.assertEqual(mock_cache.add.call_count, 3)
# Check if lock was deleted
self.assertTrue(mock_cache.delete.called)

@mock.patch("mail.celery_tasks.send_smtp_task.retry", side_effect=Retry)
@mock.patch("mail.celery_tasks.smtp_send")
Expand All @@ -116,38 +137,40 @@ def test_retry_on_lock_failure(self, mock_cache, mock_smtp_send, mock_retry):
mock_cache.add.return_value = False
mock_smtp_send.return_value = None

with self.assertRaises(Retry):
send_smtp_task(self.email_message_dto)
email_message_data = self.email_message_dto._asdict()
try:
send_smtp_task.apply_async(kwargs={"email_message_data": email_message_data})
except Retry:
pass

mock_retry.assert_called_once()

retry_call_args = mock_retry.call_args
self.assertIn("countdown", retry_call_args[1])
retry_delay = retry_call_args[1]["countdown"]
self.assertEqual(retry_delay, 180)


class NotifyUsersOfRejectedMailTests(TestCase):
@override_settings(EMAIL_USER="[email protected]", NOTIFY_USERS=["[email protected]"]) # /PS-IGNORE
@mock.patch("mail.celery_tasks.smtp_send")
def test_send_success(self, mock_send):
notify_users_of_rejected_licences("123", "CHIEF_SPIRE_licenceReply_202401180900_42557")

mock_send.assert_called_once()

self.assertEqual(len(mock_send.call_args_list), 1)
message = mock_send.call_args[0][0]
self.assertIsInstance(message, email.mime.multipart.MIMEMultipart)

expected_headers = {
"Content-Type": "multipart/mixed",
"MIME-Version": "1.0",
"From": "[email protected]", # /PS-IGNORE
"To": "[email protected]", # /PS-IGNORE
"Subject": "Licence rejected by HMRC",
}
self.assertDictEqual(dict(message), expected_headers)

text_payload = message.get_payload(0)
expected_body = "Mail (Id: 123) with subject CHIEF_SPIRE_licenceReply_202401180900_42557 has rejected licences"
self.assertEqual(text_payload.get_payload(), expected_body)
@mock.patch("mail.celery_tasks.logger")
def test_on_failure_logging(self, mock_logger):
# Create an instance of the task
task = SendSmtpFailureTask()

# Simulated task failure information
exc = MaxRetriesExceededError()
task_id = "test_task_id"
args = ("arg1", "arg2")
kwargs = {"mail_id": "12345"}
einfo = "Simulated exception info"

# Manually call the on_failure method
task.on_failure(exc, task_id, args, kwargs, einfo)

# Build the expected message
expected_message = f"""
Task failed permanently after all retries: send_smtp_task
Mail ID: {kwargs['mail_id']}
Exception: {exc}
Args: {args}
Kwargs: {kwargs}
Task ID: {task_id}
Exception Info: {einfo}
"""

self.assertEqual(
" ".join(mock_logger.critical.call_args[0][0].strip().split()), " ".join(expected_message.strip().split())
)

0 comments on commit c3b2e67

Please sign in to comment.