Commit 76c8401

chore: rename Count -> ActiveOperationTracker; update usage
aaraney committed Mar 18, 2024
1 parent 2199895 commit 76c8401
Showing 3 changed files with 22 additions and 22 deletions.
10 changes: 5 additions & 5 deletions python/services/dataservice/dmod/dataservice/__main__.py
@@ -19,7 +19,7 @@
 from .dataset_inquery_util import DatasetInqueryUtil
 from .dataset_manager_collection import DatasetManagerCollection
 from .service import (
-    Count,
+    ActiveOperationTracker,
     DataProvisionManager,
     RequiredDataChecksManager,
     ServiceManager,
@@ -97,22 +97,22 @@ def main():


     # count is used to signal when it is okay to remove temporary datasets
-    count = Count()
+    count = ActiveOperationTracker()

     # initialize background task objects
     required_data_checks_manager = RequiredDataChecksManager(
         job_util=job_util,
         dataset_manager_collection=dataset_manager_collection,
-        count=count,
+        tracker=count,
         dataset_inquery_util=dataset_inquery_util,
     )
     data_provision_manager = DataProvisionManager(job_util=job_util,
                                                   dataset_manager_collection=dataset_manager_collection,
                                                   docker_s3fs_helper=docker_s3fs_plugin_helper,
                                                   data_derive_util=data_derive_util,
-                                                  count=count,
+                                                  tracker=count,
                                                   )
-    temp_datasets_manager = TempDatasetsManager(dataset_manager_collection=dataset_manager_collection, count=count)
+    temp_datasets_manager = TempDatasetsManager(dataset_manager_collection=dataset_manager_collection, tracker=count)

     # Handles websocket communication and async task loop
     service_manager = ServiceManager(
30 changes: 15 additions & 15 deletions python/services/dataservice/dmod/dataservice/service.py
@@ -659,7 +659,7 @@ class RequiredDataChecksManager:
         Access and update jobs
     dataset_manager_collection : DatasetManagerCollection
         Facilitates creating and accessing Datasets
-    count : Count
+    tracker : ActiveOperationTracker
         Semaphore-like object for signaling task processing state
     dataset_inquery_util : DatasetInqueryUtil
         Facilitates dataset detail queries and searches
@@ -668,12 +668,12 @@ def __init__(
         self,
         job_util: JobUtil,
         dataset_manager_collection: DatasetManagerCollection,
-        count: "Count",
+        tracker: "ActiveOperationTracker",
         dataset_inquery_util: DatasetInqueryUtil,
     ):
         self._job_util = job_util
         self._managers = dataset_manager_collection
-        self._count = count
+        self._tracker = tracker
         self._dataset_inquery_util: DatasetInqueryUtil = dataset_inquery_util

     async def start(self) -> NoReturn:
@@ -797,7 +797,7 @@ async def perform_checks_for_job(self, job: Job) -> bool:
         """
         # TODO: (later) should we check whether any 'fulfilled_by' datasets exist, or handle this differently?
         # Ensure here that we block the mark-and-sweep routine for temporary datasets
-        self._count.acquire()
+        self._tracker.acquire()
         try:
             # Create/lookup dataset user job wrapper instance for this job
             existing_ds_users: Dict[UUID, DatasetUser] = _get_ds_users(self._managers)
@@ -830,7 +830,7 @@ async def perform_checks_for_job(self, job: Job) -> bool:
             return False
         finally:
             # Unblock mark and sweep
-            self._count.release()
+            self._tracker.release()

 class TempDatasetsManager:
     """
@@ -841,12 +841,12 @@ class TempDatasetsManager:
     ----------
     dataset_manager_collection : DatasetManagerCollection
         Facilitates creating and accessing Datasets
-    count : Count
+    tracker : ActiveOperationTracker
         Used to synchronize when it is okay to remove temporary datasets
     """

-    def __init__(self, dataset_manager_collection: DatasetManagerCollection, count: "Count"):
-        self._count = count
+    def __init__(self, dataset_manager_collection: DatasetManagerCollection, tracker: "ActiveOperationTracker"):
+        self._tracker = tracker
         self._managers = dataset_manager_collection

         self._marked_expired_datasets: Set[str] = set()
@@ -861,7 +861,7 @@ async def _manage_temp_datasets(self) -> NoReturn:
         """
         while True:
             # Ensure that mark and sweep doesn't proceed while something is potentially linking datasets
-            while self._count.value > 0:
+            while self._tracker.value > 0:
                 await asyncio.sleep(10)
             self._temp_dataset_mark_and_sweep()
             await asyncio.sleep(3600)
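This wait loop is one half of a simple gating protocol: worker tasks bracket their dataset-linking work with `acquire()`/`release()`, and the sweeper polls `value` until nothing is in flight. A minimal, self-contained sketch of that interaction — `_Tracker`, `provision`, and `sweeper` are illustrative stand-ins, not names from the repository (the real tracker class appears at the end of this diff):

```python
import asyncio

class _Tracker:
    """Stand-in for ActiveOperationTracker: a non-blocking counter."""
    def __init__(self):
        self.value = 0
    def acquire(self):
        self.value += 1   # never blocks, unlike a real semaphore
    def release(self):
        self.value -= 1

async def provision(tracker: _Tracker, job: str):
    tracker.acquire()               # gate the sweeper while linking datasets
    try:
        await asyncio.sleep(0.2)    # stand-in for derive/link/mount work
        print(f"{job}: provisioning done")
    finally:
        tracker.release()           # always release, even on failure

async def sweeper(tracker: _Tracker):
    while tracker.value > 0:        # same guard as _manage_temp_datasets
        await asyncio.sleep(0.05)
    print("no active operations; safe to mark and sweep temp datasets")

async def main():
    tracker = _Tracker()
    await asyncio.gather(
        provision(tracker, "job-1"),
        provision(tracker, "job-2"),
        sweeper(tracker),
    )

asyncio.run(main())
```

Note that `try`/`finally` around the work mirrors the pattern in `perform_checks_for_job` and `_manage_data_provision` above: a failed provisioning step must still release, or the sweeper would stall forever.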
@@ -908,7 +908,7 @@ class DataProvisionManager:
         Facilitates initializing Docker volumes for jobs
     data_derive_util : DataDeriveUtil
         Facilitates deriving data and datasets
-    count : Count
+    tracker : ActiveOperationTracker
         Semaphore-like object for signaling task processing state
     """
     def __init__(
@@ -917,12 +917,12 @@ def __init__(
         dataset_manager_collection: DatasetManagerCollection,
         docker_s3fs_helper: DockerS3FSPluginHelper,
         data_derive_util: DataDeriveUtil,
-        count: "Count",
+        tracker: "ActiveOperationTracker",
     ):
         self._job_util = job_util
         self._managers = dataset_manager_collection
         self._docker_s3fs_helper = docker_s3fs_helper
-        self._count = count
+        self._tracker = tracker
         self._data_derive_util: DataDeriveUtil = data_derive_util

     async def start(self) -> NoReturn:
@@ -945,7 +945,7 @@ async def _manage_data_provision(self):
                 logging.debug("Managing provisioning for job {} that is awaiting data.".format(job.job_id))
                 try:
                     # Block temp dataset purging and maintenance while we handle things here
-                    self._count.acquire()
+                    self._tracker.acquire()
                     # Derive any datasets as required
                     reqs_w_derived_datasets = await self._data_derive_util.derive_datasets(job)
                     logging.info('Job {} had {} datasets derived.'.format(job.job_id, len(reqs_w_derived_datasets)))
@@ -966,7 +966,7 @@ async def _manage_data_provision(self):
                     self._job_util.save_job(job)
                     continue
                 finally:
-                    self._count.release()
+                    self._tracker.release()

                 job.set_status_step(JobExecStep.AWAITING_SCHEDULING)
                 self._job_util.save_job(job)
@@ -1015,7 +1015,7 @@ def _get_ds_users(managers: DatasetManagerCollection) -> Dict[UUID, DatasetUser]
     return {uuid: mgr.get_dataset_user(uuid)
             for _, mgr in managers.managers() for uuid in mgr.get_dataset_user_ids()}

-class Count:
+class ActiveOperationTracker:
     """
     Unbound Semaphore-like class for counting acquisitions and releases. Unlike a semaphore, neither `acquire()` nor
     `release()` blocks. `acquire()` and `release()` increment and decrement an internal value respectively. The internal count can be
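The diff truncates the `ActiveOperationTracker` body, but the docstring and the call sites above pin down its surface: non-blocking `acquire()` and `release()` that increment and decrement a counter, plus a readable `value`. A plausible sketch under those constraints — the lock and the clamp-at-zero behavior are assumptions, not necessarily what the repository implements:

```python
import threading

class ActiveOperationTracker:
    """Unbound semaphore-like counter; neither acquire() nor release() blocks."""

    def __init__(self) -> None:
        self._value = 0
        self._lock = threading.Lock()  # assumption: guard concurrent updates

    def acquire(self) -> None:
        # Record one more in-flight operation; never waits
        with self._lock:
            self._value += 1

    def release(self) -> None:
        # Record a completed operation; assumption: clamp at zero
        with self._lock:
            if self._value > 0:
                self._value -= 1

    @property
    def value(self) -> int:
        # Polled by TempDatasetsManager to decide when sweeping is safe
        return self._value
```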
2 changes: 2 additions & 2 deletions (test module; path not shown in this view)
@@ -5,7 +5,7 @@

 from ..dataservice.dataset_manager_collection import DatasetManagerCollection
 from ..dataservice.dataset_inquery_util import DatasetInqueryUtil
-from ..dataservice.service import Count, RequiredDataChecksManager
+from ..dataservice.service import ActiveOperationTracker, RequiredDataChecksManager
 from dmod.communication.client import get_or_create_eventloop
 from dmod.core.dataset import DataCategory, DataDomain, Dataset, DatasetManager, DatasetType
 from dmod.scheduler.job import RequestedJob
@@ -120,7 +120,7 @@ def setUp(self) -> None:
         self.manager = RequiredDataChecksManager(
             job_util=None,
             dataset_manager_collection=dataset_manager_collection,
-            count=Count(),
+            tracker=ActiveOperationTracker(),
             dataset_inquery_util=dataset_inquery_util,
         )

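In the same vein as the test update above, a hedged, standalone check of the tracker contract might look like the following. The test name is illustrative and not from the repository, and the inline class restates the earlier sketch so the snippet runs on its own:

```python
import unittest

class ActiveOperationTracker:
    # Compact restatement of the earlier sketch, inlined for a runnable example
    def __init__(self):
        self.value = 0
    def acquire(self):
        self.value += 1
    def release(self):
        self.value -= 1

class TestActiveOperationTracker(unittest.TestCase):
    def test_acquire_release_balance(self):
        tracker = ActiveOperationTracker()
        tracker.acquire()
        tracker.acquire()
        self.assertEqual(tracker.value, 2)  # two operations in flight
        tracker.release()
        tracker.release()
        self.assertEqual(tracker.value, 0)  # sweeping is safe again

if __name__ == "__main__":
    unittest.main()
```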
