From 2760d9e287a037c7e294a6325859f7e2af5a6fb3 Mon Sep 17 00:00:00 2001 From: Christopher Perkins Date: Tue, 16 Jul 2024 10:06:52 -0400 Subject: [PATCH] chore: Implemented Alex's notes from code review --- README.md | 12 +++---- bundled/tool/type_hints.py | 58 ++++++++++++++++++++++++++++++++++ bundled/tool/zenml_client.py | 3 +- bundled/tool/zenml_grapher.py | 23 ++++++++------ bundled/tool/zenml_wrappers.py | 11 ++++--- 5 files changed, 84 insertions(+), 23 deletions(-) create mode 100644 bundled/tool/type_hints.py diff --git a/README.md b/README.md index 72510dad..95fa73ab 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ The ZenML VSCode extension seamlessly integrates with [ZenML](https://github.com ## Features - **Server, Stacks, and Pipeline Runs Views**: Interact directly with ML stacks, pipeline runs, and server configurations from the Activity Bar. -- **DAG Visualization for Pipeline Runs**: Explore Directed Acyclic Graphs for each pipeline view directly from command on the Activity Bar. +- **DAG Visualization for Pipeline Runs**: Explore Directed Acyclic Graphs for each pipeline view directly on the Activity Bar. - **Python Tool Integration**: Utilizes a Language Server Protocol (LSP) server for real-time synchronization with the ZenML environment. - **Real-Time Configuration Monitoring**: Leverages `watchdog` to dynamically update configurations, keeping the extension in sync with your ZenML setup. - **Status Bar**: Display the current stack name and connection status. You can @@ -31,17 +31,17 @@ this extension and your Python version needs to be 3.8 or greater. - **Pipeline Runs**: Monitor and manage pipeline runs, including deleting runs from the system and rendering DAGs. - **Environment Information**: Get detailed snapshots of the development environment, aiding troubleshooting. 
-### DAG Rendering +### DAG Visualization -![DAG Rendering Example](resources/zenml-extension-dag.gif) +![DAG Visualization Example](resources/zenml-extension-dag.gif) - **Directed Acyclic Graph rendering** - - click on the Render Dag context action(labeled 1 in above image) next to the pipeline run you want to render. This will render the DAG in the editor window. + - click on the Render Dag context action (labeled 1 in above image) next to the pipeline run you want to render. This will render the DAG in the editor window. - **Graph manuevering** - Panning the graph can be done by clicking and dragging anywhere on the graph. - - Zooming can be controlled by the mousewheel, the control panel(labeled 2 in the above graph) or double-clicking anywhere there is not a node. + - Zooming can be controlled by the mousewheel, the control panel (labeled 2 in the above graph) or double-clicking anywhere there is not a node. - Mousing over a node will highlight all edges being output by that node - - Clicking a node will display the data related to it in the ZenML panel view(labeled 3 in the above image) + - Clicking a node will display the data related to it in the ZenML panel view (labeled 3 in the above image) - Double-clicking a node will open the dashboard in a web browser to either the pipeline run or the artifact version. 
## Requirements diff --git a/bundled/tool/type_hints.py b/bundled/tool/type_hints.py new file mode 100644 index 00000000..bda4b20e --- /dev/null +++ b/bundled/tool/type_hints.py @@ -0,0 +1,58 @@ +from typing import Any, TypedDict, Dict, List, Union +from uuid import UUID + + +class StepArtifactBody(TypedDict): + type: str + artifact: Dict[str, str] + +class StepArtifact(TypedDict): + id: UUID + body: StepArtifactBody + +class GraphNode(TypedDict): + id: str + type: str + data: Dict[str, str] + +class GraphEdge(TypedDict): + id: str + source: str + target: str + + + +class GraphResponse(TypedDict): + nodes: List[GraphNode] + edges: List[GraphEdge] + name: str + status: str + version: str + +class ErrorResponse(TypedDict): + error: str + +class RunStepResponse(TypedDict): + name: str + id: str + status: str + author: Dict[str, str] + startTime: Union[str, None] + endTime: Union[str, None] + duration: Union[str, None] + stackName: str + orchestrator: Dict[str, str] + pipeline: Dict[str, str] + cacheKey: str + sourceCode: str + logsUri: str + +class RunArtifactResponse(TypedDict): + name: str + version: str + id: str + type: str + author: Dict[str, str] + update: str + data: Dict[str, str] + metadata: Dict[str, Any] \ No newline at end of file diff --git a/bundled/tool/zenml_client.py b/bundled/tool/zenml_client.py index 9c234745..7b3111a4 100644 --- a/bundled/tool/zenml_client.py +++ b/bundled/tool/zenml_client.py @@ -12,7 +12,6 @@ # permissions and limitations under the License. """ZenML client class. 
Initializes all wrappers.""" - class ZenMLClient: """Provides a high-level interface to ZenML functionalities by wrapping core components.""" @@ -36,4 +35,4 @@ def __init__(self): self.zen_server_wrapper = ZenServerWrapper(self.config_wrapper) self.stacks_wrapper = StacksWrapper(self.client) self.pipeline_runs_wrapper = PipelineRunsWrapper(self.client) - self.initialized = True + self.initialized = True \ No newline at end of file diff --git a/bundled/tool/zenml_grapher.py b/bundled/tool/zenml_grapher.py index eae91c5a..a2bde8af 100644 --- a/bundled/tool/zenml_grapher.py +++ b/bundled/tool/zenml_grapher.py @@ -12,14 +12,17 @@ # permissions and limitations under the License. """This module contains a tool to mimic LineageGraph output for pipeline runs""" +from typing import Dict, List +from type_hints import GraphEdge, GraphNode, GraphResponse, StepArtifact + class Grapher: """Quick and dirty implementation of ZenML/LineageGraph to reduce number of api calls""" def __init__(self, run): self.run = run - self.nodes = [] - self.edges = [] - self.artifacts = {} + self.nodes: List[GraphNode] = [] + self.edges: List[GraphEdge] = [] + self.artifacts: Dict[str, bool] = {} def build_nodes_from_steps(self) -> None: """Builds internal node list from run steps""" @@ -41,10 +44,10 @@ def build_nodes_from_steps(self) -> None: self.add_artifacts_from_list(step_data.body.outputs) - def add_artifacts_from_list(self, list) -> None: + def add_artifacts_from_list(self, dictOfArtifacts: Dict[str, StepArtifact]) -> None: """Used to add unique artifacts to the internal nodes list by build_nodes_from_steps""" - for artifact in list: - id = str(list[artifact].body.artifact.id) + for artifact in dictOfArtifacts: + id = str(dictOfArtifacts[artifact].body.artifact.id) if id in self.artifacts: continue @@ -55,8 +58,8 @@ def add_artifacts_from_list(self, list) -> None: "id": id, "data": { "name": artifact, - "artifact_type": list[artifact].body.type, - "execution_id": str(list[artifact].id), + 
"artifact_type": dictOfArtifacts[artifact].body.type, + "execution_id": str(dictOfArtifacts[artifact].id), }, }) @@ -78,7 +81,7 @@ def build_edges_from_steps(self) -> None: self.add_edge(step_id, output_id) - def add_edge(self, v, w) -> None: + def add_edge(self, v: str, w: str) -> None: """Helper method to add an edge to the internal edges list""" self.edges.append({ "id": f"{v}_{w}", @@ -86,7 +89,7 @@ def add_edge(self, v, w) -> None: "target": w, }) - def to_dict(self) -> dict: + def to_dict(self) -> GraphResponse: """Returns dictionary containing graph data""" return { "nodes": self.nodes, diff --git a/bundled/tool/zenml_wrappers.py b/bundled/tool/zenml_wrappers.py index 87839c20..a119fe86 100644 --- a/bundled/tool/zenml_wrappers.py +++ b/bundled/tool/zenml_wrappers.py @@ -14,7 +14,8 @@ import json import pathlib -from typing import Any +from typing import Any, Tuple, Union +from type_hints import GraphResponse, ErrorResponse, RunStepResponse, RunArtifactResponse from zenml_grapher import Grapher @@ -326,7 +327,7 @@ def delete_pipeline_run(self, args) -> dict: except self.ZenMLBaseException as e: return {"error": f"Failed to delete pipeline run: {str(e)}"} - def get_pipeline_run(self, args) -> dict: + def get_pipeline_run(self, args: Tuple[str]) -> dict: """Gets a ZenML pipeline run. Args: @@ -363,7 +364,7 @@ def get_pipeline_run(self, args) -> dict: except self.ZenMLBaseException as e: return {"error": f"Failed to retrieve pipeline run: {str(e)}"} - def get_pipeline_run_graph(self, args) -> dict: + def get_pipeline_run_graph(self, args: Tuple[str]) -> Union[GraphResponse, ErrorResponse]: """Gets a ZenML pipeline run step DAG. Args: @@ -381,7 +382,7 @@ def get_pipeline_run_graph(self, args) -> dict: except self.ZenMLBaseException as e: return {"error": f"Failed to retrieve pipeline run graph: {str(e)}"} - def get_run_step(self, args) -> dict: + def get_run_step(self, args: Tuple[str]) -> Union[RunStepResponse, ErrorResponse]: """Gets a ZenML pipeline run step. 
Args: @@ -428,7 +429,7 @@ def get_run_step(self, args) -> dict: except self.ZenMLBaseException as e: return {"error": f"Failed to retrieve pipeline run step: {str(e)}"} - def get_run_artifact(self, args) -> dict: + def get_run_artifact(self, args: Tuple[str]) -> Union[RunArtifactResponse, ErrorResponse]: """Gets a ZenML pipeline run artifact. Args: