From d5d87d3d816a9f47facf515ec7b6c5354a45b142 Mon Sep 17 00:00:00 2001 From: Austin Raney Date: Thu, 28 Mar 2024 11:29:42 -0400 Subject: [PATCH] chore: improve doc strings and parameters names improve clarity of the utility of `ActiveOperationTracker` in doc strings and parameter names / instance variables. --- .../dataservice/dmod/dataservice/__main__.py | 8 +-- .../dataservice/dmod/dataservice/service.py | 58 ++++++++++++------- .../test/test_required_data_checks_manager.py | 2 +- 3 files changed, 41 insertions(+), 27 deletions(-) diff --git a/python/services/dataservice/dmod/dataservice/__main__.py b/python/services/dataservice/dmod/dataservice/__main__.py index d040fd085..f4ed16c17 100644 --- a/python/services/dataservice/dmod/dataservice/__main__.py +++ b/python/services/dataservice/dmod/dataservice/__main__.py @@ -23,7 +23,7 @@ DataProvisionManager, RequiredDataChecksManager, ServiceManager, - TempDatasetsManager, + TempDataTaskManager, DockerS3FSPluginHelper, ) from .service_settings import ServiceSettings @@ -103,16 +103,16 @@ def main(): required_data_checks_manager = RequiredDataChecksManager( job_util=job_util, dataset_manager_collection=dataset_manager_collection, - tracker=count, + checks_underway_tracker=count, dataset_inquery_util=dataset_inquery_util, ) data_provision_manager = DataProvisionManager(job_util=job_util, dataset_manager_collection=dataset_manager_collection, docker_s3fs_helper=docker_s3fs_plugin_helper, data_derive_util=data_derive_util, - tracker=count, + provision_underway_tracker=count, ) - temp_datasets_manager = TempDatasetsManager(dataset_manager_collection=dataset_manager_collection, tracker=count) + temp_datasets_manager = TempDataTaskManager(dataset_manager_collection=dataset_manager_collection, safe_to_exec_tracker=count) # Handles websocket communication and async task loop service_manager = ServiceManager( diff --git a/python/services/dataservice/dmod/dataservice/service.py b/python/services/dataservice/dmod/dataservice/service.py index fa35a0e48..44c3316e8 100644 --- a/python/services/dataservice/dmod/dataservice/service.py +++ b/python/services/dataservice/dmod/dataservice/service.py @@ -681,8 +681,8 @@ class RequiredDataChecksManager: Access and update jobs dataset_manager_collection : DatasetManagerCollection Facilitates creating and accessing Datasets - tracker : ActiveOperationTracker - Semaphore-like object for signaling task processing state + checks_underway_tracker : ActiveOperationTracker + Semaphore-like object for signaling that data checks are underway dataset_inquery_util : DatasetInqueryUtil Facilitates dataset detail queries and searches """ @@ -690,12 +690,12 @@ def __init__( self, job_util: JobUtil, dataset_manager_collection: DatasetManagerCollection, - tracker: "ActiveOperationTracker", + checks_underway_tracker: "ActiveOperationTracker", dataset_inquery_util: DatasetInqueryUtil, ): self._job_util = job_util self._managers = dataset_manager_collection - self._tracker = tracker + self._checks_underway_tracker = checks_underway_tracker self._dataset_inquery_util: DatasetInqueryUtil = dataset_inquery_util async def start(self) -> NoReturn: @@ -819,7 +819,7 @@ async def perform_checks_for_job(self, job: Job) -> bool: """ # TODO: (later) should we check whether any 'fulfilled_by' datasets exist, or handle this differently? # Ensure here that we block mark-and-sweep routing for temporary datasets - self._tracker.acquire() + self._checks_underway_tracker.acquire() try: # Create/lookup dataset user job wrapper instance for this job existing_ds_users: Dict[UUID, DatasetUser] = _get_ds_users(self._managers) @@ -852,23 +852,24 @@ async def perform_checks_for_job(self, job: Job) -> bool: return False finally: # Unblock mark and sweep - self._tracker.release() + self._checks_underway_tracker.release() -class TempDatasetsManager: +class TempDataTaskManager: """ - Async task for managing temporary datasets, including updating expire times and purging of expired datasets. + Async task that purges and prolongs the expiration of temporary datasets. + Start the task by calling ::method:`start`. Parameters ---------- dataset_manager_collection : DatasetManagerCollection Facilitates creating and accessing Datasets - tracker : ActiveOperationTracker - Used to synchronize when it is okay to remove temporary datasets + safe_to_exec_tracker : ActiveOperationTracker + Used to determine if it is okay to purge or prolong temporary datasets """ - def __init__(self, dataset_manager_collection: DatasetManagerCollection, tracker: "ActiveOperationTracker"): - self._tracker = tracker + def __init__(self, dataset_manager_collection: DatasetManagerCollection, safe_to_exec_tracker: "ActiveOperationTracker"): + self._safe_to_exec_tracker = safe_to_exec_tracker self._managers = dataset_manager_collection self._marked_expired_datasets: Set[str] = set() @@ -883,7 +884,7 @@ async def _manage_temp_datasets(self) -> NoReturn: """ while True: # Ensure that mark and sweep doesn't proceed while something is potentially to linking datasets - while self._tracker.value > 0: + while self._safe_to_exec_tracker.value > 0: await asyncio.sleep(10) self._temp_dataset_mark_and_sweep() await asyncio.sleep(3600) @@ -930,8 +931,8 @@ class DataProvisionManager: Facilitates initialize Docker volumes for jobs data_derive_util : DataDeriveUtil Facilitates deriving data and datasets - tracker : ActiveOperationTracker - Semaphore-like object for signaling task processing state + provision_underway_tracker: ActiveOperationTracker + Semaphore-like object for signaling that provisioning is underway """ def __init__( self, @@ -939,12 +940,12 @@ def __init__( dataset_manager_collection: DatasetManagerCollection, docker_s3fs_helper: DockerS3FSPluginHelper, data_derive_util: DataDeriveUtil, - tracker: "ActiveOperationTracker", + provision_underway_tracker: "ActiveOperationTracker", ): self._job_util = job_util self._managers = dataset_manager_collection self._docker_s3fs_helper = docker_s3fs_helper - self._tracker = tracker + self._provision_underway_tracker = provision_underway_tracker self._data_derive_util: DataDeriveUtil = data_derive_util async def start(self) -> NoReturn: @@ -967,7 +968,7 @@ async def _manage_data_provision(self): logging.debug("Managing provisioning for job {} that is awaiting data.".format(job.job_id)) try: # Block temp dataset purging and maintenance while we handle things here - self._tracker.acquire() + self._provision_underway_tracker.acquire() # Derive any datasets as required reqs_w_derived_datasets = await self._data_derive_util.derive_datasets(job) logging.info('Job {} had {} datasets derived.'.format(job.job_id, len(reqs_w_derived_datasets))) @@ -988,7 +989,7 @@ async def _manage_data_provision(self): self._job_util.save_job(job) continue finally: - self._tracker.release() + self._provision_underway_tracker.release() job.set_status_step(JobExecStep.AWAITING_SCHEDULING) self._job_util.save_job(job) @@ -1037,11 +1038,24 @@ def _get_ds_users(managers: DatasetManagerCollection) -> Dict[UUID, DatasetUser] return {uuid: mgr.get_dataset_user(uuid) for _, mgr in managers.managers() for uuid in mgr.get_dataset_user_ids()} +# TODO: replace with RWLock implementation class ActiveOperationTracker: """ - Unbound Semaphore-like class for counting acquisitions and releases. Unlike a semaphore, `acquire()` nor `release()` - block. `acquire()` and `release()` increment and decrement an internal value respectively. The internal count can be - read via the `value` property. + Unbound Semaphore-like synchronization object for counting acquisitions and releases. Unlike a + semaphore, neither `acquire()` nor `release()` block. `acquire()` and `release()` increment and + decrement an internal value respectively. The internal count can be read via the `value` + property. Unlike most semaphore implementations, `value` < 0 does not throw. + + Example: + def foo(tracker: ActiveOperationTracker): + tracker.acquire() + # do something e.g. contact data store + tracker.release() + + def bar(tracker: ActiveOperationTracker): + while tracker.value > 0: + sleep(5) + # do something """ def __init__(self, value: int = 0): self._count = value diff --git a/python/services/dataservice/dmod/test/test_required_data_checks_manager.py b/python/services/dataservice/dmod/test/test_required_data_checks_manager.py index ea7febfac..9e38f1044 100644 --- a/python/services/dataservice/dmod/test/test_required_data_checks_manager.py +++ b/python/services/dataservice/dmod/test/test_required_data_checks_manager.py @@ -120,7 +120,7 @@ def setUp(self) -> None: self.manager = RequiredDataChecksManager( job_util=None, dataset_manager_collection=dataset_manager_collection, - tracker=ActiveOperationTracker(), + checks_underway_tracker=ActiveOperationTracker(), dataset_inquery_util=dataset_inquery_util, )