Skip to content

Commit

Permalink
[FSTORE-1574] Transformation functions bug fixes (#376)
Browse files Browse the repository at this point in the history
* performing data validation after on-demand transformations are applied

* correcting error message for feature group transformations

* removing validations for on-demand features in Vector server as they are computed while retrieving the feature vector

* adding tests for error messages

* adding warnings for transformation functions in external feature group and updating logger message

* correcting logging debug message
  • Loading branch information
manu-sj committed Nov 28, 2024
1 parent 432c71f commit 706efc4
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 268 deletions.
43 changes: 38 additions & 5 deletions python/hsfs/core/feature_group_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from __future__ import annotations

import warnings
from typing import List
from typing import List, Union

from hsfs import engine, feature, util
from hsfs import feature_group as fg
Expand Down Expand Up @@ -67,7 +67,7 @@ def _update_feature_group_schema_on_demand_transformations(

def save(
self,
feature_group,
feature_group: Union[fg.FeatureGroup, fg.ExternalFeatureGroup],
feature_dataframe,
write_options,
validation_options: dict = None,
Expand All @@ -80,6 +80,21 @@ def save(
feature_group=feature_group, features=dataframe_features
)
)

# Currently on-demand transformation functions not supported in external feature groups.
if feature_group.transformation_functions:
if not isinstance(feature_group, fg.ExternalFeatureGroup):
feature_dataframe = (
engine.get_instance()._apply_transformation_function(
feature_group.transformation_functions, feature_dataframe
)
)
else:
warnings.warn(
"On-Demand features were not created because On-Demand Transformations are not supported for External Feature Groups.",
stacklevel=1,
)

util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
)
Expand Down Expand Up @@ -119,7 +134,7 @@ def save(

def insert(
self,
feature_group,
feature_group: Union[fg.FeatureGroup, fg.ExternalFeatureGroup],
feature_dataframe,
overwrite,
operation,
Expand All @@ -132,6 +147,16 @@ def insert(
feature_group.time_travel_format,
features=feature_group.features,
)

# Currently on-demand transformation functions not supported in external feature groups.
if (
not isinstance(feature_group, fg.ExternalFeatureGroup)
and feature_group.transformation_functions
):
feature_dataframe = engine.get_instance()._apply_transformation_function(
feature_group.transformation_functions, feature_dataframe
)

dataframe_features = (
self._update_feature_group_schema_on_demand_transformations(
feature_group=feature_group, features=dataframe_features
Expand Down Expand Up @@ -299,7 +324,9 @@ def append_features(self, feature_group, new_features):
if feature_group.time_travel_format == "DELTA":
engine.get_instance().add_cols_to_delta_table(feature_group, new_features)
else:
engine.get_instance().save_empty_dataframe(feature_group, new_features=new_features)
engine.get_instance().save_empty_dataframe(
feature_group, new_features=new_features
)

def update_description(self, feature_group, description):
"""Updates the description of a feature group."""
Expand All @@ -326,7 +353,7 @@ def update_deprecated(self, feature_group, deprecate):

def insert_stream(
self,
feature_group,
feature_group: Union[fg.FeatureGroup, fg.ExternalFeatureGroup],
dataframe,
query_name,
output_mode,
Expand All @@ -349,6 +376,12 @@ def insert_stream(
feature_group=feature_group, features=dataframe_features
)
)

if feature_group.transformation_functions:
dataframe = engine.get_instance()._apply_transformation_function(
feature_group.transformation_functions, dataframe
)

util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
)
Expand Down
9 changes: 9 additions & 0 deletions python/hsfs/core/vector_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,15 @@ def identify_missing_features_pre_fetch(
passed_feature_names = passed_feature_names.union(
vector_db_features.keys()
)
if self._on_demand_feature_names and len(self._on_demand_feature_names) > 0:
# Remove on-demand features from validation check as they would be computed.
_logger.debug(
"Appending on_demand_feature_names : %s, to passed_feature_names for pre-fetch missing",
self._on_demand_feature_names,
)
passed_feature_names = passed_feature_names.union(
self._on_demand_feature_names
)
neither_fetched_nor_passed = fetched_features.difference(
passed_feature_names
)
Expand Down
25 changes: 13 additions & 12 deletions python/hsfs/engine/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,15 +808,6 @@ def save_dataframe(
online_write_options: Dict[str, Any],
validation_id: Optional[int] = None,
) -> Optional[job.Job]:
# Currently on-demand transformation functions not supported in external feature groups.
if (
not isinstance(feature_group, ExternalFeatureGroup)
and feature_group.transformation_functions
):
dataframe = self._apply_transformation_function(
feature_group.transformation_functions, dataframe
)

if (
hasattr(feature_group, "EXTERNAL_FEATURE_GROUP")
and feature_group.online_enabled
Expand Down Expand Up @@ -1298,9 +1289,19 @@ def _apply_transformation_function(
dataset.columns
)
if missing_features:
raise FeatureStoreException(
f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly."
)
if (
tf.transformation_type
== transformation_function.TransformationType.ON_DEMAND
):
# On-demand transformation are applied using the python/spark engine during insertion, the transformation while retrieving feature vectors are performed in the vector_server.
raise FeatureStoreException(
f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the on-demand transformation function '{hopsworks_udf.function_name}' are not present in the dataframe being inserted into the feature group. "
+ "Please verify that the correct feature names are used in the transformation function and that these features exist in the dataframe being inserted."
)
else:
raise FeatureStoreException(
f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the model-dependent transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please verify that the correct features are specified in the transformation function."
)
if tf.hopsworks_udf.dropped_features:
dropped_features.update(tf.hopsworks_udf.dropped_features)

Expand Down
60 changes: 32 additions & 28 deletions python/hsfs/engine/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,14 +415,6 @@ def save_dataframe(
validation_id=None,
):
try:
# Currently on-demand transformation functions not supported in external feature groups.
if (
not isinstance(feature_group, fg_mod.ExternalFeatureGroup)
and feature_group.transformation_functions
):
dataframe = self._apply_transformation_function(
feature_group.transformation_functions, dataframe
)
if (
isinstance(feature_group, fg_mod.ExternalFeatureGroup)
and feature_group.online_enabled
Expand Down Expand Up @@ -467,11 +459,6 @@ def save_stream_dataframe(
checkpoint_dir: Optional[str],
write_options: Optional[Dict[str, Any]],
):
if feature_group.transformation_functions:
dataframe = self._apply_transformation_function(
feature_group.transformation_functions, dataframe
)

write_options = kafka_engine.get_kafka_config(
feature_group.feature_store_id, write_options, engine="spark"
)
Expand Down Expand Up @@ -1314,13 +1301,16 @@ def save_empty_dataframe(self, feature_group, new_features=None):

dataframe = self._spark_session.read.format("hudi").load(location)

if (new_features is not None):
if new_features is not None:
if isinstance(new_features, list):
for new_feature in new_features:
dataframe = dataframe.withColumn(new_feature.name, lit(None).cast(new_feature.type))
dataframe = dataframe.withColumn(
new_feature.name, lit(None).cast(new_feature.type)
)
else:
dataframe = dataframe.withColumn(new_features.name, lit(None).cast(new_features.type))

dataframe = dataframe.withColumn(
new_features.name, lit(None).cast(new_features.type)
)

self.save_dataframe(
feature_group,
Expand All @@ -1337,18 +1327,22 @@ def add_cols_to_delta_table(self, feature_group, new_features):

dataframe = self._spark_session.read.format("delta").load(location)

if (new_features is not None):
if new_features is not None:
if isinstance(new_features, list):
for new_feature in new_features:
dataframe = dataframe.withColumn(new_feature.name, lit("").cast(new_feature.type))
dataframe = dataframe.withColumn(
new_feature.name, lit("").cast(new_feature.type)
)
else:
dataframe = dataframe.withColumn(new_features.name, lit("").cast(new_features.type))
dataframe = dataframe.withColumn(
new_features.name, lit("").cast(new_features.type)
)

dataframe.limit(0).write.format("delta").mode(
"append"
).option("mergeSchema", "true").option(
"spark.databricks.delta.schema.autoMerge.enabled", "true"
).save(location)
dataframe.limit(0).write.format("delta").mode("append").option(
"mergeSchema", "true"
).option("spark.databricks.delta.schema.autoMerge.enabled", "true").save(
location
)

def _apply_transformation_function(
self,
Expand Down Expand Up @@ -1378,9 +1372,19 @@ def _apply_transformation_function(
)

if missing_features:
raise FeatureStoreException(
f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly."
)
if (
tf.transformation_type
== transformation_function.TransformationType.ON_DEMAND
):
# On-demand transformation are applied using the python/spark engine during insertion, the transformation while retrieving feature vectors are performed in the vector_server.
raise FeatureStoreException(
f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the on-demand transformation function '{hopsworks_udf.function_name}' are not present in the dataframe being inserted into the feature group. "
+ "Please verify that the correct feature names are used in the transformation function and that these features exist in the dataframe being inserted."
)
else:
raise FeatureStoreException(
f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the model-dependent transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please verify that the correct features are specified in the transformation function."
)
if tf.hopsworks_udf.dropped_features:
dropped_features.update(hopsworks_udf.dropped_features)

Expand Down
Loading

0 comments on commit 706efc4

Please sign in to comment.