Skip to content

Commit

Permalink
lint: pylint fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pmrowla committed Feb 15, 2022
1 parent f44eb38 commit 8431bea
Show file tree
Hide file tree
Showing 14 changed files with 177 additions and 117 deletions.
8 changes: 4 additions & 4 deletions src/dvc_task/app/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ def _get_fs_config(
"processed_folder": broker_processed_path,
"store_processed": True,
},
"result_backend": "file://{}".format(_unc_path(result_path)),
"result_backend": f"file://{_unc_path(result_path)}",
"result_persistent": True,
"task_serializer": "json",
"result_serializer": "json",
"accept_content": ["json"],
"task_serializer": task_serializer,
"result_serializer": result_serializer,
"accept_content": [task_serializer],
}


Expand Down
5 changes: 4 additions & 1 deletion src/dvc_task/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
"""Exception classes."""


class DvcTaskError(Exception):
pass
"""Base DVC Task exception."""
9 changes: 9 additions & 0 deletions src/dvc_task/proc/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
"""Process exceptions."""
from ..exceptions import DvcTaskError


class ProcessNotTerminatedError(DvcTaskError):
"""Process is still running."""

def __init__(self, name):
super().__init__(f"Managed process '{name}' has not been terminated.")


class ProcessNotFoundError(DvcTaskError):
"""Process does not exist."""

def __init__(self, name):
super().__init__(f"Managed process '{name}' does not exist.")


class TimeoutExpired(DvcTaskError):
"""Process timeout expired."""

