Skip to content

Commit

Permalink
[FSTORE-1047] Support Similarity Search in the Feature Store (#1437)
Browse files Browse the repository at this point in the history
* [FSTORE-1047] Support Similarity Search in the Feature Store (#1609)

* rename to embeddingIndex

vector db client

handle exception

embedding controller

get project fs

address comment

return feature name in dto

set default project embedding

set default project embedding

add embedding

modify opensearch client creation

add vector db module

* remove empty line

* skip delete sql db when embedding

(cherry picked from commit 9f0a98c)

# Conflicts:
#	hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/FeaturestoreService.java
#	hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java
#	hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java
#	pom.xml

* [APPEND][FSTORE-1047] Add get fg endpoint for onlinefs (#1632)

* rename to embeddingIndex

vector db client

handle exception

embedding controller

get project fs

address comment

return feature name in dto

set default project embedding

set default project embedding

add embedding

modify opensearch client creation

add vector db module

* remove empty line

* return fg for onlinefs

* allow job to get fs by id

* add consumer group to offset table

* remove jwt

* fix unit test

(cherry picked from commit 3f11dce)

* [FSTORE-1127] Remove log4j (#1644)

(cherry picked from commit e82d686)
  • Loading branch information
kennethmhc authored Dec 20, 2023
1 parent 3050dd2 commit 672c238
Show file tree
Hide file tree
Showing 33 changed files with 1,405 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.hops.hopsworks.api.featurestore;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.hops.hopsworks.api.featurestore.datavalidationv2.greatexpectations.GreatExpectationResource;
import io.hops.hopsworks.api.featurestore.featuregroup.FeaturegroupService;
import io.hops.hopsworks.api.featurestore.featureview.FeatureViewService;
Expand Down Expand Up @@ -50,10 +51,12 @@
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.MediaType;
Expand Down Expand Up @@ -119,16 +122,32 @@ public void setProjectId(Integer projectId) {
@AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiKeyRequired(acceptedScopes = {ApiScope.FEATURESTORE},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiKeyRequired(acceptedScopes = {ApiScope.FEATURESTORE, ApiScope.KAFKA},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER", "AGENT"})
@ApiOperation(value = "Get the list of feature stores for the project",
response = FeaturestoreDTO.class,
responseContainer = "List")
public Response getFeaturestores(@Context SecurityContext sc) throws FeaturestoreException {
List<FeaturestoreDTO> featurestores = featurestoreController.getFeaturestoresForProject(project);
public Response getFeaturestores(
@Context
SecurityContext sc,
@QueryParam("include_shared")
@DefaultValue("true")
@ApiParam(value = "include_shared=false",
allowableValues = "include_shared=false,include_shared=true",
defaultValue = "true")
Boolean includeShared
)
throws FeaturestoreException {
List<FeaturestoreDTO> featurestores;
if (includeShared) {
featurestores = featurestoreController.getFeaturestoresForProject(project);
} else {
featurestores = Lists.newArrayList(featurestoreController.convertFeaturestoreToDTO(
featurestoreController.getProjectFeaturestore(project)
));
}
GenericEntity<List<FeaturestoreDTO>> featurestoresGeneric =
new GenericEntity<List<FeaturestoreDTO>>(featurestores) {
};
new GenericEntity<List<FeaturestoreDTO>>(featurestores) {};
return noCacheResponse.getNoCacheResponseBuilder(Response.Status.OK).entity(featurestoresGeneric).build();
}

Expand All @@ -142,7 +161,8 @@ public Response getFeaturestores(@Context SecurityContext sc) throws Featurestor
@Path("/{featurestoreId: [0-9]+}")
@Produces(MediaType.APPLICATION_JSON)
@AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST})
@JWTRequired(acceptedTokens = {Audience.API}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiKeyRequired(acceptedScopes = {ApiScope.FEATURESTORE},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiOperation(value = "Get featurestore with specific Id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.hops.hopsworks.api.featurestore.datavalidationv2.suites.ExpectationSuiteResource;
import io.hops.hopsworks.api.featurestore.statistics.StatisticsResource;
import io.hops.hopsworks.api.featurestore.tag.FeatureGroupTagResource;
import io.hops.hopsworks.api.filter.JWTNotRequired;
import io.hops.hopsworks.api.jobs.JobDTO;
import io.hops.hopsworks.api.jobs.JobsBuilder;
import io.hops.hopsworks.api.provenance.FeatureGroupProvenanceResource;
Expand Down Expand Up @@ -301,6 +302,35 @@ public Response getFeatureGroup(@ApiParam(value = "Id of the featuregroup", requ
return noCacheResponse.getNoCacheResponseBuilder(Response.Status.OK).entity(featuregroupGeneric).build();
}

/**
 * Looks up a single feature group by id in the current feature store and returns it as JSON.
 * This variant of the lookup is exposed under the {@code /onlinefs} sub-path. It does not require
 * a JWT and is authorised through an API key with the FEATURESTORE or KAFKA scope held by a
 * {@code HOPS_SERVICE_USER} or {@code AGENT} user.
 *
 * @param featuregroupId id of the feature group to fetch
 * @param req the servlet request (injected, not used by the method body)
 * @param sc the security context (injected, not used by the method body)
 * @return an HTTP 200 no-cache response whose entity is the {@link FeaturegroupDTO} of the feature group
 * @throws FeaturestoreException if the id check or the feature group lookup fails
 */
@GET
@Path("/{featuregroupId: [0-9]+}/onlinefs")
@JWTNotRequired
@Produces(MediaType.APPLICATION_JSON)
@AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST})
@ApiKeyRequired(acceptedScopes = {ApiScope.FEATURESTORE, ApiScope.KAFKA},
    allowedUserRoles = {"HOPS_SERVICE_USER", "AGENT"})
@ApiOperation(value = "Get specific featuregroup for onlinefs from a specific featurestore",
    response = FeaturegroupDTO.class)
public Response getFeatureGroupForOnlinefs(
    @ApiParam(value = "Id of the featuregroup", required = true)
    @PathParam("featuregroupId") Integer featuregroupId,
    @Context HttpServletRequest req,
    @Context SecurityContext sc)
    throws FeaturestoreException {
  // Validate the path parameter before touching the controller.
  verifyIdProvided(featuregroupId);
  Featuregroup found = featuregroupController.getFeaturegroupById(featurestore, featuregroupId);
  // Anonymous GenericEntity subclass keeps the generic type available to the JSON writer.
  GenericEntity<FeaturegroupDTO> entity =
      new GenericEntity<FeaturegroupDTO>(new FeaturegroupDTO(found)) {};
  return noCacheResponse
      .getNoCacheResponseBuilder(Response.Status.OK)
      .entity(entity)
      .build();
}

/**
* Retrieve a specific feature group based on its name. Allow filtering on version.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,18 @@ private List<TrainingDatasetFeatureDTO> makeFeatures(FeatureView featureView) {
Map<Integer, String> fsLookupTable = trainingDatasetController.getFsLookupTableFeatures(tdFeatures);
return tdFeatures
.stream()
.map(f -> new TrainingDatasetFeatureDTO(trainingDatasetController.checkPrefix(f), f.getType(),
.map(f -> new TrainingDatasetFeatureDTO(
trainingDatasetController.checkPrefix(f),
f.getType(),
f.getFeatureGroup() != null ?
new FeaturegroupDTO(f.getFeatureGroup().getFeaturestore().getId(),
fsLookupTable.get(f.getFeatureGroup().getFeaturestore().getId()),
f.getFeatureGroup().getId(), f.getFeatureGroup().getName(),
f.getFeatureGroup().getId(),
f.getFeatureGroup().getName(),
f.getFeatureGroup().getVersion(),
f.getFeatureGroup().isDeprecated())
: null,
f.getIndex(), f.isLabel(), f.isInferenceHelperColumn(), f.isTrainingHelperColumn()))
f.getName(), f.getIndex(), f.isLabel(), f.isInferenceHelperColumn(), f.isTrainingHelperColumn()))
.collect(Collectors.toList());
}
}
5 changes: 5 additions & 0 deletions hopsworks-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@
<artifactId>hopsworks-service-discovery</artifactId>
</dependency>

<dependency>
<groupId>io.hops.hopsworks</groupId>
<artifactId>vector-db</artifactId>
</dependency>

<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public FeaturestoreStorageConnectorDTO createOfflineJdbcConnector(String databas
* @param featurestore the featurestore entity
* @return a DTO representation of the featurestore
*/
private FeaturestoreDTO convertFeaturestoreToDTO(Featurestore featurestore) {
public FeaturestoreDTO convertFeaturestoreToDTO(Featurestore featurestore) {
FeaturestoreDTO featurestoreDTO = new FeaturestoreDTO(featurestore);
String featureStoreName = getOfflineFeaturestoreDbName(featurestore);
// TODO(Fabio): remove this when we switch to the new UI.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* This file is part of Hopsworks
* Copyright (C) 2023, Hopsworks AB. All rights reserved
*
* Hopsworks is free software: you can redistribute it and/or modify it under the terms of
* the GNU Affero General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License along with this program.
* If not, see <https://www.gnu.org/licenses/>.
*/

package io.hops.hopsworks.common.featurestore.embedding;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.hops.hopsworks.common.featurestore.featuregroup.EmbeddingDTO;
import io.hops.hopsworks.common.hdfs.Utils;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Embedding;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.EmbeddingFeature;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.restutils.RESTCodes;
import io.hops.hopsworks.vectordb.Index;
import io.hops.hopsworks.vectordb.VectorDatabaseException;

import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.logging.Level;
import java.util.stream.Collectors;

/**
 * Maps the embedding features of a feature group onto an index of the vector database
 * (OpenSearch) and builds the JSON index/mapping definitions sent to it.
 *
 * <p>Two kinds of indices are handled: the project's "default" indices, where the columns of a
 * feature group are namespaced with a feature-group specific prefix, and user-named indices,
 * where columns are stored without a prefix.
 */
@Stateless
@TransactionAttribute(TransactionAttributeType.NEVER)
public class EmbeddingController {

  /** Shared source of randomness used to spread feature groups over the default indices. */
  private static final Random RANDOM = new Random();

  @EJB
  private Settings settings;
  @EJB
  private VectorDatabaseClient vectorDatabaseClient;

  /**
   * Creates the vector database index referenced by the feature group's embedding and, when that
   * index is one of the project's default indices, adds the feature group's fields to it.
   *
   * @param project project owning the feature group
   * @param featureGroup feature group whose embedding describes the index, column prefix and features
   * @throws FeaturestoreException if the vector database rejects the request or no default index is defined
   */
  public void createVectorDbIndex(Project project, Featuregroup featureGroup)
      throws FeaturestoreException {
    Embedding embedding = featureGroup.getEmbedding();
    Index index = new Index(embedding.getVectorDbIndexName());
    try {
      // NOTE(review): the trailing `true` presumably means "skip if the index already exists" - confirm
      // against the vector-db module.
      vectorDatabaseClient.getClient().createIndex(index,
          createIndex(embedding.getColPrefix(), embedding.getEmbeddingFeatures()), true);
      if (isDefaultVectorDbIndex(project, index.getName())) {
        vectorDatabaseClient.getClient().addFields(index,
            createMapping(embedding.getColPrefix(), embedding.getEmbeddingFeatures()));
      }
    } catch (VectorDatabaseException e) {
      // Keep the original failure as developer message and cause so it is not lost in the logs.
      throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_FEATUREGROUP,
          Level.FINE, "Cannot create opensearch vectordb index: " + index.getName(), e.getMessage(), e);
    }
  }

  /**
   * Builds the {@link Embedding} entity for a feature group from the user supplied DTO.
   *
   * <ul>
   *   <li>No index name given: a default index is picked at random and columns get the
   *   feature-group prefix.</li>
   *   <li>Index name given: it is used as is when it already starts with the project's index
   *   prefix, otherwise the prefix is prepended. Columns get the feature-group prefix only when
   *   the given name is one of the default indices, otherwise an empty prefix.</li>
   * </ul>
   *
   * @param project project owning the feature group
   * @param embeddingDTO user supplied embedding description (index name and features)
   * @param featuregroup feature group the embedding belongs to
   * @return a new, fully populated embedding entity (index name and column prefix are never null)
   * @throws FeaturestoreException if no default index is defined
   */
  public Embedding getEmbedding(Project project, EmbeddingDTO embeddingDTO, Featuregroup featuregroup)
      throws FeaturestoreException {
    Embedding embedding = new Embedding();
    embedding.setFeaturegroup(featuregroup);
    String requestedIndexName = embeddingDTO.getIndexName();
    if (requestedIndexName == null) {
      embedding.setVectorDbIndexName(getDefaultVectorDbIndex(project));
      embedding.setColPrefix(getVectorDbColPrefix(featuregroup));
    } else {
      String vectorDbIndexPrefix = getVectorDbIndexPrefix(project);
      // In hopsworks opensearch, users can only access indexes which start with specific prefix
      if (requestedIndexName.startsWith(vectorDbIndexPrefix)) {
        // Already a valid project index name; previously this case left the index name unset (null).
        embedding.setVectorDbIndexName(requestedIndexName);
      } else {
        embedding.setVectorDbIndexName(vectorDbIndexPrefix + "_" + requestedIndexName);
      }
      if (isDefaultVectorDbIndex(project, requestedIndexName)) {
        embedding.setColPrefix(getVectorDbColPrefix(featuregroup));
      } else {
        embedding.setColPrefix("");
      }
    }
    embedding.setEmbeddingFeatures(
        embeddingDTO.getFeatures()
            .stream()
            .map(mapping -> new EmbeddingFeature(embedding, mapping.getName(), mapping.getDimension(),
                mapping.getSimilarityFunctionType()))
            .collect(Collectors.toList())
    );
    return embedding;
  }

  /**
   * Renders the JSON "properties" mapping declaring one {@code knn_vector} field per feature.
   * Only the (prefixed) feature name and its dimension are emitted.
   *
   * @param prefix column prefix prepended to every feature name
   * @param features embedding features to declare
   * @return the mapping as a JSON string
   */
  protected String createMapping(String prefix, Collection<EmbeddingFeature> features) {
    String mappingString = "{\n" +
        "    \"properties\": {\n" +
        "%s\n" +
        "    }\n" +
        "  }";
    String fieldString = "      \"%s\": {\n" +
        "        \"type\": \"knn_vector\",\n" +
        "        \"dimension\": %d\n" +
        "      }";
    List<String> fieldMapping = Lists.newArrayList();
    for (EmbeddingFeature feature : features) {
      fieldMapping.add(String.format(
          fieldString, prefix + feature.getName(), feature.getDimension()));
    }
    return String.format(mappingString, String.join(",\n", fieldMapping));
  }

  /**
   * Renders the JSON body used to create an index: knn-enabled index settings plus the field
   * mapping produced by {@link #createMapping(String, Collection)}.
   *
   * @param prefix column prefix prepended to every feature name
   * @param features embedding features to declare
   * @return the index definition as a JSON string
   */
  protected String createIndex(String prefix, Collection<EmbeddingFeature> features) {
    String jsonString = "{\n" +
        "  \"settings\": {\n" +
        "    \"index\": {\n" +
        "      \"knn\": \"true\",\n" +
        "      \"knn.algo_param.ef_search\": 512\n" +
        "    }\n" +
        "  },\n" +
        "  \"mappings\": %s\n" +
        "}";
    return String.format(jsonString, createMapping(prefix, features));
  }

  /**
   * Picks one of the project's default indices uniformly at random.
   *
   * @throws FeaturestoreException if no default index is defined
   */
  private String getDefaultVectorDbIndex(Project project) throws FeaturestoreException {
    // getAllDefaultVectorDbIndex never returns an empty set, so the bound below is always positive.
    List<String> candidates = Lists.newArrayList(getAllDefaultVectorDbIndex(project));
    return candidates.get(RANDOM.nextInt(candidates.size()));
  }

  /** Tells whether {@code index} is one of the project's default indices. */
  private boolean isDefaultVectorDbIndex(Project project, String index) throws FeaturestoreException {
    return getAllDefaultVectorDbIndex(project).contains(index);
  }

  /**
   * Resolves the names of the default indices: either the comma-separated list configured in the
   * settings, or, when that is not set, a configured number of per-project generated names.
   *
   * @return a non-empty set of index names
   * @throws FeaturestoreException if neither source yields at least one index name
   */
  private Set<String> getAllDefaultVectorDbIndex(Project project) throws FeaturestoreException {
    String configuredNames = settings.getOpensearchDefaultEmbeddingIndexName();
    Set<String> indices;
    if (!Strings.isNullOrEmpty(configuredNames)) {
      // Tolerate "a, b" style values and stray commas in the configured list.
      indices = Arrays.stream(configuredNames.split(","))
          .map(String::trim)
          .filter(name -> !name.isEmpty())
          .collect(Collectors.toSet());
    } else {
      indices = Sets.newHashSet();
      String indexPrefix = getVectorDbIndexPrefix(project);
      for (int i = 0; i < settings.getOpensearchNumDefaultEmbeddingIndex(); i++) {
        indices.add(indexPrefix + "_default_project_embedding_" + i);
      }
    }
    if (indices.isEmpty()) {
      throw new FeaturestoreException(
          RESTCodes.FeaturestoreErrorCode.OPENSEARCH_DEFAULT_EMBEDDING_INDEX_SUFFIX_NOT_DEFINED, Level.FINE,
          "Default vector db index is not defined.");
    }
    return indices;
  }

  /** Prefix that every index of the given project starts with. */
  private String getVectorDbIndexPrefix(Project project) {
    return project.getId() + "__embedding";
  }

  /** Column prefix used to namespace a feature group's fields inside a shared default index. */
  private String getVectorDbColPrefix(Featuregroup featuregroup) {
    return Utils.getFeaturegroupName(featuregroup) + "_";
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* This file is part of Hopsworks
* Copyright (C) 2023, Hopsworks AB. All rights reserved
*
* Hopsworks is free software: you can redistribute it and/or modify it under the terms of
* the GNU Affero General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License along with this program.
* If not, see <https://www.gnu.org/licenses/>.
*/

package io.hops.hopsworks.common.featurestore.embedding;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.opensearch.OpenSearchClient;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.OpenSearchException;
import io.hops.hopsworks.restutils.RESTCodes;
import io.hops.hopsworks.vectordb.VectorDatabase;
import io.hops.hopsworks.vectordb.VectorDatabaseFactory;

import javax.annotation.PreDestroy;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.EJB;
import javax.ejb.Singleton;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import java.util.logging.Level;
import java.util.logging.Logger;

@Singleton
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
public class VectorDatabaseClient {

@EJB
private OpenSearchClient openSearchClient;
private VectorDatabase vectorDatabase;
private static final Logger LOG = Logger.getLogger(EmbeddingController.class.getName());

public synchronized VectorDatabase getClient() throws FeaturestoreException {
if (vectorDatabase == null) {
try {
vectorDatabase = VectorDatabaseFactory.getOpensearchDatabase(openSearchClient.getClient());
} catch (OpenSearchException | ServiceDiscoveryException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_CREATE_FEATUREGROUP,
Level.FINE, "Cannot create opensearch vectordb");
}
}
return vectorDatabase;
}

@PreDestroy
private void close() {
try {
vectorDatabase.close();
} catch (Exception ex) {
LOG.log(Level.SEVERE, null, ex);
}
}
}
Loading

0 comments on commit 672c238

Please sign in to comment.