diff --git a/src/dvc_task/app/filesystem.py b/src/dvc_task/app/filesystem.py index 5c74e4c..4ee6aa4 100644 --- a/src/dvc_task/app/filesystem.py +++ b/src/dvc_task/app/filesystem.py @@ -33,11 +33,11 @@ def _get_fs_config( "processed_folder": broker_processed_path, "store_processed": True, }, - "result_backend": "file://{}".format(_unc_path(result_path)), + "result_backend": f"file://{_unc_path(result_path)}", "result_persistent": True, - "task_serializer": "json", - "result_serializer": "json", - "accept_content": ["json"], + "task_serializer": task_serializer, + "result_serializer": result_serializer, + "accept_content": [task_serializer], } diff --git a/src/dvc_task/exceptions.py b/src/dvc_task/exceptions.py index 84668be..ae3866c 100644 --- a/src/dvc_task/exceptions.py +++ b/src/dvc_task/exceptions.py @@ -1,2 +1,5 @@ +"""Exception classes.""" + + class DvcTaskError(Exception): - pass + """Base DVC Task exception.""" diff --git a/src/dvc_task/proc/exceptions.py b/src/dvc_task/proc/exceptions.py index 59f1dac..746ae5d 100644 --- a/src/dvc_task/proc/exceptions.py +++ b/src/dvc_task/proc/exceptions.py @@ -1,17 +1,24 @@ +"""Process exceptions.""" from ..exceptions import DvcTaskError class ProcessNotTerminatedError(DvcTaskError): + """Process is still running.""" + def __init__(self, name): super().__init__(f"Managed process '{name}' has not been terminated.") class ProcessNotFoundError(DvcTaskError): + """Process does not exist.""" + def __init__(self, name): super().__init__(f"Managed process '{name}' does not exist.") class TimeoutExpired(DvcTaskError): + """Process timeout expired.""" + def __init__(self, cmd, timeout): super().__init__( f"'{cmd}' did not complete before timeout '{timeout}'" @@ -21,5 +28,7 @@ def __init__(self, cmd, timeout): class UnsupportedSignalError(DvcTaskError): + """Unsupported process signal.""" + def __init__(self, sig): super().__init__(f"Unsupported signal: {sig}") diff --git a/src/dvc_task/proc/manager.py 
b/src/dvc_task/proc/manager.py index 031c04c..1766ece 100644 --- a/src/dvc_task/proc/manager.py +++ b/src/dvc_task/proc/manager.py @@ -7,7 +7,7 @@ import sys from typing import Generator, List, Optional, Tuple, Union -from celery import Signature, signature +from celery import Signature, signature # pylint: disable=no-name-in-module from funcy.flow import reraise from shortuuid import uuid @@ -49,8 +49,8 @@ def __getitem__(self, key: str) -> "ProcessInfo": try: with open(info_path, encoding="utf-8") as fobj: return ProcessInfo.from_dict(json.load(fobj)) - except FileNotFoundError: - raise KeyError + except FileNotFoundError as exc: + raise KeyError from exc @reraise(FileNotFoundError, KeyError) def __setitem__(self, key: str, value: "ProcessInfo"): @@ -64,12 +64,14 @@ def __delitem__(self, key: str) -> None: remove(path) def get(self, key: str, default=None): + """Return the specified process.""" try: return self[key] except KeyError: return default def processes(self) -> Generator[Tuple[str, "ProcessInfo"], None, None]: + """Iterate over managed processes.""" for name in self: try: yield name, self[name] @@ -111,7 +113,7 @@ def send_signal(self, name: str, sig: int): def handle_closed_process(): logging.warning( - f"Process {name} had already aborted unexpectedly." + "Process '%s' had already aborted unexpectedly.", name ) process_info.returncode = -1 self[name] = process_info @@ -138,7 +140,7 @@ def kill(self, name: str): if sys.platform == "win32": self.send_signal(name, signal.SIGTERM) else: - self.send_signal(name, signal.SIGKILL) + self.send_signal(name, signal.SIGKILL) # pylint: disable=no-member def remove(self, name: str, force: bool = False): """Remove the specified named process from this manager. 
diff --git a/src/dvc_task/proc/process.py b/src/dvc_task/proc/process.py index 29d7f06..b7f915c 100644 --- a/src/dvc_task/proc/process.py +++ b/src/dvc_task/proc/process.py @@ -1,11 +1,13 @@ +"""Managed process module.""" import json import logging +import multiprocessing as mp import os import shlex import subprocess -from contextlib import AbstractContextManager +from contextlib import AbstractContextManager, ExitStack from dataclasses import asdict, dataclass -from typing import Dict, List, Optional, TextIO, Union +from typing import Any, Dict, List, Optional, Union from funcy import cached_property from shortuuid import uuid @@ -18,6 +20,8 @@ @dataclass class ProcessInfo: + """Process information.""" + pid: int stdin: Optional[str] stdout: Optional[str] @@ -25,25 +29,20 @@ class ProcessInfo: returncode: Optional[int] @classmethod - def from_dict(cls, d): - return cls(**d) + def from_dict(cls, data: Dict[str, Any]) -> "ProcessInfo": + """Construct ProcessInfo from the specified dictionary.""" + return cls(**data) - def asdict(self): + def asdict(self) -> Dict[str, Any]: + """Return this info as a dictionary.""" return asdict(self) class ManagedProcess(AbstractContextManager): - """Run the specified command with redirected output. + """Class to manage the specified process with redirected output. stdout and stderr will both be redirected to <name>.out. Interactive processes (requiring stdin input) are currently unsupported. - - Parameters: - args: Command to be run. - env: Optional environment variables. - wdir: If specified, redirected output files will be placed in `wdir`. - name: Name to use for this process, if not specified a UUID will be - generated instead. """ def __init__( @@ -53,6 +52,16 @@ def __init__( wdir: Optional[str] = None, name: Optional[str] = None, ): + """Construct a ManagedProcess. + + Arguments: + args: Command to be run. + env: Optional environment variables. + wdir: If specified, redirected output files will be placed in + `wdir`. 
Defaults to current working directory. + name: Name to use for this process, if not specified a UUID will be + generated instead. + """ self.args: List[str] = ( shlex.split(args, posix=os.name == "posix") if isinstance(args, str) @@ -62,8 +71,7 @@ def __init__( self.wdir = wdir self.name = name or uuid() self.returncode: Optional[int] = None - self._stdout: Optional[TextIO] = None - self._stderr: Optional[TextIO] = None + self._fd_stack = ExitStack() self._proc: Optional[subprocess.Popen] = None def __enter__(self): @@ -75,30 +83,30 @@ def __exit__(self, *args, **kwargs): self.wait() def _close_fds(self): - if self._stdout: - self._stdout.close() - self._stdout = None - if self._stderr: - self._stderr.close() - self._stderr = None + with self._fd_stack: + pass def _make_path(self, path: str) -> str: return os.path.join(self.wdir, path) if self.wdir else path @cached_property def stdout_path(self) -> str: + """Return redirected stdout path.""" return self._make_path(f"{self.name}.out") @cached_property def info_path(self) -> str: + """Return process information file path.""" return self._make_path(f"{self.name}.json") @cached_property def pidfile_path(self) -> str: + """Return process pidfile path.""" return self._make_path(f"{self.name}.pid") @property def info(self) -> "ProcessInfo": + """Return process information.""" return ProcessInfo( pid=self.pid, stdin=None, @@ -107,6 +115,17 @@ def info(self) -> "ProcessInfo": returncode=self.returncode, ) + @property + def pid(self) -> int: + """Return process PID. + + Raises: + ValueError: Process is not running. 
+ """ + if self._proc is None: + raise ValueError + return self._proc.pid + def _make_wdir(self): if self.wdir: makedirs(self.wdir, exist_ok=True) @@ -119,23 +138,24 @@ def _dump(self): fobj.write(str(self.pid)) def run(self): + """Run this process.""" self._make_wdir() logger.debug( "Appending output to '%s'", self.stdout_path, ) - self._stdout = open(self.stdout_path, "ab") + stdout = self._fd_stack.enter_context(open(self.stdout_path, "ab")) try: + # pylint: disable=consider-using-with self._proc = subprocess.Popen( self.args, stdin=subprocess.DEVNULL, - stdout=self._stdout, + stdout=stdout, stderr=subprocess.STDOUT, close_fds=True, shell=False, env=self.env, ) - self.pid: int = self._proc.pid self._dump() except Exception: if self._proc is not None: @@ -167,8 +187,6 @@ def spawn(cls, *args, **kwargs) -> Optional[int]: Returns: The spawned process PID. """ - import multiprocessing as mp - proc = mp.Process( target=cls._spawn, args=args, diff --git a/src/dvc_task/proc/tasks.py b/src/dvc_task/proc/tasks.py index faa02f6..529bb82 100644 --- a/src/dvc_task/proc/tasks.py +++ b/src/dvc_task/proc/tasks.py @@ -1,4 +1,5 @@ -from typing import Dict, List, Optional, Union +"""Celery tasks.""" +from typing import Any, Dict from celery import shared_task @@ -6,22 +7,13 @@ @shared_task(bind=True) -def run( - self, - args: Union[str, List[str]], - env: Optional[Dict[str, str]] = None, - wdir: Optional[str] = None, - name: Optional[str] = None, -) -> Optional[int]: +def run( # pylint: disable=unused-argument + self, *args: Any, **kwargs: Any +) -> Dict[str, Any]: """Run a command inside a celery task. - Arguments: - args: Command to run. - env: Optional environment variables. - wdir: If specified, redirected output files will be placed in `wdir`. - name: Name to use for this process, if not specified a UUID will be - generated instead. + Accepts the same arguments as `proc.process.ManagedProcess`. 
""" - with ManagedProcess(args, env=env, wdir=wdir, name=name) as proc: + with ManagedProcess(*args, **kwargs) as proc: self.update_state(state="RUNNING", meta=proc.info.asdict()) return proc.info.asdict() diff --git a/src/dvc_task/utils.py b/src/dvc_task/utils.py index 1e3f0d9..dbdd4ed 100644 --- a/src/dvc_task/utils.py +++ b/src/dvc_task/utils.py @@ -1,3 +1,5 @@ +"""General utilities.""" + import errno import logging import os @@ -10,19 +12,19 @@ def _chmod( # pylint: disable=unused-argument - func: Callable, p: str, excinfo: Any + func: Callable, path: str, excinfo: Any ): - perm = os.lstat(p).st_mode + perm = os.lstat(path).st_mode perm |= stat.S_IWRITE try: - os.chmod(p, perm) + os.chmod(path, perm) except OSError as exc: # broken symlink or file is not owned by us if exc.errno not in [errno.ENOENT, errno.EPERM]: raise - func(p) + func(path) def _unlink(path: str, onerror: Callable): @@ -33,6 +35,7 @@ def _unlink(path: str, onerror: Callable): def remove(path: str): + """Remove the specified path.""" logger.debug("Removing '%s'", path) try: if os.path.isdir(path): @@ -45,6 +48,7 @@ def remove(path: str): def makedirs(path: str, exist_ok: bool = False, mode: Optional[int] = None): + """Make the specified directory and any parent directories.""" if mode is None: os.makedirs(path, exist_ok=exist_ok) return @@ -61,7 +65,7 @@ def makedirs(path: str, exist_ok: bool = False, mode: Optional[int] = None): # Defeats race condition when another thread created the path pass cdir = os.curdir - if tail == cdir: # xxx/newdir/. exists if xxx/newdir exists + if tail == cdir: # foo/newdir/. 
exists if foo/newdir exists return try: os.mkdir(path, mode) diff --git a/src/dvc_task/worker/temporary.py b/src/dvc_task/worker/temporary.py index 6e88c61..70a1dc5 100644 --- a/src/dvc_task/worker/temporary.py +++ b/src/dvc_task/worker/temporary.py @@ -1,3 +1,4 @@ +"""Temporary worker module.""" import logging import threading import time @@ -11,7 +12,7 @@ class TemporaryWorker: """Temporary worker that automatically shuts down when queue is empty.""" - def __init__( + def __init__( # pylint: disable=too-many-arguments self, app: Celery, timeout: int = 60, @@ -54,7 +55,7 @@ def start(self, name: str) -> None: f"--hostname={name}", ] if self.concurrency: - f"--concurrency={self.concurrency}", + argv.append(f"--concurrency={self.concurrency}") if self.task_events: argv.append("-E") self.app.worker_main(argv=argv) diff --git a/tests/app/test_filesystem.py b/tests/app/test_filesystem.py index 32b5009..7c8cd42 100644 --- a/tests/app/test_filesystem.py +++ b/tests/app/test_filesystem.py @@ -1,13 +1,14 @@ +"""Filesystem app tests.""" import os import pytest -from pytest_mock import MockerFixture from pytest_test_utils import TmpDir from dvc_task.app.filesystem import FSApp, _get_fs_config, _unc_path def test_config(tmp_dir: TmpDir): + """Should return a filesystem broker/result config.""" config = _get_fs_config(str(tmp_dir), mkdir=True) assert (tmp_dir / "broker" / "in").is_dir() assert (tmp_dir / "broker" / "processed").is_dir() @@ -17,11 +18,13 @@ def test_config(tmp_dir: TmpDir): @pytest.mark.skipif(os.name != "nt", reason="Windows only") def test_unc_path(): + """Windows paths should be converted to UNC paths.""" assert "//?/c:/foo" == _unc_path(r"c:\foo") assert "//foo/bar" == _unc_path(r"\\foo\bar") -def test_fs_app(tmp_dir: TmpDir, mocker: MockerFixture): +def test_fs_app(tmp_dir: TmpDir): + """App should be constructed with filesystem broker/result config.""" app = FSApp(wdir=str(tmp_dir), mkdir=True) assert app.wdir == str(tmp_dir) assert (tmp_dir / "broker" / 
"in").is_dir() diff --git a/tests/proc/conftest.py b/tests/proc/conftest.py index 78f3e07..46bdad5 100644 --- a/tests/proc/conftest.py +++ b/tests/proc/conftest.py @@ -1,13 +1,53 @@ +"""Process test fixtures.""" +import json +import os +from typing import Optional + import pytest from pytest_mock import MockerFixture +from pytest_test_utils import TmpDir + +from dvc_task.proc.process import ProcessInfo TEST_PID = 1234 +PID_FINISHED = 1234 +PID_RUNNING = 5678 @pytest.fixture def popen_pid(mocker: MockerFixture) -> int: + """Return a mocked Popen PID.""" mocker.patch( "subprocess.Popen", return_value=mocker.MagicMock(pid=TEST_PID, returncode=None), ) return TEST_PID + + +def create_process( + root: str, name: str, pid: int, returncode: Optional[int] = None +): + """Create a test process info directory.""" + info_path = os.path.join(root, name, f"{name}.json") + os.makedirs(os.path.join(root, name)) + process_info = ProcessInfo( + pid=pid, stdin=None, stdout=None, stderr=None, returncode=returncode + ) + with open(info_path, "w", encoding="utf-8") as fobj: + json.dump(process_info.asdict(), fobj) + + +@pytest.fixture +def finished_process(tmp_dir: TmpDir) -> str: + """Return a finished test process.""" + key = "finished" + create_process(tmp_dir, key, PID_FINISHED, 0) + return key + + +@pytest.fixture +def running_process(tmp_dir: TmpDir) -> str: + """Return a running test process.""" + key = "running" + create_process(tmp_dir, key, PID_RUNNING) + return key diff --git a/tests/proc/test_manager.py b/tests/proc/test_manager.py index 6375392..c2346f1 100644 --- a/tests/proc/test_manager.py +++ b/tests/proc/test_manager.py @@ -1,8 +1,6 @@ -import json -import os +"""Process manager tests.""" import signal import sys -from typing import Optional import pytest from pytest_mock import MockerFixture @@ -13,36 +11,8 @@ UnsupportedSignalError, ) from dvc_task.proc.manager import ProcessManager -from dvc_task.proc.process import ProcessInfo -PID_FINISHED = 1234 -PID_RUNNING = 
5678 - - -def create_process( - root: str, name: str, pid: int, returncode: Optional[int] = None -): - info_path = os.path.join(root, name, f"{name}.json") - os.makedirs(os.path.join(root, name)) - process_info = ProcessInfo( - pid=pid, stdin=None, stdout=None, stderr=None, returncode=returncode - ) - with open(info_path, "w", encoding="utf-8") as fobj: - json.dump(process_info.asdict(), fobj) - - -@pytest.fixture -def finished_process(tmp_dir: TmpDir) -> str: - key = "finished" - create_process(tmp_dir, key, PID_FINISHED, 0) - return key - - -@pytest.fixture -def running_process(tmp_dir: TmpDir) -> str: - key = "running" - create_process(tmp_dir, key, PID_RUNNING) - return key +from .conftest import PID_RUNNING def test_send_signal( @@ -51,14 +21,15 @@ def test_send_signal( finished_process: str, running_process: str, ): - m = mocker.patch("os.kill") + """Terminate signal should be sent.""" + mock_kill = mocker.patch("os.kill") process_manager = ProcessManager(tmp_dir) process_manager.send_signal(running_process, signal.SIGTERM) - m.assert_called_once_with(PID_RUNNING, signal.SIGTERM) + mock_kill.assert_called_once_with(PID_RUNNING, signal.SIGTERM) - m.reset_mock() + mock_kill.reset_mock() process_manager.send_signal(finished_process, signal.SIGTERM) - m.assert_not_called() + mock_kill.assert_not_called() if sys.platform == "win32": with pytest.raises(UnsupportedSignalError): @@ -68,6 +39,7 @@ def test_send_signal( def test_dead_process( tmp_dir: TmpDir, mocker: MockerFixture, running_process: str ): + """Dead process lookup should fail.""" process_manager = ProcessManager(tmp_dir) def side_effect(*args): @@ -75,8 +47,7 @@ def side_effect(*args): err = OSError() err.winerror = 87 raise err - else: - raise ProcessLookupError() + raise ProcessLookupError() mocker.patch("os.kill", side_effect=side_effect) with pytest.raises(ProcessLookupError): @@ -90,17 +61,20 @@ def test_kill( finished_process: str, running_process: str, ): - m = mocker.patch("os.kill") + """Kill 
signal should be sent.""" + mock_kill = mocker.patch("os.kill") process_manager = ProcessManager(tmp_dir) process_manager.kill(running_process) if sys.platform == "win32": - m.assert_called_once_with(PID_RUNNING, signal.SIGTERM) + mock_kill.assert_called_once_with(PID_RUNNING, signal.SIGTERM) else: - m.assert_called_once_with(PID_RUNNING, signal.SIGKILL) + mock_kill.assert_called_once_with( + PID_RUNNING, signal.SIGKILL # pylint: disable=no-member + ) - m.reset_mock() + mock_kill.reset_mock() process_manager.kill(finished_process) - m.assert_not_called() + mock_kill.assert_not_called() def test_terminate( @@ -109,14 +83,15 @@ def test_terminate( running_process: str, finished_process: str, ): - m = mocker.patch("os.kill") + """Terminate signal should be sent.""" + mock_kill = mocker.patch("os.kill") process_manager = ProcessManager(tmp_dir) process_manager.terminate(running_process) - m.assert_called_once_with(PID_RUNNING, signal.SIGTERM) + mock_kill.assert_called_once_with(PID_RUNNING, signal.SIGTERM) - m.reset_mock() + mock_kill.reset_mock() process_manager.terminate(finished_process) - m.assert_not_called() + mock_kill.assert_not_called() def test_remove( @@ -125,6 +100,7 @@ def test_remove( running_process: str, finished_process: str, ): + """Process should be removed.""" mocker.patch("os.kill", return_value=None) process_manager = ProcessManager(tmp_dir) process_manager.remove(finished_process) @@ -144,6 +120,7 @@ def test_cleanup( finished_process: str, force: bool, ): + """Process directory should be removed.""" mocker.patch("os.kill", return_value=None) process_manager = ProcessManager(tmp_dir) process_manager.cleanup(force) diff --git a/tests/proc/test_process.py b/tests/proc/test_process.py index a6b307a..e0dba19 100644 --- a/tests/proc/test_process.py +++ b/tests/proc/test_process.py @@ -1,13 +1,16 @@ +"""Process tests.""" import json import subprocess +from typing import List, Union import pytest from pytest_mock import MockerFixture -from 
pytest_test_utils import TmpDir +from dvc_task.proc.exceptions import TimeoutExpired from dvc_task.proc.process import ManagedProcess, ProcessInfo +@pytest.mark.usefixtures("tmp_dir", "popen_pid") @pytest.mark.parametrize( "args", [ @@ -15,13 +18,16 @@ ["/bin/foo", "-o", "option"], ], ) -def test_init_args(tmp_dir: TmpDir, args, popen_pid: int): +def test_init_args(args: Union[str, List[str]]): + """Args should be shlex'd.""" expected = ["/bin/foo", "-o", "option"] proc = ManagedProcess(args) assert expected == proc.args -def test_run(tmp_dir: TmpDir, popen_pid: int): +@pytest.mark.usefixtures("tmp_dir") +def test_run(popen_pid: int): + """Process info should be generated.""" proc = ManagedProcess("/bin/foo") proc.run() assert popen_pid == proc.info.pid @@ -31,10 +37,11 @@ def test_run(tmp_dir: TmpDir, popen_pid: int): assert popen_pid == info.pid -def test_wait(tmp_dir: TmpDir, mocker: MockerFixture, popen_pid: int): - from dvc_task.proc.exceptions import TimeoutExpired - +@pytest.mark.usefixtures("tmp_dir", "popen_pid") +def test_wait(mocker: MockerFixture): + """Wait should block while process is running and incomplete.""" with ManagedProcess("/bin/foo") as proc: + # pylint: disable=protected-access proc._proc.wait = mocker.Mock( side_effect=subprocess.TimeoutExpired("/bin/foo", 5) ) diff --git a/tests/proc/test_tasks.py b/tests/proc/test_tasks.py index a701365..bcf6911 100644 --- a/tests/proc/test_tasks.py +++ b/tests/proc/test_tasks.py @@ -1,7 +1,7 @@ +"""Process task tests.""" from typing import Any, Dict -from celery import Celery -from celery.worker.worker import WorkController +import pytest from pytest_mock import MockerFixture from pytest_test_utils import TmpDir @@ -9,13 +9,13 @@ from dvc_task.proc.tasks import run +@pytest.mark.usefixtures("celery_app", "celery_worker") def test_run( tmp_dir: TmpDir, - celery_app: Celery, - celery_worker: WorkController, popen_pid: int, mocker: MockerFixture, ): + """Task should run the process.""" env = {"FOO": "1"} 
wdir = str(tmp_dir / "wdir") name = "foo" diff --git a/tests/worker/test_temporary.py b/tests/worker/test_temporary.py index bfe213c..a1a0f21 100644 --- a/tests/worker/test_temporary.py +++ b/tests/worker/test_temporary.py @@ -1,3 +1,4 @@ +"""Temporary Worker tests.""" from celery import Celery from celery.worker.worker import WorkController from pytest_mock import MockerFixture @@ -6,6 +7,7 @@ def test_start(celery_app: Celery, mocker: MockerFixture): + """Should start underlying Celery worker.""" worker_cls = mocker.patch.object(celery_app, "Worker") thread = mocker.patch("threading.Thread") worker = TemporaryWorker(celery_app) @@ -24,6 +26,7 @@ def test_start_already_exists( celery_worker: WorkController, mocker: MockerFixture, ): + """Should not start if worker instance already exists.""" worker_cls = mocker.patch.object(celery_app, "Worker") thread = mocker.patch("threading.Thread") worker = TemporaryWorker(celery_app) @@ -37,6 +40,7 @@ def test_monitor( celery_worker: WorkController, mocker: MockerFixture, ): + """Should shut down worker when queue is empty.""" worker = TemporaryWorker(celery_app, timeout=1) shutdown = mocker.spy(celery_app.control, "shutdown") worker.monitor(celery_worker.hostname) # type: ignore[attr-defined]