From b2d0d85b9587bf1fdaa4ebf8ea889e04ff45904d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 1 Apr 2024 13:28:33 -0700 Subject: [PATCH] docs: update agent development documentation (#5130) * docs: update agent development documentation * nit Signed-off-by: Kevin Su * lint Signed-off-by: Kevin Su * Update docs/flyte_agents/developing_agents.md Co-authored-by: Nikki Everett Signed-off-by: Kevin Su * Update docs/flyte_agents/developing_agents.md Co-authored-by: Nikki Everett Signed-off-by: Kevin Su * Update docs/flyte_agents/developing_agents.md Co-authored-by: Nikki Everett Signed-off-by: Kevin Su * Update docs/flyte_agents/developing_agents.md Co-authored-by: Nikki Everett Signed-off-by: Kevin Su * Update docs/flyte_agents/developing_agents.md Co-authored-by: Nikki Everett Signed-off-by: Kevin Su * Update docs/flyte_agents/developing_agents.md Co-authored-by: Nikki Everett Signed-off-by: Kevin Su * Update docs/flyte_agents/developing_agents.md Co-authored-by: Nikki Everett Signed-off-by: Kevin Su * Update docs/flyte_agents/developing_agents.md Co-authored-by: Nikki Everett Signed-off-by: Kevin Su * Update docs/flyte_agents/developing_agents.md Co-authored-by: Nikki Everett Signed-off-by: Kevin Su * Update docs/flyte_agents/developing_agents.md Co-authored-by: Nikki Everett Signed-off-by: Kevin Su * Update docs/flyte_agents/developing_agents.md Co-authored-by: Nikki Everett Signed-off-by: Kevin Su * Update docs/flyte_agents/developing_agents.md Co-authored-by: Nikki Everett Signed-off-by: Kevin Su --------- Signed-off-by: Kevin Su Co-authored-by: Nikki Everett --- docs/flyte_agents/developing_agents.md | 87 ++++++++++++++++----- docs/flyte_agents/index.md | 2 +- docs/flyte_agents/testing_agents_locally.md | 17 +++- 3 files changed, 80 insertions(+), 26 deletions(-) diff --git a/docs/flyte_agents/developing_agents.md b/docs/flyte_agents/developing_agents.md index 3ea6db1d03..f4fb1378b4 100644 --- a/docs/flyte_agents/developing_agents.md +++ b/docs/flyte_agents/developing_agents.md @@ -42,9 +42,12 @@ To create a new async agent, extend the [`AsyncAgentBase`](https://github.com/fl - `delete`: Invoking this method will send a request to delete the corresponding job. ```python -from flytekit.extend.backend.base_agent import AsyncAgentBase, AgentRegistry, Resource -from flytekit import StructuredDataset +from typing import Optional from dataclasses import dataclass +from flytekit.models.literals import LiteralMap +from flytekit.models.task import TaskTemplate +from flytekit.extend.backend.base_agent import AsyncAgentBase, AgentRegistry, Resource, ResourceMeta + @dataclass class BigQueryMetadata(ResourceMeta): @@ -60,21 +63,20 @@ class BigQueryAgent(AsyncAgentBase): def create( self, task_template: TaskTemplate, - inputs: typing.Optional[LiteralMap] = None, + inputs: Optional[LiteralMap] = None, **kwargs, ) -> BigQueryMetadata: - # Submit the job to BigQuery here. - return BigQueryMetadata(job_id=job_id, outputs={"o0": StructuredDataset(uri=result_table_uri))} + job_id = submit_bigquery_job(inputs) + return BigQueryMetadata(job_id=job_id) def get(self, resource_meta: BigQueryMetadata, **kwargs) -> Resource: - # Get the job status from BigQuery. - return Resource(phase=res.phase) + phase, outputs = get_job_status(resource_meta.job_id) + return Resource(phase=phase, outputs=outputs) def delete(self, resource_meta: BigQueryMetadata, **kwargs): - # Delete the job from BigQuery. - ... + cancel_bigquery_job(resource_meta.job_id) -# To register the custom agent +# To register the bigquery agent AgentRegistry.register(BigQueryAgent()) ``` @@ -87,8 +89,15 @@ To create a new sync agent, extend the [`SyncAgentBase`](https://github.com/flyt - `do`: This method is used to execute the synchronous task, and the worker in Flyte will be blocked until the method returns. ```python +from typing import Optional +from flytekit import FlyteContextManager +from flytekit.core.type_engine import TypeEngine +from flyteidl.core.execution_pb2 import TaskExecution +from flytekit.models.literals import LiteralMap +from flytekit.models.task import TaskTemplate from flytekit.extend.backend.base_agent import SyncAgentBase, AgentRegistry, Resource + class OpenAIAgent(SyncAgentBase): def __init__(self): super().__init__(task_type_name="openai") @@ -97,12 +106,31 @@ class OpenAIAgent(SyncAgentBase): # Convert the literal map to python value. ctx = FlyteContextManager.current_context() python_inputs = TypeEngine.literal_map_to_kwargs(ctx, inputs, literal_types=task_template.interface.inputs) - # Call the OpenAI API here. - return Resource(phase=phaseTaskExecution.SUCCEEDED, outputs={"o0": "Hello world!"}) + response = ask_chatgpt_question(python_inputs) + return Resource(phase=TaskExecution.SUCCEEDED, outputs={"o0": response}) AgentRegistry.register(OpenAIAgent()) ``` +#### Sensor interface specification +With the agent framework, you can easily build a custom sensor in Flyte to watch certain events or monitor the bucket in your workflow. + +To create a new sensor, extend the `[BaseSensor](https://github.com/flyteorg/flytekit/blob/master/flytekit/sensor/base_sensor.py#L43)` class and implement the `poke` method, which checks whether a specific condition is met. + +```python +from flytekit.sensor.base_sensor import BaseSensor +import s3fs + +class FileSensor(BaseSensor): + def __init__(self): + super().__init__(task_type="file_sensor") + + def poke(self, path: str) -> bool: + fs = s3fs.S3FileSystem() + return fs.exists(path) +``` + + ### 2. Test the agent locally See {doc}`"Testing agents locally" ` to test your agent locally. @@ -130,9 +158,14 @@ For flytekit versions `<=v1.10.2`, use `pyflyte serve`. For flytekit versions `>v1.10.2`, use `pyflyte serve agent`. ::: -### 4. Update FlyteAgent +### 4. Deploy Your Flyte agent 1. Update the FlyteAgent deployment's [image](https://github.com/flyteorg/flyte/blob/master/charts/flyteagent/templates/agent/deployment.yaml#L35) + +```bash +kubectl set image deployment/flyteagent flyteagent=ghcr.io/flyteorg/flyteagent:latest +``` + 2. Update the FlytePropeller configmap. ```YAML @@ -143,7 +176,26 @@ For flytekit versions `>v1.10.2`, use `pyflyte serve agent`. default-for-task-types: - bigquery_query_job_task: agent-service - custom_task: agent-service +``` +3. Restart FlytePropeller. + +``` +kubectl rollout restart deployment flytepropeller -n flyte +``` + + +### Canary deployment +Agents can be deployed independently in separate environments. Decoupling agents from the +production environment ensures that if any specific agent encounters an error or issue, it will not impact the overall production system. + +By running agents independently, you can thoroughly test and validate your agents in a +controlled environment before deploying them to the production cluster. + +By default, all agent requests will be sent to the default agent service. However, +you can route particular task requests to designated agent services by adjusting the flytepropeller configuration. + +```yaml plugins: agent-service: supportedTaskTypes: @@ -168,11 +220,4 @@ For flytekit versions `>v1.10.2`, use `pyflyte serve agent`. agentForTaskTypes: # It will override the default agent for custom_task, which means propeller will send the request to this agent. - custom_task: custom_agent - ``` - -3. Restart the FlytePropeller - -``` -kubectl rollout restart deployment flytepropeller -n flyte -``` - +``` \ No newline at end of file diff --git a/docs/flyte_agents/index.md b/docs/flyte_agents/index.md index 293f661be9..d5813650cc 100644 --- a/docs/flyte_agents/index.md +++ b/docs/flyte_agents/index.md @@ -23,7 +23,7 @@ You can create different agent services that host different agents, e.g., a prod If you need to connect to an external service in your workflow, we recommend using the corresponding agent rather than a web API plugin. Agents are designed to be scalable and can handle large workloads efficiently, and decrease load on FlytePropeller, since they run outside of it. You can also test agents locally without having to change the Flyte backend configuration, streamlining development. -For a list of agents you can use in your tasks and example usage for each, see the [Integrations](https://docs.flyte.org/en/latest/flytesnacks/integrations.html#agents) documentation. +For a list of agents you can use in your tasks and example usage for each, see the [Integrations](https://docs.flyte.org/en/latest/flytesnacks/integrations.html#flyte-agents) documentation. ## Table of contents diff --git a/docs/flyte_agents/testing_agents_locally.md b/docs/flyte_agents/testing_agents_locally.md index 2d7b98ba3e..dd4294dbea 100644 --- a/docs/flyte_agents/testing_agents_locally.md +++ b/docs/flyte_agents/testing_agents_locally.md @@ -11,7 +11,8 @@ jupytext: You can test agents locally without running the backend server, making agent development easier. -To test an agent locally, create a class for the agent task that inherits from [AsyncAgentExecutorMixin](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L155). This mixin can handle both asynchronous tasks and synchronous tasks and allows flytekit to mimic FlytePropeller's behavior in calling the agent. +To test an agent locally, create a class for the agent task that inherits from `SyncAgentExecutorMixin` or `AsyncAgentExecutorMixin`. +These mixins can handle synchronous and asynchronous tasks, respectively, and allow flytekit to mimic FlytePropeller's behavior in calling the agent. ## BigQuery example @@ -24,11 +25,14 @@ For example, you need to set the `GOOGLE_APPLICATION_CREDENTIALS` environment va ``` -Add `AsyncAgentExecutorMixin` to this class to tell flytekit to use the agent to run the task. +Add `AsyncAgentExecutorMixin` or `SyncAgentExecutorMixin` to the class to tell flytekit to use the agent to run the task. ```python class BigQueryTask(AsyncAgentExecutorMixin, SQLTask[BigQueryConfig]): - def __init__(self, name: str, **kwargs): - ... + ... + +class ChatGPTTask(SyncAgentExecutorMixin, PythonTask): + ... + ``` Flytekit will automatically use the agent to run the task in the local execution. @@ -48,6 +52,10 @@ You can run the above example task locally and test the agent with the following pyflyte run bigquery_task.py bigquery_doge_coin --version 10 ``` +You can also run a BigQuery task in your Python interpreter to test the agent locally. + +![](https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/agents/bigquery_task.png) + ## Databricks example To test the Databricks agent, copy the following code to a file called `databricks_task.py`, modifying as needed. @@ -77,3 +85,4 @@ The Spark task will run locally if the `raw-output-data-prefix` is not set. pyflyte run --raw-output-data-prefix s3://my-s3-bucket/databricks databricks_task.py hello_spark ``` +![](https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/agents/spark_task.png) \ No newline at end of file