Skip to content

Commit

Permalink
chore: Implemented Alex's notes from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
Christopher-R-Perkins committed Jul 16, 2024
1 parent 68161cd commit 2760d9e
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 23 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ The ZenML VSCode extension seamlessly integrates with [ZenML](https://github.com
## Features

- **Server, Stacks, and Pipeline Runs Views**: Interact directly with ML stacks, pipeline runs, and server configurations from the Activity Bar.
- **DAG Visualization for Pipeline Runs**: Explore Directed Acyclic Graphs for each pipeline view directly from command on the Activity Bar.
- **DAG Visualization for Pipeline Runs**: Explore Directed Acyclic Graphs for each pipeline view directly directly on the Activity Bar.
- **Python Tool Integration**: Utilizes a Language Server Protocol (LSP) server for real-time synchronization with the ZenML environment.
- **Real-Time Configuration Monitoring**: Leverages `watchdog` to dynamically update configurations, keeping the extension in sync with your ZenML setup.
- **Status Bar**: Display the current stack name and connection status. You can
Expand All @@ -31,17 +31,17 @@ this extension and your Python version needs to be 3.8 or greater.
- **Pipeline Runs**: Monitor and manage pipeline runs, including deleting runs from the system and rendering DAGs.
- **Environment Information**: Get detailed snapshots of the development environment, aiding troubleshooting.

### DAG Rendering
### DAG Visualization

![DAG Rendering Example](resources/zenml-extension-dag.gif)
![DAG Visualization Example](resources/zenml-extension-dag.gif)

- **Directed Acyclic Graph rendering**
- click on the Render Dag context action(labeled 1 in above image) next to the pipeline run you want to render. This will render the DAG in the editor window.
- click on the Render Dag context action (labeled 1 in above image) next to the pipeline run you want to render. This will render the DAG in the editor window.
- **Graph manuevering**
- Panning the graph can be done by clicking and dragging anywhere on the graph.
- Zooming can be controlled by the mousewheel, the control panel(labeled 2 in the above graph) or double-clicking anywhere there is not a node.
- Zooming can be controlled by the mousewheel, the control panel (labeled 2 in the above graph) or double-clicking anywhere there is not a node.
- Mousing over a node will highlight all edges being output by that node
- Clicking a node will display the data related to it in the ZenML panel view(labeled 3 in the above image)
- Clicking a node will display the data related to it in the ZenML panel view (labeled 3 in the above image)
- Double-clicking a node will open the dashboard in a web browser to either the pipeline run or the artifact version.

## Requirements
Expand Down
58 changes: 58 additions & 0 deletions bundled/tool/type_hints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from typing import Any, TypedDict, Dict, List, Union
from uuid import UUID


class StepArtifactBody(TypedDict):
type: str
artifact: Dict[str, str]

class StepArtifact(TypedDict):
id: UUID
body: StepArtifactBody

class GraphNode(TypedDict):
id: str
type: str
data: Dict[str, str]

class GraphEdge(TypedDict):
id: str
source: str
target: str



class GraphResponse(TypedDict):
nodes: List[GraphNode]
edges: List[GraphEdge]
name: str
status: str
version: str

class ErrorResponse(TypedDict):
error: str

class RunStepResponse(TypedDict):
name: str
id: str
status: str
author: Dict[str, str]
startTime: Union[str, None]
endTime: Union[str, None]
duration: Union[str, None]
stackName: str
orchestrator: Dict[str, str]
pipeline: Dict[str, str]
cacheKey: str
sourceCode: str
logsUri: str

class RunArtifactResponse(TypedDict):
name: str
version: str
id: str
type: str
author: Dict[str, str]
update: str
data: Dict[str, str]
metadata: Dict[str, Any]
3 changes: 1 addition & 2 deletions bundled/tool/zenml_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# permissions and limitations under the License.
"""ZenML client class. Initializes all wrappers."""


class ZenMLClient:
"""Provides a high-level interface to ZenML functionalities by wrapping core components."""

Expand All @@ -36,4 +35,4 @@ def __init__(self):
self.zen_server_wrapper = ZenServerWrapper(self.config_wrapper)
self.stacks_wrapper = StacksWrapper(self.client)
self.pipeline_runs_wrapper = PipelineRunsWrapper(self.client)
self.initialized = True
self.initialized = True
23 changes: 13 additions & 10 deletions bundled/tool/zenml_grapher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
# permissions and limitations under the License.
"""This module contains a tool to mimic LineageGraph output for pipeline runs"""

from typing import Dict, List
from type_hints import GraphEdge, GraphNode, GraphResponse, StepArtifact

class Grapher:
"""Quick and dirty implementation of ZenML/LineageGraph to reduce number of api calls"""

def __init__(self, run):
self.run = run
self.nodes = []
self.edges = []
self.artifacts = {}
self.nodes: List[GraphNode] = []
self.edges: List[GraphEdge] = []
self.artifacts: Dict[str, bool] = {}

def build_nodes_from_steps(self) -> None:
"""Builds internal node list from run steps"""
Expand All @@ -41,10 +44,10 @@ def build_nodes_from_steps(self) -> None:
self.add_artifacts_from_list(step_data.body.outputs)


def add_artifacts_from_list(self, list) -> None:
def add_artifacts_from_list(self, dictOfArtifacts: Dict[str, StepArtifact]) -> None:
"""Used to add unique artifacts to the internal nodes list by build_nodes_from_steps"""
for artifact in list:
id = str(list[artifact].body.artifact.id)
for artifact in dictOfArtifacts:
id = str(dictOfArtifacts[artifact].body.artifact.id)
if id in self.artifacts:
continue

Expand All @@ -55,8 +58,8 @@ def add_artifacts_from_list(self, list) -> None:
"id": id,
"data": {
"name": artifact,
"artifact_type": list[artifact].body.type,
"execution_id": str(list[artifact].id),
"artifact_type": dictOfArtifacts[artifact].body.type,
"execution_id": str(dictOfArtifacts[artifact].id),
},
})

