Skip to content

Commit

Permalink
add examples udc for watch for files locally and run flow agents
Browse files Browse the repository at this point in the history
  • Loading branch information
bensonlee5 committed Aug 28, 2024
1 parent e50968c commit 7cb43c9
Showing 1 changed file with 153 additions and 3 deletions.
156 changes: 153 additions & 3 deletions docs/sdk/markdowns/AgentModelsAndMethods.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,20 @@ in the directory that the Agent is watching. The _execute_ function is run whene
is used to specify the parameters to be passed to the Flow, and the _execute_ function specifies the input files and parameters to
the Flow that is executed.

Below is a skeleton example for the user-defined function with associated types:
Below is a skeleton example for the user-defined code with associated types:

```python
def fp(watch_dir: str, parent_dir: str, pattern: str) -> Callable[[str], bool]:
    """
    Template for a factory that produces a file-path pattern matcher.

    Copy and adapt this function to create your own pattern matching functions;
    the matchers it returns are meant to be used as the values of the dictionary
    returned by get_param_mapping.

    Returns
    -------
    Callable[[str], bool]
        Function that takes a file as input and returns True if the file matches
        the pattern.
    """
    ...

def get_param_mapping(
Expand All @@ -26,14 +36,154 @@ def get_param_mapping(
modified_time: str = "",
body: bytes = bytes(),
) -> Dict[str, Callable[[str], bool]]:
# returns a dictionary of fp objects indexed by `node name`.`param name`
"""
This function is called when a file is added or modified in the watch directory.
Modify this function to capture the files you want to trigger the flow;
the function should return a dictionary where the keys are <node name>.<param name>
and values are functions for performing pattern matching against the target file.
For nodes that accept multiple inputs, specify a list of functions to match against;
each specified function should uniquely match 1 file.
This function returns a dictionary of fp objects indexed by `node name`.`param name`
"""

pass

def execute(flow_params_fw: FileWatcherResult) -> TriggerFlowParams:
    """
    Invoked once every glob pattern specified by get_param_mapping has been matched.

    Parameters
    ----------
    flow_params_fw : FileWatcherResult
        Dict of FileParam objects indexed by <node name>.<param name>

    Returns
    -------
    TriggerFlowParams
        A TriggerFlowParams object.
    """
    ...
```

## User-defined code examples

### Deliver files to Nodes that take a single file as input

This example shows an Agent that delivers a CSV file to the [Bioreactor_File Node](../../nodes/File/CSV_Read), an Excel file whose name contains 'medium' to the [Medium_Composition Node](../../nodes/File/Excel_Read.md), and an Excel file whose name contains 'eventlog' to the [Event_Log Node](../../nodes/File/Excel_Read.md).

```python
import glob
import os
import re
from typing import Callable, Dict, List, Union
from urllib import parse

from ganymede_sdk.agent.models import (
FileParam,
FileWatcherResult,
MultiFileParam,
TriggerFlowParams,
)


def fp(watch_dir: str, parent_dir: str, pattern: str) -> Callable[[str], bool]:
    """
    Build a matcher that tests a path against a glob pattern under watch_dir.

    Parameters
    ----------
    watch_dir : str
        Directory that the glob pattern is joined onto.
    parent_dir : str
        Not used by this implementation.
    pattern : str
        Glob pattern, relative to watch_dir.

    Returns
    -------
    Callable[[str], bool]
        Function that URL-decodes its argument and returns True when the decoded
        path is among the paths currently matched by the glob.
    """

    def is_match(candidate: str):
        # Paths may arrive percent-encoded; decode before comparing with glob output.
        decoded = parse.unquote(candidate)
        full_pattern = os.path.join(watch_dir, pattern)
        # The glob is evaluated on every call, so files created later are seen.
        current_hits = glob.glob(full_pattern, recursive=True)
        return decoded in current_hits

    return is_match


# Required Function
def get_param_mapping(
    watch_dir: str,
    parent_dir: str = "",
    file_name: str = "",
    modified_time: str = "",
    body: bytes = bytes(),
) -> Dict[str, Union[Callable[[str], bool], List[Callable[[str], bool]]]]:
    """
    Map each single-file Flow input to a matcher for the file it should receive.

    Parameters
    ----------
    watch_dir : str
        Directory whose files are matched against the glob patterns.
    parent_dir : str
        Passed through to fp.
    file_name : str
        Name of the file that triggered this call; it must begin with a word
        character, otherwise no mapping is returned.
    modified_time : str
        Not used by this example.
    body : bytes
        Not used by this example.

    Returns
    -------
    dict
        Matchers keyed by <node name>.<param name>, or an empty dict when
        file_name does not begin with a word character.
    """
    # Only the presence of a leading word-character run matters; the matched
    # text itself is not needed to build the mapping.
    if re.search(r"^(\w+)", file_name) is None:
        return {}

    return {
        "Bioreactor_File.csv": fp(watch_dir, parent_dir, "*.csv"),
        "Medium_Composition.excel": fp(watch_dir, parent_dir, "*medium*.xlsx"),
        "Event_Log.excel": fp(watch_dir, parent_dir, "*eventlog*.xlsx"),
    }


def execute(flow_params_fw: FileWatcherResult, **kwargs) -> TriggerFlowParams:
    """
    Forward the matched files to the Flow as single-file parameters.

    Parameters
    ----------
    flow_params_fw : FileWatcherResult
        Result object whose ``files`` attribute is passed on unchanged.
    **kwargs
        Accepted and ignored.

    Returns
    -------
    TriggerFlowParams
        Parameters with only ``single_file_params`` populated.
    """
    matched_files = flow_params_fw.files
    flow_request = TriggerFlowParams(
        single_file_params=matched_files,
        multi_file_params=None,
        benchling_tag=None,
        additional_params={},
    )
    return flow_request
```

