Skip to content

Commit

Permalink
chore: improve doc strings and parameters names
Browse files Browse the repository at this point in the history
improve clarity of the utility of `ActiveOperationTracker` in doc
strings and parameter names / instance variables.
  • Loading branch information
aaraney committed Mar 28, 2024
1 parent b9bc835 commit d5d87d3
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 27 deletions.
8 changes: 4 additions & 4 deletions python/services/dataservice/dmod/dataservice/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
DataProvisionManager,
RequiredDataChecksManager,
ServiceManager,
TempDatasetsManager,
TempDataTaskManager,
DockerS3FSPluginHelper,
)
from .service_settings import ServiceSettings
Expand Down Expand Up @@ -103,16 +103,16 @@ def main():
required_data_checks_manager = RequiredDataChecksManager(
job_util=job_util,
dataset_manager_collection=dataset_manager_collection,
tracker=count,
checks_underway_tracker=count,
dataset_inquery_util=dataset_inquery_util,
)
data_provision_manager = DataProvisionManager(job_util=job_util,
dataset_manager_collection=dataset_manager_collection,
docker_s3fs_helper=docker_s3fs_plugin_helper,
data_derive_util=data_derive_util,
tracker=count,
provision_underway_tracker=count,
)
temp_datasets_manager = TempDatasetsManager(dataset_manager_collection=dataset_manager_collection, tracker=count)
temp_datasets_manager = TempDataTaskManager(dataset_manager_collection=dataset_manager_collection, safe_to_exec_tracker=count)

