docs: update agent development documentation (#5130)
* docs: update agent development documentation

* nit

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

* Update docs/flyte_agents/developing_agents.md

Co-authored-by: Nikki Everett <[email protected]>
Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>
Co-authored-by: Nikki Everett <[email protected]>
pingsutw and neverett authored Apr 1, 2024
1 parent ba86c69 commit b2d0d85
Showing 3 changed files with 80 additions and 26 deletions.
87 changes: 66 additions & 21 deletions docs/flyte_agents/developing_agents.md
@@ -42,9 +42,12 @@ To create a new async agent, extend the [`AsyncAgentBase`](https://github.com/fl
- `delete`: Invoking this method will send a request to delete the corresponding job.

```python
from flytekit.extend.backend.base_agent import AsyncAgentBase, AgentRegistry, Resource
from flytekit import StructuredDataset
from typing import Optional
from dataclasses import dataclass
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate
from flytekit.extend.backend.base_agent import AsyncAgentBase, AgentRegistry, Resource, ResourceMeta


@dataclass
class BigQueryMetadata(ResourceMeta):
@@ -60,21 +63,20 @@ class BigQueryAgent(AsyncAgentBase):
    def create(
        self,
        task_template: TaskTemplate,
        inputs: typing.Optional[LiteralMap] = None,
        inputs: Optional[LiteralMap] = None,
        **kwargs,
    ) -> BigQueryMetadata:
        # Submit the job to BigQuery here.
        return BigQueryMetadata(job_id=job_id, outputs={"o0": StructuredDataset(uri=result_table_uri))}
        job_id = submit_bigquery_job(inputs)
        return BigQueryMetadata(job_id=job_id)

    def get(self, resource_meta: BigQueryMetadata, **kwargs) -> Resource:
        # Get the job status from BigQuery.
        return Resource(phase=res.phase)
        phase, outputs = get_job_status(resource_meta.job_id)
        return Resource(phase=phase, outputs=outputs)

    def delete(self, resource_meta: BigQueryMetadata, **kwargs):
        # Delete the job from BigQuery.
        ...
        cancel_bigquery_job(resource_meta.job_id)

# To register the custom agent
# To register the bigquery agent
AgentRegistry.register(BigQueryAgent())
```
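
The `create` / `get` / `delete` lifecycle described above can be pictured without a Flyte backend. The following is only a sketch under stated assumptions, not flytekit code: `JobMeta`, `FakeJobService`, `ToyAgent`, and `run_to_completion` are hypothetical names, the phases are plain strings, and the polling loop is an illustrative stand-in for whatever component calls the agent.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class JobMeta:
    """Hypothetical stand-in for a ResourceMeta subclass: enough state to find the job again."""
    job_id: str


class FakeJobService:
    """Hypothetical external service: a job succeeds after a fixed number of status checks."""

    def __init__(self, polls_until_done: int = 3):
        self.polls_until_done = polls_until_done
        self.polls: Dict[str, int] = {}
        self.submitted = 0

    def submit(self) -> str:
        self.submitted += 1
        job_id = f"job-{self.submitted}"
        self.polls[job_id] = 0
        return job_id

    def status(self, job_id: str) -> str:
        self.polls[job_id] += 1
        return "SUCCEEDED" if self.polls[job_id] >= self.polls_until_done else "RUNNING"

    def cancel(self, job_id: str) -> None:
        self.polls.pop(job_id, None)


class ToyAgent:
    """Mirrors the three async-agent methods, but against the fake service."""

    def __init__(self, service: FakeJobService):
        self.service = service

    def create(self) -> JobMeta:
        # Submit the job and return only the metadata needed to look it up later.
        return JobMeta(job_id=self.service.submit())

    def get(self, meta: JobMeta) -> str:
        # Report the current phase of the job.
        return self.service.status(meta.job_id)

    def delete(self, meta: JobMeta) -> None:
        # Ask the service to cancel the job.
        self.service.cancel(meta.job_id)


def run_to_completion(agent: ToyAgent, max_polls: int = 10) -> Optional[str]:
    """Create a job, poll it, and cancel it if it does not finish in time."""
    meta = agent.create()
    for _ in range(max_polls):
        phase = agent.get(meta)
        if phase == "SUCCEEDED":
            return phase
    agent.delete(meta)
    return None
```

The point of the split is that `create` returns only lightweight metadata, so every later `get` or `delete` call can locate the job without holding any in-process state.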

@@ -87,8 +89,15 @@ To create a new sync agent, extend the [`SyncAgentBase`](https://github.com/flyt
- `do`: This method is used to execute the synchronous task, and the worker in Flyte will be blocked until the method returns.

```python
from typing import Optional
from flytekit import FlyteContextManager
from flytekit.core.type_engine import TypeEngine
from flyteidl.core.execution_pb2 import TaskExecution
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate
from flytekit.extend.backend.base_agent import SyncAgentBase, AgentRegistry, Resource


class OpenAIAgent(SyncAgentBase):
    def __init__(self):
        super().__init__(task_type_name="openai")
@@ -97,12 +106,31 @@ class OpenAIAgent(SyncAgentBase):
        # Convert the literal map to python value.
        ctx = FlyteContextManager.current_context()
        python_inputs = TypeEngine.literal_map_to_kwargs(ctx, inputs, literal_types=task_template.interface.inputs)
        # Call the OpenAI API here.
        return Resource(phase=phaseTaskExecution.SUCCEEDED, outputs={"o0": "Hello world!"})
        response = ask_chatgpt_question(python_inputs)
        return Resource(phase=TaskExecution.SUCCEEDED, outputs={"o0": response})

AgentRegistry.register(OpenAIAgent())
```

#### Sensor interface specification
With the agent framework, you can build a custom sensor in Flyte that watches for specific events or monitors a storage bucket from within your workflow.

To create a new sensor, extend the [`BaseSensor`](https://github.com/flyteorg/flytekit/blob/master/flytekit/sensor/base_sensor.py#L43) class and implement the `poke` method, which checks whether a specific condition is met.

```python
from flytekit.sensor.base_sensor import BaseSensor
import s3fs

class FileSensor(BaseSensor):
    def __init__(self):
        super().__init__(task_type="file_sensor")

    def poke(self, path: str) -> bool:
        fs = s3fs.S3FileSystem()
        return fs.exists(path)
```
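
To make the role of `poke` concrete, here is a minimal sketch of a polling loop built around it. It does not use flytekit: `LocalFileSensor` and `wait_for` are hypothetical names, the check runs against the local file system instead of S3, and the fixed retry count is an assumption made for illustration only.

```python
import os
import tempfile
import time


class LocalFileSensor:
    """Hypothetical stand-in for a sensor: poke() reports whether the condition holds."""

    def poke(self, path: str) -> bool:
        return os.path.exists(path)


def wait_for(sensor: LocalFileSensor, path: str, interval: float = 0.01, max_attempts: int = 5) -> bool:
    """Call poke() repeatedly until it returns True or the attempts run out."""
    for _ in range(max_attempts):
        if sensor.poke(path):
            return True
        time.sleep(interval)
    return False


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "ready.txt")
        print(wait_for(LocalFileSensor(), target, max_attempts=2))  # file is missing
        open(target, "w").close()
        print(wait_for(LocalFileSensor(), target))  # file now exists
```

Keeping `poke` a cheap, side-effect-free boolean check means the caller alone decides how often to retry and when to give up.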


### 2. Test the agent locally

See {doc}`"Testing agents locally" <testing_agents_locally>` to test your agent locally.
@@ -130,9 +158,14 @@ For flytekit versions `<=v1.10.2`, use `pyflyte serve`.
For flytekit versions `>v1.10.2`, use `pyflyte serve agent`.
:::
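
The version rule in the note above can be expressed as a small helper. This is a sketch, not part of flytekit: `parse_version` and `serve_command` are hypothetical names, and the parser assumes plain release strings such as `1.10.2` or `v1.11.0` (pre-release suffixes are not handled).

```python
from typing import List, Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    """Parse a plain release string such as 'v1.10.2' or '1.11.0' into a tuple of ints."""
    return tuple(int(part) for part in version.lstrip("v").split("."))


def serve_command(flytekit_version: str) -> List[str]:
    """Return the command that starts the agent server for the given flytekit version."""
    if parse_version(flytekit_version) <= (1, 10, 2):
        return ["pyflyte", "serve"]
    return ["pyflyte", "serve", "agent"]


if __name__ == "__main__":
    print(" ".join(serve_command("1.10.2")))
    print(" ".join(serve_command("1.11.0")))
```

Comparing integer tuples rather than raw strings avoids ordering mistakes such as treating `1.9.0` as newer than `1.10.2`.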

### 4. Update FlyteAgent
### 4. Deploy your Flyte agent

1. Update the FlyteAgent deployment's [image](https://github.com/flyteorg/flyte/blob/master/charts/flyteagent/templates/agent/deployment.yaml#L35)

```bash
kubectl set image deployment/flyteagent flyteagent=ghcr.io/flyteorg/flyteagent:latest
```

2. Update the FlytePropeller configmap.

```YAML
@@ -143,7 +176,26 @@ For flytekit versions `>v1.10.2`, use `pyflyte serve agent`.
default-for-task-types:
- bigquery_query_job_task: agent-service
- custom_task: agent-service
```
3. Restart FlytePropeller.
```bash
kubectl rollout restart deployment flytepropeller -n flyte
```


### Canary deployment
Agents can be deployed independently in separate environments. Decoupling agents from the
production environment ensures that an error in any one agent does not affect the overall production system.

By running agents independently, you can thoroughly test and validate your agents in a
controlled environment before deploying them to the production cluster.

By default, all agent requests are sent to the default agent service. However,
you can route requests for particular task types to designated agent services by adjusting the FlytePropeller configuration.

```yaml
plugins:
agent-service:
supportedTaskTypes:
@@ -168,11 +220,4 @@ For flytekit versions `>v1.10.2`, use `pyflyte serve agent`.
agentForTaskTypes:
# It will override the default agent for custom_task, which means propeller will send the request to this agent.
- custom_task: custom_agent
```
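
The override behaviour described in the configuration comment can be sketched as a lookup with a fallback. This is only an illustration of the routing rule, not FlytePropeller's implementation: `resolve_agent` and the `default_agent` name are hypothetical, and the mapping stands in for the `agentForTaskTypes` entries.

```python
from typing import Dict


def resolve_agent(
    task_type: str,
    agent_for_task_types: Dict[str, str],
    default_agent: str = "default_agent",
) -> str:
    """Return the agent that should receive requests for the given task type."""
    # A task type listed in the override mapping goes to its designated agent;
    # every other task type falls back to the default agent.
    return agent_for_task_types.get(task_type, default_agent)


if __name__ == "__main__":
    overrides = {"custom_task": "custom_agent"}
    print(resolve_agent("custom_task", overrides))
    print(resolve_agent("bigquery_query_job_task", overrides))
```

Because only listed task types are redirected, a canary agent service can take traffic for a single task type while everything else continues to reach the default service.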

3. Restart the FlytePropeller

```
kubectl rollout restart deployment flytepropeller -n flyte
```

```
2 changes: 1 addition & 1 deletion docs/flyte_agents/index.md
@@ -23,7 +23,7 @@ You can create different agent services that host different agents, e.g., a prod

If you need to connect to an external service in your workflow, we recommend using the corresponding agent rather than a web API plugin. Agents are designed to be scalable and can handle large workloads efficiently, and decrease load on FlytePropeller, since they run outside of it. You can also test agents locally without having to change the Flyte backend configuration, streamlining development.

For a list of agents you can use in your tasks and example usage for each, see the [Integrations](https://docs.flyte.org/en/latest/flytesnacks/integrations.html#agents) documentation.
For a list of agents you can use in your tasks and example usage for each, see the [Integrations](https://docs.flyte.org/en/latest/flytesnacks/integrations.html#flyte-agents) documentation.

## Table of contents

17 changes: 13 additions & 4 deletions docs/flyte_agents/testing_agents_locally.md
@@ -11,7 +11,8 @@ jupytext:

You can test agents locally without running the backend server, making agent development easier.

To test an agent locally, create a class for the agent task that inherits from [AsyncAgentExecutorMixin](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L155). This mixin can handle both asynchronous tasks and synchronous tasks and allows flytekit to mimic FlytePropeller's behavior in calling the agent.
To test an agent locally, create a class for the agent task that inherits from `SyncAgentExecutorMixin` or `AsyncAgentExecutorMixin`.
These mixins can handle synchronous and asynchronous tasks, respectively, and allow flytekit to mimic FlytePropeller's behavior in calling the agent.

## BigQuery example

@@ -24,11 +25,14 @@ For example, you need to set the `GOOGLE_APPLICATION_CREDENTIALS` environment va
```

Add `AsyncAgentExecutorMixin` to this class to tell flytekit to use the agent to run the task.
Add `AsyncAgentExecutorMixin` or `SyncAgentExecutorMixin` to the class to tell flytekit to use the agent to run the task.
```python
class BigQueryTask(AsyncAgentExecutorMixin, SQLTask[BigQueryConfig]):
    def __init__(self, name: str, **kwargs):
        ...
    ...

class ChatGPTTask(SyncAgentExecutorMixin, PythonTask):
    ...

```

Flytekit will automatically use the agent to run the task in the local execution.
@@ -48,6 +52,10 @@ You can run the above example task locally and test the agent with the following
pyflyte run bigquery_task.py bigquery_doge_coin --version 10
```

You can also run a BigQuery task in your Python interpreter to test the agent locally.

![](https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/agents/bigquery_task.png)

## Databricks example
To test the Databricks agent, copy the following code to a file called `databricks_task.py`, modifying as needed.

@@ -77,3 +85,4 @@ The Spark task will run locally if the `raw-output-data-prefix` is not set.
pyflyte run --raw-output-data-prefix s3://my-s3-bucket/databricks databricks_task.py hello_spark
```

![](https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/agents/spark_task.png)