### Example with Node taking multiple files as input

This example shows an Agent configured to work with a Flow that has a Node taking multiple files as input. The Agent picks up files whose names contain 'Yeast_B1', 'Yeast_B2', 'Yeast_C1', or 'Yeast_C2' and delivers them to the Read_FCS_Files node. The Agent also delivers an input parameter of "exp234" to the [Experiment_ID Node](../../nodes/File/Input_File_Multi.md), which is an [Input_Param node](../../nodes/tag/Input_Param).

```python
import glob
import os
from typing import Callable, Dict

from agent_sdk import info

from ganymede_sdk.agent.models import (
FileWatcherResult,
MultiFileParam,
TriggerFlowParams,
)


def fp(watch_dir: str, parent_dir: str, pattern: str) -> Callable[[str], bool]:
    """
    Build a matcher that tests a path against a glob pattern under watch_dir.

    Parameters
    ----------
    watch_dir : str
        Directory that the glob pattern is joined onto.
    parent_dir : str
        Not used by this implementation.
    pattern : str
        Glob pattern, relative to watch_dir.

    Returns
    -------
    Callable[[str], bool]
        Function returning True when its argument is among the paths currently
        matched by the glob.
    """

    def is_match(candidate: str):
        full_pattern = os.path.join(watch_dir, pattern)
        # The glob is evaluated on every call, so files created later are seen.
        current_hits = glob.glob(full_pattern, recursive=True)
        return candidate in current_hits

    return is_match

def get_param_mapping(
    watch_dir: str,
    parent_dir: str = "",
    file_name: str = "",
    modified_time: str = "",
    body: bytes = bytes(),
) -> Dict[str, Callable[[str], bool]]:
    """
    Build one matcher per expected well file (Yeast_B1, Yeast_C1, Yeast_B2, Yeast_C2).

    Parameters
    ----------
    watch_dir : str
        Directory whose files are matched against the glob patterns.
    parent_dir : str
        Passed through to fp.
    file_name : str
        Not used by this example.
    modified_time : str
        Not used by this example.
    body : bytes
        Not used by this example.

    Returns
    -------
    Dict[str, Callable[[str], bool]]
        Matchers keyed by ``Yeast_<letter><number>.fcs``; each matcher uses the
        glob ``*Yeast_<letter><number>*``.
    """
    match_dict = {}
    for well_number in range(1, 3):
        for well_letter in ["B", "C"]:
            well_name = f"Yeast_{well_letter}{well_number}"
            match_dict[f"{well_name}.fcs"] = fp(
                watch_dir, parent_dir, f"*{well_name}*"
            )

    return match_dict


# Required Function
def execute(flow_params_fw: FileWatcherResult, **kwargs) -> TriggerFlowParams:
    """
    Bundle every matched file into one multi-file parameter and trigger the Flow.

    Parameters
    ----------
    flow_params_fw : FileWatcherResult
        Result object whose ``files`` values are retagged and bundled.
    **kwargs
        Accepted and ignored.

    Returns
    -------
    TriggerFlowParams
        Parameters carrying the bundled files under "Read_FCS_Files.fcs" and an
        additional "Experiment_ID" parameter.
    """
    target_param = "Read_FCS_Files.fcs"
    extra_params = {"Experiment_ID": "exp234"}

    # Each file was matched under its own key; point them all at the same
    # multi-file parameter before bundling.
    retagged_files = []
    for matched_file in list(flow_params_fw.files.values()):
        matched_file.content_type = "application/octet-stream"
        matched_file.param = target_param
        retagged_files.append(matched_file)

    bundle = MultiFileParam.from_file_param(retagged_files)
    bundle.param = target_param

    return TriggerFlowParams(
        single_file_params=None,
        multi_file_params={target_param: bundle},
        benchling_tag=None,
        additional_params=extra_params,
    )
```

## Classes for Agent-triggered flows

### FileWatcherResult Class
Expand Down

0 comments on commit 7cb43c9

Please sign in to comment.