From 94520588b34d42d82a7f0fbaa31004df05a2dcc7 Mon Sep 17 00:00:00 2001
From: Sakari Ikonen
Date: Tue, 14 May 2024 16:45:32 +0300
Subject: [PATCH 1/8] wip: temporarily add direct git dependency metaflow client.

---
 Dockerfile                                   | 2 +-
 Dockerfile.ui_service                        | 2 +-
 services/ui_backend_service/requirements.txt | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index e40ab20e..584e1702 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -18,7 +18,7 @@ ENV FEATURE_RUN_GROUPS=0
 ENV FEATURE_DEBUG_VIEW=1
 
 RUN apt-get update -y \
-    && apt-get -y install libpq-dev unzip gcc curl
+    && apt-get -y install libpq-dev unzip gcc curl git
 
 RUN pip3 install virtualenv requests
 
diff --git a/Dockerfile.ui_service b/Dockerfile.ui_service
index 5b0ffa61..3969362a 100644
--- a/Dockerfile.ui_service
+++ b/Dockerfile.ui_service
@@ -16,7 +16,7 @@ ARG CUSTOM_QUICKLINKS
 ENV CUSTOM_QUICKLINKS=$CUSTOM_QUICKLINKS
 
 RUN apt-get update -y \
-    && apt-get -y install libpq-dev unzip gcc curl
+    && apt-get -y install libpq-dev unzip gcc curl git
 
 ADD services/__init__.py /root/services/__init__.py
 ADD services/data /root/services/data
diff --git a/services/ui_backend_service/requirements.txt b/services/ui_backend_service/requirements.txt
index 74942fa3..3c454e2c 100644
--- a/services/ui_backend_service/requirements.txt
+++ b/services/ui_backend_service/requirements.txt
@@ -6,7 +6,7 @@ psycopg2
 aiopg
 pygit2==1.12.1
 aiohttp_cors==0.7.0
-metaflow>=2.11.4
+metaflow @ git+https://github.com/Netflix/metaflow.git@feature/stream-logs-in-chunks
 click==8.0.3
 azure-storage-blob==12.13.1
 azure-identity==1.10.0

From aeea96cf4c6a6333afff630d9669702fa5e982c3 Mon Sep 17 00:00:00 2001
From: Sakari Ikonen
Date: Thu, 16 May 2024 12:22:16 +0300
Subject: [PATCH 2/8] fix localdev S3 access

---
 docker-compose.development.yml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docker-compose.development.yml b/docker-compose.development.yml
index b4a049fd..55465388 100644
--- a/docker-compose.development.yml
+++ b/docker-compose.development.yml
@@ -25,6 +25,7 @@ services:
       - MF_UI_METADATA_HOST=${MF_UI_METADATA_HOST:-0.0.0.0}
       - MF_METADATA_DB_POOL_MIN=1
       - MF_METADATA_DB_POOL_MAX=10
+      - METAFLOW_S3_RETRY_COUNT=0
       - LOGLEVEL=INFO
       - AIOPG_ECHO=0
       - UI_ENABLED=0
@@ -51,6 +52,7 @@ services:
       - NOTIFICATIONS=$NOTIFICATIONS
       - GA_TRACKING_ID=none
       - PLUGINS=$PLUGINS
+      - AWS_PROFILE=$AWS_PROFILE
     depends_on:
       - migration
   metadata:

From 22d4f359c949e6c333f85b4a38ae96901829491d Mon Sep 17 00:00:00 2001
From: Sakari Ikonen
Date: Thu, 16 May 2024 16:31:09 +0300
Subject: [PATCH 3/8] wip: async StreamResponse for log download route

---
 services/ui_backend_service/api/log.py | 27 ++++++++++++++-----
 .../data/cache/get_log_file_action.py  |  4 ++-
 2 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/services/ui_backend_service/api/log.py b/services/ui_backend_service/api/log.py
index b33f8df0..2f20d60c 100644
--- a/services/ui_backend_service/api/log.py
+++ b/services/ui_backend_service/api/log.py
@@ -3,7 +3,7 @@
 from services.data.db_utils import DBResponse, translate_run_key, translate_task_key, DBPagination, DBResponse
 from services.utils import handle_exceptions, web_response
-from .utils import format_response_list, get_pathspec_from_request
+from .utils import format_response_list, get_pathspec_from_request, logger
 
 from aiohttp import web
 from multidict import MultiDict
