Skip to content

Commit

Permalink
[FSTORE-1147] Online feature store notification system (#1462)
Browse files Browse the repository at this point in the history
* [FSTORE-1147] Online feature store notification system (#1654)
  • Loading branch information
bubriks authored Jan 31, 2024
1 parent b7f439e commit aeb774b
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.api.client.util.Sets;
import io.hops.hopsworks.api.filter.AllowedProjectRoles;
import io.hops.hopsworks.api.filter.Audience;
import io.hops.hopsworks.api.filter.JWTNotRequired;
import io.hops.hopsworks.api.auth.key.ApiKeyRequired;
import io.hops.hopsworks.api.jwt.JWTHelper;
import io.hops.hopsworks.common.api.ResourceRequest;
Expand Down Expand Up @@ -342,6 +343,37 @@ public Response update(
.build();
}


// This endpoint is necassary for onlinefs notification system
// (can't use Provenance since onlinefs doesn’t have a user)
@GET
@Path("featuregroup/{featureGroupId: [0-9]+}")
@JWTNotRequired
@Produces(MediaType.APPLICATION_JSON)
@AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST})
@ApiKeyRequired(acceptedScopes = {ApiScope.KAFKA},
allowedUserRoles = {"HOPS_SERVICE_USER", "AGENT"})
@ApiOperation(value = "Get all Feature Views associated to Feature Group.", response = FeatureViewDTO.class)
public Response getAllFeatureViewsByFeatureGroup(
@ApiParam(value = "Id of the featuregroup", required = true)
@PathParam("featureGroupId") Integer featureGroupId,
@Context HttpServletRequest req) {
List<FeatureView> featureViews = featureViewController.getByFeatureGroup(featureGroupId);

// basic DTO object
FeatureViewDTO parentFeatureViewDTO = new FeatureViewDTO();
for (FeatureView featureView : featureViews) {
FeatureViewDTO featureViewDTO = new FeatureViewDTO();
featureViewDTO.setId(featureView.getId());
featureViewDTO.setName(featureView.getName());
featureViewDTO.setVersion(featureView.getVersion());
featureViewDTO.setFeaturestoreId(featureView.getFeaturestore().getId());
parentFeatureViewDTO.addItem(featureViewDTO);
}

return Response.ok().entity(parentFeatureViewDTO).build();
}

