Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 retry pulling image layers when they time out #7051

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
5 changes: 2 additions & 3 deletions packages/models-library/src/models_library/progress_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

from pydantic import BaseModel, ConfigDict

from .basic_types import IDStr

# NOTE: keep a list of possible unit, and please use correct official unit names
ProgressUnit: TypeAlias = Literal["Byte"]


class ProgressStructuredMessage(BaseModel):
description: IDStr
description: str
current: float
total: int
unit: str | None = None
Expand Down Expand Up @@ -51,6 +49,7 @@ class ProgressStructuredMessage(BaseModel):
class ProgressReport(BaseModel):
    # Amount of work completed so far, expressed in the same scale as `total`
    actual_value: float
    total: float = 1.0
    # Retry attempt this report belongs to; 0 means the first (non-retried) attempt.
    # Incremented each time the producing progress bar is reset for a retry.
    attempt: int = 0
    # Optional unit of `actual_value`/`total` (e.g. "Byte"); UNITLESS when not applicable
    unit: ProgressUnit | None = UNITLESS
    # Optional structured human-readable description of the current step
    message: ProgressStructuredMessage | None = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from typing import Final

import tqdm
from models_library.basic_types import IDStr
from pydantic import NonNegativeInt
from servicelib.logging_utils import log_catch
from tqdm.contrib.logging import tqdm_logging_redirect
Expand Down Expand Up @@ -199,7 +198,7 @@ async def archive_dir(
) -> None:
if progress_bar is None:
progress_bar = ProgressBarData(
num_steps=1, description=IDStr(f"compressing {dir_to_compress.name}")
num_steps=1, description=f"compressing {dir_to_compress.name}"
)

