Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Add GCS registry support to feast-serving #34

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions common-test/src/main/java/feast/common/it/DataGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,34 @@ public static FeatureTableSpec createFeatureTableSpec(
.build();
}

// Create a FeatureTableSpec with project
public static FeatureTableSpec createFeatureTableSpec(
String name,
List<String> entities,
ImmutableMap<String, ValueProto.ValueType.Enum> features,
int maxAgeSecs,
Map<String, String> labels,
String project) {

return FeatureTableSpec.newBuilder()
.setName(name)
.addAllEntities(entities)
.addAllFeatures(
features.entrySet().stream()
.map(
entry ->
FeatureSpecV2.newBuilder()
.setName(entry.getKey())
.setValueType(entry.getValue())
.putAllLabels(labels)
.build())
.collect(Collectors.toList()))
.setMaxAge(Duration.newBuilder().setSeconds(maxAgeSecs).build())
.putAllLabels(labels)
.setProject(project)
.build();
}

public static DataSource createFileDataSourceSpec(
String fileURL, String timestampColumn, String datePartitionColumn) {
return DataSource.newBuilder()
Expand Down
6 changes: 6 additions & 0 deletions serving/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>1.111.1</version>
</dependency>


<!-- TODO: SLF4J is being used via Lombok, but also jog4j - pick one -->
<dependency>
Expand Down
41 changes: 20 additions & 21 deletions serving/src/main/java/feast/serving/config/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Positive;
import org.apache.logging.log4j.core.config.plugins.validation.constraints.ValidHost;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.info.BuildProperties;
Expand Down Expand Up @@ -66,11 +65,11 @@ public FeastProperties() {}
/* Feast Serving build version */
@NotBlank private String version = "unknown";

/* Feast Core host to connect to. */
@ValidHost @NotBlank private String coreHost;
/* Bucket name for Feast object registry. */
@NotBlank private String bucketName;

/* Feast Core port to connect to. */
@Positive private int coreGrpcPort;
/* Object name for Feast object registry. */
@NotBlank private String objectName;

private CoreAuthenticationProperties coreAuthentication;

Expand Down Expand Up @@ -181,39 +180,39 @@ public void setVersion(String version) {
}

/**
* Gets Feast Core host.
* Gets Feast object registry bucket name.
*
* @return Feast Core host
* @return Feast object registry bucket name
*/
public String getCoreHost() {
return coreHost;
public String getBucketName() {
return bucketName;
}

/**
* Sets Feast Core host to connect to.
* Sets Feast object registry bucket name.
*
* @param coreHost Feast Core host
* @param bucketName Feast object registry bucket name
*/
public void setCoreHost(String coreHost) {
this.coreHost = coreHost;
public void setBucketName(String bucketName) {
this.bucketName = bucketName;
}

/**
* Gets Feast Core gRPC port.
* Gets Feast object registry object name.
*
* @return Feast Core gRPC port
* @return Feast object registry object name
*/
public int getCoreGrpcPort() {
return coreGrpcPort;
public String getObjectName() {
return objectName;
}

/**
* Sets Feast Core gRPC port.
* Sets Feast object registry object name.
*
* @param coreGrpcPort gRPC port of Feast Core
* @param objectName object registry object name
*/
public void setCoreGrpcPort(int coreGrpcPort) {
this.coreGrpcPort = coreGrpcPort;
public void setObjectName(String objectName) {
this.objectName = objectName;
}

/**
Expand Down
19 changes: 8 additions & 11 deletions serving/src/main/java/feast/serving/config/SpecServiceConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.serving.specs.CachedSpecService;
import feast.serving.specs.CoreSpecService;
import io.grpc.CallCredentials;
import feast.serving.specs.RegistrySpecService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -34,14 +32,14 @@
public class SpecServiceConfig {

private static final Logger log = org.slf4j.LoggerFactory.getLogger(SpecServiceConfig.class);
private String feastCoreHost;
private int feastCorePort;
private String bucketName;
private String objectName;
private int feastCachedSpecServiceRefreshInterval;

@Autowired
public SpecServiceConfig(FeastProperties feastProperties) {
this.feastCoreHost = feastProperties.getCoreHost();
this.feastCorePort = feastProperties.getCoreGrpcPort();
this.bucketName = feastProperties.getBucketName();
this.objectName = feastProperties.getObjectName();
this.feastCachedSpecServiceRefreshInterval = feastProperties.getCoreCacheRefreshInterval();
}

Expand All @@ -60,11 +58,10 @@ public ScheduledExecutorService cachedSpecServiceScheduledExecutorService(
}

@Bean
public CachedSpecService specService(ObjectProvider<CallCredentials> callCredentials)
public CachedSpecService specService()
throws InvalidProtocolBufferException, JsonProcessingException {
CoreSpecService coreService =
new CoreSpecService(feastCoreHost, feastCorePort, callCredentials);
CachedSpecService cachedSpecStorage = new CachedSpecService(coreService);
RegistrySpecService registryService = new RegistrySpecService(bucketName, objectName);
CachedSpecService cachedSpecStorage = new CachedSpecService(registryService);
try {
cachedSpecStorage.populateCache();
} catch (Exception e) {
Expand Down
68 changes: 26 additions & 42 deletions serving/src/main/java/feast/serving/specs/CachedSpecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import feast.proto.core.CoreServiceProto;
import feast.proto.core.CoreServiceProto.GetFeatureTableRequest;
import feast.proto.core.CoreServiceProto.ListFeatureTablesRequest;
import feast.proto.core.CoreServiceProto.ListFeatureTablesResponse;
import feast.proto.core.CoreServiceProto.ListProjectsRequest;
import feast.proto.core.FeatureProto;
import feast.proto.core.FeatureTableProto.FeatureTable;
import feast.proto.core.FeatureTableProto.FeatureTableSpec;
import feast.proto.serving.ServingAPIProto;
import feast.serving.exception.SpecRetrievalException;
import io.grpc.StatusRuntimeException;
import io.prometheus.client.Gauge;
import java.util.HashMap;
import java.util.List;
Expand All @@ -37,14 +35,14 @@
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;

/** In-memory cache of specs hosted in Feast Core. */
/** In-memory cache of specs from the object store registry. */
public class CachedSpecService {

private static final int MAX_SPEC_COUNT = 1000;
private static final Logger log = org.slf4j.LoggerFactory.getLogger(CachedSpecService.class);
private static final String DEFAULT_PROJECT_NAME = "default";

private final CoreSpecService coreService;
private final RegistrySpecService registryService;

private static Gauge cacheLastUpdated =
Gauge.build()
Expand All @@ -66,8 +64,8 @@ public class CachedSpecService {
ImmutablePair<String, ServingAPIProto.FeatureReferenceV2>, FeatureProto.FeatureSpecV2>
featureCache;

public CachedSpecService(CoreSpecService coreService) {
this.coreService = coreService;
public CachedSpecService(RegistrySpecService registryService) {
this.registryService = registryService;

CacheLoader<ImmutablePair<String, String>, FeatureTableSpec> featureTableCacheLoader =
CacheLoader.from(k -> retrieveSingleFeatureTable(k.getLeft(), k.getRight()));
Expand All @@ -83,7 +81,7 @@ public CachedSpecService(CoreSpecService coreService) {

/**
* Reload the store configuration from the given config path, then retrieve the necessary specs
* from core to preload the cache.
* from the object store registry to preload the cache.
*/
public void populateCache() {
ImmutablePair<
Expand Down Expand Up @@ -127,47 +125,33 @@ public void scheduledPopulateCache() {
HashMap<ImmutablePair<String, ServingAPIProto.FeatureReferenceV2>, FeatureProto.FeatureSpecV2>
features = new HashMap<>();

List<String> projects =
coreService.listProjects(ListProjectsRequest.newBuilder().build()).getProjectsList();

for (String project : projects) {
try {
ListFeatureTablesResponse featureTablesResponse =
coreService.listFeatureTables(
ListFeatureTablesRequest.newBuilder()
.setFilter(ListFeatureTablesRequest.Filter.newBuilder().setProject(project))
.build());
Map<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2> featureRefSpecMap =
new HashMap<>();
for (FeatureTable featureTable : featureTablesResponse.getTablesList()) {
FeatureTableSpec spec = featureTable.getSpec();
featureTables.put(ImmutablePair.of(project, spec.getName()), spec);

String featureTableName = spec.getName();
List<FeatureProto.FeatureSpecV2> featureSpecs = spec.getFeaturesList();
for (FeatureProto.FeatureSpecV2 featureSpec : featureSpecs) {
ServingAPIProto.FeatureReferenceV2 featureReference =
ServingAPIProto.FeatureReferenceV2.newBuilder()
.setFeatureTable(featureTableName)
.setName(featureSpec.getName())
.build();
features.put(ImmutablePair.of(project, featureReference), featureSpec);
}
}

} catch (StatusRuntimeException e) {
throw new RuntimeException(
String.format("Unable to retrieve specs matching project %s", project), e);
ListFeatureTablesResponse featureTablesResponse =
registryService.listFeatureTables(ListFeatureTablesRequest.newBuilder().build());
Map<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2> featureRefSpecMap =
new HashMap<>();
for (FeatureTable featureTable : featureTablesResponse.getTablesList()) {
FeatureTableSpec spec = featureTable.getSpec();
featureTables.put(ImmutablePair.of(spec.getProject(), spec.getName()), spec);

String featureTableName = spec.getName();
List<FeatureProto.FeatureSpecV2> featureSpecs = spec.getFeaturesList();
for (FeatureProto.FeatureSpecV2 featureSpec : featureSpecs) {
ServingAPIProto.FeatureReferenceV2 featureReference =
ServingAPIProto.FeatureReferenceV2.newBuilder()
.setFeatureTable(featureTableName)
.setName(featureSpec.getName())
.build();
features.put(ImmutablePair.of(spec.getProject(), featureReference), featureSpec);
}
}
return ImmutablePair.of(featureTables, features);
}

private FeatureTableSpec retrieveSingleFeatureTable(String projectName, String tableName) {
FeatureTable table =
coreService
registryService
.getFeatureTable(
CoreServiceProto.GetFeatureTableRequest.newBuilder()
GetFeatureTableRequest.newBuilder()
.setProject(projectName)
.setName(tableName)
.build())
Expand All @@ -178,7 +162,7 @@ private FeatureTableSpec retrieveSingleFeatureTable(String projectName, String t
private FeatureProto.FeatureSpecV2 retrieveSingleFeature(
String projectName, ServingAPIProto.FeatureReferenceV2 featureReference) {
FeatureTableSpec featureTableSpec =
getFeatureTableSpec(projectName, featureReference); // don't stress core too much
getFeatureTableSpec(projectName, featureReference); // don't stress registry too much
if (featureTableSpec == null) {
return null;
}
Expand Down
68 changes: 0 additions & 68 deletions serving/src/main/java/feast/serving/specs/CoreSpecService.java

This file was deleted.

Loading