Skip to content

Commit

Permalink
Add cloud watcher example (#346)
Browse files Browse the repository at this point in the history
* add examples udc for watch for files locally and run flow agents

* cleanup

* spellcheck

* clarification nit

* spellcheck

* add note on configuring output_path; add example cloud watcher agent

* spellcheck

* upd
  • Loading branch information
bensonlee5 authored Aug 29, 2024
1 parent 936b829 commit 03fd638
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 20 deletions.
4 changes: 4 additions & 0 deletions .wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ Uptime
executables
DiffOutlined
SelectOutlined
AgentMonitoring
Hostname
toc
worklist


ApiOutlined
Expand Down
4 changes: 4 additions & 0 deletions docs/app/agents/Agent.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ Three additional input parameters are available for all configurations:

A Connection watches a directory (and associated subdirectories) for new files and updates to existing files. The Flow configured with the Agent is triggered when all expected files are found.

The _input_path_ variable, configured for each Connection [at installation](#windows-installation), specifies the directory to watch.

#### Input Parameters

- `Flow Name`: The flow to run upon observing new files matching the specified pattern.
Expand Down Expand Up @@ -328,6 +330,8 @@ Multiple flow cytometers are used to observe cell populations for a related set
A Connection that monitors Flow file outputs, saving them to local directories as specified by the Agent logic.
The _output_path_ variable, configured for each Connection [at installation](#windows-installation), specifies the directory to store files to.
#### Input Parameters
- `Flow Name`: The flow from which to download output files. This will autopopulate the glob pattern matching field correspondingly.
Expand Down
22 changes: 15 additions & 7 deletions docs/app/agents/AgentMonitoring.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ title: Monitoring Agents
displayed_sidebar: webUiSidebar
---

import { DiffOutlined, PlusOutlined, SelectOutlined } from "@ant-design/icons";
import { DiffOutlined, EditOutlined, PlusOutlined, SelectOutlined } from "@ant-design/icons";

## Maintaining Agents

Expand Down Expand Up @@ -114,13 +114,21 @@ Disabled Agents cannot be updated and its connections can no longer communicate

### Updating Connections

To update a Connection, select the desired Connection in the Connections tab. You can update the following fields on a connection from the Ganymede UI:
To update a Connection, select the desired Connection in the Connections tab and click the <div className="button darker_gray_button"><EditOutlined /> Edit Connections</div> button. You can update the following fields on a connection from the Ganymede UI:

- Name: Display name for the Connection
- Image: An image to associate with the Connection in the Ganymede UI.
- Tags: [File tags](../files/Tags) to apply to files uploaded by the Connection
- Variables: Key-value pairs that can be referenced in user-defined code
- Labels: Strings that can be used to group Connections in the Ganymede UI
<div class="text--center">
<img
alt="Connection Detail UI"
src="https://ganymede-bio.mo.cloudinary.net/agent/ConnectionDetails20240726.png"
width="600"
/>
</div>

- **Name**: Display name for the Connection
- **Image**: An image to associate with the Connection in the Ganymede UI.
- **Tags**: [File tags](../files/Tags) to apply to files uploaded by the Connection
- **Variables**: Key-value pairs that can be referenced in user-defined code
- **Labels**: Strings that can be used to group Connections in the Ganymede UI

### Remote Update of Live Connections

Expand Down
63 changes: 50 additions & 13 deletions docs/sdk/markdowns/AgentModelsAndMethods.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
id: AgentModelsAndMethods
title: Agent Models and Methods
displayed_sidebar: SDKSidebar
toc_max_heading_level: 4
---

import NodeChip from '@site/src/components/NodeChip.js'
Expand All @@ -16,6 +17,8 @@ the Flow that is executed.
Below is a skeleton example for the user-defined code with associated types:

```python
from typing import Callable

def fp(watch_dir: str, parent_dir: str, pattern: str) -> Callable[[str], bool]:
"""
This function returns a function that performs pattern matching against a file path.
Expand All @@ -35,7 +38,7 @@ def get_param_mapping(
file_name: str = "",
modified_time: str = "",
body: bytes = bytes(),
) -> Dict[str, Callable[[str], bool]]:
) -> dict[str, Callable[[str], bool]]:
"""
This function is called when a file is added or modified in the watch directory.
Modify this function to capture the files you want to trigger the flow;
Expand All @@ -47,7 +50,6 @@ def get_param_mapping(
This function returns a dictionary of fp objects indexed by `node name`.`param name`
"""

pass

def execute(flow_params_fw: FileWatcherResult) -> TriggerFlowParams:
Expand All @@ -62,17 +64,19 @@ def execute(flow_params_fw: FileWatcherResult) -> TriggerFlowParams:
pass
```

## User-defined code examples
## User-defined code examples by Agent type

### Watch for files locally and run flow

### Deliver files to Nodes that take a single file as input
#### Delivering files to a Flow with single file input Nodes

This example shows an Agent that delivers a csv file to the [Bioreactor_File Node](../../nodes/File/CSV_Read), an excel file containing the word 'medium' to the [Medium_Composition Node](../../nodes/File/Excel_Read.md), and an excel file containing the word 'eventlog' to the [Event_Log Node](../../nodes/File/Excel_Read.md).

```python
import glob
import os
import re
from typing import Callable, Dict, List, Union
from typing import Callable
from urllib import parse

from ganymede_sdk.agent.models import (
Expand All @@ -91,14 +95,13 @@ def fp(watch_dir: str, parent_dir: str, pattern: str) -> Callable[[str], bool]:
return fp_res


# Required Function
def get_param_mapping(
watch_dir: str,
parent_dir: str = "",
file_name: str = "",
modified_time: str = "",
body: bytes = bytes(),
) -> Dict[str, Union[Callable[[str], bool], List[Callable[[str], bool]]]]:
) -> dict[str, Callable[[str], bool] | list[Callable[[str], bool]]]:
id_group = re.search(r"^(\w+)", file_name)
if id_group is None:
return {}
Expand All @@ -119,14 +122,14 @@ def execute(flow_params_fw: FileWatcherResult, **kwargs) -> TriggerFlowParams:
)
```

### Example with Node taking multiple files as input
#### Deliver files to a Flow with a multi-input Node

This example shows an Agent configured to work with an flow with a Node taking multiple inputs, picking up filenames starting with 'Yeast_B1', 'Yeast_B2', 'Yeast_C1', 'Yeast_C2' and delivering the observed files to the Read_FCS_Files node. The Agent also delivers an input parameter of "exp234" to the [Experiment_ID Node](../../nodes/File/Input_File_Multi.md), which is an [Input_Param node](../../nodes/Tag/Input_Param).

```python
import glob
import os
from typing import Callable, Dict
from typing import Callable

from agent_sdk import info

Expand All @@ -149,7 +152,7 @@ def get_param_mapping(
file_name: str = "",
modified_time: str = "",
body: bytes = bytes(),
) -> Dict[str, Callable[[str], bool]]:
) -> dict[str, Callable[[str], bool]]:
match_dict = dict()
for well_row in range(1, 3):
for well_col in ["B", "C"]:
Expand Down Expand Up @@ -184,6 +187,40 @@ def execute(flow_params_fw: FileWatcherResult, **kwargs) -> TriggerFlowParams:
)
```

### Watch for flow outputs then save locally

#### Deliver worklist to a liquid handler PC

```python
from ganymede_sdk.agent.models import FileParam
from pathlib import Path
import os


def execute(new_file: FileParam, **kwargs) -> None:
filename = new_file.filename.split("/")[-1]
filename_split = filename.split("_")
exp_id = f"{filename_split[0]}_{filename_split[1]}"

default_path = "C:/Users/dev/liquid_handler/worklist/"

# The vars parameter can be updated by adding variables on the associated [Connection page](../../app/agents/AgentMonitoring#updating-connections) or upon [Connection installation](../../app/agents/Agent#windows-installation) using a -v flag for each variable.
path = Path(kwargs.get("vars", {}).get("output_path", default_path))

# add experiment ID to the path variable
path = path / exp_id

if not os.path.exists(path):
os.makedirs(path)

# full_path is C:/Users/dev/liquid_handler/worklist/<exp_id>/<filename>
full_path = path / filename
fp = open(full_path, "wb")
fp.write(new_file.body)

return
```

## Classes for Agent-triggered flows

### FileWatcherResult Class
Expand All @@ -197,10 +234,10 @@ FileWatcherResult is a dictionary of FileParam objects indexed by `node name`.`p

TriggerFlowParams specifies the inputs for the Flow executed when all files are observed. It includes the following parameters:

- _param_ **single_file_params**: Optional[Dict[str, FileParam]] - Dict of FileParam objects indexed by `node name`.`param name`. These parameters are used for Nodes that accept a single file as input.
- _param_ **multi_file_params**: Optional[Dict[str, MultiFileParam]] - Dict of MultiFileParam objects indexed by `node name`.`param name`. These parameters are used for Nodes that accept multiple files as input.
- _param_ **single_file_params**: Optional[dict[str, FileParam]] - Dict of FileParam objects indexed by `node name`.`param name`. These parameters are used for Nodes that accept a single file as input.
- _param_ **multi_file_params**: Optional[dict[str, MultiFileParam]] - Dict of MultiFileParam objects indexed by `node name`.`param name`. These parameters are used for Nodes that accept multiple files as input.
- _param_ **benchling_tag**: Optional[Tag] - Additional parameters to be passed to flow. This parameter is used for inputs to the Input_Benchling node.
- _param_ **additional_params**: Optional[Dict[str, str]] - Additional parameters to be passed to flow. This parameter is used for inputs to the [Input_Param node](../../nodes/Tag/Input_Param.md); the key is the name if the Node name for the input parameter, and the value is the string to pass into the Node.
- _param_ **additional_params**: Optional[dict[str, str]] - Additional parameters to be passed to flow. This parameter is used for inputs to the [Input_Param node](../../nodes/Tag/Input_Param.md); the key is the name if the Node name for the input parameter, and the value is the string to pass into the Node.

### FileParam Class

Expand Down

0 comments on commit 03fd638

Please sign in to comment.