Expand All @@ -78,15 +81,15 @@ def build_edges_from_steps(self) -> None:
self.add_edge(step_id, output_id)


def add_edge(self, v, w) -> None:
def add_edge(self, v: str, w: str) -> None:
"""Helper method to add an edge to the internal edges list"""
self.edges.append({
"id": f"{v}_{w}",
"source": v,
"target": w,
})

def to_dict(self) -> dict:
def to_dict(self) -> GraphResponse:
"""Returns dictionary containing graph data"""
return {
"nodes": self.nodes,
Expand Down
11 changes: 6 additions & 5 deletions bundled/tool/zenml_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

import json
import pathlib
from typing import Any
from typing import Any, Tuple, Union
from type_hints import GraphResponse, ErrorResponse, RunStepResponse, RunArtifactResponse
from zenml_grapher import Grapher


Expand Down Expand Up @@ -326,7 +327,7 @@ def delete_pipeline_run(self, args) -> dict:
except self.ZenMLBaseException as e:
return {"error": f"Failed to delete pipeline run: {str(e)}"}

def get_pipeline_run(self, args) -> dict:
def get_pipeline_run(self, args: Tuple[str]) -> dict:
"""Gets a ZenML pipeline run.
Args:
Expand Down Expand Up @@ -363,7 +364,7 @@ def get_pipeline_run(self, args) -> dict:
except self.ZenMLBaseException as e:
return {"error": f"Failed to retrieve pipeline run: {str(e)}"}

def get_pipeline_run_graph(self, args) -> dict:
def get_pipeline_run_graph(self, args: Tuple[str]) -> Union[GraphResponse, ErrorResponse]:
"""Gets a ZenML pipeline run step DAG.
Args:
Expand All @@ -381,7 +382,7 @@ def get_pipeline_run_graph(self, args) -> dict:
except self.ZenMLBaseException as e:
return {"error": f"Failed to retrieve pipeline run graph: {str(e)}"}

def get_run_step(self, args) -> dict:
def get_run_step(self, args: Tuple[str]) -> Union[RunStepResponse, ErrorResponse]:
"""Gets a ZenML pipeline run step.
Args:
Expand Down Expand Up @@ -428,7 +429,7 @@ def get_run_step(self, args) -> dict:
except self.ZenMLBaseException as e:
return {"error": f"Failed to retrieve pipeline run step: {str(e)}"}

def get_run_artifact(self, args) -> dict:
def get_run_artifact(self, args: Tuple[str]) -> Union[RunArtifactResponse, ErrorResponse]:
"""Gets a ZenML pipeline run artifact.
Args:
Expand Down

0 comments on commit 2760d9e

Please sign in to comment.