diff --git a/java/beam/pom.xml b/java/beam/pom.xml
index fa0318e6f..92d49a52f 100644
--- a/java/beam/pom.xml
+++ b/java/beam/pom.xml
@@ -5,7 +5,7 @@
hsfs-parent
com.logicalclocks
- 4.1.0
+ 4.1.1
4.0.0
diff --git a/java/flink/pom.xml b/java/flink/pom.xml
index 06ca58eb2..018483a9a 100644
--- a/java/flink/pom.xml
+++ b/java/flink/pom.xml
@@ -5,7 +5,7 @@
hsfs-parent
com.logicalclocks
- 4.1.0
+ 4.1.1
4.0.0
diff --git a/java/hsfs/pom.xml b/java/hsfs/pom.xml
index a336347e7..4e7af8b1f 100644
--- a/java/hsfs/pom.xml
+++ b/java/hsfs/pom.xml
@@ -5,7 +5,7 @@
hsfs-parent
com.logicalclocks
- 4.1.0
+ 4.1.1
4.0.0
diff --git a/java/pom.xml b/java/pom.xml
index 57a6e283c..05ccc704a 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -7,7 +7,7 @@
com.logicalclocks
hsfs-parent
pom
- 4.1.0
+ 4.1.1
hsfs
spark
diff --git a/java/spark/pom.xml b/java/spark/pom.xml
index f1e8e6305..c374860f4 100644
--- a/java/spark/pom.xml
+++ b/java/spark/pom.xml
@@ -22,7 +22,7 @@
hsfs-parent
com.logicalclocks
- 4.1.0
+ 4.1.1
4.0.0
diff --git a/python/hopsworks_common/version.py b/python/hopsworks_common/version.py
index f29fdb5ff..6cb16d71c 100644
--- a/python/hopsworks_common/version.py
+++ b/python/hopsworks_common/version.py
@@ -14,4 +14,4 @@
# limitations under the License.
#
-__version__ = "4.1.0"
+__version__ = "4.1.1"
diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py
index f00a044e1..30d1cbe4b 100644
--- a/python/hsfs/core/feature_group_engine.py
+++ b/python/hsfs/core/feature_group_engine.py
@@ -15,7 +15,7 @@
from __future__ import annotations
import warnings
-from typing import List
+from typing import List, Union
from hsfs import engine, feature, util
from hsfs import feature_group as fg
@@ -67,7 +67,7 @@ def _update_feature_group_schema_on_demand_transformations(
def save(
self,
- feature_group,
+ feature_group: Union[fg.FeatureGroup, fg.ExternalFeatureGroup],
feature_dataframe,
write_options,
validation_options: dict = None,
@@ -80,6 +80,21 @@ def save(
feature_group=feature_group, features=dataframe_features
)
)
+
+ # Currently, on-demand transformation functions are not supported in external feature groups.
+ if feature_group.transformation_functions:
+ if not isinstance(feature_group, fg.ExternalFeatureGroup):
+ feature_dataframe = (
+ engine.get_instance()._apply_transformation_function(
+ feature_group.transformation_functions, feature_dataframe
+ )
+ )
+ else:
+ warnings.warn(
+ "On-Demand features were not created because On-Demand Transformations are not supported for External Feature Groups.",
+ stacklevel=1,
+ )
+
util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
)
@@ -119,7 +134,7 @@ def save(
def insert(
self,
- feature_group,
+ feature_group: Union[fg.FeatureGroup, fg.ExternalFeatureGroup],
feature_dataframe,
overwrite,
operation,
@@ -132,6 +147,16 @@ def insert(
feature_group.time_travel_format,
features=feature_group.features,
)
+
+ # Currently, on-demand transformation functions are not supported in external feature groups.
+ if (
+ not isinstance(feature_group, fg.ExternalFeatureGroup)
+ and feature_group.transformation_functions
+ ):
+ feature_dataframe = engine.get_instance()._apply_transformation_function(
+ feature_group.transformation_functions, feature_dataframe
+ )
+
dataframe_features = (
self._update_feature_group_schema_on_demand_transformations(
feature_group=feature_group, features=dataframe_features
@@ -299,7 +324,9 @@ def append_features(self, feature_group, new_features):
if feature_group.time_travel_format == "DELTA":
engine.get_instance().add_cols_to_delta_table(feature_group, new_features)
else:
- engine.get_instance().save_empty_dataframe(feature_group, new_features=new_features)
+ engine.get_instance().save_empty_dataframe(
+ feature_group, new_features=new_features
+ )
def update_description(self, feature_group, description):
"""Updates the description of a feature group."""
@@ -326,7 +353,7 @@ def update_deprecated(self, feature_group, deprecate):
def insert_stream(
self,
- feature_group,
+ feature_group: Union[fg.FeatureGroup, fg.ExternalFeatureGroup],
dataframe,
query_name,
output_mode,
@@ -349,6 +376,12 @@ def insert_stream(
feature_group=feature_group, features=dataframe_features
)
)
+
+ if feature_group.transformation_functions:
+ dataframe = engine.get_instance()._apply_transformation_function(
+ feature_group.transformation_functions, dataframe
+ )
+
util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
)
diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py
index 0e785dde5..277b25051 100755
--- a/python/hsfs/core/vector_server.py
+++ b/python/hsfs/core/vector_server.py
@@ -1323,6 +1323,15 @@ def identify_missing_features_pre_fetch(
passed_feature_names = passed_feature_names.union(
vector_db_features.keys()
)
+ if self._on_demand_feature_names and len(self._on_demand_feature_names) > 0:
+ # Remove on-demand features from validation check as they would be computed.
+ _logger.debug(
+ "Appending on_demand_feature_names : %s, to passed_feature_names for pre-fetch missing",
+ self._on_demand_feature_names,
+ )
+ passed_feature_names = passed_feature_names.union(
+ self._on_demand_feature_names
+ )
neither_fetched_nor_passed = fetched_features.difference(
passed_feature_names
)
diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py
index a4dc65378..d5e16fe93 100644
--- a/python/hsfs/engine/python.py
+++ b/python/hsfs/engine/python.py
@@ -808,15 +808,6 @@ def save_dataframe(
online_write_options: Dict[str, Any],
validation_id: Optional[int] = None,
) -> Optional[job.Job]:
- # Currently on-demand transformation functions not supported in external feature groups.
- if (
- not isinstance(feature_group, ExternalFeatureGroup)
- and feature_group.transformation_functions
- ):
- dataframe = self._apply_transformation_function(
- feature_group.transformation_functions, dataframe
- )
-
if (
hasattr(feature_group, "EXTERNAL_FEATURE_GROUP")
and feature_group.online_enabled
@@ -1298,9 +1289,19 @@ def _apply_transformation_function(
dataset.columns
)
if missing_features:
- raise FeatureStoreException(
- f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly."
- )
+ if (
+ tf.transformation_type
+ == transformation_function.TransformationType.ON_DEMAND
+ ):
+ # On-demand transformations are applied using the python/spark engine during insertion; the transformations applied while retrieving feature vectors are performed in the vector_server.
+ raise FeatureStoreException(
+ f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the on-demand transformation function '{hopsworks_udf.function_name}' are not present in the dataframe being inserted into the feature group. "
+ + "Please verify that the correct feature names are used in the transformation function and that these features exist in the dataframe being inserted."
+ )
+ else:
+ raise FeatureStoreException(
+ f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the model-dependent transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please verify that the correct features are specified in the transformation function."
+ )
if tf.hopsworks_udf.dropped_features:
dropped_features.update(tf.hopsworks_udf.dropped_features)
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index 69b17915a..10d3a9cb1 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -415,14 +415,6 @@ def save_dataframe(
validation_id=None,
):
try:
- # Currently on-demand transformation functions not supported in external feature groups.
- if (
- not isinstance(feature_group, fg_mod.ExternalFeatureGroup)
- and feature_group.transformation_functions
- ):
- dataframe = self._apply_transformation_function(
- feature_group.transformation_functions, dataframe
- )
if (
isinstance(feature_group, fg_mod.ExternalFeatureGroup)
and feature_group.online_enabled
@@ -467,11 +459,6 @@ def save_stream_dataframe(
checkpoint_dir: Optional[str],
write_options: Optional[Dict[str, Any]],
):
- if feature_group.transformation_functions:
- dataframe = self._apply_transformation_function(
- feature_group.transformation_functions, dataframe
- )
-
write_options = kafka_engine.get_kafka_config(
feature_group.feature_store_id, write_options, engine="spark"
)
@@ -1314,13 +1301,16 @@ def save_empty_dataframe(self, feature_group, new_features=None):
dataframe = self._spark_session.read.format("hudi").load(location)
- if (new_features is not None):
+ if new_features is not None:
if isinstance(new_features, list):
for new_feature in new_features:
- dataframe = dataframe.withColumn(new_feature.name, lit(None).cast(new_feature.type))
+ dataframe = dataframe.withColumn(
+ new_feature.name, lit(None).cast(new_feature.type)
+ )
else:
- dataframe = dataframe.withColumn(new_features.name, lit(None).cast(new_features.type))
-
+ dataframe = dataframe.withColumn(
+ new_features.name, lit(None).cast(new_features.type)
+ )
self.save_dataframe(
feature_group,
@@ -1337,18 +1327,22 @@ def add_cols_to_delta_table(self, feature_group, new_features):
dataframe = self._spark_session.read.format("delta").load(location)
- if (new_features is not None):
+ if new_features is not None:
if isinstance(new_features, list):
for new_feature in new_features:
- dataframe = dataframe.withColumn(new_feature.name, lit("").cast(new_feature.type))
+ dataframe = dataframe.withColumn(
+ new_feature.name, lit("").cast(new_feature.type)
+ )
else:
- dataframe = dataframe.withColumn(new_features.name, lit("").cast(new_features.type))
+ dataframe = dataframe.withColumn(
+ new_features.name, lit("").cast(new_features.type)
+ )
- dataframe.limit(0).write.format("delta").mode(
- "append"
- ).option("mergeSchema", "true").option(
- "spark.databricks.delta.schema.autoMerge.enabled", "true"
- ).save(location)
+ dataframe.limit(0).write.format("delta").mode("append").option(
+ "mergeSchema", "true"
+ ).option("spark.databricks.delta.schema.autoMerge.enabled", "true").save(
+ location
+ )
def _apply_transformation_function(
self,
@@ -1378,9 +1372,19 @@ def _apply_transformation_function(
)
if missing_features:
- raise FeatureStoreException(
- f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly."
- )
+ if (
+ tf.transformation_type
+ == transformation_function.TransformationType.ON_DEMAND
+ ):
+ # On-demand transformations are applied using the python/spark engine during insertion; the transformations applied while retrieving feature vectors are performed in the vector_server.
+ raise FeatureStoreException(
+ f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the on-demand transformation function '{hopsworks_udf.function_name}' are not present in the dataframe being inserted into the feature group. "
+ + "Please verify that the correct feature names are used in the transformation function and that these features exist in the dataframe being inserted."
+ )
+ else:
+ raise FeatureStoreException(
+ f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the model-dependent transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please verify that the correct features are specified in the transformation function."
+ )
if tf.hopsworks_udf.dropped_features:
dropped_features.update(hopsworks_udf.dropped_features)
diff --git a/python/tests/core/test_feature_group_engine.py b/python/tests/core/test_feature_group_engine.py
index 91f1086ed..e5cc55c05 100644
--- a/python/tests/core/test_feature_group_engine.py
+++ b/python/tests/core/test_feature_group_engine.py
@@ -56,6 +56,49 @@ def test_save(self, mocker):
# Assert
assert mock_engine_get_instance.return_value.save_dataframe.call_count == 1
+ def test_save_dataframe_transformation_functions(self, mocker):
+ # Arrange
+ feature_store_id = 99
+
+ mocker.patch("hsfs.engine.get_type")
+ mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance")
+ mocker.patch(
+ "hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata"
+ )
+ mocker.patch("hsfs.core.great_expectation_engine.GreatExpectationEngine")
+
+ fg_engine = feature_group_engine.FeatureGroupEngine(
+ feature_store_id=feature_store_id
+ )
+
+ @udf(int)
+ def test(feature):
+ return feature + 1
+
+ fg = feature_group.FeatureGroup(
+ name="test",
+ version=1,
+ featurestore_id=feature_store_id,
+ primary_key=[],
+ partition_key=[],
+ transformation_functions=[test],
+ id=10,
+ )
+
+ # Act
+ fg_engine.save(
+ feature_group=fg,
+ feature_dataframe=None,
+ write_options=None,
+ )
+
+ # Assert
+ assert mock_engine_get_instance.return_value.save_dataframe.call_count == 1
+ assert (
+ mock_engine_get_instance.return_value._apply_transformation_function.call_count
+ == 1
+ )
+
def test_save_ge_report(self, mocker):
# Arrange
feature_store_id = 99
@@ -143,6 +186,56 @@ def test_insert(self, mocker):
assert mock_fg_api.return_value.delete_content.call_count == 0
assert mock_engine_get_instance.return_value.save_dataframe.call_count == 1
+ def test_insert_transformation_functions(self, mocker):
+ # Arrange
+ feature_store_id = 99
+
+ mocker.patch("hsfs.engine.get_type")
+ mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance")
+ mocker.patch(
+ "hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata"
+ )
+ mocker.patch(
+ "hsfs.core.feature_group_engine.FeatureGroupEngine._verify_schema_compatibility"
+ )
+ mocker.patch("hsfs.core.great_expectation_engine.GreatExpectationEngine")
+ mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
+
+ fg_engine = feature_group_engine.FeatureGroupEngine(
+ feature_store_id=feature_store_id
+ )
+
+ @udf(int)
+ def test(feature):
+ return feature + 1
+
+ fg = feature_group.FeatureGroup(
+ name="test",
+ version=1,
+ featurestore_id=feature_store_id,
+ transformation_functions=[test],
+ primary_key=[],
+ partition_key=[],
+ )
+
+ # Act
+ fg_engine.insert(
+ feature_group=fg,
+ feature_dataframe=None,
+ overwrite=None,
+ operation=None,
+ storage=None,
+ write_options=None,
+ )
+
+ # Assert
+ assert mock_fg_api.return_value.delete_content.call_count == 0
+ assert mock_engine_get_instance.return_value.save_dataframe.call_count == 1
+ assert (
+ mock_engine_get_instance.return_value._apply_transformation_function.call_count
+ == 1
+ )
+
def test_insert_id(self, mocker):
# Arrange
feature_store_id = 99
@@ -909,6 +1002,59 @@ def test_insert_stream_stream(self, mocker):
mock_engine_get_instance.return_value.save_stream_dataframe.call_count == 1
)
+ def test_insert_stream_stream_transformation_functions(self, mocker):
+ # Arrange
+ feature_store_id = 99
+
+ mocker.patch("hsfs.engine.get_type")
+ mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance")
+ mocker.patch(
+ "hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata"
+ )
+ mocker.patch(
+ "hsfs.core.feature_group_engine.FeatureGroupEngine._verify_schema_compatibility"
+ )
+
+ @udf(int)
+ def test(feature):
+ return feature + 1
+
+ fg_engine = feature_group_engine.FeatureGroupEngine(
+ feature_store_id=feature_store_id
+ )
+
+ fg = feature_group.FeatureGroup(
+ name="test",
+ version=1,
+ featurestore_id=feature_store_id,
+ primary_key=[],
+ partition_key=[],
+ transformation_functions=[test],
+ stream=True,
+ )
+
+ # Act
+ fg_engine.insert_stream(
+ feature_group=fg,
+ dataframe=None,
+ query_name=None,
+ output_mode=None,
+ await_termination=None,
+ timeout=None,
+ checkpoint_dir=None,
+ write_options=None,
+ )
+
+ # Assert
+ assert mock_engine_get_instance.return_value.save_dataframe.call_count == 0
+ assert (
+ mock_engine_get_instance.return_value.save_stream_dataframe.call_count == 1
+ )
+ assert (
+ mock_engine_get_instance.return_value._apply_transformation_function.call_count
+ == 1
+ )
+
def test_insert_stream_online_enabled_id(self, mocker):
# Arrange
feature_store_id = 99
diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py
index e921787be..84e2ca10a 100644
--- a/python/tests/engine/test_python.py
+++ b/python/tests/engine/test_python.py
@@ -1450,52 +1450,6 @@ def test_save_dataframe(self, mocker):
assert mock_python_engine_write_dataframe_kafka.call_count == 0
assert mock_python_engine_legacy_save_dataframe.call_count == 1
- def test_save_dataframe_transformation_functions(self, mocker):
- # Arrange
- mock_python_engine_write_dataframe_kafka = mocker.patch(
- "hsfs.engine.python.Engine._write_dataframe_kafka"
- )
- mock_python_engine_legacy_save_dataframe = mocker.patch(
- "hsfs.engine.python.Engine.legacy_save_dataframe"
- )
- mock_python_engine_apply_transformations = mocker.patch(
- "hsfs.engine.python.Engine._apply_transformation_function"
- )
-
- python_engine = python.Engine()
-
- @udf(int)
- def test(feature):
- return feature + 1
-
- fg = feature_group.FeatureGroup(
- name="test",
- version=1,
- featurestore_id=99,
- primary_key=[],
- partition_key=[],
- id=10,
- stream=False,
- transformation_functions=[test],
- )
-
- # Act
- python_engine.save_dataframe(
- feature_group=fg,
- dataframe=None,
- operation=None,
- online_enabled=None,
- storage=None,
- offline_write_options=None,
- online_write_options=None,
- validation_id=None,
- )
-
- # Assert
- assert mock_python_engine_write_dataframe_kafka.call_count == 0
- assert mock_python_engine_legacy_save_dataframe.call_count == 1
- assert mock_python_engine_apply_transformations.call_count == 1
-
def test_save_dataframe_stream(self, mocker):
# Arrange
mock_python_engine_write_dataframe_kafka = mocker.patch(
@@ -3456,6 +3410,88 @@ def test_get_unique_values(self):
assert 2 in result
assert 3 in result
+ def test_apply_transformation_function_missing_feature_on_demand_transformations(
+ self, mocker
+ ):
+ # Arrange
+ mocker.patch("hopsworks_common.client.get_instance")
+ hopsworks_common.connection._hsfs_engine_type = "python"
+ python_engine = python.Engine()
+
+ @udf(int)
+ def add_one(col1):
+ return col1 + 1
+
+ fg = feature_group.FeatureGroup(
+ name="test1",
+ version=1,
+ featurestore_id=99,
+ primary_key=[],
+ partition_key=[],
+ features=[feature.Feature("id"), feature.Feature("tf_name")],
+ transformation_functions=[add_one("missing_col1")],
+ id=11,
+ stream=False,
+ )
+
+ df = pd.DataFrame(data={"tf_name": [1, 2]})
+
+ # Act
+ with pytest.raises(exceptions.FeatureStoreException) as exception:
+ python_engine._apply_transformation_function(
+ transformation_functions=fg.transformation_functions, dataset=df
+ )
+ print(str(exception.value))
+ assert (
+ str(exception.value)
+ == "The following feature(s): `missing_col1`, specified in the on-demand transformation function 'add_one' are not present in the dataframe being inserted into the feature group. "
+ "Please verify that the correct feature names are used in the transformation function and that these features exist in the dataframe being inserted."
+ )
+
+ def test_apply_transformation_function_missing_feature_model_dependent_transformations(
+ self, mocker
+ ):
+ # Arrange
+ mocker.patch("hopsworks_common.client.get_instance")
+ hopsworks_common.connection._hsfs_engine_type = "python"
+ python_engine = python.Engine()
+
+ @udf(int)
+ def add_one(col1):
+ return col1 + 1
+
+ fg = feature_group.FeatureGroup(
+ name="test1",
+ version=1,
+ featurestore_id=99,
+ primary_key=[],
+ partition_key=[],
+ features=[feature.Feature("id"), feature.Feature("tf_name")],
+ id=11,
+ stream=False,
+ )
+
+ fv = feature_view.FeatureView(
+ name="fv_name",
+ query=fg.select_all(),
+ featurestore_id=99,
+ transformation_functions=[add_one("missing_col1")],
+ )
+
+ df = pd.DataFrame(data={"tf_name": [1, 2]})
+
+ # Act
+ with pytest.raises(exceptions.FeatureStoreException) as exception:
+ python_engine._apply_transformation_function(
+ transformation_functions=fv.transformation_functions, dataset=df
+ )
+ print(str(exception.value))
+ assert (
+ str(exception.value)
+ == "The following feature(s): `missing_col1`, specified in the model-dependent transformation function 'add_one' are not present in the feature view. "
+ "Please verify that the correct features are specified in the transformation function."
+ )
+
def test_materialization_kafka(self, mocker):
# Arrange
mocker.patch("hsfs.core.kafka_engine.get_kafka_config", return_value={})
diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py
index fb3f6e08f..f74aaf36f 100644
--- a/python/tests/engine/test_spark.py
+++ b/python/tests/engine/test_spark.py
@@ -605,51 +605,6 @@ def test_save_dataframe(self, mocker):
assert mock_spark_engine_save_online_dataframe.call_count == 0
assert mock_spark_engine_save_offline_dataframe.call_count == 1
- def test_save_dataframe_transformations(self, mocker):
- # Arrange
- mock_spark_engine_save_online_dataframe = mocker.patch(
- "hsfs.engine.spark.Engine._save_online_dataframe"
- )
- mock_spark_engine_save_offline_dataframe = mocker.patch(
- "hsfs.engine.spark.Engine._save_offline_dataframe"
- )
- mock_spark_engine_apply_transformations = mocker.patch(
- "hsfs.engine.spark.Engine._apply_transformation_function"
- )
-
- spark_engine = spark.Engine()
-
- @udf(int)
- def test(feature):
- return feature + 1
-
- fg = feature_group.FeatureGroup(
- name="test",
- version=1,
- featurestore_id=99,
- primary_key=[],
- partition_key=[],
- id=10,
- transformation_functions=[test],
- )
-
- # Act
- spark_engine.save_dataframe(
- feature_group=fg,
- dataframe=None,
- operation=None,
- online_enabled=None,
- storage=None,
- offline_write_options=None,
- online_write_options=None,
- validation_id=None,
- )
-
- # Assert
- assert mock_spark_engine_save_online_dataframe.call_count == 0
- assert mock_spark_engine_save_offline_dataframe.call_count == 1
- assert mock_spark_engine_apply_transformations.call_count == 1
-
def test_save_dataframe_storage_offline(self, mocker):
# Arrange
mock_spark_engine_save_online_dataframe = mocker.patch(
@@ -988,138 +943,6 @@ def test_save_stream_dataframe(self, mocker, backend_fixtures):
== 0
)
- def test_save_stream_dataframe_transformations(self, mocker, backend_fixtures):
- # Arrange
- mock_common_client_get_instance = mocker.patch(
- "hopsworks_common.client.get_instance"
- )
- mocker.patch("hopsworks_common.client._is_external", return_value=False)
- mocker.patch("hsfs.engine.spark.Engine._encode_complex_features")
- mock_spark_engine_online_fg_to_avro = mocker.patch(
- "hsfs.engine.spark.Engine._online_fg_to_avro"
- )
-
- mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance")
- mock_engine_get_instance.return_value.add_file.return_value = (
- "result_from_add_file"
- )
-
- mock_storage_connector_api = mocker.patch(
- "hsfs.core.storage_connector_api.StorageConnectorApi"
- )
-
- mock_spark_engine_apply_transformations = mocker.patch(
- "hsfs.engine.spark.Engine._apply_transformation_function"
- )
-
- json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"]
- sc = storage_connector.StorageConnector.from_response_json(json)
- mock_storage_connector_api.return_value.get_kafka_connector.return_value = sc
-
- spark_engine = spark.Engine()
-
- @udf(int)
- def test(feature):
- return feature + 1
-
- fg = feature_group.FeatureGroup(
- name="test",
- version=1,
- featurestore_id=99,
- primary_key=[],
- partition_key=[],
- id=10,
- online_topic_name="test_online_topic_name",
- transformation_functions=[test],
- )
- fg.feature_store = mocker.Mock()
- project_id = 1
- fg.feature_store.project_id = project_id
-
- mock_common_client_get_instance.return_value._project_name = "test_project_name"
-
- # Act
- spark_engine.save_stream_dataframe(
- feature_group=fg,
- dataframe=None,
- query_name=None,
- output_mode="test_mode",
- await_termination=None,
- timeout=None,
- checkpoint_dir=None,
- write_options={"test_name": "test_value"},
- )
-
- # Assert
- assert (
- mock_spark_engine_online_fg_to_avro.return_value.withColumn.call_args[0][0]
- == "headers"
- )
- assert (
- mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[
- 0
- ][0]
- == "test_mode"
- )
- assert (
- mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[
- 0
- ][0]
- == "kafka"
- )
- assert (
- mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[
- 0
- ][0]
- == "checkpointLocation"
- )
- assert (
- mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[
- 0
- ][1]
- == f"/Projects/test_project_name/Resources/{self._get_spark_query_name(project_id, fg)}-checkpoint"
- )
- assert (
- mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[
- 1
- ]
- == {
- "kafka.bootstrap.servers": "test_bootstrap_servers",
- "kafka.security.protocol": "test_security_protocol",
- "kafka.ssl.endpoint.identification.algorithm": "test_ssl_endpoint_identification_algorithm",
- "kafka.ssl.key.password": "test_ssl_key_password",
- "kafka.ssl.keystore.location": "result_from_add_file",
- "kafka.ssl.keystore.password": "test_ssl_keystore_password",
- "kafka.ssl.truststore.location": "result_from_add_file",
- "kafka.ssl.truststore.password": "test_ssl_truststore_password",
- "kafka.test_option_name": "test_option_value",
- "test_name": "test_value",
- }
- )
- assert (
- mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[
- 0
- ][0]
- == "topic"
- )
- assert (
- mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[
- 0
- ][1]
- == "test_online_topic_name"
- )
- assert (
- mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[
- 0
- ][0]
- == self._get_spark_query_name(project_id, fg)
- )
- assert (
- mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count
- == 0
- )
- assert mock_spark_engine_apply_transformations.call_count == 1
-
def test_save_stream_dataframe_query_name(self, mocker, backend_fixtures):
# Arrange
mock_common_client_get_instance = mocker.patch(
diff --git a/utils/java/pom.xml b/utils/java/pom.xml
index ff201119d..4862a7fcc 100644
--- a/utils/java/pom.xml
+++ b/utils/java/pom.xml
@@ -5,7 +5,7 @@
com.logicalclocks
hsfs-utils
- 4.1.0
+ 4.1.1
3.2.0.0-SNAPSHOT