From 9d27ae89ae3ec00541d630988453c98a9bdffcd5 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 29 Jan 2024 11:55:00 -0500 Subject: [PATCH] Avoid temp dataset purge race in dataservice. Updating dataservice ServiceManager class to block processing of managing temporary datasets at all times when datasets may be marked as fulfilling data requirements, including updating the latter to also do linking to dataset users at the same time. --- .../dataservice/dmod/dataservice/service.py | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/python/services/dataservice/dmod/dataservice/service.py b/python/services/dataservice/dmod/dataservice/service.py index 54472836c..33afeb1d2 100644 --- a/python/services/dataservice/dmod/dataservice/service.py +++ b/python/services/dataservice/dmod/dataservice/service.py @@ -228,6 +228,8 @@ def __init__(self, job_util: JobUtil, *args, **kwargs): derive_util=self._data_derive_util) self._marked_expired_datasets: Set[str] = set() """ Names of expired datasets marked for deletion the next time through ::method:`manage_temp_datasets`. """ + self._is_fulfilling_requirements: int = 0 + """ Internal flag of whether any searches are being performed to fulfill requirements (to block purging). """ #self._dataset_users: Dict[UUID, DatasetUser] = dict() #""" Dataset user records maintained in memory by the instance (e.g., when a job has a dataset provisioned). """ @@ -917,15 +919,18 @@ async def manage_data_provision(self): for job in [j for j in self._job_util.get_all_active_jobs() if j.status_step == JobExecStep.AWAITING_DATA]: logging.debug("Managing provisioning for job {} that is awaiting data.".format(job.job_id)) try: + # Block temp dataset purging and maintenance while we handle things here + self._is_fulfilling_requirements += 1 # Derive any datasets as required reqs_w_derived_datasets = await self._data_derive_util.derive_datasets(job) logging.info('Job {} had {} datasets derived.'.format(job.job_id, len(reqs_w_derived_datasets))) - # Create or retrieve dataset user job wrapper instance and then link associated datasets + # Create or retrieve dataset user job wrapper instance and link associated, newly-derived datasets + # It should be the case that pre-existing datasets were linked when found to be fulfilling known_ds = self.get_known_datasets() job_uuid = UUID(job.job_id) job_ds_user = prior_users[job_uuid] if job_uuid in prior_users else JobDatasetUser(job_uuid) - for ds in (known_ds[req.fulfilled_by] for req in job.data_requirements if req.fulfilled_by): + for ds in (known_ds[req.fulfilled_by] for req in reqs_w_derived_datasets if req.fulfilled_by): job_ds_user.link_to_dataset(ds) # Initialize dataset Docker volumes required for a job @@ -935,6 +940,8 @@ async def manage_data_provision(self): job.set_status_step(JobExecStep.DATA_FAILURE) self._job_util.save_job(job) continue + finally: + self._is_fulfilling_requirements -= 1 job.set_status_step(JobExecStep.AWAITING_SCHEDULING) self._job_util.save_job(job) @@ -950,17 +957,23 @@ async def manage_temp_datasets(self) -> NoReturn: Async task for managing temporary datasets, including updating expire times and purging of expired datasets. """ while True: + # Ensure that mark and sweep doesn't proceed while something is potentially to linking datasets + while self._is_fulfilling_requirements > 0: + await asyncio.sleep(10) self._temp_dataset_mark_and_sweep() await asyncio.sleep(3600) async def perform_checks_for_job(self, job: Job) -> bool: """ - Check whether all requirements for this job can be fulfilled, setting the fulfillment associations. + Check whether all requirements for this job can be fulfilled, setting fulfillment associations and usage links. Check whether all the requirements for the provided job can be fulfilled, such that the job can move on to the - next successful step in the execution workflow. As part of the check, also update the ::class:`DataRequirement` - objects with the name of the fulfilling dataset and the location at which the dataset will be accessible to the - job. + next successful step in the execution workflow. Potentially do some other tracking and linking steps as well, + depending on whether the fulfilling dataset already exists. + + When an existing dataset is found to fulfill a ::class:`DataRequirement`, update the requirement object with the + name of the fulfilling dataset and the location at which the dataset will be accessible to the job. + Additionally, link the fulfilling dataset to a ::class:`JobDatasetUser` for the provided job. Parameters ---------- @@ -978,6 +991,14 @@ async def perform_checks_for_job(self, job: Job) -> bool: """ # TODO: (later) should we check whether any 'fulfilled_by' datasets exist, or handle this differently? try: + # Ensure here that we block mark-and-sweep routing for temporary datasets + self._is_fulfilling_requirements += 1 + + # Create/lookup dataset user job wrapper instance for this job + existing_ds_users: Dict[UUID, DatasetUser] = self._get_ds_users() + job_uuid = UUID(job.job_id) + job_ds_user = existing_ds_users.get(job_uuid, JobDatasetUser(job_uuid)) + for requirement in [req for req in job.data_requirements if req.fulfilled_by is None]: can_fulfill, dataset = await self._dataset_inquery_util.can_be_fulfilled(requirement=requirement, job=job) @@ -985,6 +1006,8 @@ async def perform_checks_for_job(self, job: Job) -> bool: logging.error("Cannot fulfill '{}' category data requirement".format(requirement.category.name)) return False elif dataset is not None: + # Link user when we've found an existing fulfilling dataset + job_ds_user.link_to_dataset(dataset=dataset) # TODO: (later) in the future, whether the job is running via Docker needs to be checked # TODO: also, whatever is done here needs to align with what is done within _create_output_dataset, # when creating the output data DataRequirement @@ -1000,3 +1023,6 @@ async def perform_checks_for_job(self, job: Job) -> bool: msg = "Encountered {} checking if job {} data requirements could be fulfilled - {}" logging.error(msg.format(e.__class__.__name__, job.job_id, str(e))) return False + finally: + # Unblock mark and sweep + self._is_fulfilling_requirements -= 1