Skip to content

Commit

Permalink
add timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks committed Dec 17, 2024
1 parent b80c0de commit 1f83a73
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions python/hsfs/core/online_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import json
import time
import warnings
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

import humps
Expand Down Expand Up @@ -122,7 +124,11 @@ def aborted_entries(self) -> int:
def batch_results(self):
return self._batch_results

def wait_for_completion(self):
def wait_for_completion(self, options={}):

Check failure on line 127 in python/hsfs/core/online_ingestion.py

View workflow job for this annotation

GitHub Actions / Lint and Stylecheck

Ruff (B006)

python/hsfs/core/online_ingestion.py:127:43: B006 Do not use mutable data structures for argument defaults
# Set timeout time
timeout_delta = timedelta(seconds=options.get("timeout", 60))
timeout_time = datetime.now() + timeout_delta

with tqdm(total=self.num_entries,
bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt}",
desc="Online data ingestion progress",
Expand All @@ -137,6 +143,13 @@ def wait_for_completion(self):
if self.processed_entries >= self.num_entries:
break

time.sleep(1)
if datetime.now() >= timeout_time:
warnings.warn(
f"Timeout of {timeout_delta} was exceeded while waiting for online ingestion completion.",
stacklevel=1,
)
break

time.sleep(options.get("period", 1))

self.refresh()

0 comments on commit 1f83a73

Please sign in to comment.