[FSTORE-1632] Change limit location for delta ingestion #430

Closed
wants to merge 3 commits
@@ -160,7 +160,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
@Override
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled,
StatisticsConfig statisticsConfig, String eventTime, OnlineConfig onlineConfig)
TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig, String eventTime, OnlineConfig onlineConfig)
throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Beam");
}
@@ -17,6 +17,14 @@

package com.logicalclocks.hsfs.beam;

import java.io.IOException;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.beam.sdk.values.PCollection;

import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
@@ -26,19 +34,14 @@
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.beam.engine.BeamProducer;
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.constructor.QueryBase;
import com.logicalclocks.hsfs.metadata.Statistics;

import lombok.Builder;
import lombok.NonNull;
import org.apache.beam.sdk.values.PCollection;

import java.io.IOException;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {

@@ -48,8 +51,9 @@ public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {
@Builder
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features,
StatisticsConfig statisticsConfig, String onlineTopicName, String eventTime,
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
@@ -61,6 +65,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
? partitionKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null;
this.hudiPrecombineKey = hudiPrecombineKey != null ? hudiPrecombineKey.toLowerCase() : null;
this.onlineEnabled = onlineEnabled;
this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI;
this.features = features;
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
this.onlineTopicName = onlineTopicName;
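Note that the Beam constructor above falls back to TimeTravelFormat.HUDI whenever timeTravelFormat is null; the Flink and Spark constructors further down apply the same fallback, so existing callers that never set the new field keep their current behavior. A minimal sketch of that default path, assuming Lombok's generated builder and a hypothetical feature group name (not part of this PR):

    // Sketch only: no .timeTravelFormat(...) call, so the constructor
    // falls back to TimeTravelFormat.HUDI (assumes java.util.Collections).
    StreamFeatureGroup legacyFg = StreamFeatureGroup.builder()
        .featureStore(featureStore)                        // existing FeatureStore handle
        .name("transactions")                              // hypothetical name
        .version(1)
        .primaryKeys(Collections.singletonList("cc_num"))
        .onlineEnabled(false)
        .build();
    // legacyFg ends up with timeTravelFormat == TimeTravelFormat.HUDI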
@@ -165,8 +165,9 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys,
String hudiPrecombineKey, boolean onlineEnabled,
StatisticsConfig statisticsConfig, String eventTime,
OnlineConfig onlineConfig)
TimeTravelFormat timeTravelFormat,
StatisticsConfig statisticsConfig,
String eventTime, OnlineConfig onlineConfig)
throws IOException, FeatureStoreException {
throw new UnsupportedOperationException("Not supported for Flink");
}
@@ -17,6 +17,15 @@

package com.logicalclocks.hsfs.flink;

import java.io.IOException;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroupBase;
@@ -27,22 +36,14 @@
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.constructor.QueryBase;

import com.logicalclocks.hsfs.flink.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.metadata.Statistics;

import com.logicalclocks.hsfs.flink.engine.FeatureGroupEngine;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NonNull;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;

import java.io.IOException;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@@ -53,9 +54,9 @@ public class StreamFeatureGroup extends FeatureGroupBase<DataStream<?>> {
@Builder
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features,
StatisticsConfig statisticsConfig, String onlineTopicName, String topicName, String notificationTopicName,
String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
@@ -67,6 +68,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
? partitionKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null;
this.hudiPrecombineKey = hudiPrecombineKey != null ? hudiPrecombineKey.toLowerCase() : null;
this.onlineEnabled = onlineEnabled;
this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI;
this.features = features;
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
this.onlineTopicName = onlineTopicName;
@@ -122,8 +122,8 @@ public abstract Object getOrCreateStreamFeatureGroup(String name, Integer versio
public abstract Object getOrCreateStreamFeatureGroup(String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys,
String hudiPrecombineKey, boolean onlineEnabled,
StatisticsConfig statisticsConfig, String eventTime,
OnlineConfig onlineConfig)
TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig,
String eventTime, OnlineConfig onlineConfig)
throws IOException, FeatureStoreException;

public abstract Object createExternalFeatureGroup();
@@ -19,5 +19,6 @@

public enum TimeTravelFormat {
NONE,
HUDI
HUDI,
DELTA
}
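With DELTA added, TimeTravelFormat now has three constants, so caller-side code that branches exhaustively over the enum needs an extra case. A rough illustration (writeFormatFor is a hypothetical helper, not part of this PR; the NONE mapping is an assumption):

    // Sketch only: map each TimeTravelFormat to a Spark data source name.
    static String writeFormatFor(TimeTravelFormat format) {
      switch (format) {
        case HUDI:
          return "hudi";
        case DELTA:
          return "delta";
        case NONE:
        default:
          return "parquet";   // assumed: plain table without time travel
      }
    }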
@@ -404,7 +404,7 @@ public StreamFeatureGroup.StreamFeatureGroupBuilder createStreamFeatureGroup() {
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version)
throws IOException, FeatureStoreException {
return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null,
null, null, null, false, null, null, null);
null, null, null, false, TimeTravelFormat.HUDI, null, null, null);
}

/**
@@ -438,7 +438,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
boolean onlineEnabled, String eventTime)
throws IOException, FeatureStoreException {
return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null,
primaryKeys, null, null, onlineEnabled, null, eventTime, null);
primaryKeys, null, null, onlineEnabled, TimeTravelFormat.HUDI, null, eventTime, null);
}

/**
@@ -477,7 +477,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver


return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null,
primaryKeys, partitionKeys, null, onlineEnabled, null, eventTime, null);
primaryKeys, partitionKeys, null, onlineEnabled, TimeTravelFormat.HUDI, null, eventTime, null);
}

/**
@@ -506,6 +506,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
* the first primary key of the feature group will be used as hudi precombine key.
* @param onlineEnabled Define whether the feature group should be made available also in the online feature store
* for low latency access.
* @param timeTravelFormat Format used for time travel, defaults to `"HUDI"`.
* @param statisticsConfig A configuration object, to generally enable descriptive statistics computation for
* this feature group, `"correlations`" to turn on feature correlation computation,
* `"histograms"` to compute feature value frequencies and `"exact_uniqueness"` to compute
@@ -523,13 +524,14 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys,
String hudiPrecombineKey, boolean onlineEnabled,
StatisticsConfig statisticsConfig, String eventTime,
OnlineConfig onlineConfig)
TimeTravelFormat timeTravelFormat,
StatisticsConfig statisticsConfig,
String eventTime, OnlineConfig onlineConfig)
throws IOException, FeatureStoreException {

return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, description,
primaryKeys, partitionKeys, hudiPrecombineKey, onlineEnabled, statisticsConfig, eventTime,
onlineConfig);
primaryKeys, partitionKeys, hudiPrecombineKey, onlineEnabled, timeTravelFormat,
statisticsConfig, eventTime, onlineConfig);
}

/**
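With the overload above, a Spark caller can now request a Delta-backed stream feature group directly. A minimal sketch of such a call in the new parameter order, assuming an existing FeatureStore handle and hypothetical names (not part of this PR):

    // Sketch only; throws IOException / FeatureStoreException like the overload itself.
    StreamFeatureGroup deltaFg = featureStore.getOrCreateStreamFeatureGroup(
        "card_transactions",                  // name (hypothetical)
        1,                                    // version
        "card transactions, Delta-backed",    // description
        Collections.singletonList("cc_num"),  // primaryKeys
        null,                                 // partitionKeys
        null,                                 // hudiPrecombineKey
        false,                                // onlineEnabled
        TimeTravelFormat.DELTA,               // timeTravelFormat (new parameter)
        null,                                 // statisticsConfig
        "event_time",                         // eventTime
        null);                                // onlineConfig

Passing null for timeTravelFormat still resolves to HUDI through the constructor fallback shown earlier, and the shorter overloads above keep passing TimeTravelFormat.HUDI explicitly, so only callers that opt in get Delta.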
@@ -17,40 +17,40 @@

package com.logicalclocks.hsfs.spark;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.logicalclocks.hsfs.spark.constructor.Query;
import com.logicalclocks.hsfs.spark.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.spark.engine.StatisticsEngine;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.StreamingQuery;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.logicalclocks.hsfs.EntityEndpointType;
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.JobConfiguration;
import com.logicalclocks.hsfs.OnlineConfig;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.metadata.Statistics;
import com.logicalclocks.hsfs.spark.constructor.Query;
import com.logicalclocks.hsfs.spark.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.spark.engine.StatisticsEngine;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NonNull;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class StreamFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
@@ -61,9 +61,10 @@ public class StreamFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
@Builder
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features,
StatisticsConfig statisticsConfig, String onlineTopicName, String topicName,
String notificationTopicName, String eventTime, OnlineConfig onlineConfig,
StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
@@ -75,6 +76,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
? partitionKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null;
this.hudiPrecombineKey = hudiPrecombineKey != null ? hudiPrecombineKey.toLowerCase() : null;
this.onlineEnabled = onlineEnabled;
this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI;
this.features = features;
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
this.onlineTopicName = onlineTopicName;
@@ -364,7 +364,8 @@ public List<FeatureGroup> getFeatureGroups(FeatureStore featureStore, String fgN
public StreamFeatureGroup getOrCreateStreamFeatureGroup(FeatureStore featureStore, String name, Integer version,
String description, List<String> primaryKeys,
List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, StatisticsConfig statisticsConfig,
boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
StatisticsConfig statisticsConfig,
String eventTime, OnlineConfig onlineConfig)
throws IOException, FeatureStoreException {
StreamFeatureGroup featureGroup;
@@ -381,6 +382,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(FeatureStore featureStor
.partitionKeys(partitionKeys)
.hudiPrecombineKey(hudiPrecombineKey)
.onlineEnabled(onlineEnabled)
.timeTravelFormat(timeTravelFormat)
.statisticsConfig(statisticsConfig)
.eventTime(eventTime)
.onlineConfig(onlineConfig)
@@ -20,6 +20,7 @@
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.Project;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
@@ -67,7 +68,7 @@ public void testFeatureGroupPrimaryKey() {

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
true, features, null, "onlineTopicName", null, null, null, null, null, null);
true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, null, null, null, null);

Exception pkException = assertThrows(FeatureStoreException.class, () -> {
featureGroupEngine.saveFeatureGroupMetaData(featureGroup,
@@ -93,7 +94,7 @@ public void testFeatureGroupEventTimeFeature() {

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("featureA"), null, null,
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);

Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> {
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
@@ -119,7 +120,7 @@ public void testFeatureGroupPartitionPrecombineKeys() {

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
true, features, null, "onlineTopicName", null, null, null, null, null, null);
true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, null, null, null, null);

Exception partitionException = assertThrows(FeatureStoreException.class, () -> {
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
@@ -164,7 +165,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("featureA"), null, null,
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
featureGroup.featureGroupEngine = featureGroupEngine;

// Act