Skip to content

Commit

Permalink
[FSTORE-1078] Support Similarity Search in the Feature Store v1.5 (#1529)
Browse files Browse the repository at this point in the history

* [FSTORE-1224] Validate the embedding index and features before creating feature group (#1747)

* validate embedding index and feature

* fix style

* fix comment

(cherry picked from commit 798c399)

* [APPEND][FSTORE-1202] Fix deleting external embedding feature group (#1752)

* remove metadata at the end

* fix comment

(cherry picked from commit 2849092)

* [FSTORE-1226] Make sure the index mapping limit does not exceed when creating feature group (#1756)

* validateWithinMappingLimit

* validate when appending features

* skip columns check if embedding

* check opensearch status code

* fix validate index creation

* fix style

* fix test

* remove test

(cherry picked from commit 5ff93f8)

* [FSTORE-1317] Handle opensearch bulk response with partial failures (#1751)

* check hasFailures only

* set refresh policy

* increase timeout

* retry index creation

* increase retry

* cap sleep time

* refactor constant variable

* set thread

* address comments

* rename variable

(cherry picked from commit a310525)

* [FSTORE-1264] Test ingesting different data types to opensearch (#1764)

* define mapping

* check supported feature types

* add test

(cherry picked from commit d94a981)

* [FSTORE-1241] Return online type in FG metadata when vector store is used (#1767)

* set embedding fg online type

* get opensearch type

* fix test

(cherry picked from commit 2420349)

* [APPEND][FSTORE-1226] Validate mapping size including sub-field (#1769)

* validate mapping size

* fix test

(cherry picked from commit 18285b3)

* [FSTORE-1378] Handle Opensearch vector database error (#1774)

* refactor retry

* opensearch setting

* do not throw exception

* refactor index identifier

* index cleaner

* fix import

* return index name

* fix get index

* address comments

* fix timer interval

* address comments

* address comments

* constrained retry

(cherry picked from commit 0110e63)

* [APPEND][FSTORE-1378] Handle opensearch restart in OpensearchVectorDatabase (#1782)

(cherry picked from commit 964aaa9)

* [APPEND][FSTORE-1378] Fix get index if not exist (#1785)

(cherry picked from commit d228a9b)

* [FSTORE-1314] Support defining similarity function in embedding index (#1783)

* use sim function

* add license

* update license

* refactor getOpensearchFunction

(cherry picked from commit 5bf8980)
  • Loading branch information
kennethmhc authored May 17, 2024
1 parent 9aa1c5c commit d34dfd7
Show file tree
Hide file tree
Showing 28 changed files with 1,443 additions and 276 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def create_cached_featuregroup(project_id, featurestore_id, features: nil, featu
{
"name": "col1",
"type": "array<bigint>",
"description": "testfeaturedescription",
"primary": false,
"partition": false,
"hudiPrecombineKey": false,
Expand Down
20 changes: 20 additions & 0 deletions hopsworks-IT/src/test/ruby/spec/similarity_search_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,26 @@
expect(parsed_json["embeddingIndex"]["indexName"]).to eq("#{project.id}__embedding_test_index")
expect(parsed_json["embeddingIndex"]["features"].length).to be 1
expect(parsed_json["embeddingIndex"]["colPrefix"]).to eq("")
# check features type
expect(parsed_json.first["features"][0].key?("name")).to be true
expect(parsed_json.first["features"][0]["name"]).to eql("id")
expect(parsed_json.first["features"][0].key?("type")).to be true
expect(parsed_json.first["features"][0]["type"].downcase).to eql("int")
expect(parsed_json.first["features"][0].key?("onlineType")).to be true
expect(parsed_json.first["features"][0]["onlineType"].downcase).to eql("int")
expect(parsed_json.first["features"][0].key?("description")).to be true
expect(parsed_json.first["features"][0].key?("primary")).to be true
expect(parsed_json.first["features"][0]["primary"]).to eql(true)

# col1 is the embedding feature: every assertion in this stanza must target features[1]
expect(parsed_json.first["features"][1].key?("name")).to be true
expect(parsed_json.first["features"][1]["name"]).to eql("col1")
expect(parsed_json.first["features"][1].key?("type")).to be true
expect(parsed_json.first["features"][1]["type"].downcase).to eql("array<bigint>")
expect(parsed_json.first["features"][1].key?("onlineType")).to be true
# was features[0], which the stanza above already asserts to be "int"
expect(parsed_json.first["features"][1]["onlineType"].downcase).to eql("knn_vector")
expect(parsed_json.first["features"][1].key?("description")).to be true
expect(parsed_json.first["features"][1].key?("primary")).to be true
expect(parsed_json.first["features"][1]["primary"]).to eql(false)
end

it "should be able to delete a feature group with embedding and custom index" do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.EmbeddingDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
import io.hops.hopsworks.common.models.ModelFacade;
Expand All @@ -32,7 +33,9 @@
import io.hops.hopsworks.persistence.entity.models.version.ModelVersion;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.restutils.RESTCodes;
import io.hops.hopsworks.vectordb.Field;
import io.hops.hopsworks.vectordb.Index;
import io.hops.hopsworks.vectordb.OpensearchVectorDatabase;
import io.hops.hopsworks.vectordb.VectorDatabaseException;

import javax.ejb.EJB;
Expand All @@ -43,6 +46,8 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.logging.Level;
Expand All @@ -62,20 +67,105 @@ public class EmbeddingController {
private ModelVersionFacade modelVersionFacade;
@EJB
private ModelFacade modelFacade;
// Marker that follows the project id in the name prefix of a project's embedding indices
// (see getVectorDbIndexPrefix, isEmbeddingIndex and getProjectId).
private static final String embeddingIndexIdentifier = "__embedding";

public void createVectorDbIndex(Project project, Featuregroup featureGroup)
/**
 * Public no-argument constructor; performs no initialization of its own.
 */
public EmbeddingController() {
}

/**
 * Package-private constructor intended for tests: stores the supplied collaborators directly.
 *
 * @param vectorDatabaseClient client wrapper used for every vector database call in this class
 * @param settings             settings object consulted for the index mapping limit
 */
EmbeddingController(VectorDatabaseClient vectorDatabaseClient, Settings settings) {
  this.settings = settings;
  this.vectorDatabaseClient = vectorDatabaseClient;
}

public void createVectorDbIndex(Project project, Featuregroup featureGroup, List<FeatureGroupFeatureDTO> features)
throws FeaturestoreException {
Index index = new Index(featureGroup.getEmbedding().getVectorDbIndexName());
try {
vectorDatabaseClient.getClient().createIndex(index, createIndex(featureGroup.getEmbedding().getColPrefix(),
featureGroup.getEmbedding().getEmbeddingFeatures()), true);
if (isDefaultVectorDbIndex(project, index.getName())) {
vectorDatabaseClient.getClient().createIndex(index, createIndex(featureGroup.getEmbedding().getColPrefix(),
featureGroup.getEmbedding().getEmbeddingFeatures(), features), true);
vectorDatabaseClient.getClient().addFields(index, createMapping(featureGroup.getEmbedding().getColPrefix(),
featureGroup.getEmbedding().getEmbeddingFeatures()));
featureGroup.getEmbedding().getEmbeddingFeatures(), features));
} else {
vectorDatabaseClient.getClient().createIndex(index, createIndex(featureGroup.getEmbedding().getColPrefix(),
featureGroup.getEmbedding().getEmbeddingFeatures(), features), false);
}

} catch (VectorDatabaseException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_FEATUREGROUP,
Level.FINE, "Cannot create opensearch vectordb index: " + index.getName());
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_GET_VECTOR_DB_INDEX,
Level.FINE, String.format(
"Cannot create opensearch vectordb index: %s. Reason: %s", index.getName(), e.getMessage()));
}
}

/**
 * Verifies that {@code numFeatures} additional mappings still fit into the given embedding index.
 *
 * <p>The index name is first resolved to its project-scoped form. When that index already exists, the
 * mappings it currently holds (sub-fields included) are subtracted from the configured default mapping
 * limit; otherwise the whole limit is available.
 *
 * @param project     project owning the index
 * @param index       index to validate; only its name is read
 * @param numFeatures number of mappings about to be added
 * @throws FeaturestoreException with {@code VECTOR_DATABASE_INDEX_MAPPING_LIMIT_EXCEEDED} when
 *     {@code numFeatures} exceeds the remaining capacity, or {@code COULD_NOT_GET_VECTOR_DB_INDEX} when the
 *     existing index cannot be inspected
 */
public void validateWithinMappingLimit(Project project, Index index, Integer numFeatures)
    throws FeaturestoreException {
  String indexName = getProjectIndexName(project, index.getName());
  try {
    int remainingMappingSize;
    if (indexExist(indexName)) {
      // Mappings already present in the index reduce what is left for the new features.
      int usedMappingSize = vectorDatabaseClient.getClient().getSchema(new Index(indexName)).stream()
          .mapToInt(field -> countMappingSizeIncludingSubFields(field.getType())).sum();
      remainingMappingSize = settings.getOpensearchDefaultIndexMappingLimit() - usedMappingSize;
    } else {
      remainingMappingSize = settings.getOpensearchDefaultIndexMappingLimit();
    }
    if (numFeatures > remainingMappingSize) {
      throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.VECTOR_DATABASE_INDEX_MAPPING_LIMIT_EXCEEDED,
          Level.FINE, String.format("Number of features exceeds the limit of the index '%s'."
              + " Maximum number of features can be added/created is %d."
              + " Reduce the number of features or use a different embedding index.",
          index.getName(), remainingMappingSize));
    }
  } catch (VectorDatabaseException e) {
    // Only the schema lookup can raise this exception here, so report that operation and keep the cause.
    throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_GET_VECTOR_DB_INDEX,
        Level.FINE, "Cannot get schema of opensearch vectordb index: " + indexName, e.getMessage(), e);
  }
}

/**
 * Returns how many mappings a single field type occupies in opensearch.
 *
 * <p>A type that carries sub-fields counts as 2 mappings, any other type as 1:
 * <ul>
 *   <li>{@code "4160_col_98":{"type":"long"}} --> 1 mapping</li>
 *   <li>{@code "5438_binary":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}}}
 *       --> 2 mappings</li>
 * </ul>
 *
 * @param value field type; treated as having sub-fields only when it is a map with a "fields" key
 * @return 2 when sub-fields are present, otherwise 1
 */
private int countMappingSizeIncludingSubFields(Object value) {
  boolean hasSubFields = value instanceof Map && ((Map<?, ?>) value).containsKey("fields");
  return hasSubFields ? 2 : 1;
}

/**
 * Tells whether an index with the given name is present in the vector database.
 *
 * @param name index name, passed unchanged to the client lookup
 * @return {@code true} when the lookup yields an index, {@code false} otherwise
 * @throws FeaturestoreException with {@code COULD_NOT_GET_VECTOR_DB_INDEX} when the lookup fails; the
 *     underlying {@link VectorDatabaseException} is attached as the cause
 */
public boolean indexExist(String name) throws FeaturestoreException {
  try {
    return vectorDatabaseClient.getClient().getIndex(name).isPresent();
  } catch (VectorDatabaseException e) {
    // Keep the original failure so the root cause is not lost.
    throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_GET_VECTOR_DB_INDEX,
        Level.FINE, "Cannot get opensearch vectordb index: " + name, e.getMessage(), e);
  }
}

/**
 * Rejects a user-provided index name that collides with an existing, non-default index.
 *
 * <p>Nothing is checked when {@code name} is null or empty.
 *
 * @param project project owning the index
 * @param name    index name as provided by the user; may be null or empty
 * @throws FeaturestoreException with {@code EMBEDDING_INDEX_EXISTED} when the resolved index already
 *     exists and is not one of the project's default indices
 */
public void verifyIndexName(Project project, String name) throws FeaturestoreException {
  if (Strings.isNullOrEmpty(name)) {
    return;
  }
  String resolvedName = getProjectIndexName(project, name);
  boolean clashesWithExisting = indexExist(resolvedName) && !isDefaultVectorDbIndex(project, resolvedName);
  if (clashesWithExisting) {
    throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.EMBEDDING_INDEX_EXISTED, Level.FINE,
        String.format("Provided embedding index `%s` already exists in the vector database.", resolvedName));
  }
}

/**
 * Resolves a user-provided index name to the name used in the vector database.
 *
 * @param project project owning the index
 * @param name    requested index name; null or empty selects one of the project's default indices
 * @return a default index name when {@code name} is null or empty; {@code name} itself when it already
 *     starts with the project's index prefix; otherwise the prefix, an underscore and {@code name}
 * @throws FeaturestoreException propagated from the default index lookup
 */
String getProjectIndexName(Project project, String name) throws FeaturestoreException {
  if (Strings.isNullOrEmpty(name)) {
    return getDefaultVectorDbIndex(project);
  }
  // In hopsworks opensearch, users can only access indexes which start with specific prefix
  String requiredPrefix = getVectorDbIndexPrefix(project);
  return name.startsWith(requiredPrefix) ? name : requiredPrefix + "_" + name;
}

Expand All @@ -94,15 +184,13 @@ public Embedding getEmbedding(Project project, EmbeddingDTO embeddingDTO, Featur
throws FeaturestoreException {
Embedding embedding = new Embedding();
embedding.setFeaturegroup(featuregroup);
String projectIndexName = getProjectIndexName(project, embeddingDTO.getIndexName());
embedding.setVectorDbIndexName(projectIndexName);
if (Strings.isNullOrEmpty(embeddingDTO.getIndexName())) {
embedding.setVectorDbIndexName(getDefaultVectorDbIndex(project));
embedding.setColPrefix(getVectorDbColPrefix(featuregroup));
} else {
String vectorDbIndexPrefix = getVectorDbIndexPrefix(project);
// In hopsworks opensearch, users can only access indexes which start with specific prefix
if (!embeddingDTO.getIndexName().startsWith(vectorDbIndexPrefix)) {
embedding.setVectorDbIndexName(
vectorDbIndexPrefix + "_" + embeddingDTO.getIndexName());
embedding.setColPrefix("");
}
if (isDefaultVectorDbIndex(project, embeddingDTO.getIndexName())) {
Expand Down Expand Up @@ -133,7 +221,7 @@ public Embedding getEmbedding(Project project, EmbeddingDTO embeddingDTO, Featur
public void dropEmbeddingForProject(Project project)
throws FeaturestoreException {
try {
for (Index index: vectorDatabaseClient.getClient().getAllIndices().stream()
for (Index index : vectorDatabaseClient.getClient().getAllIndices().stream()
.filter(index -> index.getName().startsWith(getVectorDbIndexPrefix(project))).collect(Collectors.toSet())) {
vectorDatabaseClient.getClient().deleteIndex(index);
}
Expand All @@ -143,6 +231,14 @@ public void dropEmbeddingForProject(Project project)
}
}

/**
 * Tells whether an index name has the shape of an embedding index: one or more digits, followed by the
 * embedding identifier, followed by anything.
 *
 * @param indexName index name to test
 * @return {@code true} when the whole name matches that shape
 */
public Boolean isEmbeddingIndex(String indexName) {
  String embeddingIndexRegex = "^\\d+" + embeddingIndexIdentifier + ".*";
  return indexName.matches(embeddingIndexRegex);
}

/**
 * Extracts the numeric part that precedes the embedding identifier in an index name.
 *
 * @param indexName index name; expected to look like an embedding index (see {@code isEmbeddingIndex})
 * @return the text before the first embedding identifier, parsed as an integer
 */
public Integer getProjectId(String indexName) {
  String[] parts = indexName.split(embeddingIndexIdentifier);
  String leadingPart = parts[0];
  return Integer.valueOf(leadingPart);
}

public void dropEmbedding(Project project, Featuregroup featureGroup)
throws FeaturestoreException {
Index index = new Index(featureGroup.getEmbedding().getVectorDbIndexName());
Expand All @@ -160,7 +256,7 @@ public void dropEmbedding(Project project, Featuregroup featureGroup)
}
} catch (VectorDatabaseException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_DELETE_FEATUREGROUP,
Level.FINE, "Cannot delete documents from vectordb for feature group: " +
Level.FINE, "Cannot delete index from vectordb for feature group: " +
featureGroup.getName(), e.getMessage(), e);
}
}
Expand All @@ -174,37 +270,66 @@ private boolean isPreviousDefaultVectorDbIndex(Embedding embedding)
}

private void removeDocuments(Featuregroup featureGroup) throws FeaturestoreException, VectorDatabaseException {
EmbeddingFeature feature = featureGroup.getEmbedding().getEmbeddingFeatures().stream().findFirst().get();
Set<String> fields =
vectorDatabaseClient.getClient().getSchema(new Index(featureGroup.getEmbedding().getVectorDbIndexName()))
.stream().map(Field::getName).collect(Collectors.toSet());
// Get any of the embedding feature which exists in the vector database for removing document if it is not null
Optional<String> embeddingFeatureName = featureGroup
.getEmbedding().getEmbeddingFeatures().stream().map(feature -> feature.getEmbedding().getColPrefix() == null
? feature.getName()
: feature.getEmbedding().getColPrefix() + feature.getName()).filter(fields::contains).findFirst();
String matchQuery = "%s:*";

String field = feature.getEmbedding().getColPrefix() == null
? feature.getName()
: feature.getEmbedding().getColPrefix() + feature.getName();
vectorDatabaseClient.getClient().deleteByQuery(
new Index(featureGroup.getEmbedding().getVectorDbIndexName()),
String.format(matchQuery, field)
);
if (embeddingFeatureName.isPresent()) {
vectorDatabaseClient.getClient().deleteByQuery(
new Index(featureGroup.getEmbedding().getVectorDbIndexName()),
String.format(matchQuery, embeddingFeatureName.get())
);
}
}

protected String createMapping(String prefix, Collection<EmbeddingFeature> features) {
protected String createMapping(String prefix, Collection<EmbeddingFeature> embeddingFeatures,
List<FeatureGroupFeatureDTO> features) {
Set<String> embeddingFeatureNames =
embeddingFeatures.stream().map(EmbeddingFeature::getName).collect(Collectors.toSet());
String mappingString = "{\n" +
" \"properties\": {\n" +
"%s\n" +
" }\n" +
" }";
String fieldString = " \"%s\": {\n" +
String embeddingFieldString = " \"%s\": {\n" +
" \"type\": \"knn_vector\",\n" +
" \"dimension\": %d\n" +
" \"dimension\": %d,\n" +
" \"method\": {\n" +
" \"name\": \"hnsw\",\n" +
" \"space_type\": \"%s\",\n" +
" \"engine\": \"nmslib\"\n" +
" }\n" +
" }";
String fieldString = " \"%s\": {\n" +
" \"type\": \"%s\"\n" +
" }";
List<String> fieldMapping = Lists.newArrayList();
for (EmbeddingFeature feature : features) {

for (EmbeddingFeature feature : embeddingFeatures) {
fieldMapping.add(String.format(
fieldString, prefix + feature.getName(), feature.getDimension()));
embeddingFieldString,
prefix + feature.getName(), feature.getDimension(),
feature.getSimilarityFunctionType().getOpensearchFunction()));
}
for (FeatureGroupFeatureDTO feature : features) {
if (!embeddingFeatureNames.contains(feature.getName())) {
String type = OpensearchVectorDatabase.getDataType(feature.getType());
if (type != null) { // if type cannot be converted, opensearch will infer the type
fieldMapping.add(String.format(
fieldString, prefix + feature.getName(), type));
}
}
}
return String.format(mappingString, String.join(",\n", fieldMapping));
}

protected String createIndex(String prefix, Collection<EmbeddingFeature> features) {
protected String createIndex(String prefix, Collection<EmbeddingFeature> embeddingFeatures,
List<FeatureGroupFeatureDTO> features) {
String jsonString = "{\n" +
" \"settings\": {\n" +
" \"index\": {\n" +
Expand All @@ -214,17 +339,17 @@ protected String createIndex(String prefix, Collection<EmbeddingFeature> feature
" },\n" +
" \"mappings\": %s\n" +
"}";
return String.format(jsonString, createMapping(prefix, features));
return String.format(jsonString, createMapping(prefix, embeddingFeatures, features));

}

private String getDefaultVectorDbIndex(Project project) throws FeaturestoreException {
String getDefaultVectorDbIndex(Project project) throws FeaturestoreException {
Set<String> indexName = getAllDefaultVectorDbIndex(project);
// randomly select an index
return indexName.stream().sorted(Comparator.comparingInt(i -> RANDOM.nextInt())).findFirst().get();
}

private boolean isDefaultVectorDbIndex(Project project, String index) throws FeaturestoreException {
boolean isDefaultVectorDbIndex(Project project, String index) throws FeaturestoreException {
return getAllDefaultVectorDbIndex(project).contains(index);
}

Expand All @@ -247,8 +372,8 @@ private Set<String> getAllDefaultVectorDbIndex(Project project) throws Featurest
return indices;
}

private String getVectorDbIndexPrefix(Project project) {
return project.getId() + "__embedding";
String getVectorDbIndexPrefix(Project project) {
return project.getId() + embeddingIndexIdentifier;
}

private String getVectorDbColPrefix(Featuregroup featuregroup) {
Expand Down
Loading

0 comments on commit d34dfd7

Please sign in to comment.