Skip to content

Commit

Permalink
feat: Support HTTP sender (#1383)
Browse files Browse the repository at this point in the history
  • Loading branch information
fangyinc authored Apr 8, 2024
1 parent df36b94 commit 634e62c
Show file tree
Hide file tree
Showing 7 changed files with 430 additions and 4 deletions.
14 changes: 14 additions & 0 deletions dbgpt/cli/cli_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ def run():
pass


@click.group()
def net():
"""Net tools."""
pass


stop_all_func_list = []


Expand All @@ -100,6 +106,7 @@ def stop_all():
cli.add_command(app)
cli.add_command(repo)
cli.add_command(run)
cli.add_command(net)
add_command_alias(stop_all, name="all", parent_group=stop)

try:
Expand Down Expand Up @@ -200,6 +207,13 @@ def stop_all():
except ImportError as e:
logging.warning(f"Integrating dbgpt client command line tool failed: {e}")

try:
from dbgpt.util.network._cli import start_forward

add_command_alias(start_forward, name="forward", parent_group=net)
except ImportError as e:
logging.warning(f"Integrating dbgpt net command line tool failed: {e}")


def main():
return cli()
Expand Down
2 changes: 2 additions & 0 deletions dbgpt/core/awel/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def __init__(self, label: str, description: str):

_OPERATOR_CATEGORY_DETAIL = {
"trigger": _CategoryDetail("Trigger", "Trigger your AWEL flow"),
"sender": _CategoryDetail("Sender", "Send the data to the target"),
"llm": _CategoryDetail("LLM", "Invoke LLM model"),
"conversion": _CategoryDetail("Conversion", "Handle the conversion"),
"output_parser": _CategoryDetail("Output Parser", "Parse the output of LLM model"),
Expand All @@ -121,6 +122,7 @@ class OperatorCategory(str, Enum):
"""The category of the operator."""

TRIGGER = "trigger"
SENDER = "sender"
LLM = "llm"
CONVERSION = "conversion"
OUTPUT_PARSER = "output_parser"
Expand Down
3 changes: 2 additions & 1 deletion dbgpt/core/awel/flow/flow_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .exceptions import (
FlowClassMetadataException,
FlowDAGMetadataException,
FlowException,
FlowMetadataException,
)

Expand Down Expand Up @@ -720,5 +721,5 @@ def fill_flow_panel(flow_panel: FlowPanel):
param.default = new_param.default
param.placeholder = new_param.placeholder

except ValueError as e:
except (FlowException, ValueError) as e:
logger.warning(f"Unable to fill the flow panel: {e}")
124 changes: 122 additions & 2 deletions dbgpt/core/awel/trigger/ext_http_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
Supports more trigger types, such as RequestHttpTrigger.
"""
from enum import Enum
from typing import List, Optional, Type, Union
from typing import Dict, List, Optional, Type, Union

from starlette.requests import Request

from dbgpt.util.i18n_utils import _

from ..flow import IOField, OperatorCategory, OperatorType, ViewMetadata
from ..flow import IOField, OperatorCategory, OperatorType, Parameter, ViewMetadata
from ..operators.common_operator import MapOperator
from .http_trigger import (
_PARAMETER_ENDPOINT,
_PARAMETER_MEDIA_TYPE,
Expand Down Expand Up @@ -82,3 +83,122 @@ def __init__(
register_to_app=True,
**kwargs,
)


class DictHTTPSender(MapOperator[Dict, Dict]):
"""HTTP Sender operator for AWEL."""

metadata = ViewMetadata(
label=_("HTTP Sender"),
name="awel_dict_http_sender",
category=OperatorCategory.SENDER,
description=_("Send a HTTP request to a specified endpoint"),
inputs=[
IOField.build_from(
_("Request Body"),
"request_body",
dict,
description=_("The request body to send"),
)
],
outputs=[
IOField.build_from(
_("Response Body"),
"response_body",
dict,
description=_("The response body of the HTTP request"),
)
],
parameters=[
Parameter.build_from(
_("HTTP Address"),
_("address"),
type=str,
description=_("The address to send the HTTP request to"),
),
_PARAMETER_METHODS_ALL.new(),
_PARAMETER_STATUS_CODE.new(),
Parameter.build_from(
_("Timeout"),
"timeout",
type=int,
optional=True,
default=60,
description=_("The timeout of the HTTP request in seconds"),
),
Parameter.build_from(
_("Token"),
"token",
type=str,
optional=True,
default=None,
description=_("The token to use for the HTTP request"),
),
Parameter.build_from(
_("Cookies"),
"cookies",
type=str,
optional=True,
default=None,
description=_("The cookies to use for the HTTP request"),
),
],
)

def __init__(
self,
address: str,
methods: Optional[str] = "GET",
status_code: Optional[int] = 200,
timeout: Optional[int] = 60,
token: Optional[str] = None,
cookies: Optional[Dict[str, str]] = None,
**kwargs,
):
"""Initialize a HTTPSender."""
try:
import aiohttp # noqa: F401
except ImportError:
raise ImportError(
"aiohttp is required for HTTPSender, please install it with "
"`pip install aiohttp`"
)
self._address = address
self._methods = methods
self._status_code = status_code
self._timeout = timeout
self._token = token
self._cookies = cookies
super().__init__(**kwargs)

async def map(self, request_body: Dict) -> Dict:
"""Send the request body to the specified address."""
import aiohttp

if self._methods in ["POST", "PUT"]:
req_kwargs = {"json": request_body}
else:
req_kwargs = {"params": request_body}
method = self._methods or "GET"

headers = {}
if self._token:
headers["Authorization"] = f"Bearer {self._token}"
async with aiohttp.ClientSession(
headers=headers,
cookies=self._cookies,
timeout=aiohttp.ClientTimeout(total=self._timeout),
) as session:
async with session.request(
method,
self._address,
raise_for_status=False,
**req_kwargs,
) as response:
status_code = response.status
if status_code != self._status_code:
raise ValueError(
f"HTTP request failed with status code {status_code}"
)
response_body = await response.json()
return response_body
2 changes: 1 addition & 1 deletion dbgpt/core/awel/trigger/http_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ async def map(self, request_body: CommonLLMHttpRequestBody) -> Dict[str, Any]:
keys = self._key.split(".")
for k in keys:
dict_value = dict_value[k]
if isinstance(dict_value, dict):
if not isinstance(dict_value, dict):
raise ValueError(
f"Prefix key {self._key} is not a valid key of the request body"
)
Expand Down
Empty file added dbgpt/util/network/__init__.py
Empty file.
Loading

0 comments on commit 634e62c

Please sign in to comment.