From f89e41174082b6eecde9699aeb4a8df17a609bfb Mon Sep 17 00:00:00 2001 From: Aries-ckt <916701291@qq.com> Date: Wed, 21 Feb 2024 10:24:12 +0800 Subject: [PATCH] feat:add rag awel operator view metadata. (#1174) --- dbgpt/core/awel/flow/base.py | 4 + dbgpt/core/awel/trigger/http_trigger.py | 51 ++++ .../core/interface/operators/llm_operator.py | 39 +++ dbgpt/rag/operators/knowledge.py | 72 +++++- dbgpt/rag/operators/rewrite.py | 54 ++++ dbgpt/rag/operators/summary.py | 65 +++++ dbgpt/serve/rag/operators/knowledge_space.py | 242 ++++++++++++++++++ examples/awel/simple_rag_summary_example.py | 2 +- examples/rag/simple_rag_embedding_example.py | 2 +- examples/rag/simple_rag_retriever_example.py | 2 +- 10 files changed, 527 insertions(+), 6 deletions(-) create mode 100644 dbgpt/serve/rag/operators/knowledge_space.py diff --git a/dbgpt/core/awel/flow/base.py b/dbgpt/core/awel/flow/base.py index 04a3b5c79..835b12632 100644 --- a/dbgpt/core/awel/flow/base.py +++ b/dbgpt/core/awel/flow/base.py @@ -112,6 +112,7 @@ def __init__(self, label: str, description: str): "output_parser": _CategoryDetail("Output Parser", "Parse the output of LLM model"), "common": _CategoryDetail("Common", "The common operator"), "agent": _CategoryDetail("Agent", "The agent operator"), + "rag": _CategoryDetail("RAG", "The RAG operator"), } @@ -124,6 +125,7 @@ class OperatorCategory(str, Enum): OUTPUT_PARSER = "output_parser" COMMON = "common" AGENT = "agent" + RAG = "rag" def label(self) -> str: """Get the label of the category.""" @@ -163,6 +165,7 @@ class OperatorType(str, Enum): "common": _CategoryDetail("Common", "The common resource"), "prompt": _CategoryDetail("Prompt", "The prompt resource"), "agent": _CategoryDetail("Agent", "The agent resource"), + "rag": _CategoryDetail("RAG", "The resource"), } @@ -176,6 +179,7 @@ class ResourceCategory(str, Enum): COMMON = "common" PROMPT = "prompt" AGENT = "agent" + RAG = "rag" def label(self) -> str: """Get the label of the category.""" diff --git a/dbgpt/core/awel/trigger/http_trigger.py b/dbgpt/core/awel/trigger/http_trigger.py index 75fbdb955..03c6edf95 100644 --- a/dbgpt/core/awel/trigger/http_trigger.py +++ b/dbgpt/core/awel/trigger/http_trigger.py @@ -1031,3 +1031,54 @@ def __init__(self, key: str = "user_input", **kwargs): async def map(self, request_body: CommonLLMHttpRequestBody) -> Dict[str, Any]: """Map the request body to response body.""" return {self._key: request_body.messages} + + +class RequestedParsedOperator(MapOperator[CommonLLMHttpRequestBody, str]): + """User input parsed operator.""" + + metadata = ViewMetadata( + label="Request Body Parsed To String Operator", + name="request_body_to_str__parsed_operator", + category=OperatorCategory.COMMON, + parameters=[ + Parameter.build_from( + "Key", + "key", + str, + optional=True, + default="", + description="The key of the dict, link 'user_input'", + ) + ], + inputs=[ + IOField.build_from( + "Request Body", + "request_body", + CommonLLMHttpRequestBody, + description="The request body of the API endpoint", + ) + ], + outputs=[ + IOField.build_from( + "User Input String", + "user_input_str", + str, + description="The user input dict of the API endpoint", + ) + ], + description="User input parsed operator", + ) + + def __init__(self, key: str = "user_input", **kwargs): + """Initialize a UserInputParsedOperator.""" + self._key = key + super().__init__(**kwargs) + + async def map(self, request_body: CommonLLMHttpRequestBody) -> str: + """Map the request body to response body.""" + dict_value = request_body.dict() + if not self._key or self._key not in dict_value: + raise ValueError( + f"Prefix key {self._key} is not a valid key of the request body" + ) + return dict_value[self._key] diff --git a/dbgpt/core/interface/operators/llm_operator.py b/dbgpt/core/interface/operators/llm_operator.py index a12624fe8..eb0c2a101 100644 --- a/dbgpt/core/interface/operators/llm_operator.py +++ b/dbgpt/core/interface/operators/llm_operator.py @@ -457,3 +457,42 @@ async def transform_stream(self, output_iter: AsyncIterator[ModelOutput]): decoded_unicode = model_output.text.replace("\ufffd", "") msg = decoded_unicode.replace("\n", "\\n") yield f"data:{msg}\n\n" + + +class StringOutput2ModelOutputOperator(MapOperator[str, ModelOutput]): + """Map String to ModelOutput.""" + + metadata = ViewMetadata( + label="Map String to ModelOutput", + name="string_2_model_output_operator", + category=OperatorCategory.COMMON, + description="Map String to ModelOutput.", + parameters=[], + inputs=[ + IOField.build_from( + "String", + "input_value", + str, + description="The input value of the operator.", + ), + ], + outputs=[ + IOField.build_from( + "Model Output", + "input_value", + ModelOutput, + description="The input value of the operator.", + ), + ], + ) + + def __int__(self, **kwargs): + """Create a new operator.""" + super().__init__(**kwargs) + + async def map(self, input_value: str) -> ModelOutput: + """Map the model output to the common response body.""" + return ModelOutput( + text=input_value, + error_code=500, + ) diff --git a/dbgpt/rag/operators/knowledge.py b/dbgpt/rag/operators/knowledge.py index 02de6a3e2..e7e74a19c 100644 --- a/dbgpt/rag/operators/knowledge.py +++ b/dbgpt/rag/operators/knowledge.py @@ -1,26 +1,92 @@ from typing import Any, List, Optional from dbgpt.core.awel import MapOperator +from dbgpt.core.awel.flow import ( + IOField, + OperatorCategory, + OptionValue, + Parameter, + ViewMetadata, +) from dbgpt.core.awel.task.base import IN from dbgpt.rag.knowledge.base import Knowledge, KnowledgeType from dbgpt.rag.knowledge.factory import KnowledgeFactory class KnowledgeOperator(MapOperator[Any, Any]): - """Knowledge Operator.""" + """Knowledge Factory Operator.""" + + metadata = ViewMetadata( + label="Knowledge Factory Operator", + name="knowledge_operator", + category=OperatorCategory.RAG, + description="The knowledge operator.", + inputs=[ + IOField.build_from( + "knowledge datasource", + "knowledge datasource", + dict, + "knowledge datasource", + ) + ], + outputs=[ + IOField.build_from( + "Knowledge", + "Knowledge", + Knowledge, + description="Knowledge", + ) + ], + parameters=[ + Parameter.build_from( + label="datasource", + name="datasource", + type=str, + optional=True, + default="DOCUMENT", + description="datasource", + ), + Parameter.build_from( + label="knowledge_type", + name="knowledge type", + type=str, + optional=True, + options=[ + OptionValue( + label="DOCUMENT", + name="DOCUMENT", + value=KnowledgeType.DOCUMENT.name, + ), + OptionValue(label="URL", name="URL", value=KnowledgeType.URL.name), + OptionValue( + label="TEXT", name="TEXT", value=KnowledgeType.TEXT.name + ), + ], + default=KnowledgeType.DOCUMENT.name, + description="knowledge type", + ), + ], + documentation_url="https://github.com/openai/openai-python", + ) def __init__( - self, knowledge_type: Optional[KnowledgeType] = KnowledgeType.DOCUMENT, **kwargs + self, + datasource: Optional[str] = None, + knowledge_type: Optional[str] = KnowledgeType.DOCUMENT.name, + **kwargs ): """Init the query rewrite operator. Args: knowledge_type: (Optional[KnowledgeType]) The knowledge type. """ super().__init__(**kwargs) - self._knowledge_type = knowledge_type + self._datasource = datasource + self._knowledge_type = KnowledgeType.get_by_value(knowledge_type) async def map(self, datasource: IN) -> Knowledge: """knowledge operator.""" + if self._datasource: + datasource = self._datasource return await self.blocking_func_to_async( KnowledgeFactory.create, datasource, self._knowledge_type ) diff --git a/dbgpt/rag/operators/rewrite.py b/dbgpt/rag/operators/rewrite.py index bade2677a..d911c0b0a 100644 --- a/dbgpt/rag/operators/rewrite.py +++ b/dbgpt/rag/operators/rewrite.py @@ -2,6 +2,7 @@ from dbgpt.core import LLMClient from dbgpt.core.awel import MapOperator +from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata from dbgpt.core.awel.task.base import IN from dbgpt.rag.retriever.rewrite import QueryRewrite @@ -9,6 +10,59 @@ class QueryRewriteOperator(MapOperator[Any, Any]): """The Rewrite Operator.""" + metadata = ViewMetadata( + label="Query Rewrite Operator", + name="query_rewrite_operator", + category=OperatorCategory.RAG, + description="query rewrite operator.", + inputs=[ + IOField.build_from("query_context", "query_context", dict, "query context") + ], + outputs=[ + IOField.build_from( + "rewritten queries", + "queries", + List[str], + description="rewritten queries", + ) + ], + parameters=[ + Parameter.build_from( + "LLM Client", + "llm_client", + LLMClient, + optional=True, + default=None, + description="The LLM Client.", + ), + Parameter.build_from( + label="model name", + name="model_name", + type=str, + optional=True, + default="gpt-3.5-turbo", + description="llm model name", + ), + Parameter.build_from( + label="prompt language", + name="language", + type=str, + optional=True, + default="en", + description="prompt language", + ), + Parameter.build_from( + label="nums", + name="nums", + type=int, + optional=True, + default=5, + description="rewrite query nums", + ), + ], + documentation_url="https://github.com/openai/openai-python", + ) + def __init__( self, llm_client: Optional[LLMClient], diff --git a/dbgpt/rag/operators/summary.py b/dbgpt/rag/operators/summary.py index fefee07fc..4f9ce0ae6 100644 --- a/dbgpt/rag/operators/summary.py +++ b/dbgpt/rag/operators/summary.py @@ -1,12 +1,77 @@ from typing import Any, Optional from dbgpt.core import LLMClient +from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata from dbgpt.core.awel.task.base import IN +from dbgpt.rag.knowledge.base import Knowledge from dbgpt.serve.rag.assembler.summary import SummaryAssembler from dbgpt.serve.rag.operators.base import AssemblerOperator class SummaryAssemblerOperator(AssemblerOperator[Any, Any]): + metadata = ViewMetadata( + label="Summary Operator", + name="summary_assembler_operator", + category=OperatorCategory.RAG, + description="The summary assembler operator.", + inputs=[ + IOField.build_from( + "Knowledge", "knowledge", Knowledge, "knowledge datasource" + ) + ], + outputs=[ + IOField.build_from( + "document summary", + "summary", + str, + description="document summary", + ) + ], + parameters=[ + Parameter.build_from( + "LLM Client", + "llm_client", + LLMClient, + optional=True, + default=None, + description="The LLM Client.", + ), + Parameter.build_from( + label="model name", + name="model_name", + type=str, + optional=True, + default="gpt-3.5-turbo", + description="llm model name", + ), + Parameter.build_from( + label="prompt language", + name="language", + type=str, + optional=True, + default="en", + description="prompt language", + ), + Parameter.build_from( + label="max_iteration_with_llm", + name="max_iteration_with_llm", + type=int, + optional=True, + default=5, + description="prompt language", + ), + Parameter.build_from( + label="concurrency_limit_with_llm", + name="concurrency_limit_with_llm", + type=int, + optional=True, + default=3, + description="The concurrency limit with llm", + ), + ], + documentation_url="https://github.com/openai/openai-python", + ) + def __init__( self, llm_client: Optional[LLMClient], diff --git a/dbgpt/serve/rag/operators/knowledge_space.py b/dbgpt/serve/rag/operators/knowledge_space.py new file mode 100644 index 000000000..b1ea66988 --- /dev/null +++ b/dbgpt/serve/rag/operators/knowledge_space.py @@ -0,0 +1,242 @@ +from functools import reduce +from typing import List, Optional + +from dbgpt.app.knowledge.api import knowledge_space_service +from dbgpt.app.knowledge.request.request import KnowledgeSpaceRequest +from dbgpt.app.knowledge.service import CFG, KnowledgeService +from dbgpt.configs.model_config import EMBEDDING_MODEL_CONFIG +from dbgpt.core import ( + BaseMessage, + ChatPromptTemplate, + HumanPromptTemplate, + ModelMessage, +) +from dbgpt.core.awel import JoinOperator, MapOperator +from dbgpt.core.awel.flow import ( + IOField, + OperatorCategory, + OperatorType, + OptionValue, + Parameter, + ViewMetadata, +) +from dbgpt.core.awel.task.base import IN, OUT +from dbgpt.core.interface.operators.prompt_operator import BasePromptBuilderOperator +from dbgpt.rag.embedding.embedding_factory import EmbeddingFactory +from dbgpt.rag.retriever.embedding import EmbeddingRetriever +from dbgpt.storage.vector_store.base import VectorStoreConfig +from dbgpt.storage.vector_store.connector import VectorStoreConnector +from dbgpt.util.function_utils import rearrange_args_by_type + + +class SpaceRetrieverOperator(MapOperator[IN, OUT]): + """knowledge space retriever operator.""" + + metadata = ViewMetadata( + label="Knowledge Space Operator", + name="space_operator", + category=OperatorCategory.RAG, + description="knowledge space retriever operator.", + inputs=[IOField.build_from("query", "query", str, "user query")], + outputs=[ + IOField.build_from( + "related chunk content", + "related chunk content", + List, + description="related chunk content", + ) + ], + parameters=[ + Parameter.build_from( + "Space Name", + "space_name", + str, + options=[ + OptionValue(label=space.name, name=space.name, value=space.name) + for space in knowledge_space_service.get_knowledge_space( + KnowledgeSpaceRequest() + ) + ], + optional=False, + default=None, + description="space name.", + ) + ], + documentation_url="https://github.com/openai/openai-python", + ) + + def __init__(self, space_name: str, recall_score: Optional[float] = 0.3, **kwargs): + """ + Args: + space_name (str): The space name. + recall_score (Optional[float], optional): The recall score. Defaults to 0.3. + """ + self._space_name = space_name + self._recall_score = recall_score + self._service = KnowledgeService() + embedding_factory = CFG.SYSTEM_APP.get_component( + "embedding_factory", EmbeddingFactory + ) + embedding_fn = embedding_factory.create( + model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL] + ) + config = VectorStoreConfig(name=self._space_name, embedding_fn=embedding_fn) + self._vector_store_connector = VectorStoreConnector( + vector_store_type=CFG.VECTOR_STORE_TYPE, + vector_store_config=config, + ) + + super().__init__(**kwargs) + + async def map(self, query: IN) -> OUT: + """Map input value to output value. + + Args: + input_value (IN): The input value. + + Returns: + OUT: The output value. + """ + space_context = self._service.get_space_context(self._space_name) + top_k = ( + CFG.KNOWLEDGE_SEARCH_TOP_SIZE + if space_context is None + else int(space_context["embedding"]["topk"]) + ) + recall_score = ( + CFG.KNOWLEDGE_SEARCH_RECALL_SCORE + if space_context is None + else float(space_context["embedding"]["recall_score"]) + ) + embedding_retriever = EmbeddingRetriever( + top_k=top_k, + vector_store_connector=self._vector_store_connector, + ) + if isinstance(query, str): + candidates = await embedding_retriever.aretrieve_with_scores( + query, recall_score + ) + elif isinstance(query, list): + candidates = [ + await embedding_retriever.aretrieve_with_scores(q, recall_score) + for q in query + ] + candidates = reduce(lambda x, y: x + y, candidates) + return [candidate.content for candidate in candidates] + + +class KnowledgeSpacePromptBuilderOperator( + BasePromptBuilderOperator, JoinOperator[List[ModelMessage]] +): + """The operator to build the prompt with static prompt. + + The prompt will pass to this operator. + """ + + metadata = ViewMetadata( + label="Knowledge Space Prompt Builder Operator", + name="knowledge_space_prompt_builder_operator", + description="Build messages from prompt template and chat history.", + operator_type=OperatorType.JOIN, + category=OperatorCategory.CONVERSION, + parameters=[ + Parameter.build_from( + "Chat Prompt Template", + "prompt", + ChatPromptTemplate, + description="The chat prompt template.", + ), + Parameter.build_from( + "History Key", + "history_key", + str, + optional=True, + default="chat_history", + description="The key of history in prompt dict.", + ), + Parameter.build_from( + "String History", + "str_history", + bool, + optional=True, + default=False, + description="Whether to convert the history to string.", + ), + ], + inputs=[ + IOField.build_from( + "user input", + "user_input", + str, + is_list=False, + description="user input", + ), + IOField.build_from( + "space related context", + "related_context", + List, + is_list=False, + description="context of knowledge space.", + ), + IOField.build_from( + "History", + "history", + BaseMessage, + is_list=True, + description="The history.", + ), + ], + outputs=[ + IOField.build_from( + "Formatted Messages", + "formatted_messages", + ModelMessage, + is_list=True, + description="The formatted messages.", + ) + ], + ) + + def __init__( + self, + prompt: ChatPromptTemplate, + history_key: str = "chat_history", + check_storage: bool = True, + str_history: bool = False, + **kwargs, + ): + """Create a new history dynamic prompt builder operator. + Args: + + prompt (ChatPromptTemplate): The chat prompt template. + history_key (str, optional): The key of history in prompt dict. Defaults to "chat_history". + check_storage (bool, optional): Whether to check the storage. Defaults to True. + str_history (bool, optional): Whether to convert the history to string. Defaults to False. + """ + + self._prompt = prompt + self._history_key = history_key + self._str_history = str_history + BasePromptBuilderOperator.__init__(self, check_storage=check_storage) + JoinOperator.__init__(self, combine_function=self.merge_context, **kwargs) + + @rearrange_args_by_type + async def merge_context( + self, + user_input: str, + related_context: List[str], + history: Optional[List[BaseMessage]], + ) -> List[ModelMessage]: + """Merge the prompt and history.""" + prompt_dict = dict() + prompt_dict["context"] = related_context + for prompt in self._prompt.messages: + if isinstance(prompt, HumanPromptTemplate): + prompt_dict[prompt.input_variables[0]] = user_input + + if history: + if self._str_history: + prompt_dict[self._history_key] = BaseMessage.messages_to_string(history) + else: + prompt_dict[self._history_key] = history + return await self.format_prompt(self._prompt, prompt_dict) diff --git a/examples/awel/simple_rag_summary_example.py b/examples/awel/simple_rag_summary_example.py index adc3b54ad..eb958934e 100644 --- a/examples/awel/simple_rag_summary_example.py +++ b/examples/awel/simple_rag_summary_example.py @@ -59,7 +59,7 @@ async def map(self, input_value: TriggerReqBody) -> Dict: request_handle_task = RequestHandleOperator() path_operator = MapOperator(lambda request: request["url"]) # build knowledge operator - knowledge_operator = KnowledgeOperator(knowledge_type=KnowledgeType.URL) + knowledge_operator = KnowledgeOperator(knowledge_type=KnowledgeType.URL.name) # build summary assembler operator summary_operator = SummaryAssemblerOperator( llm_client=OpenAILLMClient(), language="en" diff --git a/examples/rag/simple_rag_embedding_example.py b/examples/rag/simple_rag_embedding_example.py index 96d47ccc8..86f248153 100644 --- a/examples/rag/simple_rag_embedding_example.py +++ b/examples/rag/simple_rag_embedding_example.py @@ -76,7 +76,7 @@ async def map(self, chunks: List) -> str: "/examples/rag/embedding", methods="POST", request_body=TriggerReqBody ) request_handle_task = RequestHandleOperator() - knowledge_operator = KnowledgeOperator(knowledge_type=KnowledgeType.URL) + knowledge_operator = KnowledgeOperator(knowledge_type=KnowledgeType.URL.name) vector_connector = _create_vector_connector() url_parser_operator = MapOperator(map_function=lambda x: x["url"]) embedding_operator = EmbeddingAssemblerOperator( diff --git a/examples/rag/simple_rag_retriever_example.py b/examples/rag/simple_rag_retriever_example.py index e04f4ed0c..b9c7ca97f 100644 --- a/examples/rag/simple_rag_retriever_example.py +++ b/examples/rag/simple_rag_retriever_example.py @@ -39,7 +39,7 @@ ..code-block:: shell DBGPT_SERVER="http://127.0.0.1:5555" curl -X POST $DBGPT_SERVER/api/v1/awel/trigger/examples/rag/retrieve \ - -H "Content-Type: application/json" -d '{ + -H "Content-Type: application/json" -d '{ \ "query": "what is awel talk about?" }' """