public void setProject(Project project) {
this.project = project;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,11 @@ public FeaturegroupDTO updateFeaturegroupMetadata(Project project, Users user, F
featuregroup.setDescription(featuregroupDTO.getDescription());
}

// update the notification topic name
if (featuregroupDTO.getNotificationTopicName() != null) {
featuregroup.setNotificationTopicName(featuregroupDTO.getNotificationTopicName());
}

featuregroup = featuregroupFacade.updateFeaturegroupMetadata(featuregroup);
searchCommandLogger.updateMetadata(featuregroup);
return convertFeaturegrouptoDTO(featuregroup, project, user);
Expand Down Expand Up @@ -786,6 +791,7 @@ private Featuregroup persistFeaturegroupMetadata(Featurestore featurestore, Proj
featuregroup.setEventTime(featuregroupDTO.getEventTime());
featuregroup.setOnlineEnabled(settings.isOnlineFeaturestore() && featuregroupDTO.getOnlineEnabled());
featuregroup.setTopicName(featuregroupDTO.getTopicName());
featuregroup.setNotificationTopicName(featuregroupDTO.getNotificationTopicName());

StatisticsConfig statisticsConfig = new StatisticsConfig(featuregroupDTO.getStatisticsConfig().getEnabled(),
featuregroupDTO.getStatisticsConfig().getCorrelations(), featuregroupDTO.getStatisticsConfig().getHistograms(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class FeaturegroupDTO extends FeaturestoreEntityDTO<FeaturegroupDTO> {
@JsonSetter(nulls = Nulls.SKIP)
private Boolean deprecated = false;
private String topicName;
private String notificationTopicName;
private EmbeddingDTO embeddingIndex;

public FeaturegroupDTO() {
Expand All @@ -77,6 +78,7 @@ public FeaturegroupDTO(Featuregroup featuregroup) {
this.eventTime = featuregroup.getEventTime();
this.deprecated = featuregroup.isDeprecated();
this.topicName = featuregroup.getTopicName();
this.notificationTopicName = featuregroup.getNotificationTopicName();
if (featuregroup.getEmbedding() != null) {
this.embeddingIndex = new EmbeddingDTO(featuregroup.getEmbedding());
}
Expand Down Expand Up @@ -134,6 +136,14 @@ public String getTopicName() {
public void setTopicName(String topicName) {
this.topicName = topicName;
}

public String getNotificationTopicName() {
return notificationTopicName;
}

public void setNotificationTopicName(String notificationTopicName) {
this.notificationTopicName = notificationTopicName;
}

public Boolean getDeprecated() {
return deprecated;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ private String getPrefixCheckCollision(Set<String> prefixFeatureNames, String fe
}
}

public List<FeatureView> getByFeatureGroup(Integer featureGroupId) {
return featureViewFacade.findByFeatureGroup(featureGroupId);
}

// For testing
public void setFeaturegroupController(
FeaturegroupController featuregroupController) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ private void setFilterQuery(AbstractFacade.FilterBy filterBy, Query q) {
}
}

public List<FeatureView> findByFeatureGroup(Integer featureGroupId) {
return em.createNamedQuery("FeatureView.findByFeatureGroup", FeatureView.class)
.setParameter("featureGroupId", featureGroupId).getResultList();
}

@Override
protected EntityManager getEntityManager() {
return em;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ public class Featuregroup implements Serializable {
private boolean onlineEnabled;
@Column(name = "topic_name")
private String topicName;
@Column(name = "notification_topic_name")
private String notificationTopicName;
@Column(name = "deprecated")
private boolean deprecated;
@OneToOne(cascade = CascadeType.ALL, mappedBy = "featuregroup")
Expand Down Expand Up @@ -321,6 +323,14 @@ public void setTopicName(String topicName) {
this.topicName = topicName;
}

public String getNotificationTopicName() {
return notificationTopicName;
}

public void setNotificationTopicName(String notificationTopicName) {
this.notificationTopicName = notificationTopicName;
}

public boolean isDeprecated() {
return deprecated;
}
Expand Down Expand Up @@ -358,6 +368,7 @@ public boolean equals(Object o) {
if (!Objects.equals(eventTime, that.eventTime)) return false;
if (!Objects.equals(onlineEnabled, that.onlineEnabled)) return false;
if (!Objects.equals(topicName, that.topicName)) return false;
if (!Objects.equals(notificationTopicName, that.notificationTopicName)) return false;
if (!Objects.equals(deprecated, that.deprecated)) return false;
if (!Objects.equals(expectationSuite, that.expectationSuite)) return false;
if (!Objects.equals(embedding, that.embedding)) return false;
Expand All @@ -381,6 +392,7 @@ public int hashCode() {
result = 31 * result + (eventTime != null ? eventTime.hashCode() : 0);
result = 31 * result + (onlineEnabled ? 1: 0);
result = 31 * result + (topicName != null ? topicName.hashCode() : 0);
result = 31 * result + (notificationTopicName != null ? notificationTopicName.hashCode() : 0);
result = 31 * result + (deprecated ? 1: 0);
result = 31 * result + (expectationSuite != null ? expectationSuite.hashCode(): 0);
result = 31 * result + (embedding != null ? embedding.hashCode(): 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@
@NamedQuery(name = "FeatureView.findByFeaturestoreAndNameOrderedByDescVersion", query = "SELECT fv FROM " +
"FeatureView fv WHERE fv.featurestore = :featurestore AND fv.name = :name ORDER BY fv.version DESC"),
@NamedQuery(name = "FeatureView.countByFeaturestore", query = "SELECT count(fv.id) FROM FeatureView fv " +
"WHERE fv.featurestore = :featurestore")})
"WHERE fv.featurestore = :featurestore"),
@NamedQuery(name = "FeatureView.findByFeatureGroup", query = "SELECT DISTINCT fv FROM FeatureView fv " +
"JOIN fv.features tdf WHERE tdf.featureGroup.id = :featureGroupId")})
public class FeatureView implements Serializable {
private static final long serialVersionUID = 1L;
public static final String TABLE_NAME = "FeatureView";
Expand Down

0 comments on commit aeb774b

Please sign in to comment.