Skip to content

Commit

Permalink
Add locks to process operations to prevent race conditions.
Browse files Browse the repository at this point in the history
fix: #93

Our old solution on #53, can reduce possibility of race condition, but
it still happens occasionally.

1. Add locks to write and read operations to prevent this kind of error.
2. Remove old file replacing solution.
3. Use reraise to simplify the code in `__getitem__`
  • Loading branch information
karajan1001 authored and pmrowla committed Nov 8, 2022
1 parent 22b25a4 commit 6e6dc47
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 9 deletions.
6 changes: 2 additions & 4 deletions src/dvc_task/proc/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,10 @@ def __iter__(self) -> Generator[str, None, None]:
for name in os.listdir(self.wdir):
yield name

@reraise(FileNotFoundError, KeyError)
def __getitem__(self, key: str) -> "ProcessInfo":
info_path = self._get_info_path(key)
try:
return ProcessInfo.load(info_path)
except FileNotFoundError as exc:
raise KeyError from exc
return ProcessInfo.load(info_path)

@reraise(FileNotFoundError, KeyError)
def __setitem__(self, key: str, value: "ProcessInfo"):
Expand Down
17 changes: 12 additions & 5 deletions src/dvc_task/proc/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from funcy import cached_property
from shortuuid import uuid

from ..contrib.kombu_filesystem import LOCK_EX, LOCK_SH, lock, unlock
from ..utils import makedirs
from .exceptions import TimeoutExpired

Expand All @@ -37,18 +38,24 @@ def from_dict(cls, data: Dict[str, Any]) -> "ProcessInfo":
def load(cls, filename: str) -> "ProcessInfo":
"""Construct the process information from a file."""
with open(filename, "r", encoding="utf-8") as fobj:
return cls.from_dict(json.load(fobj))
lock(fobj, LOCK_SH)
try:
return cls.from_dict(json.load(fobj))
finally:
unlock(fobj)

def asdict(self) -> Dict[str, Any]:
"""Return this info as a dictionary."""
return asdict(self)

def dump(self, filename: str) -> None:
"""Dump the process information into a file."""
temp_info_file = f"{filename}.{uuid()}"
with open(temp_info_file, "w", encoding="utf-8") as fobj:
json.dump(self.asdict(), fobj)
os.replace(temp_info_file, filename)
with open(filename, "w", encoding="utf-8") as fobj:
lock(fobj, LOCK_EX)
try:
json.dump(self.asdict(), fobj)
finally:
unlock(fobj)


class ManagedProcess(AbstractContextManager):
Expand Down

0 comments on commit 6e6dc47

Please sign in to comment.