Skip to content

Commit

Permalink
[card-cache-service] optimize caching
Browse files Browse the repository at this point in the history
 # Core Changes:
- created a background service that will poll all the cards/dataupdate for individual tasks
- The background service will run one process per taskspec for a small amount of time specified via the ENV vars
- This service will be launched when cards are requested.
- Cards will be read directly from the cache, and the reads will be "best-effort".
- API routes to get a card / list cards will have async-waits until the cache is updated.
- The new optimization will require the MF GUI to also be up-to-date with the new server.
- Uses a new optimized mf client.
- The Metaflow UI, which polls for new cards on a best-effort basis every 0.5 seconds, works best with the new server.
- async routines that will clean up the cache and remove completed async-processes
- removed dead code which will no longer be used.

 # Why not use the existing cache client:
- The way the existing cache client works, it loads the entire `Task` / `Card` object in memory and then returns the html/data from it.
- This is inefficient because loading the `Card` object makes datastore list calls, which are expensive in time.
- Once the path to the cards/data-updates has been found, getting the actual object is very fast.
- For example, listing cards takes ~1-2 seconds, but getting the actual card once the path is resolved takes ~10 milliseconds.
- The current cache actions are "stateless", meaning that once an action is done, its state is lost when a new action is called.
- This stateless nature is not good for cards, where the data may change a lot more frequently but paths won't change.
- The new cache service retrieves the object paths once and then keeps updating them until the background process finishes execution.
- This approach improves latency drastically

 # Configuration Options:
- `CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME` : How long should the process wait for a card to be available before it exits
- `CARD_CACHE_PROCESS_MAX_UPTIME` : The max duration the process should run
- `CARD_CACHE_CARD_LIST_POLLING_FREQUENCY` : How frequently should the process poll for listing new cards
- `CARD_CACHE_CARD_UPDATE_POLLING_FREQUENCY` : How frequently should the process poll for the card html content
- `CARD_CACHE_DATA_UPDATE_POLLING_FREQUENCY` : How frequently should the process poll for the data updates
- `CARD_CACHE_DISK_CLEANUP_INTERVAL`: The interval at which the disk location where cached cards are stored should be cleaned up
- `CARD_API_HTML_WAIT_TIME`: The maximum time period the card HTML retrieval API will busy-wait for the card to be ready before timing out and returning a null response.
  • Loading branch information
valayDave committed Feb 28, 2024
1 parent 707c534 commit 3e7d0e0
Show file tree
Hide file tree
Showing 8 changed files with 788 additions and 285 deletions.
129 changes: 64 additions & 65 deletions services/ui_backend_service/api/card.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@
DBPagination,
DBResponse,
)
from services.ui_backend_service.data.cache.card_cache_manager import wait_until_card_is_ready, CARD_API_HTML_WAIT_TIME
from services.ui_backend_service.data.cache.card_cache_manager import list_cards as list_cards_from_cache
import time
from aiohttp import web
import asyncio


class CardsApi(object):
def __init__(self, app, db, cache=None):
self.db = db
self.cache = getattr(cache, "artifact_cache", None)

self.cache = getattr(cache, "card_cache", None)
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards",
Expand Down Expand Up @@ -91,8 +94,7 @@ async def get_cards_list_for_task(self, request):
if not task:
return web_response(404, {"data": []})

invalidate_cache = query_param_enabled(request, "invalidate")
cards = await get_cards_for_task(self.cache, task, invalidate_cache)
cards = await get_card_list(self.cache, task, max_wait_time=1)

if cards is None:
# Handle edge: Cache failed to return anything, even errors.
Expand All @@ -104,7 +106,6 @@ async def get_cards_list_for_task(self, request):
{"id": data["id"], "hash": hash, "type": data["type"]}
for hash, data in cards.items()
]

