Skip to content

Commit

Permalink
Merge pull request #25 from Christopher-R-Perkins/feature/dag-visualizer
Browse files Browse the repository at this point in the history
DAG Visualizer
  • Loading branch information
strickvl authored Jul 16, 2024
2 parents 8ff93a6 + 2760d9e commit 0a959de
Show file tree
Hide file tree
Showing 39 changed files with 1,706 additions and 377 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ out/
build/
*.tsbuildinfo
.history/
dag-packed.js

# env
.env
Expand All @@ -38,4 +39,4 @@ build/
bundled/libs/
**/__pycache__
**/.pytest_cache
**/.vs
**/.vs
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The ZenML VSCode extension seamlessly integrates with [ZenML](https://github.com
## Features

- **Server, Stacks, and Pipeline Runs Views**: Interact directly with ML stacks, pipeline runs, and server configurations from the Activity Bar.
- **DAG Visualization for Pipeline Runs**: Explore Directed Acyclic Graphs for each pipeline view directly on the Activity Bar.
- **Python Tool Integration**: Utilizes a Language Server Protocol (LSP) server for real-time synchronization with the ZenML environment.
- **Real-Time Configuration Monitoring**: Leverages `watchdog` to dynamically update configurations, keeping the extension in sync with your ZenML setup.
- **Status Bar**: Display the current stack name and connection status. You can
Expand All @@ -27,9 +28,22 @@ this extension and your Python version needs to be 3.8 or greater.

- **Manage Server Connections**: Connect or disconnect from ZenML servers and refresh server status.
- **Stack Operations**: View stack details, rename, copy, or set active stacks directly from VSCode.
- **Pipeline Runs**: Monitor and manage pipeline runs, including deleting runs from the system.
- **Pipeline Runs**: Monitor and manage pipeline runs, including deleting runs from the system and rendering DAGs.
- **Environment Information**: Get detailed snapshots of the development environment, aiding troubleshooting.

### DAG Visualization

![DAG Visualization Example](resources/zenml-extension-dag.gif)

- **Directed Acyclic Graph rendering**
- Click on the Render DAG context action (labeled 1 in the above image) next to the pipeline run you want to render. This will render the DAG in the editor window.
- **Graph maneuvering**
- Panning the graph can be done by clicking and dragging anywhere on the graph.
- Zooming can be controlled by the mouse wheel, the control panel (labeled 2 in the above image), or double-clicking anywhere that is not a node.
- Mousing over a node will highlight all edges being output by that node.
- Clicking a node will display the data related to it in the ZenML panel view (labeled 3 in the above image)
- Double-clicking a node will open the dashboard in a web browser to either the pipeline run or the artifact version.

## Requirements

- **ZenML Installation:** ZenML needs to be installed in the local Python environment associated with the Python interpreter selected in the current VS Code workspace. This extension interacts directly with your ZenML environment, so ensuring that ZenML is installed and properly configured is essential.
Expand Down
24 changes: 24 additions & 0 deletions bundled/tool/lsp_zenml.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,27 @@ def fetch_pipeline_runs(wrapper_instance, args):
def delete_pipeline_run(wrapper_instance, args):
"""Deletes a specified ZenML pipeline run."""
return wrapper_instance.delete_pipeline_run(args)

@self.command(f"{TOOL_MODULE_NAME}.getPipelineRun")
@self.zenml_command(wrapper_name="pipeline_runs_wrapper")
def get_pipeline_run(wrapper_instance, args):
    """Gets a specified ZenML pipeline run.

    Thin command handler: forwards ``args`` untouched to
    ``wrapper_instance.get_pipeline_run`` and returns whatever it returns.

    Args:
        wrapper_instance: Object exposing ``get_pipeline_run``; presumably the
            ``pipeline_runs_wrapper`` named in the decorator -- confirm
            against ``zenml_command``.
        args: Command arguments, passed through as-is.
    """
    return wrapper_instance.get_pipeline_run(args)

@self.command(f"{TOOL_MODULE_NAME}.getPipelineRunStep")
@self.zenml_command(wrapper_name="pipeline_runs_wrapper")
def get_run_step(wrapper_instance, args):
    """Gets a specified ZenML pipeline run step.

    Thin command handler: forwards ``args`` untouched to
    ``wrapper_instance.get_run_step`` and returns whatever it returns.

    Args:
        wrapper_instance: Object exposing ``get_run_step``; presumably the
            ``pipeline_runs_wrapper`` named in the decorator -- confirm
            against ``zenml_command``.
        args: Command arguments, passed through as-is.
    """
    return wrapper_instance.get_run_step(args)

@self.command(f"{TOOL_MODULE_NAME}.getPipelineRunArtifact")
@self.zenml_command(wrapper_name="pipeline_runs_wrapper")
def get_run_artifact(wrapper_instance, args):
    """Gets a specified ZenML pipeline artifact.

    Thin command handler: forwards ``args`` untouched to
    ``wrapper_instance.get_run_artifact`` and returns whatever it returns.

    Args:
        wrapper_instance: Object exposing ``get_run_artifact``; presumably the
            ``pipeline_runs_wrapper`` named in the decorator -- confirm
            against ``zenml_command``.
        args: Command arguments, passed through as-is.
    """
    return wrapper_instance.get_run_artifact(args)

@self.command(f"{TOOL_MODULE_NAME}.getPipelineRunDag")
@self.zenml_command(wrapper_name="pipeline_runs_wrapper")
def get_run_dag(wrapper_instance, args):
    """Gets graph data for a specified ZenML pipeline run.

    Thin command handler: forwards ``args`` untouched to
    ``wrapper_instance.get_pipeline_run_graph`` (note the method name differs
    from this handler's name) and returns whatever it returns.

    Args:
        wrapper_instance: Object exposing ``get_pipeline_run_graph``;
            presumably the ``pipeline_runs_wrapper`` named in the decorator --
            confirm against ``zenml_command``.
        args: Command arguments, passed through as-is.
    """
    return wrapper_instance.get_pipeline_run_graph(args)
58 changes: 58 additions & 0 deletions bundled/tool/type_hints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from typing import Any, TypedDict, Dict, List, Union
from uuid import UUID


class StepArtifactBody(TypedDict):
    """Body portion of a step's input/output artifact entry."""

    # Artifact type label.
    type: str
    # String-to-string mapping describing the underlying artifact.
    artifact: Dict[str, str]

class StepArtifact(TypedDict):
    """A single input or output artifact entry attached to a run step."""

    # Identifier of this artifact entry.
    id: UUID
    # Type and underlying-artifact details.
    body: StepArtifactBody

class GraphNode(TypedDict):
    """One node of the pipeline-run graph."""

    # Unique node identifier.
    id: str
    # Node kind label.
    type: str
    # Display/lookup fields for the node, all string-valued.
    data: Dict[str, str]

class GraphEdge(TypedDict):
    """One directed edge of the pipeline-run graph."""

    # Unique edge identifier.
    id: str
    # Id of the node the edge starts from.
    source: str
    # Id of the node the edge points to.
    target: str



class GraphResponse(TypedDict):
    """Full graph payload for one pipeline run: nodes, edges and run summary."""

    # All nodes of the graph.
    nodes: List[GraphNode]
    # All directed edges between nodes.
    edges: List[GraphEdge]
    # Name shown for the graph.
    name: str
    # Status string for the run.
    status: str
    # Version string.
    version: str

class ErrorResponse(TypedDict):
    """Payload carrying a single error message."""

    # Human-readable error text.
    error: str

class RunStepResponse(TypedDict):
    """Detail payload describing one step of a pipeline run.

    NOTE(review): presumably the shape produced for the run-step lookup
    command; confirm against the code that builds it.
    """

    name: str
    id: str
    status: str
    # String-to-string mapping describing the author.
    author: Dict[str, str]
    # The three timing fields may be None.
    startTime: Union[str, None]
    endTime: Union[str, None]
    duration: Union[str, None]
    stackName: str
    # String-to-string mappings describing the orchestrator and pipeline.
    orchestrator: Dict[str, str]
    pipeline: Dict[str, str]
    cacheKey: str
    sourceCode: str
    logsUri: str

class RunArtifactResponse(TypedDict):
    """Detail payload describing one artifact of a pipeline run.

    NOTE(review): presumably the shape produced for the run-artifact lookup
    command; confirm against the code that builds it.
    """

    name: str
    version: str
    id: str
    type: str
    # String-to-string mapping describing the author.
    author: Dict[str, str]
    # NOTE(review): key is spelled "update"; looks like a last-updated
    # timestamp -- confirm with the producer before renaming.
    update: str
    # String-valued data fields for the artifact.
    data: Dict[str, str]
    # Free-form metadata; values may be of any type.
    metadata: Dict[str, Any]
18 changes: 13 additions & 5 deletions bundled/tool/zen_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ def __init__(self, lsp_server):
"always",
]

