diff --git a/src/dvc_task/proc/manager.py b/src/dvc_task/proc/manager.py index 374d3d0..9a0d3ed 100644 --- a/src/dvc_task/proc/manager.py +++ b/src/dvc_task/proc/manager.py @@ -45,12 +45,10 @@ def __iter__(self) -> Generator[str, None, None]: for name in os.listdir(self.wdir): yield name + @reraise(FileNotFoundError, KeyError) def __getitem__(self, key: str) -> "ProcessInfo": info_path = self._get_info_path(key) - try: - return ProcessInfo.load(info_path) - except FileNotFoundError as exc: - raise KeyError from exc + return ProcessInfo.load(info_path) @reraise(FileNotFoundError, KeyError) def __setitem__(self, key: str, value: "ProcessInfo"): diff --git a/src/dvc_task/proc/process.py b/src/dvc_task/proc/process.py index ba76d81..63dff3a 100644 --- a/src/dvc_task/proc/process.py +++ b/src/dvc_task/proc/process.py @@ -12,6 +12,7 @@ from funcy import cached_property from shortuuid import uuid +from ..contrib.kombu_filesystem import LOCK_EX, LOCK_SH, lock, unlock from ..utils import makedirs from .exceptions import TimeoutExpired @@ -37,7 +38,11 @@ def from_dict(cls, data: Dict[str, Any]) -> "ProcessInfo": def load(cls, filename: str) -> "ProcessInfo": """Construct the process information from a file.""" with open(filename, "r", encoding="utf-8") as fobj: - return cls.from_dict(json.load(fobj)) + lock(fobj, LOCK_SH) + try: + return cls.from_dict(json.load(fobj)) + finally: + unlock(fobj) def asdict(self) -> Dict[str, Any]: """Return this info as a dictionary.""" @@ -45,10 +50,12 @@ def asdict(self) -> Dict[str, Any]: def dump(self, filename: str) -> None: """Dump the process information into a file.""" - temp_info_file = f"{filename}.{uuid()}" - with open(temp_info_file, "w", encoding="utf-8") as fobj: - json.dump(self.asdict(), fobj) - os.replace(temp_info_file, filename) + with open(filename, "w", encoding="utf-8") as fobj: + lock(fobj, LOCK_EX) + try: + json.dump(self.asdict(), fobj) + finally: + unlock(fobj) class ManagedProcess(AbstractContextManager):