Skip to content

Commit

Permalink
Add tests for send_email task
Browse files Browse the repository at this point in the history
  • Loading branch information
kevincarrogan committed Dec 23, 2024
1 parent 8f955b0 commit 6dc4891
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 10 deletions.
17 changes: 8 additions & 9 deletions mail/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _log_error(message, lite_usage_data_id):
logger.error("Failed to send LITE UsageData [{%s}] to LITE API -> {%s}", lite_usage_data_id, message)


MAX_ATTEMPTS = 3
MAX_RETRIES = 3
RETRY_BACKOFF = 180
LOCK_EXPIRE = 60 * 10 # secs (10 min)
CELERY_SEND_LICENCE_UPDATES_TASK_NAME = "mail.celery_tasks.send_licence_details_to_hmrc"
Expand All @@ -100,7 +100,7 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):
ConnectionResetError,
SMTPException,
),
max_retries=MAX_ATTEMPTS,
max_retries=MAX_RETRIES,
retry_backoff=RETRY_BACKOFF,
base=SendEmailBaseTask,
serializer="pickle",
Expand All @@ -117,10 +117,9 @@ def send_email_task(message):
This is achieved by deferring all email sending functionality to this task.
Before sending email it first tries to acquire a lock.
- If there are no active connections then it acquires lock and sends email.
In some cases we need to update state which is handled in subtask linked to this task.
- If there is active connection (lock acquisition fails) then it raises an exception
which triggers a retry.
If all retries fail then manual intervention may be required (unlikely)
- If there is already an active connection then it will block until it is closed.
- In some cases we need to update state which is handled in subtask linked to this task.
- If all retries fail then manual intervention may be required (unlikely)
"""

global_lock_id = "global_send_email_lock"
Expand Down Expand Up @@ -217,7 +216,7 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):

@shared_task(
autoretry_for=(EdifactValidationError,),
max_retries=MAX_ATTEMPTS,
max_retries=MAX_RETRIES,
retry_backoff=RETRY_BACKOFF,
base=SendLicenceDetailsBaseTask,
)
Expand Down Expand Up @@ -272,7 +271,7 @@ def send_licence_details_to_hmrc():

@shared_task(
autoretry_for=(Exception,),
max_retries=MAX_ATTEMPTS,
max_retries=MAX_RETRIES,
retry_backoff=True,
base=SendUsageDataBaseTask,
)
Expand Down Expand Up @@ -348,7 +347,7 @@ def send_licence_usage_figures_to_lite_api(lite_usage_data_id):
# Scan Inbox for SPIRE and HMRC Emails
@shared_task(
autoretry_for=(Exception,),
max_retries=MAX_ATTEMPTS,
max_retries=MAX_RETRIES,
retry_backoff=RETRY_BACKOFF,
)
def manage_inbox():
Expand Down
134 changes: 133 additions & 1 deletion mail/tests/test_celery_tasks.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
import concurrent.futures
import email.mime.multipart
import time
from datetime import datetime, timezone
from email.mime.multipart import MIMEMultipart
from smtplib import SMTPException
from unittest import mock
from unittest.mock import MagicMock

import pytest
from django.test import TestCase, override_settings
from parameterized import parameterized

from mail.celery_tasks import get_lite_api_url, manage_inbox, notify_users_of_rejected_licences
from mail.celery_tasks import (
MAX_RETRIES,
get_lite_api_url,
manage_inbox,
notify_users_of_rejected_licences,
send_email_task,
)
from mail.enums import ExtractTypeEnum, ReceptionStatusEnum, SourceEnum
from mail.libraries.email_message_dto import EmailMessageDto
from mail.libraries.routing_controller import check_and_route_emails
Expand Down Expand Up @@ -223,3 +232,126 @@ def test_processing_of_licence_reply_with_rejected_licences(
self.assertEqual(message["From"], emails_data[index]["sender"])
self.assertEqual(message["To"], emails_data[index]["recipients"])
self.assertEqual(message["Subject"], emails_data[index]["subject"])


class SendEmailTestTests(TestCase):
@pytest.fixture(autouse=True)
def inject_fixtures(self, caplog):
self.caplog = caplog

@mock.patch("mail.celery_tasks.cache")
@mock.patch("mail.servers.get_smtp_connection")
def test_sends_email(self, mock_get_smtp_connection, mock_cache):
mock_conn = mock_get_smtp_connection()
message = {
"From": "[email protected]",
"To": "[email protected]",
}
send_email_task.apply(args=[message])
mock_conn.send_message.assert_called_with(message)
mock_conn.quit.assert_called()
mock_cache.lock.assert_called_with("global_send_email_lock", timeout=600)

@parameterized.expand(
[
(ConnectionResetError,),
(SMTPException,),
]
)
@mock.patch("mail.celery_tasks.cache")
@mock.patch("mail.servers.get_smtp_connection")
def test_sends_email_failed_then_succeeds(self, exception_class, mock_get_smtp_connection, mock_cache):
mock_conn = mock_get_smtp_connection()
message = {
"From": "[email protected]",
"To": "[email protected]",
}
mock_conn.send_message.side_effect = [exception_class(), None]
send_email_task.apply(args=[message])
mock_conn.send_message.assert_called_with(message)
self.assertEqual(mock_conn.send_message.call_count, 2)
self.assertEqual(mock_conn.quit.call_count, 2)
mock_cache.lock.assert_called_with("global_send_email_lock", timeout=600)

@parameterized.expand(
[
(ConnectionResetError,),
(SMTPException,),
]
)
@mock.patch("mail.celery_tasks.cache")
@mock.patch("mail.servers.get_smtp_connection")
def test_sends_email_max_retry_failures(self, exception_class, mock_get_smtp_connection, mock_cache):
mock_conn = mock_get_smtp_connection()
message = {
"From": "[email protected]",
"To": "[email protected]",
}
mock_conn.send_message.side_effect = exception_class()
send_email_task.apply(args=[message])
mock_conn.send_message.assert_called_with(message)
self.assertEqual(
mock_conn.send_message.call_count,
MAX_RETRIES + 1,
)
self.assertEqual(mock_conn.quit.call_count, MAX_RETRIES + 1)
mock_cache.lock.assert_called_with("global_send_email_lock", timeout=600)

@mock.patch("mail.servers.get_smtp_connection")
def test_locking(self, mock_get_smtp_connection):
results = []

SLEEP_TIME = 1

def _sleepy(message):
call = {}
call["start"] = {
"message": message,
"time": time.monotonic(),
}
time.sleep(SLEEP_TIME)
call["end"] = {
"message": message,
"time": time.monotonic(),
}
results.append(call)

mock_conn = mock_get_smtp_connection()
mock_conn.send_message.side_effect = _sleepy

with concurrent.futures.ThreadPoolExecutor() as executor:
message_1 = {
"From": "[email protected]",
"To": "[email protected]",
}
future_1 = executor.submit(send_email_task.apply, args=[message_1])

message_2 = {
"From": "[email protected]",
"To": "[email protected]",
}
future_2 = executor.submit(send_email_task.apply, args=[message_2])

future_1.result()
future_2.result()

first_call, second_call = results

# We don't particularly care about the exact order of these calls
# We just care that they didn't happen at the same time due to the lock

self.assertEqual(first_call["start"]["message"], first_call["end"]["message"])
self.assertEqual(second_call["start"]["message"], second_call["end"]["message"])
self.assertNotEqual(first_call["start"]["message"], second_call["start"]["message"])
self.assertNotEqual(first_call["end"]["message"], second_call["end"]["message"])

self.assertGreater(
second_call["start"]["time"],
first_call["end"]["time"],
"The second call should start after the end of the first call",
)
self.assertGreater(
second_call["start"]["time"],
first_call["start"]["time"] + SLEEP_TIME,
f"The second call should start at least {SLEEP_TIME} after the first started",
)

0 comments on commit 6dc4891

Please sign in to comment.