From 706efc4ad3265bc378a1b544f6ad868722f2e9d0 Mon Sep 17 00:00:00 2001 From: manu-sj <152865565+manu-sj@users.noreply.github.com> Date: Fri, 25 Oct 2024 18:17:02 +0200 Subject: [PATCH] [FSTORE-1574] Transformation functions bug fixes (#376) * peforming data validation after on-demand transformations are applied * correcting error message for feature group transformations * removing validations for on-deamnd features in Vector server as they are computed while retriving the featur evector * adding tests for error messages * adding warnings for transformation functions in external feature group and updating logger message * correcting logging debug message --- python/hsfs/core/feature_group_engine.py | 43 ++++- python/hsfs/core/vector_server.py | 9 + python/hsfs/engine/python.py | 25 +-- python/hsfs/engine/spark.py | 60 +++--- .../tests/core/test_feature_group_engine.py | 146 +++++++++++++++ python/tests/engine/test_python.py | 128 ++++++++----- python/tests/engine/test_spark.py | 177 ------------------ 7 files changed, 320 insertions(+), 268 deletions(-) diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index f00a044e1..30d1cbe4b 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -15,7 +15,7 @@ from __future__ import annotations import warnings -from typing import List +from typing import List, Union from hsfs import engine, feature, util from hsfs import feature_group as fg @@ -67,7 +67,7 @@ def _update_feature_group_schema_on_demand_transformations( def save( self, - feature_group, + feature_group: Union[fg.FeatureGroup, fg.ExternalFeatureGroup], feature_dataframe, write_options, validation_options: dict = None, @@ -80,6 +80,21 @@ def save( feature_group=feature_group, features=dataframe_features ) ) + + # Currently on-demand transformation functions not supported in external feature groups. + if feature_group.transformation_functions: + if not isinstance(feature_group, fg.ExternalFeatureGroup): + feature_dataframe = ( + engine.get_instance()._apply_transformation_function( + feature_group.transformation_functions, feature_dataframe + ) + ) + else: + warnings.warn( + "On-Demand features were not created because On-Demand Transformations are not supported for External Feature Groups.", + stacklevel=1, + ) + util.validate_embedding_feature_type( feature_group.embedding_index, dataframe_features ) @@ -119,7 +134,7 @@ def save( def insert( self, - feature_group, + feature_group: Union[fg.FeatureGroup, fg.ExternalFeatureGroup], feature_dataframe, overwrite, operation, @@ -132,6 +147,16 @@ def insert( feature_group.time_travel_format, features=feature_group.features, ) + + # Currently on-demand transformation functions not supported in external feature groups. + if ( + not isinstance(feature_group, fg.ExternalFeatureGroup) + and feature_group.transformation_functions + ): + feature_dataframe = engine.get_instance()._apply_transformation_function( + feature_group.transformation_functions, feature_dataframe + ) + dataframe_features = ( self._update_feature_group_schema_on_demand_transformations( feature_group=feature_group, features=dataframe_features @@ -299,7 +324,9 @@ def append_features(self, feature_group, new_features): if feature_group.time_travel_format == "DELTA": engine.get_instance().add_cols_to_delta_table(feature_group, new_features) else: - engine.get_instance().save_empty_dataframe(feature_group, new_features=new_features) + engine.get_instance().save_empty_dataframe( + feature_group, new_features=new_features + ) def update_description(self, feature_group, description): """Updates the description of a feature group.""" @@ -326,7 +353,7 @@ def update_deprecated(self, feature_group, deprecate): def insert_stream( self, - feature_group, + feature_group: Union[fg.FeatureGroup, fg.ExternalFeatureGroup], dataframe, query_name, output_mode, @@ -349,6 +376,12 @@ def insert_stream( feature_group=feature_group, features=dataframe_features ) ) + + if feature_group.transformation_functions: + dataframe = engine.get_instance()._apply_transformation_function( + feature_group.transformation_functions, dataframe + ) + util.validate_embedding_feature_type( feature_group.embedding_index, dataframe_features ) diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index 0e785dde5..277b25051 100755 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -1323,6 +1323,15 @@ def identify_missing_features_pre_fetch( passed_feature_names = passed_feature_names.union( vector_db_features.keys() ) + if self._on_demand_feature_names and len(self._on_demand_feature_names) > 0: + # Remove on-demand features from validation check as they would be computed. + _logger.debug( + "Appending on_demand_feature_names : %s, to passed_feature_names for pre-fetch missing", + self._on_demand_feature_names, + ) + passed_feature_names = passed_feature_names.union( + self._on_demand_feature_names + ) neither_fetched_nor_passed = fetched_features.difference( passed_feature_names ) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 68d3adfd6..2ab4742c6 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -808,15 +808,6 @@ def save_dataframe( online_write_options: Dict[str, Any], validation_id: Optional[int] = None, ) -> Optional[job.Job]: - # Currently on-demand transformation functions not supported in external feature groups. - if ( - not isinstance(feature_group, ExternalFeatureGroup) - and feature_group.transformation_functions - ): - dataframe = self._apply_transformation_function( - feature_group.transformation_functions, dataframe - ) - if ( hasattr(feature_group, "EXTERNAL_FEATURE_GROUP") and feature_group.online_enabled @@ -1298,9 +1289,19 @@ def _apply_transformation_function( dataset.columns ) if missing_features: - raise FeatureStoreException( - f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly." - ) + if ( + tf.transformation_type + == transformation_function.TransformationType.ON_DEMAND + ): + # On-demand transformation are applied using the python/spark engine during insertion, the transformation while retrieving feature vectors are performed in the vector_server. + raise FeatureStoreException( + f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the on-demand transformation function '{hopsworks_udf.function_name}' are not present in the dataframe being inserted into the feature group. " + + "Please verify that the correct feature names are used in the transformation function and that these features exist in the dataframe being inserted." + ) + else: + raise FeatureStoreException( + f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the model-dependent transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please verify that the correct features are specified in the transformation function." + ) if tf.hopsworks_udf.dropped_features: dropped_features.update(tf.hopsworks_udf.dropped_features) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 69b17915a..10d3a9cb1 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -415,14 +415,6 @@ def save_dataframe( validation_id=None, ): try: - # Currently on-demand transformation functions not supported in external feature groups. - if ( - not isinstance(feature_group, fg_mod.ExternalFeatureGroup) - and feature_group.transformation_functions - ): - dataframe = self._apply_transformation_function( - feature_group.transformation_functions, dataframe - ) if ( isinstance(feature_group, fg_mod.ExternalFeatureGroup) and feature_group.online_enabled @@ -467,11 +459,6 @@ def save_stream_dataframe( checkpoint_dir: Optional[str], write_options: Optional[Dict[str, Any]], ): - if feature_group.transformation_functions: - dataframe = self._apply_transformation_function( - feature_group.transformation_functions, dataframe - ) - write_options = kafka_engine.get_kafka_config( feature_group.feature_store_id, write_options, engine="spark" ) @@ -1314,13 +1301,16 @@ def save_empty_dataframe(self, feature_group, new_features=None): dataframe = self._spark_session.read.format("hudi").load(location) - if (new_features is not None): + if new_features is not None: if isinstance(new_features, list): for new_feature in new_features: - dataframe = dataframe.withColumn(new_feature.name, lit(None).cast(new_feature.type)) + dataframe = dataframe.withColumn( + new_feature.name, lit(None).cast(new_feature.type) + ) else: - dataframe = dataframe.withColumn(new_features.name, lit(None).cast(new_features.type)) - + dataframe = dataframe.withColumn( + new_features.name, lit(None).cast(new_features.type) + ) self.save_dataframe( feature_group, @@ -1337,18 +1327,22 @@ def add_cols_to_delta_table(self, feature_group, new_features): dataframe = self._spark_session.read.format("delta").load(location) - if (new_features is not None): + if new_features is not None: if isinstance(new_features, list): for new_feature in new_features: - dataframe = dataframe.withColumn(new_feature.name, lit("").cast(new_feature.type)) + dataframe = dataframe.withColumn( + new_feature.name, lit("").cast(new_feature.type) + ) else: - dataframe = dataframe.withColumn(new_features.name, lit("").cast(new_features.type)) + dataframe = dataframe.withColumn( + new_features.name, lit("").cast(new_features.type) + ) - dataframe.limit(0).write.format("delta").mode( - "append" - ).option("mergeSchema", "true").option( - "spark.databricks.delta.schema.autoMerge.enabled", "true" - ).save(location) + dataframe.limit(0).write.format("delta").mode("append").option( + "mergeSchema", "true" + ).option("spark.databricks.delta.schema.autoMerge.enabled", "true").save( + location + ) def _apply_transformation_function( self, @@ -1378,9 +1372,19 @@ def _apply_transformation_function( ) if missing_features: - raise FeatureStoreException( - f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly." - ) + if ( + tf.transformation_type + == transformation_function.TransformationType.ON_DEMAND + ): + # On-demand transformation are applied using the python/spark engine during insertion, the transformation while retrieving feature vectors are performed in the vector_server. + raise FeatureStoreException( + f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the on-demand transformation function '{hopsworks_udf.function_name}' are not present in the dataframe being inserted into the feature group. " + + "Please verify that the correct feature names are used in the transformation function and that these features exist in the dataframe being inserted." + ) + else: + raise FeatureStoreException( + f"The following feature(s): `{'`, '.join(missing_features)}`, specified in the model-dependent transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please verify that the correct features are specified in the transformation function." + ) if tf.hopsworks_udf.dropped_features: dropped_features.update(hopsworks_udf.dropped_features) diff --git a/python/tests/core/test_feature_group_engine.py b/python/tests/core/test_feature_group_engine.py index 91f1086ed..e5cc55c05 100644 --- a/python/tests/core/test_feature_group_engine.py +++ b/python/tests/core/test_feature_group_engine.py @@ -56,6 +56,49 @@ def test_save(self, mocker): # Assert assert mock_engine_get_instance.return_value.save_dataframe.call_count == 1 + def test_save_dataframe_transformation_functions(self, mocker): + # Arrange + feature_store_id = 99 + + mocker.patch("hsfs.engine.get_type") + mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance") + mocker.patch( + "hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata" + ) + mocker.patch("hsfs.core.great_expectation_engine.GreatExpectationEngine") + + fg_engine = feature_group_engine.FeatureGroupEngine( + feature_store_id=feature_store_id + ) + + @udf(int) + def test(feature): + return feature + 1 + + fg = feature_group.FeatureGroup( + name="test", + version=1, + featurestore_id=feature_store_id, + primary_key=[], + partition_key=[], + transformation_functions=[test], + id=10, + ) + + # Act + fg_engine.save( + feature_group=fg, + feature_dataframe=None, + write_options=None, + ) + + # Assert + assert mock_engine_get_instance.return_value.save_dataframe.call_count == 1 + assert ( + mock_engine_get_instance.return_value._apply_transformation_function.call_count + == 1 + ) + def test_save_ge_report(self, mocker): # Arrange feature_store_id = 99 @@ -143,6 +186,56 @@ def test_insert(self, mocker): assert mock_fg_api.return_value.delete_content.call_count == 0 assert mock_engine_get_instance.return_value.save_dataframe.call_count == 1 + def test_insert_transformation_functions(self, mocker): + # Arrange + feature_store_id = 99 + + mocker.patch("hsfs.engine.get_type") + mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance") + mocker.patch( + "hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata" + ) + mocker.patch( + "hsfs.core.feature_group_engine.FeatureGroupEngine._verify_schema_compatibility" + ) + mocker.patch("hsfs.core.great_expectation_engine.GreatExpectationEngine") + mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi") + + fg_engine = feature_group_engine.FeatureGroupEngine( + feature_store_id=feature_store_id + ) + + @udf(int) + def test(feature): + return feature + 1 + + fg = feature_group.FeatureGroup( + name="test", + version=1, + featurestore_id=feature_store_id, + transformation_functions=[test], + primary_key=[], + partition_key=[], + ) + + # Act + fg_engine.insert( + feature_group=fg, + feature_dataframe=None, + overwrite=None, + operation=None, + storage=None, + write_options=None, + ) + + # Assert + assert mock_fg_api.return_value.delete_content.call_count == 0 + assert mock_engine_get_instance.return_value.save_dataframe.call_count == 1 + assert ( + mock_engine_get_instance.return_value._apply_transformation_function.call_count + == 1 + ) + def test_insert_id(self, mocker): # Arrange feature_store_id = 99 @@ -909,6 +1002,59 @@ def test_insert_stream_stream(self, mocker): mock_engine_get_instance.return_value.save_stream_dataframe.call_count == 1 ) + def test_insert_stream_stream_transformation_functions(self, mocker): + # Arrange + feature_store_id = 99 + + mocker.patch("hsfs.engine.get_type") + mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance") + mocker.patch( + "hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata" + ) + mocker.patch( + "hsfs.core.feature_group_engine.FeatureGroupEngine._verify_schema_compatibility" + ) + + @udf(int) + def test(feature): + return feature + 1 + + fg_engine = feature_group_engine.FeatureGroupEngine( + feature_store_id=feature_store_id + ) + + fg = feature_group.FeatureGroup( + name="test", + version=1, + featurestore_id=feature_store_id, + primary_key=[], + partition_key=[], + transformation_functions=[test], + stream=True, + ) + + # Act + fg_engine.insert_stream( + feature_group=fg, + dataframe=None, + query_name=None, + output_mode=None, + await_termination=None, + timeout=None, + checkpoint_dir=None, + write_options=None, + ) + + # Assert + assert mock_engine_get_instance.return_value.save_dataframe.call_count == 0 + assert ( + mock_engine_get_instance.return_value.save_stream_dataframe.call_count == 1 + ) + assert ( + mock_engine_get_instance.return_value._apply_transformation_function.call_count + == 1 + ) + def test_insert_stream_online_enabled_id(self, mocker): # Arrange feature_store_id = 99 diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index e921787be..84e2ca10a 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -1450,52 +1450,6 @@ def test_save_dataframe(self, mocker): assert mock_python_engine_write_dataframe_kafka.call_count == 0 assert mock_python_engine_legacy_save_dataframe.call_count == 1 - def test_save_dataframe_transformation_functions(self, mocker): - # Arrange - mock_python_engine_write_dataframe_kafka = mocker.patch( - "hsfs.engine.python.Engine._write_dataframe_kafka" - ) - mock_python_engine_legacy_save_dataframe = mocker.patch( - "hsfs.engine.python.Engine.legacy_save_dataframe" - ) - mock_python_engine_apply_transformations = mocker.patch( - "hsfs.engine.python.Engine._apply_transformation_function" - ) - - python_engine = python.Engine() - - @udf(int) - def test(feature): - return feature + 1 - - fg = feature_group.FeatureGroup( - name="test", - version=1, - featurestore_id=99, - primary_key=[], - partition_key=[], - id=10, - stream=False, - transformation_functions=[test], - ) - - # Act - python_engine.save_dataframe( - feature_group=fg, - dataframe=None, - operation=None, - online_enabled=None, - storage=None, - offline_write_options=None, - online_write_options=None, - validation_id=None, - ) - - # Assert - assert mock_python_engine_write_dataframe_kafka.call_count == 0 - assert mock_python_engine_legacy_save_dataframe.call_count == 1 - assert mock_python_engine_apply_transformations.call_count == 1 - def test_save_dataframe_stream(self, mocker): # Arrange mock_python_engine_write_dataframe_kafka = mocker.patch( @@ -3456,6 +3410,88 @@ def test_get_unique_values(self): assert 2 in result assert 3 in result + def test_apply_transformation_function_missing_feature_on_demand_transformations( + self, mocker + ): + # Arrange + mocker.patch("hopsworks_common.client.get_instance") + hopsworks_common.connection._hsfs_engine_type = "python" + python_engine = python.Engine() + + @udf(int) + def add_one(col1): + return col1 + 1 + + fg = feature_group.FeatureGroup( + name="test1", + version=1, + featurestore_id=99, + primary_key=[], + partition_key=[], + features=[feature.Feature("id"), feature.Feature("tf_name")], + transformation_functions=[add_one("missing_col1")], + id=11, + stream=False, + ) + + df = pd.DataFrame(data={"tf_name": [1, 2]}) + + # Act + with pytest.raises(exceptions.FeatureStoreException) as exception: + python_engine._apply_transformation_function( + transformation_functions=fg.transformation_functions, dataset=df + ) + print(str(exception.value)) + assert ( + str(exception.value) + == "The following feature(s): `missing_col1`, specified in the on-demand transformation function 'add_one' are not present in the dataframe being inserted into the feature group. " + "Please verify that the correct feature names are used in the transformation function and that these features exist in the dataframe being inserted." + ) + + def test_apply_transformation_function_missing_feature_model_dependent_transformations( + self, mocker + ): + # Arrange + mocker.patch("hopsworks_common.client.get_instance") + hopsworks_common.connection._hsfs_engine_type = "python" + python_engine = python.Engine() + + @udf(int) + def add_one(col1): + return col1 + 1 + + fg = feature_group.FeatureGroup( + name="test1", + version=1, + featurestore_id=99, + primary_key=[], + partition_key=[], + features=[feature.Feature("id"), feature.Feature("tf_name")], + id=11, + stream=False, + ) + + fv = feature_view.FeatureView( + name="fv_name", + query=fg.select_all(), + featurestore_id=99, + transformation_functions=[add_one("missing_col1")], + ) + + df = pd.DataFrame(data={"tf_name": [1, 2]}) + + # Act + with pytest.raises(exceptions.FeatureStoreException) as exception: + python_engine._apply_transformation_function( + transformation_functions=fv.transformation_functions, dataset=df + ) + print(str(exception.value)) + assert ( + str(exception.value) + == "The following feature(s): `missing_col1`, specified in the model-dependent transformation function 'add_one' are not present in the feature view. " + "Please verify that the correct features are specified in the transformation function." + ) + def test_materialization_kafka(self, mocker): # Arrange mocker.patch("hsfs.core.kafka_engine.get_kafka_config", return_value={}) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index fb3f6e08f..f74aaf36f 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -605,51 +605,6 @@ def test_save_dataframe(self, mocker): assert mock_spark_engine_save_online_dataframe.call_count == 0 assert mock_spark_engine_save_offline_dataframe.call_count == 1 - def test_save_dataframe_transformations(self, mocker): - # Arrange - mock_spark_engine_save_online_dataframe = mocker.patch( - "hsfs.engine.spark.Engine._save_online_dataframe" - ) - mock_spark_engine_save_offline_dataframe = mocker.patch( - "hsfs.engine.spark.Engine._save_offline_dataframe" - ) - mock_spark_engine_apply_transformations = mocker.patch( - "hsfs.engine.spark.Engine._apply_transformation_function" - ) - - spark_engine = spark.Engine() - - @udf(int) - def test(feature): - return feature + 1 - - fg = feature_group.FeatureGroup( - name="test", - version=1, - featurestore_id=99, - primary_key=[], - partition_key=[], - id=10, - transformation_functions=[test], - ) - - # Act - spark_engine.save_dataframe( - feature_group=fg, - dataframe=None, - operation=None, - online_enabled=None, - storage=None, - offline_write_options=None, - online_write_options=None, - validation_id=None, - ) - - # Assert - assert mock_spark_engine_save_online_dataframe.call_count == 0 - assert mock_spark_engine_save_offline_dataframe.call_count == 1 - assert mock_spark_engine_apply_transformations.call_count == 1 - def test_save_dataframe_storage_offline(self, mocker): # Arrange mock_spark_engine_save_online_dataframe = mocker.patch( @@ -988,138 +943,6 @@ def test_save_stream_dataframe(self, mocker, backend_fixtures): == 0 ) - def test_save_stream_dataframe_transformations(self, mocker, backend_fixtures): - # Arrange - mock_common_client_get_instance = mocker.patch( - "hopsworks_common.client.get_instance" - ) - mocker.patch("hopsworks_common.client._is_external", return_value=False) - mocker.patch("hsfs.engine.spark.Engine._encode_complex_features") - mock_spark_engine_online_fg_to_avro = mocker.patch( - "hsfs.engine.spark.Engine._online_fg_to_avro" - ) - - mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance") - mock_engine_get_instance.return_value.add_file.return_value = ( - "result_from_add_file" - ) - - mock_storage_connector_api = mocker.patch( - "hsfs.core.storage_connector_api.StorageConnectorApi" - ) - - mock_spark_engine_apply_transformations = mocker.patch( - "hsfs.engine.spark.Engine._apply_transformation_function" - ) - - json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"] - sc = storage_connector.StorageConnector.from_response_json(json) - mock_storage_connector_api.return_value.get_kafka_connector.return_value = sc - - spark_engine = spark.Engine() - - @udf(int) - def test(feature): - return feature + 1 - - fg = feature_group.FeatureGroup( - name="test", - version=1, - featurestore_id=99, - primary_key=[], - partition_key=[], - id=10, - online_topic_name="test_online_topic_name", - transformation_functions=[test], - ) - fg.feature_store = mocker.Mock() - project_id = 1 - fg.feature_store.project_id = project_id - - mock_common_client_get_instance.return_value._project_name = "test_project_name" - - # Act - spark_engine.save_stream_dataframe( - feature_group=fg, - dataframe=None, - query_name=None, - output_mode="test_mode", - await_termination=None, - timeout=None, - checkpoint_dir=None, - write_options={"test_name": "test_value"}, - ) - - # Assert - assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.call_args[0][0] - == "headers" - ) - assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[ - 0 - ][0] - == "test_mode" - ) - assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[ - 0 - ][0] - == "kafka" - ) - assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ - 0 - ][0] - == "checkpointLocation" - ) - assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ - 0 - ][1] - == f"/Projects/test_project_name/Resources/{self._get_spark_query_name(project_id, fg)}-checkpoint" - ) - assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[ - 1 - ] - == { - "kafka.bootstrap.servers": "test_bootstrap_servers", - "kafka.security.protocol": "test_security_protocol", - "kafka.ssl.endpoint.identification.algorithm": "test_ssl_endpoint_identification_algorithm", - "kafka.ssl.key.password": "test_ssl_key_password", - "kafka.ssl.keystore.location": "result_from_add_file", - "kafka.ssl.keystore.password": "test_ssl_keystore_password", - "kafka.ssl.truststore.location": "result_from_add_file", - "kafka.ssl.truststore.password": "test_ssl_truststore_password", - "kafka.test_option_name": "test_option_value", - "test_name": "test_value", - } - ) - assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ - 0 - ][0] - == "topic" - ) - assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ - 0 - ][1] - == "test_online_topic_name" - ) - assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[ - 0 - ][0] - == self._get_spark_query_name(project_id, fg) - ) - assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count - == 0 - ) - assert mock_spark_engine_apply_transformations.call_count == 1 - def test_save_stream_dataframe_query_name(self, mocker, backend_fixtures): # Arrange mock_common_client_get_instance = mocker.patch(