From 1f83a730611c56c206e76a8eae4ff2d802450470 Mon Sep 17 00:00:00 2001 From: bubriks Date: Tue, 17 Dec 2024 17:47:25 +0200 Subject: [PATCH] add timeout --- python/hsfs/core/online_ingestion.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py index 192000359..645638e9b 100644 --- a/python/hsfs/core/online_ingestion.py +++ b/python/hsfs/core/online_ingestion.py @@ -17,6 +17,8 @@ import json import time +import warnings +from datetime import datetime, timedelta from typing import Any, Dict, Optional import humps @@ -122,7 +124,11 @@ def aborted_entries(self) -> int: def batch_results(self): return self._batch_results - def wait_for_completion(self): + def wait_for_completion(self, options={}): + # Set timeout time + timeout_delta = timedelta(seconds=options.get("timeout", 60)) + timeout_time = datetime.now() + timeout_delta + with tqdm(total=self.num_entries, bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt}", desc="Online data ingestion progress", @@ -137,6 +143,13 @@ def wait_for_completion(self): if self.processed_entries >= self.num_entries: break - time.sleep(1) + if datetime.now() >= timeout_time: + warnings.warn( + f"Timeout of {timeout_delta} was exceeded while waiting for online ingestion completion.", + stacklevel=1, + ) + break + + time.sleep(options.get("period", 1)) self.refresh()