Skip to content

Commit

Permalink
Use reusable aiohttp session and prevent async redis event loop closures
Browse files Browse the repository at this point in the history
  • Loading branch information
sarayourfriend committed Jul 21, 2023
1 parent 36d16fc commit cd6e699
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 63 deletions.
15 changes: 15 additions & 0 deletions api/api/utils/aiohttp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import aiohttp


# aiohttp recommends reusing the same session for the whole application
# https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request
# Module-level cache for that shared session: starts as ``None`` and is
# populated lazily by ``get_aiohttp_session``.
_SESSION: aiohttp.ClientSession | None = None


def get_aiohttp_session() -> "aiohttp.ClientSession":
    """
    Return the shared, module-level ``aiohttp.ClientSession``.

    The session is created on first use and cached in ``_SESSION`` so that it
    can be reused across requests. The cached session is replaced with a new
    one when it is no longer usable:

    - no session has been created yet;
    - the session has been closed (for example, by a caller that used it in
      an ``async with`` block), in which case aiohttp would refuse any
      further request made through it;
    - the event loop the session was created on has been closed.

    :return: an open ``aiohttp.ClientSession``
    """
    global _SESSION

    # NOTE(review): ``ClientSession.loop`` is documented as deprecated in
    # aiohttp 3.x; confirm it is still available in the pinned version
    # before upgrading aiohttp.
    if _SESSION is None or _SESSION.closed or _SESSION.loop.is_closed():
        _SESSION = aiohttp.ClientSession()

    return _SESSION
14 changes: 8 additions & 6 deletions api/api/utils/check_dead_links/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from decouple import config
from elasticsearch_dsl.response import Hit

from api.utils.aiohttp import get_aiohttp_session
from api.utils.check_dead_links.provider_status_mappings import provider_status_mappings
from api.utils.dead_link_mask import get_query_mask, save_query_mask

Expand All @@ -32,9 +33,11 @@ def _get_expiry(status, default):
return config(f"LINK_VALIDATION_CACHE_EXPIRY__{status}", default=default, cast=int)


async def _head(url: str, session: aiohttp.ClientSession) -> tuple[str, int]:
async def _head(url: str, **kwargs) -> tuple[str, int]:
try:
async with session.head(url, allow_redirects=False) as response:
async with get_aiohttp_session().head(
url, allow_redirects=False, headers=HEADERS, **kwargs
) as response:
return url, response.status
except (aiohttp.ClientError, asyncio.TimeoutError) as exception:
_log_validation_failure(exception)
Expand All @@ -46,10 +49,9 @@ async def _head(url: str, session: aiohttp.ClientSession) -> tuple[str, int]:
async def _make_head_requests(urls: list[str]) -> list[tuple[str, int]]:
tasks = []
timeout = aiohttp.ClientTimeout(total=2)
async with aiohttp.ClientSession(headers=HEADERS, timeout=timeout) as session:
tasks = [asyncio.ensure_future(_head(url, session)) for url in urls]
responses = asyncio.gather(*tasks)
await responses
tasks = [asyncio.ensure_future(_head(url, timeout=timeout)) for url in urls]
responses = asyncio.gather(*tasks)
await responses
return responses.result()


Expand Down
67 changes: 32 additions & 35 deletions api/api/utils/image_proxy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from asgiref.sync import sync_to_async
from sentry_sdk import push_scope, set_context

from api.utils.aiohttp import get_aiohttp_session
from api.utils.image_proxy.exception import UpstreamThumbnailException
from api.utils.image_proxy.extension import get_image_extension
from api.utils.image_proxy.photon import get_photon_request_params
Expand Down Expand Up @@ -94,49 +95,45 @@ async def get(
is_full_size,
is_compressed,
)

async with aiohttp.ClientSession(
headers=headers, timeout=aiohttp.ClientTimeout(total=15)
) as client:
try:
upstream_response = await client.get(
upstream_url,
params=params,
)

