forked from milvus-io/bootcamp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
milvus_helpers.py
176 lines (155 loc) · 6.86 KB
/
milvus_helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import sys
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
from config import MILVUS_HOST, MILVUS_PORT, VECTOR_DIMENSION, METRIC_TYPE, SHARDS_NUM
from logs import LOGGER
class MilvusHelper:
    """
    Helper around pymilvus for working with one Milvus collection at a time.

    The connection is opened in ``__init__`` using ``MILVUS_HOST`` and
    ``MILVUS_PORT`` from ``config``. Collections created by this helper have
    two fields: an INT64 primary key ``id`` (``auto_id=False``, so callers
    must supply ids) and a FLOAT_VECTOR ``embedding`` with
    ``VECTOR_DIMENSION`` dimensions.

    Methods that wrap their work in ``try`` log the error and terminate the
    process with ``sys.exit(1)`` instead of raising. The helpers without a
    ``try`` (partition/index/progress/listing/distance) let pymilvus
    exceptions propagate, although ``set_collection`` still exits if the
    named collection does not exist.

    Attributes:
        collection (`pymilvus.Collection` or None): the collection most
            recently selected by ``set_collection``, ``create_collection`` or
            ``insert``; ``None`` until one of them succeeds.
    """

    def __init__(self):
        """Connect to Milvus at ``MILVUS_HOST:MILVUS_PORT``; exit(1) on failure."""
        try:
            self.collection = None
            connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)
            LOGGER.debug(f"Successfully connect to Milvus with IP:{MILVUS_HOST} and PORT:{MILVUS_PORT}")
        except Exception as e:
            LOGGER.error(f"Failed to connect Milvus: {e}")
            sys.exit(1)

    def set_collection(self, collection_name):
        """
        Select an existing collection as ``self.collection``.

        Args:
            collection_name (`str`): name of the collection to select.

        Exits the process with status 1 if the collection does not exist or
        the lookup fails.
        """
        try:
            if not self.has_collection(collection_name):
                raise Exception(f"There is no collection named:{collection_name}")
            self.collection = Collection(name=collection_name)
        except Exception as e:
            LOGGER.error(f"ERROR: {e}")
            sys.exit(1)

    def has_collection(self, collection_name):
        """
        Return whether Milvus has a collection called ``collection_name``.

        Exits the process with status 1 if the check itself fails.
        """
        try:
            return utility.has_collection(collection_name)
        except Exception as e:
            LOGGER.error(f"Failed to check collection in Milvus: {e}")
            sys.exit(1)

    def create_collection(self, collection_name):
        """
        Create the collection if it does not exist, then select it.

        A new collection gets an INT64 primary key ``id`` (``auto_id=False``)
        and a FLOAT_VECTOR ``embedding`` field of ``VECTOR_DIMENSION``
        dimensions, split over ``SHARDS_NUM`` shards.

        Args:
            collection_name (`str`): name of the collection.

        Returns:
            `bool`: always ``True``; failures exit the process with status 1.
        """
        try:
            if not self.has_collection(collection_name):
                # Fixed: the original passed the misspelled kwarg ``descrition``,
                # which pymilvus absorbed into **kwargs, so no description was set.
                id_field = FieldSchema(name="id", dtype=DataType.INT64, description="int64",
                                       is_primary=True, auto_id=False)
                embedding_field = FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR,
                                              description="float vector", dim=VECTOR_DIMENSION,
                                              is_primary=False)
                schema = CollectionSchema(fields=[id_field, embedding_field],
                                          description="collection description")
                self.collection = Collection(name=collection_name, schema=schema,
                                             shards_num=SHARDS_NUM)
                LOGGER.debug(f"Create Milvus collection: {collection_name}")
            else:
                self.collection = Collection(collection_name)
                LOGGER.debug(f"collection {collection_name} exists")
            return True
        except Exception as e:
            LOGGER.error(f"Failed to create collection in Milvus: {e}")
            sys.exit(1)

    def insert(self, collection_name, vectors, ids=None):
        """
        Batch-insert ``vectors`` with primary keys ``ids`` into a collection.

        The collection is created first if it does not exist.

        Args:
            collection_name (`str`): target collection.
            vectors (`list`): embeddings to insert, one per row.
            ids (`list` or None): primary keys, one per vector. Collections
                created by ``create_collection`` use ``auto_id=False``, so
                ids must be supplied for them; ``None`` is passed through to
                pymilvus unchanged.

        Returns:
            The ``ids`` argument as given. Failures exit the process with
            status 1.
        """
        try:
            if not self.has_collection(collection_name):
                self.create_collection(collection_name)
            else:
                self.collection = Collection(collection_name)
            # Column-ordered data: one list per schema field (id, embedding).
            self.collection.insert([ids, vectors])
            LOGGER.debug(
                f"Insert vectors to Milvus in collection: {collection_name} with {len(vectors)} rows")
            return ids
        except Exception as e:
            LOGGER.error(f"Failed to load data to Milvus: {e}")
            sys.exit(1)

    def create_index(self, collection_name, index_params):
        """
        Build an index on the ``embedding`` field using ``index_params``.

        The index type (for example IVF_FLAT) is whatever ``index_params``
        specifies.

        Returns:
            The status object returned by ``Collection.create_index``; a
            non-zero ``status.code`` is treated as failure. NOTE(review): this
            assumes the installed pymilvus returns an object with
            ``code``/``message`` attributes — confirm against the pinned
            version. Failures exit the process with status 1.
        """
        try:
            self.set_collection(collection_name)
            status = self.collection.create_index(field_name="embedding", index_params=index_params)
            if status.code:
                raise Exception(status.message)
            LOGGER.debug(
                f"Successfully create index in collection:{collection_name} with param:{index_params}")
            return status
        except Exception as e:
            LOGGER.error(f"Failed to create index: {e}")
            sys.exit(1)

    def delete_collection(self, collection_name):
        """Drop ``collection_name``; return ``"ok"``, or exit(1) on failure."""
        try:
            utility.drop_collection(collection_name)
            LOGGER.debug("Successfully drop collection!")
            return "ok"
        except Exception as e:
            LOGGER.error(f"Failed to drop collection: {e}")
            sys.exit(1)

    def search_vectors(self, collection_name, vectors, top_k, search_params):
        """
        Search ``vectors`` against the ``embedding`` field of a collection.

        Args:
            collection_name (`str`): collection to search.
            vectors (`list`): query embeddings.
            top_k (`int`): number of hits per query (``limit``).
            search_params (`dict`): search parameters passed as ``param``.

        Returns:
            The raw result of ``Collection.search``. Failures exit the process
            with status 1.
        """
        try:
            self.set_collection(collection_name)
            res = self.collection.search(vectors, anns_field="embedding", param=search_params, limit=top_k)
            # Log the collection name, not the (possibly large) result set.
            LOGGER.debug(f"Successfully search in collection: {collection_name}")
            return res
        except Exception as e:
            LOGGER.error(f"Failed to search vectors in Milvus: {e}")
            sys.exit(1)

    def count(self, collection_name):
        """Return ``num_entities`` of ``collection_name``; exit(1) on failure."""
        try:
            self.set_collection(collection_name)
            num = self.collection.num_entities
            LOGGER.debug(f"Successfully get the num:{num} of the collection:{collection_name}")
            return num
        except Exception as e:
            LOGGER.error(f"Failed to count vectors in Milvus: {e}")
            sys.exit(1)

    def get_index_params(self, collection_name):
        """Return the ``params`` of every index on ``collection_name``."""
        self.set_collection(collection_name)
        return [index.params for index in self.collection.indexes]

    def create_partition(self, collection_name, partition_name):
        """
        Create ``partition_name`` in ``collection_name`` unless it exists.

        Returns:
            A message string if the partition already exists; otherwise the
            value returned by ``Collection.create_partition``.
        """
        self.set_collection(collection_name)
        if self.collection.has_partition(partition_name):
            return f"This partition {partition_name} exists"
        return self.collection.create_partition(partition_name)

    def delete_index(self, collection_name):
        """Drop the index of ``collection_name``."""
        self.set_collection(collection_name)
        self.collection.drop_index()

    def load_data(self, collection_name):
        """Load ``collection_name`` into memory for search; exit(1) on failure."""
        try:
            self.set_collection(collection_name)
            self.collection.load()
        except Exception as e:
            LOGGER.error(f"Failed load data: {e}")
            sys.exit(1)

    def list_collection(self):
        """Return the names of all collections."""
        return utility.list_collections()

    def get_loading_progress(self, collection_name):
        """Return the loading progress reported by Milvus for ``collection_name``."""
        return utility.loading_progress(collection_name)

    def get_index_progress(self, collection_name):
        """Return the index-building progress reported by Milvus for ``collection_name``."""
        return utility.index_building_progress(collection_name)

    def release_data(self, collection_name):
        """Release ``collection_name`` from memory; exit(1) on failure."""
        try:
            self.set_collection(collection_name)
            self.collection.release()
        except Exception as e:
            LOGGER.error(f"Failed release data: {e}")
            sys.exit(1)

    def calculate_distance(self, vectors_left, vectors_right):
        """
        Calculate distances between two vector arrays via ``utility.calc_distance``.

        NOTE(review): ``utility.calc_distance`` may be absent from newer
        pymilvus releases — confirm against the pinned version.
        """
        return utility.calc_distance(vectors_left, vectors_right)