diff --git a/README.md b/README.md index 27bb3c0..5b9e20b 100644 --- a/README.md +++ b/README.md @@ -89,27 +89,17 @@ print(retry_get_region("", "/var/pktfwd/region")) ``` ## LockSingleton -`LockSingleton` prevents the concurrent access to a resource. +`LockSingleton` prevents the concurrent access to a resource across threads. ### Methods -**InterprocessLock(name, initial_value=1)** +**LockSingleton()** -Creates a new `InterprocessLock` object. -- `name` uniquely identifies the `LockSingleton` across processes in the system -- `available_resources` the count of the available resources -- `reset` set `True` to reset the available_resources +Creates a new `LockSingleton` object. -Note: - The available_resources of a `InterprocessLock` get reset on every restart of the system or docker container. - It's tested in Ubuntu 20.04 desktop and diagnostics container in a Hotspot. - Resetting the available_resources by passing the `reset=True` should be used with a caution and it can be used - in a very specific scenarios such as in the development environment. It's designed for facilitating - the development. It's not recommended to be used in production. +**acquire(timeout = DEFAULT_TIMEOUT)** -**acquire([timeout = None])** - -Waits until the resource is available and then returns, decrementing the available count. +Waits until the resource is available. DEFAULT_TIMEOUT = 2 seconds **release()** @@ -119,13 +109,9 @@ Release the resource. Check if there is an available resource. -**value()** - -Returns the count of available resources. - ### Usage ``` -lock = LockSingleton("some_resource") +lock = LockSingleton() try: # try to acquire the resource or may raise an exception @@ -140,20 +126,18 @@ try: lock.release() except ResourceBusyError: print("The resource is busy now.") -except CannotLockError: - print("Can't lock the resource for some internal issue.") ``` -### `@ecc_lock` decorator -`@ecc_lock(timeout=DEFAULT_TIMEOUT, raise_exception=False):` +### `@lock_ecc` decorator +`@lock_ecc(timeout=DEFAULT_TIMEOUT, raise_exception=False):` This is the convenient decorator wrapping around the `LockSingleton`. - - timeout: timeout value. DEFAULT_TIMEOUT = 2 seconds + - timeout: timeout value. DEFAULT_TIMEOUT = 2 seconds. - raise_exception: set True to raise exception in case of timeout or some error, otherwise just log the error msg Usage ``` -@ecc_lock() +@lock_ecc() def run_gateway_mfr(): return subprocess.run( [gateway_mfr_path, "key", "0"], diff --git a/hm_pyhelper/interprocess_lock.py b/hm_pyhelper/interprocess_lock.py deleted file mode 100644 index 4d58b60..0000000 --- a/hm_pyhelper/interprocess_lock.py +++ /dev/null @@ -1,129 +0,0 @@ -import functools -import os -import posix_ipc -from hm_pyhelper.logger import get_logger - -LOGGER = get_logger(__name__) - -LOCK_ECC = 'LOCK_ECC' -DEFAULT_TIMEOUT = 2.0 # 2 seconds - - -class InterprocessLock(object): - _prefix = "LockSingleton." - _mode = 0o644 - - def __init__(self, name, available_resources=1, reset=False): - """ - name: uniquely identifies the `LockSingleton` across processes - available_resources: the count of the available resources - reset: set True to reset the available_resources - - The available_resources of a `InterprocessLock` get reset on every - restart of the system or docker container. It's tested in Ubuntu - 20.04 desktop and diagnostics container in a Hotspot. - Resetting the available_resources by passing the `reset=True` should be - used with a caution and it can be used in a very specific scenarios - such as in the development environment. It's designed for facilitating - the development. It's not recommended to be used in production. - """ - self._name = self._prefix + name - # so it doesn't interfere with our semaphore mode - old_umask = os.umask(0) - try: - self._sem = posix_ipc.Semaphore(self._name, - mode=self._mode, - flags=posix_ipc.O_CREAT, - initial_value=available_resources) - - if reset: - """ Some hack to set the Semaphore's read-only value -https://github.com/rwarren/SystemEvent/blob/87422d850a3f0a4631528f5fbee23904170c0703/SystemEvent/__init__.py#L47 -https://github.com/GEANT/CAT/blob/b53bc299c7e822c7abd8deb1ee1a9e44f3f465da/ansible/ManagedSP/templates/daemon/fr_restart.py#L46 -https://github.com/south-coast-science/scs_host_rpi/blob/50b6277b4281b043d7c8f340371183faea4c5b8c/src/scs_host/sync/binary_semaphore.py#L54 - """ - while available_resources > self._sem.value: - self._sem.release() - while available_resources < self._sem.value: - self._sem.acquire() - - finally: - os.umask(old_umask) - - def acquire(self, timeout=DEFAULT_TIMEOUT): - """Acquire the lock - """ - try: - self._sem.acquire(timeout) - except posix_ipc.BusyError: - raise ResourceBusyError() - except posix_ipc.Error: # Catch all IPC Errors except BusyError - raise CannotLockError() - - def release(self): - """Release the lock - """ - self._sem.release() - - def locked(self): - return self.value() == 0 - - def value(self): - return self._sem.value - - -class ResourceBusyError(posix_ipc.Error): - """ - Raised when a call times out - """ - - def __init__(self, *args, **kwargs): - pass - - -class CannotLockError(posix_ipc.Error): - """ - Raised when can not lock the resource due to the permission issue, - wrong IPC object or whatever internal issue. - """ - - def __init__(self, *args, **kwargs): - pass - - -def ecc_lock(timeout=DEFAULT_TIMEOUT, raise_exception=False): - """ - Returns an ECC LOCK decorator. - - timeout: timeout value. DEFAULT_TIMEOUT = 2 seconds. - raise_exception: set True to raise exception in case of timeout and error. - Otherwise just log the error msg - """ - - def decorator_ecc_lock(func): - lock = InterprocessLock(LOCK_ECC) - - @functools.wraps(func) - def wrapper_ecc_lock(*args, **kwargs): - try: - # try to acquire the ECC resource or may raise an exception - lock.acquire(timeout=timeout) - - value = func(*args, **kwargs) - - # release the resource - lock.release() - - return value - except ResourceBusyError as ex: - LOGGER.error("ECC is busy now.") - if raise_exception: - raise ex - except CannotLockError as ex: - LOGGER.error("Can't lock the ECC for some internal issue.") - if raise_exception: - raise ex - - return wrapper_ecc_lock - - return decorator_ecc_lock diff --git a/hm_pyhelper/lock_singleton.py b/hm_pyhelper/lock_singleton.py new file mode 100644 index 0000000..d4560d8 --- /dev/null +++ b/hm_pyhelper/lock_singleton.py @@ -0,0 +1,68 @@ +import functools +import threading +from hm_pyhelper.logger import get_logger + +LOGGER = get_logger(__name__) + +DEFAULT_TIMEOUT = 2.0 # 2 seconds + + +class LockSingleton(object): + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + if not cls._instance: + with cls._lock: + if not cls._instance: + cls._instance = super(LockSingleton, cls).__new__(cls) + return cls._instance + + def acquire(self, timeout=DEFAULT_TIMEOUT): + if not self._lock.acquire(blocking=True, timeout=timeout): + raise ResourceBusyError() + + def release(self): + self._lock.release() + + def locked(self): + return self._lock.locked() + + +class ResourceBusyError(Exception): + """Raised when the resource is busy""" + pass + + +def lock_ecc(timeout=DEFAULT_TIMEOUT, raise_exception=False): + """ + Returns a decorator that locks the ECC. + + timeout: timeout value. DEFAULT_TIMEOUT = 2 seconds. + raise_exception: set True to raise exception in case of timeout and error. + Otherwise just log the error msg + """ + + def decorator_lock_ecc(func): + lock = LockSingleton() + + @functools.wraps(func) + def wrapper_lock_ecc(*args, **kwargs): + try: + # try to acquire the ECC resource or may raise an exception + lock.acquire(timeout=timeout) + + value = func(*args, **kwargs) + + # release the resource + lock.release() + + return value + except Exception as ex: + LOGGER.error("ECC is busy now.") + if raise_exception: + raise ex + + return wrapper_lock_ecc + + return decorator_lock_ecc diff --git a/hm_pyhelper/miner_param.py b/hm_pyhelper/miner_param.py index 4fabf82..48deaad 100644 --- a/hm_pyhelper/miner_param.py +++ b/hm_pyhelper/miner_param.py @@ -2,7 +2,7 @@ import subprocess import json from retry import retry -from hm_pyhelper.interprocess_lock import ecc_lock +from hm_pyhelper.lock_singleton import lock_ecc from hm_pyhelper.logger import get_logger from hm_pyhelper.exceptions import MalformedRegionException, \ SPIUnavailableException, ECCMalfunctionException @@ -15,7 +15,7 @@ SPI_UNAVAILABLE_SLEEP_SECONDS = 60 -@ecc_lock() +@lock_ecc() def run_gateway_mfr(args): direct_path = os.path.dirname(os.path.abspath(__file__)) gateway_mfr_path = os.path.join(direct_path, 'gateway_mfr') @@ -83,7 +83,7 @@ def provision_key(): return True try: - @ecc_lock() + @lock_ecc() def run_gateway_mfr(): return subprocess.run( [gateway_mfr_path, "provision"], diff --git a/hm_pyhelper/tests/test_interprocess_lock.py b/hm_pyhelper/tests/test_lock_singleton.py similarity index 70% rename from hm_pyhelper/tests/test_interprocess_lock.py rename to hm_pyhelper/tests/test_lock_singleton.py index 987c0cf..3e02352 100644 --- a/hm_pyhelper/tests/test_interprocess_lock.py +++ b/hm_pyhelper/tests/test_lock_singleton.py @@ -3,8 +3,8 @@ import pytest import mock from time import sleep -from hm_pyhelper.interprocess_lock import ecc_lock, InterprocessLock, \ - ResourceBusyError, LOCK_ECC +from hm_pyhelper.lock_singleton import LockSingleton, ResourceBusyError, \ + lock_ecc # https://gist.github.com/sbrugman/59b3535ebcd5aa0e2598293cfa58b6ab @@ -34,13 +34,12 @@ def run(self): raise last_exception -class TestInterprocessLock(unittest.TestCase): - def test_interprocess_lock_basic(self): - lock = InterprocessLock('test', available_resources=1, reset=True) +class TestLockSingleton(unittest.TestCase): + def test_lock_singleton_simple(self): + lock = LockSingleton() self.assertFalse(lock.locked()) lock.acquire() - self.assertTrue(lock.locked()) # Do some work sleep(0.001) @@ -48,23 +47,27 @@ def test_interprocess_lock_basic(self): lock.release() self.assertFalse(lock.locked()) - def test_interprocess_lock_timeout(self): + def test_lock_singleton_timeout(self): """ Start a slow running thread and then try to acquire a shared lock. Expect the acquire call to throw an exception. """ - lock = InterprocessLock('test', available_resources=1, reset=True) def slow_task(): - sleep(0.01) - # lock.release() + sleep(0.1) + lock.release() return True - slow_thread = threading.Thread(target=slow_task, daemon=True) + lock = LockSingleton() + # ECC is going to be occupied by `slow_task`. + slow_thread = threading.Thread(target=slow_task, daemon=True) lock.acquire() slow_thread.start() + self.assertTrue(lock.locked()) + + # Try to acquire the ECC that is occupied by `slow_task` currently. expected_exception = False try: lock.acquire(timeout=0.00001) @@ -73,23 +76,23 @@ def slow_task(): self.assertTrue(expected_exception) - def test_interprocess_lock_racing(self): + def test_lock_singleton_racing(self): def slow_task(): - lock = InterprocessLock('racing') + lock = LockSingleton() lock.acquire() self.assertTrue(lock.locked()) # Do some slow work print("Starting the slow task...") - sleep(0.003) + sleep(0.01) print("Finished the slow task!") lock.release() self.assertFalse(lock.locked()) def fast_task(): - lock = InterprocessLock('racing') + lock = LockSingleton() lock.acquire() self.assertTrue(lock.locked()) @@ -102,55 +105,43 @@ def fast_task(): lock.release() self.assertFalse(lock.locked()) - lock = InterprocessLock('racing', available_resources=1, reset=True) - slow_thread = threading.Thread(target=slow_task, daemon=True) fast_thread = threading.Thread(target=fast_task, daemon=True) - # Ensure there is no lock before starting the work - self.assertFalse(lock.locked()) - # Start work print("\n") print("Initiated the slow task.") slow_thread.start() + print("Initiated the fast task.") fast_thread.start() # Wait to finish - fast_thread.join() - slow_thread.join() - - # Ensure there is no lock after finishing the work - self.assertFalse(lock.locked()) + try: + fast_thread.join() + slow_thread.join() + except Exception as ex: + print(ex) - def test_ecc_lock_basic(self): - @ecc_lock() + def test_lock_ecc_simple(self): + @lock_ecc() def some_task(): sleep(0.00001) - lock = InterprocessLock(LOCK_ECC, available_resources=1, reset=True) - self.assertFalse(lock.locked()) - some_task_thread = threading.Thread(target=some_task, daemon=True) some_task_thread.start() - self.assertTrue(lock.locked()) some_task_thread.join() - self.assertFalse(lock.locked()) - def test_ecc_lock_timeout(self): - @ecc_lock() + def test_lock_ecc_timeout(self): + @lock_ecc() def slow_task(): sleep(0.1) - @ecc_lock(timeout=0.01, raise_exception=True) + @lock_ecc(timeout=0.01, raise_exception=True) def lock_with_timeout(): pass - lock = InterprocessLock(LOCK_ECC, available_resources=1, reset=True) - self.assertFalse(lock.locked()) - slow_thread = threading.Thread(target=slow_task, daemon=True) slow_thread.start() @@ -162,24 +153,21 @@ def lock_with_timeout(): self.assertTrue(expected_exception) - def test_ecc_lock_racing(self): - @ecc_lock() + def test_lock_ecc_racing(self): + @lock_ecc() def slow_task(): # Do some slow work print("Starting the slow task...") sleep(0.003) print("Finished the slow task!") - @ecc_lock() + @lock_ecc() def fast_task(): # Do some slow work print("Starting the fast task...") sleep(0.001) print("Finished the fast task!") - lock = InterprocessLock(LOCK_ECC, available_resources=1, reset=True) - self.assertFalse(lock.locked()) - slow_thread = threading.Thread(target=slow_task, daemon=True) fast_thread = threading.Thread(target=fast_task, daemon=True) @@ -193,6 +181,3 @@ def fast_task(): # Wait to finish fast_thread.join() slow_thread.join() - - # Ensure there is no lock after finishing the work - self.assertFalse(lock.locked()) diff --git a/requirements.txt b/requirements.txt index 1801067..6794e0f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ jsonrpcclient==3.3.6 requests==2.26.0 retry==0.9.2 -posix-ipc==1.0.5