Skip to content

Commit

Permalink
Merge branch 'the-merge-python-hsfs' into the-merge-complete
Browse files Browse the repository at this point in the history
  • Loading branch information
aversey committed Jul 11, 2024
2 parents 6280332 + 1848cd8 commit 7ae1176
Show file tree
Hide file tree
Showing 44 changed files with 5,019 additions and 3,594 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,9 @@ public static void main(String[] args) throws Exception {
if (op.equals("offline_fg_materialization") || op.equals("offline_fg_backfill")) {
SparkEngine.getInstance().streamToHudiTable(streamFeatureGroup, writeOptions);
}

LOGGER.info("Closing spark session...");
SparkEngine.getInstance().closeSparkSession();
System.exit(0);
}
}
3 changes: 3 additions & 0 deletions hsfs/utils/python/hsfs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,6 @@ def parse_isoformat_date(da: str) -> datetime:
import_fg(job_conf)
elif args.op == "run_feature_monitoring":
run_feature_monitoring(job_conf)

if spark is not None:
spark.stop()
Original file line number Diff line number Diff line change
Expand Up @@ -1097,5 +1097,10 @@ protected String makeQueryName(String queryName, FeatureGroupBase featureGroup)
}
return queryName;
}


// Stops the underlying Spark session if one exists; a no-op when no
// session has been created.
public void closeSparkSession() {
  if (getSparkSession() == null) {
    return;
  }
  getSparkSession().stop();
}
}
66 changes: 66 additions & 0 deletions python/hsfs/builtin_transformations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Copyright 2024 Hopsworks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np
import pandas as pd
from hsfs.hopsworks_udf import udf
from hsfs.transformation_statistics import TransformationStatistics


feature_statistics = TransformationStatistics("feature")


@udf(float)
def min_max_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Rescale the feature using the training-time min/max statistics.

    Values equal to the training min map to 0 and values equal to the
    training max map to 1; out-of-range values fall outside [0, 1].
    """
    feature_min = statistics.feature.min
    feature_range = statistics.feature.max - feature_min
    return (feature - feature_min) / feature_range


@udf(float)
def standard_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Standardize the feature with the training-time mean and standard deviation."""
    mean = statistics.feature.mean
    stddev = statistics.feature.stddev
    return (feature - mean) / stddev


@udf(float)
def robust_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Center the feature on its median and scale by the interquartile range.

    NOTE(review): indices 24/49/74 into `percentiles` presumably hold the
    25th/50th/75th percentiles of a 0-indexed 100-entry list — confirm
    against the statistics producer.
    """
    percentiles = statistics.feature.percentiles
    median = percentiles[49]
    iqr = percentiles[74] - percentiles[24]
    return (feature - median) / iqr


@udf(int)
def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Encode categorical values as integer indices.

    Categories are the training-time unique values, sorted so the mapping
    is deterministic. NaN inputs are preserved as NaN.

    # Arguments
        feature: `pd.Series`. The categorical values to encode.
        statistics: Training-time statistics providing
            `extended_statistics["unique_values"]`.

    # Returns
        `pd.Series`: Integer index per value (NaN where the input is NaN).
    """
    # sorted() accepts any iterable directly; no intermediate list needed.
    unique_data = sorted(statistics.feature.extended_statistics["unique_values"])
    value_to_index = {value: index for index, value in enumerate(unique_data)}
    # NOTE: a non-NaN value not seen at training time raises KeyError here
    # (unchanged from the original behavior).
    return pd.Series(
        [np.nan if pd.isna(data) else value_to_index[data] for data in feature]
    )


@udf(bool)
def one_hot_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """One-hot encode the feature against the training-time categories.

    Guarantees one boolean column per category seen at training time, even
    when a category is absent from the current batch, with columns in
    sorted order for a consistent layout.

    # Arguments
        feature: `pd.Series`. The categorical values to encode.
        statistics: Training-time statistics providing
            `extended_statistics["unique_values"]`.

    # Returns
        `pd.DataFrame`-shaped result of boolean indicator columns.
    """
    one_hot = pd.get_dummies(feature, dtype="bool")
    # Iterate the training-time categories directly — no need to copy them
    # into an intermediate list first.
    for value in statistics.feature.extended_statistics["unique_values"]:
        if value not in one_hot:
            one_hot[value] = False
    # Sorting by columns so as to maintain consistency in column order.
    return one_hot.reindex(sorted(one_hot.columns), axis=1)
