diff --git a/src/extrainterpreters/lock.py b/src/extrainterpreters/lock.py index c1eb074..dd1c256 100644 --- a/src/extrainterpreters/lock.py +++ b/src/extrainterpreters/lock.py @@ -26,6 +26,24 @@ class _LockBuffer(StructBase): class _CrossInterpreterStructLock: + """Foundations for cross-interpreter lock. + + Used internally, coupled wiht larger memory structs + from which it will consume a single byte - it will + ideally lock those structs. + + A "struct" should be a StructBase class containing a single-byte + "lock" field, with a proper buffer. + + (Keep in mind that when the struct is passed to other interpreters, + if dealocated in the interpreter of origin, the "byte" buffer used + will point to unalocated memory, with certain disaster ahead) + + It is also used as base for the public Lock classes bellow: + those can be used in user-code. + + """ + def __init__(self, struct, timeout=DEFAULT_TIMEOUT): buffer_ptr, size = _address_and_size(struct._data) # , struct._offset) # struct_ptr = buffer_ptr + struct._offset @@ -44,30 +62,36 @@ def timeout(self, timeout: None | float): """ self._timeout = timeout return self - def __enter__(self): + try: + return self.acquire(self._timeout) + finally: + self._timeout = self._original_timeout + + def acquire(self, timeout): # Remember: all attributes are "interpreter-local" # just the bytes in the passed in struct are shared. if self._entered: self._entered += 1 return self - if self._timeout is None: + if timeout is None or timeout == 0: if not _atomic_byte_lock(self._lock_address): - self._timeout = self._original_timeout raise ResourceBusyError("Couldn't acquire lock") else: - threshold = time.time() + self._timeout - while time.time() <= threshold: + threshold = time.monotonic() + timeout + while time.monotonic() <= threshold: if _atomic_byte_lock(self._lock_address): break time.sleep(TIME_RESOLUTION * 4) else: - self._timeout = self._original_timeout raise TimeoutError("Timeout trying to acquire lock") self._entered += 1 return self def __exit__(self, *args): + self.release() + + def release(self): if not self._entered: return self._entered -= 1 @@ -76,7 +100,7 @@ def __exit__(self, *args): buffer = _remote_memory(self._lock_address, 1) buffer[0] = 0 del buffer - self._entered = False + self._entered = 0 self._timeout = self._original_timeout def __getstate__(self): @@ -110,7 +134,7 @@ def acquire(self, blocking=True, timeout=-1): timeout = TIMEOUT_MAX if timeout == -1 or not blocking else timeout self._lock.timeout(timeout) self._lock.__enter__() - return + return True else: self._lock.timeout(None) try: @@ -152,8 +176,32 @@ class RLock(IntRLock): class Lock(IntRLock): - ... - #def acquire(self, blocking=True, timeout=-1): - #if self.locked(): - #if not blocking or timeout == -1: + _delay = 4 * TIME_RESOLUTION + + def acquire(self, blocking=True, timeout=-1): + locked = self.locked() + if self.locked() and not blocking: + return False + timeout = TIMEOUT_MAX if timeout == -1 else timeout + start = time.monotonic() + retry = False + while time.monotonic() - start < timeout: + if self.locked(): + retry = True + else: + try: + self._lock.acquire(0) + except ResourceBusyError: + retry = True + + if self._lock._entered > 1: + retry = True + self._lock.release() + + if not retry: + return True + + time.sleep(self._delay) + retry = False + raise ResourceBusyError("Could not acquire lock") diff --git a/tests/test_lock.py b/tests/test_lock.py index 4cfd12d..421fc4a 100644 --- a/tests/test_lock.py +++ b/tests/test_lock.py @@ -33,10 +33,9 @@ def test_locks_work_as_context_manager(LockCls): def test_lock_cant_be_reacquired_same_interpreter(): lock = Lock() - lock.acquire() + assert lock.acquire() - with pytest.raises(TimeoutError): - lock.acquire(timeout=0) + assert not lock.acquire(blocking=False) @pytest.mark.skip("to be implemented") def test_lock_cant_be_reacquired_other_interpreter():