try:
with suppress_stdout_temporarily():
config_wrapper_instance = self.LSP_SERVER.zenml_client.config_wrapper
self.config_path = config_wrapper_instance.get_global_config_file_path()
except Exception as e:
self.log_error(f"Failed to retrieve global config file path: {e}")


def process_config_change(self, config_file_path: str):
"""Process the configuration file change."""
with suppress_stdout_temporarily():
Expand Down Expand Up @@ -88,20 +96,20 @@ def on_modified(self, event):
"""
Handles the modification event triggered when the global configuration file is changed.
"""
if event.src_path != self.config_path:
return

if self._timer is not None:
self._timer.cancel()

self._timer = Timer(self.debounce_interval, self.process_event, [event])
self._timer.start()

def process_event(self, event):
"""
Processes the event with a debounce mechanism.
"""
with suppress_stdout_temporarily():
config_wrapper_instance = self.LSP_SERVER.zenml_client.config_wrapper
config_file_path = config_wrapper_instance.get_global_config_file_path()
if event.src_path == str(config_file_path):
self.process_config_change(config_file_path)
self.process_config_change(event.src_path)

def watch_zenml_config_yaml(self):
"""
Expand Down
3 changes: 1 addition & 2 deletions bundled/tool/zenml_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# permissions and limitations under the License.
"""ZenML client class. Initializes all wrappers."""


class ZenMLClient:
"""Provides a high-level interface to ZenML functionalities by wrapping core components."""

Expand All @@ -36,4 +35,4 @@ def __init__(self):
self.zen_server_wrapper = ZenServerWrapper(self.config_wrapper)
self.stacks_wrapper = StacksWrapper(self.client)
self.pipeline_runs_wrapper = PipelineRunsWrapper(self.client)
self.initialized = True
self.initialized = True
100 changes: 100 additions & 0 deletions bundled/tool/zenml_grapher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright (c) ZenML GmbH 2024. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""This module contains a tool to mimic LineageGraph output for pipeline runs"""