try:
async with get_aiohttp_session().get(
upstream_url,
params=params,
headers=headers,
timeout=aiohttp.ClientTimeout(total=15),
) as upstream_response:
await incr(f"thumbnail_response_code:{month}:{upstream_response.status}")
await incr(
f"thumbnail_response_code_by_domain:{domain}:"
f"{month}:{upstream_response.status}"
)
upstream_response.raise_for_status()
body = await upstream_response.read()
except Exception as exc:
raise exc
exception_name = f"{exc.__class__.__module__}.{exc.__class__.__name__}"
key = f"thumbnail_error:{exception_name}:{domain}:{month}"
count = await incr(key)
if count <= settings.THUMBNAIL_ERROR_INITIAL_ALERT_THRESHOLD or (
count % settings.THUMBNAIL_ERROR_REPEATED_ALERT_FREQUENCY == 0
):
with push_scope() as scope:
set_context(
"upstream_url",
{
"url": upstream_url,
"params": params,
"headers": headers,
},
)
scope.set_tag(
"occurrences", settings.THUMBNAIL_ERROR_REPEATED_ALERT_FREQUENCY
)
sentry_sdk.capture_exception(exc)
if isinstance(exc, requests.exceptions.HTTPError):
await incr(
f"thumbnail_http_error:{domain}:{month}:{exc.response.status_code}:{exc.response.text}"
except Exception as exc:
exception_name = f"{exc.__class__.__module__}.{exc.__class__.__name__}"
key = f"thumbnail_error:{exception_name}:{domain}:{month}"
count = await incr(key)
if count <= settings.THUMBNAIL_ERROR_INITIAL_ALERT_THRESHOLD or (
count % settings.THUMBNAIL_ERROR_REPEATED_ALERT_FREQUENCY == 0
):
with push_scope() as scope:
set_context(
"upstream_url",
{
"url": upstream_url,
"params": params,
"headers": headers,
},
)
raise UpstreamThumbnailException(f"Failed to render thumbnail. {exc}")
scope.set_tag(
"occurrences", settings.THUMBNAIL_ERROR_REPEATED_ALERT_FREQUENCY
)
sentry_sdk.capture_exception(exc)
if isinstance(exc, requests.exceptions.HTTPError):
await incr(
f"thumbnail_http_error:{domain}:{month}:{exc.response.status_code}:{exc.response.text}"
)
raise UpstreamThumbnailException(f"Failed to render thumbnail. {exc}")

res_status = upstream_response.status
content_type = upstream_response.headers.get("Content-Type")
Expand Down
50 changes: 28 additions & 22 deletions api/api/utils/image_proxy/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,51 @@
from urllib.parse import urlparse

import aiohttp
import django_async_redis
import django_redis
import sentry_sdk
from asgiref.sync import sync_to_async

from api.utils.aiohttp import get_aiohttp_session
from api.utils.image_proxy.exception import UpstreamThumbnailException


async def get_image_extension(image_url: str, media_identifier: str) -> str | None:
cache = await django_async_redis.get_redis_connection("adefault")
"""
Retrieve image extension from host or cache.
Does not use async Redis client due to issues with `django-async-redis`
incorrectly closing the event loop during the request lifecycle.
"""
cache = django_redis.get_redis_connection("default")
key = f"media:{media_identifier}:thumb_type"

ext = _get_file_extension_from_url(image_url)

if not ext:
# If the extension is not present in the URL, try to get it from the redis cache
ext = await cache.get(key)
ext = await sync_to_async(cache.get)(key)
ext = ext.decode("utf-8") if ext else None

if not ext:
# If the extension is still not present, try getting it from the content type
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=10)
) as client:
try:
response = await client.head(image_url, timeout=10)
response.raise_for_status()
except Exception as exc:
sentry_sdk.capture_exception(exc)
raise UpstreamThumbnailException(
"Failed to render thumbnail due to inability to check media "
f"type. {exc}"
)
try:
response = await get_aiohttp_session().head(
image_url, timeout=aiohttp.ClientTimeout(total=10)
)
response.raise_for_status()
except Exception as exc:
sentry_sdk.capture_exception(exc)
raise UpstreamThumbnailException(
"Failed to render thumbnail due to inability to check media "
f"type. {exc}"
)
else:
if response.headers and "Content-Type" in response.headers:
content_type = response.headers["Content-Type"]
ext = _get_file_extension_from_content_type(content_type)
else:
if response.headers and "Content-Type" in response.headers:
content_type = response.headers["Content-Type"]
ext = _get_file_extension_from_content_type(content_type)
else:
ext = None
ext = None

await cache.set(key, ext if ext else "unknown")
await sync_to_async(cache.set)(key, ext if ext else "unknown")
return ext


Expand Down

0 comments on commit cd6e699

Please sign in to comment.