Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:Add Knowledge Process Workflow #2210

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions dbgpt/app/operators/rag.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,12 @@ async def map(self, context: HOContextBody) -> List[Chunk]:
return chunks

metadata = ViewMetadata(
label=_("Knowledge Operator"),
label=_("Knowledge Space Operator"),
Aries-ckt marked this conversation as resolved.
Show resolved Hide resolved
name="higher_order_knowledge_operator",
category=OperatorCategory.RAG,
description=_(
_(
"Knowledge Operator, retrieve your knowledge(documents) from knowledge"
" space"
"Knowledge Space Operator, retrieve your knowledge from knowledge space"
)
),
parameters=[
Expand Down
5 changes: 5 additions & 0 deletions dbgpt/core/awel/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ class OperatorType(str, Enum):
"embeddings": _CategoryDetail("Embeddings", "The embeddings resource"),
"rag": _CategoryDetail("RAG", "The resource"),
"vector_store": _CategoryDetail("Vector Store", "The vector store resource"),
"knowledge_graph": _CategoryDetail(
"Knowledge Graph", "The knowledge graph resource"
),
"database": _CategoryDetail("Database", "Interact with the database"),
"example": _CategoryDetail("Example", "The example resource"),
}
Expand All @@ -231,6 +234,8 @@ class ResourceCategory(str, Enum):
EMBEDDINGS = "embeddings"
RAG = "rag"
VECTOR_STORE = "vector_store"
KNOWLEDGE_GRAPH = "knowledge_graph"
FULL_TEXT = "full_text"
DATABASE = "database"
EXAMPLE = "example"

Expand Down
13 changes: 12 additions & 1 deletion dbgpt/rag/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Module for RAG operators."""

from .chunk_manager import ChunkManagerOperator # noqa: F401
from .datasource import DatasourceRetrieverOperator # noqa: F401
from .db_schema import ( # noqa: F401
DBSchemaAssemblerOperator,
Expand All @@ -10,21 +10,32 @@
EmbeddingRetrieverOperator,
)
from .evaluation import RetrieverEvaluatorOperator # noqa: F401
from .full_text import FullTextStorageOperator # noqa: F401
from .knowledge import ChunksToStringOperator, KnowledgeOperator # noqa: F401
from .knowledge_graph import KnowledgeGraphOperator # noqa: F401
from .process_branch import KnowledgeProcessBranchOperator # noqa: F401
from .process_branch import KnowledgeProcessJoinOperator
from .rerank import RerankOperator # noqa: F401
from .rewrite import QueryRewriteOperator # noqa: F401
from .summary import SummaryAssemblerOperator # noqa: F401
from .vector_store import VectorStorageOperator # noqa: F401

__all__ = [
"ChunkManagerOperator",
"DatasourceRetrieverOperator",
"DBSchemaRetrieverOperator",
"DBSchemaAssemblerOperator",
"EmbeddingRetrieverOperator",
"EmbeddingAssemblerOperator",
"FullTextStorageOperator",
"KnowledgeOperator",
"KnowledgeGraphOperator",
"KnowledgeProcessBranchOperator",
"KnowledgeProcessJoinOperator",
"ChunksToStringOperator",
"RerankOperator",
"QueryRewriteOperator",
"SummaryAssemblerOperator",
"RetrieverEvaluatorOperator",
"VectorStorageOperator",
]
68 changes: 68 additions & 0 deletions dbgpt/rag/operators/chunk_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""Chunk Manager Operator."""
from typing import List, Optional

from dbgpt.core import Chunk
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata
from dbgpt.rag import ChunkParameters
from dbgpt.rag.chunk_manager import ChunkManager
from dbgpt.rag.knowledge.base import Knowledge
from dbgpt.util.i18n_utils import _


class ChunkManagerOperator(MapOperator[Knowledge, List[Chunk]]):
"""Chunk Manager Operator."""

metadata = ViewMetadata(
label=_("Chunk Manager Operator"),
name="chunk_manager_operator",
description=_(" Split Knowledge Documents into chunks."),
category=OperatorCategory.RAG,
parameters=[
Parameter.build_from(
_("Chunk Split Parameters"),
"chunk_parameters",
ChunkParameters,
description=_("Chunk Split Parameters."),
optional=True,
default=None,
alias=["chunk_parameters"],
),
],
inputs=[
IOField.build_from(
_("Knowledge"),
"knowledge",
Knowledge,
description=_("The knowledge to be loaded."),
)
],
outputs=[
IOField.build_from(
_("Chunks"),
"chunks",
List[Chunk],
description=_("The split chunks by chunk manager."),
is_list=True,
)
],
)

def __init__(
self,
chunk_parameters: Optional[ChunkParameters] = None,
**kwargs,
):
"""Init the datasource operator."""
MapOperator.__init__(self, **kwargs)
self._chunk_parameters = chunk_parameters or ChunkParameters(
chunk_strategy="Automatic"
)

async def map(self, knowledge: Knowledge) -> List[Chunk]:
"""Persist chunks in vector db."""
documents = knowledge.load()
chunk_manager = ChunkManager(
knowledge=knowledge, chunk_parameter=self._chunk_parameters
)
return chunk_manager.split(documents)
74 changes: 74 additions & 0 deletions dbgpt/rag/operators/full_text.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Full Text Operator."""
import os
from typing import List, Optional

from dbgpt.core import Chunk
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata
from dbgpt.storage.full_text.base import FullTextStoreBase
from dbgpt.util.i18n_utils import _


class FullTextStorageOperator(MapOperator[List[Chunk], List[Chunk]]):
"""Full Text Operator."""

metadata = ViewMetadata(
label=_("Full Text Storage Operator"),
name="full text_storage_operator",
description=_("Persist embeddings into full text storage."),
category=OperatorCategory.RAG,
parameters=[
Parameter.build_from(
_("Full Text Connector"),
"full_text_store",
FullTextStoreBase,
description=_("The full text store."),
alias=["full_text_store"],
),
],
inputs=[
IOField.build_from(
_("Chunks"),
"chunks",
List[Chunk],
description=_("The text split chunks by chunk manager."),
is_list=True,
)
],
outputs=[
IOField.build_from(
_("Chunks"),
"chunks",
List[Chunk],
description=_(
"The assembled chunks, it has been persisted to full text " "store."
),
is_list=True,
)
],
)

def __init__(
self,
full_text_store: Optional[FullTextStoreBase] = None,
max_chunks_once_load: Optional[int] = None,
**kwargs,
):
"""Init the datasource operator."""
MapOperator.__init__(self, **kwargs)
self._full_text_store = full_text_store
self._embeddings = full_text_store.get_config().embedding_fn
self._max_chunks_once_load = max_chunks_once_load
self.full_text_store = full_text_store

async def map(self, chunks: List[Chunk]) -> List[Chunk]:
"""Persist chunks in full text db."""
max_chunks_once_load = self._max_chunks_once_load or int(
os.getenv("KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD", 10)
)
full_text_ids = await self._full_text_store.aload_document_with_limit(
chunks, max_chunks_once_load
)
for chunk, full_text_id in zip(chunks, full_text_ids):
chunk.chunk_id = str(full_text_id)
return chunks
74 changes: 74 additions & 0 deletions dbgpt/rag/operators/knowledge_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Knowledge Graph Operator."""
import os
from typing import List, Optional

from dbgpt.core import Chunk
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import IOField, OperatorCategory, Parameter, ViewMetadata
from dbgpt.storage.knowledge_graph.base import KnowledgeGraphBase
from dbgpt.util.i18n_utils import _


class KnowledgeGraphOperator(MapOperator[List[Chunk], List[Chunk]]):
"""Knowledge Graph Operator."""

metadata = ViewMetadata(
label=_("Knowledge Graph Operator"),
name="knowledge_graph_operator",
description=_("Extract Documents and persist into graph database."),
category=OperatorCategory.RAG,
parameters=[
Parameter.build_from(
_("Knowledge Graph Connector"),
"graph_store",
KnowledgeGraphBase,
description=_("The knowledge graph."),
alias=["graph_store"],
),
],
inputs=[
IOField.build_from(
_("Chunks"),
"chunks",
List[Chunk],
description=_("The text split chunks by chunk manager."),
is_list=True,
)
],
outputs=[
IOField.build_from(
_("Chunks"),
"chunks",
List[Chunk],
description=_(
"The assembled chunks, it has been persisted to graph store."
),
is_list=True,
)
],
)

def __init__(
self,
graph_store: Optional[KnowledgeGraphBase] = None,
max_chunks_once_load: Optional[int] = None,
**kwargs,
):
"""Init the Knowledge Graph operator."""
MapOperator.__init__(self, **kwargs)
self._graph_store = graph_store
self._embeddings = graph_store.get_config().embedding_fn
self._max_chunks_once_load = max_chunks_once_load
self.graph_store = graph_store

async def map(self, chunks: List[Chunk]) -> List[Chunk]:
"""Persist chunks in graph db."""
max_chunks_once_load = self._max_chunks_once_load or int(
os.getenv("KNOWLEDGE_MAX_CHUNKS_ONCE_LOAD", 10)
)
graph_ids = await self._graph_store.aload_document_with_limit(
chunks, max_chunks_once_load
)
for chunk, graph_id in zip(chunks, graph_ids):
chunk.chunk_id = str(graph_id)
return chunks
Loading
Loading