From 0699646f183ecc4187d8a3c0660cc0f95eb15c03 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 28 Mar 2024 02:05:15 -0700 Subject: [PATCH] docs: update agent development documentation --- docs/flyte_agents/developing_agents.md | 88 ++++++++++++++++----- docs/flyte_agents/index.md | 2 +- docs/flyte_agents/testing_agents_locally.md | 17 +++- 3 files changed, 81 insertions(+), 26 deletions(-) diff --git a/docs/flyte_agents/developing_agents.md b/docs/flyte_agents/developing_agents.md index 3ea6db1d03..423b129c56 100644 --- a/docs/flyte_agents/developing_agents.md +++ b/docs/flyte_agents/developing_agents.md @@ -42,9 +42,11 @@ To create a new async agent, extend the [`AsyncAgentBase`](https://github.com/fl - `delete`: Invoking this method will send a request to delete the corresponding job. ```python -from flytekit.extend.backend.base_agent import AsyncAgentBase, AgentRegistry, Resource -from flytekit import StructuredDataset +from typing import Optional +from flytekit.extend.backend.base_agent import AsyncAgentBase, AgentRegistry, Resource, ResourceMeta from dataclasses import dataclass +from flytekit.models.literals import LiteralMap +from flytekit.models.task import TaskTemplate @dataclass class BigQueryMetadata(ResourceMeta): @@ -60,21 +62,20 @@ class BigQueryAgent(AsyncAgentBase): def create( self, task_template: TaskTemplate, - inputs: typing.Optional[LiteralMap] = None, + inputs: Optional[LiteralMap] = None, **kwargs, ) -> BigQueryMetadata: - # Submit the job to BigQuery here. - return BigQueryMetadata(job_id=job_id, outputs={"o0": StructuredDataset(uri=result_table_uri))} + job_id = submit_bigquery_job(inputs) + return BigQueryMetadata(job_id=job_id) def get(self, resource_meta: BigQueryMetadata, **kwargs) -> Resource: - # Get the job status from BigQuery. 
- return Resource(phase=res.phase) + phase, outputs = get_job_status(resource_meta.job_id) + return Resource(phase=phase, outputs=outputs) def delete(self, resource_meta: BigQueryMetadata, **kwargs): - # Delete the job from BigQuery. - ... + cancel_bigquery_job(resource_meta.job_id) -# To register the custom agent +# To register the bigquery agent AgentRegistry.register(BigQueryAgent()) ``` @@ -87,7 +88,13 @@ To create a new sync agent, extend the [`SyncAgentBase`](https://github.com/flyt - `do`: This method is used to execute the synchronous task, and the worker in Flyte will be blocked until the method returns. ```python +from typing import Optional +from flyteidl.core.execution_pb2 import TaskExecution from flytekit.extend.backend.base_agent import SyncAgentBase, AgentRegistry, Resource +from flytekit.models.literals import LiteralMap +from flytekit.models.task import TaskTemplate +from flytekit import FlyteContextManager +from flytekit.core.type_engine import TypeEngine class OpenAIAgent(SyncAgentBase): def __init__(self): @@ -97,12 +104,34 @@ class OpenAIAgent(SyncAgentBase): # Convert the literal map to python value. ctx = FlyteContextManager.current_context() python_inputs = TypeEngine.literal_map_to_kwargs(ctx, inputs, literal_types=task_template.interface.inputs) - # Call the OpenAI API here. - return Resource(phase=phaseTaskExecution.SUCCEEDED, outputs={"o0": "Hello world!"}) + response = ask_chatgpt_question(python_inputs) + return Resource(phase=TaskExecution.SUCCEEDED, outputs={"o0": response}) AgentRegistry.register(OpenAIAgent()) ``` +#### Sensor interface specification +Building a sensor in Flyte is useful when you want to watch certain events or monitor a bucket in your workflow. +Leveraging the Agent functionality simplifies the process of integrating custom sensors into Flyte. + +BaseSensor is a specialized abstraction built on top of the agent framework in Flyte. +It is designed to simplify the process of adding sensors to Flyte workflows. 
+You only need to implement the `poke` method, which checks whether a specific condition is met. + +```python +from flytekit.sensor.base_sensor import BaseSensor +import s3fs + +class FileSensor(BaseSensor): + def __init__(self): + super().__init__(task_type="file_sensor") + + def poke(self, path: str) -> bool: + fs = s3fs.S3FileSystem() + return fs.exists(path) +``` + + ### 2. Test the agent locally See {doc}`"Testing agents locally" ` to test your agent locally. @@ -130,9 +159,14 @@ For flytekit versions `<=v1.10.2`, use `pyflyte serve`. For flytekit versions `>v1.10.2`, use `pyflyte serve agent`. ::: -### 4. Update FlyteAgent +### 4. Deploy Your FlyteAgent 1. Update the FlyteAgent deployment's [image](https://github.com/flyteorg/flyte/blob/master/charts/flyteagent/templates/agent/deployment.yaml#L35) + +```bash +kubectl set image deployment/flyteagent flyteagent=ghcr.io/flyteorg/flyteagent:latest +``` + 2. Update the FlytePropeller configmap. ```YAML @@ -143,7 +177,26 @@ For flytekit versions `>v1.10.2`, use `pyflyte serve agent`. default-for-task-types: - bigquery_query_job_task: agent-service - custom_task: agent-service +``` + +3. Restart the FlytePropeller +``` +kubectl rollout restart deployment flytepropeller -n flyte +``` + + +### Canary Deployment +Agents can be deployed independently in separate environments. This decoupling of agents from the +production environment ensures that if any specific agent encounters an error or issue, it will not impact the overall production system. + +By running agents independently, developers and operators can thoroughly test and validate their agents in a +controlled environment before deploying them to the production cluster. + +By default, all requests will be sent to the default agent service. However, +you are still able to route particular task requests to designated agent services by adjusting the FlytePropeller configuration. 
+ +```yaml plugins: agent-service: supportedTaskTypes: @@ -168,11 +221,4 @@ For flytekit versions `>v1.10.2`, use `pyflyte serve agent`. agentForTaskTypes: # It will override the default agent for custom_task, which means propeller will send the request to this agent. - custom_task: custom_agent - ``` - -3. Restart the FlytePropeller - -``` -kubectl rollout restart deployment flytepropeller -n flyte -``` - +``` \ No newline at end of file diff --git a/docs/flyte_agents/index.md b/docs/flyte_agents/index.md index 293f661be9..d5813650cc 100644 --- a/docs/flyte_agents/index.md +++ b/docs/flyte_agents/index.md @@ -23,7 +23,7 @@ You can create different agent services that host different agents, e.g., a prod If you need to connect to an external service in your workflow, we recommend using the corresponding agent rather than a web API plugin. Agents are designed to be scalable and can handle large workloads efficiently, and decrease load on FlytePropeller, since they run outside of it. You can also test agents locally without having to change the Flyte backend configuration, streamlining development. -For a list of agents you can use in your tasks and example usage for each, see the [Integrations](https://docs.flyte.org/en/latest/flytesnacks/integrations.html#agents) documentation. +For a list of agents you can use in your tasks and example usage for each, see the [Integrations](https://docs.flyte.org/en/latest/flytesnacks/integrations.html#flyte-agents) documentation. ## Table of contents diff --git a/docs/flyte_agents/testing_agents_locally.md b/docs/flyte_agents/testing_agents_locally.md index 2d7b98ba3e..b2ba6811c8 100644 --- a/docs/flyte_agents/testing_agents_locally.md +++ b/docs/flyte_agents/testing_agents_locally.md @@ -11,7 +11,8 @@ jupytext: You can test agents locally without running the backend server, making agent development easier. 
-To test an agent locally, create a class for the agent task that inherits from [AsyncAgentExecutorMixin](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L155). This mixin can handle both asynchronous tasks and synchronous tasks and allows flytekit to mimic FlytePropeller's behavior in calling the agent. +To test an agent locally, create a class for the agent task that inherits from `SyncAgentExecutorMixin` or `AsyncAgentExecutorMixin`. +These mixins handle synchronous and asynchronous tasks, respectively, and allow flytekit to mimic FlytePropeller's behavior in calling the agent. ## BigQuery example @@ -24,11 +25,14 @@ For example, you need to set the `GOOGLE_APPLICATION_CREDENTIALS` environment va ``` -Add `AsyncAgentExecutorMixin` to this class to tell flytekit to use the agent to run the task. +Add `AsyncAgentExecutorMixin` or `SyncAgentExecutorMixin` to the class to tell flytekit to use the agent to run the task. ```python class BigQueryTask(AsyncAgentExecutorMixin, SQLTask[BigQueryConfig]): - def __init__(self, name: str, **kwargs): - ... + ... + +class ChatGPTTask(SyncAgentExecutorMixin, PythonTask): + ... + ``` Flytekit will automatically use the agent to run the task in the local execution. @@ -48,6 +52,10 @@ You can run the above example task locally and test the agent with the following pyflyte run bigquery_task.py bigquery_doge_coin --version 10 ``` +You can also run a BigQuery task in your Python interpreter to test the agent locally. + +![](https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/agents/bigquery_task.png) + ## Databricks example To test the Databricks agent, copy the following code to a file called `databricks_task.py`, modifying as needed. @@ -77,3 +85,4 @@ The Spark task will run locally if the `raw-output-data-prefix` is not set. 
pyflyte run --raw-output-data-prefix s3://my-s3-bucket/databricks databricks_task.py hello_spark ``` +![](https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/agents/spark_task.png) \ No newline at end of file