Skip to content

Commit

Permalink
feat(core): Fetch flow nodes API supports filterd by tags
Browse files Browse the repository at this point in the history
  • Loading branch information
fangyinc committed Aug 11, 2024
1 parent a348e6c commit a00a9d5
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 6 deletions.
37 changes: 34 additions & 3 deletions dbgpt/core/awel/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1145,9 +1145,40 @@ def get_registry_item(self, key: str) -> Optional[_RegistryItem]:
"""Get the registry item by the key."""
return self._registry.get(key)

def metadata_list(self):
"""Get the metadata list."""
return [item.metadata.to_dict() for item in self._registry.values()]
def metadata_list(
self,
tags: Optional[Dict[str, str]] = None,
user_name: Optional[str] = None,
sys_code: Optional[str] = None,
) -> List[Dict]:
"""Get the metadata list.
TODO: Support the user and system code filter.
Args:
tags (Optional[Dict[str, str]], optional): The tags. Defaults to None.
user_name (Optional[str], optional): The user name. Defaults to None.
sys_code (Optional[str], optional): The system code. Defaults to None.
Returns:
List[Dict]: The metadata list.
"""
if not tags:
return [item.metadata.to_dict() for item in self._registry.values()]
else:
results = []
for item in self._registry.values():
node_tags = item.metadata.tags
is_match = True
if not node_tags or not isinstance(node_tags, dict):
continue
for k, v in tags.items():
if node_tags.get(k) != v:
is_match = False
break
if is_match:
results.append(item.metadata.to_dict())
return results

async def refresh(
self,
Expand Down
2 changes: 2 additions & 0 deletions dbgpt/core/interface/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,8 @@ def is_variable_string(variable_str: str) -> bool:
Returns:
bool: True if the string is a variable string, False otherwise.
"""
if not variable_str or not isinstance(variable_str, str):
return False
if not _is_variable_format(variable_str):
return False
try:
Expand Down
29 changes: 26 additions & 3 deletions dbgpt/serve/flow/api/endpoints.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
from functools import cache
from typing import List, Literal, Optional, Union
from typing import Dict, List, Literal, Optional, Union

from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBearer
Expand Down Expand Up @@ -229,16 +230,38 @@ async def query_page(


@router.get("/nodes", dependencies=[Depends(check_api_key)])
async def get_nodes():
async def get_nodes(
user_name: Optional[str] = Query(default=None, description="user name"),
sys_code: Optional[str] = Query(default=None, description="system code"),
tags: Optional[str] = Query(default=None, description="tags"),
):
"""Get the operator or resource nodes
Args:
user_name (Optional[str]): The username
sys_code (Optional[str]): The system code
tags (Optional[str]): The tags encoded in JSON format
Returns:
Result[List[Union[ViewMetadata, ResourceMetadata]]]:
The operator or resource nodes
"""
from dbgpt.core.awel.flow.base import _OPERATOR_REGISTRY

metadata_list = _OPERATOR_REGISTRY.metadata_list()
tags_dict: Optional[Dict[str, str]] = None
if tags:
try:
tags_dict = json.loads(tags)
except json.JSONDecodeError:
return Result.fail("Invalid JSON format for tags")

metadata_list = await blocking_func_to_async(
global_system_app,
_OPERATOR_REGISTRY.metadata_list,
tags_dict,
user_name,
sys_code,
)
return Result.succ(metadata_list)


Expand Down
36 changes: 36 additions & 0 deletions examples/awel/awel_flow_ui_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,3 +881,39 @@ async def map(self, user_name: str) -> str:
}
json_data = json.dumps(dict_dict, ensure_ascii=False)
return "Your name is %s, and your model info is %s." % (user_name, json_data)


class ExampleFlowTagsOperator(MapOperator[str, str]):
"""An example flow operator that includes a tags option."""

metadata = ViewMetadata(
label="Example Tags Operator",
name="example_tags_operator",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a tags",
parameters=[],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
),
],
outputs=[
IOField.build_from(
"Tags",
"tags",
str,
description="The tags to use.",
),
],
tags={"order": "higher-order", "type": "example"},
)

def __init__(self, **kwargs):
super().__init__(**kwargs)

async def map(self, user_name: str) -> str:
"""Map the user name to the tags."""
return "Your name is %s, and your tags are %s." % (user_name, "higher-order")

0 comments on commit a00a9d5

Please sign in to comment.