Skip to content

Commit

Permalink
Merge pull request #293 from uktrade/LTD-5748-email-task-failures
Browse files Browse the repository at this point in the history
Ltd 5748 email task failures
  • Loading branch information
kevincarrogan authored Dec 30, 2024
2 parents baa71a2 + 6dc4891 commit 5cf9cce
Show file tree
Hide file tree
Showing 9 changed files with 695 additions and 587 deletions.
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ typing-extensions ="~=4.3.0"
django-log-formatter-asim = "~=0.0.5"
dbt-copilot-python = "~=0.2.1"
dj-database-url = "~=2.2.0"
django-redis = "~=5.4.0"

[requires]
python_version = "3.9"
Expand Down
595 changes: 313 additions & 282 deletions Pipfile.lock

Large diffs are not rendered by default.

15 changes: 12 additions & 3 deletions conf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,11 @@ def _build_redis_url(base_url, db_number, **query_args):

CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.redis.RedisCache",
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": REDIS_BASE_URL,
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
},
}
}

Expand Down Expand Up @@ -332,8 +335,11 @@ def _build_redis_url(base_url, db_number, **query_args):

CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.redis.RedisCache",
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": REDIS_BASE_URL,
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
},
}
}

Expand Down Expand Up @@ -379,8 +385,11 @@ def _build_redis_url(base_url, db_number, **query_args):

CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.redis.RedisCache",
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": REDIS_BASE_URL,
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
},
}
}

Expand Down
62 changes: 18 additions & 44 deletions mail/celery_tasks.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import time
import urllib.parse

from smtplib import SMTPException
from typing import List, MutableMapping, Tuple

from celery import Task, shared_task
from celery.utils.log import get_task_logger
from contextlib import contextmanager
from django.conf import settings
from django.core.cache import cache
from django.db import transaction
Expand Down Expand Up @@ -78,30 +75,13 @@ def _log_error(message, lite_usage_data_id):
logger.error("Failed to send LITE UsageData [{%s}] to LITE API -> {%s}", lite_usage_data_id, message)


MAX_ATTEMPTS = 3
MAX_RETRIES = 3
RETRY_BACKOFF = 180
LOCK_EXPIRE = 60 * 10 # secs (10 min)
CELERY_SEND_LICENCE_UPDATES_TASK_NAME = "mail.celery_tasks.send_licence_details_to_hmrc"
CELERY_MANAGE_INBOX_TASK_NAME = "mail.celery_tasks.manage_inbox"


class SMTPConnectionBusy(SMTPException):
    """Raised when the send-email lock is held by another SMTP connection."""


@contextmanager
def cache_lock(lock_id):
    """Try to take a cache-backed lock and yield whether it was acquired.

    The lock is a cache key stored via ``cache.add`` with a lifetime of
    ``LOCK_EXPIRE`` seconds. ``cache.add`` fails if the key already exists,
    so the yielded value is True when the lock was acquired and False
    otherwise. On exit the key is deleted only when this call acquired it
    and the release deadline has not passed.
    """
    # The deadline sits 3 seconds before the key's own expiry.
    # NOTE(review): presumably this avoids deleting a key that has already
    # expired and been re-acquired elsewhere -- confirm.
    release_deadline = time.monotonic() + LOCK_EXPIRE - 3
    acquired = cache.add(lock_id, "lock_acquired", LOCK_EXPIRE)
    try:
        yield acquired
    finally:
        if acquired and time.monotonic() < release_deadline:
            cache.delete(lock_id)


class SendEmailBaseTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
message = """
Expand All @@ -116,8 +96,11 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):


@shared_task(
autoretry_for=(SMTPConnectionBusy, SMTPException),
max_retries=MAX_ATTEMPTS,
autoretry_for=(
ConnectionResetError,
SMTPException,
),
max_retries=MAX_RETRIES,
retry_backoff=RETRY_BACKOFF,
base=SendEmailBaseTask,
serializer="pickle",
Expand All @@ -134,25 +117,20 @@ def send_email_task(message):
This is achieved by deferring all email sending functionality to this task.
Before sending email it first tries to acquire a lock.
- If there are no active connections then it acquires lock and sends email.
In some cases we need to update state which is handled in subtask linked to this task.
- If there is active connection (lock acquisition fails) then it raises an exception
which triggers a retry.
If all retries fail then manual intervention may be required (unlikely)
- If there is already an active connection then it will block until it is closed.
- In some cases we need to update state which is handled in subtask linked to this task.
- If all retries fail then manual intervention may be required (unlikely)
"""

