From 25cab52da83506010bf57a349a8a2ab7811815c0 Mon Sep 17 00:00:00 2001 From: Yichi Yang Date: Sat, 16 Jul 2022 17:55:36 -0700 Subject: [PATCH] Add support to resume incomplete download --- news/11180.feature.rst | 1 + src/pip/_internal/cli/cmdoptions.py | 21 +++ src/pip/_internal/cli/progress_bars.py | 12 +- src/pip/_internal/cli/req_command.py | 4 + src/pip/_internal/network/download.py | 156 ++++++++++++---- src/pip/_internal/operations/prepare.py | 10 +- tests/unit/test_network_download.py | 233 +++++++++++++++++++++++- tests/unit/test_operations_prepare.py | 8 +- tests/unit/test_req.py | 2 + 9 files changed, 400 insertions(+), 47 deletions(-) create mode 100644 news/11180.feature.rst diff --git a/news/11180.feature.rst b/news/11180.feature.rst new file mode 100644 index 00000000000..24931855789 --- /dev/null +++ b/news/11180.feature.rst @@ -0,0 +1 @@ +Add support to resume incomplete download. The behavior can be controlled using flags ``--incomplete-downloads`` and ``--incomplete-download-retries``. diff --git a/src/pip/_internal/cli/cmdoptions.py b/src/pip/_internal/cli/cmdoptions.py index df5152a78c1..e93c719d582 100644 --- a/src/pip/_internal/cli/cmdoptions.py +++ b/src/pip/_internal/cli/cmdoptions.py @@ -1019,6 +1019,25 @@ def check_list_path_option(options: Values) -> None: help=("Enable deprecated functionality, that will be removed in the future."), ) +incomplete_downloads: Callable[..., Option] = partial( + Option, + "--incomplete-downloads", + dest="resume_incomplete", + choices=["resume", "discard"], + default="discard", + metavar="policy", + help="How to handle an incomplete download: resume, discard (default to %default).", +) + +incomplete_download_retries: Callable[..., Option] = partial( + Option, + "--incomplete-download-retries", + dest="resume_attempts", + type="int", + default=5, + help="Maximum number of resumption retries for incomplete download " + "(default %default times).", +) ########## # groups # @@ -1050,6 +1069,8 @@ def check_list_path_option(options: Values) -> None: no_python_version_warning, use_new_feature, use_deprecated_feature, + incomplete_downloads, + incomplete_download_retries, ], } diff --git a/src/pip/_internal/cli/progress_bars.py b/src/pip/_internal/cli/progress_bars.py index 0ad14031ca5..8c58d88a3d4 100644 --- a/src/pip/_internal/cli/progress_bars.py +++ b/src/pip/_internal/cli/progress_bars.py @@ -24,6 +24,7 @@ def _rich_progress_bar( *, bar_type: str, size: int, + initial_progress: Optional[int] = None, ) -> Generator[bytes, None, None]: assert bar_type == "on", "This should only be used in the default mode." @@ -49,6 +50,8 @@ def _rich_progress_bar( progress = Progress(*columns, refresh_per_second=30) task_id = progress.add_task(" " * (get_indentation() + 2), total=total) + if initial_progress is not None: + progress.update(task_id, advance=initial_progress) with progress: for chunk in iterable: yield chunk @@ -56,13 +59,18 @@ def _rich_progress_bar( def get_download_progress_renderer( - *, bar_type: str, size: Optional[int] = None + *, bar_type: str, size: Optional[int] = None, initial_progress: Optional[int] = None ) -> DownloadProgressRenderer: """Get an object that can be used to render the download progress. Returns a callable, that takes an iterable to "wrap". """ if bar_type == "on": - return functools.partial(_rich_progress_bar, bar_type=bar_type, size=size) + return functools.partial( + _rich_progress_bar, + bar_type=bar_type, + size=size, + initial_progress=initial_progress, + ) else: return iter # no-op, when passed an iterator diff --git a/src/pip/_internal/cli/req_command.py b/src/pip/_internal/cli/req_command.py index 6db058869ac..a27483207dd 100644 --- a/src/pip/_internal/cli/req_command.py +++ b/src/pip/_internal/cli/req_command.py @@ -331,6 +331,8 @@ def make_requirement_preparer( "fast-deps has no effect when used with the legacy resolver." ) + resume_incomplete = options.resume_incomplete == "resume" + return RequirementPreparer( build_dir=temp_build_dir_path, src_dir=options.src_dir, @@ -345,6 +347,8 @@ def make_requirement_preparer( use_user_site=use_user_site, lazy_wheel=lazy_wheel, verbosity=verbosity, + resume_incomplete=resume_incomplete, + resume_attempts=options.resume_attempts, ) @classmethod diff --git a/src/pip/_internal/network/download.py b/src/pip/_internal/network/download.py index 79b82a570e5..88c7c0a5a20 100644 --- a/src/pip/_internal/network/download.py +++ b/src/pip/_internal/network/download.py @@ -4,6 +4,7 @@ import logging import mimetypes import os +from http import HTTPStatus from typing import Iterable, Optional, Tuple from pip._vendor.requests.models import CONTENT_CHUNK_SIZE, Response @@ -27,13 +28,21 @@ def _get_http_response_size(resp: Response) -> Optional[int]: return None +def _get_http_response_etag_or_date(resp: Response) -> Optional[str]: + """ + Return either the ETag or Date header (or None if neither exists). + The return value can be used in an If-Range header. + """ + return resp.headers.get("etag", resp.headers.get("date")) + + def _prepare_download( resp: Response, link: Link, progress_bar: str, + total_length: Optional[int], + range_start: Optional[int] = None, ) -> Iterable[bytes]: - total_length = _get_http_response_size(resp) - if link.netloc == PyPI.file_storage_domain: url = link.show_url else: @@ -42,10 +51,17 @@ def _prepare_download( logged_url = redact_auth_from_url(url) if total_length: - logged_url = "{} ({})".format(logged_url, format_size(total_length)) + if range_start is not None: + logged_url = "{} ({}/{})".format( + logged_url, format_size(range_start), format_size(total_length) + ) + else: + logged_url = "{} ({})".format(logged_url, format_size(total_length)) if is_from_cache(resp): logger.info("Using cached %s", logged_url) + elif range_start is not None: + logger.info("Resume download %s", logged_url) else: logger.info("Downloading %s", logged_url) @@ -65,7 +81,9 @@ def _prepare_download( if not show_progress: return chunks - renderer = get_download_progress_renderer(bar_type=progress_bar, size=total_length) + renderer = get_download_progress_renderer( + bar_type=progress_bar, size=total_length, initial_progress=range_start + ) return renderer(chunks) @@ -112,10 +130,27 @@ def _get_http_response_filename(resp: Response, link: Link) -> str: return filename -def _http_get_download(session: PipSession, link: Link) -> Response: +def _http_get_download( + session: PipSession, + link: Link, + range_start: Optional[int] = None, + if_range: Optional[str] = None, +) -> Response: target_url = link.url.split("#", 1)[0] - resp = session.get(target_url, headers=HEADERS, stream=True) - raise_for_status(resp) + try: + headers = {**HEADERS} + # request a partial download + if range_start is not None: + headers["Range"] = "bytes={}-".format(range_start) + # make sure the file hasn't changed + if if_range is not None: + headers["If-Range"] = if_range + resp = session.get(target_url, headers=headers, stream=True) + raise_for_status(resp) + except NetworkConnectionError as e: + assert e.response is not None + logger.critical("HTTP error %s while getting %s", e.response.status_code, link) + raise return resp @@ -124,28 +159,90 @@ def __init__( self, session: PipSession, progress_bar: str, + resume_incomplete: bool, + resume_attempts: int, ) -> None: self._session = session self._progress_bar = progress_bar + self._resume_incomplete = resume_incomplete + assert ( + resume_attempts > 0 + ), "Number of max incomplete download retries must be positive" + self._resume_attempts = resume_attempts def __call__(self, link: Link, location: str) -> Tuple[str, str]: """Download the file given by link into location.""" - try: - resp = _http_get_download(self._session, link) - except NetworkConnectionError as e: - assert e.response is not None - logger.critical( - "HTTP error %s while getting %s", e.response.status_code, link - ) - raise + resp = _http_get_download(self._session, link) + total_length = _get_http_response_size(resp) + etag_or_date = _get_http_response_etag_or_date(resp) filename = _get_http_response_filename(resp, link) filepath = os.path.join(location, filename) - chunks = _prepare_download(resp, link, self._progress_bar) + chunks = _prepare_download(resp, link, self._progress_bar, total_length) + bytes_received = 0 + with open(filepath, "wb") as content_file: + + # Process the initial response for chunk in chunks: + bytes_received += len(chunk) content_file.write(chunk) + + if self._resume_incomplete: + attempts_left = self._resume_attempts + + while total_length is not None and bytes_received < total_length: + if attempts_left <= 0: + break + attempts_left -= 1 + + # Attempt to resume download + resume_resp = _http_get_download( + self._session, + link, + range_start=bytes_received, + if_range=etag_or_date, + ) + + restart = resume_resp.status_code != HTTPStatus.PARTIAL_CONTENT + # If the server responded with 200 (e.g. when the file has been + # modifiedon the server or the server doesn't support range + # requests), reset the download to start from the beginning. + if restart: + content_file.seek(0) + content_file.truncate() + bytes_received = 0 + etag_or_date = _get_http_response_etag_or_date(resume_resp) + + chunks = _prepare_download( + resume_resp, + link, + self._progress_bar, + total_length, + range_start=bytes_received, + ) + for chunk in chunks: + bytes_received += len(chunk) + content_file.write(chunk) + + if total_length is not None and bytes_received < total_length: + if self._resume_incomplete: + logger.critical( + "Failed to download %s after %d resumption attempts.", + link, + self._resume_attempts, + ) + else: + logger.critical( + "Failed to download %s." + " Set --incomplete-downloads=resume to automatically" + "resume incomplete download.", + link, + ) + os.remove(filepath) + raise RuntimeError("Incomplete download") + content_type = resp.headers.get("Content-Type", "") return filepath, content_type @@ -155,32 +252,17 @@ def __init__( self, session: PipSession, progress_bar: str, + resume_incomplete: bool, + resume_attempts: int, ) -> None: - self._session = session - self._progress_bar = progress_bar + self._downloader = Downloader( + session, progress_bar, resume_incomplete, resume_attempts + ) def __call__( self, links: Iterable[Link], location: str ) -> Iterable[Tuple[Link, Tuple[str, str]]]: """Download the files given by links into location.""" for link in links: - try: - resp = _http_get_download(self._session, link) - except NetworkConnectionError as e: - assert e.response is not None - logger.critical( - "HTTP error %s while getting %s", - e.response.status_code, - link, - ) - raise - - filename = _get_http_response_filename(resp, link) - filepath = os.path.join(location, filename) - - chunks = _prepare_download(resp, link, self._progress_bar) - with open(filepath, "wb") as content_file: - for chunk in chunks: - content_file.write(chunk) - content_type = resp.headers.get("Content-Type", "") + filepath, content_type = self._downloader(link, location) yield link, (filepath, content_type) diff --git a/src/pip/_internal/operations/prepare.py b/src/pip/_internal/operations/prepare.py index 80723fffe47..f5c2145a614 100644 --- a/src/pip/_internal/operations/prepare.py +++ b/src/pip/_internal/operations/prepare.py @@ -221,6 +221,8 @@ def __init__( use_user_site: bool, lazy_wheel: bool, verbosity: int, + resume_incomplete: bool, + resume_attempts: int, ) -> None: super().__init__() @@ -228,8 +230,12 @@ def __init__( self.build_dir = build_dir self.build_tracker = build_tracker self._session = session - self._download = Downloader(session, progress_bar) - self._batch_download = BatchDownloader(session, progress_bar) + self._download = Downloader( + session, progress_bar, resume_incomplete, resume_attempts + ) + self._batch_download = BatchDownloader( + session, progress_bar, resume_incomplete, resume_attempts + ) self.finder = finder # Where still-packed archives should be written to. If None, they are diff --git a/tests/unit/test_network_download.py b/tests/unit/test_network_download.py index 53200f2e511..066a5a05ea2 100644 --- a/tests/unit/test_network_download.py +++ b/tests/unit/test_network_download.py @@ -1,52 +1,77 @@ import logging import sys -from typing import Dict +from typing import Dict, List, Optional, Tuple +from unittest.mock import MagicMock, call, patch import pytest from pip._internal.models.link import Link from pip._internal.network.download import ( + Downloader, + _get_http_response_size, + _http_get_download, _prepare_download, parse_content_disposition, sanitize_content_filename, ) +from pip._internal.network.session import PipSession +from pip._internal.network.utils import HEADERS +from tests.lib.path import Path from tests.lib.requests_mocks import MockResponse @pytest.mark.parametrize( - "url, headers, from_cache, expected", + "url, headers, from_cache, range_start, expected", [ ( "http://example.com/foo.tgz", {}, False, + None, "Downloading http://example.com/foo.tgz", ), ( "http://example.com/foo.tgz", {"content-length": "2"}, False, + None, "Downloading http://example.com/foo.tgz (2 bytes)", ), ( "http://example.com/foo.tgz", {"content-length": "2"}, True, + None, "Using cached http://example.com/foo.tgz (2 bytes)", ), - ("https://files.pythonhosted.org/foo.tgz", {}, False, "Downloading foo.tgz"), + ( + "https://files.pythonhosted.org/foo.tgz", + {}, + False, + None, + "Downloading foo.tgz", + ), ( "https://files.pythonhosted.org/foo.tgz", {"content-length": "2"}, False, + None, "Downloading foo.tgz (2 bytes)", ), ( "https://files.pythonhosted.org/foo.tgz", {"content-length": "2"}, True, + None, "Using cached foo.tgz", ), + ( + "http://example.com/foo.tgz", + {"content-length": "200"}, + False, + 100, + "Resume download http://example.com/foo.tgz (100 bytes/200 bytes)", + ), ], ) def test_prepare_download__log( @@ -54,6 +79,7 @@ def test_prepare_download__log( url: str, headers: Dict[str, str], from_cache: bool, + range_start: Optional[int], expected: str, ) -> None: caplog.set_level(logging.INFO) @@ -63,7 +89,14 @@ def test_prepare_download__log( if from_cache: resp.from_cache = from_cache link = Link(url) - _prepare_download(resp, link, progress_bar="on") + total_length = _get_http_response_size(resp) + _prepare_download( + resp, + link, + progress_bar="on", + total_length=total_length, + range_start=range_start, + ) assert len(caplog.records) == 1 record = caplog.records[0] @@ -113,6 +146,29 @@ def test_sanitize_content_filename__platform_dependent( assert sanitize_content_filename(filename) == expected +@pytest.mark.parametrize( + "range_start, if_range, expected_headers", + [ + (None, None, HEADERS), + (1234, None, {**HEADERS, "Range": "bytes=1234-"}), + (1234, '"etag"', {**HEADERS, "Range": "bytes=1234-", "If-Range": '"etag"'}), + ], +) +def test_http_get_download( + range_start: Optional[int], + if_range: Optional[str], + expected_headers: Dict[str, str], +) -> None: + session = PipSession() + session.get = MagicMock() + link = Link("http://example.com/foo.tgz") + with patch("pip._internal.network.download.raise_for_status"): + _http_get_download(session, link, range_start, if_range) + session.get.assert_called_once_with( + "http://example.com/foo.tgz", headers=expected_headers, stream=True + ) + + @pytest.mark.parametrize( "content_disposition, default_filename, expected", [ @@ -124,3 +180,172 @@ def test_parse_content_disposition( ) -> None: actual = parse_content_disposition(content_disposition, default_filename) assert actual == expected + + +@pytest.mark.parametrize( + "resume_incomplete," + "resume_attempts," + "mock_responses," + "expected_resume_args," + "expected_bytes", + [ + # If content-length is not provided, the download will + # always "succeed" since we don't have a way to check if + # the download is complete. + ( + False, + 5, + [({}, 200, b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89")], + [], + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + # Complete download (content-length matches body) + ( + False, + 5, + [({"content-length": "36"}, 200, b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89")], + [], + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + # Incomplete download with auto resume disabled + ( + False, + 5, + [({"content-length": "36"}, 200, b"0cfa7e9d-1868-4dd7-9fb3-")], + [], + None, + ), + # Incomplete download with auto resume enabled + ( + True, + 5, + [ + ({"content-length": "36"}, 200, b"0cfa7e9d-1868-4dd7-9fb3-"), + ({"content-length": "12"}, 206, b"f2561d5dfd89"), + ], + [(24, None)], + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + # If the server responses with 200 (e.g. no range header support or the file + # has changed between the requests) the downloader should restart instead of + # resume. The downloaded file should not be affected. + ( + True, + 5, + [ + ({"content-length": "36"}, 200, b"0cfa7e9d-1868-4dd7-9fb3-"), + ({"content-length": "36"}, 200, b"0cfa7e9d-1868-"), + ( + {"content-length": "36"}, + 200, + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + ], + [(24, None), (14, None)], + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + # The downloader should fail after resume_attempts attempts. + # This prevents the downloader from getting stuck if the connection + # is unstable and the server doesn't not support range requests. + ( + True, + 1, + [ + ({"content-length": "36"}, 200, b"0cfa7e9d-1868-4dd7-9fb3-"), + ({"content-length": "36"}, 200, b"0cfa7e9d-1868-"), + ], + [(24, None)], + None, + ), + # The downloader should use If-Range header to make the range + # request conditional if it is possible to check for modifications + # (e.g. if we know the creation time of the initial response). + ( + True, + 5, + [ + ( + {"content-length": "36", "date": "Wed, 21 Oct 2015 07:28:00 GMT"}, + 200, + b"0cfa7e9d-1868-4dd7-9fb3-", + ), + ( + {"content-length": "12", "date": "Wed, 21 Oct 2015 07:54:00 GMT"}, + 206, + b"f2561d5dfd89", + ), + ], + [(24, "Wed, 21 Oct 2015 07:28:00 GMT")], + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + # ETag is preferred over Date for the If-Range condition. + ( + True, + 5, + [ + ( + { + "content-length": "36", + "date": "Wed, 21 Oct 2015 07:28:00 GMT", + "etag": '"33a64df551425fcc55e4d42a148795d9f25f89d4"', + }, + 200, + b"0cfa7e9d-1868-4dd7-9fb3-", + ), + ( + { + "content-length": "12", + "date": "Wed, 21 Oct 2015 07:54:00 GMT", + "etag": '"33a64df551425fcc55e4d42a148795d9f25f89d4"', + }, + 206, + b"f2561d5dfd89", + ), + ], + [(24, '"33a64df551425fcc55e4d42a148795d9f25f89d4"')], + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + ], +) +def test_downloader( + resume_incomplete: bool, + resume_attempts: int, + mock_responses: List[Tuple[Dict[str, str], int, bytes]], + # list of (range_start, if_range) + expected_resume_args: List[Tuple[Optional[int], Optional[int]]], + # expected_bytes is None means the download should fail + expected_bytes: Optional[bytes], + tmpdir: Path, +) -> None: + session = PipSession() + link = Link("http://example.com/foo.tgz") + downloader = Downloader(session, "on", resume_incomplete, resume_attempts) + + responses = [] + for headers, status_code, body in mock_responses: + resp = MockResponse(body) + resp.headers = headers + resp.status_code = status_code + responses.append(resp) + _http_get_download = MagicMock(side_effect=responses) + + with patch("pip._internal.network.download._http_get_download", _http_get_download): + if expected_bytes is None: + remove = MagicMock(return_value=None) + with patch("os.remove", remove): + with pytest.raises(RuntimeError): + downloader(link, str(tmpdir)) + # Make sure the incomplete file is removed + remove.assert_called_once() + else: + filepath, _ = downloader(link, str(tmpdir)) + with open(filepath, "rb") as downloaded_file: + downloaded_bytes = downloaded_file.read() + assert downloaded_bytes == expected_bytes + + calls = [call(session, link)] # the initial request + for range_start, if_range in expected_resume_args: + calls.append(call(session, link, range_start=range_start, if_range=if_range)) + + # Make sure that the download makes additional requests for resumption + _http_get_download.assert_has_calls(calls) diff --git a/tests/unit/test_operations_prepare.py b/tests/unit/test_operations_prepare.py index 8838fa9ce0e..a9b97240d70 100644 --- a/tests/unit/test_operations_prepare.py +++ b/tests/unit/test_operations_prepare.py @@ -32,7 +32,9 @@ def _fake_session_get(*args: Any, **kwargs: Any) -> Dict[str, str]: session = Mock() session.get = _fake_session_get - download = Downloader(session, progress_bar="on") + download = Downloader( + session, progress_bar="on", resume_incomplete=False, resume_attempts=5 + ) uri = path_to_url(data.packages.joinpath("simple-1.0.tar.gz")) link = Link(uri) @@ -78,7 +80,9 @@ def test_download_http_url__no_directory_traversal( "content-disposition": 'attachment;filename="../out_dir_file"', } session.get.return_value = resp - download = Downloader(session, progress_bar="on") + download = Downloader( + session, progress_bar="on", resume_incomplete=False, resume_attempts=5 + ) download_dir = tmpdir.joinpath("download") os.mkdir(download_dir) diff --git a/tests/unit/test_req.py b/tests/unit/test_req.py index 18932bb344d..d11a598f636 100644 --- a/tests/unit/test_req.py +++ b/tests/unit/test_req.py @@ -107,6 +107,8 @@ def _basic_resolver( use_user_site=False, lazy_wheel=False, verbosity=0, + resume_incomplete=False, + resume_attempts=5, ) yield Resolver( preparer=preparer,