From 035b95365b0fdf726608f47d40063705721d4e1f Mon Sep 17 00:00:00 2001 From: bubriks Date: Mon, 25 Nov 2024 12:16:46 +0200 Subject: [PATCH] refactor to be a feature_store_activity --- .../logicalclocks/hsfs/FeatureGroupBase.java | 6 ++--- ...IngestionRun.java => OnlineIngestion.java} | 4 ++-- .../hsfs/metadata/FeatureGroupApi.java | 2 -- ...ionRunApi.java => OnlineIngestionApi.java} | 22 ++++++++--------- .../hsfs/spark/engine/SparkEngine.java | 11 +++++---- .../{ingestion_run.py => online_ingestion.py} | 12 +++++----- ...ion_run_api.py => online_ingestion_api.py} | 22 ++++++++--------- python/hsfs/engine/python.py | 8 +++---- python/hsfs/engine/spark.py | 10 ++++---- python/hsfs/feature_group.py | 8 +++---- python/tests/engine/test_python.py | 24 +++++++++---------- python/tests/engine/test_python_writer.py | 4 ++-- python/tests/engine/test_spark.py | 8 +++---- python/tests/test_feature_group_writer.py | 12 +++++----- 14 files changed, 76 insertions(+), 77 deletions(-) rename java/hsfs/src/main/java/com/logicalclocks/hsfs/{IngestionRun.java => OnlineIngestion.java} (89%) rename java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/{IngestionRunApi.java => OnlineIngestionApi.java} (75%) rename python/hsfs/core/{ingestion_run.py => online_ingestion.py} (90%) rename python/hsfs/core/{ingestion_run_api.py => online_ingestion_api.py} (76%) diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java index 0304422b0..22b408ab1 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java @@ -34,7 +34,7 @@ import com.logicalclocks.hsfs.constructor.QueryBase; import com.logicalclocks.hsfs.engine.FeatureGroupEngineBase; import com.logicalclocks.hsfs.engine.FeatureGroupUtils; -import com.logicalclocks.hsfs.metadata.IngestionRunApi; +import 
com.logicalclocks.hsfs.metadata.OnlineIngestionApi; import com.logicalclocks.hsfs.metadata.Statistics; import com.logicalclocks.hsfs.metadata.Subject; import com.logicalclocks.hsfs.metadata.User; @@ -545,8 +545,8 @@ public Schema getDeserializedAvroSchema() throws FeatureStoreException, IOExcept } @JsonIgnore - public IngestionRun getLatestIngestionRun() throws IOException, FeatureStoreException { - return new IngestionRunApi().getIngestionRun(this, new HashMap() {{ + public OnlineIngestion getLatestOnlineIngestion() throws IOException, FeatureStoreException { + return new OnlineIngestionApi().getOnlineIngestion(this, new HashMap() {{ put("filter_by", "LATEST"); } }); diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/IngestionRun.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java similarity index 89% rename from java/hsfs/src/main/java/com/logicalclocks/hsfs/IngestionRun.java rename to java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java index 91b26156e..e7d98b0d3 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/IngestionRun.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java @@ -26,7 +26,7 @@ @NoArgsConstructor @AllArgsConstructor -public class IngestionRun extends RestDto { +public class OnlineIngestion extends RestDto { @Getter @Setter @@ -49,7 +49,7 @@ public class IngestionRun extends RestDto { @Getter private Integer processedEntries; - public IngestionRun(String startingOffsets, String endingOffsets) { + public OnlineIngestion(String startingOffsets, String endingOffsets) { this.startingOffsets = startingOffsets; this.endingOffsets = endingOffsets; } diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java index eca3d818d..4c9c346e6 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java +++ 
b/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java @@ -48,8 +48,6 @@ public class FeatureGroupApi { public static final String FEATURE_GROUP_COMMIT_PATH = FEATURE_GROUP_ID_PATH + "/commits{?filter_by,sort_by,offset,limit}"; public static final String FEATURE_GROUP_CLEAR_PATH = FEATURE_GROUP_ID_PATH + "/clear"; - public static final String FEATURE_GROUP_INGESTION_RUN = FEATURE_GROUP_ID_PATH - + "/ingestionrun{?filter_by,sort_by,offset,limit}"; private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupApi.class); diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/IngestionRunApi.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java similarity index 75% rename from java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/IngestionRunApi.java rename to java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java index 43ee4fe36..9222b0783 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/IngestionRunApi.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/metadata/OnlineIngestionApi.java @@ -20,7 +20,7 @@ import com.damnhandy.uri.template.UriTemplate; import com.logicalclocks.hsfs.FeatureGroupBase; import com.logicalclocks.hsfs.FeatureStoreException; -import com.logicalclocks.hsfs.IngestionRun; +import com.logicalclocks.hsfs.OnlineIngestion; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.slf4j.Logger; @@ -29,20 +29,20 @@ import java.io.IOException; import java.util.Map; -public class IngestionRunApi { +public class OnlineIngestionApi { public static final String FEATURE_GROUP_ID_PATH = "/featuregroups/{fgId}"; - public static final String FEATURE_GROUP_INGESTION_RUN = FEATURE_GROUP_ID_PATH - + "/ingestionrun{?filter_by,sort_by,offset,limit}"; + public static final String FEATURE_GROUP_ONLINE_INGESTION = FEATURE_GROUP_ID_PATH + + 
"/online_ingestion{?filter_by,sort_by,offset,limit}"; - private static final Logger LOGGER = LoggerFactory.getLogger(IngestionRunApi.class); + private static final Logger LOGGER = LoggerFactory.getLogger(OnlineIngestionApi.class); - public void saveIngestionRun(FeatureGroupBase featureGroupBase, IngestionRun ingestionRun) + public void saveOnlineIngestion(FeatureGroupBase featureGroupBase, OnlineIngestion onlineIngestion) throws FeatureStoreException, IOException { HopsworksClient hopsworksClient = HopsworksClient.getInstance(); String pathTemplate = HopsworksClient.PROJECT_PATH + FeatureStoreApi.FEATURE_STORE_PATH - + FEATURE_GROUP_INGESTION_RUN; + + FEATURE_GROUP_ONLINE_INGESTION; String uri = UriTemplate.fromTemplate(pathTemplate) .set("projectId", hopsworksClient.getProject().getProjectId()) @@ -51,16 +51,16 @@ public void saveIngestionRun(FeatureGroupBase featureGroupBase, IngestionRun ing .expand(); HttpPost postRequest = new HttpPost(uri); - postRequest.setEntity(hopsworksClient.buildStringEntity(ingestionRun)); + postRequest.setEntity(hopsworksClient.buildStringEntity(onlineIngestion)); hopsworksClient.handleRequest(postRequest); } - public IngestionRun getIngestionRun(FeatureGroupBase featureGroupBase, Map queryParams) + public OnlineIngestion getOnlineIngestion(FeatureGroupBase featureGroupBase, Map queryParams) throws IOException, FeatureStoreException { HopsworksClient hopsworksClient = HopsworksClient.getInstance(); String pathTemplate = HopsworksClient.PROJECT_PATH + FeatureStoreApi.FEATURE_STORE_PATH - + FEATURE_GROUP_INGESTION_RUN; + + FEATURE_GROUP_ONLINE_INGESTION; UriTemplate uriTemplate = UriTemplate.fromTemplate(pathTemplate) .set("projectId", hopsworksClient.getProject().getProjectId()) @@ -76,6 +76,6 @@ public IngestionRun getIngestionRun(FeatureGroupBase featureGroupBase, Map String endingCheckPoint = kafkaEngine.kafkaGetOffsets(featureGroupBase, writeOptions, true); - ingestionRunApi.saveIngestionRun(featureGroupBase, new 
IngestionRun(startingCheckPoint, endingCheckPoint)); + onlineIngestionApi.saveOnlineIngestion(featureGroupBase, new OnlineIngestion(startingCheckPoint, endingCheckPoint)); } public StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase, Dataset dataset, @@ -637,7 +637,8 @@ public void onQueryProgress(QueryProgressEvent queryProgress) { public void onQueryTerminated(QueryTerminatedEvent queryTerminated) { try { String endingCheckPoint = kafkaEngine.kafkaGetOffsets(featureGroupBase, writeOptions, true); - ingestionRunApi.saveIngestionRun(featureGroupBase, new IngestionRun(startingCheckPoint, endingCheckPoint)); + onlineIngestionApi.saveOnlineIngestion(featureGroupBase, + new OnlineIngestion(startingCheckPoint, endingCheckPoint)); } catch (Exception e) { e.printStackTrace(); } diff --git a/python/hsfs/core/ingestion_run.py b/python/hsfs/core/online_ingestion.py similarity index 90% rename from python/hsfs/core/ingestion_run.py rename to python/hsfs/core/online_ingestion.py index 0823b5b75..d70fa99f5 100644 --- a/python/hsfs/core/ingestion_run.py +++ b/python/hsfs/core/online_ingestion.py @@ -24,9 +24,9 @@ from tqdm.auto import tqdm -class IngestionRun: +class OnlineIngestion: """ - Metadata object used to provide Ingestion Run information for a feature group. + Metadata object used to provide Online Ingestion information for a feature group. 
""" def __init__( @@ -47,7 +47,7 @@ def __init__( self._processed_entries = processed_entries @classmethod - def from_response_json(cls, json_dict: Dict[str, Any]) -> "IngestionRun": + def from_response_json(cls, json_dict: Dict[str, Any]) -> "OnlineIngestion": if json_dict is None: return None @@ -63,13 +63,13 @@ def from_response_json(cls, json_dict: Dict[str, Any]) -> "IngestionRun": return None def refresh(self): - from hsfs.core.ingestion_run_api import IngestionRunApi + from hsfs.core.online_ingestion_api import OnlineIngestionApi - ingestion_run = IngestionRunApi().get_ingestion_run( + online_ingestion = OnlineIngestionApi().get_online_ingestion( self.feature_group, query_params={"filter_by": f"ID:{self.id}"} ) - self.__dict__.update(ingestion_run.__dict__) + self.__dict__.update(online_ingestion.__dict__) def to_dict(self): return { diff --git a/python/hsfs/core/ingestion_run_api.py b/python/hsfs/core/online_ingestion_api.py similarity index 76% rename from python/hsfs/core/ingestion_run_api.py rename to python/hsfs/core/online_ingestion_api.py index dc73ab4ae..df631b533 100644 --- a/python/hsfs/core/ingestion_run_api.py +++ b/python/hsfs/core/online_ingestion_api.py @@ -17,15 +17,15 @@ from hopsworks_common import client from hsfs import feature_group as fg_mod -from hsfs.core import ingestion_run +from hsfs.core import online_ingestion -class IngestionRunApi: +class OnlineIngestionApi: - def save_ingestion_run( + def save_online_ingestion( self, feature_group_instance: fg_mod.FeatureGroup, - ingestion_run_instance: ingestion_run.IngestionRun, + online_ingestion_instance: online_ingestion.OnlineIngestion, ): _client = client.get_instance() path_params = [ @@ -35,13 +35,13 @@ def save_ingestion_run( feature_group_instance.feature_store_id, "featuregroups", feature_group_instance.id, - "ingestionrun", + "online_ingestion", ] headers = {"content-type": "application/json"} - _client._send_request("POST", path_params, headers=headers,
data=ingestion_run_instance.json()) + _client._send_request("POST", path_params, headers=headers, data=online_ingestion_instance.json()) - def get_ingestion_run( + def get_online_ingestion( self, feature_group_instance: fg_mod.FeatureGroup, query_params: None, @@ -54,11 +54,11 @@ def get_ingestion_run( feature_group_instance.feature_store_id, "featuregroups", feature_group_instance.id, - "ingestionrun", + "online_ingestion", ] - ingestion_run_instance = ingestion_run.IngestionRun.from_response_json( + online_ingestion_instance = online_ingestion.OnlineIngestion.from_response_json( _client._send_request("GET", path_params, query_params) ) - ingestion_run_instance.feature_group = feature_group_instance - return ingestion_run_instance + online_ingestion_instance.feature_group = feature_group_instance + return online_ingestion_instance diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 65081c4d5..0c5af358f 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -70,11 +70,11 @@ feature_group_api, feature_view_api, ingestion_job_conf, - ingestion_run, - ingestion_run_api, job, job_api, kafka_engine, + online_ingestion, + online_ingestion_api, statistics_api, storage_connector_api, training_dataset_api, @@ -1504,9 +1504,9 @@ def _write_dataframe_kafka( high=True, ) - ingestion_run_api.IngestionRunApi().save_ingestion_run( + online_ingestion_api.OnlineIngestionApi().save_online_ingestion( feature_group, - ingestion_run.IngestionRun( + online_ingestion.OnlineIngestion( starting_offsets=initial_check_point, ending_offsets=ending_check_point ) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 84fcfe38f..1a805c2fc 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -36,7 +36,7 @@ import tzlocal from hopsworks_common.core.constants import HAS_NUMPY, HAS_PANDAS from hsfs.constructor import query -from hsfs.core import feature_group_api, ingestion_run, ingestion_run_api +from 
hsfs.core import feature_group_api, online_ingestion, online_ingestion_api # in case importing in %%local from hsfs.core.vector_db_client import VectorDbClient @@ -559,9 +559,9 @@ def on_finished(): high=True, ) - ingestion_run_api.IngestionRunApi().save_ingestion_run( + online_ingestion_api.OnlineIngestionApi().save_online_ingestion( feature_group, - ingestion_run.IngestionRun( + online_ingestion.OnlineIngestion( starting_offsets=starting_check_point, ending_offsets=ending_check_point ) @@ -647,9 +647,9 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options): high=True, ) - ingestion_run_api.IngestionRunApi().save_ingestion_run( + online_ingestion_api.OnlineIngestionApi().save_online_ingestion( feature_group, - ingestion_run.IngestionRun( + online_ingestion.OnlineIngestion( starting_offsets=starting_check_point, ending_offsets=ending_check_point ) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index ff0ee4d95..98e5603c9 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -64,9 +64,9 @@ feature_monitoring_result_engine, feature_store_api, great_expectation_engine, - ingestion_run, - ingestion_run_api, job_api, + online_ingestion, + online_ingestion_api, spine_group_engine, statistics_engine, validation_report_engine, @@ -1801,8 +1801,8 @@ def feature_store(self) -> feature_store_mod.FeatureStore: ) return self._feature_store - def get_latest_ingestion_run(self) -> ingestion_run.IngestionRun: - return ingestion_run_api.IngestionRunApi().get_ingestion_run(self, query_params={"filter_by": "LATEST"}) + def get_latest_online_ingestion(self) -> online_ingestion.OnlineIngestion: + return online_ingestion_api.OnlineIngestionApi().get_online_ingestion(self, query_params={"filter_by": "LATEST"}) @feature_store.setter def feature_store(self, feature_store: feature_store_mod.FeatureStore) -> None: diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index 
45490799a..8115594a6 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -3503,7 +3503,7 @@ def test_materialization_kafka(self, mocker): # Arrange mocker.patch("hsfs.core.kafka_engine.get_kafka_config", return_value={}) mocker.patch("hsfs.feature_group.FeatureGroup._get_encoded_avro_schema") - mock_save_ingestion_run = mocker.patch("hsfs.core.ingestion_run_api.IngestionRunApi.save_ingestion_run") + mock_save_online_ingestion = mocker.patch("hsfs.core.online_ingestion_api.OnlineIngestionApi.save_online_ingestion") mocker.patch("hsfs.core.kafka_engine.get_encoder_func") mocker.patch("hsfs.core.kafka_engine.encode_complex_features") mock_python_engine_kafka_produce = mocker.patch( @@ -3557,13 +3557,13 @@ def test_materialization_kafka(self, mocker): args="defaults", await_termination=False, ) - assert mock_save_ingestion_run.call_count == 1 + assert mock_save_online_ingestion.call_count == 1 def test_materialization_kafka_first_job_execution(self, mocker): # Arrange mocker.patch("hsfs.core.kafka_engine.get_kafka_config", return_value={}) mocker.patch("hsfs.feature_group.FeatureGroup._get_encoded_avro_schema") - mock_save_ingestion_run = mocker.patch("hsfs.core.ingestion_run_api.IngestionRunApi.save_ingestion_run") + mock_save_online_ingestion = mocker.patch("hsfs.core.online_ingestion_api.OnlineIngestionApi.save_online_ingestion") mocker.patch("hsfs.core.kafka_engine.get_encoder_func") mocker.patch("hsfs.core.kafka_engine.encode_complex_features") mock_python_engine_kafka_produce = mocker.patch( @@ -3617,13 +3617,13 @@ def test_materialization_kafka_first_job_execution(self, mocker): args="defaults -initialCheckPointString tests_offsets", await_termination=False, ) - assert mock_save_ingestion_run.call_count == 1 + assert mock_save_online_ingestion.call_count == 1 def test_materialization_kafka_skip_offsets(self, mocker): # Arrange mocker.patch("hsfs.core.kafka_engine.get_kafka_config", return_value={}) 
mocker.patch("hsfs.feature_group.FeatureGroup._get_encoded_avro_schema") - mock_save_ingestion_run = mocker.patch("hsfs.core.ingestion_run_api.IngestionRunApi.save_ingestion_run") + mock_save_online_ingestion = mocker.patch("hsfs.core.online_ingestion_api.OnlineIngestionApi.save_online_ingestion") mocker.patch("hsfs.core.kafka_engine.get_encoder_func") mocker.patch("hsfs.core.kafka_engine.encode_complex_features") mock_python_engine_kafka_produce = mocker.patch( @@ -3676,13 +3676,13 @@ def test_materialization_kafka_skip_offsets(self, mocker): args="defaults -initialCheckPointString tests_offsets", await_termination=False, ) - assert mock_save_ingestion_run.call_count == 1 + assert mock_save_online_ingestion.call_count == 1 def test_materialization_kafka_topic_doesnt_exist(self, mocker): # Arrange mocker.patch("hsfs.core.kafka_engine.get_kafka_config", return_value={}) mocker.patch("hsfs.feature_group.FeatureGroup._get_encoded_avro_schema") - mock_save_ingestion_run = mocker.patch("hsfs.core.ingestion_run_api.IngestionRunApi.save_ingestion_run") + mock_save_online_ingestion = mocker.patch("hsfs.core.online_ingestion_api.OnlineIngestionApi.save_online_ingestion") mocker.patch("hsfs.core.kafka_engine.get_encoder_func") mocker.patch("hsfs.core.kafka_engine.encode_complex_features") mock_python_engine_kafka_produce = mocker.patch( @@ -3732,7 +3732,7 @@ def test_materialization_kafka_topic_doesnt_exist(self, mocker): args="defaults -initialCheckPointString tests_offsets", await_termination=False, ) - assert mock_save_ingestion_run.call_count == 1 + assert mock_save_online_ingestion.call_count == 1 def test_test(self, mocker): fg = feature_group.FeatureGroup( @@ -3755,7 +3755,7 @@ def test_test(self, mocker): assert fg.materialization_job.config == {"defaultArgs": "defaults"} - def test_materialization_ingestion_run(self, mocker): + def test_materialization_online_ingestion(self, mocker): # Arrange mocker.patch("hsfs.core.kafka_engine.get_kafka_config", return_value={}) 
mocker.patch("hsfs.feature_group.FeatureGroup._get_encoded_avro_schema") @@ -3769,7 +3769,7 @@ def test_materialization_ingestion_run(self, mocker): "hsfs.core.kafka_engine.kafka_get_offsets", side_effect=["tests_offsets1", "tests_offsets2"], ) - mock_save_ingestion_run = mocker.patch("hsfs.core.ingestion_run_api.IngestionRunApi.save_ingestion_run") + mock_save_online_ingestion = mocker.patch("hsfs.core.online_ingestion_api.OnlineIngestionApi.save_online_ingestion") mocker.patch( "hsfs.core.job_api.JobApi.last_execution", return_value=[], @@ -3807,8 +3807,8 @@ def test_materialization_ingestion_run(self, mocker): ) # Assert - assert mock_save_ingestion_run.call_count == 1 - args, _ = mock_save_ingestion_run.call_args + assert mock_save_online_ingestion.call_count == 1 + args, _ = mock_save_online_ingestion.call_args assert args[0] == fg assert args[1].starting_offsets == "tests_offsets1" assert args[1].ending_offsets == "tests_offsets2" diff --git a/python/tests/engine/test_python_writer.py b/python/tests/engine/test_python_writer.py index 33cccb533..05c980be0 100644 --- a/python/tests/engine/test_python_writer.py +++ b/python/tests/engine/test_python_writer.py @@ -33,7 +33,7 @@ def test_write_dataframe_kafka(self, mocker, dataframe_fixture_times): avro_schema_mock = mocker.patch( "hsfs.feature_group.FeatureGroup._get_encoded_avro_schema" ) - mock_save_ingestion_run = mocker.patch("hsfs.core.ingestion_run_api.IngestionRunApi.save_ingestion_run") + mock_save_online_ingestion = mocker.patch("hsfs.core.online_ingestion_api.OnlineIngestionApi.save_online_ingestion") avro_schema = ( '{"type":"record","name":"test_fg","namespace":"test_featurestore.db","fields":' '[{"name":"primary_key","type":["null","long"]},{"name":"event_date","type":' @@ -114,4 +114,4 @@ def test_write_dataframe_kafka(self, mocker, dataframe_fixture_times): } assert reference_record == record - assert mock_save_ingestion_run.call_count == 1 + assert mock_save_online_ingestion.call_count == 1 diff 
--git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 709b35b89..c1ede09ea 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -1162,7 +1162,7 @@ def test_save_stream_dataframe_checkpoint_dir(self, mocker, backend_fixtures): mock_storage_connector_api = mocker.patch( "hsfs.core.storage_connector_api.StorageConnectorApi" ) - mock_save_ingestion_run = mocker.patch("hsfs.core.ingestion_run_api.IngestionRunApi.save_ingestion_run") + mock_save_online_ingestion = mocker.patch("hsfs.core.online_ingestion_api.OnlineIngestionApi.save_online_ingestion") json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"] sc = storage_connector.StorageConnector.from_response_json(json) mock_storage_connector_api.return_value.get_kafka_connector.return_value = sc @@ -1289,7 +1289,7 @@ def test_save_stream_dataframe_checkpoint_dir(self, mocker, backend_fixtures): mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count == 1 ) - assert mock_save_ingestion_run.call_count == 1 + assert mock_save_online_ingestion.call_count == 1 def test_save_stream_dataframe_await_termination(self, mocker, backend_fixtures): # Arrange @@ -1592,7 +1592,7 @@ def test_save_online_dataframe(self, mocker, backend_fixtures): mock_storage_connector_api = mocker.patch( "hsfs.core.storage_connector_api.StorageConnectorApi" ) - mock_save_ingestion_run = mocker.patch("hsfs.core.ingestion_run_api.IngestionRunApi.save_ingestion_run") + mock_save_online_ingestion = mocker.patch("hsfs.core.online_ingestion_api.OnlineIngestionApi.save_online_ingestion") json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"] sc = storage_connector.StorageConnector.from_response_json(json) 
mock_storage_connector_api.return_value.get_kafka_connector.return_value = sc @@ -1688,7 +1688,7 @@ def test_save_online_dataframe(self, mocker, backend_fixtures): mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.write.format.return_value.options.return_value.option.return_value.save.call_count == 1 ) - assert mock_save_ingestion_run.call_count == 1 + assert mock_save_online_ingestion.call_count == 1 def test_serialize_to_avro(self, mocker): # Arrange diff --git a/python/tests/test_feature_group_writer.py b/python/tests/test_feature_group_writer.py index 766b290d9..a2713e259 100644 --- a/python/tests/test_feature_group_writer.py +++ b/python/tests/test_feature_group_writer.py @@ -21,7 +21,7 @@ class TestFeatureGroupWriter: def test_fg_writer_context_manager(self, mocker, dataframe_fixture_basic): mock_insert = mocker.patch("hsfs.feature_group.FeatureGroup.insert") - mock_save_ingestion_run = mocker.patch("hsfs.core.ingestion_run_api.IngestionRunApi.save_ingestion_run") + mock_save_online_ingestion = mocker.patch("hsfs.core.online_ingestion_api.OnlineIngestionApi.save_online_ingestion") fg = feature_group.FeatureGroup( name="test", @@ -46,14 +46,14 @@ def test_fg_writer_context_manager(self, mocker, dataframe_fixture_basic): validation_options={"fetch_expectation_suite": False}, ) assert fg._multi_part_insert is False - assert mock_save_ingestion_run.call_count == 0 + assert mock_save_online_ingestion.call_count == 0 def test_fg_writer_cache_management(self, mocker, dataframe_fixture_basic): engine = python.Engine() mocker.patch("hsfs.engine.get_instance", return_value=engine) mocker.patch("hopsworks_common.client.get_instance") mocker.patch("hsfs.core.kafka_engine.kafka_get_offsets", return_value="") - mock_save_ingestion_run = mocker.patch("hsfs.core.ingestion_run_api.IngestionRunApi.save_ingestion_run") + mock_save_online_ingestion = mocker.patch("hsfs.core.online_ingestion_api.OnlineIngestionApi.save_online_ingestion") producer, 
feature_writers, writer_m = ( mocker.MagicMock(), mocker.MagicMock(), @@ -107,14 +107,14 @@ def test_fg_writer_cache_management(self, mocker, dataframe_fixture_basic): assert fg._feature_writers is None assert fg._kafka_headers is None assert fg._writer is None - assert mock_save_ingestion_run.call_count == 0 + assert mock_save_online_ingestion.call_count == 0 def test_fg_writer_without_context_manager(self, mocker, dataframe_fixture_basic): engine = python.Engine() mocker.patch("hsfs.engine.get_instance", return_value=engine) mocker.patch("hopsworks_common.client.get_instance") mocker.patch("hsfs.core.kafka_engine.kafka_get_offsets", return_value="") - mock_save_ingestion_run = mocker.patch("hsfs.core.ingestion_run_api.IngestionRunApi.save_ingestion_run") + mock_save_online_ingestion = mocker.patch("hsfs.core.online_ingestion_api.OnlineIngestionApi.save_online_ingestion") producer, feature_writers, writer_m = ( mocker.MagicMock(), mocker.MagicMock(), @@ -166,4 +166,4 @@ def test_fg_writer_without_context_manager(self, mocker, dataframe_fixture_basic assert fg._feature_writers is None assert fg._kafka_headers is None assert fg._writer is None - assert mock_save_ingestion_run.call_count == 0 + assert mock_save_online_ingestion.call_count == 0