[Refactor] Changed strings to f-strings #431

Open
wants to merge 1 commit into base: main
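The PR applies one mechanical transformation across datastore/providers/milvus_datastore.py: each str.format() call in the touched logging and expression-building code is rewritten as an equivalent f-string. A minimal sketch of the pattern, with illustrative values that are not taken from the diff:

host = "localhost"   # hypothetical values, for illustration only
port = 19530

# Before: positional placeholders filled by str.format()
old_msg = "Connect to Milvus server '{}:{}'".format(host, port)

# After: the same expressions embedded directly in the literal
new_msg = f"Connect to Milvus server '{host}:{port}'"

assert old_msg == new_msg  # both render identically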
79 changes: 26 additions & 53 deletions datastore/providers/milvus_datastore.py
@@ -138,14 +138,10 @@ def _create_connection(self):
if (
x[1]
and ("address" in addr)
and (addr["address"] == "{}:{}".format(MILVUS_HOST, MILVUS_PORT))
and (addr["address"] == f"{MILVUS_HOST}:{MILVUS_PORT}")
):
self.alias = x[0]
logger.info(
"Reuse connection to Milvus server '{}:{}' with alias '{:s}'".format(
MILVUS_HOST, MILVUS_PORT, self.alias
)
)
logger.info(f"Reuse connection to Milvus server '{MILVUS_HOST}:{MILVUS_PORT}' with alias '{self.alias}'")
break

# Connect to the Milvus instance using the passed in Environment variables
@@ -159,17 +155,9 @@ def _create_connection(self):
password=MILVUS_PASSWORD, # type: ignore
secure=MILVUS_USE_SECURITY,
)
logger.info(
"Create connection to Milvus server '{}:{}' with alias '{:s}'".format(
MILVUS_HOST, MILVUS_PORT, self.alias
)
)
logger.info(f"Create connection to Milvus server '{MILVUS_HOST}:{MILVUS_PORT}' with alias '{self.alias}'")
except Exception as e:
logger.error(
"Failed to create connection to Milvus server '{}:{}', error: {}".format(
MILVUS_HOST, MILVUS_PORT, e
)
)
logger.error(f"Failed to create connection to Milvus server '{MILVUS_HOST}:{MILVUS_PORT}', error: {e}")

def _create_collection(self, collection_name, create_new: bool) -> None:
"""Create a collection based on environment and passed in variables.
@@ -197,9 +185,7 @@ def _create_collection(self, collection_name, create_new: bool) -> None:
)
self._schema_ver = "V2"
logger.info(
"Create Milvus collection '{}' with schema {} and consistency level {}".format(
collection_name, self._schema_ver, self._consistency_level
)
f"Create Milvus collection '{collection_name}' with schema {self._schema_ver} and consistency level {self._consistency_level}"
)
else:
# If the collection exists, point to it
@@ -209,15 +195,9 @@ def _create_collection(self, collection_name, create_new: bool) -> None:
if field.name == "id" and field.is_primary:
self._schema_ver = "V2"
break
logger.info(
"Milvus collection '{}' already exists with schema {}".format(
collection_name, self._schema_ver
)
)
logger.info(f"Milvus collection '{collection_name}' already exists with schema {self._schema_ver}")
except Exception as e:
logger.error(
"Failed to create collection '{}', error: {}".format(collection_name, e)
)
logger.error(f"Failed to create collection '{collection_name}', error: {e}")

def _create_index(self):
# TODO: verify index/search params passed by os.environ
@@ -229,7 +209,7 @@ def _create_index(self):
if self.index_params is not None:
# Convert the string format to JSON format parameters passed by MILVUS_INDEX_PARAMS
self.index_params = json.loads(self.index_params)
logger.info("Create Milvus index: {}".format(self.index_params))
logger.info(f"Create Milvus index: {self.index_params}")
# Create an index on the 'embedding' field with the index params found in init
self.col.create_index(
EMBEDDING_FIELD, index_params=self.index_params
@@ -242,17 +222,10 @@ def _create_index(self):
"index_type": "HNSW",
"params": {"M": 8, "efConstruction": 64},
}
logger.info(
"Attempting creation of Milvus '{}' index".format(
i_p["index_type"]
)
)
logger.info(f"Attempting creation of Milvus '{i_p['index_type']}' index")
self.col.create_index(EMBEDDING_FIELD, index_params=i_p)
self.index_params = i_p
logger.info(
"Creation of Milvus '{}' index successful".format(
i_p["index_type"]
)
logger.info(f"Creation of Milvus '{i_p['index_type']}' index successful")
)
# If create fails, most likely due to being Zilliz Cloud instance, try to create an AutoIndex
except MilvusException:
@@ -271,7 +244,7 @@ def _create_index(self):
for index in self.col.indexes:
idx = index.to_dict()
if idx["field"] == EMBEDDING_FIELD:
logger.info("Index already exists: {}".format(idx))
logger.info(f"Index already exists: {idx}")
self.index_params = idx["index_param"]
break

@@ -304,9 +277,9 @@ def _create_index(self):
self.search_params = default_search_params[
self.index_params["index_type"]
]
logger.info("Milvus search parameters: {}".format(self.search_params))
logger.info(f"Milvus search parameters: {self.search_params}")
except Exception as e:
logger.error("Failed to create index, error: {}".format(e))
logger.error(f"Failed to create index, error: {e}")

async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
"""Upsert chunks into the datastore.
@@ -362,7 +335,7 @@ async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
# self.col.flush()
return doc_ids
except Exception as e:
logger.error("Failed to insert records, error: {}".format(e))
logger.error(f"Failed to insert records, error: {e}")
return []

def _get_values(self, chunk: DocumentChunk) -> List[any] | None: # type: ignore
@@ -396,7 +369,7 @@ def _get_values(self, chunk: DocumentChunk) -> List[any] | None: # type: ignore
x = values.get(key) or default
# If one of our required fields is missing, ignore the entire entry
if x is Required:
logger.info("Chunk " + values["id"] + " missing " + key + " skipping")
logger.info(f"Chunk {values['id']} missing {key} skipping")
return None
# Add the corresponding value if it passes the tests
ret.append(x)
@@ -468,7 +441,7 @@ async def _single_query(query: QueryWithEmbedding) -> QueryResult:

return QueryResult(query=query.query, results=results)
except Exception as e:
logger.error("Failed to query, error: {}".format(e))
logger.error(f"Failed to query, error: {e}")
return QueryResult(query=query.query, results=[])

results: List[QueryResult] = await asyncio.gather(
@@ -493,7 +466,7 @@ async def delete(
if delete_all:
coll_name = self.col.name
logger.info(
"Delete the entire collection {} and create new one".format(coll_name)
f"Delete the entire collection {coll_name} and create new one"
)
# Release the collection from memory
self.col.release()
@@ -514,14 +487,14 @@ async def delete(
# in future version we can delete by expression
if (ids is not None) and len(ids) > 0:
# Add quotation marks around the string format id
ids = ['"' + str(id) + '"' for id in ids]
ids = [f'"{id}"' for id in ids]
# Query for the pk's of entries that match id's
ids = self.col.query(f"document_id in [{','.join(ids)}]")
# Convert to list of pks
pks = [str(entry[pk_name]) for entry in ids] # type: ignore
# for schema V2, the "id" is varchar, rewrite the expression
if self._schema_ver != "V1":
pks = ['"' + pk + '"' for pk in pks]
pks = [f'"{pk}"' for pk in pks]

# Delete by ids batch by batch(avoid too long expression)
logger.info(
@@ -537,7 +510,7 @@ async def delete(
# Increment our deleted count
delete_count += int(res.delete_count) # type: ignore
except Exception as e:
logger.error("Failed to delete by ids, error: {}".format(e))
logger.error(f"Failed to delete by ids, error: {e}")

try:
# Check if empty filter
@@ -552,7 +525,7 @@ async def delete(
pks = [str(entry[pk_name]) for entry in res] # type: ignore
# for schema V2, the "id" is varchar, rewrite the expression
if self._schema_ver != "V1":
pks = ['"' + pk + '"' for pk in pks]
pks = [f'"{pk}"' for pk in pks]
# Check to see if there are valid pk's to delete, delete batch by batch(avoid too long expression)
while len(pks) > 0: # type: ignore
batch_pks = pks[:batch_size]
@@ -562,7 +535,7 @@ async def delete(
# Increment our delete count
delete_count += int(res.delete_count) # type: ignore
except Exception as e:
logger.error("Failed to delete by filter, error: {}".format(e))
logger.error(f"Failed to delete by filter, error: {e}")

logger.info("{:d} records deleted".format(delete_count))

@@ -588,18 +561,18 @@ def _get_filter(self, filter: DocumentMetadataFilter) -> Optional[str]:
# Convert start_date to int and add greater than or equal logic
if field == "start_date":
filters.append(
"(created_at >= " + str(to_unix_timestamp(value)) + ")"
f"(created_at >= {to_unix_timestamp(value)})"
)
# Convert end_date to int and add less than or equal logic
elif field == "end_date":
filters.append(
"(created_at <= " + str(to_unix_timestamp(value)) + ")"
f"(created_at <= {to_unix_timestamp(value)})"
)
# Convert Source to its string value and check equivalency
elif field == "source":
filters.append("(" + field + ' == "' + str(value.value) + '")')
filters.append(f'({field} == "{value.value}")')
# Check equivalency of rest of string fields
else:
filters.append("(" + field + ' == "' + str(value) + '")')
filters.append(f'({field} == "{value}")')
# Join all our expressions with `and``
return " and ".join(filters)