Skip to content

Commit

Permalink
add wait_for_online_ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks committed Dec 13, 2024
1 parent f7b4800 commit f5b1206
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
5 changes: 5 additions & 0 deletions python/hsfs/engine/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,11 @@ def _write_dataframe_kafka(
),
await_termination=offline_write_options.get("wait_for_job", False),
)

# wait for online ingestion
if feature_group.online_enabled and offline_write_options.get("wait_for_online_ingestion", False):
feature_group.get_latest_online_ingestion().wait_for_completion()

return feature_group.materialization_job

@staticmethod
Expand Down
8 changes: 8 additions & 0 deletions python/hsfs/engine/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,10 @@ def save_stream_dataframe(
if await_termination:
query.awaitTermination(timeout)

# wait for online ingestion
if feature_group.online_enabled and write_options.get("wait_for_online_ingestion", False):
feature_group.get_latest_online_ingestion().wait_for_completion()

return query

def _save_offline_dataframe(
Expand Down Expand Up @@ -573,6 +577,10 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options):
.save()
)

# wait for online ingestion
if feature_group.online_enabled and write_options.get("wait_for_online_ingestion", False):
feature_group.get_latest_online_ingestion().wait_for_completion()

def _get_headers(
self,
feature_group: Union[fg_mod.FeatureGroup, fg_mod.ExternalFeatureGroup],
Expand Down

0 comments on commit f5b1206

Please sign in to comment.