Skip to content

Commit e7f2adb

Browse files
authored
[Runtime] Simplify the vector store setup process (#231)
1 parent 23ff0e1 commit e7f2adb

File tree

9 files changed

+111
-208
lines changed

9 files changed

+111
-208
lines changed

api/src/main/java/org/apache/flink/agents/api/resource/ResourceType.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ public enum ResourceType {
2929
EMBEDDING_MODEL("embedding_model"),
3030
EMBEDDING_MODEL_CONNECTION("embedding_model_connection"),
3131
VECTOR_STORE("vector_store"),
32-
VECTOR_STORE_CONNECTION("vector_store_connection"),
3332
PROMPT("prompt"),
3433
TOOL("tool");
3534

python/flink_agents/api/decorators.py

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -173,26 +173,8 @@ def mcp_server(func: Callable) -> Callable:
173173
return func
174174

175175

176-
def vector_store_connection(func: Callable) -> Callable:
177-
"""Decorator for marking a function declaring a vector store connection.
178-
179-
Parameters
180-
----------
181-
func : Callable
182-
Function to be decorated.
183-
184-
Returns:
185-
-------
186-
Callable
187-
Decorator function that marks the target function declare a vector store
188-
connection.
189-
"""
190-
func._is_vector_store_connection = True
191-
return func
192-
193-
194-
def vector_store_setup(func: Callable) -> Callable:
195-
"""Decorator for marking a function declaring a vector store setup.
176+
def vector_store(func: Callable) -> Callable:
177+
"""Decorator for marking a function declaring a vector store.
196178
197179
Parameters
198180
----------
@@ -204,5 +186,5 @@ def vector_store_setup(func: Callable) -> Callable:
204186
Callable
205187
Decorator function that marks the target function declare a vector store.
206188
"""
207-
func._is_vector_store_setup = True
189+
func._is_vector_store = True
208190
return func

python/flink_agents/api/resource.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ class ResourceType(Enum):
3535
EMBEDDING_MODEL = "embedding_model"
3636
EMBEDDING_MODEL_CONNECTION = "embedding_model_connection"
3737
VECTOR_STORE = "vector_store"
38-
VECTOR_STORE_CONNECTION = "vector_store_connection"
3938
PROMPT = "prompt"
4039
MCP_SERVER = "mcp_server"
4140

python/flink_agents/api/vector_stores/vector_store.py

Lines changed: 20 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -110,43 +110,14 @@ def __str__(self) -> str:
110110
return f"QueryResult: {len(self.documents)} documents"
111111

112112

113-
class BaseVectorStoreConnection(Resource, ABC):
114-
"""Base abstract class for vector store connection.
113+
class BaseVectorStore(Resource, ABC):
114+
"""Base abstract class for vector store.
115115
116-
Manages connection configuration and provides raw vector search operations
117-
using pre-computed embeddings. One connection can be shared across multiple
118-
vector store setups.
116+
Provides vector store functionality that integrates embedding models
117+
for text-based semantic search. Handles both connection management and
118+
embedding generation internally.
119119
"""
120120

121-
@classmethod
122-
@override
123-
def resource_type(cls) -> ResourceType:
124-
"""Return resource type of class."""
125-
return ResourceType.VECTOR_STORE_CONNECTION
126-
127-
@abstractmethod
128-
def query(self, embedding: List[float], limit: int = 10, **kwargs: Any) -> List[Document]:
129-
"""Perform vector search using pre-computed embedding.
130-
131-
Args:
132-
embedding: Pre-computed embedding vector for semantic search
133-
limit: Maximum number of results to return (default: 10)
134-
**kwargs: Vector store-specific parameters (filters, distance metrics, etc.)
135-
136-
Returns:
137-
List of documents matching the search criteria
138-
"""
139-
140-
141-
class BaseVectorStoreSetup(Resource, ABC):
142-
"""Base abstract class for vector store setup.
143-
144-
Coordinates between vector store connections and embedding models to provide
145-
text-based semantic search. Automatically converts text queries to embeddings
146-
before delegating to the connection layer.
147-
"""
148-
149-
connection: str = Field(description="Name of the referenced connection.")
150121
embedding_model: str = Field(description="Name of the embedding model resource to use.")
151122

152123
@classmethod
@@ -181,19 +152,27 @@ def query(self, query: VectorStoreQuery) -> VectorStoreQueryResult:
181152
)
182153
query_embedding = embedding_model.embed(query.query_text)
183154

184-
# Get vector store connection resource
185-
connection = self.get_resource(
186-
self.connection, ResourceType.VECTOR_STORE_CONNECTION
187-
)
188-
189155
# Merge setup kwargs with query-specific args
190156
merged_kwargs = self.store_kwargs.copy()
191157
merged_kwargs.update(query.extra_args)
192158

193-
# Perform vector search
194-
documents = connection.query(query_embedding, query.limit, **merged_kwargs)
159+
# Perform vector search using the abstract method
160+
documents = self.query_embedding(query_embedding, query.limit, **merged_kwargs)
195161

196162
# Return structured result
197163
return VectorStoreQueryResult(
198164
documents=documents
199165
)
166+
167+
@abstractmethod
168+
def query_embedding(self, embedding: List[float], limit: int = 10, **kwargs: Any) -> List[Document]:
169+
"""Perform vector search using pre-computed embedding.
170+
171+
Args:
172+
embedding: Pre-computed embedding vector for semantic search
173+
limit: Maximum number of results to return (default: 10)
174+
**kwargs: Vector store-specific parameters (filters, distance metrics, etc.)
175+
176+
Returns:
177+
List of documents matching the search criteria
178+
"""

python/flink_agents/integrations/vector_stores/chroma/chroma_vector_store.py

Lines changed: 48 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,15 @@
2424
from pydantic import Field
2525

2626
from flink_agents.api.vector_stores.vector_store import (
27-
BaseVectorStoreConnection,
28-
BaseVectorStoreSetup,
27+
BaseVectorStore,
2928
Document,
3029
)
3130

3231
DEFAULT_COLLECTION = "flink_agents_chroma_collection"
3332

3433

35-
class ChromaVectorStoreConnection(BaseVectorStoreConnection):
36-
"""ChromaDB Vector Store Connection which manages connection to ChromaDB.
34+
class ChromaVectorStore(BaseVectorStore):
35+
"""ChromaDB vector store that handles connection and semantic search.
3736
3837
Visit https://docs.trychroma.com/ for ChromaDB documentation.
3938
@@ -59,8 +58,15 @@ class ChromaVectorStoreConnection(BaseVectorStoreConnection):
5958
ChromaDB tenant for multi-tenancy support (default: "default_tenant").
6059
database : str
6160
ChromaDB database name (default: "default_database").
61+
collection : str
62+
Name of the ChromaDB collection to use (default: flink_agents_collection).
63+
collection_metadata : Dict[str, Any]
64+
Metadata for the collection (optional).
65+
create_collection_if_not_exists : bool
66+
Whether to create the collection if it doesn't exist (default: True).
6267
"""
6368

69+
# Connection configuration
6470
persist_directory: str | None = Field(
6571
default=None,
6672
description="Directory for persistent storage. If None, uses in-memory client.",
@@ -90,36 +96,60 @@ class ChromaVectorStoreConnection(BaseVectorStoreConnection):
9096
description="ChromaDB database name.",
9197
)
9298

99+
# Collection configuration
100+
collection: str = Field(
101+
default=DEFAULT_COLLECTION,
102+
description="Name of the ChromaDB collection to use.",
103+
)
104+
collection_metadata: Dict[str, Any] = Field(
105+
default_factory=dict,
106+
description="Metadata for the collection.",
107+
)
108+
create_collection_if_not_exists: bool = Field(
109+
default=True,
110+
description="Whether to create the collection if it doesn't exist.",
111+
)
112+
93113
__client: ChromaClient | None = None
94114

95115
def __init__(
96116
self,
117+
*,
118+
embedding_model: str,
97119
persist_directory: str | None = None,
98120
host: str | None = None,
99121
port: int | None = 8000,
100122
api_key: str | None = None,
101123
client_settings: Settings | None = None,
102124
tenant: str = "default_tenant",
103125
database: str = "default_database",
126+
collection: str = DEFAULT_COLLECTION,
127+
collection_metadata: Dict[str, Any] | None = None,
128+
create_collection_if_not_exists: bool = True,
104129
**kwargs: Any,
105130
) -> None:
106131
"""Init method."""
132+
if collection_metadata is None:
133+
collection_metadata = {}
107134
super().__init__(
135+
embedding_model=embedding_model,
108136
persist_directory=persist_directory,
109137
host=host,
110138
port=port,
111139
api_key=api_key,
112140
client_settings=client_settings,
113141
tenant=tenant,
114142
database=database,
143+
collection=collection,
144+
collection_metadata=collection_metadata,
145+
create_collection_if_not_exists=create_collection_if_not_exists,
115146
**kwargs,
116147
)
117148

118149
@property
119150
def client(self) -> ChromaClient:
120151
"""Return ChromaDB client, creating it if necessary."""
121152
if self.__client is None:
122-
123153
# Choose client type based on configuration
124154
if self.api_key is not None:
125155
# Cloud mode
@@ -155,7 +185,16 @@ def client(self) -> ChromaClient:
155185

156186
return self.__client
157187

158-
def query(self, embedding: List[float], limit: int = 10, **kwargs: Any) -> List[Document]:
188+
@property
189+
def store_kwargs(self) -> Dict[str, Any]:
190+
"""Return ChromaDB-specific setup settings."""
191+
return {
192+
"collection": self.collection,
193+
"collection_metadata": self.collection_metadata,
194+
"create_collection_if_not_exists": self.create_collection_if_not_exists,
195+
}
196+
197+
def query_embedding(self, embedding: List[float], limit: int = 10, **kwargs: Any) -> List[Document]:
159198
"""Perform vector search using pre-computed embedding.
160199
161200
Args:
@@ -167,9 +206,9 @@ def query(self, embedding: List[float], limit: int = 10, **kwargs: Any) -> List[
167206
List of documents matching the search criteria
168207
"""
169208
# Extract ChromaDB-specific parameters
170-
collection_name = kwargs.get("collection", DEFAULT_COLLECTION)
171-
collection_metadata = kwargs.get("collection_metadata", {})
172-
create_collection_if_not_exists = kwargs.get("create_collection_if_not_exists", True)
209+
collection_name = kwargs.get("collection", self.collection)
210+
collection_metadata = kwargs.get("collection_metadata", self.collection_metadata)
211+
create_collection_if_not_exists = kwargs.get("create_collection_if_not_exists", self.create_collection_if_not_exists)
173212
where = kwargs.get("where") # Metadata filters
174213

175214
# Get or create collection based on configuration
@@ -206,62 +245,3 @@ def query(self, embedding: List[float], limit: int = 10, **kwargs: Any) -> List[
206245

207246
return documents
208247

209-
210-
class ChromaVectorStoreSetup(BaseVectorStoreSetup):
211-
"""ChromaDB vector store setup which manages collection configuration
212-
and coordinates with embedding models for semantic search.
213-
214-
Attributes:
215-
----------
216-
collection : str
217-
Name of the ChromaDB collection to use (default: flink_agents_collection).
218-
collection_metadata : Dict[str, Any]
219-
Metadata for the collection (optional).
220-
create_collection_if_not_exists : bool
221-
Whether to create the collection if it doesn't exist (default: True).
222-
"""
223-
224-
collection: str = Field(
225-
default=DEFAULT_COLLECTION,
226-
description="Name of the ChromaDB collection to use.",
227-
)
228-
collection_metadata: Dict[str, Any] = Field(
229-
default_factory=dict,
230-
description="Metadata for the collection.",
231-
)
232-
create_collection_if_not_exists: bool = Field(
233-
default=True,
234-
description="Whether to create the collection if it doesn't exist.",
235-
)
236-
237-
def __init__(
238-
self,
239-
*,
240-
connection: str,
241-
embedding_model: str,
242-
collection: str = DEFAULT_COLLECTION,
243-
collection_metadata: Dict[str, Any] | None = None,
244-
create_collection_if_not_exists: bool = True,
245-
**kwargs: Any,
246-
) -> None:
247-
"""Init method."""
248-
if collection_metadata is None:
249-
collection_metadata = {}
250-
super().__init__(
251-
connection=connection,
252-
embedding_model=embedding_model,
253-
collection=collection,
254-
collection_metadata=collection_metadata,
255-
create_collection_if_not_exists=create_collection_if_not_exists,
256-
**kwargs,
257-
)
258-
259-
@property
260-
def store_kwargs(self) -> Dict[str, Any]:
261-
"""Return ChromaDB-specific setup settings passed to connection."""
262-
return {
263-
"collection": self.collection,
264-
"collection_metadata": self.collection_metadata,
265-
"create_collection_if_not_exists": self.create_collection_if_not_exists,
266-
}
267-

0 commit comments

Comments
 (0)