options = " ".join(
Expand All @@ -223,7 +222,7 @@ async def archive_dir(

async with AsyncExitStack() as exit_stack:
sub_progress = await exit_stack.enter_async_context(
progress_bar.sub_progress(folder_size_bytes, description=IDStr("..."))
progress_bar.sub_progress(folder_size_bytes, description="...")
)

tqdm_progress = exit_stack.enter_context(
Expand Down Expand Up @@ -290,7 +289,7 @@ async def unarchive_dir(
) -> set[Path]:
if progress_bar is None:
progress_bar = ProgressBarData(
num_steps=1, description=IDStr(f"extracting {archive_to_extract.name}")
num_steps=1, description=f"extracting {archive_to_extract.name}"
)

# get archive information
Expand All @@ -304,7 +303,7 @@ async def unarchive_dir(

async with AsyncExitStack() as exit_stack:
sub_prog = await exit_stack.enter_async_context(
progress_bar.sub_progress(steps=total_bytes, description=IDStr("..."))
progress_bar.sub_progress(steps=total_bytes, description="...")
)

tqdm_progress = exit_stack.enter_context(
Expand Down
104 changes: 71 additions & 33 deletions packages/service-library/src/servicelib/docker_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
from collections.abc import Awaitable, Callable
from contextlib import AsyncExitStack
Expand All @@ -11,8 +12,21 @@
from models_library.docker import DockerGenericTag
from models_library.generated_models.docker_rest_api import ProgressDetail
from models_library.utils.change_case import snake_to_camel
from pydantic import BaseModel, ByteSize, ConfigDict, TypeAdapter, ValidationError
from pydantic import (
BaseModel,
ByteSize,
ConfigDict,
NonNegativeInt,
TypeAdapter,
ValidationError,
)
from settings_library.docker_registry import RegistrySettings
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_random_exponential,
)
from yarl import URL

from .logging_utils import LogLevelInt
Expand Down Expand Up @@ -209,6 +223,8 @@ async def pull_image(
progress_bar: ProgressBarData,
log_cb: LogCB,
image_information: DockerImageManifestsV2 | None,
*,
retry_upon_error_count: NonNegativeInt = 10,
) -> None:
"""pull a docker image to the host machine.

Expand All @@ -219,7 +235,9 @@ async def pull_image(
progress_bar -- the current progress bar
log_cb -- a callback function to send logs to
image_information -- the image layer information. If this is None, then no fine progress will be retrieved.
retry_upon_error_count -- number of tries if there is a TimeoutError. Usually caused by networking issues.
"""

registry_auth = None
if registry_settings.REGISTRY_URL and registry_settings.REGISTRY_URL in image:
registry_auth = {
Expand All @@ -245,39 +263,59 @@ async def pull_image(

client = await exit_stack.enter_async_context(aiodocker.Docker())

reported_progress = 0.0
async for pull_progress in client.images.pull(
image, stream=True, auth=registry_auth
):
try:
parsed_progress = TypeAdapter(_DockerPullImage).validate_python(
pull_progress
def _reset_progress_from_previous_attempt() -> None:
for pulled_status in layer_id_to_size.values():
pulled_status.downloaded = 0
pulled_status.extracted = 0

@retry(
wait=wait_random_exponential(),
stop=stop_after_attempt(retry_upon_error_count),
reraise=True,
retry=retry_if_exception_type(asyncio.TimeoutError),
)
async def _pull_image_with_retry() -> None:
# for each attempt reset the progress
progress_bar.reset_progress()
_reset_progress_from_previous_attempt()

_logger.info("trying to pull image='%s'", image)

reported_progress = 0.0
async for pull_progress in client.images.pull(
image, stream=True, auth=registry_auth
):
try:
parsed_progress = TypeAdapter(_DockerPullImage).validate_python(
pull_progress
)
except ValidationError:
_logger.exception(
"Unexpected error while validating '%s'. "
"TIP: This is probably an unforeseen pull status text that shall be added to the code. "
"The pulling process will still continue.",
f"{pull_progress=}",
)
else:
await _parse_pull_information(
parsed_progress, layer_id_to_size=layer_id_to_size
)

# compute total progress
total_downloaded_size = sum(
layer.downloaded for layer in layer_id_to_size.values()
)
except ValidationError:
_logger.exception(
"Unexpected error while validating '%s'. "
"TIP: This is probably an unforeseen pull status text that shall be added to the code. "
"The pulling process will still continue.",
f"{pull_progress=}",
total_extracted_size = sum(
layer.extracted for layer in layer_id_to_size.values()
)
else:
await _parse_pull_information(
parsed_progress, layer_id_to_size=layer_id_to_size
total_progress = (total_downloaded_size + total_extracted_size) / 2.0
progress_to_report = total_progress - reported_progress
await progress_bar.update(progress_to_report)
reported_progress = total_progress

await log_cb(
f"pulling {image_short_name}: {pull_progress}...",
logging.DEBUG,
)

# compute total progress
total_downloaded_size = sum(
layer.downloaded for layer in layer_id_to_size.values()
)
total_extracted_size = sum(
layer.extracted for layer in layer_id_to_size.values()
)
total_progress = (total_downloaded_size + total_extracted_size) / 2.0
progress_to_report = total_progress - reported_progress
await progress_bar.update(progress_to_report)
reported_progress = total_progress

await log_cb(
f"pulling {image_short_name}: {pull_progress}...",
logging.DEBUG,
)
await _pull_image_with_retry()
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Final

import httpx
from models_library.basic_types import IDStr
from models_library.docker import DockerGenericTag
from pydantic import ByteSize, TypeAdapter, ValidationError
from settings_library.docker_registry import RegistrySettings
Expand Down Expand Up @@ -129,9 +128,7 @@ async def pull_images(
num_steps=images_total_size,
progress_report_cb=progress_cb,
progress_unit="Byte",
description=IDStr(
f"pulling {len(images)} images",
),
description=f"pulling {len(images)} images",
) as pbar:

await asyncio.gather(
Expand Down
12 changes: 9 additions & 3 deletions packages/service-library/src/servicelib/progress_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from inspect import isawaitable
from typing import Final, Optional, Protocol, runtime_checkable

from models_library.basic_types import IDStr
from models_library.progress_bar import (
ProgressReport,
ProgressStructuredMessage,
Expand Down Expand Up @@ -84,10 +83,11 @@ async def main_fct():
"description": "Optionally defines the step relative weight (defaults to steps of equal weights)"
},
)
description: IDStr = field(metadata={"description": "define the progress name"})
description: str = field(metadata={"description": "define the progress name"})
progress_unit: ProgressUnit | None = None
progress_report_cb: AsyncReportCB | ReportCB | None = None
_current_steps: float = _INITIAL_VALUE
_currnet_attempt: int = 0
_children: list["ProgressBarData"] = field(default_factory=list)
_parent: Optional["ProgressBarData"] = None
_continuous_value_lock: asyncio.Lock = field(init=False)
Expand Down Expand Up @@ -147,6 +147,7 @@ async def _report_external(self, value: float) -> None:
# NOTE: here we convert back to actual value since this is possibly weighted
actual_value=value * self.num_steps,
total=self.num_steps,
attempt=self._currnet_attempt,
unit=self.progress_unit,
message=self.compute_report_message_stuct(),
),
Expand Down Expand Up @@ -197,6 +198,11 @@ async def update(self, steps: float = 1) -> None:
await self._update_parent(parent_update_value)
await self._report_external(new_progress_value)

def reset_progress(self) -> None:
self._currnet_attempt += 1
self._current_steps = _INITIAL_VALUE
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that what you mean by the progress never goes "down"?
you silently reset the progress and the user does not know about it.

Let's discuss the following use-case:

  • a large image of 20-60GiB,
  • pulling it takes a very long time, and the progress is at 80%,
  • now comes an error and it will retry, if I get that right then you reset the progress bar which kind of make sense,
    --> but the progress stays at 80%??
    --> what does the user see until it eventually reaches 80% again? nothing?
    if that is the case I don't think that is a nice user experience at all:
    --> it makes user not believe our progress bars at all,
    --> it probably makes the user shutdown the website and call it a day, it's too much waiting without feedback.

As I said, I think what makes sense if I am a user is:

  • the progress bar goes back to 0
  • I get a message saying "sorry something bad happened while pulling your image, we try again, please be patient..."

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think one option when you reset is to change the message by adding something like:

new_description = current_progress_bar.description += f"(retrying {attempt})"
current_progress_bar.reset_progress(new_message=new_description)

and reset_progress should then call set_ so that the update is transmitted

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took the opportunity to extend the progress bar to also include retry attempts. These are also now displayed directly by the frontend.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see screenshots above in the description

self._last_report_value = _INITIAL_VALUE

async def set_(self, new_value: float) -> None:
await self.update(new_value - self._current_steps)

Expand All @@ -207,7 +213,7 @@ async def finish(self) -> None:
def sub_progress(
self,
steps: int,
description: IDStr,
description: str,
step_weights: list[float] | None = None,
progress_unit: ProgressUnit | None = None,
) -> "ProgressBarData":
Expand Down
27 changes: 27 additions & 0 deletions packages/service-library/tests/test_progress_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,33 @@ async def test_set_progress(caplog: pytest.LogCaptureFixture, faker: Faker):
assert "TIP:" in caplog.messages[0]


async def test_reset_progress(caplog: pytest.LogCaptureFixture, faker: Faker):
    """After reset_progress() the bar starts over: progress can climb again and
    the "already reached maximum" warning may fire anew for the new attempt."""
    async with ProgressBarData(num_steps=50, description=faker.pystr()) as root:
        assert root._current_steps == pytest.approx(0)  # noqa: SLF001
        assert root.num_steps == 50
        assert root.step_weights is None
        await root.set_(50)
        assert root._current_steps == pytest.approx(50)  # noqa: SLF001
        assert "already reached maximum" not in caplog.text
        # setting past the maximum is clamped at num_steps and logs a warning
        await root.set_(51)
        assert root._current_steps == pytest.approx(50)  # noqa: SLF001
        assert "already reached maximum" in caplog.text

        caplog.clear()
        root.reset_progress()

        # reset rewinds to the initial (pre-start) sentinel value of -1
        assert root._current_steps == pytest.approx(-1)  # noqa: SLF001
        assert "already reached maximum" not in caplog.text

        await root.set_(12)
        assert root._current_steps == pytest.approx(12)  # noqa: SLF001
        assert "already reached maximum" not in caplog.text

        # clamping + warning behave the same after a reset
        await root.set_(51)
        assert root._current_steps == pytest.approx(50)  # noqa: SLF001
        assert "already reached maximum" in caplog.text


async def test_concurrent_progress_bar(faker: Faker):
async def do_something(root: ProgressBarData):
async with root.sub_progress(steps=50, description=faker.pystr()) as sub:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from pathlib import Path
from tempfile import TemporaryDirectory

from models_library.basic_types import IDStr
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID, StorageFileID
from models_library.users import UserID
Expand Down Expand Up @@ -105,7 +104,7 @@ async def _pull_legacy_archive(
) -> None:
# NOTE: the legacy way of storing states was as zip archives
async with progress_bar.sub_progress(
steps=2, description=IDStr(f"pulling {destination_path.name}")
steps=2, description=f"pulling {destination_path.name}"
) as sub_prog:
with TemporaryDirectory() as tmp_dir_name:
archive_file = Path(tmp_dir_name) / __get_s3_name(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
from asyncio.streams import StreamReader
from pathlib import Path

from common_library.errors_classes import OsparcErrorMixin

from aiocache import cached # type: ignore[import-untyped]
from models_library.basic_types import IDStr
from common_library.errors_classes import OsparcErrorMixin
from pydantic import AnyUrl, ByteSize
from servicelib.progress_bar import ProgressBarData
from servicelib.utils import logged_gather
Expand Down Expand Up @@ -242,7 +240,7 @@ async def _sync_sources(
async with progress_bar.sub_progress(
steps=folder_size,
progress_unit="Byte",
description=IDStr(f"transferring {local_dir.name}"),
description=f"transferring {local_dir.name}",
) as sub_progress:
aws_s3_cli_log_parsers: list[BaseLogParser] = (
[DebugLogParser()] if debug_logs else []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
RequestInfo,
)
from models_library.api_schemas_storage import ETag, FileUploadSchema, UploadedPart
from models_library.basic_types import IDStr, SHA256Str
from models_library.basic_types import SHA256Str
from multidict import MultiMapping
from pydantic import AnyUrl, NonNegativeInt
from servicelib.aiohttp import status
Expand Down Expand Up @@ -216,7 +216,7 @@ async def download_link_to_file(
sub_progress = await stack.enter_async_context(
progress_bar.sub_progress(
steps=file_size or 1,
description=IDStr(f"downloading {file_path.name}"),
description=f"downloading {file_path.name}",
)
)

Expand Down Expand Up @@ -400,7 +400,7 @@ async def upload_file_to_presigned_links(
)
sub_progress = await stack.enter_async_context(
progress_bar.sub_progress(
steps=file_size, description=IDStr(f"uploading {file_name}")
steps=file_size, description=f"uploading {file_name}"
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
LinkType,
UploadedPart,
)
from models_library.basic_types import IDStr, SHA256Str
from models_library.basic_types import SHA256Str
from models_library.projects_nodes_io import LocationID, LocationName, StorageFileID
from models_library.users import UserID
from pydantic import AnyUrl, ByteSize, TypeAdapter
Expand Down Expand Up @@ -364,7 +364,7 @@ async def _upload_path( # pylint: disable=too-many-arguments
)

if not progress_bar:
progress_bar = ProgressBarData(num_steps=1, description=IDStr("uploading"))
progress_bar = ProgressBarData(num_steps=1, description="uploading")

is_directory: bool = isinstance(path_to_upload, Path) and path_to_upload.is_dir()
if (
Expand Down
Loading
Loading