From 43c94a6e9982edda752146c55d75ddc6c7dd3cac Mon Sep 17 00:00:00 2001
From: Bhargav Dodla <13788369+EXPEbdodla@users.noreply.github.com>
Date: Fri, 8 Nov 2024 10:56:13 -0800
Subject: [PATCH] feat: Create infra as part of the streaming ingestion (#155)

* feat: Create infra as part of the streaming ingestion

---------

Co-authored-by: Bhargav Dodla
---
 .../infra/contrib/spark_kafka_processor.py    | 19 +++++++++++++++++++
 .../spark/spark_materialization_engine.py     |  2 +-
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py
index 20fd6b28ad..d98366c1a4 100644
--- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py
+++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py
@@ -23,6 +23,7 @@
 from feast.infra.materialization.contrib.spark.spark_materialization_engine import (
     _SparkSerializedArtifacts,
 )
+from feast.infra.provider import get_provider
 from feast.stream_feature_view import StreamFeatureView
 from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping
 
@@ -116,9 +117,27 @@ def __init__(
         # data_source type has been checked to be an instance of KafkaSource.
         self.data_source: KafkaSource = self.data_source  # type: ignore
 
+    def _create_infra_if_necessary(self):
+        if self.fs.config.online_store is not None and getattr(
+            self.fs.config.online_store, "lazy_table_creation", False
+        ):
+            print(
+                f"Online store {self.fs.config.online_store.__class__.__name__} supports lazy table creation and it is enabled"
+            )
+            provider = get_provider(self.fs.config)
+            provider.update_infra(
+                project=self.fs.project,
+                tables_to_delete=[],
+                tables_to_keep=[self.sfv],
+                entities_to_delete=[],
+                entities_to_keep=[],
+                partial=True,
+            )
+
     def ingest_stream_feature_view(
         self, to: PushMode = PushMode.ONLINE
     ) -> StreamingQuery:
+        self._create_infra_if_necessary()
         ingested_stream_df = self._ingest_stream_data()
         transformed_df = self._construct_transformation_plan(ingested_stream_df)
         if self.fs.config.provider == "expedia":
diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
index d33daa0b60..cf30a42d72 100644
--- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
+++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
@@ -180,7 +180,7 @@ def _materialize_one(
                 )
 
             print(
-                f"INFO!!! Processing {feature_view.name} with {spark_df.count()} records"
+                f"INFO: Processing {feature_view.name} with {spark_df.count()} records and {spark_df.rdd.getNumPartitions()} partitions"
             )
 
             spark_df.mapInPandas(