Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 retry pulling image layer when they timeout #7051

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 69 additions & 35 deletions packages/service-library/src/servicelib/docker_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
from collections.abc import Awaitable, Callable
from contextlib import AsyncExitStack
Expand All @@ -11,8 +12,21 @@
from models_library.docker import DockerGenericTag
from models_library.generated_models.docker_rest_api import ProgressDetail
from models_library.utils.change_case import snake_to_camel
from pydantic import BaseModel, ByteSize, ConfigDict, TypeAdapter, ValidationError
from pydantic import (
BaseModel,
ByteSize,
ConfigDict,
NonNegativeInt,
TypeAdapter,
ValidationError,
)
from settings_library.docker_registry import RegistrySettings
from tenacity import (
AsyncRetrying,
retry_if_exception_type,
stop_after_attempt,
wait_random_exponential,
)
from yarl import URL

from .logging_utils import LogLevelInt
Expand Down Expand Up @@ -209,6 +223,8 @@ async def pull_image(
progress_bar: ProgressBarData,
log_cb: LogCB,
image_information: DockerImageManifestsV2 | None,
*,
retry_upon_error_count: NonNegativeInt = 10,
) -> None:
"""pull a docker image to the host machine.

Expand All @@ -219,6 +235,7 @@ async def pull_image(
progress_bar -- the current progress bar
log_cb -- a callback function to send logs to
image_information -- the image layer information. If this is None, then no fine progress will be retrieved.
retry_upon_error_count -- number of tries if there is a TimeoutError. Usually cased by networking issues.
"""
registry_auth = None
if registry_settings.REGISTRY_URL and registry_settings.REGISTRY_URL in image:
Expand All @@ -245,39 +262,56 @@ async def pull_image(

client = await exit_stack.enter_async_context(aiodocker.Docker())

reported_progress = 0.0
async for pull_progress in client.images.pull(
image, stream=True, auth=registry_auth
async for attempt in AsyncRetrying(
wait=wait_random_exponential(),
stop=stop_after_attempt(retry_upon_error_count),
reraise=True,
retry=retry_if_exception_type(asyncio.TimeoutError),
):
try:
parsed_progress = TypeAdapter(_DockerPullImage).validate_python(
pull_progress
)
except ValidationError:
_logger.exception(
"Unexpected error while validating '%s'. "
"TIP: This is probably an unforeseen pull status text that shall be added to the code. "
"The pulling process will still continue.",
f"{pull_progress=}",
)
else:
await _parse_pull_information(
parsed_progress, layer_id_to_size=layer_id_to_size
)

# compute total progress
total_downloaded_size = sum(
layer.downloaded for layer in layer_id_to_size.values()
)
total_extracted_size = sum(
layer.extracted for layer in layer_id_to_size.values()
)
total_progress = (total_downloaded_size + total_extracted_size) / 2.0
progress_to_report = total_progress - reported_progress
await progress_bar.update(progress_to_report)
reported_progress = total_progress

await log_cb(
f"pulling {image_short_name}: {pull_progress}...",
logging.DEBUG,
# each time there is an error progress starts from zero
progress_bar.reset_progress()
_logger.info(
"attempt='%s' to pull image='%s'",
attempt.retry_state.attempt_number,
image,
)

GitHK marked this conversation as resolved.
Show resolved Hide resolved
with attempt:
reported_progress = 0.0
async for pull_progress in client.images.pull(
image, stream=True, auth=registry_auth
):
try:
parsed_progress = TypeAdapter(_DockerPullImage).validate_python(
pull_progress
)
except ValidationError:
_logger.exception(
GitHK marked this conversation as resolved.
Show resolved Hide resolved
"Unexpected error while validating '%s'. "
"TIP: This is probably an unforeseen pull status text that shall be added to the code. "
"The pulling process will still continue.",
f"{pull_progress=}",
)
else:
await _parse_pull_information(
parsed_progress, layer_id_to_size=layer_id_to_size
)

# compute total progress
total_downloaded_size = sum(
layer.downloaded for layer in layer_id_to_size.values()
)
total_extracted_size = sum(
layer.extracted for layer in layer_id_to_size.values()
)
total_progress = (
total_downloaded_size + total_extracted_size
) / 2.0
progress_to_report = total_progress - reported_progress
await progress_bar.update(progress_to_report)
reported_progress = total_progress

await log_cb(
f"pulling {image_short_name}: {pull_progress}...",
logging.DEBUG,
)
3 changes: 3 additions & 0 deletions packages/service-library/src/servicelib/progress_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ async def update(self, steps: float = 1) -> None:
await self._update_parent(parent_update_value)
await self._report_external(new_progress_value)

def reset_progress(self) -> None:
self._current_steps = _INITIAL_VALUE
GitHK marked this conversation as resolved.
Show resolved Hide resolved

async def set_(self, new_value: float) -> None:
await self.update(new_value - self._current_steps)

Expand Down
76 changes: 53 additions & 23 deletions packages/service-library/tests/test_progress_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import pytest
from faker import Faker
from models_library.basic_types import IDStr
from models_library.progress_bar import ProgressReport, ProgressStructuredMessage
from pydantic import ValidationError
from pytest_mock import MockerFixture
Expand Down Expand Up @@ -54,7 +55,7 @@ async def test_progress_bar_progress_report_cb(
num_steps=outer_num_steps,
progress_report_cb=mocked_cb,
progress_unit="Byte",
description=faker.pystr(),
description=IDStr(faker.pystr()),
) as root:
assert root.num_steps == outer_num_steps
assert root.step_weights is None # i.e. all steps have equal weight
Expand Down Expand Up @@ -96,7 +97,7 @@ async def test_progress_bar_progress_report_cb(
# 2nd step is a sub progress bar of 10 steps
inner_num_steps_step2 = 100
async with root.sub_progress(
steps=inner_num_steps_step2, description=faker.pystr()
steps=inner_num_steps_step2, description=IDStr(faker.pystr())
) as sub:
assert sub._current_steps == pytest.approx(0) # noqa: SLF001
assert root._current_steps == pytest.approx(1) # noqa: SLF001
Expand Down Expand Up @@ -125,7 +126,7 @@ async def test_progress_bar_progress_report_cb(
# 3rd step is another subprogress of 50 steps
inner_num_steps_step3 = 50
async with root.sub_progress(
steps=inner_num_steps_step3, description=faker.pystr()
steps=inner_num_steps_step3, description=IDStr(faker.pystr())
) as sub:
assert sub._current_steps == pytest.approx(0) # noqa: SLF001
assert root._current_steps == pytest.approx(2) # noqa: SLF001
Expand All @@ -147,7 +148,7 @@ async def test_progress_bar_progress_report_cb(
def test_creating_progress_bar_with_invalid_unit_fails(faker: Faker):
with pytest.raises(ValidationError):
ProgressBarData(
num_steps=321, progress_unit="invalid", description=faker.pystr()
num_steps=321, progress_unit="invalid", description=IDStr(faker.pystr())
)


Expand All @@ -158,7 +159,7 @@ async def test_progress_bar_always_reports_0_on_creation_and_1_on_finish(
progress_bar = ProgressBarData(
num_steps=num_steps,
progress_report_cb=mocked_progress_bar_cb,
description=faker.pystr(),
description=IDStr(faker.pystr()),
)
assert progress_bar._current_steps == _INITIAL_VALUE # noqa: SLF001
async with progress_bar as root:
Expand Down Expand Up @@ -206,7 +207,7 @@ async def test_progress_bar_always_reports_1_on_finish(
progress_bar = ProgressBarData(
num_steps=num_steps,
progress_report_cb=mocked_progress_bar_cb,
description=faker.pystr(),
description=IDStr(faker.pystr()),
)
assert progress_bar._current_steps == _INITIAL_VALUE # noqa: SLF001
async with progress_bar as root:
Expand Down Expand Up @@ -248,7 +249,7 @@ async def test_progress_bar_always_reports_1_on_finish(


async def test_set_progress(caplog: pytest.LogCaptureFixture, faker: Faker):
async with ProgressBarData(num_steps=50, description=faker.pystr()) as root:
async with ProgressBarData(num_steps=50, description=IDStr(faker.pystr())) as root:
assert root._current_steps == pytest.approx(0) # noqa: SLF001
assert root.num_steps == 50
assert root.step_weights is None
Expand All @@ -262,43 +263,72 @@ async def test_set_progress(caplog: pytest.LogCaptureFixture, faker: Faker):
assert "TIP:" in caplog.messages[0]


async def test_reset_progress(caplog: pytest.LogCaptureFixture, faker: Faker):
async with ProgressBarData(num_steps=50, description=IDStr(faker.pystr())) as root:
assert root._current_steps == pytest.approx(0) # noqa: SLF001
assert root.num_steps == 50
assert root.step_weights is None
await root.set_(50)
assert root._current_steps == pytest.approx(50) # noqa: SLF001
assert "already reached maximum" not in caplog.text
await root.set_(51)
assert root._current_steps == pytest.approx(50) # noqa: SLF001
assert "already reached maximum" in caplog.text
GitHK marked this conversation as resolved.
Show resolved Hide resolved

caplog.clear()
root.reset_progress()

assert root._current_steps == pytest.approx(-1) # noqa: SLF001
assert "already reached maximum" not in caplog.text

await root.set_(12)
assert root._current_steps == pytest.approx(12) # noqa: SLF001
assert "already reached maximum" not in caplog.text

await root.set_(51)
assert root._current_steps == pytest.approx(50) # noqa: SLF001
assert "already reached maximum" in caplog.text
GitHK marked this conversation as resolved.
Show resolved Hide resolved


async def test_concurrent_progress_bar(faker: Faker):
async def do_something(root: ProgressBarData):
async with root.sub_progress(steps=50, description=faker.pystr()) as sub:
async with root.sub_progress(steps=50, description=IDStr(faker.pystr())) as sub:
assert sub.num_steps == 50
assert sub.step_weights is None
assert sub._current_steps == 0 # noqa: SLF001
for n in range(50):
await sub.update()
assert sub._current_steps == (n + 1) # noqa: SLF001

async with ProgressBarData(num_steps=12, description=faker.pystr()) as root:
async with ProgressBarData(num_steps=12, description=IDStr(faker.pystr())) as root:
assert root._current_steps == pytest.approx(0) # noqa: SLF001
assert root.step_weights is None
await asyncio.gather(*[do_something(root) for n in range(12)])
assert root._current_steps == pytest.approx(12) # noqa: SLF001


async def test_too_many_sub_progress_bars_raises(faker: Faker):
async with ProgressBarData(num_steps=2, description=faker.pystr()) as root:
async with ProgressBarData(num_steps=2, description=IDStr(faker.pystr())) as root:
assert root.num_steps == 2
assert root.step_weights is None
async with root.sub_progress(steps=50, description=faker.pystr()) as sub:
async with root.sub_progress(steps=50, description=IDStr(faker.pystr())) as sub:
for _ in range(50):
await sub.update()
async with root.sub_progress(steps=50, description=faker.pystr()) as sub:
async with root.sub_progress(steps=50, description=IDStr(faker.pystr())) as sub:
for _ in range(50):
await sub.update()

with pytest.raises(RuntimeError):
async with root.sub_progress(steps=50, description=faker.pystr()) as sub:
async with root.sub_progress(
steps=50, description=IDStr(faker.pystr())
) as sub:
...


async def test_too_many_updates_does_not_raise_but_show_warning_with_stack(
caplog: pytest.LogCaptureFixture, faker: Faker
):
async with ProgressBarData(num_steps=2, description=faker.pystr()) as root:
async with ProgressBarData(num_steps=2, description=IDStr(faker.pystr())) as root:
assert root.num_steps == 2
assert root.step_weights is None
await root.update()
Expand All @@ -314,7 +344,7 @@ async def test_weighted_progress_bar(mocked_progress_bar_cb: mock.Mock, faker: F
num_steps=outer_num_steps,
step_weights=[1, 3, 1],
progress_report_cb=mocked_progress_bar_cb,
description=faker.pystr(),
description=IDStr(faker.pystr()),
) as root:
mocked_progress_bar_cb.assert_called_once_with(
ProgressReport(
Expand Down Expand Up @@ -369,7 +399,7 @@ async def test_weighted_progress_bar_with_weighted_sub_progress(
num_steps=outer_num_steps,
step_weights=[1, 3, 1],
progress_report_cb=mocked_progress_bar_cb,
description=faker.pystr(),
description=IDStr(faker.pystr()),
) as root:
mocked_progress_bar_cb.assert_called_once_with(
ProgressReport(
Expand All @@ -396,7 +426,7 @@ async def test_weighted_progress_bar_with_weighted_sub_progress(

# 2nd step is a sub progress bar of 5 steps
async with root.sub_progress(
steps=5, step_weights=[2, 5, 1, 2, 3], description=faker.pystr()
steps=5, step_weights=[2, 5, 1, 2, 3], description=IDStr(faker.pystr())
) as sub:
assert sub.step_weights == [2 / 13, 5 / 13, 1 / 13, 2 / 13, 3 / 13, 0]
assert sub._current_steps == pytest.approx(0) # noqa: SLF001
Expand Down Expand Up @@ -457,7 +487,7 @@ async def test_weighted_progress_bar_with_weighted_sub_progress(
async def test_weighted_progress_bar_wrong_num_weights_raises(faker: Faker):
with pytest.raises(RuntimeError):
async with ProgressBarData(
num_steps=3, step_weights=[3, 1], description=faker.pystr()
num_steps=3, step_weights=[3, 1], description=IDStr(faker.pystr())
):
...

Expand All @@ -466,7 +496,7 @@ async def test_weighted_progress_bar_with_0_weights_is_equivalent_to_standard_pr
faker: Faker,
):
async with ProgressBarData(
num_steps=3, step_weights=[0, 0, 0], description=faker.pystr()
num_steps=3, step_weights=[0, 0, 0], description=IDStr(faker.pystr())
) as root:
assert root.step_weights == [1, 1, 1, 0]

Expand All @@ -479,13 +509,13 @@ async def test_concurrent_sub_progress_update_correct_sub_progress(
num_steps=3,
step_weights=[3, 1, 2],
progress_report_cb=mocked_progress_bar_cb,
description=faker.pystr(),
description=IDStr(faker.pystr()),
) as root:
sub_progress1 = root.sub_progress(23, description=faker.pystr())
sub_progress1 = root.sub_progress(23, description=IDStr(faker.pystr()))
assert sub_progress1._current_steps == _INITIAL_VALUE # noqa: SLF001
sub_progress2 = root.sub_progress(45, description=faker.pystr())
sub_progress2 = root.sub_progress(45, description=IDStr(faker.pystr()))
GitHK marked this conversation as resolved.
Show resolved Hide resolved
assert sub_progress2._current_steps == _INITIAL_VALUE # noqa: SLF001
sub_progress3 = root.sub_progress(12, description=faker.pystr())
sub_progress3 = root.sub_progress(12, description=IDStr(faker.pystr()))
assert sub_progress3._current_steps == _INITIAL_VALUE # noqa: SLF001

# NOTE: in a gather call there is no control on which step finishes first
Expand Down
Loading