Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support HTTP sender #1383

Merged
merged 1 commit into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions dbgpt/cli/cli_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ def run():
pass


@click.group()
def net():
"""Net tools."""
pass


stop_all_func_list = []


Expand All @@ -100,6 +106,7 @@ def stop_all():
cli.add_command(app)
cli.add_command(repo)
cli.add_command(run)
cli.add_command(net)
add_command_alias(stop_all, name="all", parent_group=stop)

try:
Expand Down Expand Up @@ -200,6 +207,13 @@ def stop_all():
except ImportError as e:
logging.warning(f"Integrating dbgpt client command line tool failed: {e}")

try:
from dbgpt.util.network._cli import start_forward

add_command_alias(start_forward, name="forward", parent_group=net)
except ImportError as e:
logging.warning(f"Integrating dbgpt net command line tool failed: {e}")


def main():
return cli()
Expand Down
2 changes: 2 additions & 0 deletions dbgpt/core/awel/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def __init__(self, label: str, description: str):

_OPERATOR_CATEGORY_DETAIL = {
"trigger": _CategoryDetail("Trigger", "Trigger your AWEL flow"),
"sender": _CategoryDetail("Sender", "Send the data to the target"),
"llm": _CategoryDetail("LLM", "Invoke LLM model"),
"conversion": _CategoryDetail("Conversion", "Handle the conversion"),
"output_parser": _CategoryDetail("Output Parser", "Parse the output of LLM model"),
Expand All @@ -121,6 +122,7 @@ class OperatorCategory(str, Enum):
"""The category of the operator."""

TRIGGER = "trigger"
SENDER = "sender"
LLM = "llm"
CONVERSION = "conversion"
OUTPUT_PARSER = "output_parser"
Expand Down
3 changes: 2 additions & 1 deletion dbgpt/core/awel/flow/flow_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .exceptions import (
FlowClassMetadataException,
FlowDAGMetadataException,
FlowException,
FlowMetadataException,
)

Expand Down Expand Up @@ -720,5 +721,5 @@ def fill_flow_panel(flow_panel: FlowPanel):
param.default = new_param.default
param.placeholder = new_param.placeholder

except ValueError as e:
except (FlowException, ValueError) as e:
logger.warning(f"Unable to fill the flow panel: {e}")
124 changes: 122 additions & 2 deletions dbgpt/core/awel/trigger/ext_http_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
Supports more trigger types, such as RequestHttpTrigger.
"""
from enum import Enum
from typing import List, Optional, Type, Union
from typing import Dict, List, Optional, Type, Union

from starlette.requests import Request

from dbgpt.util.i18n_utils import _

from ..flow import IOField, OperatorCategory, OperatorType, ViewMetadata
from ..flow import IOField, OperatorCategory, OperatorType, Parameter, ViewMetadata
from ..operators.common_operator import MapOperator
from .http_trigger import (
_PARAMETER_ENDPOINT,
_PARAMETER_MEDIA_TYPE,
Expand Down Expand Up @@ -82,3 +83,122 @@ def __init__(
register_to_app=True,
**kwargs,
)


class DictHTTPSender(MapOperator[Dict, Dict]):
"""HTTP Sender operator for AWEL."""

metadata = ViewMetadata(
label=_("HTTP Sender"),
name="awel_dict_http_sender",
category=OperatorCategory.SENDER,
description=_("Send a HTTP request to a specified endpoint"),
inputs=[
IOField.build_from(
_("Request Body"),
"request_body",
dict,
description=_("The request body to send"),
)
],
outputs=[
IOField.build_from(
_("Response Body"),
"response_body",
dict,
description=_("The response body of the HTTP request"),
)
],
parameters=[
Parameter.build_from(
_("HTTP Address"),
_("address"),
type=str,
description=_("The address to send the HTTP request to"),
),
_PARAMETER_METHODS_ALL.new(),
_PARAMETER_STATUS_CODE.new(),
Parameter.build_from(
_("Timeout"),
"timeout",
type=int,
optional=True,
default=60,
description=_("The timeout of the HTTP request in seconds"),
),
Parameter.build_from(
_("Token"),
"token",
type=str,
optional=True,
default=None,
description=_("The token to use for the HTTP request"),
),
Parameter.build_from(
_("Cookies"),
"cookies",
type=str,
optional=True,
default=None,
description=_("The cookies to use for the HTTP request"),
),
],
)

def __init__(
self,
address: str,
methods: Optional[str] = "GET",
status_code: Optional[int] = 200,
timeout: Optional[int] = 60,
token: Optional[str] = None,
cookies: Optional[Dict[str, str]] = None,
**kwargs,
):
"""Initialize a HTTPSender."""
try:
import aiohttp # noqa: F401
except ImportError:
raise ImportError(
"aiohttp is required for HTTPSender, please install it with "
"`pip install aiohttp`"
)
self._address = address
self._methods = methods
self._status_code = status_code
self._timeout = timeout
self._token = token
self._cookies = cookies
super().__init__(**kwargs)

async def map(self, request_body: Dict) -> Dict:
"""Send the request body to the specified address."""
import aiohttp

if self._methods in ["POST", "PUT"]:
req_kwargs = {"json": request_body}
else:
req_kwargs = {"params": request_body}
method = self._methods or "GET"

headers = {}
if self._token:
headers["Authorization"] = f"Bearer {self._token}"
async with aiohttp.ClientSession(
headers=headers,
cookies=self._cookies,
timeout=aiohttp.ClientTimeout(total=self._timeout),
) as session:
async with session.request(
method,
self._address,
raise_for_status=False,
**req_kwargs,
) as response:
status_code = response.status
if status_code != self._status_code:
raise ValueError(
f"HTTP request failed with status code {status_code}"
)
response_body = await response.json()
return response_body
2 changes: 1 addition & 1 deletion dbgpt/core/awel/trigger/http_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ async def map(self, request_body: CommonLLMHttpRequestBody) -> Dict[str, Any]:
keys = self._key.split(".")
for k in keys:
dict_value = dict_value[k]
if isinstance(dict_value, dict):
if not isinstance(dict_value, dict):
raise ValueError(
f"Prefix key {self._key} is not a valid key of the request body"
)
Expand Down
Empty file added dbgpt/util/network/__init__.py
Empty file.
Loading
Loading