From d97ac1a3590cb823517f24dd446ee83f621389fa Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Fri, 29 Mar 2024 15:50:38 +0800 Subject: [PATCH 1/4] Log Agents In The Agent Server Signed-off-by: Future-Outlier --- flytekit/clis/sdk_in_container/serve.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/flytekit/clis/sdk_in_container/serve.py b/flytekit/clis/sdk_in_container/serve.py index 6a7e5c3c28..b50b547458 100644 --- a/flytekit/clis/sdk_in_container/serve.py +++ b/flytekit/clis/sdk_in_container/serve.py @@ -57,6 +57,8 @@ async def _start_grpc_server(port: int, worker: int, timeout: int): _start_http_server() click.secho("Starting the agent service...", fg="blue") + print_agents_metadata() + server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=worker)) add_AsyncAgentServiceServicer_to_server(AsyncAgentService(), server) @@ -96,3 +98,17 @@ def _start_health_check_server(server: grpc.Server, worker: int): except ImportError as e: click.secho(f"Failed to start the health check servicer with error {e}", fg="red") + + +def print_agents_metadata(): + from flytekit.extend.backend.base_agent import AgentRegistry + + agents = AgentRegistry.list_agents() + for agent in agents: + name = agent.name + task_type = "sync" if agent.is_sync else "async" + for task_category in agent.supported_task_categories: + click.secho( + f"Starting {name} supports {task_type} task {task_category.name} with version {task_category.version}", + fg="blue", + ) From cfc27e5ed4b4a71668151d01c05093138e44a5f8 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Fri, 29 Mar 2024 16:10:42 +0800 Subject: [PATCH 2/4] add tests Signed-off-by: Future-Outlier --- tests/flytekit/unit/extend/test_agent.py | 26 ++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/flytekit/unit/extend/test_agent.py b/tests/flytekit/unit/extend/test_agent.py index 2bf23abb25..4d77dc81b1 100644 --- a/tests/flytekit/unit/extend/test_agent.py +++ b/tests/flytekit/unit/extend/test_agent.py @@ -6,6 +6,7 @@ import grpc import pytest from flyteidl.admin.agent_pb2 import ( + Agent, CreateRequestHeader, CreateTaskRequest, DeleteTaskRequest, @@ -20,6 +21,7 @@ from flyteidl.core.identifier_pb2 import ResourceType from flytekit import PythonFunctionTask, task +from flytekit.clis.sdk_in_container.serve import print_agents_metadata from flytekit.configuration import FastSerializationSettings, Image, ImageConfig, SerializationSettings from flytekit.core.base_task import PythonTask, kwtypes from flytekit.core.interface import Interface @@ -384,3 +386,27 @@ def test_render_task_template(): "task-name", "simple_task", ] + + +@pytest.fixture +def sample_agents(): + async_agent = Agent( + name="Sensor", is_sync=False, supported_task_categories=[TaskCategory(name="sensor", version=0)] + ) + sync_agent = Agent( + name="ChatGPT Agent", is_sync=True, supported_task_categories=[TaskCategory(name="chatgpt", version=0)] + ) + return [async_agent, sync_agent] + + +@patch("flytekit.clis.sdk_in_container.serve.click.secho") +@patch("flytekit.extend.backend.base_agent.AgentRegistry.list_agents") +def test_print_agents_metadata_output(list_agents_mock, mock_secho, sample_agents): + list_agents_mock.return_value = sample_agents + print_agents_metadata() + expected_calls = [ + (("Starting Sensor supports async task sensor with version 0",), {"fg": "blue"}), + (("Starting ChatGPT Agent supports sync task chatgpt with version 0",), {"fg": "blue"}), + ] + mock_secho.assert_has_calls(expected_calls, any_order=True) + assert mock_secho.call_count == len(expected_calls) From b1150edc35ae0dd7cf2ccd75d455779cb1bbbfb9 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sun, 7 Apr 2024 03:52:31 -0700 Subject: [PATCH 3/4] nit Signed-off-by: Kevin Su --- flytekit/clis/sdk_in_container/serve.py | 7 +------ tests/flytekit/unit/extend/test_agent.py | 4 ++-- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/flytekit/clis/sdk_in_container/serve.py b/flytekit/clis/sdk_in_container/serve.py index b50b547458..232d7164c1 100644 --- a/flytekit/clis/sdk_in_container/serve.py +++ b/flytekit/clis/sdk_in_container/serve.py @@ -106,9 +106,4 @@ def print_agents_metadata(): agents = AgentRegistry.list_agents() for agent in agents: name = agent.name - task_type = "sync" if agent.is_sync else "async" - for task_category in agent.supported_task_categories: - click.secho( - f"Starting {name} supports {task_type} task {task_category.name} with version {task_category.version}", - fg="blue", - ) + click.secho(f"Starting {name}...", fg="blue") diff --git a/tests/flytekit/unit/extend/test_agent.py b/tests/flytekit/unit/extend/test_agent.py index 4d77dc81b1..208a84684a 100644 --- a/tests/flytekit/unit/extend/test_agent.py +++ b/tests/flytekit/unit/extend/test_agent.py @@ -405,8 +405,8 @@ def test_print_agents_metadata_output(list_agents_mock, mock_secho, sample_agent list_agents_mock.return_value = sample_agents print_agents_metadata() expected_calls = [ - (("Starting Sensor supports async task sensor with version 0",), {"fg": "blue"}), - (("Starting ChatGPT Agent supports sync task chatgpt with version 0",), {"fg": "blue"}), + (("Starting Sensor...",), {"fg": "blue"}), + (("Starting ChatGPT Agent...",), {"fg": "blue"}), ] mock_secho.assert_has_calls(expected_calls, any_order=True) assert mock_secho.call_count == len(expected_calls) From 9fb8af83ac00c33f69e0feb08fb58c563f27ef8b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sun, 7 Apr 2024 04:15:50 -0700 Subject: [PATCH 4/4] nit Signed-off-by: Kevin Su --- flytekit/clis/sdk_in_container/serve.py | 3 ++- plugins/flytekit-snowflake/flytekitplugins/snowflake/agent.py | 2 ++ tests/flytekit/unit/extend/test_agent.py | 4 ++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/flytekit/clis/sdk_in_container/serve.py b/flytekit/clis/sdk_in_container/serve.py index 232d7164c1..efe7086126 100644 --- a/flytekit/clis/sdk_in_container/serve.py +++ b/flytekit/clis/sdk_in_container/serve.py @@ -106,4 +106,5 @@ def print_agents_metadata(): agents = AgentRegistry.list_agents() for agent in agents: name = agent.name - click.secho(f"Starting {name}...", fg="blue") + metadata = [category.name for category in agent.supported_task_categories] + click.secho(f"Starting {name} that supports task categories {metadata}", fg="blue") diff --git a/plugins/flytekit-snowflake/flytekitplugins/snowflake/agent.py b/plugins/flytekit-snowflake/flytekitplugins/snowflake/agent.py index 8cb38662e3..71eba91186 100644 --- a/plugins/flytekit-snowflake/flytekitplugins/snowflake/agent.py +++ b/plugins/flytekit-snowflake/flytekitplugins/snowflake/agent.py @@ -59,6 +59,8 @@ def get_connection(metadata: SnowflakeJobMetadata) -> snowflake_connector: class SnowflakeAgent(AsyncAgentBase): + name = "Snowflake Agent" + def __init__(self): super().__init__(task_type_name=TASK_TYPE, metadata_type=SnowflakeJobMetadata) diff --git a/tests/flytekit/unit/extend/test_agent.py b/tests/flytekit/unit/extend/test_agent.py index 208a84684a..17db5c2788 100644 --- a/tests/flytekit/unit/extend/test_agent.py +++ b/tests/flytekit/unit/extend/test_agent.py @@ -405,8 +405,8 @@ def test_print_agents_metadata_output(list_agents_mock, mock_secho, sample_agent list_agents_mock.return_value = sample_agents print_agents_metadata() expected_calls = [ - (("Starting Sensor...",), {"fg": "blue"}), - (("Starting ChatGPT Agent...",), {"fg": "blue"}), + (("Starting Sensor that supports task categories ['sensor']",), {"fg": "blue"}), + (("Starting ChatGPT Agent that supports task categories ['chatgpt']",), {"fg": "blue"}), ] mock_secho.assert_has_calls(expected_calls, any_order=True) assert mock_secho.call_count == len(expected_calls)