Skip to content

Commit

Permalink
get_paginated_jobs, get_paginated_uploads: merge, cache job outcome s…
Browse files Browse the repository at this point in the history
…tat processing

The logic is effectively repeated between the two functions, so take
advantage of this to make a common implementation that conditionally
caches the result in redis. The thinking is that once all of a
job's notifications are in a "completed" status they are unlikely
to change again, so we can save ourselves from scanning the
notifications table again for up to 100k rows per job. Being able
to share this cache between get_paginated_jobs and
get_paginated_uploads is a bonus.
  • Loading branch information
risicle committed Dec 27, 2024
1 parent 8d06f3b commit 07047fe
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 49 deletions.
32 changes: 31 additions & 1 deletion app/dao/jobs_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
from datetime import datetime, timedelta

from flask import current_app
from notifications_utils.clients.redis import RequestCache
from notifications_utils.letter_timings import (
CANCELLABLE_JOB_LETTER_STATUSES,
letter_can_be_cancelled,
)
from sqlalchemy import and_, asc, desc, func

from app import db
from app import db, redis_store
from app.constants import (
JOB_STATUS_CANCELLED,
JOB_STATUS_FINISHED,
Expand All @@ -17,8 +18,10 @@
LETTER_TYPE,
NOTIFICATION_CANCELLED,
NOTIFICATION_CREATED,
NOTIFICATION_STATUS_TYPES_COMPLETED,
)
from app.dao.dao_utils import autocommit
from app.dao.fact_notification_status_dao import fetch_notification_statuses_for_job
from app.dao.templates_dao import dao_get_template_by_id
from app.models import (
FactNotificationStatus,
Expand Down Expand Up @@ -258,3 +261,30 @@ def find_missing_row_for_job(job_id, job_size):
.filter(Notification.job_row_number == None) # noqa
)
return query.all()


# module-level cache helper built on the app's redis_store; its `set`
# decorator is applied to get_possibly_cached_notification_outcomes_for_job
redis_cache = RequestCache(redis_store)


@redis_cache.set("job-{job_id}-notification-outcomes", ttl_in_seconds=timedelta(days=1).total_seconds())
def get_possibly_cached_notification_outcomes_for_job(
    job_id: uuid.UUID | str, notification_count: int | None, processing_started: datetime | None
):
    """Return the per-status notification counts for a job, caching settled results.

    The outcomes are a list of ``{"status": ..., "count": ...}`` dicts. They are
    wrapped in a ``RequestCache.CacheResultWrapper`` so that the decorator only
    stores them (under ``job-{job_id}-notification-outcomes``, with a one day
    TTL) when the result is not expected to change again.

    :param job_id: id of the job whose notifications are being summarised
    :param notification_count: number of rows the job is expected to have
    :param processing_started: when the job began processing, or ``None`` if
        it has not started, in which case no outcomes are looked up
    """
    if processing_started is None:
        # nothing has been processed yet, so there is nothing to count
        rows = []
    else:
        # any tzinfo is dropped (not converted) before comparing with the cut-off
        started_naive = processing_started.replace(tzinfo=None)
        if started_naive < midnight_n_days_ago(3):
            # ft_notification_status table
            rows = fetch_notification_statuses_for_job(job_id)
        else:
            # notifications table
            rows = dao_get_notification_outcomes_for_job(job_id)

    outcomes = [{"status": row.status, "count": row.count} for row in rows]

    # only worth caching once every row of the job is accounted for and
    # none of the notifications is in a state still likely to change
    all_rows_accounted_for = sum(row.count for row in rows) == notification_count
    should_cache = bool(
        all_rows_accounted_for and all(row.status in NOTIFICATION_STATUS_TYPES_COMPLETED for row in rows)
    )

    return RequestCache.CacheResultWrapper(value=outcomes, cache_decision=should_cache)
18 changes: 5 additions & 13 deletions app/job/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
JOB_STATUS_SCHEDULED,
LETTER_TYPE,
)
from app.dao.fact_notification_status_dao import (
fetch_notification_statuses_for_job,
)
from app.dao.jobs_dao import (
can_letter_job_be_cancelled,
dao_cancel_letter_job,
Expand All @@ -24,6 +21,7 @@
dao_get_scheduled_job_by_id_and_service_id,
dao_get_scheduled_job_stats,
dao_update_job,
get_possibly_cached_notification_outcomes_for_job,
)
from app.dao.notifications_dao import (
dao_get_notification_count_for_job_id,
Expand All @@ -39,7 +37,7 @@
notifications_filter_schema,
unarchived_template_schema,
)
from app.utils import midnight_n_days_ago, pagination_links
from app.utils import pagination_links