global_lock_id = "global_send_email_lock"

with cache_lock(global_lock_id) as lock_acquired:
if not lock_acquired:
logger.exception("Another SMTP connection is active, will be retried after backing off")
raise SMTPConnectionBusy()

with cache.lock(global_lock_id, timeout=LOCK_EXPIRE):
logger.info("Lock acquired, proceeding to send email from %s to %s", message["From"], message["To"])

try:
smtp_send(message)
except SMTPException:
logger.exception("An unexpected error occurred when sending email -> %s")
except (SMTPException, ConnectionResetError):
logger.exception("An unexpected error occurred when sending email")
raise

logger.info("Email sent successfully to %s", message["To"])
Expand Down Expand Up @@ -238,7 +216,7 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):

@shared_task(
autoretry_for=(EdifactValidationError,),
max_retries=MAX_ATTEMPTS,
max_retries=MAX_RETRIES,
retry_backoff=RETRY_BACKOFF,
base=SendLicenceDetailsBaseTask,
)
Expand Down Expand Up @@ -293,7 +271,7 @@ def send_licence_details_to_hmrc():

@shared_task(
autoretry_for=(Exception,),
max_retries=MAX_ATTEMPTS,
max_retries=MAX_RETRIES,
retry_backoff=True,
base=SendUsageDataBaseTask,
)
Expand Down Expand Up @@ -369,7 +347,7 @@ def send_licence_usage_figures_to_lite_api(lite_usage_data_id):
# Scan Inbox for SPIRE and HMRC Emails
@shared_task(
autoretry_for=(Exception,),
max_retries=MAX_ATTEMPTS,
max_retries=MAX_RETRIES,
retry_backoff=RETRY_BACKOFF,
)
def manage_inbox():
Expand All @@ -378,10 +356,6 @@ def manage_inbox():
logger.info("Polling inbox for updates")
try:
check_and_route_emails()
except Exception as exc: # noqa
logger.error(
"An unexpected error occurred when polling inbox for updates -> %s",
type(exc).__name__,
exc_info=True,
)
raise exc
except Exception: # noqa
logger.exception("An unexpected error occurred when polling inbox for updates")
raise
19 changes: 13 additions & 6 deletions mail/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class Meta:
db_table = "mail"
ordering = ["created_at"]

def __str__(self):
return f"{self.__class__.__name__} object (id={self.id}, status={self.status})"
def __repr__(self):
    """Return a short ``id=... status=...`` description of this record."""
    description = f"id={self.id} status={self.status}"
    return description

def save(self, *args, **kwargs):
if not self.edi_data or not self.edi_filename:
Expand Down Expand Up @@ -101,6 +101,13 @@ class LicenceData(models.Model):
class Meta:
ordering = ["mail__created_at"]

def __repr__(self):
    """Return a one-line summary: HMRC run number, source and mail status.

    When the source is SPIRE, the source run number is appended to the
    source in parentheses.
    """
    origin = self.source
    # Only SPIRE-sourced records get the source run number in the label.
    label = f"{origin} ({self.source_run_number})" if origin == SourceEnum.SPIRE else origin
    return f"hmrc_run_number={self.hmrc_run_number} source={label} status={self.mail.status}"

def set_licence_ids(self, data: List):
self.licence_ids = json.dumps(data)

Expand Down Expand Up @@ -166,8 +173,8 @@ def save(self, *args, **kwargs):
if settings.CHIEF_SOURCE_SYSTEM == ChiefSystemEnum.SPIRE:
LicenceIdMapping.objects.get_or_create(lite_id=self.lite_id, reference=self.reference)

def __str__(self):
return f"LicencePayload(lite_id={self.lite_id}, reference={self.reference}, action={self.action})"
def __repr__(self):
    """Return a short ``lite_id=... reference=... action=...`` description."""
    description = f"lite_id={self.lite_id} reference={self.reference} action={self.action}"
    return description


class LicenceIdMapping(models.Model):
Expand Down Expand Up @@ -211,5 +218,5 @@ class MailReadStatus(TimeStampedModel):
status = models.TextField(choices=MailReadStatuses.choices, default=MailReadStatuses.UNREAD, db_index=True)
mailbox = models.ForeignKey(MailboxConfig, on_delete=models.CASCADE)

def __str__(self):
return f"{self.__class__.__name__}(message_id={self.message_id}, status={self.status})"
def __repr__(self):
    """Return a short ``message_id=... status=...`` description."""
    description = f"message_id={self.message_id} status={self.status}"
    return description
Loading

0 comments on commit 5cf9cce

Please sign in to comment.