diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py index 192000359..645638e9b 100644 --- a/python/hsfs/core/online_ingestion.py +++ b/python/hsfs/core/online_ingestion.py @@ -17,6 +17,8 @@ import json import time +import warnings +from datetime import datetime, timedelta from typing import Any, Dict, Optional import humps @@ -122,7 +124,11 @@ def aborted_entries(self) -> int: def batch_results(self): return self._batch_results - def wait_for_completion(self): + def wait_for_completion(self, options={}): + # Set timeout time + timeout_delta = timedelta(seconds=options.get("timeout", 60)) + timeout_time = datetime.now() + timeout_delta + with tqdm(total=self.num_entries, bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt}", desc="Online data ingestion progress", @@ -137,6 +143,13 @@ def wait_for_completion(self): if self.processed_entries >= self.num_entries: break - time.sleep(1) + if datetime.now() >= timeout_time: + warnings.warn( + f"Timeout of {timeout_delta} was exceeded while waiting for online ingestion completion.", + stacklevel=1, + ) + break + + time.sleep(options.get("period", 1)) self.refresh()