# paginate list of cards
limit, page, offset = get_pagination_params(request)
_pages = max(len(card_hashes) // limit, 1)
Expand Down Expand Up @@ -141,31 +142,31 @@ async def get_card_content_by_hash(self, request):
schema:
$ref: '#/definitions/ResponsesError405'
"""

hash = request.match_info.get("hash")
task = await self.get_task_by_request(request)
if not task:
return web.Response(
content_type="text/html", status=404, body="Task not found."
)
invalidate_cache = query_param_enabled(request, "invalidate")

cards = await get_cards_for_task(
self.cache, task, invalidate_cache=invalidate_cache
cards = await get_card_html_for_task_async(
self.cache,
task,
hash,
)

if cards is None:
return web.Response(
content_type="text/html",
status=404,
body="Card not found for task. Possibly still being processed.",
body="Card not found for task. Possibly still being processed. Please refresh page to check again.",
)

if cards and hash in cards:
return web.Response(content_type="text/html", body=cards[hash]["html"])
else:
return web.Response(
content_type="text/html", status=404, body="Card not found for task."
content_type="text/html",
status=404,
body="Card not found for task.",
)

@handle_exceptions
Expand Down Expand Up @@ -194,16 +195,16 @@ async def get_card_data_by_hash(self, request):
schema:
$ref: '#/definitions/ResponsesError405'
"""

_hash = request.match_info.get("hash")
task = await self.get_task_by_request(request)
if not task:
return web.Response(
content_type="text/html", status=404, body="Task not found."
)
invalidate_cache = query_param_enabled(request, "invalidate")
data = await get_card_data_for_task(
self.cache, task, _hash, invalidate_cache=invalidate_cache
data = await get_card_data_for_task_async(
self.cache,
task,
_hash,
)

if data is None:
Expand All @@ -212,8 +213,21 @@ async def get_card_data_by_hash(self, request):
return web_response(200, data)


async def get_card_data_for_task(
cache_client, task, card_hash, invalidate_cache=False
def _card_data_from_cache(local_cache):
    """
    Build the card data-update payload from a card's local cache.

    Parameters
    ----------
    local_cache
        Object exposing `read_data()` plus the `card_id` and `card_type`
        attributes.

    Returns
    -------
    Optional[Dict]
        None when `read_data()` returns None. Otherwise a dictionary of
        the form:
        {
            "data": <value stored under the "data" key of the cached payload>,
            "id": <local_cache.card_id>,
            "type": <local_cache.card_type>,
        }
    """
    data = local_cache.read_data()
    if data is None:
        # Nothing has been cached for this card yet.
        return None
    return {
        "data": data["data"],
        "id": local_cache.card_id,
        "type": local_cache.card_type,
    }


async def get_card_html_for_task_async(
cache_client,
task,
card_hash,
) -> Optional[Dict[str, Dict]]:
"""
Return the card-data from the cache, or nothing.
Expand All @@ -232,43 +246,27 @@ async def get_card_data_for_task(
step_name=task.get("step_name"),
task_id=task.get("task_name") or task.get("task_id"),
)
pathspec_with_hash = "{pathspec}/{card_hash}".format(
pathspec=pathspec, card_hash=card_hash
_local_cache = cache_client.cache_manager.get_local_cache(pathspec, card_hash)
_html = await wait_until_card_is_ready(
cache_client.cache_manager, _local_cache, max_wait_time=CARD_API_HTML_WAIT_TIME
)
res = await cache_client.cache.GetCardData(pathspec, card_hash, invalidate_cache)

if res.has_pending_request():
async for event in res.stream():
if event["type"] == "error":
# raise error, there was an exception during fetching.
raise CardException(event["message"], event["id"], event["traceback"])
await res.wait() # wait until results are ready
data = res.get()
if data and pathspec_with_hash in data:
success, value, detail, trace = unpack_processed_value(data[pathspec_with_hash])
if success:
return value
else:
if value in ["card-not-present", "cannot-fetch-card"]:
return None
raise CardException(detail, value, trace)
return None
return _html


async def get_cards_for_task(
cache_client, task, invalidate_cache=False
async def get_card_data_for_task_async(
cache_client,
task,
card_hash,
) -> Optional[Dict[str, Dict]]:
"""
Return a dictionary of cards from the cache, or nothing.
Return the card-data from the cache, or nothing.
Example:
--------
{
"abc123": {
"id": 1,
"hash": "abc123",
"html": "htmlcontent"
}
"id": 1,
"hash": "abc123",
"data": {}
}
"""
pathspec = "{flow_id}/{run_id}/{step_name}/{task_id}".format(
Expand All @@ -277,25 +275,26 @@ async def get_cards_for_task(
step_name=task.get("step_name"),
task_id=task.get("task_name") or task.get("task_id"),
)
res = await cache_client.cache.GetCards([pathspec], invalidate_cache)

if res.has_pending_request():
async for event in res.stream():
if event["type"] == "error":
# raise error, there was an exception during fetching.
raise CardException(event["message"], event["id"], event["traceback"])
await res.wait() # wait until results are ready
data = res.get()
if data and pathspec in data:
success, value, detail, trace = unpack_processed_value(data[pathspec])
if success:
return value
else:
if value in ["card-not-present", "cannot-fetch-card"]:
return None
raise CardException(detail, value, trace)
await cache_client.cache_manager.register(pathspec)
_local_cache = cache_client.cache_manager.get_local_cache(pathspec, card_hash)
if not _local_cache.read_ready():
# Since this is a data update call we can return a 404 and the client
# should handle calling back so we only await at registration.
return None

return _card_data_from_cache(_local_cache)


return None
async def get_card_list(
    cache_client, task, max_wait_time=3
):
    """
    List the cards of a task through the card cache.

    Parameters
    ----------
    cache_client
        Cache client exposing a `cache_manager` attribute.
    task : Dict
        Task record. The pathspec is built from `flow_id`, `run_id`
        (falling back to `run_number`), `step_name` and `task_name`
        (falling back to `task_id`).
    max_wait_time
        Forwarded unchanged to `list_cards_from_cache`.

    Returns
    -------
    Whatever `list_cards_from_cache` returns for the task's pathspec.
    NOTE(review): callers treat None as "cache returned nothing" and
    otherwise iterate the result as a mapping of card hash -> card info;
    confirm against `card_cache_manager.list_cards`.
    """
    pathspec = "{flow_id}/{run_id}/{step_name}/{task_id}".format(
        flow_id=task.get("flow_id"),
        run_id=task.get("run_id") or task.get("run_number"),
        step_name=task.get("step_name"),
        task_id=task.get("task_name") or task.get("task_id"),
    )
    return await list_cards_from_cache(
        cache_client.cache_manager, pathspec, max_wait_time
    )


def get_pagination_params(request):
Expand Down
Loading

0 comments on commit 3e7d0e0

Please sign in to comment.