Skip to content

Commit

Permalink
AppBuilderClient 增加异步调用 (#666)
Browse files Browse the repository at this point in the history
* chainlit chainlit_agent支持工作流Agent应用

* update

* AppBuilderClient增加异步调用

* 完善debug模式curl命令打印

* 增加异步上传文档方法

* 增加async client单测

* 增加异步event_handler

* 异步event_handler增加追问、toolcall单测

* update

* add unittest

* add unittest

* update unittest

* update unittest
  • Loading branch information
userpj authored Dec 19, 2024
1 parent e605685 commit 9a95c5e
Show file tree
Hide file tree
Showing 15 changed files with 1,694 additions and 147 deletions.
26 changes: 14 additions & 12 deletions python/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def get_default_header():
from appbuilder.core.utils import get_model_list

from appbuilder.core.console.appbuilder_client.appbuilder_client import AppBuilderClient
from appbuilder.core.console.appbuilder_client.async_appbuilder_client import AsyncAppBuilderClient
from appbuilder.core.console.appbuilder_client.appbuilder_client import AgentBuilder
from appbuilder.core.console.appbuilder_client.appbuilder_client import get_app_list, get_all_apps, describe_apps
from appbuilder.core.console.knowledge_base.knowledge_base import KnowledgeBase
Expand All @@ -202,19 +203,20 @@ def get_default_header():
from appbuilder.utils.trace.tracer import AppBuilderTracer, AppbuilderInstrumentor

__all__ = [
'logger',
'BadRequestException',
'ForbiddenException',
'NotFoundException',
'PreconditionFailedException',
'InternalServerErrorException',
'HTTPConnectionException',
'AppBuilderServerException',
'AppbuilderTraceException',
'AppbuilderTestToolEval',
'AutomaticTestToolEval',
"logger",
"BadRequestException",
"ForbiddenException",
"NotFoundException",
"PreconditionFailedException",
"InternalServerErrorException",
"HTTPConnectionException",
"AppBuilderServerException",
"AppbuilderTraceException",
"AppbuilderTestToolEval",
"AutomaticTestToolEval",
"get_model_list",
"AppBuilderClient",
"AsyncAppBuilderClient",
"AgentBuilder",
"get_app_list",
"get_all_apps",
Expand All @@ -232,5 +234,5 @@ def get_default_header():
"AssistantEventHandler",
"AssistantStreamManager",
"AppBuilderTracer",
"AppbuilderInstrumentor"
"AppbuilderInstrumentor",
] + __COMPONENTS__
45 changes: 42 additions & 3 deletions python/core/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@

import requests
from requests.adapters import HTTPAdapter, Retry
from aiohttp import ClientResponse

from appbuilder import get_default_header

from appbuilder.core._exception import *
from appbuilder.core._session import InnerSession
from appbuilder.core._session import InnerSession, AsyncInnerSession
from appbuilder.core.constants import (
GATEWAY_URL,
GATEWAY_URL_V2,
Expand Down Expand Up @@ -100,7 +101,8 @@ def _init_secret_key(self, secret_key: str):
secret_key_prefix = os.getenv("SECRET_KEY_PREFIX", SECRET_KEY_PREFIX)

if not self.secret_key.startswith(secret_key_prefix):
self.secret_key = "{} {}".format(secret_key_prefix, self.secret_key)
self.secret_key = "{} {}".format(
secret_key_prefix, self.secret_key)

logger.debug("AppBuilder Secret key: {}\n".format(self.secret_key))

Expand Down Expand Up @@ -181,7 +183,8 @@ def check_console_response(response: requests.Response):
data = response.json()
if "code" in data and data.get("code") != 0:
requestId = __class__.response_request_id(response)
raise AppBuilderServerException(requestId, data["code"], data["message"])
raise AppBuilderServerException(
requestId, data["code"], data["message"])

def auth_header(self, request_id: Optional[str] = None):
r"""auth_header is a helper method return auth info"""
Expand Down Expand Up @@ -234,6 +237,42 @@ def inner(*args, **kwargs):
return inner


class AsyncHTTPClient(HTTPClient):
def __init__(self, secret_key=None, gateway="", gateway_v2=""):
super().__init__(secret_key, gateway, gateway_v2)
self.session = AsyncInnerSession()

@staticmethod
def check_response_header(response: ClientResponse):
r"""check_response_header is a helper method for check head status .
:param response: requests.Response.
:rtype:
"""
status_code = response.status
if status_code == requests.codes.ok:
return
message = "request_id={} , http status code is {}, body is {}".format(
__class__.response_request_id(response), status_code, response.text
)
if status_code == requests.codes.bad_request:
raise BadRequestException(message)
elif status_code == requests.codes.forbidden:
raise ForbiddenException(message)
elif status_code == requests.codes.not_found:
raise NotFoundException(message)
elif status_code == requests.codes.precondition_required:
raise PreconditionFailedException(message)
elif status_code == requests.codes.internal_server_error:
raise InternalServerErrorException(message)
else:
raise BaseRPCException(message)

@staticmethod
def response_request_id(response: ClientResponse):
r"""response_request_id is a helper method to get the unique request id"""
return response.headers.get("X-Appbuilder-Request-Id", "")


class AssistantHTTPClient(HTTPClient):
def service_url(self, sub_path: str, prefix: str = None):
"""
Expand Down
56 changes: 56 additions & 0 deletions python/core/_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import requests
import json
import aiohttp
from aiohttp import ClientSession, hdrs
from appbuilder.utils.logger_util import logger
from appbuilder.utils.trace.tracer_wrapper import session_post

Expand Down Expand Up @@ -72,3 +74,57 @@ def get(self, url, **kwargs):
@session_post
def put(self, url, data=None, **kwargs):
return super().put(url=url, data=data, **kwargs)


class AsyncInnerSession(ClientSession):

def __init__(self, *args, **kwargs):
"""
Initialize inner session.
"""
super(AsyncInnerSession, self).__init__(*args, **kwargs)

async def build_curl(self, method, url, data=None, json_data=None, **kwargs) -> str:
"""
Generate cURL command from prepared request object.
"""
curl = "curl -X {0} -L '{1}' \\\n".format(method, url)

headers = kwargs.get("headers", {})
headers_strs = [
"-H '{0}: {1}' \\".format(k, v) for k, v in headers.items()]
if headers_strs:
headers_strs[-1] = headers_strs[-1].rstrip(" \\")
curl += "\n".join(headers_strs)

if data:
try:
body = "'{0}'".format(json.dumps(data, ensure_ascii=False))
curl += " \\\n-d {0}".format(body)
except:
pass
elif json_data:
body = "'{0}'".format(json.dumps(json_data, ensure_ascii=False))
curl += " \\\n-d {0}".format(body)

return curl

@session_post
async def post(self, url, data=None, json=None, **kwargs):
logger.debug("Curl Command:\n" + await self.build_curl(hdrs.METH_POST, url, data=data, json_data=json, **kwargs) + "\n")
return await super().post(url=url, data=data, json=json, **kwargs)

@session_post
async def delete(self, url, **kwargs):
logger.debug("Curl Command:\n" + await self.build_curl(hdrs.METH_DELETE, url, **kwargs) + "\n")
return await super().delete(url=url, **kwargs)

@session_post
async def get(self, url, **kwargs):
logger.debug("Curl Command:\n" + await self.build_curl(hdrs.METH_GET, url, **kwargs) + "\n")
return await super().get(url=url, **kwargs)

@session_post
async def put(self, url, data=None, **kwargs):
logger.debug("Curl Command:\n" + await self.build_curl(hdrs.METH_PUT, url, data=data, **kwargs) + "\n")
return await super().put(url=url, data=data, **kwargs)
40 changes: 28 additions & 12 deletions python/core/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from typing import (
Dict, List, Optional, Any, Generator, Union, AsyncGenerator)
from appbuilder.core.utils import ttl_lru_cache
from appbuilder.core._client import HTTPClient
from appbuilder.core._client import HTTPClient, AsyncHTTPClient
from appbuilder.core.message import Message


Expand Down Expand Up @@ -118,18 +118,22 @@ class PlanStep(BaseModel, extra='allow'):
arguments: dict = Field(default={}, description="step参数")
thought: str = Field(default="", description="step思考结果")


class Plan(BaseModel, extra='allow'):
detail: str = Field(default="", description="计划详情")
steps: list[PlanStep] = Field(default=[], description="步骤列表")


class FunctionCall(BaseModel, extra='allow'):
thought: str = Field(default="", description="思考结果")
name: str = Field(default="", description="工具名")
arguments: dict = Field(default={}, description="参数列表")



class Json(BaseModel, extra='allow'):
data: str = Field(default="", description="json数据")


class Content(BaseModel):
name: str = Field(default="",
description="介绍当前yield内容的阶段名, 使用name的必要条件,是同一组件会输出不同type的content,并且需要加以区分,方便前端渲染与用户展示")
Expand All @@ -141,10 +145,10 @@ class Content(BaseModel):
description="大模型的token用量, ")
metrics: dict = Field(default={},
description="耗时、性能、内存等trace及debug所需信息")
type: str = Field(default="text",
type: str = Field(default="text",
description="代表event 类型,包括 text、code、files、urls、oral_text、references、image、chart、audio该字段的取值决定了下面text字段的内容结构")
text: Union[Text, Code, Files, Urls, OralText, References, Image, Chart, Audio, Plan, Json, FunctionCall] = Field(default=Text,
description="代表当前 event 元素的内容,每一种 event 对应的 text 结构固定")
text: Union[Text, Code, Files, Urls, OralText, References, Image, Chart, Audio, Plan, Json, FunctionCall] = Field(default=Text,
description="代表当前 event 元素的内容,每一种 event 对应的 text 结构固定")

@field_validator('text', mode='before')
def set_text(cls, v, values, **kwargs):
Expand Down Expand Up @@ -180,7 +184,7 @@ class ComponentOutput(BaseModel):
role: str = Field(default="tool",
description="role是区分当前消息来源的重要字段,对于绝大多数组件而言,都是填写tool,标明role所在的消息来源为组件。部分思考及问答组件,role需要填写为assistant")
content: list[Content] = Field(default=[],
description="content是当前组件返回内容的主要payload,List[Content],每个Content Dict 包括了当前输出的一个元素")
description="content是当前组件返回内容的主要payload,List[Content],每个Content Dict 包括了当前输出的一个元素")


class Component:
Expand All @@ -202,6 +206,7 @@ def __init__(
secret_key: Optional[str] = None,
gateway: str = "",
lazy_certification: bool = False,
is_aysnc: bool = False,
**kwargs
):
r"""Component初始化方法.
Expand All @@ -219,6 +224,7 @@ def __init__(
self.gateway = gateway
self._http_client = None
self.lazy_certification = lazy_certification
self.is_async = is_aysnc
if not self.lazy_certification:
self.set_secret_key_and_gateway(self.secret_key, self.gateway)

Expand All @@ -236,7 +242,10 @@ def set_secret_key_and_gateway(self, secret_key: Optional[str] = None, gateway:
"""
self.secret_key = secret_key
self.gateway = gateway
self._http_client = HTTPClient(self.secret_key, self.gateway)
if self.is_async:
self._http_client = AsyncHTTPClient(self.secret_key, self.gateway)
else:
self._http_client = HTTPClient(self.secret_key, self.gateway)

@property
def http_client(self):
Expand All @@ -251,7 +260,11 @@ def http_client(self):
"""
if self._http_client is None:
self._http_client = HTTPClient(self.secret_key, self.gateway)
if self.is_async:
self._http_client = AsyncHTTPClient(
self.secret_key, self.gateway)
else:
self._http_client = HTTPClient(self.secret_key, self.gateway)
return self._http_client

def __call__(self, *inputs, **kwargs):
Expand Down Expand Up @@ -521,7 +534,8 @@ def create_output(cls, type, text, role="tool", name="", visible_scope="all", ra
elif type == "json":
text = {"data": text}
else:
raise ValueError("Only when type=text/code/urls/oral_text, string text is allowed! Please give dict text")
raise ValueError(
"Only when type=text/code/urls/oral_text, string text is allowed! Please give dict text")
elif isinstance(text, dict):
if type == "text":
key_list = ["info"]
Expand All @@ -534,7 +548,8 @@ def create_output(cls, type, text, role="tool", name="", visible_scope="all", ra
elif type == "files":
key_list = ["filename", "url"]
elif type == "references":
key_list = ["type", "resource_type", "icon", "site_name", "source", "doc_id", "title", "content", "image_content", "image_url", "video_url"]
key_list = ["type", "resource_type", "icon", "site_name", "source",
"doc_id", "title", "content", "image_content", "image_url", "video_url"]
elif type == "image":
key_list = ["filename", "url"]
elif type == "chart":
Expand All @@ -551,7 +566,8 @@ def create_output(cls, type, text, role="tool", name="", visible_scope="all", ra
else:
raise ValueError("text must be str or dict")

assert role in ["tool", "assistant"], "role must be 'tool' or 'assistant'"
assert role in [
"tool", "assistant"], "role must be 'tool' or 'assistant'"
result = {
"role": role,
"content": [{
Expand All @@ -564,4 +580,4 @@ def create_output(cls, type, text, role="tool", name="", visible_scope="all", ra
"metrics": metrics
}]
}
return ComponentOutput(**result)
return ComponentOutput(**result)
Loading

0 comments on commit 9a95c5e

Please sign in to comment.