@@ -240,8 +240,10 @@ async def get_task_log_file(self, request, logtype=STDOUT):
             attempt=task['attempt_id']
         )
 
-        lines, _ = await read_and_output(self.cache, task, logtype, output_raw=True)
-        return file_download_response(log_filename, lines)
+        def _gen():
+            return stream_pages(self.cache, task, logtype, output_raw=True)
+
+        return await file_download_response(request, log_filename, _gen)
 
 
 async def read_and_output(cache_client, task, logtype, limit=0, page=1, reverse_order=False, output_raw=False):
@@ -263,6 +265,15 @@ async def read_and_output(cache_client, task, logtype, limit=0, page=1, reverse_
 
     return log_response["content"], log_response["pages"]
 
+async def stream_pages(cache_client, task, logtype, output_raw):
+    page = 1
+    while True:
+        logs, _ = await read_and_output(cache_client, task, logtype, limit=1000, page=page, output_raw=output_raw)
+        if not logs:
+            break
+        yield logs
+        page += 1
+
 def get_pagination_params(request):
     """extract pagination params from request
@@ -281,12 +292,16 @@ def get_pagination_params(request):
     return limit, page, reverse_order
 
 
-def file_download_response(filename, body):
-    return web.Response(
+async def file_download_response(request, filename, async_line_iterator):
+    response = web.StreamResponse(
         headers=MultiDict({'Content-Disposition': 'Attachment;filename={}'.format(filename)}),
-        body=body
     )
+    await response.prepare(request)
+    async for line in async_line_iterator():
+        await response.write(line.encode("utf-8"))
+    await response.write_eof()
+    return response
 
 class LogException(Exception):
     def __init__(self, msg='Failed to read log', id='log-error', traceback_str=None):
diff --git a/services/ui_backend_service/data/cache/get_log_file_action.py b/services/ui_backend_service/data/cache/get_log_file_action.py
index bb262e62..f13bc8f3 100644
--- a/services/ui_backend_service/data/cache/get_log_file_action.py
+++ b/services/ui_backend_service/data/cache/get_log_file_action.py
@@ -303,7 +303,9 @@ def paginated_result(content: List[Tuple[Optional[int], str]], page: int = 1, li
     if not output_raw:
         loglines, total_pages = format_loglines(content, page, limit, reverse_order)
     else:
-        loglines = "\n".join(line for _, line in content)
+        loglines = ""
+        if page == 1:
+            loglines = "\n".join(line for _, line in content)
         total_pages = 1
 
     return {

From c20f046d564d020ad0f7fbdfe0ae25f500af925d Mon Sep 17 00:00:00 2001
From: Sakari Ikonen
Date: Fri, 17 May 2024 02:11:31 +0300
Subject: [PATCH 4/8] add rudimentary pagination for testing purposes

---
 .../ui_backend_service/data/cache/get_log_file_action.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/services/ui_backend_service/data/cache/get_log_file_action.py b/services/ui_backend_service/data/cache/get_log_file_action.py
index f13bc8f3..cfe251c8 100644
--- a/services/ui_backend_service/data/cache/get_log_file_action.py
+++ b/services/ui_backend_service/data/cache/get_log_file_action.py
@@ -303,10 +303,11 @@ def paginated_result(content: List[Tuple[Optional[int], str]], page: int = 1, li
     if not output_raw:
         loglines, total_pages = format_loglines(content, page, limit, reverse_order)
     else:
+        _offset = limit * (page - 1)
+        total_pages = max(len(content) // limit, 1) if limit else 1
         loglines = ""
-        if page == 1:
-            loglines = "\n".join(line for _, line in content)
-        total_pages = 1
+        if page <= total_pages:
+            loglines = "\n".join(line for _, line in content[_offset:][:limit])
 
     return {

From bafb9419b1d81fd040a7648ef9253c1685089754 Mon Sep 17 00:00:00 2001
From: Sakari Ikonen
Date: Fri, 17 May 2024 17:17:16 +0300
Subject: [PATCH 5/8] wip: use log streaming instead of loading whole content in memory

---
 .../data/cache/get_log_file_action.py | 40 +++++++++++--------
 1 file changed, 23 insertions(+), 17 deletions(-)

diff --git a/services/ui_backend_service/data/cache/get_log_file_action.py b/services/ui_backend_service/data/cache/get_log_file_action.py
index cfe251c8..88311fee 100644
--- a/services/ui_backend_service/data/cache/get_log_file_action.py
+++ b/services/ui_backend_service/data/cache/get_log_file_action.py
@@ -1,7 +1,7 @@
 import hashlib
 import json
 
-from typing import Dict, List, Optional, Tuple
+from typing import Callable, Dict, List, Optional, Tuple
 from .client import CacheAction
 from .utils import streamed_errors
 import os
@@ -137,15 +137,18 @@ def execute(cls,
         log_hash_changed = previous_log_hash is None or previous_log_hash != current_hash
 
         if log_hash_changed:
-            content = log_provider.get_log_content(task, logtype)
-            results[log_key] = json.dumps({"log_hash": current_hash, "content": content})
+            # log_provider.get_log_content(task, logtype)
+            results[log_key] = json.dumps({"log_hash": current_hash})
         else:
             results = {**existing_keys}
 
         if log_hash_changed or result_key not in existing_keys:
+            def content_iter():
+                return log_provider.get_log_content(task, logtype)
+            
             results[result_key] = json.dumps(
                 paginated_result(
-                    json.loads(results[log_key])["content"],
+                    content_iter,
                     page,
                     limit,
                     reverse,
@@ -204,15 +207,11 @@ def get_log_content(task: Task, logtype: str):
     stream = 'stderr' if logtype == STDERR else 'stdout'
     log_location = task.metadata_dict.get('log_location_%s' % stream)
     if log_location:
-        return [
-            (None, line)
-            for line in task._load_log_legacy(log_location, stream).split("\n")
-        ]
+        for line in task._load_log_legacy(log_location, stream).split("\n"):
+            yield (None, line)
     else:
-        return [
-            (_datetime_to_epoch(datetime), line)
-            for datetime, line in task.loglines(stream)
-        ]
+        for datetime, line in task.stream_loglines(stream):
+            yield (_datetime_to_epoch(datetime), line)
 
 
 class LogProviderBase:
@@ -298,16 +297,23 @@ def get_log_content(self, task: Task, logtype: str):
         return get_log_content(task, logtype)
 
 
-def paginated_result(content: List[Tuple[Optional[int], str]], page: int = 1, limit: int = 0,
+def paginated_result(content_iterator: Callable, page: int = 1, limit: int = 0,
                      reverse_order: bool = False, output_raw=False):
     if not output_raw:
+        content = [(ts, line) for ts, line in content_iterator()]
         loglines, total_pages = format_loglines(content, page, limit, reverse_order)
     else:
         _offset = limit * (page - 1)
-        total_pages = max(len(content) // limit, 1) if limit else 1
-        loglines = ""
-        if page <= total_pages:
-            loglines = "\n".join(line for _, line in content[_offset:][:limit])
+        total_pages = -1
+        loglines = []
+        for lineno, item in enumerate(content_iterator()):
+            _ts, line = item
+            if limit and lineno>(_offset+limit):
+                break
+            if _offset and lineno<_offset:
+                continue
+            loglines.append(line)
+        loglines = "\n".join(loglines)

From 04be8f2498a17946b2fa723286ad847a9f29c0b4 Mon Sep 17 00:00:00 2001
From: Sakari Ikonen
Date: Thu, 23 May 2024 17:34:22 +0300
Subject: [PATCH 6/8] comment on shortcomings of file streaming

---
 services/ui_backend_service/api/log.py                    | 5 +++--
 .../ui_backend_service/data/cache/get_log_file_action.py  | 3 +--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/services/ui_backend_service/api/log.py b/services/ui_backend_service/api/log.py
index 2f20d60c..a6d01a67 100644
--- a/services/ui_backend_service/api/log.py
+++ b/services/ui_backend_service/api/log.py
@@ -297,8 +297,9 @@ async def file_download_response(request, filename, async_line_iterator):
         headers=MultiDict({'Content-Disposition': 'Attachment;filename={}'.format(filename)}),
     )
     await response.prepare(request)
-    async for line in async_line_iterator():
-        await response.write(line.encode("utf-8"))
+    # NOTE: this can not handle errors thrown by the cache, as status cannot be changed after .prepare() has been called.
+    async for lines in async_line_iterator():
+        await response.write(lines.encode("utf-8"))
     await response.write_eof()
     return response
 
diff --git a/services/ui_backend_service/data/cache/get_log_file_action.py b/services/ui_backend_service/data/cache/get_log_file_action.py
index 88311fee..34622882 100644
--- a/services/ui_backend_service/data/cache/get_log_file_action.py
+++ b/services/ui_backend_service/data/cache/get_log_file_action.py
@@ -300,8 +300,7 @@ def get_log_content(self, task: Task, logtype: str):
 def paginated_result(content_iterator: Callable, page: int = 1, limit: int = 0,
                      reverse_order: bool = False, output_raw=False):
     if not output_raw:
-        content = [(ts, line) for ts, line in content_iterator()]
-        loglines, total_pages = format_loglines(content, page, limit, reverse_order)
+        loglines, total_pages = format_loglines(content_iterator(), page, limit, reverse_order)
     else:
         _offset = limit * (page - 1)
         total_pages = -1

From d9b417e3dc4d276efcaff191c110de791c92c5e8 Mon Sep 17 00:00:00 2001
From: Sakari Ikonen
Date: Thu, 23 May 2024 17:51:05 +0300
Subject: [PATCH 7/8] add todo for log iterator

---
 services/ui_backend_service/data/cache/get_log_file_action.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/services/ui_backend_service/data/cache/get_log_file_action.py b/services/ui_backend_service/data/cache/get_log_file_action.py
index 34622882..86e3a410 100644
--- a/services/ui_backend_service/data/cache/get_log_file_action.py
+++ b/services/ui_backend_service/data/cache/get_log_file_action.py
@@ -300,6 +300,8 @@ def get_log_content(self, task: Task, logtype: str):
 def paginated_result(content_iterator: Callable, page: int = 1, limit: int = 0,
                      reverse_order: bool = False, output_raw=False):
     if not output_raw:
+        # TODO: support line iteration for this case as well.
+        # Currently unsupported due to the need to offer 'reverse_order'
         loglines, total_pages = format_loglines(content_iterator(), page, limit, reverse_order)
     else:
         _offset = limit * (page - 1)

From 471443acc1e5c81d883beb09f40dd91e8978b0d2 Mon Sep 17 00:00:00 2001
From: Sakari Ikonen
Date: Mon, 27 May 2024 16:35:51 +0300
Subject: [PATCH 8/8] lint

---
 services/ui_backend_service/api/log.py                    | 4 +++-
 .../ui_backend_service/data/cache/get_log_file_action.py  | 6 +++---
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/services/ui_backend_service/api/log.py b/services/ui_backend_service/api/log.py
index a6d01a67..22e71536 100644
--- a/services/ui_backend_service/api/log.py
+++ b/services/ui_backend_service/api/log.py
@@ -265,6 +265,7 @@ async def read_and_output(cache_client, task, logtype, limit=0, page=1, reverse_
 
     return log_response["content"], log_response["pages"]
 
+
 async def stream_pages(cache_client, task, logtype, output_raw):
     page = 1
     while True:
@@ -301,9 +302,10 @@ async def file_download_response(request, filename, async_line_iterator):
     await response.prepare(request)
     # NOTE: this can not handle errors thrown by the cache, as status cannot be changed after .prepare() has been called.
     async for lines in async_line_iterator():
         await response.write(lines.encode("utf-8"))
-    await response.write_eof()
+    await response.write_eof()
     return response
 
+
 class LogException(Exception):
     def __init__(self, msg='Failed to read log', id='log-error', traceback_str=None):
diff --git a/services/ui_backend_service/data/cache/get_log_file_action.py b/services/ui_backend_service/data/cache/get_log_file_action.py
index 86e3a410..70477f32 100644
--- a/services/ui_backend_service/data/cache/get_log_file_action.py
+++ b/services/ui_backend_service/data/cache/get_log_file_action.py
@@ -145,7 +145,7 @@ def execute(cls,
         if log_hash_changed or result_key not in existing_keys:
             def content_iter():
                 return log_provider.get_log_content(task, logtype)
-            
+
             results[result_key] = json.dumps(
                 paginated_result(
                     content_iter,
@@ -309,9 +309,9 @@ def paginated_result(content_iterator: Callable, page: int = 1, limit: int = 0,
         loglines = []
         for lineno, item in enumerate(content_iterator()):
             _ts, line = item
-            if limit and lineno>(_offset+limit):
+            if limit and lineno > (_offset + limit):
                 break
-            if _offset and lineno<_offset:
+            if _offset and lineno < _offset:
                 continue
             loglines.append(line)
         loglines = "\n".join(loglines)