diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 1a563c8c60..3911876ac7 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -158,6 +158,11 @@ def _dispatch_execute( utils.write_proto_to_file(v.to_flyte_idl(), os.path.join(ctx.execution_state.engine_dir, k)) ctx.file_access.put_data(ctx.execution_state.engine_dir, output_prefix, is_multipart=True) +<<<<<<< HEAD +======= + _output_deck(task_def.name.split(".")[-1], ctx.user_space_params) + utils._output_span() +>>>>>>> d02d0913 (expose metrics) logger.info(f"Engine folder written successfully to the output prefix {output_prefix}") if not task_def.disable_deck: diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 6556ac1469..b9b556783d 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -16,7 +16,6 @@ IgnoreOutputs """ - import collections import datetime from abc import abstractmethod diff --git a/flytekit/core/constants.py b/flytekit/core/constants.py index cda20602b2..d1df73b7b6 100644 --- a/flytekit/core/constants.py +++ b/flytekit/core/constants.py @@ -2,6 +2,9 @@ OUTPUT_FILE_NAME = "outputs.pb" FUTURES_FILE_NAME = "futures.pb" ERROR_FILE_NAME = "error.pb" +# Cuurrently, only timeit spans will be stored in the span file +SPAN_FILE_NAME = "span.pb" +DECK_FILE_NAME = "deck.html" class SdkTaskType(object): diff --git a/flytekit/core/utils.py b/flytekit/core/utils.py index 437d2b71a4..a834747d86 100644 --- a/flytekit/core/utils.py +++ b/flytekit/core/utils.py @@ -9,7 +9,9 @@ from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, cast from flyteidl.core import tasks_pb2 as _core_task +from flyteidl.core.metrics_pb2 import Span +from flytekit.core import constants as _constants from flytekit.core.pod_template import PodTemplate from flytekit.loggers import logger @@ -331,3 +333,25 @@ def __exit__(self, exc_type, exc_val, exc_tb): end_process_time - self._start_process_time, ) ) + + +def _output_span(): + from flytekit.core.context_manager import FlyteContextManager + + ctx = FlyteContextManager.current_context() + root_span = Span() + + for info in ctx.user_space_params.timeline_deck.time_info: # type: ignore + span = Span() + span.operation_id = info["Name"] + span.start_time.FromDatetime(info["Start"]) + span.end_time.FromDatetime(info["Finish"]) + root_span.spans.append(span) + + local_dir = ctx.file_access.get_random_local_directory() + local_path = f"{local_dir}{_os.sep}{_constants.SPAN_FILE_NAME}" + with open(local_path, "wb") as f: + f.write(root_span.SerializeToString()) + + remote_path = f"{ctx.user_space_params.output_metadata_prefix}{_os.sep}{_constants.SPAN_FILE_NAME}" # type: ignore + ctx.file_access.put_data(local_path, remote_path) diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index c011a218a6..7bb9d46d1a 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -2,12 +2,12 @@ import typing from typing import Optional +from flytekit.core import constants as _constants from flytekit.core.context_manager import ExecutionParameters, ExecutionState, FlyteContext, FlyteContextManager from flytekit.loggers import logger from flytekit.tools.interactive import ipython_check OUTPUT_DIR_JUPYTER_PREFIX = "jupyter" -DECK_FILE_NAME = "deck.html" class Deck: @@ -146,12 +146,12 @@ def _get_deck( def _output_deck(task_name: str, new_user_params: ExecutionParameters): ctx = FlyteContext.current_context() local_dir = ctx.file_access.get_random_local_directory() - local_path = f"{local_dir}{os.sep}{DECK_FILE_NAME}" + local_path = f"{local_dir}{os.sep}{_constants.DECK_FILE_NAME}" with open(local_path, "w") as f: f.write(_get_deck(new_user_params, ignore_jupyter=True)) logger.info(f"{task_name} task creates flyte deck html to file://{local_path}") if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION: - remote_path = f"{new_user_params.output_metadata_prefix}{os.sep}{DECK_FILE_NAME}" + remote_path = f"{new_user_params.output_metadata_prefix}{os.sep}{_constants.DECK_FILE_NAME}" kwargs: typing.Dict[str, str] = { "ContentType": "text/html", # For s3 "content_type": "text/html", # For gcs