Convert entire image proxy route async #3388

Merged: 6 commits, Dec 3, 2023
29 changes: 29 additions & 0 deletions api/api/utils/asyncio.py
@@ -0,0 +1,29 @@
import asyncio
import logging
from collections.abc import Awaitable


parent_logger = logging.getLogger(__name__)


_do_not_wait_for_logger = parent_logger.getChild("do_not_wait_for")


def do_not_wait_for(awaitable: Awaitable) -> None:
    """
    Consume an awaitable without waiting for it to finish.

    This allows us to call an async function that we don't care about
    the result of, without needing to wait for it to complete. This is
    useful, for example, if some operation creates a side effect that
    isn't necessary for the response we're processing (e.g., Redis tallying).
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError as exc:
        _do_not_wait_for_logger.error(
            "`do_not_wait_for` must be called inside a running event loop."
        )
        raise exc

    loop.create_task(awaitable)
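
For orientation, a minimal usage sketch of the fire-and-forget pattern this helper enables (not part of the diff; the function names record_tally and handle_thumbnail are hypothetical):

import asyncio

from api.utils.asyncio import do_not_wait_for


async def record_tally(key: str) -> None:
    # Stand-in for a side effect (e.g., a Redis increment) whose result
    # the response does not depend on.
    await asyncio.sleep(0)


async def handle_thumbnail(key: str) -> str:
    # Schedule record_tally on the running loop and return immediately,
    # without awaiting its completion.
    do_not_wait_for(record_tally(key))
    return "response"
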
87 changes: 60 additions & 27 deletions api/api/utils/image_proxy/__init__.py
@@ -7,11 +7,15 @@
from django.http import HttpResponse
from rest_framework.exceptions import UnsupportedMediaType

import aiohttp
import django_redis
import requests
import sentry_sdk
from asgiref.sync import sync_to_async
from sentry_sdk import push_scope, set_context

from api.utils.aiohttp import get_aiohttp_session
from api.utils.asyncio import do_not_wait_for
from api.utils.image_proxy.exception import UpstreamThumbnailException
from api.utils.image_proxy.extension import get_image_extension
from api.utils.image_proxy.photon import get_photon_request_params
@@ -75,11 +79,40 @@ def get_request_params_for_extension(
    )


def get(
@sync_to_async
def _tally_response(
    tallies,
    media_info: MediaInfo,
    month: str,
    domain: str,
    response: aiohttp.ClientResponse,
):
    """
    Tally image proxy response without waiting for Redis to respond.

    Pulled into a separate function to help reduce overload when skimming
    the `get` function, which is complex enough as is.
    """
    tallies.incr(f"thumbnail_response_code:{month}:{response.status}")
    tallies.incr(
        f"thumbnail_response_code_by_domain:{domain}:{month}:{response.status}"
    )
    tallies.incr(
        f"thumbnail_response_code_by_provider:{media_info.media_provider}:"
        f"{month}:{response.status}"
    )


_UPSTREAM_TIMEOUT = aiohttp.ClientTimeout(15)


async def get(
    media_info: MediaInfo,
    request_config: RequestConfig = RequestConfig(),
) -> HttpResponse:
    """
    Retrieve the proxied image.

    Proxy an image through Photon if its file type is supported, else return the
    original image if the file type is SVG. Otherwise, raise an exception.
    """
@@ -88,9 +121,10 @@ def get(

    logger = parent_logger.getChild("get")
    tallies = django_redis.get_redis_connection("tallies")
    tallies_incr = sync_to_async(tallies.incr)
Contributor: Sorry, I'm missing a lot of sync-to-async context since this is the first such PR that I'm reviewing. Why do we not use async def, but need sync_to_async for tallies?

Collaborator (author): We are using async def, unless I unintentionally missed it somewhere. async def is only necessary when awaiting, though.

sync_to_async is necessary to prevent context issues in the Redis client, which assumes that only one thread is ever touching the connection pool. sync_to_async creates a safe context for such synchronous functions. This is relevant to the Django ORM as well. From Django's documentation:

The reason this is needed in Django is that many libraries, specifically database adapters, require that they are accessed in the same thread that they were created in. Also a lot of existing Django code assumes it all runs in the same thread, e.g. middleware adding things to a request for later use in views.

As an additional point of clarification, Django's own async cache support just wraps the synchronous versions of functions with sync_to_async, rather than using fully async clients. In the future we could switch to an asynchronous Redis client (like https://github.com/Andrew-Chen-Wang/django-async-redis, but I'm not sure that it's 100% stable), but for now it's not necessary, and Django's own caching behaviour would still use the sync_to_async adapter approach for this anyway.

So the reason here is to create a new async version of tallies.incr that we can call later with await.

Collaborator (author): Let me know if I answered your question or if I misunderstood. The Django documentation on async adaptation is a good read and important for understanding how and when it's necessary to adapt synchronous functions to an async context.
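
To make the adapter pattern described above concrete, here is a minimal sketch (illustrative only, not from the PR; it assumes a synchronous Redis-style client exposing an incr method):

import asyncio

from asgiref.sync import sync_to_async


class FakeTallies:
    """Stand-in for the synchronous Redis client behind the tallies connection."""

    def __init__(self):
        self.counts = {}

    def incr(self, key: str) -> int:
        self.counts[key] = self.counts.get(key, 0) + 1
        return self.counts[key]


async def main():
    tallies = FakeTallies()
    # Wrap the synchronous method so it can be awaited from async code;
    # asgiref runs it in a thread-safe executor context.
    tallies_incr = sync_to_async(tallies.incr)
    count = await tallies_incr("thumbnail_error:example")
    print(count)


asyncio.run(main())
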

    month = get_monthly_timestamp()

    image_extension = get_image_extension(image_url, media_identifier)
    image_extension = await get_image_extension(image_url, media_identifier)

    headers = {"Accept": request_config.accept_header} | HEADERS

@@ -106,26 +140,35 @@ def get(
    )

    try:
        upstream_response = requests.get(
        session = await get_aiohttp_session()

        upstream_response = await session.get(
            upstream_url,
            timeout=15,
            timeout=_UPSTREAM_TIMEOUT,
            params=params,
            headers=headers,
        )
        tallies.incr(f"thumbnail_response_code:{month}:{upstream_response.status_code}")
        tallies.incr(
            f"thumbnail_response_code_by_domain:{domain}:"
            f"{month}:{upstream_response.status_code}"
        )
        tallies.incr(
            f"thumbnail_response_code_by_provider:{media_info.media_provider}:"
            f"{month}:{upstream_response.status_code}"
        do_not_wait_for(
            _tally_response(tallies, media_info, month, domain, upstream_response)
        )
        upstream_response.raise_for_status()
        status_code = upstream_response.status
        content_type = upstream_response.headers.get("Content-Type")
        logger.debug(
            "Image proxy response status: %s, content-type: %s",
            status_code,
            content_type,
        )
        content = await upstream_response.content.read()
        return HttpResponse(
            content,
            status=status_code,
            content_type=content_type,
        )
    except Exception as exc:
        exception_name = f"{exc.__class__.__module__}.{exc.__class__.__name__}"
        key = f"thumbnail_error:{exception_name}:{domain}:{month}"
        count = tallies.incr(key)
        count = await tallies_incr(key)
        if count <= settings.THUMBNAIL_ERROR_INITIAL_ALERT_THRESHOLD or (
            count % settings.THUMBNAIL_ERROR_REPEATED_ALERT_FREQUENCY == 0
        ):
@@ -144,24 +187,14 @@ def get(
        sentry_sdk.capture_exception(exc)
        if isinstance(exc, requests.exceptions.HTTPError):
            code = exc.response.status_code
            tallies.incr(
                f"thumbnail_http_error:{domain}:{month}:{code}:{exc.response.text}"
            do_not_wait_for(
                tallies_incr(
                    f"thumbnail_http_error:{domain}:{month}:{code}:{exc.response.text}"
                )
            )
            logger.warning(
                f"Failed to render thumbnail "
                f"{upstream_url=} {code=} "
                f"{media_info.media_provider=}"
            )
        raise UpstreamThumbnailException(f"Failed to render thumbnail. {exc}")

    res_status = upstream_response.status_code
    content_type = upstream_response.headers.get("Content-Type")
    logger.debug(
        f"Image proxy response status: {res_status}, content-type: {content_type}"
    )

    return HttpResponse(
        upstream_response.content,
        status=res_status,
        content_type=content_type,
    )
29 changes: 18 additions & 11 deletions api/api/utils/image_proxy/extension.py
@@ -1,43 +1,50 @@
from os.path import splitext
from urllib.parse import urlparse

import aiohttp
import django_redis
import requests
import sentry_sdk
from asgiref.sync import sync_to_async

from api.utils.aiohttp import get_aiohttp_session
from api.utils.asyncio import do_not_wait_for
from api.utils.image_proxy.exception import UpstreamThumbnailException


def get_image_extension(image_url: str, media_identifier: str) -> str | None:
_HEAD_TIMEOUT = aiohttp.ClientTimeout(10)


async def get_image_extension(image_url: str, media_identifier) -> str | None:
    cache = django_redis.get_redis_connection("default")
    key = f"media:{media_identifier}:thumb_type"

    ext = _get_file_extension_from_url(image_url)

    if not ext:
        # If the extension is not present in the URL, try to get it from the redis cache
        ext = cache.get(key)
        ext = await sync_to_async(cache.get)(key)
        ext = ext.decode("utf-8") if ext else None

    if not ext:
        # If the extension is still not present, try getting it from the content type
        try:
            response = requests.head(image_url, timeout=10)
            session = await get_aiohttp_session()
            response = await session.head(image_url, timeout=_HEAD_TIMEOUT)
            response.raise_for_status()
            if response.headers and "Content-Type" in response.headers:
                content_type = response.headers["Content-Type"]
                ext = _get_file_extension_from_content_type(content_type)
            else:
                ext = None

            do_not_wait_for(sync_to_async(cache.set)(key, ext if ext else "unknown"))
        except Exception as exc:
            sentry_sdk.capture_exception(exc)
            raise UpstreamThumbnailException(
                "Failed to render thumbnail due to inability to check media "
                f"type. {exc}"
            )
        else:
            if response.headers and "Content-Type" in response.headers:
                content_type = response.headers["Content-Type"]
                ext = _get_file_extension_from_content_type(content_type)
            else:
                ext = None

        cache.set(key, ext if ext else "unknown")
    return ext


5 changes: 1 addition & 4 deletions api/api/views/media_views.py
@@ -40,9 +40,6 @@ class InvalidSource(APIException):
    default_code = "invalid_source"


image_proxy_aget = sync_to_async(image_proxy.get)


class MediaViewSet(AsyncViewSetMixin, AsyncAPIView, ReadOnlyModelViewSet):
    view_is_async = True

@@ -296,7 +293,7 @@ async def thumbnail(self, request, *_, **__):

        media_info = await self.get_image_proxy_media_info()

        return await image_proxy_aget(
        return await image_proxy.get(
            media_info,
            request_config=image_proxy.RequestConfig(
                accept_header=request.headers.get("Accept", "image/*"),
29 changes: 26 additions & 3 deletions api/test/conftest.py
@@ -1,11 +1,34 @@
import asyncio

import pytest
from asgiref.sync import async_to_sync

from conf.asgi import application


@pytest.fixture
def get_new_loop():
    loops: list[asyncio.AbstractEventLoop] = []

    def _get_new_loop() -> asyncio.AbstractEventLoop:
        loop = asyncio.new_event_loop()
        loops.append(loop)
        return loop

    yield _get_new_loop

    for loop in loops:
        loop.close()


@pytest.fixture(scope="session")
def session_loop() -> asyncio.AbstractEventLoop:
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()
Comment on lines +23 to +27
Collaborator (author): This can't use get_new_loop because get_new_loop is intentionally function scoped, whereas the session loop, as its name implies, is scoped to the session, and therefore cannot depend on the function scoped fixture.
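
For context on that constraint, a small sketch (fixture names are illustrative, not from this PR) of what pytest does when a session-scoped fixture requests a function-scoped one:

import asyncio

import pytest


@pytest.fixture
def function_scoped_loop():
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()


@pytest.fixture(scope="session")
def broken_session_loop(function_scoped_loop):
    # pytest refuses this with a ScopeMismatch error ("You tried to access
    # the function scoped fixture ... with a 'session' scoped request
    # object"), which is why session_loop creates its own loop instead.
    yield function_scoped_loop
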



@pytest.fixture(scope="session", autouse=True)
def ensure_asgi_lifecycle():
def ensure_asgi_lifecycle(session_loop: asyncio.AbstractEventLoop):
"""
Call application shutdown lifecycle event.

@@ -27,4 +50,4 @@ async def shutdown():
        return {"type": "lifespan.shutdown"}

    yield
    async_to_sync(application)(scope, shutdown, noop)
    session_loop.run_until_complete(application(scope, shutdown, noop))
19 changes: 0 additions & 19 deletions api/test/unit/utils/test_aiohttp.py
@@ -1,25 +1,6 @@
import asyncio

import pytest

from api.utils.aiohttp import get_aiohttp_session


@pytest.fixture(autouse=True)
def get_new_loop():
    loops: list[asyncio.AbstractEventLoop] = []

    def _get_new_loop():
        loop = asyncio.new_event_loop()
        loops.append(loop)
        return loop

    yield _get_new_loop

    for loop in loops:
        loop.close()


def test_reuses_session_within_same_loop(get_new_loop):
    loop = get_new_loop()
