diff --git a/news/12923.feature.rst b/news/12923.feature.rst new file mode 100644 index 00000000000..e152a8d5f1a --- /dev/null +++ b/news/12923.feature.rst @@ -0,0 +1 @@ +Download concrete dists for metadata-only resolves in parallel using worker threads. Add ``--batch-download-parallelism`` CLI flag to limit parallelism. diff --git a/src/pip/_internal/cli/cmdoptions.py b/src/pip/_internal/cli/cmdoptions.py index 3bb7cbf4246..4b7a3e9b51b 100644 --- a/src/pip/_internal/cli/cmdoptions.py +++ b/src/pip/_internal/cli/cmdoptions.py @@ -236,6 +236,23 @@ class PipOption(Option): ) +batch_download_parallelism: Callable[..., Option] = partial( + Option, + "--batch-download-parallelism", + dest="batch_download_parallelism", + type="int", + default=10, + help=( + "Maximum parallelism employed for batch downloading of metadata-only dists" + " (default %default parallel requests)." + " Note that more than 10 downloads may overflow the requests connection pool," + " which may affect performance." + " Note also that commands such as 'install --dry-run' should avoid downloads" + " entirely, and so will not be affected by this option." + ), +) + + log: Callable[..., Option] = partial( PipOption, "--log", diff --git a/src/pip/_internal/cli/req_command.py b/src/pip/_internal/cli/req_command.py index 521cd7c917c..e215d64fe04 100644 --- a/src/pip/_internal/cli/req_command.py +++ b/src/pip/_internal/cli/req_command.py @@ -101,6 +101,7 @@ def make_requirement_preparer( use_user_site: bool, download_dir: Optional[str] = None, verbosity: int = 0, + batch_download_parallelism: Optional[int] = None, ) -> RequirementPreparer: """ Create a RequirementPreparer instance for the given parameters. 
@@ -144,6 +145,7 @@ def make_requirement_preparer( verbosity=verbosity, quietness=options.quiet, color=not options.no_color, + batch_download_parallelism=batch_download_parallelism, legacy_resolver=legacy_resolver, ) diff --git a/src/pip/_internal/commands/download.py b/src/pip/_internal/commands/download.py index 917bbb91d83..c2a090de1a3 100644 --- a/src/pip/_internal/commands/download.py +++ b/src/pip/_internal/commands/download.py @@ -47,6 +47,7 @@ def add_options(self) -> None: self.cmd_opts.add_option(cmdoptions.pre()) self.cmd_opts.add_option(cmdoptions.require_hashes()) self.cmd_opts.add_option(cmdoptions.progress_bar()) + self.cmd_opts.add_option(cmdoptions.batch_download_parallelism()) self.cmd_opts.add_option(cmdoptions.no_build_isolation()) self.cmd_opts.add_option(cmdoptions.use_pep517()) self.cmd_opts.add_option(cmdoptions.no_use_pep517()) @@ -116,6 +117,7 @@ def run(self, options: Values, args: List[str]) -> int: download_dir=options.download_dir, use_user_site=False, verbosity=self.verbosity, + batch_download_parallelism=options.batch_download_parallelism, ) resolver = self.make_resolver( diff --git a/src/pip/_internal/commands/install.py b/src/pip/_internal/commands/install.py index ad45a2f2a57..fda0e76ba23 100644 --- a/src/pip/_internal/commands/install.py +++ b/src/pip/_internal/commands/install.py @@ -237,6 +237,7 @@ def add_options(self) -> None: self.cmd_opts.add_option(cmdoptions.prefer_binary()) self.cmd_opts.add_option(cmdoptions.require_hashes()) self.cmd_opts.add_option(cmdoptions.progress_bar()) + self.cmd_opts.add_option(cmdoptions.batch_download_parallelism()) self.cmd_opts.add_option(cmdoptions.root_user_action()) index_opts = cmdoptions.make_option_group( @@ -359,6 +360,7 @@ def run(self, options: Values, args: List[str]) -> int: finder=finder, use_user_site=options.use_user_site, verbosity=self.verbosity, + batch_download_parallelism=options.batch_download_parallelism, ) resolver = self.make_resolver( preparer=preparer, diff --git 
a/src/pip/_internal/commands/wheel.py b/src/pip/_internal/commands/wheel.py index 278719f4e0c..21f11bfe49c 100644 --- a/src/pip/_internal/commands/wheel.py +++ b/src/pip/_internal/commands/wheel.py @@ -67,6 +67,7 @@ def add_options(self) -> None: self.cmd_opts.add_option(cmdoptions.ignore_requires_python()) self.cmd_opts.add_option(cmdoptions.no_deps()) self.cmd_opts.add_option(cmdoptions.progress_bar()) + self.cmd_opts.add_option(cmdoptions.batch_download_parallelism()) self.cmd_opts.add_option( "--no-verify", @@ -131,6 +132,7 @@ def run(self, options: Values, args: List[str]) -> int: download_dir=options.wheel_dir, use_user_site=False, verbosity=self.verbosity, + batch_download_parallelism=options.batch_download_parallelism, ) resolver = self.make_resolver( diff --git a/src/pip/_internal/network/download.py b/src/pip/_internal/network/download.py index ebae94fc7f1..d39780b2606 100644 --- a/src/pip/_internal/network/download.py +++ b/src/pip/_internal/network/download.py @@ -5,18 +5,20 @@ import logging import mimetypes import os +from contextlib import contextmanager from pathlib import Path -from typing import Iterable, List, Mapping, Optional, Tuple +from queue import Queue +from threading import Event, Semaphore, Thread +from typing import Iterable, Iterator, List, Mapping, Optional, Tuple, Union from pip._vendor.requests.models import Response -from pip._vendor.rich.progress import TaskID from pip._internal.cli.progress_bars import ( BatchedProgress, ProgressBarType, get_download_progress_renderer, ) -from pip._internal.exceptions import NetworkConnectionError +from pip._internal.exceptions import CommandError, NetworkConnectionError from pip._internal.models.index import PyPI from pip._internal.models.link import Link from pip._internal.network.cache import is_from_cache @@ -201,6 +203,104 @@ def __call__(self, link: Link, location: str) -> Tuple[str, str]: return filepath, content_type +class _ErrorReceiver: + def __init__(self, error_flag: Event) -> None: 
+ self._error_flag = error_flag + self._thread_exception: Optional[BaseException] = None + + def receive_error(self, exc: BaseException) -> None: + self._error_flag.set() + self._thread_exception = exc + + def stored_error(self) -> Optional[BaseException]: + return self._thread_exception + + +@contextmanager +def _spawn_workers( + workers: List[Thread], error_flag: Event +) -> Iterator[_ErrorReceiver]: + err_recv = _ErrorReceiver(error_flag) + try: + for w in workers: + w.start() + yield err_recv + except BaseException as e: + err_recv.receive_error(e) + finally: + thread_exception = err_recv.stored_error() + if thread_exception is not None: + logger.critical("Received exception, shutting down downloader threads...") + + # Ensure each worker thread has finished by the time this context manager exits, + # either after writing its full request contents, or after seeing the error Event set. + for w in workers: + # If the user (reasonably) wants to hit ^C again to try to make it close + # faster, we want to avoid spewing out a ton of error text, but at least + # let's let them know we hear them and we're trying to shut down! + while w.is_alive(): + try: + w.join() + except BaseException: + logger.critical("Shutting down worker threads, please wait...") + + if thread_exception is not None: + raise thread_exception + + +def _copy_chunks( + output_queue: "Queue[Union[Tuple[Link, Path, Optional[str]], BaseException]]", + error_flag: Event, + semaphore: Semaphore, + session: PipSession, + location: Path, + batched_progress: BatchedProgress, + download_info: Tuple[Link, Tuple[Optional[int], str]], +) -> None: + link, (cur_length, filename) = download_info + + _log_download_link(link, cur_length) + task_id = batched_progress.add_subtask(filename, cur_length) + with semaphore: + # Check if another thread exited with an exception before we started. 
+ if error_flag.is_set(): + return + try: + try: + resp = _http_get_download(session, link) + except NetworkConnectionError as e: + assert e.response is not None + logger.critical( + "HTTP error %s while getting %s", + e.response.status_code, + link, + ) + raise + + filepath = location / filename + content_type = resp.headers.get("Content-Type") + # TODO: different chunk size for batched downloads? + chunks = response_chunks(resp) + with filepath.open("wb") as output_file: + # Notify that the current task has begun. + batched_progress.start_subtask(task_id) + for chunk in chunks: + # Check if another thread exited with an exception between chunks. + if error_flag.is_set(): + return + # Copy chunk directly to output file, without any + # additional buffering. + output_file.write(chunk) + # Update progress. + batched_progress.advance_subtask(task_id, len(chunk)) + + output_queue.put((link, filepath, content_type)) + except Exception as e: + output_queue.put(e) + finally: + batched_progress.finish_subtask(task_id) + + class BatchDownloader: def __init__( self, @@ -208,12 +308,21 @@ def __init__( progress_bar: ProgressBarType, quiet: bool = False, color: bool = True, + max_parallelism: Optional[int] = None, ) -> None: self._session = session self._progress_bar = progress_bar self._quiet = quiet self._color = color + if max_parallelism is None: + max_parallelism = 1 + if max_parallelism < 1: + raise CommandError( + f"invalid batch download parallelism {max_parallelism}: must be >=1" + ) + self._max_parallelism: int = max_parallelism + def __call__( self, links: Iterable[Link], location: Path ) -> Iterable[Tuple[Link, Tuple[Path, Optional[str]]]]: @@ -232,48 +341,63 @@ def __call__( assert total_length is not None total_length += maybe_len + # Set up state to track thread progress, including inner exceptions. 
+ total_downloads: int = len(links_with_lengths) + completed_downloads: int = 0 + q: "Queue[Union[Tuple[Link, Path, Optional[str]], BaseException]]" = Queue() + error_flag = Event() + # Cap concurrent downloads at self._max_parallelism (default 10) so we can reuse our connection pool. + semaphore = Semaphore(value=self._max_parallelism) batched_progress = BatchedProgress.select_progress_bar( self._progress_bar ).create( - num_tasks=len(links_with_lengths), + num_tasks=total_downloads, known_total_length=total_length, quiet=self._quiet, color=self._color, ) - link_tasks: List[Tuple[Link, TaskID, str]] = [] - for link, (maybe_len, filename) in links_with_lengths: - _log_download_link(link, maybe_len) - task_id = batched_progress.add_subtask(filename, maybe_len) - link_tasks.append((link, task_id, filename)) + # Distribute request i/o across equivalent threads. + # NB: event-based/async is likely a better model than thread-per-request, but + # (1) pip doesn't use async anywhere else yet, + # (2) this is at most one thread per dependency in the graph (less if any + # are cached) + # (3) pip is fundamentally run in a synchronous context with a clear start + # and end, instead of e.g. as a server which needs to process + # arbitrary further requests at the same time. + # For these reasons, thread-per-request should be sufficient for our needs. + workers = [ + Thread( + target=_copy_chunks, + args=( + q, + error_flag, + semaphore, + self._session, + location, + batched_progress, + download_info, + ), + ) + for download_info in links_with_lengths + ] - with batched_progress: - for link, task_id, filename in link_tasks: - try: - resp = _http_get_download(self._session, link) - except NetworkConnectionError as e: - assert e.response is not None - logger.critical( - "HTTP error %s while getting %s", - e.response.status_code, - link, - ) - raise - - filepath = location / filename - content_type = resp.headers.get("Content-Type") - # TODO: different chunk size for batched downloads? 
- chunks = response_chunks(resp) - with open(filepath, "wb") as content_file: - # Notify that the current task has begun. - batched_progress.start_subtask(task_id) - for chunk in chunks: - # Copy chunk directly to output file, without any - # additional buffering. - content_file.write(chunk) - # Update progress. - batched_progress.advance_subtask(task_id, len(chunk)) - # Notify of completion. - batched_progress.finish_subtask(task_id) - # Yield completed link and download path. - yield link, (filepath, content_type) + with _spawn_workers(workers, error_flag) as err_recv: + with batched_progress: + # Read completed downloads from queue, or extract the exception. + while completed_downloads < total_downloads: + # Get item from queue, but also check for ^C from user! + try: + item = q.get() + except BaseException as e: + err_recv.receive_error(e) + break + # Now see if the worker thread failed with an exception (unlikely). + if isinstance(item, BaseException): + err_recv.receive_error(item) + break + # Otherwise, the thread succeeded, and we can pass it to + # the preparer! 
+ link, filepath, content_type = item + completed_downloads += 1 + yield link, (filepath, content_type) diff --git a/src/pip/_internal/operations/prepare.py b/src/pip/_internal/operations/prepare.py index cd716ea8903..a445c5e17f2 100644 --- a/src/pip/_internal/operations/prepare.py +++ b/src/pip/_internal/operations/prepare.py @@ -233,6 +233,7 @@ def __init__( # noqa: PLR0913 verbosity: int, quietness: int, color: bool, + batch_download_parallelism: Optional[int], legacy_resolver: bool, ) -> None: super().__init__() @@ -249,6 +250,7 @@ def __init__( # noqa: PLR0913 progress_bar, quiet=quietness > 0, color=color, + max_parallelism=batch_download_parallelism, ) self.finder = finder diff --git a/tests/functional/test_download.py b/tests/functional/test_download.py index d469e71c360..c620773837a 100644 --- a/tests/functional/test_download.py +++ b/tests/functional/test_download.py @@ -1288,6 +1288,8 @@ def run_for_generated_index( str(download_dir), "-i", "http://localhost:8000", + "--batch-download-parallelism", + "1", *args, ] result = script.pip(*pip_args, allow_error=allow_error) diff --git a/tests/unit/test_req.py b/tests/unit/test_req.py index 460a4ae7549..5fa4a2e8159 100644 --- a/tests/unit/test_req.py +++ b/tests/unit/test_req.py @@ -109,6 +109,7 @@ def _basic_resolver( verbosity=0, quietness=0, color=True, + batch_download_parallelism=None, legacy_resolver=True, ) yield Resolver(