From 209344a78e3908eb43ca98584ce3cc67ffa39b23 Mon Sep 17 00:00:00 2001
From: nikki everett
Date: Tue, 5 Mar 2024 16:13:07 -0600
Subject: [PATCH 1/4] add docker image build and flyteagent update steps

Signed-off-by: nikki everett
---
 docs/flyte_agents/developing_agents.md | 82 +++++++++++++++++++++++++-
 1 file changed, 80 insertions(+), 2 deletions(-)

diff --git a/docs/flyte_agents/developing_agents.md b/docs/flyte_agents/developing_agents.md
index a4a4adc7c7..6aacc6ef55 100644
--- a/docs/flyte_agents/developing_agents.md
+++ b/docs/flyte_agents/developing_agents.md
@@ -29,7 +29,11 @@ While agents can be written in any programming language, we currently only suppo
 
 ```
 
-## Async agent interface specification
+## Steps
+
+### 1. Implement required methods
+
+#### Async agent interface specification
 
 To create a new async agent, extend the [`AsyncAgentBase`](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L127) class and implement `create`, `get`, and `delete` methods. These methods must be idempotent.
 
@@ -76,7 +80,7 @@ AgentRegistry.register(BigQueryAgent())
 
 For an example implementation, see the [BigQuery agent](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L43).
 
-## Sync agent interface specification
+#### Sync agent interface specification
 
 To create a new sync agent, extend the [`SyncAgentBase`](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L107) class and implement a `do` method. This method must be idempotent.
 
@@ -98,3 +102,77 @@ class OpenAIAgent(SyncAgentBase):
 
 AgentRegistry.register(OpenAIAgent())
 ```
+
+### 2. Test the agent locally
+
+See "Testing agents locally" to test your agent locally.
+
+### 3. Build a new Docker image
+
+The following is a sample Dockerfile for building an image for a Flyte agent:
+
+```Dockerfile
+FROM python:3.9-slim-buster
+
+MAINTAINER Flyte Team
+LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit
+
+WORKDIR /root
+ENV PYTHONPATH /root
+
+# flytekit will autoload the agent if package is installed.
+RUN pip install flytekitplugins-bigquery
+CMD pyflyte serve agent --port 8000
+```
+
+:::{note}
+For flytekit versions `<=v1.10.2`, use `pyflyte serve`.
+For flytekit versions `>v1.10.2`, use `pyflyte serve agent`.
+:::
+
+### 4. Update FlyteAgent
+
+1. Update the FlyteAgent deployment's [image](https://github.com/flyteorg/flyte/blob/master/charts/flyteagent/templates/agent/deployment.yaml#L35)
+2. Update the FlytePropeller configmap.
+
+```YAML
+  tasks:
+    task-plugins:
+      enabled-plugins:
+        - agent-service
+      default-for-task-types:
+        - bigquery_query_job_task: agent-service
+        - custom_task: agent-service
+
+  plugins:
+    agent-service:
+      supportedTaskTypes:
+        - bigquery_query_job_task
+        - default_task
+        - custom_task
+      # By default, all requests will be sent to the default agent.
+      defaultAgent:
+        endpoint: "dns:///flyteagent.flyte.svc.cluster.local:8000"
+        insecure: true
+        timeouts:
+          GetTask: 200ms
+        defaultTimeout: 50ms
+      agents:
+        custom_agent:
+          endpoint: "dns:///custom-flyteagent.flyte.svc.cluster.local:8000"
+          insecure: false
+          defaultServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}'
+          timeouts:
+            GetTask: 100ms
+          defaultTimeout: 20ms
+      agentForTaskTypes:
+        # It will override the default agent for custom_task, which means propeller will send the request to this agent.
+        - custom_task: custom_agent
+  ```
+
+3. Restart the FlytePropeller
+
+```
+kubectl rollout restart deployment flytepropeller -n flyte
+```
+

From b5755e466a4214a036b6b5aac74a285c5db02c78 Mon Sep 17 00:00:00 2001
From: nikki everett
Date: Tue, 5 Mar 2024 16:21:44 -0600
Subject: [PATCH 2/4] fix link

Signed-off-by: nikki everett
---
 docs/flyte_agents/developing_agents.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/flyte_agents/developing_agents.md b/docs/flyte_agents/developing_agents.md
index 6aacc6ef55..842fdaafd1 100644
--- a/docs/flyte_agents/developing_agents.md
+++ b/docs/flyte_agents/developing_agents.md
@@ -105,7 +105,7 @@ AgentRegistry.register(OpenAIAgent())
 
 ### 2. Test the agent locally
 
