Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LTD-4652-send-email-task #230

Closed
wants to merge 11 commits into from
7 changes: 7 additions & 0 deletions conf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,10 @@ def _build_redis_url(base_url, db_number, **query_args):
CELERY_TASK_ALWAYS_EAGER = env.bool("CELERY_TASK_ALWAYS_EAGER", False)
CELERY_TASK_STORE_EAGER_RESULT = env.bool("CELERY_TASK_STORE_EAGER_RESULT", False)
CELERY_TASK_SEND_SENT_EVENT = env.bool("CELERY_TASK_SEND_SENT_EVENT", True)

CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.redis.RedisCache",
"LOCATION": REDIS_BASE_URL,
}
}
77 changes: 64 additions & 13 deletions mail/celery_tasks.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import time
import urllib.parse
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from smtplib import SMTPException
from typing import List, MutableMapping, Tuple
from contextlib import contextmanager
from django.core.cache import cache

from celery import Task, shared_task
from celery.utils.log import get_task_logger
from django.conf import settings
from django.db import transaction
from django.utils import timezone
from mail.libraries.email_message_dto import EmailMessageDto
from rest_framework.status import HTTP_207_MULTI_STATUS, HTTP_208_ALREADY_REPORTED

from mail import requests as mail_requests
from mail.enums import ReceptionStatusEnum, SourceEnum
from mail.libraries.builders import build_licence_data_mail
from mail.libraries.builders import build_email_message, build_email_rejected_licence_message, build_licence_data_mail
from mail.libraries.data_processors import build_request_mail_message_dto
from mail.libraries.routing_controller import check_and_route_emails, send, update_mail
from mail.libraries.usage_data_decomposition import build_json_payload_from_data_blocks, split_edi_data_by_id
Expand All @@ -22,6 +24,7 @@

logger = get_task_logger(__name__)


