Skip to content

Commit

Permalink
feat: support returning and resuming partial results
Browse files Browse the repository at this point in the history
Add a parameter `keep_failed` to `Dials.list_all()`,
specifying whether to return the paginated partial results
when an HTTP request fails.

Add a parameter `resume_from` to `Dials.list_all()`
to accept paginated partial results and resume the fetching.

Add a field `exception` to `PaginatedBaseModel`
to record the exception that caused the request to fail.

If applied, users would be able to write something like:
```python
data = None
data = dials.h1d.list_all(<filter>, keep_failed=True, resume_from=data)
```
and, after a failure, resume by re-running that last line (<Up><Enter> in the terminal).
  • Loading branch information
ShamrockLee committed Aug 14, 2024
1 parent c72a1ae commit 3abbb79
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 6 deletions.
50 changes: 45 additions & 5 deletions cmsdials/utils/api_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from importlib import util as importlib_util
from traceback import format_exception_only
from typing import Optional
from urllib.parse import parse_qs, urlparse
from warnings import warn

import requests
from requests.exceptions import HTTPError
Expand Down Expand Up @@ -99,23 +101,61 @@ def list(self, filters=None):

raise ValueError("pagination model is None and response is not a list.")

def __list_sync(self, filters, max_pages: Optional[int] = None, enable_progress: bool = False):
next_token = None
def __list_sync(
self,
filters,
max_pages: Optional[int] = None,
enable_progress: bool = False,
keep_failed: bool = False,
resume_from=None,
):
next_string: Optional[str] = None
results = []
is_last_page = False

if resume_from is not None:
results = resume_from.results
next_string = resume_from.next
if next_string is None and len(results):
warn(
"resume_from.next is None while resume_from.result is not empty, doing nothing.",
RuntimeWarning,
stacklevel=2,
)
is_last_page = True

total_pages = 0
use_tqdm = TQDM_INSTALLED and enable_progress

if use_tqdm:
progress = tqdm(desc="Progress", total=1)

while is_last_page is False:
next_token = parse_qs(urlparse(next_string).query).get("next_token") if next_string else None
curr_filters = self.filter_class(**filters.dict())
curr_filters.next_token = next_token
response = self.list(curr_filters)
try:
response = self.list(curr_filters)
except Exception as e:
if use_tqdm:
progress.close()

if not keep_failed:
raise e
warn(
"HTTP request failed, returning partial results. Exception: " + "\n".join(format_exception_only(e)),
RuntimeWarning,
stacklevel=2,
)
return self.pagination_model(
next=next_string,
previous=None,
results=results,
exception=e,
)
results.extend(response.results)
is_last_page = response.next is None
next_token = parse_qs(urlparse(response.next).query).get("next_token") if response.next else None
next_string = response.next
is_last_page = next_string is None
total_pages += 1
max_pages_reached = max_pages and total_pages >= max_pages
if use_tqdm:
Expand Down
8 changes: 7 additions & 1 deletion cmsdials/utils/base_model.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from importlib import util as importlib_util
from typing import Optional

from pydantic import BaseModel
from pydantic import BaseModel, Field


if importlib_util.find_spec("pandas"):
Expand All @@ -17,6 +18,11 @@ def cleandict(self):


class PaginatedBaseModel(BaseModel):
class Config:
arbitrary_types_allowed = True

exception: Optional[BaseException] = Field(default=None)

def to_pandas(self):
if PANDAS_NOT_INSTALLED:
raise RuntimeError(
Expand Down

0 comments on commit 3abbb79

Please sign in to comment.