feature: support streaming large logs #423

Closed
wants to merge 8 commits
Changes from 7 commits
2 changes: 1 addition & 1 deletion Dockerfile
@@ -18,7 +18,7 @@ ENV FEATURE_RUN_GROUPS=0
ENV FEATURE_DEBUG_VIEW=1

RUN apt-get update -y \
-    && apt-get -y install libpq-dev unzip gcc curl
+    && apt-get -y install libpq-dev unzip gcc curl git

RUN pip3 install virtualenv requests

2 changes: 1 addition & 1 deletion Dockerfile.ui_service
@@ -16,7 +16,7 @@ ARG CUSTOM_QUICKLINKS
ENV CUSTOM_QUICKLINKS=$CUSTOM_QUICKLINKS

RUN apt-get update -y \
-    && apt-get -y install libpq-dev unzip gcc curl
+    && apt-get -y install libpq-dev unzip gcc curl git

ADD services/__init__.py /root/services/__init__.py
ADD services/data /root/services/data
2 changes: 2 additions & 0 deletions docker-compose.development.yml
@@ -25,6 +25,7 @@ services:
- MF_UI_METADATA_HOST=${MF_UI_METADATA_HOST:-0.0.0.0}
- MF_METADATA_DB_POOL_MIN=1
- MF_METADATA_DB_POOL_MAX=10
+      - METAFLOW_S3_RETRY_COUNT=0
- LOGLEVEL=INFO
- AIOPG_ECHO=0
- UI_ENABLED=0
@@ -51,6 +52,7 @@ services:
- NOTIFICATIONS=$NOTIFICATIONS
- GA_TRACKING_ID=none
- PLUGINS=$PLUGINS
+      - AWS_PROFILE=$AWS_PROFILE
depends_on:
- migration
metadata:
28 changes: 22 additions & 6 deletions services/ui_backend_service/api/log.py
@@ -3,7 +3,7 @@

from services.data.db_utils import DBResponse, translate_run_key, translate_task_key, DBPagination, DBResponse
from services.utils import handle_exceptions, web_response
-from .utils import format_response_list, get_pathspec_from_request
+from .utils import format_response_list, get_pathspec_from_request, logger

from aiohttp import web
from multidict import MultiDict
@@ -240,8 +240,10 @@ async def get_task_log_file(self, request, logtype=STDOUT):
attempt=task['attempt_id']
)

-        lines, _ = await read_and_output(self.cache, task, logtype, output_raw=True)
-        return file_download_response(log_filename, lines)
+        def _gen():
+            return stream_pages(self.cache, task, logtype, output_raw=True)
+
+        return await file_download_response(request, log_filename, _gen)


async def read_and_output(cache_client, task, logtype, limit=0, page=1, reverse_order=False, output_raw=False):
@@ -263,6 +265,15 @@ async def read_and_output(cache_client, task, logtype, limit=0, page=1, reverse_

return log_response["content"], log_response["pages"]

+async def stream_pages(cache_client, task, logtype, output_raw):
+    page = 1
+    while True:
+        logs, _ = await read_and_output(cache_client, task, logtype, limit=1000, page=page, output_raw=output_raw)
+        if not logs:
+            break
+        yield logs
+        page += 1


def get_pagination_params(request):
"""extract pagination params from request
@@ -281,12 +292,17 @@ def get_pagination_params(request):
return limit, page, reverse_order


-def file_download_response(filename, body):
-    return web.Response(
+async def file_download_response(request, filename, async_line_iterator):
+    response = web.StreamResponse(
        headers=MultiDict({'Content-Disposition': 'Attachment;filename={}'.format(filename)}),
-        body=body
    )
+    await response.prepare(request)
+    # NOTE: this can not handle errors thrown by the cache, as status cannot be changed after .prepare() has been called.
+    async for lines in async_line_iterator():
+        await response.write(lines.encode("utf-8"))
+
+    await response.write_eof()
+    return response

class LogException(Exception):
def __init__(self, msg='Failed to read log', id='log-error', traceback_str=None):
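For readers skimming the diff: the endpoint above no longer buffers the whole log before responding; it prepares an aiohttp StreamResponse and writes pages to the socket as they come back from the cache. Below is a minimal, self-contained sketch of that pattern, not part of this PR; the route, the filename and the fake_pages generator are illustrative stand-ins for stream_pages.

from aiohttp import web
from multidict import MultiDict

async def fake_pages():
    # Illustrative stand-in for stream_pages(): yields the log in chunks
    # instead of loading the whole file into memory first.
    for page in range(3):
        yield "lines for page %d\n" % page

async def download_logs(request):
    response = web.StreamResponse(
        headers=MultiDict({'Content-Disposition': 'Attachment;filename=task.log'})
    )
    # Status and headers are flushed here; as the NOTE in the diff says, an
    # error raised while iterating can no longer change the response status.
    await response.prepare(request)
    async for chunk in fake_pages():
        await response.write(chunk.encode("utf-8"))
    await response.write_eof()
    return response

app = web.Application()
app.add_routes([web.get("/logs", download_logs)])

if __name__ == "__main__":
    web.run_app(app)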
42 changes: 26 additions & 16 deletions services/ui_backend_service/data/cache/get_log_file_action.py
@@ -1,7 +1,7 @@
import hashlib
import json

-from typing import Dict, List, Optional, Tuple
+from typing import Callable, Dict, List, Optional, Tuple
from .client import CacheAction
from .utils import streamed_errors
import os
@@ -137,15 +137,18 @@ def execute(cls,
log_hash_changed = previous_log_hash is None or previous_log_hash != current_hash

if log_hash_changed:
-            content = log_provider.get_log_content(task, logtype)
-            results[log_key] = json.dumps({"log_hash": current_hash, "content": content})
+            # log_provider.get_log_content(task, logtype)
+            results[log_key] = json.dumps({"log_hash": current_hash})
else:
results = {**existing_keys}

if log_hash_changed or result_key not in existing_keys:
+            def content_iter():
+                return log_provider.get_log_content(task, logtype)
+
results[result_key] = json.dumps(
paginated_result(
-                    json.loads(results[log_key])["content"],
+                    content_iter,
page,
limit,
reverse,
@@ -204,15 +207,11 @@ def get_log_content(task: Task, logtype: str):
stream = 'stderr' if logtype == STDERR else 'stdout'
log_location = task.metadata_dict.get('log_location_%s' % stream)
if log_location:
-        return [
-            (None, line)
-            for line in task._load_log_legacy(log_location, stream).split("\n")
-        ]
+        for line in task._load_log_legacy(log_location, stream).split("\n"):
+            yield (None, line)
    else:
-        return [
-            (_datetime_to_epoch(datetime), line)
-            for datetime, line in task.loglines(stream)
-        ]
+        for datetime, line in task.stream_loglines(stream):
+            yield (_datetime_to_epoch(datetime), line)


class LogProviderBase:
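The hunk above turns get_log_content from a function that builds the full list of (timestamp, line) tuples into a generator that yields them one at a time, so a caller can stop early. A toy comparison of the two shapes (names and sample data are illustrative, not from this PR):

def eager_lines(text):
    # Old shape: the full list of tuples is built before the caller sees anything.
    return [(None, line) for line in text.split("\n")]

def lazy_lines(text):
    # New shape: the (timestamp, line) tuples are produced on demand,
    # so a caller that only needs the first page can stop early.
    for line in text.split("\n"):
        yield (None, line)

sample = "\n".join("line %d" % i for i in range(100000))
gen = lazy_lines(sample)
print(next(gen))  # only the first tuple has been produced so far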
@@ -298,13 +297,24 @@ def get_log_content(self, task: Task, logtype: str):
return get_log_content(task, logtype)


-def paginated_result(content: List[Tuple[Optional[int], str]], page: int = 1, limit: int = 0,
+def paginated_result(content_iterator: Callable, page: int = 1, limit: int = 0,
                     reverse_order: bool = False, output_raw=False):
    if not output_raw:
-        loglines, total_pages = format_loglines(content, page, limit, reverse_order)
+        # TODO: support line iteration for this case as well.
+        # Currently unsupported due to the need to offer 'reverse_order'
+        loglines, total_pages = format_loglines(content_iterator(), page, limit, reverse_order)
    else:
-        loglines = "\n".join(line for _, line in content)
-        total_pages = 1
+        _offset = limit * (page - 1)
+        total_pages = -1
+        loglines = []
+        for lineno, item in enumerate(content_iterator()):
+            _ts, line = item
+            if limit and lineno>(_offset+limit):
+                break
+            if _offset and lineno<_offset:
+                continue
+            loglines.append(line)
+        loglines = "\n".join(loglines)

return {
"content": loglines,
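The raw-output branch of paginated_result above now walks a line iterator and applies offset/limit manually instead of slicing a fully materialized list. Roughly the same slicing can be sketched with itertools.islice; the toy_lines generator below is only an illustrative stand-in for log_provider.get_log_content, not code from this PR.

from itertools import islice
from typing import Iterator, Optional, Tuple

def toy_lines() -> Iterator[Tuple[Optional[int], str]]:
    # Stand-in for get_log_content(): lazily yields (timestamp, line) pairs.
    for i in range(10000):
        yield (None, "log line %d" % i)

def raw_page(content_iterator, page: int = 1, limit: int = 0) -> str:
    # Skip to the requested page and take at most `limit` lines without
    # holding the whole log in memory; limit=0 means "return everything".
    offset = limit * (page - 1)
    selected = islice(content_iterator(), offset, offset + limit) if limit else content_iterator()
    return "\n".join(line for _, line in selected)

print(raw_page(toy_lines, page=3, limit=1000)[:40])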
2 changes: 1 addition & 1 deletion services/ui_backend_service/requirements.txt
@@ -6,7 +6,7 @@ psycopg2
aiopg
pygit2==1.12.1
aiohttp_cors==0.7.0
-metaflow>=2.11.4
+metaflow @ git+https://github.com/Netflix/metaflow.git@feature/stream-logs-in-chunks
click==8.0.3
azure-storage-blob==12.13.1
azure-identity==1.10.0