execute batch downloads in parallel worker threads
- limit downloads to 10 at a time instead of starting all at once
- add CLI arg to limit download parallelism
- factor out receiving thread exceptions into a contextmanager
- default batch parallelism to 10
- make batch download parallelism 1 in html index test
cosmicexplorer committed Aug 20, 2024
1 parent 847fee1 commit 481e1c4
Showing 10 changed files with 194 additions and 39 deletions.
1 change: 1 addition & 0 deletions news/12923.feature.rst
@@ -0,0 +1 @@
Download concrete dists for metadata-only resolves in parallel using worker threads. Add ``--batch-download-parallelism`` CLI flag to limit parallelism.
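
For context, the new flag is accepted by every command that performs batch downloads (``download``, ``install``, ``wheel``). A hypothetical invocation (package name purely illustrative):

    pip download --batch-download-parallelism 4 --dest dists/ requests

Values below the default of 10 trade download throughput for fewer concurrent connections.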
17 changes: 17 additions & 0 deletions src/pip/_internal/cli/cmdoptions.py
@@ -236,6 +236,23 @@ class PipOption(Option):
)


batch_download_parallelism: Callable[..., Option] = partial(
Option,
"--batch-download-parallelism",
dest="batch_download_parallelism",
type="int",
default=10,
help=(
"Maximum parallelism employed for batch downloading of metadata-only dists"
" (default %default parallel requests)."
" Note that more than 10 downloads may overflow the requests connection pool,"
" which may affect performance."
" Note also that commands such as 'install --dry-run' should avoid downloads"
" entirely, and so will not be affected by this option."
),
)


log: Callable[..., Option] = partial(
PipOption,
"--log",
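Aside: optparse interpolates ``%default`` into help text, which is how the ``(default %default parallel requests)`` string above renders with the actual default value. A minimal standalone sketch using plain optparse (outside pip's option machinery):

    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option(
        "--batch-download-parallelism",
        dest="batch_download_parallelism",
        type="int",
        default=10,
        help="Maximum parallelism (default %default parallel requests).",
    )
    parser.print_help()  # renders "... (default 10 parallel requests)"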
2 changes: 2 additions & 0 deletions src/pip/_internal/cli/req_command.py
@@ -101,6 +101,7 @@ def make_requirement_preparer(
use_user_site: bool,
download_dir: Optional[str] = None,
verbosity: int = 0,
batch_download_parallelism: Optional[int] = None,
) -> RequirementPreparer:
"""
Create a RequirementPreparer instance for the given parameters.
@@ -144,6 +145,7 @@ def make_requirement_preparer(
verbosity=verbosity,
quietness=options.quiet,
color=not options.no_color,
batch_download_parallelism=batch_download_parallelism,
legacy_resolver=legacy_resolver,
)

2 changes: 2 additions & 0 deletions src/pip/_internal/commands/download.py
@@ -47,6 +47,7 @@ def add_options(self) -> None:
self.cmd_opts.add_option(cmdoptions.pre())
self.cmd_opts.add_option(cmdoptions.require_hashes())
self.cmd_opts.add_option(cmdoptions.progress_bar())
self.cmd_opts.add_option(cmdoptions.batch_download_parallelism())
self.cmd_opts.add_option(cmdoptions.no_build_isolation())
self.cmd_opts.add_option(cmdoptions.use_pep517())
self.cmd_opts.add_option(cmdoptions.no_use_pep517())
@@ -116,6 +117,7 @@ def run(self, options: Values, args: List[str]) -> int:
download_dir=options.download_dir,
use_user_site=False,
verbosity=self.verbosity,
batch_download_parallelism=options.batch_download_parallelism,
)

resolver = self.make_resolver(
2 changes: 2 additions & 0 deletions src/pip/_internal/commands/install.py
@@ -237,6 +237,7 @@ def add_options(self) -> None:
self.cmd_opts.add_option(cmdoptions.prefer_binary())
self.cmd_opts.add_option(cmdoptions.require_hashes())
self.cmd_opts.add_option(cmdoptions.progress_bar())
self.cmd_opts.add_option(cmdoptions.batch_download_parallelism())
self.cmd_opts.add_option(cmdoptions.root_user_action())

index_opts = cmdoptions.make_option_group(
@@ -359,6 +360,7 @@ def run(self, options: Values, args: List[str]) -> int:
finder=finder,
use_user_site=options.use_user_site,
verbosity=self.verbosity,
batch_download_parallelism=options.batch_download_parallelism,
)
resolver = self.make_resolver(
preparer=preparer,
2 changes: 2 additions & 0 deletions src/pip/_internal/commands/wheel.py
@@ -67,6 +67,7 @@ def add_options(self) -> None:
self.cmd_opts.add_option(cmdoptions.ignore_requires_python())
self.cmd_opts.add_option(cmdoptions.no_deps())
self.cmd_opts.add_option(cmdoptions.progress_bar())
self.cmd_opts.add_option(cmdoptions.batch_download_parallelism())

self.cmd_opts.add_option(
"--no-verify",
@@ -131,6 +132,7 @@ def run(self, options: Values, args: List[str]) -> int:
download_dir=options.wheel_dir,
use_user_site=False,
verbosity=self.verbosity,
batch_download_parallelism=options.batch_download_parallelism,
)

resolver = self.make_resolver(
202 changes: 163 additions & 39 deletions src/pip/_internal/network/download.py
@@ -5,18 +5,20 @@
import logging
import mimetypes
import os
from contextlib import contextmanager
from pathlib import Path
from typing import Iterable, List, Mapping, Optional, Tuple
from queue import Queue
from threading import Event, Semaphore, Thread
from typing import Iterable, Iterator, List, Mapping, Optional, Tuple, Union

from pip._vendor.requests.models import Response
from pip._vendor.rich.progress import TaskID

from pip._internal.cli.progress_bars import (
BatchedProgress,
ProgressBarType,
get_download_progress_renderer,
)
from pip._internal.exceptions import NetworkConnectionError
from pip._internal.exceptions import CommandError, NetworkConnectionError
from pip._internal.models.index import PyPI
from pip._internal.models.link import Link
from pip._internal.network.cache import is_from_cache
@@ -201,19 +203,126 @@ def __call__(self, link: Link, location: str) -> Tuple[str, str]:
return filepath, content_type


class _ErrorReceiver:
def __init__(self, error_flag: Event) -> None:
self._error_flag = error_flag
self._thread_exception: Optional[BaseException] = None

def receive_error(self, exc: BaseException) -> None:
self._error_flag.set()
self._thread_exception = exc

def stored_error(self) -> Optional[BaseException]:
return self._thread_exception


@contextmanager
def _spawn_workers(
workers: List[Thread], error_flag: Event
) -> Iterator[_ErrorReceiver]:
err_recv = _ErrorReceiver(error_flag)
try:
for w in workers:
w.start()
yield err_recv
except BaseException as e:
err_recv.receive_error(e)
finally:
thread_exception = err_recv.stored_error()
if thread_exception is not None:
logger.critical("Received exception, shutting down downloader threads...")

# Ensure each thread has completed before the context manager exits, either by
# writing the full request contents, or by noticing the error Event set after an exception.
for w in workers:
# If the user (reasonably) wants to hit ^C again to try to make it close
# faster, we want to avoid spewing out a ton of error text, but at least
# let's let them know we hear them and we're trying to shut down!
while w.is_alive():
try:
w.join()
except BaseException:
logger.critical("Shutting down worker threads, please wait...")

if thread_exception is not None:
raise thread_exception


def _copy_chunks(
output_queue: "Queue[Union[Tuple[Link, Path, Optional[str]], BaseException]]",
error_flag: Event,
semaphore: Semaphore,
session: PipSession,
location: Path,
batched_progress: BatchedProgress,
download_info: Tuple[Link, Tuple[Optional[int], str]],
) -> None:
link, (cur_length, filename) = download_info

_log_download_link(link, cur_length)
task_id = batched_progress.add_subtask(filename, cur_length)
with semaphore:
# Check if another thread exited with an exception before we started.
if error_flag.is_set():
return
try:
try:
resp = _http_get_download(session, link)
except NetworkConnectionError as e:
assert e.response is not None
logger.critical(
"HTTP error %s while getting %s",
e.response.status_code,
link,
)
raise

filepath = location / filename
content_type = resp.headers.get("Content-Type")
# TODO: different chunk size for batched downloads?
chunks = response_chunks(resp)
with filepath.open("wb") as output_file:
# Notify that the current task has begun.
batched_progress.start_subtask(task_id)
for chunk in chunks:
# Check if another thread exited with an exception between chunks.
if error_flag.is_set():
return
# Copy chunk directly to output file, without any
# additional buffering.
output_file.write(chunk)
# Update progress.
batched_progress.advance_subtask(task_id, len(chunk))

output_queue.put((link, filepath, content_type))
except Exception as e:
output_queue.put(e)
finally:
batched_progress.finish_subtask(task_id)


class BatchDownloader:
def __init__(
self,
session: PipSession,
progress_bar: ProgressBarType,
quiet: bool = False,
color: bool = True,
max_parallelism: Optional[int] = None,
) -> None:
self._session = session
self._progress_bar = progress_bar
self._quiet = quiet
self._color = color

if max_parallelism is None:
max_parallelism = 1
if max_parallelism < 1:
raise CommandError(
f"invalid batch download parallelism {max_parallelism}: must be >=1"
)
self._max_parallelism: int = max_parallelism

def __call__(
self, links: Iterable[Link], location: Path
) -> Iterable[Tuple[Link, Tuple[Path, Optional[str]]]]:
@@ -232,48 +341,63 @@ def __call__(
assert total_length is not None
total_length += maybe_len

# Set up state to track thread progress, including inner exceptions.
total_downloads: int = len(links_with_lengths)
completed_downloads: int = 0
q: "Queue[Union[Tuple[Link, Path, Optional[str]], BaseException]]" = Queue()
error_flag = Event()
# Bound concurrent downloads to the configured parallelism (10 by default) so we
# can reuse our connection pool.
semaphore = Semaphore(value=self._max_parallelism)
batched_progress = BatchedProgress.select_progress_bar(
self._progress_bar
).create(
num_tasks=len(links_with_lengths),
num_tasks=total_downloads,
known_total_length=total_length,
quiet=self._quiet,
color=self._color,
)

link_tasks: List[Tuple[Link, TaskID, str]] = []
for link, (maybe_len, filename) in links_with_lengths:
_log_download_link(link, maybe_len)
task_id = batched_progress.add_subtask(filename, maybe_len)
link_tasks.append((link, task_id, filename))
# Distribute request I/O across worker threads.
# NB: event-based/async is likely a better model than thread-per-request, but
# (1) pip doesn't use async anywhere else yet,
# (2) this is at most one thread per dependency in the graph (less if any
# are cached)
# (3) pip is fundamentally run in a synchronous context with a clear start
# and end, instead of e.g. as a server which needs to process
# arbitrary further requests at the same time.
# For these reasons, thread-per-request should be sufficient for our needs.
workers = [
Thread(
target=_copy_chunks,
args=(
q,
error_flag,
semaphore,
self._session,
location,
batched_progress,
download_info,
),
)
for download_info in links_with_lengths
]

with batched_progress:
for link, task_id, filename in link_tasks:
try:
resp = _http_get_download(self._session, link)
except NetworkConnectionError as e:
assert e.response is not None
logger.critical(
"HTTP error %s while getting %s",
e.response.status_code,
link,
)
raise

filepath = location / filename
content_type = resp.headers.get("Content-Type")
# TODO: different chunk size for batched downloads?
chunks = response_chunks(resp)
with open(filepath, "wb") as content_file:
# Notify that the current task has begun.
batched_progress.start_subtask(task_id)
for chunk in chunks:
# Copy chunk directly to output file, without any
# additional buffering.
content_file.write(chunk)
# Update progress.
batched_progress.advance_subtask(task_id, len(chunk))
# Notify of completion.
batched_progress.finish_subtask(task_id)
# Yield completed link and download path.
yield link, (filepath, content_type)
with _spawn_workers(workers, error_flag) as err_recv:
with batched_progress:
# Read completed downloads from queue, or extract the exception.
while completed_downloads < total_downloads:
# Get item from queue, but also check for ^C from user!
try:
item = q.get()
except BaseException as e:
err_recv.receive_error(e)
break
# Now see if the worker thread failed with an exception (unlikely).
if isinstance(item, BaseException):
err_recv.receive_error(item)
break
# Otherwise, the thread succeeded, and we can pass it to
# the preparer!
link, filepath, content_type = item
completed_downloads += 1
yield link, (filepath, content_type)
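
For readers following the control flow above, here is a minimal, self-contained sketch of the same coordination pattern: semaphore-bounded worker threads feeding a results queue, an Event that tells in-flight workers to stop early, and a context manager that joins every thread before re-raising the first error. This is an illustration only; ``fetch`` stands in for the real chunked HTTP download, and none of these names are pip's API:

    from contextlib import contextmanager
    from queue import Queue
    from threading import Event, Semaphore, Thread
    from typing import Iterator, List, Tuple, Union

    Result = Union[Tuple[str, bytes], BaseException]

    def fetch(url: str) -> bytes:
        # Stand-in for the real chunked HTTP download.
        return f"contents of {url}".encode()

    def worker(q: "Queue[Result]", error_flag: Event, sem: Semaphore, url: str) -> None:
        with sem:  # At most max_parallelism workers do I/O at once.
            if error_flag.is_set():  # Another thread already failed: bail out.
                return
            try:
                q.put((url, fetch(url)))
            except Exception as e:
                q.put(e)  # Hand the exception to the consuming thread.

    @contextmanager
    def spawn_workers(threads: List[Thread], error_flag: Event) -> Iterator[None]:
        stored: List[BaseException] = []
        try:
            for t in threads:
                t.start()
            yield
        except BaseException as e:  # Includes ^C raised in the consumer below.
            error_flag.set()  # Ask workers still queued on the semaphore to exit.
            stored.append(e)
        finally:
            for t in threads:
                # Every worker eventually acquires the semaphore, sees the
                # flag, and exits, so these joins terminate.
                t.join()
            if stored:
                raise stored[0]

    def download_all(urls: List[str], max_parallelism: int = 10) -> Iterator[Tuple[str, bytes]]:
        q: "Queue[Result]" = Queue()
        error_flag = Event()
        sem = Semaphore(max_parallelism)
        threads = [Thread(target=worker, args=(q, error_flag, sem, u)) for u in urls]
        completed = 0
        with spawn_workers(threads, error_flag):
            while completed < len(urls):
                item = q.get()  # The consumer only ever blocks here.
                if isinstance(item, BaseException):
                    raise item  # Unwinds through spawn_workers, which joins first.
                completed += 1
                yield item

    if __name__ == "__main__":
        for url, body in download_all([f"https://example.com/{n}" for n in range(25)], 4):
            print(url, len(body))

The property worth noting, mirrored from the real ``BatchDownloader``: both a worker exception (delivered through the queue) and a ``KeyboardInterrupt`` in the consumer unwind through the same context manager, so every thread is joined before the error surfaces.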
2 changes: 2 additions & 0 deletions src/pip/_internal/operations/prepare.py
@@ -233,6 +233,7 @@ def __init__( # noqa: PLR0913
verbosity: int,
quietness: int,
color: bool,
batch_download_parallelism: Optional[int],
legacy_resolver: bool,
) -> None:
super().__init__()
@@ -249,6 +250,7 @@ def __init__( # noqa: PLR0913
progress_bar,
quiet=quietness > 0,
color=color,
max_parallelism=batch_download_parallelism,
)
self.finder = finder

2 changes: 2 additions & 0 deletions tests/functional/test_download.py
@@ -1288,6 +1288,8 @@ def run_for_generated_index(
str(download_dir),
"-i",
"http://localhost:8000",
"--batch-download-parallelism",
"1",
*args,
]
result = script.pip(*pip_args, allow_error=allow_error)
1 change: 1 addition & 0 deletions tests/unit/test_req.py
@@ -109,6 +109,7 @@ def _basic_resolver(
verbosity=0,
quietness=0,
color=True,
batch_download_parallelism=None,
legacy_resolver=True,
)
yield Resolver(
Expand Down
