
Commit

Merge branch 'main' into dedup-hsml
aversey authored Oct 22, 2024
2 parents 7479664 + 44cedc0 commit d4a0dde
Showing 40 changed files with 3,675 additions and 426 deletions.
32 changes: 28 additions & 4 deletions .github/workflows/python.yml
@@ -62,8 +62,14 @@ jobs:
python-version: ["3.8", "3.9", "3.10"]

steps:
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: "8"
distribution: "adopt"

- name: Set Timezone
run: sudo timedatectl set-timezone UTC
run: sudo timedatectl set-timezone UTC && echo UTC | sudo tee /etc/timezone

- uses: actions/checkout@v4
- name: Copy README
@@ -89,8 +95,14 @@ jobs:
runs-on: ubuntu-latest

steps:
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: "8"
distribution: "adopt"

- name: Set Timezone
run: sudo timedatectl set-timezone UTC
run: sudo timedatectl set-timezone UTC && echo UTC | sudo tee /etc/timezone

- uses: actions/checkout@v4
- name: Copy README
@@ -113,8 +125,14 @@ jobs:
runs-on: ubuntu-latest

steps:
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: "8"
distribution: "adopt"

- name: Set Timezone
run: sudo timedatectl set-timezone UTC
run: sudo timedatectl set-timezone UTC && echo UTC | sudo tee /etc/timezone

- uses: actions/checkout@v4
- name: Copy README
@@ -140,8 +158,14 @@ jobs:
runs-on: ubuntu-latest

steps:
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: "8"
distribution: "adopt"

- name: Set Timezone
run: sudo timedatectl set-timezone Europe/Amsterdam
run: sudo timedatectl set-timezone Europe/Amsterdam && echo Europe/Amsterdam | sudo tee /etc/timezone

- uses: actions/checkout@v4
- name: Copy README
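Note on the workflow change above: each job now installs JDK 8 before checkout, and the Set Timezone step writes the zone name to /etc/timezone in addition to calling timedatectl, presumably because some runtimes (for example the JDK on Debian/Ubuntu images) read that file directly. The sketch below is not part of the diff; it is a minimal check, assuming an Ubuntu runner, of what a test process actually sees.

```python
# Sanity-check sketch (assumption: Ubuntu runner where /etc/timezone exists).
import time

def effective_timezone() -> dict:
    # /etc/timezone is read directly by some runtimes (e.g. the JDK on Debian/Ubuntu),
    # while Python's time module reflects the zone configured via timedatectl.
    with open("/etc/timezone") as fh:
        etc_timezone = fh.read().strip()
    return {"/etc/timezone": etc_timezone, "time.tzname": time.tzname}

if __name__ == "__main__":
    # Expected in the UTC jobs: {'/etc/timezone': 'UTC', 'time.tzname': ('UTC', 'UTC')}
    print(effective_timezone())
```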
(next changed file)
@@ -25,6 +25,7 @@
import com.logicalclocks.hsfs.OnlineConfig;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.beam.engine.BeamProducer;
import com.logicalclocks.hsfs.constructor.QueryBase;
@@ -48,7 +49,7 @@ public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
String eventTime, OnlineConfig onlineConfig) {
String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
@@ -65,6 +66,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
this.onlineTopicName = onlineTopicName;
this.eventTime = eventTime;
this.onlineConfig = onlineConfig;
this.storageConnector = storageConnector;
this.path = path;
}

public StreamFeatureGroup() {
(next changed file)
@@ -26,6 +26,7 @@
import com.logicalclocks.hsfs.OnlineConfig;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.constructor.QueryBase;

import com.logicalclocks.hsfs.metadata.Statistics;
@@ -54,7 +55,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
OnlineConfig onlineConfig) {
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
@@ -73,6 +74,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
this.notificationTopicName = notificationTopicName;
this.eventTime = eventTime;
this.onlineConfig = onlineConfig;
this.storageConnector = storageConnector;
this.path = path;
}

public StreamFeatureGroup() {
(next changed file)
@@ -132,6 +132,14 @@ public abstract class FeatureGroupBase<T> {
@Setter
protected OnlineConfig onlineConfig;

@Getter
@Setter
protected StorageConnector storageConnector;

@Getter
@Setter
protected String path;

@JsonIgnore
// These are only used in the client. In the server they are aggregated in the `features` field
protected List<String> partitionKeys;
(next changed file)
@@ -124,6 +124,9 @@ public static class S3Connector extends StorageConnector {
@Getter @Setter
protected String bucket;

@Getter @Setter
protected String region;

@Getter @Setter
protected String sessionToken;

(next changed file)
@@ -128,7 +128,7 @@ public static Long getTimeStampFromDateString(String inputDate) throws FeatureSt
}

SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern);
Long commitTimeStamp = dateFormat.parse(tempDate).getTime();;
Long commitTimeStamp = dateFormat.parse(tempDate).getTime();

return commitTimeStamp;
}
(next changed file)
@@ -22,12 +22,14 @@
import org.apache.http.HttpRequest;
import org.apache.http.client.ResponseHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;

public interface HopsworksHttpClient {
static final Logger LOGGER = null;
Logger LOGGER = LoggerFactory.getLogger(HopsworksHttpClient.class);

<T> T handleRequest(HttpRequest request, ResponseHandler<T> responseHandler)
throws IOException, FeatureStoreException;
@@ -46,7 +48,7 @@ <T> T handleRequest(HttpRequest request, ResponseHandler<T> responseHandler)

static String readCertKey(String materialPwd) {
try {
return FileUtils.readFileToString(new File(materialPwd));
return FileUtils.readFileToString(new File(materialPwd), Charset.defaultCharset());
} catch (IOException ex) {
LOGGER.warn("Failed to get cert password.", ex);
}
(next changed file)
@@ -46,4 +46,12 @@ public void testStringEntitySerialization() throws IOException {
Assertions.assertEquals("{\"email\":\"[email protected]\",\"firstName\":\"test\",\"lastName\":\"de la Rúa Martínez\"}",
json);
}

// FSTORE-1562: readCertKey throws NullPointerException if the key file cannot be read
@Test
public void testReadCertKey_failure() {
String key = HopsworksHttpClient.readCertKey("/this/path/does/not/exists");
Assertions.assertNull(key);
}

}
(next changed file)
@@ -57,10 +57,6 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {

@Getter
@Setter
private StorageConnector storageConnector;

@Getter
@Setter
private String query;
Expand All @@ -69,10 +65,6 @@ public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
@Setter
private ExternalDataFormat dataFormat;

@Getter
@Setter
private String path;

@Getter
@Setter
private List<OnDemandOptions> options;
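Taken together with the FeatureGroupBase and S3Connector hunks earlier in this commit, the change above removes storageConnector and path from ExternalFeatureGroup because those fields now live on the base class, which is also why the Stream/FeatureGroup constructors gain the two extra parameters. A rough sketch of the same refactor shape, written in Python with illustrative names rather than the real Java classes:

```python
# Illustrative "pull shared fields up into the base class" refactor; the real
# classes are the Java ones in this diff, and these names are placeholders.
from dataclasses import dataclass
from typing import Optional

@dataclass
class StorageConnector:
    name: str
    region: Optional[str] = None      # S3 connectors also gain a region field in this commit

@dataclass
class FeatureGroupBase:
    name: str
    storage_connector: Optional[StorageConnector] = None  # moved up from the external subclass
    path: Optional[str] = None                            # moved up from the external subclass

@dataclass
class ExternalFeatureGroup(FeatureGroupBase):
    query: Optional[str] = None       # keeps only fields specific to external feature groups
```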
(next changed file)
@@ -31,6 +31,7 @@
import com.logicalclocks.hsfs.OnlineConfig;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.Statistics;
@@ -64,7 +65,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
String description, List<String> primaryKeys, List<String> partitionKeys,
String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig) {
String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig,
StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
@@ -85,6 +87,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
this.notificationTopicName = notificationTopicName;
this.eventTime = eventTime;
this.onlineConfig = onlineConfig;
this.storageConnector = storageConnector;
this.path = path;
}

public FeatureGroup() {
(next changed file)
@@ -30,6 +30,7 @@
import com.logicalclocks.hsfs.OnlineConfig;
import com.logicalclocks.hsfs.StatisticsConfig;
import com.logicalclocks.hsfs.Storage;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.Statistics;

@@ -62,7 +63,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
OnlineConfig onlineConfig) {
OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
this();
this.featureStore = featureStore;
this.name = name;
@@ -81,6 +82,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
this.notificationTopicName = notificationTopicName;
this.eventTime = eventTime;
this.onlineConfig = onlineConfig;
this.storageConnector = storageConnector;
this.path = path;
}

public StreamFeatureGroup() {
(next changed file)
@@ -219,10 +219,6 @@ public Dataset<Row> registerOnDemandTemporaryTable(ExternalFeatureGroup onDemand
? onDemandFeatureGroup.getDataFormat().toString() : null, getOnDemandOptions(onDemandFeatureGroup),
onDemandFeatureGroup.getStorageConnector().getPath(onDemandFeatureGroup.getPath()));

if (!Strings.isNullOrEmpty(onDemandFeatureGroup.getLocation())) {
sparkSession.sparkContext().textFile(onDemandFeatureGroup.getLocation(), 0).collect();
}

dataset.createOrReplaceTempView(alias);
return dataset;
}
(next changed file)
@@ -67,11 +67,11 @@ public void testFeatureGroupPrimaryKey() {

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
true, features, null, "onlineTopicName", null, null, null, null);
true, features, null, "onlineTopicName", null, null, null, null, null, null);

Exception pkException = assertThrows(FeatureStoreException.class, () -> {
featureGroupEngine.saveFeatureGroupMetaData(featureGroup,
null, null, null, null, null);;;
null, null, null, null, null);
});

// Assert
@@ -93,11 +93,11 @@ public void testFeatureGroupEventTimeFeature() {

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("featureA"), null, null,
true, features, null, "onlineTopicName", null, null, "eventTime", null);
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);

Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> {
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
null, null, null, null, null);;;
null, null, null, null, null);
});

// Assert
@@ -119,7 +119,7 @@ public void testFeatureGroupPartitionPrecombineKeys() {

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
true, features, null, "onlineTopicName", null, null, null, null);
true, features, null, "onlineTopicName", null, null, null, null, null, null);

Exception partitionException = assertThrows(FeatureStoreException.class, () -> {
streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
@@ -164,7 +164,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce

StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
Collections.singletonList("featureA"), null, null,
true, features, null, "onlineTopicName", null, null, "eventTime", null);
true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
featureGroup.featureGroupEngine = featureGroupEngine;

// Act
11 changes: 9 additions & 2 deletions python/hopsworks_common/connection.py
@@ -21,6 +21,7 @@
import re
import sys
import warnings
import weakref
from typing import Any, Optional

from hopsworks_common import client, usage, util, version
@@ -351,6 +352,7 @@ def connect(self) -> None:
"""
client.stop()
self._connected = True
finalizer = weakref.finalize(self, self.close)
try:
# determine engine, needed to init client
if (self._engine is not None and self._engine.lower() == "spark") or (
@@ -413,6 +415,7 @@ def connect(self) -> None:
self._provide_project()
except (TypeError, ConnectionError):
self._connected = False
finalizer.detach()
raise

self._check_compatibility()
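For context on the connect() hunks above: the connection now registers a weakref.finalize callback so close() still runs at interpreter shutdown if the caller never closes explicitly, and the finalizer is detached when the connection attempt fails so a half-initialized connection is not closed later. A minimal, self-contained sketch of that pattern with illustrative names (not the real Connection class):

```python
# Illustrative finalizer pattern; names are placeholders, not the hsfs API.
import weakref

class Resource:
    def __init__(self) -> None:
        self._connected = False

    def connect(self, fail: bool = False) -> None:
        self._connected = True
        # Ensure close() runs at interpreter shutdown even without an explicit call.
        finalizer = weakref.finalize(self, self.close)
        try:
            if fail:
                raise ConnectionError("backend unreachable")
        except ConnectionError:
            self._connected = False
            finalizer.detach()  # a connection that never came up should not be "closed" later
            raise

    def close(self) -> None:
        if not self._connected:
            return
        self._connected = False
        print("closed")
```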
@@ -446,7 +449,7 @@ def close(self) -> None:
This will clean up any materialized certificates on the local file system of
external environments such as AWS SageMaker.
Usage is recommended but optional.
Usage is optional.
!!! example
```python
@@ -455,9 +458,13 @@ def close(self) -> None:
conn.close()
```
"""
if not self._connected:
return # the connection is already closed

from hsfs import engine

OpenSearchClientSingleton().close()
if OpenSearchClientSingleton._instance:
OpenSearchClientSingleton().close()
client.stop()
engine.stop()
self._feature_store_api = None
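The close() hunk above makes the method idempotent (an early return when the connection is already closed) and only tears down the OpenSearch singleton if one was actually created. A condensed sketch of that guard, again with placeholder names:

```python
# Placeholder classes illustrating the guarded, idempotent close() above.
class SearchClientSingleton:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def close(self) -> None:
        type(self)._instance = None

class Connection:
    def __init__(self) -> None:
        self._connected = True

    def close(self) -> None:
        if not self._connected:
            return                              # already closed; calling twice is harmless
        if SearchClientSingleton._instance:     # only close the singleton if it was ever created
            SearchClientSingleton().close()
        self._connected = False
```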
11 changes: 10 additions & 1 deletion python/hsfs/constructor/fs_query.py
@@ -35,6 +35,7 @@ def __init__(
expand: Optional[List[str]] = None,
items: Optional[List[Dict[str, Any]]] = None,
type: Optional[str] = None,
delta_cached_feature_groups: Optional[List[Dict[str, Any]]] = None,
**kwargs,
) -> None:
self._query = query
@@ -60,6 +61,14 @@ def __init__(
else:
self._hudi_cached_feature_groups = []

if delta_cached_feature_groups is not None:
self._delta_cached_feature_groups = [
hudi_feature_group_alias.HudiFeatureGroupAlias.from_response_json(fg)
for fg in delta_cached_feature_groups
]
else:
self._delta_cached_feature_groups = []

@classmethod
def from_response_json(cls, json_dict: Dict[str, Any]) -> "FsQuery":
json_decamelized = humps.decamelize(json_dict)
Expand Down Expand Up @@ -127,7 +136,7 @@ def register_delta_tables(
feature_store_name: str,
read_options: Optional[Dict[str, Any]],
) -> None:
for hudi_fg in self._hudi_cached_feature_groups:
for hudi_fg in self._delta_cached_feature_groups:
engine.get_instance().register_delta_temporary_table(
hudi_fg, feature_store_id, feature_store_name, read_options
)
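Two things happen in the fs_query.py diff: the constructor now parses a delta_cached_feature_groups list from the server response, reusing the same HudiFeatureGroupAlias wrapper, and register_delta_tables iterates that list rather than the Hudi one. A trimmed-down sketch of the parsing pattern with illustrative names and keys (not the real hsfs metadata classes):

```python
# Illustrative sketch of parsing two optional alias lists from a response dict.
from typing import Any, Dict, List, Optional

class FeatureGroupAlias:
    def __init__(self, alias: str, feature_group: Dict[str, Any]) -> None:
        self.alias = alias
        self.feature_group = feature_group

    @classmethod
    def from_response_json(cls, json_dict: Dict[str, Any]) -> "FeatureGroupAlias":
        # "alias" / "featuregroup" are placeholder keys for this sketch.
        return cls(json_dict.get("alias", ""), json_dict.get("featuregroup", {}))

class QuerySketch:
    def __init__(
        self,
        hudi_cached_feature_groups: Optional[List[Dict[str, Any]]] = None,
        delta_cached_feature_groups: Optional[List[Dict[str, Any]]] = None,
    ) -> None:
        # Each backend keeps its own list; both reuse the same alias wrapper.
        self._hudi_cached_feature_groups = [
            FeatureGroupAlias.from_response_json(fg)
            for fg in (hudi_cached_feature_groups or [])
        ]
        self._delta_cached_feature_groups = [
            FeatureGroupAlias.from_response_json(fg)
            for fg in (delta_cached_feature_groups or [])
        ]

    def register_delta_tables(self, register_fn) -> None:
        # The loop now walks the Delta list; the hunk above shows it previously
        # walked the Hudi list.
        for fg_alias in self._delta_cached_feature_groups:
            register_fn(fg_alias)
```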