from typing import Dict, List
from type_hints import GraphEdge, GraphNode, GraphResponse, StepArtifact

class Grapher:
    """Quick and dirty implementation of ZenML/LineageGraph to reduce number of api calls.

    Builds a node list and an edge list for one pipeline run using only the
    data already attached to the run object passed to the constructor.
    """

    def __init__(self, run):
        """Stores the run and starts with an empty graph.

        Args:
            run: Pipeline run object. The methods of this class read
                ``run.metadata.steps`` (a mapping of step name to step data),
                ``run.body.status`` and ``run.body.pipeline``.
        """
        self.run = run
        self.nodes: List[GraphNode] = []
        self.edges: List[GraphEdge] = []
        # Artifact ids that already have a node; used as a "seen" set.
        self.artifacts: Dict[str, bool] = {}

    def build_nodes_from_steps(self) -> None:
        """Builds internal node list from run steps.

        Resets ``nodes`` and ``artifacts``, then for every step appends one
        "step" node followed by an "artifact" node for each of the step's
        inputs and outputs that has not been added yet.
        """
        self.nodes = []
        self.artifacts = {}

        for step_name, step_data in self.run.metadata.steps.items():
            step_id = str(step_data.id)
            self.nodes.append({
                "id": step_id,
                "type": "step",
                "data": {
                    "execution_id": step_id,
                    "name": step_name,
                    "status": step_data.body.status,
                },
            })
            self.add_artifacts_from_list(step_data.body.inputs)
            self.add_artifacts_from_list(step_data.body.outputs)

    def add_artifacts_from_list(self, dictOfArtifacts: Dict[str, StepArtifact]) -> None:
        """Used to add unique artifacts to the internal nodes list by build_nodes_from_steps.

        Args:
            dictOfArtifacts: Mapping of artifact name to artifact entry. Each
                entry is read through attribute access (``.id``,
                ``.body.type``, ``.body.artifact.id``).
        """
        for artifact_name, step_artifact in dictOfArtifacts.items():
            artifact_id = str(step_artifact.body.artifact.id)
            # The first occurrence wins; later references to the same
            # underlying artifact do not create another node.
            if artifact_id in self.artifacts:
                continue

            self.artifacts[artifact_id] = True

            self.nodes.append({
                "type": "artifact",
                "id": artifact_id,
                "data": {
                    "name": artifact_name,
                    "artifact_type": step_artifact.body.type,
                    "execution_id": str(step_artifact.id),
                },
            })

    def build_edges_from_steps(self) -> None:
        """Builds internal edges list from run steps.

        Resets ``edges``, then for every step adds an edge from each input
        artifact to the step and from the step to each output artifact.
        A (source, target) pair is added only once, so an artifact that feeds
        the same step under several input names does not yield several edges
        sharing one id.
        """
        self.edges = []
        seen_links = set()

        for step_data in self.run.metadata.steps.values():
            step_id = str(step_data.id)

            links = [
                (str(entry.body.artifact.id), step_id)
                for entry in step_data.body.inputs.values()
            ]
            links.extend(
                (step_id, str(entry.body.artifact.id))
                for entry in step_data.body.outputs.values()
            )

            for link in links:
                if link in seen_links:
                    continue
                seen_links.add(link)
                self.add_edge(*link)

    def add_edge(self, v: str, w: str) -> None:
        """Helper method to add an edge to the internal edges list.

        Args:
            v: Id of the source node.
            w: Id of the target node.
        """
        self.edges.append({
            "id": f"{v}_{w}",
            "source": v,
            "target": w,
        })

    def to_dict(self) -> GraphResponse:
        """Returns dictionary containing graph data.

        The result holds the current ``nodes`` and ``edges`` lists together
        with the run's status and its pipeline's name and version.
        """
        return {
            "nodes": self.nodes,
            "edges": self.edges,
            "status": self.run.body.status,
            "name": self.run.body.pipeline.name,
            "version": self.run.body.pipeline.body.version,
        }
Loading

0 comments on commit 0a959de

Please sign in to comment.