diff --git a/tests/conftest.py b/tests/conftest.py index 57dd7e68a2b..5b189443f25 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,23 +1,21 @@ import compileall +import contextlib import fnmatch -import io import os import re import shutil import subprocess import sys -from contextlib import ExitStack, contextmanager from pathlib import Path from typing import ( - TYPE_CHECKING, AnyStr, Callable, + ContextManager, Dict, Iterable, Iterator, List, Optional, - Union, ) from unittest.mock import patch from zipfile import ZipFile @@ -36,25 +34,20 @@ from installer.sources import WheelFile from pip import __file__ as pip_location -from pip._internal.cli.main import main as pip_entry_point from pip._internal.locations import _USE_SYSCONFIG from pip._internal.utils.temp_dir import global_tempdir_manager -from tests.lib import DATA_DIR, SRC_DIR, PipTestEnvironment, TestData -from tests.lib.server import MockServer as _MockServer -from tests.lib.server import make_mock_server, server_running +from tests.lib import ( + DATA_DIR, + SRC_DIR, + CertFactory, + InMemoryPip, + PipTestEnvironment, + ScriptFactory, + TestData, +) +from tests.lib.server import MockServer, make_mock_server from tests.lib.venv import VirtualEnvironment, VirtualEnvironmentType -from .lib.compat import nullcontext - -if TYPE_CHECKING: - from typing import Protocol - - from wsgi import WSGIApplication -else: - # TODO: Protocol was introduced in Python 3.8. Remove this branch when - # dropping support for Python 3.7. - Protocol = object - def pytest_addoption(parser: Parser) -> None: parser.addoption( @@ -325,7 +318,7 @@ def scoped_global_tempdir_manager(request: pytest.FixtureRequest) -> Iterator[No temporary directories in the application. """ if "no_auto_tempdir_manager" in request.keywords: - ctx = nullcontext + ctx: Callable[[], ContextManager[None]] = contextlib.nullcontext else: ctx = global_tempdir_manager @@ -502,16 +495,6 @@ def virtualenv( yield virtualenv_factory(tmpdir.joinpath("workspace", "venv")) -class ScriptFactory(Protocol): - def __call__( - self, - tmpdir: Path, - virtualenv: Optional[VirtualEnvironment] = None, - environ: Optional[Dict[AnyStr, AnyStr]] = None, - ) -> PipTestEnvironment: - ... - - @pytest.fixture(scope="session") def script_factory( virtualenv_factory: Callable[[Path], VirtualEnvironment], @@ -631,26 +614,6 @@ def data(tmpdir: Path) -> TestData: return TestData.copy(tmpdir.joinpath("data")) -class InMemoryPipResult: - def __init__(self, returncode: int, stdout: str) -> None: - self.returncode = returncode - self.stdout = stdout - - -class InMemoryPip: - def pip(self, *args: Union[str, Path]) -> InMemoryPipResult: - orig_stdout = sys.stdout - stdout = io.StringIO() - sys.stdout = stdout - try: - returncode = pip_entry_point([os.fspath(a) for a in args]) - except SystemExit as e: - returncode = e.code or 0 - finally: - sys.stdout = orig_stdout - return InMemoryPipResult(returncode, stdout.getvalue()) - - @pytest.fixture def in_memory_pip() -> InMemoryPip: return InMemoryPip() @@ -662,9 +625,6 @@ def deprecated_python() -> bool: return sys.version_info[:2] in [] -CertFactory = Callable[[], str] - - @pytest.fixture(scope="session") def cert_factory(tmpdir_factory: pytest.TempPathFactory) -> CertFactory: # Delay the import requiring cryptography in order to make it possible @@ -686,49 +646,6 @@ def factory() -> str: return factory -class MockServer: - def __init__(self, server: _MockServer) -> None: - self._server = server - self._running = False - self.context = ExitStack() - - @property - def port(self) -> int: - return self._server.port - - @property - def host(self) -> str: - return self._server.host - - def set_responses(self, responses: Iterable["WSGIApplication"]) -> None: - assert not self._running, "responses cannot be set on running server" - self._server.mock.side_effect = responses - - def start(self) -> None: - assert not self._running, "running server cannot be started" - self.context.enter_context(server_running(self._server)) - self.context.enter_context(self._set_running()) - - @contextmanager - def _set_running(self) -> Iterator[None]: - self._running = True - try: - yield - finally: - self._running = False - - def stop(self) -> None: - assert self._running, "idle server cannot be stopped" - self.context.close() - - def get_requests(self) -> List[Dict[str, str]]: - """Get environ for each received request.""" - assert not self._running, "cannot get mock from running server" - # Legacy: replace call[0][0] with call.args[0] - # when pip drops support for python3.7 - return [call[0][0] for call in self._server.mock.call_args_list] - - @pytest.fixture def mock_server() -> Iterator[MockServer]: server = make_mock_server() diff --git a/tests/functional/test_completion.py b/tests/functional/test_completion.py index b02cd4fa317..28381c2097e 100644 --- a/tests/functional/test_completion.py +++ b/tests/functional/test_completion.py @@ -5,8 +5,7 @@ import pytest -from tests.conftest import ScriptFactory -from tests.lib import PipTestEnvironment, TestData, TestPipResult +from tests.lib import PipTestEnvironment, ScriptFactory, TestData, TestPipResult if TYPE_CHECKING: from typing import Protocol diff --git a/tests/functional/test_download.py b/tests/functional/test_download.py index 31418ca8c2b..8c00dc09edf 100644 --- a/tests/functional/test_download.py +++ b/tests/functional/test_download.py @@ -14,15 +14,15 @@ from pip._internal.cli.status_codes import ERROR from pip._internal.utils.urls import path_to_url -from tests.conftest import MockServer, ScriptFactory from tests.lib import ( PipTestEnvironment, + ScriptFactory, TestData, TestPipResult, create_basic_sdist_for_package, create_really_basic_wheel, ) -from tests.lib.server import file_response +from tests.lib.server import MockServer, file_response def fake_wheel(data: TestData, wheel_path: str) -> None: diff --git a/tests/functional/test_help.py b/tests/functional/test_help.py index dba41af5f79..69419b8d9a3 100644 --- a/tests/functional/test_help.py +++ b/tests/functional/test_help.py @@ -5,8 +5,7 @@ from pip._internal.cli.status_codes import ERROR, SUCCESS from pip._internal.commands import commands_dict, create_command from pip._internal.exceptions import CommandError -from tests.conftest import InMemoryPip -from tests.lib import PipTestEnvironment +from tests.lib import InMemoryPip, PipTestEnvironment def test_run_method_should_return_success_when_finds_command_name() -> None: diff --git a/tests/functional/test_inspect.py b/tests/functional/test_inspect.py index c9f43134624..f6690fb1fb1 100644 --- a/tests/functional/test_inspect.py +++ b/tests/functional/test_inspect.py @@ -2,8 +2,7 @@ import pytest -from tests.conftest import ScriptFactory -from tests.lib import PipTestEnvironment, TestData +from tests.lib import PipTestEnvironment, ScriptFactory, TestData @pytest.fixture(scope="session") diff --git a/tests/functional/test_install.py b/tests/functional/test_install.py index 63712827479..c29880e611a 100644 --- a/tests/functional/test_install.py +++ b/tests/functional/test_install.py @@ -15,8 +15,8 @@ from pip._internal.models.index import PyPI, TestPyPI from pip._internal.utils.misc import rmtree from pip._internal.utils.urls import path_to_url -from tests.conftest import CertFactory from tests.lib import ( + CertFactory, PipTestEnvironment, ResolverVariant, TestData, diff --git a/tests/functional/test_install_config.py b/tests/functional/test_install_config.py index 9f8a8067787..ecaf2f705a2 100644 --- a/tests/functional/test_install_config.py +++ b/tests/functional/test_install_config.py @@ -8,9 +8,9 @@ import pytest -from tests.conftest import CertFactory, MockServer, ScriptFactory -from tests.lib import PipTestEnvironment, TestData +from tests.lib import CertFactory, PipTestEnvironment, ScriptFactory, TestData from tests.lib.server import ( + MockServer, authorization_response, file_response, make_mock_server, diff --git a/tests/functional/test_list.py b/tests/functional/test_list.py index bd45f82df7f..a960f3c4e71 100644 --- a/tests/functional/test_list.py +++ b/tests/functional/test_list.py @@ -5,9 +5,9 @@ import pytest from pip._internal.models.direct_url import DirectUrl, DirInfo -from tests.conftest import ScriptFactory from tests.lib import ( PipTestEnvironment, + ScriptFactory, TestData, _create_test_package, create_test_package_with_setup, diff --git a/tests/lib/__init__.py b/tests/lib/__init__.py index 7410072f50e..cd0b83d1292 100644 --- a/tests/lib/__init__.py +++ b/tests/lib/__init__.py @@ -10,11 +10,12 @@ from base64 import urlsafe_b64encode from contextlib import contextmanager from hashlib import sha256 -from io import BytesIO +from io import BytesIO, StringIO from textwrap import dedent from typing import ( TYPE_CHECKING, Any, + AnyStr, Callable, Dict, Iterable, @@ -32,6 +33,7 @@ from pip._vendor.packaging.utils import canonicalize_name from scripttest import FoundDir, FoundFile, ProcResult, TestFileEnvironment +from pip._internal.cli.main import main as pip_entry_point from pip._internal.index.collector import LinkCollector from pip._internal.index.package_finder import PackageFinder from pip._internal.locations import get_major_minor_version @@ -43,12 +45,12 @@ from tests.lib.wheel import make_wheel if TYPE_CHECKING: - # Literal was introduced in Python 3.8. - from typing import Literal + from typing import Literal, Protocol ResolverVariant = Literal["resolvelib", "legacy"] -else: - ResolverVariant = str +else: # TODO: Remove this branch when dropping support for Python 3.7. + Protocol = object # Protocol was introduced in Python 3.8. + ResolverVariant = str # Literal was introduced in Python 3.8. DATA_DIR = pathlib.Path(__file__).parent.parent.joinpath("data").resolve() SRC_DIR = pathlib.Path(__file__).resolve().parent.parent.parent @@ -1336,3 +1338,39 @@ def need_svn(fn: _Test) -> _Test: def need_mercurial(fn: _Test) -> _Test: return pytest.mark.mercurial(need_executable("Mercurial", ("hg", "version"))(fn)) + + +class InMemoryPipResult: + def __init__(self, returncode: int, stdout: str) -> None: + self.returncode = returncode + self.stdout = stdout + + +class InMemoryPip: + def pip(self, *args: Union[str, pathlib.Path]) -> InMemoryPipResult: + orig_stdout = sys.stdout + stdout = StringIO() + sys.stdout = stdout + try: + returncode = pip_entry_point([os.fspath(a) for a in args]) + except SystemExit as e: + if isinstance(e.code, int): + returncode = e.code + else: + returncode = int(bool(e.code)) + finally: + sys.stdout = orig_stdout + return InMemoryPipResult(returncode, stdout.getvalue()) + + +class ScriptFactory(Protocol): + def __call__( + self, + tmpdir: pathlib.Path, + virtualenv: Optional[VirtualEnvironment] = None, + environ: Optional[Dict[AnyStr, AnyStr]] = None, + ) -> PipTestEnvironment: + ... + + +CertFactory = Callable[[], str] diff --git a/tests/lib/compat.py b/tests/lib/compat.py index 4d44cbddbbc..866ac7a7734 100644 --- a/tests/lib/compat.py +++ b/tests/lib/compat.py @@ -2,32 +2,13 @@ import contextlib import signal -from typing import Iterable, Iterator - - -@contextlib.contextmanager -def nullcontext() -> Iterator[None]: - """ - Context manager that does no additional processing. - - Used as a stand-in for a normal context manager, when a particular block of - code is only sometimes used with a normal context manager: - - cm = optional_cm if condition else nullcontext() - with cm: - # Perform operation, using optional_cm if condition is True - - TODO: Replace with contextlib.nullcontext after dropping Python 3.6 - support. - """ - yield - +from typing import Callable, ContextManager, Iterable, Iterator # Applies on Windows. if not hasattr(signal, "pthread_sigmask"): # We're not relying on this behavior anywhere currently, it's just best # practice. - blocked_signals = nullcontext + blocked_signals: Callable[[], ContextManager[None]] = contextlib.nullcontext else: @contextlib.contextmanager diff --git a/tests/lib/server.py b/tests/lib/server.py index 4cc18452cb5..1048a173d40 100644 --- a/tests/lib/server.py +++ b/tests/lib/server.py @@ -2,9 +2,9 @@ import ssl import threading from base64 import b64encode -from contextlib import contextmanager +from contextlib import ExitStack, contextmanager from textwrap import dedent -from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List from unittest.mock import Mock from werkzeug.serving import BaseWSGIServer, WSGIRequestHandler @@ -18,7 +18,7 @@ Body = Iterable[bytes] -class MockServer(BaseWSGIServer): +class _MockServer(BaseWSGIServer): mock: Mock = Mock() @@ -64,7 +64,7 @@ def adapter(environ: "WSGIEnvironment", start_response: "StartResponse") -> Body return adapter -def make_mock_server(**kwargs: Any) -> MockServer: +def make_mock_server(**kwargs: Any) -> _MockServer: """Creates a mock HTTP(S) server listening on a random port on localhost. The `mock` property of the returned server provides and records all WSGI @@ -189,3 +189,46 @@ def responder(environ: "WSGIEnvironment", start_response: "StartResponse") -> Bo return [path.read_bytes()] return responder + + +class MockServer: + def __init__(self, server: _MockServer) -> None: + self._server = server + self._running = False + self.context = ExitStack() + + @property + def port(self) -> int: + return self._server.port + + @property + def host(self) -> str: + return self._server.host + + def set_responses(self, responses: Iterable["WSGIApplication"]) -> None: + assert not self._running, "responses cannot be set on running server" + self._server.mock.side_effect = responses + + def start(self) -> None: + assert not self._running, "running server cannot be started" + self.context.enter_context(server_running(self._server)) + self.context.enter_context(self._set_running()) + + @contextmanager + def _set_running(self) -> Iterator[None]: + self._running = True + try: + yield + finally: + self._running = False + + def stop(self) -> None: + assert self._running, "idle server cannot be stopped" + self.context.close() + + def get_requests(self) -> List[Dict[str, str]]: + """Get environ for each received request.""" + assert not self._running, "cannot get mock from running server" + # Legacy: replace call[0][0] with call.args[0] + # when pip drops support for python3.7 + return [call[0][0] for call in self._server.mock.call_args_list]