# Handles websocket communication and async task loop
service_manager = ServiceManager(
Expand Down
58 changes: 36 additions & 22 deletions python/services/dataservice/dmod/dataservice/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,21 +681,21 @@ class RequiredDataChecksManager:
Access and update jobs
dataset_manager_collection : DatasetManagerCollection
Facilitates creating and accessing Datasets
tracker : ActiveOperationTracker
Semaphore-like object for signaling task processing state
checks_underway_tracker : ActiveOperationTracker
Semaphore-like object for signaling that data checks are underway
dataset_inquery_util : DatasetInqueryUtil
Facilitates dataset detail queries and searches
"""
def __init__(
self,
job_util: JobUtil,
dataset_manager_collection: DatasetManagerCollection,
tracker: "ActiveOperationTracker",
checks_underway_tracker: "ActiveOperationTracker",
dataset_inquery_util: DatasetInqueryUtil,
):
self._job_util = job_util
self._managers = dataset_manager_collection
self._tracker = tracker
self._checks_underway_tracker = checks_underway_tracker
self._dataset_inquery_util: DatasetInqueryUtil = dataset_inquery_util

async def start(self) -> NoReturn:
Expand Down Expand Up @@ -819,7 +819,7 @@ async def perform_checks_for_job(self, job: Job) -> bool:
"""
# TODO: (later) should we check whether any 'fulfilled_by' datasets exist, or handle this differently?
        # Ensure here that we block the mark-and-sweep routine for temporary datasets
self._tracker.acquire()
self._checks_underway_tracker.acquire()
try:
# Create/lookup dataset user job wrapper instance for this job
existing_ds_users: Dict[UUID, DatasetUser] = _get_ds_users(self._managers)
Expand Down Expand Up @@ -852,23 +852,24 @@ async def perform_checks_for_job(self, job: Job) -> bool:
return False
finally:
# Unblock mark and sweep
self._tracker.release()
self._checks_underway_tracker.release()

class TempDatasetsManager:
class TempDataTaskManager:
"""
Async task for managing temporary datasets, including updating expire times and purging of expired datasets.
Async task that purges and prolongs the expiration of temporary datasets.
Start the task by calling ::method:`start`.
Parameters
----------
dataset_manager_collection : DatasetManagerCollection
Facilitates creating and accessing Datasets
tracker : ActiveOperationTracker
Used to synchronize when it is okay to remove temporary datasets
safe_to_exec_tracker : ActiveOperationTracker
Used to determine if it is okay to purge or prolong temporary datasets
"""

def __init__(self, dataset_manager_collection: DatasetManagerCollection, tracker: "ActiveOperationTracker"):
self._tracker = tracker
def __init__(self, dataset_manager_collection: DatasetManagerCollection, safe_to_exec_tracker: "ActiveOperationTracker"):
self._safe_to_exec_tracker = safe_to_exec_tracker
self._managers = dataset_manager_collection

self._marked_expired_datasets: Set[str] = set()
Expand All @@ -883,7 +884,7 @@ async def _manage_temp_datasets(self) -> NoReturn:
"""
while True:
            # Ensure that mark and sweep doesn't proceed while something is potentially linking datasets
while self._tracker.value > 0:
while self._safe_to_exec_tracker.value > 0:
await asyncio.sleep(10)
self._temp_dataset_mark_and_sweep()
await asyncio.sleep(3600)
Expand Down Expand Up @@ -930,21 +931,21 @@ class DataProvisionManager:
        Facilitates initializing Docker volumes for jobs
data_derive_util : DataDeriveUtil
Facilitates deriving data and datasets
tracker : ActiveOperationTracker
Semaphore-like object for signaling task processing state
provision_underway_tracker: ActiveOperationTracker
Semaphore-like object for signaling that provisioning is underway
"""
def __init__(
self,
job_util: JobUtil,
dataset_manager_collection: DatasetManagerCollection,
docker_s3fs_helper: DockerS3FSPluginHelper,
data_derive_util: DataDeriveUtil,
tracker: "ActiveOperationTracker",
provision_underway_tracker: "ActiveOperationTracker",
):
self._job_util = job_util
self._managers = dataset_manager_collection
self._docker_s3fs_helper = docker_s3fs_helper
self._tracker = tracker
self._provision_underway_tracker = provision_underway_tracker
self._data_derive_util: DataDeriveUtil = data_derive_util

async def start(self) -> NoReturn:
Expand All @@ -967,7 +968,7 @@ async def _manage_data_provision(self):
logging.debug("Managing provisioning for job {} that is awaiting data.".format(job.job_id))
try:
# Block temp dataset purging and maintenance while we handle things here
self._tracker.acquire()
self._provision_underway_tracker.acquire()
# Derive any datasets as required
reqs_w_derived_datasets = await self._data_derive_util.derive_datasets(job)
logging.info('Job {} had {} datasets derived.'.format(job.job_id, len(reqs_w_derived_datasets)))
Expand All @@ -988,7 +989,7 @@ async def _manage_data_provision(self):
self._job_util.save_job(job)
continue
finally:
self._tracker.release()
self._provision_underway_tracker.release()

job.set_status_step(JobExecStep.AWAITING_SCHEDULING)
self._job_util.save_job(job)
Expand Down Expand Up @@ -1037,11 +1038,24 @@ def _get_ds_users(managers: DatasetManagerCollection) -> Dict[UUID, DatasetUser]
return {uuid: mgr.get_dataset_user(uuid)
for _, mgr in managers.managers() for uuid in mgr.get_dataset_user_ids()}

# TODO: replace with RWLock implementation
class ActiveOperationTracker:
"""
Unbound Semaphore-like class for counting acquisitions and releases. Unlike a semaphore, `acquire()` nor `release()`
block. `acquire()` and `release()` increment and decrement an internal value respectively. The internal count can be
read via the `value` property.
Unbound Semaphore-like synchronization object for counting acquisitions and releases. Unlike a
semaphore, neither `acquire()` nor `release()` block. `acquire()` and `release()` increment and
decrement an internal value respectively. The internal count can be read via the `value`
property. Unlike most semaphore implementations, `value` < 0 does not throw.
Example:
def foo(tracker: ActiveOperationTracker):
tracker.acquire()
# do something e.g. contact data store
tracker.release()
def bar(tracker: ActiveOperationTracker):
while tracker.value > 0:
sleep(5)
# do something
"""
def __init__(self, value: int = 0):
self._count = value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def setUp(self) -> None:
self.manager = RequiredDataChecksManager(
job_util=None,
dataset_manager_collection=dataset_manager_collection,
tracker=ActiveOperationTracker(),
checks_underway_tracker=ActiveOperationTracker(),
dataset_inquery_util=dataset_inquery_util,
)

Expand Down

0 comments on commit d5d87d3

Please sign in to comment.