job_blueprint = Blueprint("job", __name__, url_prefix="/service/<uuid:service_id>/job")

Expand Down Expand Up @@ -220,15 +218,9 @@ def get_paginated_jobs(
start = job_data["processing_started"]
start = dateutil.parser.parse(start).replace(tzinfo=None) if start else None

if start is None:
statistics = []
elif start.replace(tzinfo=None) < midnight_n_days_ago(3):
# ft_notification_status table
statistics = fetch_notification_statuses_for_job(job_data["id"])
else:
# notifications table
statistics = dao_get_notification_outcomes_for_job(job_data["id"])
job_data["statistics"] = [{"status": statistic.status, "count": statistic.count} for statistic in statistics]
job_data["statistics"] = get_possibly_cached_notification_outcomes_for_job(
job_data["id"], job_data["notification_count"], start
)

return {
"data": data,
Expand Down
23 changes: 5 additions & 18 deletions app/upload/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@

from flask import Blueprint, abort, current_app, jsonify, request

from app.dao.fact_notification_status_dao import (
fetch_notification_statuses_for_job,
)
from app.dao.jobs_dao import dao_get_notification_outcomes_for_job
from app.dao.jobs_dao import get_possibly_cached_notification_outcomes_for_job
from app.dao.uploads_dao import (
dao_get_uploaded_letters_by_print_date,
dao_get_uploads_by_service_id,
)
from app.errors import register_errors
from app.schemas import notification_with_template_schema
from app.utils import midnight_n_days_ago, pagination_links
from app.utils import pagination_links

upload_blueprint = Blueprint("upload", __name__, url_prefix="/service/<uuid:service_id>/upload")

Expand Down Expand Up @@ -49,19 +46,9 @@ def get_paginated_uploads(service_id, limit_days, page):
"recipient": upload.recipient,
}
if upload.upload_type == "job":
start = upload.processing_started

if start is None:
statistics = []
elif start.replace(tzinfo=None) < midnight_n_days_ago(3):
# ft_notification_status table
statistics = fetch_notification_statuses_for_job(upload.id)
else:
# notifications table
statistics = dao_get_notification_outcomes_for_job(upload.id)
upload_dict["statistics"] = [
{"status": statistic.status, "count": statistic.count} for statistic in statistics
]
upload_dict["statistics"] = get_possibly_cached_notification_outcomes_for_job(
upload.id, upload.notification_count, upload.processing_started
)
else:
upload_dict["statistics"] = []
data.append(upload_dict)
Expand Down
196 changes: 196 additions & 0 deletions tests/app/dao/test_jobs_dao.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
import uuid
from collections import Counter
from datetime import datetime, timedelta
from functools import partial

Expand All @@ -20,9 +22,11 @@
dao_update_job,
find_jobs_with_missing_rows,
find_missing_row_for_job,
get_possibly_cached_notification_outcomes_for_job,
)
from app.models import Job
from tests.app.db import (
create_ft_notification_status,
create_job,
create_notification,
create_service,
Expand Down Expand Up @@ -566,3 +570,195 @@ def test_unique_key_on_job_id_and_job_row_number_no_error_if_row_number_for_diff
job_2 = create_job(template=sample_email_template)
create_notification(job=job_1, job_row_number=0)
create_notification(job=job_2, job_row_number=0)


@pytest.mark.parametrize(
    (
        "notification_statuses",
        "notification_count",
        "processing_started",
        "create_notifications",
        "expect_redis_set",
        "expected_retval",
    ),
    (
        # every row present and in a completed status: result gets cached
        (
            (
                "permanent-failure",
                "delivered",
                "permanent-failure",
                "technical-failure",
                "technical-failure",
                "permanent-failure",
            ),
            6,
            datetime.fromisoformat("2020-02-10T09:00:00"),
            True,
            True,
            [
                {"status": "delivered", "count": 1},
                {"status": "permanent-failure", "count": 3},
                {"status": "technical-failure", "count": 2},
            ],
        ),
        (
            ("sent", "delivered", "delivered", "delivered"),
            4,
            datetime.fromisoformat("2020-02-09T09:00:00"),
            True,
            True,
            [
                {"status": "delivered", "count": 3},
                {"status": "sent", "count": 1},
            ],
        ),
        (
            ("technical-failure", "delivered", "technical-failure"),
            3,
            datetime.fromisoformat("2020-02-04T09:00:00"),  # so from ft_notification_status
            False,
            True,
            [
                {"status": "technical-failure", "count": 2},
                {"status": "delivered", "count": 1},
            ],
        ),
        (
            ("technical-failure", "delivered", "created", "technical-failure"),
            4,
            datetime.fromisoformat("2020-02-09T23:59:58"),
            True,
            False,  # because non-complete status
            [
                {"status": "technical-failure", "count": 2},
                {"status": "created", "count": 1},
                {"status": "delivered", "count": 1},
            ],
        ),
        (
            ("sent", "delivered", "delivered"),
            4,
            datetime.fromisoformat("2020-02-10T09:00:00"),
            True,
            False,  # because missing rows
            [
                {"status": "delivered", "count": 2},
                {"status": "sent", "count": 1},
            ],
        ),
        (
            ("delivered", "created", "delivered"),
            3,
            None,
            True,
            False,  # because processing hasn't started, so no outcomes are returned to match the count
            [],
        ),
    ),
)
def test_get_possibly_cached_notification_outcomes_for_job_empty_cache(
    sample_email_template,
    notification_statuses,
    notification_count,
    processing_started,
    create_notifications,
    expect_redis_set,
    expected_retval,
    mocker,
):
    """With nothing in redis, outcomes are computed and only cached when settled."""

    def status_of(outcome):
        return outcome["status"]

    called_at = datetime.fromisoformat("2020-02-10T10:00:00")
    # (date, status) -> count, used to build the ft_notification_status rows
    ft_counts = Counter()

    with freeze_time(processing_started) as clock:
        job = create_job(template=sample_email_template, processing_started=processing_started)
        for row_number, status in enumerate(notification_statuses):
            clock.tick()

            if create_notifications:
                create_notification(job=job, status=status, job_row_number=row_number)

            created_on = datetime.now().date()
            # only days other than the day of the call get an ft_notification_status row
            if created_on != called_at.date():
                ft_counts[created_on, status] += 1

    for (bst_date, status), count in ft_counts.items():
        create_ft_notification_status(
            bst_date=bst_date,
            notification_type="email",
            service=job.service,
            job=job,
            template=job.template,
            key_type="normal",
            notification_status=status,
            count=count,
        )

    # an empty cache: every get misses
    mock_redis_get = mocker.patch("app.redis_store.get", return_value=None)
    mock_redis_set = mocker.patch("app.redis_store.set", return_value=None)

    with freeze_time(called_at):
        retval = get_possibly_cached_notification_outcomes_for_job(job.id, notification_count, processing_started)

    # row order is not part of the contract, so compare sorted by status
    assert sorted(retval, key=status_of) == sorted(expected_retval, key=status_of)

    cache_key = f"job-{job.id}-notification-outcomes"

    if expect_redis_set:
        expected_set_calls = [
            mocker.call(cache_key, json.dumps(retval), ex=timedelta(days=1).total_seconds()),
        ]
    else:
        expected_set_calls = []
    assert mock_redis_set.mock_calls == expected_set_calls

    assert mock_redis_get.mock_calls == [mocker.call(cache_key)]


def test_get_possibly_cached_notification_outcomes_for_job_present_cache(
    fake_uuid,
    mocker,
):
    """A cache hit is returned as-is, without querying the database or writing to redis."""
    # either database lookup being called would mean the cached value was bypassed
    mocker.patch(
        "app.dao.jobs_dao.fetch_notification_statuses_for_job",
        side_effect=AssertionError("fetch_notification_statuses_for_job call not expected"),
    )
    mocker.patch(
        "app.dao.jobs_dao.dao_get_notification_outcomes_for_job",
        side_effect=AssertionError("dao_get_notification_outcomes_for_job call not expected"),
    )

    mock_redis_get = mocker.patch(
        "app.redis_store.get", return_value=b'[{"status": "delivered", "count": 12}, {"status": "sent", "count": 34}]'
    )
    # side_effect rather than return_value: return_value would merely hand back
    # the exception object, so an unexpected write would go unnoticed
    mocker.patch("app.redis_store.set", side_effect=AssertionError("redis set call not expected"))

    retval = get_possibly_cached_notification_outcomes_for_job(fake_uuid, 46, datetime.now())

    assert retval == [{"status": "delivered", "count": 12}, {"status": "sent", "count": 34}]

    assert mock_redis_get.mock_calls == [
        mocker.call(f"job-{fake_uuid}-notification-outcomes"),
    ]
Loading

0 comments on commit 07047fe

Please sign in to comment.