2 changes: 1 addition & 1 deletion python/hsfs/constructor/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(
self._on = util.parse_features(on)
self._left_on = util.parse_features(left_on)
self._right_on = util.parse_features(right_on)
self._join_type = join_type or self.INNER
self._join_type = join_type or self.LEFT
self._prefix = prefix

def to_dict(self) -> Dict[str, Any]:
Expand Down
18 changes: 15 additions & 3 deletions python/hsfs/constructor/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(
fg_mod.ExternalFeatureGroup,
fg_mod.SpineGroup,
],
left_features: List[Union[str, "Feature"]],
left_features: List[Union[str, "Feature", Dict]],
feature_store_name: Optional[str] = None,
feature_store_id: Optional[int] = None,
left_feature_group_start_time: Optional[Union[str, int, date, datetime]] = None,
Expand Down Expand Up @@ -239,7 +239,7 @@ def join(
on: Optional[List[str]] = None,
left_on: Optional[List[str]] = None,
right_on: Optional[List[str]] = None,
join_type: Optional[str] = "inner",
join_type: Optional[str] = "left",
prefix: Optional[str] = None,
) -> "Query":
"""Join Query with another Query.
Expand Down Expand Up @@ -769,7 +769,7 @@ def featuregroups(
"""List of feature groups used in the query"""
featuregroups = {self._left_feature_group}
for join_obj in self.joins:
featuregroups.add(join_obj.query._left_feature_group)
self._fg_rec_add(join_obj, featuregroups)
return list(featuregroups)

@property
Expand Down Expand Up @@ -809,6 +809,18 @@ def get_feature(self, feature_name: str) -> "Feature":
"""
return self._get_feature_by_name(feature_name)[0]

def _fg_rec_add(self, join_object, featuregroups):
"""
Recursively get a feature groups from nested join and add to featuregroups list.
# Arguments
join_object: `Join object`.
"""
if len(join_object.query.joins) > 0:
for nested_join in join_object.query.joins:
self._fg_rec_add(nested_join, featuregroups)
featuregroups.add(join_object.query._left_feature_group)

def __getattr__(self, name: str) -> Any:
try:
return self.__getitem__(name)
Expand Down
13 changes: 13 additions & 0 deletions python/hsfs/core/constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
import importlib.util


# Avro — feature flags for the optional Avro serialization backends.
HAS_FAST_AVRO: bool = importlib.util.find_spec("fastavro") is not None
HAS_AVRO: bool = importlib.util.find_spec("avro") is not None

# Confluent Kafka — feature flag for the optional confluent-kafka dependency.
HAS_CONFLUENT_KAFKA: bool = importlib.util.find_spec("confluent_kafka") is not None
# User-facing install hint raised/logged when Kafka functionality is
# requested but confluent-kafka is not importable.
confluent_kafka_not_installed_message = (
"Confluent Kafka package not found. "
"If you want to use Kafka with Hopsworks you can install the corresponding extras "
"""`pip install hopsworks[python]` or `pip install "hopsworks[python]"` if using zsh. """
"You can also install confluent-kafka directly in your environment e.g `pip install confluent-kafka`. "
"You will need to restart your kernel if applicable."
)
# Data Validation / Great Expectations
HAS_GREAT_EXPECTATIONS: bool = (
importlib.util.find_spec("great_expectations") is not None
Expand Down
65 changes: 51 additions & 14 deletions python/hsfs/core/feature_view_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@

from typing import List, Optional, Union

from hsfs import (
client,
feature_view,
training_dataset,
transformation_function_attached,
)
from hsfs import client, feature_view, training_dataset, transformation_function
from hsfs.client.exceptions import RestAPIError
from hsfs.constructor import query, serving_prepared_statement
from hsfs.core import explicit_provenance, job, training_dataset_job_conf
Expand Down Expand Up @@ -86,13 +81,28 @@ def update(self, feature_view_obj: feature_view.FeatureView) -> None:
data=feature_view_obj.json(),
)

def get_by_name(self, name: str) -> feature_view.FeatureView:
def get_by_name(self, name: str) -> List[feature_view.FeatureView]:
"""
Get a feature view from the backend using its name.
# Arguments
name `str`: Name of the feature view.
# Returns
`List[FeatureView]`: A list that contains all versions of the feature view.
# Raises
`RestAPIError`: If the feature view cannot be found from the backend.
`ValueError`: If the feature group associated with the feature view cannot be found.
"""
path = self._base_path + [name]
try:
return [
feature_view.FeatureView.from_response_json(fv)
for fv in self._client._send_request(
self._GET, path, {"expand": ["query", "features"]}
self._GET,
path,
{"expand": ["query", "features", "transformationfunctions"]},
)["items"]
]
except RestAPIError as e:
Expand All @@ -106,11 +116,27 @@ def get_by_name(self, name: str) -> feature_view.FeatureView:
raise e

def get_by_name_version(self, name: str, version: int) -> feature_view.FeatureView:
"""
Get a feature view from the backend using both name and version.
# Arguments
name `str`: Name of feature view.
version `int`: Version of the feature view.
# Returns
`FeatureView`
# Raises
`RestAPIError`: If the feature view cannot be found from the backend.
`ValueError`: If the feature group associated with the feature view cannot be found.
"""
path = self._base_path + [name, self._VERSION, version]
try:
return feature_view.FeatureView.from_response_json(
self._client._send_request(
self._GET, path, {"expand": ["query", "features"]}
self._GET,
path,
{"expand": ["query", "features", "transformationfunctions"]},
)
)
except RestAPIError as e:
Expand Down Expand Up @@ -190,12 +216,23 @@ def get_serving_prepared_statement(

def get_attached_transformation_fn(
self, name: str, version: int
) -> Union[
"transformation_function_attached.TransformationFunctionAttached",
List["transformation_function_attached.TransformationFunctionAttached"],
]:
) -> List["transformation_function.TransformationFunction"]:
"""
Get the transformation functions attached to a feature view from the backend.
# Arguments
name `str`: Name of feature view.
version `int`: Version of the feature view.
# Returns
`List[TransformationFunction]` : List of transformation functions attached to the feature view.
# Raises
`RestAPIError`: If the feature view cannot be found from the backend.
`ValueError`: If the feature group associated with the feature view cannot be found.
"""
path = self._base_path + [name, self._VERSION, version, self._TRANSFORMATION]
return transformation_function_attached.TransformationFunctionAttached.from_response_json(
return transformation_function.TransformationFunction.from_response_json(
self._client._send_request("GET", path)
)

Expand Down
Loading

0 comments on commit 7ae1176

Please sign in to comment.