-See "Testing agents locally" to test your agent locally.
+See {doc}`"Testing agents locally" ` to test your agent locally.
 
 ### 3. Build a new Docker image
 

From ff7820f8a911a9e325a5255188efc9a91b45321a Mon Sep 17 00:00:00 2001
From: nikki everett
Date: Tue, 5 Mar 2024 16:35:21 -0600
Subject: [PATCH 3/4] remove old agents doc and redirect to new agents guide

Signed-off-by: nikki everett
---
 .../development_lifecycle/agents.md           | 234 ------------------
 .../user_guide/development_lifecycle/index.md |   1 -
 docs/user_guide/extending/index.md            |   2 +-
 3 files changed, 1 insertion(+), 236 deletions(-)
 delete mode 100644 docs/user_guide/development_lifecycle/agents.md

diff --git a/docs/user_guide/development_lifecycle/agents.md b/docs/user_guide/development_lifecycle/agents.md
deleted file mode 100644
index 2ec1309272..0000000000
--- a/docs/user_guide/development_lifecycle/agents.md
+++ /dev/null
@@ -1,234 +0,0 @@
----
-jupytext:
-  cell_metadata_filter: all
-  formats: md:myst
-  main_language: python
-  notebook_metadata_filter: all
-  text_representation:
-    extension: .md
-    format_name: myst
-    format_version: 0.13
-    jupytext_version: 1.16.1
-kernelspec:
-  display_name: Python 3
-  language: python
-  name: python3
----
-
-(extend-agent-service)=
-
-# Agents
-
-```{eval-rst}
-.. tags:: Extensibility, Contribute, Intermediate
-```
-
-:::{note}
-This is an experimental feature, which is subject to change the API in the future.
-:::
-
-## What is an agent?
-
-In Flyte, an agent is a long-running stateless service that can be used to execute tasks. It reduces the overhead of creating a pod for each task.
-In addition, it's easy to scale up and down the agent service based on the workload. Agent services are designed to be language-agnostic.
-For now, we only support Python agent, but we may support other languages in the future.
-
-Agent is designed to run a specific type of task. For example, you can create a BigQuery agent to run BigQuery task. Therefore, if you create a new type of task, you can
-either run the task in the pod, or you can create a new agent to run it. You can determine how the task will be executed in the FlytePropeller configMap.
-
-Key goals of the agent service include:
-
-- Support for communication with external services: The focus is on enabling agents that seamlessly interact with external services.
-- Independent testing and private deployment: Agents can be tested independently and deployed privately, providing flexibility and control over development.
-- Flyte Agent usage in local development: Users, especially in `flytekit` and `unionml`, can leverage backend agents for local development, streamlining the development process.
-- Language-agnostic: Agents can be authored in any programming language, allowing users to work with their preferred language and tools.
-- Scalability: Agents are designed to be scalable, ensuring they can handle large-scale workloads effectively.
-- Simple API: Agents offer a straightforward API, making integration and usage straightforward for developers.
-
-## Why do we need an agent service?
-
-Without agents, people need to implement a backend plugin in the propeller. The backend plugin is responsible for
-creating a CRD and submitting a http request to the external service. However, it increases the complexity of flytepropeller, and
-it's hard to maintain the backend plugin. For example, if we want to add a new plugin, we need to update and compile
-flytepropeller, and it's also hard to test. In addition, the backend plugin is running in flytepropeller itself, so it
-increases the load of the flytepropeller engine.
-
-Furthermore, implementing backend plugins can be challenging, particularly for data scientists and ML engineers who may lack proficiency in
-Golang. Additionally, managing performance requirements, maintenance, and development can be burdensome.
-To address these issues, we introduced the "Agent Service" in Flyte. This system enables rapid plugin
-development while decoupling them from the core flytepropeller engine.
-
-## Overview
-
-The Flyte agent service is a Python-based agent registry powered by a gRPC server. It allows users and flytepropeller
-to send gRPC requests to the registry for executing jobs such as BigQuery and Databricks. Each Agent service is a Kubernetes
-deployment. You can create two different Agent services hosting different Agents. For example, you can create one production
-agent service and one development agent service.
-
-:::{figure} https://i.ibb.co/vXhBDjP/Screen-Shot-2023-05-29-at-2-54-14-PM.png
-:alt: Agent Service
-:class: with-shadow
-:::
-
-## How to register a new agent
-
-### Flytekit interface specification
-
-To register a new agent, you can extend the `AgentBase` class in the flytekit backend module. Implementing the following three methods is necessary, and it's important to ensure that all calls are idempotent:
-
-- `create`: This method is used to initiate a new task. Users have the flexibility to use gRPC, REST, or an SDK to create a task.
-- `get`: This method allows retrieving the job Resource (jobID or output literal) associated with the task, such as a BigQuery Job ID or Databricks task ID.
-- `delete`: Invoking this method will send a request to delete the corresponding job.
-
-```python
-from flytekit.extend.backend.base_agent import AgentBase, AgentRegistry
-from dataclasses import dataclass
-import requests
-
-@dataclass
-class Metadata:
-    # you can add any metadata you want, propeller will pass the metadata to the agent to get the job status.
-    # For example, you can add the job_id to the metadata, and the agent will use the job_id to get the job status.
-    # You could also add the s3 file path, and the agent can check if the file exists.
-    job_id: str
-
-class CustomAgent(AgentBase):
-    def __init__(self, task_type: str):
-        # Each agent should have a unique task type. Agent service will use the task type to find the corresponding agent.
-        self._task_type = task_type
-
-    def create(
-        self,
-        context: grpc.ServicerContext,
-        output_prefix: str,
-        task_template: TaskTemplate,
-        inputs: typing.Optional[LiteralMap] = None,
-    ) -> TaskCreateResponse:
-        # 1. Submit the task to the external service (BigQuery, DataBricks, etc.)
-        # 2. Create a task metadata such as jobID.
-        # 3. Return the task metadata, and keep in mind that the metadata should be serialized to bytes.
-        res = requests.post(url, json=data)
-        return CreateTaskResponse(resource_meta=json.dumps(asdict(Metadata(job_id=str(res.job_id)))).encode("utf-8"))
-
-    def get(self, context: grpc.ServicerContext, resource_meta: bytes) -> TaskGetResponse:
-        # 1. Deserialize the metadata.
-        # 2. Use the metadata to get the job status.
-        # 3. Return the job status.
-        metadata = Metadata(**json.loads(resource_meta.decode("utf-8")))
-        res = requests.get(url, json={"job_id": metadata.job_id})
-        return GetTaskResponse(resource=Resource(state=res.state)
-
-    def delete(self, context: grpc.ServicerContext, resource_meta: bytes) -> TaskDeleteResponse:
-        # 1. Deserialize the metadata.
-        # 2. Use the metadata to delete the job.
-        # 3. If failed to delete the job, add the error message to the grpc context.
-        #   context.set_code(grpc.StatusCode.INTERNAL)
-        #   context.set_details(f"failed to create task with error {e}")
-        try:
-            metadata = Metadata(**json.loads(resource_meta.decode("utf-8")))
-            requests.delete(url, json={"job_id": metadata.job_id})
-        except Exception as e:
-            logger.error(f"failed to delete task with error {e}")
-            context.set_code(grpc.StatusCode.INTERNAL)
-            context.set_details(f"failed to delete task with error {e}")
-        return DeleteTaskResponse()
-
-# To register the custom agent
-AgentRegistry.register(CustomAgent())
-```
-
-Here is an example of [BigQuery Agent](https://github.com/flyteorg/flytekit/blob/9977aac26242ebbede8e00d476c2fbc59ac5487a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L35) implementation.
-
-### How to test the agent
-
-Agent can be tested locally without running backend server. It makes the development of the agent easier.
-
-The task inherited from AsyncAgentExecutorMixin can be executed locally, allowing flytekit to mimic the propeller's behavior to call the agent.
-In some cases, you should store credentials in your local environment when testing locally.
-For example, you need to set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable when testing the BigQuery task.
-After setting up the CREDENTIALS, you can run the task locally. Flytekit will automatically call the agent to create, get, or delete the task.
-
-```python
-bigquery_doge_coin = BigQueryTask(
-    name=f"bigquery.doge_coin",
-    inputs=kwtypes(version=int),
-    query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
-    output_structured_dataset_type=StructuredDataset,
-    task_config=BigQueryConfig(ProjectID="flyte-test-340607")
-)
-```
-
-Task above task as an example, you can run the task locally and test agent with the following command:
-
-```bash
-pyflyte run wf.py bigquery_doge_coin --version 10
-```
-
-### Build a new image
-
-The following is a sample Dockerfile for building an image for a flyte agent.
-
-```Dockerfile
-FROM python:3.9-slim-buster
-
-MAINTAINER Flyte Team
-LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit
-
-WORKDIR /root
-ENV PYTHONPATH /root
-
-# flytekit will autoload the agent if package is installed.
-RUN pip install flytekitplugins-bigquery
-CMD pyflyte serve agent --port 8000
-```
-
-:::{note}
-For flytekit versions `<=v1.10.2`, use `pyflyte serve`.
-For flytekit versions `>v1.10.2`, use `pyflyte serve agent`.
-:::
-
-### Update FlyteAgent
-
-1. Update the FlyteAgent deployment's [image](https://github.com/flyteorg/flyte/blob/c049865cba017ad826405c7145cd3eccbc553232/charts/flyteagent/templates/agent/deployment.yaml#L26)
-2. Update the FlytePropeller configmap.
-
-```YAML
-tasks:
-  task-plugins:
-    enabled-plugins:
-      - agent-service
-    default-for-task-types:
-      - bigquery_query_job_task: agent-service
-      - custom_task: agent-service
-
-plugins:
-  agent-service:
-    supportedTaskTypes:
-      - bigquery_query_job_task
-      - default_task
-      - custom_task
-    # By default, all the request will be sent to the default agent.
-    defaultAgent:
-      endpoint: "dns:///flyteagent.flyte.svc.cluster.local:8000"
-      insecure: true
-      timeouts:
-        GetTask: 200ms
-      defaultTimeout: 50ms
-    agents:
-      custom_agent:
-        endpoint: "dns:///custom-flyteagent.flyte.svc.cluster.local:8000"
-        insecure: false
-        defaultServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}'
-        timeouts:
-          GetTask: 100ms
-        defaultTimeout: 20ms
-    agentForTaskTypes:
-      # It will override the default agent for custom_task, which means propeller will send the request to this agent.
-      - custom_task: custom_agent
-```
-
-3. Restart the FlytePropeller
-
-```
-kubectl rollout restart deployment flytepropeller -n flyte
-```
diff --git a/docs/user_guide/development_lifecycle/index.md b/docs/user_guide/development_lifecycle/index.md
index 8c21abc291..1cb4f0b65a 100644
--- a/docs/user_guide/development_lifecycle/index.md
+++ b/docs/user_guide/development_lifecycle/index.md
@@ -8,7 +8,6 @@ You will gain an understanding of concepts like caching, the Flyte remote API, A
 :name: development_lifecycle_toc
 :hidden:
 
-agents
 private_images
 caching
 cache_serializing
diff --git a/docs/user_guide/extending/index.md b/docs/user_guide/extending/index.md
index 19a553ddfc..999214bf50 100644
--- a/docs/user_guide/extending/index.md
+++ b/docs/user_guide/extending/index.md
@@ -157,7 +157,7 @@ of the plugin.
 
 _New in Flyte 1.7.0_
 
-{ref}`Flyte Agent Service ` allows you to write backend
+The {ref}`Flyte Agent service ` allows you to write backend
 plugins in Python.
 
 ### Summary

From 2fb26d825d8bec6e66ae580c1bf0fe23819607ac Mon Sep 17 00:00:00 2001
From: Kevin Su
Date: Wed, 6 Mar 2024 07:57:38 -0800
Subject: [PATCH 4/4] update timeout

Signed-off-by: Kevin Su
---
 docs/flyte_agents/developing_agents.md | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/flyte_agents/developing_agents.md b/docs/flyte_agents/developing_agents.md
index 842fdaafd1..3ea6db1d03 100644
--- a/docs/flyte_agents/developing_agents.md
+++ b/docs/flyte_agents/developing_agents.md
@@ -155,16 +155,16 @@ For flytekit versions `>v1.10.2`, use `pyflyte serve agent`.
         endpoint: "dns:///flyteagent.flyte.svc.cluster.local:8000"
         insecure: true
         timeouts:
-          GetTask: 200ms
-        defaultTimeout: 50ms
+          GetTask: 5s
+        defaultTimeout: 10s
       agents:
         custom_agent:
           endpoint: "dns:///custom-flyteagent.flyte.svc.cluster.local:8000"
           insecure: false
           defaultServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}'
           timeouts:
-            GetTask: 100ms
-          defaultTimeout: 20ms
+            GetTask: 5s
+          defaultTimeout: 10s
       agentForTaskTypes:
         # It will override the default agent for custom_task, which means propeller will send the request to this agent.
         - custom_task: custom_agent