Skip to content

Commit

Permalink
docs: update agent development documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
pingsutw committed Mar 28, 2024
1 parent c81133b commit 0699646
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 26 deletions.
88 changes: 67 additions & 21 deletions docs/flyte_agents/developing_agents.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ To create a new async agent, extend the [`AsyncAgentBase`](https://github.com/fl
- `delete`: Invoking this method will send a request to delete the corresponding job.

```python
from flytekit.extend.backend.base_agent import AsyncAgentBase, AgentRegistry, Resource
from flytekit import StructuredDataset
from typing import Optional
from flytekit.extend.backend.base_agent import AsyncAgentBase, AgentRegistry, Resource, ResourceMeta
from dataclasses import dataclass
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate

@dataclass
class BigQueryMetadata(ResourceMeta):
Expand All @@ -60,21 +62,20 @@ class BigQueryAgent(AsyncAgentBase):
def create(
self,
task_template: TaskTemplate,
inputs: typing.Optional[LiteralMap] = None,
inputs: Optional[LiteralMap] = None,
**kwargs,
) -> BigQueryMetadata:
# Submit the job to BigQuery here.
return BigQueryMetadata(job_id=job_id, outputs={"o0": StructuredDataset(uri=result_table_uri))}
job_id = submit_bigquery_job(inputs)
return BigQueryMetadata(job_id=job_id)

def get(self, resource_meta: BigQueryMetadata, **kwargs) -> Resource:
# Get the job status from BigQuery.
return Resource(phase=res.phase)
phase, outputs = get_job_status(resource_meta.job_id)
return Resource(phase=phase, outputs=outputs)

def delete(self, resource_meta: BigQueryMetadata, **kwargs):
# Delete the job from BigQuery.
...
cancel_bigquery_job(resource_meta.job_id)

# To register the custom agent
# To register the bigquery agent
AgentRegistry.register(BigQueryAgent())
```

Expand All @@ -87,7 +88,13 @@ To create a new sync agent, extend the [`SyncAgentBase`](https://github.com/flyt
- `do`: This method is used to execute the synchronous task, and the worker in Flyte will be blocked until the method returns.

```python
from typing import Optional
from flyteidl.core.execution_pb2 import TaskExecution
from flytekit.extend.backend.base_agent import SyncAgentBase, AgentRegistry, Resource
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate
from flytekit import FlyteContextManager
from flytekit.core.type_engine import TypeEngine

class OpenAIAgent(SyncAgentBase):
def __init__(self):
Expand All @@ -97,12 +104,34 @@ class OpenAIAgent(SyncAgentBase):
# Convert the literal map to python value.
ctx = FlyteContextManager.current_context()
python_inputs = TypeEngine.literal_map_to_kwargs(ctx, inputs, literal_types=task_template.interface.inputs)
# Call the OpenAI API here.
return Resource(phase=phaseTaskExecution.SUCCEEDED, outputs={"o0": "Hello world!"})
response = ask_chatgpt_question(python_inputs)
return Resource(phase=TaskExecution.SUCCEEDED, outputs={"o0": response})

AgentRegistry.register(OpenAIAgent())
```

#### Sensor interface specification
Building a sensor in Flyte is useful when you want to watch certain events or monitor the bucket in your workflow.
Leveraging the Agent functionality simplifies the process of integrating custom sensors into Flyte

BaseSensor is a specialized abstraction built on top of the agent framework in Flyte.
It is designed to simplify the process of adding sensors to Flyte workflows.
You only need to implement the poke method, which checks whether a specific condition is met

```python
from flytekit.sensor.base_sensor import BaseSensor
import s3fs

class FileSensor(BaseSensor):
def __init__(self):
super().__init__(task_type="file_sensor")

def poke(self, path: str) -> bool:
fs = s3fs.S3FileSystem()
return fs.exists(path)
```


### 2. Test the agent locally

See {doc}`"Testing agents locally" <testing_agents_locally>` to test your agent locally.
Expand Down Expand Up @@ -130,9 +159,14 @@ For flytekit versions `<=v1.10.2`, use `pyflyte serve`.
For flytekit versions `>v1.10.2`, use `pyflyte serve agent`.
:::

### 4. Update FlyteAgent
### 4. Deploy Your FlyteAgent

1. Update the FlyteAgent deployment's [image](https://github.com/flyteorg/flyte/blob/master/charts/flyteagent/templates/agent/deployment.yaml#L35)

```bash
kubectl set image deployment/flyteagent flyteagent=ghcr.io/flyteorg/flyteagent:latest
```

2. Update the FlytePropeller configmap.

```YAML
Expand All @@ -143,7 +177,26 @@ For flytekit versions `>v1.10.2`, use `pyflyte serve agent`.
default-for-task-types:
- bigquery_query_job_task: agent-service
- custom_task: agent-service
```
3. Restart the FlytePropeller
```
kubectl rollout restart deployment flytepropeller -n flyte
```


### Canary Deployment
Agents can be deployed independently in separate environments. This decoupling of agents from the
production environment ensures that if any specific agent encounters an error or issue, it will not impact the overall production system.

By running agents independently, developers and operators can thoroughly test and validate their agents in a
controlled environment before deploying them to the production cluster.

By default, all the requests will be sent to the default agent service. However,
you are still able to route particular task requests to designated agent services by adjusting the flytepropeller configuration.

```yaml
plugins:
agent-service:
supportedTaskTypes:
Expand All @@ -168,11 +221,4 @@ For flytekit versions `>v1.10.2`, use `pyflyte serve agent`.
agentForTaskTypes:
# It will override the default agent for custom_task, which means propeller will send the request to this agent.
- custom_task: custom_agent
```

3. Restart the FlytePropeller

```
kubectl rollout restart deployment flytepropeller -n flyte
```

```
2 changes: 1 addition & 1 deletion docs/flyte_agents/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ You can create different agent services that host different agents, e.g., a prod

If you need to connect to an external service in your workflow, we recommend using the corresponding agent rather than a web API plugin. Agents are designed to be scalable and can handle large workloads efficiently, and decrease load on FlytePropeller, since they run outside of it. You can also test agents locally without having to change the Flyte backend configuration, streamlining development.

For a list of agents you can use in your tasks and example usage for each, see the [Integrations](https://docs.flyte.org/en/latest/flytesnacks/integrations.html#agents) documentation.
For a list of agents you can use in your tasks and example usage for each, see the [Integrations](https://docs.flyte.org/en/latest/flytesnacks/integrations.html#flyte-agents) documentation.

## Table of contents

Expand Down
17 changes: 13 additions & 4 deletions docs/flyte_agents/testing_agents_locally.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ jupytext:

You can test agents locally without running the backend server, making agent development easier.

To test an agent locally, create a class for the agent task that inherits from [AsyncAgentExecutorMixin](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L155). This mixin can handle both asynchronous tasks and synchronous tasks and allows flytekit to mimic FlytePropeller's behavior in calling the agent.
To test an agent locally, create a class for the agent task that inherits from `SyncAgentExecutorMixin` or `AsyncAgentExecutorMixin`.
these mixins can handle synchronous and asynchronous tasks respectively and allows flytekit to mimic FlytePropeller's behavior in calling the agent.

## BigQuery example

Expand All @@ -24,11 +25,14 @@ For example, you need to set the `GOOGLE_APPLICATION_CREDENTIALS` environment va
```

Add `AsyncAgentExecutorMixin` to this class to tell flytekit to use the agent to run the task.
Add `AsyncAgentExecutorMixin` or `SyncAgentExecutorMixin` to the class to tell flytekit to use the agent to run the task.
```python
class BigQueryTask(AsyncAgentExecutorMixin, SQLTask[BigQueryConfig]):
def __init__(self, name: str, **kwargs):
...
...

class ChatGPTTask(SyncAgentExecutorMixin, PythonTask):
...

```

Flytekit will automatically use the agent to run the task in the local execution.
Expand All @@ -48,6 +52,10 @@ You can run the above example task locally and test the agent with the following
pyflyte run bigquery_task.py bigquery_doge_coin --version 10
```

You can also run a BigQuery task in your Python interpreter to test the agent locally.

![](https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/agents/bigquery_task.png)

## Databricks example
To test the Databricks agent, copy the following code to a file called `databricks_task.py`, modifying as needed.

Expand Down Expand Up @@ -77,3 +85,4 @@ The Spark task will run locally if the `raw-output-data-prefix` is not set.
pyflyte run --raw-output-data-prefix s3://my-s3-bucket/databricks databricks_task.py hello_spark
```

![](https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/agents/spark_task.png)

0 comments on commit 0699646

Please sign in to comment.