# Send Usage Figures to LITE API
def get_lite_api_url():
"""The URL for the licence usage callback, from the LITE_API_URL setting.
Expand Down Expand Up @@ -75,6 +78,7 @@ def _log_error(message, lite_usage_data_id):

MAX_ATTEMPTS = 3
RETRY_BACKOFF = 180
LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
CELERY_SEND_LICENCE_UPDATES_TASK_NAME = "mail.celery_tasks.send_licence_details_to_hmrc"
CELERY_MANAGE_INBOX_TASK_NAME = "mail.celery_tasks.manage_inbox"

Expand All @@ -91,14 +95,7 @@ def notify_users_of_rejected_licences(mail_id, mail_response_subject):
logger.info("Notifying users of rejected licences found in mail with subject %s", mail_response_subject)

try:
multipart_msg = MIMEMultipart()
multipart_msg["From"] = settings.EMAIL_USER
multipart_msg["To"] = ",".join(settings.NOTIFY_USERS)
multipart_msg["Subject"] = "Licence rejected by HMRC"
body = MIMEText(f"Mail (Id: {mail_id}) with subject {mail_response_subject} has rejected licences")
multipart_msg.attach(body)

smtp_send(multipart_msg)
send_smtp_task.apply_async(kwargs={"mail_id": mail_id, "mail_response_subject": mail_response_subject})

except SMTPException:
logger.exception(
Expand Down Expand Up @@ -162,8 +159,7 @@ def send_licence_details_to_hmrc():
"Created licenceData mail with subject %s for licences [%s]", mail_dto.subject, licence_references
)

send(mail_dto)
update_mail(mail, mail_dto)
send(mail_dto, mail)

# Mark the payloads as processed
licences.update(is_processed=True)
Expand Down Expand Up @@ -273,3 +269,58 @@ def manage_inbox():
exc_info=True,
)
raise exc


@contextmanager
def cache_lock(lock_id):
timeout_at = time.monotonic() + LOCK_EXPIRE - 3
# cache.add fails if the key already exists
status = cache.add(lock_id, "lock_acquired", LOCK_EXPIRE)
try:
yield status
finally:
if time.monotonic() < timeout_at and status:
cache.delete(lock_id)


class SendSmtpFailureTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
mail_id = kwargs.get("mail_id", "Unknown")
message = f"""
Task failed permanently after all retries: send_smtp_task
Mail ID: {mail_id}
Exception: {exc}
Args: {args}
Kwargs: {kwargs}
Task ID: {task_id}
Exception Info: {einfo}
"""

# Log the final failure message
logger.critical(message)


@shared_task(bind=True, max_retries=MAX_ATTEMPTS, retry_backoff=3, base=SendSmtpFailureTask)
def send_smtp_task(self, mail_id=None, email_message_data=None, mail_response_subject=None):
global_lock_id = "global_send_email_lock"

# From task notify_users_of_rejected_licences
if mail_id and mail_response_subject:
message = build_email_rejected_licence_message(mail_id, mail_response_subject)
else:
deserialized_email_message_dto = EmailMessageDto(**email_message_data)
message = build_email_message(deserialized_email_message_dto)

with cache_lock(global_lock_id) as acquired:
if acquired:
logger.info("Global lock acquired, sending email")
smtp_send(message)
logger.info(f"Mail with Response Subject:{message['Subject']}; successfully sent.")

# send_licence_details_to_hmrc and _collect_and_send -> update_mail were moved here if sucessfully sent
if mail_id and email_message_data:
mail = Mail.objects.filter(id=mail_id).first()
update_mail(mail, deserialized_email_message_dto)
else:
logger.info("Another send_smtp_task is currently in progress, will retry...")
raise self.retry()
11 changes: 11 additions & 0 deletions mail/libraries/builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,14 @@ def _validate_dto(email_message_dto):

if email_message_dto.attachment is None:
raise TypeError("None file attachment received!")


def build_email_rejected_licence_message(mail_id, mail_response_subject):
multipart_msg = MIMEMultipart()
multipart_msg["From"] = settings.EMAIL_USER
multipart_msg["To"] = ",".join(settings.NOTIFY_USERS)
multipart_msg["Subject"] = "Licence rejected by HMRC"
body = MIMEText(f"Mail (Id: {mail_id}) with subject {mail_response_subject} has rejected licences")
multipart_msg.attach(body)

return multipart_msg
19 changes: 12 additions & 7 deletions mail/libraries/routing_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

from django.conf import settings
from django.utils import timezone

from rest_framework.exceptions import ValidationError

from mail.auth import BasicAuthentication, ModernAuthentication
from mail.enums import ExtractTypeEnum, MailReadStatuses, ReceptionStatusEnum, SourceEnum
from mail.libraries.builders import build_email_message
from mail.libraries.data_processors import (
lock_db_for_sending_transaction,
serialize_email_message,
Expand All @@ -23,7 +23,7 @@
)
from mail.libraries.mailbox_service import get_message_iterator
from mail.models import Mail
from mail.servers import MailServer, smtp_send
from mail.servers import MailServer

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -168,10 +168,16 @@ def update_mail(mail: Mail, mail_dto: EmailMessageDto):
mail.save()


def send(email_message_dto: EmailMessageDto):
def send(email_message_dto: EmailMessageDto, mail=None):
# Needs to be imported here otherwise you will get a circular import errors.
from mail.celery_tasks import send_smtp_task

logger.info("Preparing to send email")
message = build_email_message(email_message_dto)
smtp_send(message)
email_message_data = email_message_dto._asdict()
if mail:
send_smtp_task.apply_async(kwargs={"mail_id": mail.id, "email_message_data": email_message_data})
else:
send_smtp_task.apply_async(kwargs={"email_message_data": email_message_data})


def _collect_and_send(mail: Mail):
Expand All @@ -185,8 +191,7 @@ def _collect_and_send(mail: Mail):

if message_to_send_dto:
if message_to_send_dto.receiver != SourceEnum.LITE and message_to_send_dto.subject:
send(message_to_send_dto)
update_mail(mail, message_to_send_dto)
send(message_to_send_dto, mail)

logger.info(
"Mail [%s] routed from [%s] to [%s] with subject %s",
Expand Down
47 changes: 0 additions & 47 deletions mail/tests/test_celery_task.py

This file was deleted.

Loading