def __init__(self, cmd, timeout):
super().__init__(
f"'{cmd}' did not complete before timeout '{timeout}'"
Expand All @@ -21,5 +28,7 @@ def __init__(self, cmd, timeout):


class UnsupportedSignalError(DvcTaskError):
"""Unsupported process signal."""

def __init__(self, sig):
super().__init__(f"Unsupported signal: {sig}")
12 changes: 7 additions & 5 deletions src/dvc_task/proc/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import sys
from typing import Generator, List, Optional, Tuple, Union

from celery import Signature, signature
from celery import Signature, signature # pylint: disable=no-name-in-module
from funcy.flow import reraise
from shortuuid import uuid

Expand Down Expand Up @@ -49,8 +49,8 @@ def __getitem__(self, key: str) -> "ProcessInfo":
try:
with open(info_path, encoding="utf-8") as fobj:
return ProcessInfo.from_dict(json.load(fobj))
except FileNotFoundError:
raise KeyError
except FileNotFoundError as exc:
raise KeyError from exc

@reraise(FileNotFoundError, KeyError)
def __setitem__(self, key: str, value: "ProcessInfo"):
Expand All @@ -64,12 +64,14 @@ def __delitem__(self, key: str) -> None:
remove(path)

def get(self, key: str, default=None):
"""Return the specified process."""
try:
return self[key]
except KeyError:
return default

def processes(self) -> Generator[Tuple[str, "ProcessInfo"], None, None]:
"""Iterate over managed processes."""
for name in self:
try:
yield name, self[name]
Expand Down Expand Up @@ -111,7 +113,7 @@ def send_signal(self, name: str, sig: int):

def handle_closed_process():
logging.warning(
f"Process {name} had already aborted unexpectedly."
"Process '%s' had already aborted unexpectedly.", name
)
process_info.returncode = -1
self[name] = process_info
Expand All @@ -138,7 +140,7 @@ def kill(self, name: str):
if sys.platform == "win32":
self.send_signal(name, signal.SIGTERM)
else:
self.send_signal(name, signal.SIGKILL)
self.send_signal(name, signal.SIGKILL) # pylint: disable=no-member

def remove(self, name: str, force: bool = False):
"""Remove the specified named process from this manager.
Expand Down
70 changes: 44 additions & 26 deletions src/dvc_task/proc/process.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Managed process module."""
import json
import logging
import multiprocessing as mp
import os
import shlex
import subprocess
from contextlib import AbstractContextManager
from contextlib import AbstractContextManager, ExitStack
from dataclasses import asdict, dataclass
from typing import Dict, List, Optional, TextIO, Union
from typing import Any, Dict, List, Optional, Union

from funcy import cached_property
from shortuuid import uuid
Expand All @@ -18,32 +20,29 @@

@dataclass
class ProcessInfo:
"""Process information."""

pid: int
stdin: Optional[str]
stdout: Optional[str]
stderr: Optional[str]
returncode: Optional[int]

@classmethod
def from_dict(cls, d):
return cls(**d)
def from_dict(cls, data: Dict[str, Any]) -> "ProcessInfo":
"""Construct ProcessInfo from the specified dictionary."""
return cls(**data)

def asdict(self):
def asdict(self) -> Dict[str, Any]:
"""Return this info as a dictionary."""
return asdict(self)


class ManagedProcess(AbstractContextManager):
"""Run the specified command with redirected output.
"""Class to manage the specified process with redirected output.
stdout and stderr will both be redirected to <name>.out.
Interactive processes (requiring stdin input) are currently unsupported.
Parameters:
args: Command to be run.
env: Optional environment variables.
wdir: If specified, redirected output files will be placed in `wdir`.
name: Name to use for this process, if not specified a UUID will be
generated instead.
"""

def __init__(
Expand All @@ -53,6 +52,16 @@ def __init__(
wdir: Optional[str] = None,
name: Optional[str] = None,
):
"""Construct a MangedProcess.
Arguments:
args: Command to be run.
env: Optional environment variables.
wdir: If specified, redirected output files will be placed in
`wdir`. Defaults to current working directory.
name: Name to use for this process, if not specified a UUID will be
generated instead.
"""
self.args: List[str] = (
shlex.split(args, posix=os.name == "posix")
if isinstance(args, str)
Expand All @@ -62,8 +71,7 @@ def __init__(
self.wdir = wdir
self.name = name or uuid()
self.returncode: Optional[int] = None
self._stdout: Optional[TextIO] = None
self._stderr: Optional[TextIO] = None
self._fd_stack = ExitStack()
self._proc: Optional[subprocess.Popen] = None

def __enter__(self):
Expand All @@ -75,30 +83,30 @@ def __exit__(self, *args, **kwargs):
self.wait()

def _close_fds(self):
if self._stdout:
self._stdout.close()
self._stdout = None
if self._stderr:
self._stderr.close()
self._stderr = None
with self._fd_stack:
pass

def _make_path(self, path: str) -> str:
return os.path.join(self.wdir, path) if self.wdir else path

@cached_property
def stdout_path(self) -> str:
"""Return redirected stdout path."""
return self._make_path(f"{self.name}.out")

@cached_property
def info_path(self) -> str:
"""Return process information file path."""
return self._make_path(f"{self.name}.json")

@cached_property
def pidfile_path(self) -> str:
"""Return process pidfile path."""
return self._make_path(f"{self.name}.pid")

@property
def info(self) -> "ProcessInfo":
"""Return process information."""
return ProcessInfo(
pid=self.pid,
stdin=None,
Expand All @@ -107,6 +115,17 @@ def info(self) -> "ProcessInfo":
returncode=self.returncode,
)

@property
def pid(self) -> int:
"""Return process PID.
Raises:
ValueError: Process is not running.
"""
if self._proc is None:
raise ValueError
return self._proc.pid

def _make_wdir(self):
if self.wdir:
makedirs(self.wdir, exist_ok=True)
Expand All @@ -119,23 +138,24 @@ def _dump(self):
fobj.write(str(self.pid))

def run(self):
"""Run this process."""
self._make_wdir()
logger.debug(
"Appending output to '%s'",
self.stdout_path,
)
self._stdout = open(self.stdout_path, "ab")
stdout = self._fd_stack.enter_context(open(self.stdout_path, "ab"))
try:
# pylint: disable=consider-using-with
self._proc = subprocess.Popen(
self.args,
stdin=subprocess.DEVNULL,
stdout=self._stdout,
stdout=stdout,
stderr=subprocess.STDOUT,
close_fds=True,
shell=False,
env=self.env,
)
self.pid: int = self._proc.pid
self._dump()
except Exception:
if self._proc is not None:
Expand Down Expand Up @@ -167,8 +187,6 @@ def spawn(cls, *args, **kwargs) -> Optional[int]:
Returns: The spawned process PID.
"""
import multiprocessing as mp

proc = mp.Process(
target=cls._spawn,
args=args,
Expand Down
22 changes: 7 additions & 15 deletions src/dvc_task/proc/tasks.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,19 @@
from typing import Dict, List, Optional, Union
"""Celery tasks."""
from typing import Any, Dict

from celery import shared_task

from .process import ManagedProcess


@shared_task(bind=True)
def run(
self,
args: Union[str, List[str]],
env: Optional[Dict[str, str]] = None,
wdir: Optional[str] = None,
name: Optional[str] = None,
) -> Optional[int]:
def run( # pylint: disable=unused-argument
self, *args: Any, **kwargs: Any
) -> Dict[str, Any]:
"""Run a command inside a celery task.
Arguments:
args: Command to run.
env: Optional environment variables.
wdir: If specified, redirected output files will be placed in `wdir`.
name: Name to use for this process, if not specified a UUID will be
generated instead.
Accepts the same arguments as `proc.process.ManagedProcess`.
"""
with ManagedProcess(args, env=env, wdir=wdir, name=name) as proc:
with ManagedProcess(*args, **kwargs) as proc:
self.update_state(state="RUNNING", meta=proc.info.asdict())
return proc.info.asdict()
14 changes: 9 additions & 5 deletions src/dvc_task/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""General utilities."""

import errno
import logging
import os
Expand All @@ -10,19 +12,19 @@


def _chmod( # pylint: disable=unused-argument
func: Callable, p: str, excinfo: Any
func: Callable, path: str, excinfo: Any
):
perm = os.lstat(p).st_mode
perm = os.lstat(path).st_mode
perm |= stat.S_IWRITE

try:
os.chmod(p, perm)
os.chmod(path, perm)
except OSError as exc:
# broken symlink or file is not owned by us
if exc.errno not in [errno.ENOENT, errno.EPERM]:
raise

func(p)
func(path)


def _unlink(path: str, onerror: Callable):
Expand All @@ -33,6 +35,7 @@ def _unlink(path: str, onerror: Callable):


def remove(path: str):
"""Remove the specified path."""
logger.debug("Removing '%s'", path)
try:
if os.path.isdir(path):
Expand All @@ -45,6 +48,7 @@ def remove(path: str):


def makedirs(path: str, exist_ok: bool = False, mode: Optional[int] = None):
"""Make the specified directory and any parent directories."""
if mode is None:
os.makedirs(path, exist_ok=exist_ok)
return
Expand All @@ -61,7 +65,7 @@ def makedirs(path: str, exist_ok: bool = False, mode: Optional[int] = None):
# Defeats race condition when another thread created the path
pass
cdir = os.curdir
if tail == cdir: # xxx/newdir/. exists if xxx/newdir exists
if tail == cdir: # foo/newdir/. exists if foo/newdir exists
return
try:
os.mkdir(path, mode)
Expand Down
5 changes: 3 additions & 2 deletions src/dvc_task/worker/temporary.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Temporary worker module."""
import logging
import threading
import time
Expand All @@ -11,7 +12,7 @@
class TemporaryWorker:
"""Temporary worker that automatically shuts down when queue is empty."""

def __init__(
def __init__( # pylint: disable=too-many-arguments
self,
app: Celery,
timeout: int = 60,
Expand Down Expand Up @@ -54,7 +55,7 @@ def start(self, name: str) -> None:
f"--hostname={name}",
]
if self.concurrency:
f"--concurrency={self.concurrency}",
argv.append(f"--concurrency={self.concurrency}")
if self.task_events:
argv.append("-E")
self.app.worker_main(argv=argv)
Expand Down
Loading

0 comments on commit 8431bea

Please sign in to comment.