Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[card-cache-service] optimize caching #417

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 64 additions & 65 deletions services/ui_backend_service/api/card.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@
DBPagination,
DBResponse,
)
from services.ui_backend_service.data.cache.card_cache_manager import wait_until_card_is_ready, CARD_API_HTML_WAIT_TIME
from services.ui_backend_service.data.cache.card_cache_manager import list_cards as list_cards_from_cache
import time
from aiohttp import web
import asyncio


class CardsApi(object):
def __init__(self, app, db, cache=None):
self.db = db
self.cache = getattr(cache, "artifact_cache", None)

self.cache = getattr(cache, "card_cache", None)
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards",
Expand Down Expand Up @@ -91,8 +94,7 @@ async def get_cards_list_for_task(self, request):
if not task:
return web_response(404, {"data": []})

invalidate_cache = query_param_enabled(request, "invalidate")
cards = await get_cards_for_task(self.cache, task, invalidate_cache)
cards = await get_card_list(self.cache, task, max_wait_time=1)

if cards is None:
# Handle edge: Cache failed to return anything, even errors.
Expand All @@ -104,7 +106,6 @@ async def get_cards_list_for_task(self, request):
{"id": data["id"], "hash": hash, "type": data["type"]}
for hash, data in cards.items()
]

# paginate list of cards
limit, page, offset = get_pagination_params(request)
_pages = max(len(card_hashes) // limit, 1)
Expand Down Expand Up @@ -141,31 +142,31 @@ async def get_card_content_by_hash(self, request):
schema:
$ref: '#/definitions/ResponsesError405'
"""

hash = request.match_info.get("hash")
task = await self.get_task_by_request(request)
if not task:
return web.Response(
content_type="text/html", status=404, body="Task not found."
)
invalidate_cache = query_param_enabled(request, "invalidate")

cards = await get_cards_for_task(
self.cache, task, invalidate_cache=invalidate_cache
cards = await get_card_html_for_task_async(
self.cache,
task,
hash,
)

if cards is None:
return web.Response(
content_type="text/html",
status=404,
body="Card not found for task. Possibly still being processed.",
body="Card not found for task. Possibly still being processed. Please refresh page to check again.",
)

if cards and hash in cards:
return web.Response(content_type="text/html", body=cards[hash]["html"])
else:
return web.Response(
content_type="text/html", status=404, body="Card not found for task."
content_type="text/html",
status=404,
body="Card not found for task.",
)

@handle_exceptions
Expand Down Expand Up @@ -194,16 +195,16 @@ async def get_card_data_by_hash(self, request):
schema:
$ref: '#/definitions/ResponsesError405'
"""

_hash = request.match_info.get("hash")
task = await self.get_task_by_request(request)
if not task:
return web.Response(
content_type="text/html", status=404, body="Task not found."
)
invalidate_cache = query_param_enabled(request, "invalidate")
data = await get_card_data_for_task(
self.cache, task, _hash, invalidate_cache=invalidate_cache
data = await get_card_data_for_task_async(
self.cache,
task,
_hash,
)

if data is None:
Expand All @@ -212,8 +213,21 @@ async def get_card_data_by_hash(self, request):
return web_response(200, data)


async def get_card_data_for_task(
cache_client, task, card_hash, invalidate_cache=False
def _card_data_from_cache(local_cache):
data = local_cache.read_data()
if data is None:
return None
return {
"data": data["data"],
"id": local_cache.card_id,
"type": local_cache.card_type,
}


async def get_card_html_for_task_async(
cache_client,
task,
card_hash,
) -> Optional[Dict[str, Dict]]:
"""
Return the card-data from the cache, or nothing.
Expand All @@ -232,43 +246,27 @@ async def get_card_data_for_task(
step_name=task.get("step_name"),
task_id=task.get("task_name") or task.get("task_id"),
)
pathspec_with_hash = "{pathspec}/{card_hash}".format(
pathspec=pathspec, card_hash=card_hash
_local_cache = cache_client.cache_manager.get_local_cache(pathspec, card_hash)
_html = await wait_until_card_is_ready(
cache_client.cache_manager, _local_cache, max_wait_time=CARD_API_HTML_WAIT_TIME
)
res = await cache_client.cache.GetCardData(pathspec, card_hash, invalidate_cache)

if res.has_pending_request():
async for event in res.stream():
if event["type"] == "error":
# raise error, there was an exception during fetching.
raise CardException(event["message"], event["id"], event["traceback"])
await res.wait() # wait until results are ready
data = res.get()
if data and pathspec_with_hash in data:
success, value, detail, trace = unpack_processed_value(data[pathspec_with_hash])
if success:
return value
else:
if value in ["card-not-present", "cannot-fetch-card"]:
return None
raise CardException(detail, value, trace)
return None
return _html


async def get_cards_for_task(
cache_client, task, invalidate_cache=False
async def get_card_data_for_task_async(
cache_client,
task,
card_hash,
) -> Optional[Dict[str, Dict]]:
"""
Return a dictionary of cards from the cache, or nothing.
Return the card-data from the cache, or nothing.

Example:
--------
{
"abc123": {
"id": 1,
"hash": "abc123",
"html": "htmlcontent"
}
"id": 1,
"hash": "abc123",
"data": {}
}
"""
pathspec = "{flow_id}/{run_id}/{step_name}/{task_id}".format(
Expand All @@ -277,25 +275,26 @@ async def get_cards_for_task(
step_name=task.get("step_name"),
task_id=task.get("task_name") or task.get("task_id"),
)
res = await cache_client.cache.GetCards([pathspec], invalidate_cache)

if res.has_pending_request():
async for event in res.stream():
if event["type"] == "error":
# raise error, there was an exception during fetching.
raise CardException(event["message"], event["id"], event["traceback"])
await res.wait() # wait until results are ready
data = res.get()
if data and pathspec in data:
success, value, detail, trace = unpack_processed_value(data[pathspec])
if success:
return value
else:
if value in ["card-not-present", "cannot-fetch-card"]:
return None
raise CardException(detail, value, trace)
await cache_client.cache_manager.register(pathspec)
_local_cache = cache_client.cache_manager.get_local_cache(pathspec, card_hash)
if not _local_cache.read_ready():
# Since this is a data update call we can return a 404 and the client
# should handle calling back so we only await at registration.
return None

return _card_data_from_cache(_local_cache)


return None
async def get_card_list(
    cache_client, task, max_wait_time=3
):
    """
    Return the list of cards for a task via the card cache manager.

    Parameters
    ----------
    cache_client
        Object exposing a ``cache_manager`` attribute, which is passed to
        ``list_cards_from_cache``.
    task
        Mapping with ``flow_id``, ``run_id`` or ``run_number``, ``step_name``
        and ``task_name`` or ``task_id``; used to build the task pathspec.
    max_wait_time
        Forwarded unchanged to ``list_cards_from_cache``.
        NOTE(review): presumably an upper bound in seconds -- confirm
        against card_cache_manager.

    Returns
    -------
    Whatever ``list_cards_from_cache`` returns for the pathspec.
    NOTE(review): callers appear to treat ``None`` as "nothing available"
    and otherwise iterate a mapping of card hash -> card info -- confirm.
    """
    # run_id and task_name take precedence over run_number and task_id
    # whenever they are truthy.
    pathspec = "{flow_id}/{run_id}/{step_name}/{task_id}".format(
        flow_id=task.get("flow_id"),
        run_id=task.get("run_id") or task.get("run_number"),
        step_name=task.get("step_name"),
        task_id=task.get("task_name") or task.get("task_id"),
    )
    return await list_cards_from_cache(cache_client.cache_manager, pathspec, max_wait_time)


def get_pagination_params(request):
Expand Down
Loading
Loading