Avoid temp dataset purge race in dataservice.
Updating the dataservice ServiceManager class to block temporary-dataset
management (mark-and-sweep purging) whenever datasets may be in the process
of being marked as fulfilling data requirements, and updating that
fulfillment step to also link the fulfilling datasets to their dataset
users at the same time.
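
In outline, the fix is a counting guard: any code path that may mark or link datasets bumps a counter first, and the periodic mark-and-sweep loop only proceeds while that counter is zero. A minimal, self-contained sketch of the idea (the class, method names, and timings below are simplified stand-ins for illustration, not the actual DMOD service code; only the counter name mirrors the commit):

import asyncio

class TempDatasetGuardSketch:
    """Toy model of the counting guard that blocks purging during fulfillment."""

    def __init__(self):
        self._is_fulfilling_requirements = 0  # same counter idea as in the commit
        self._marked_expired_datasets = set()

    async def fulfill_requirements(self, job_id):
        try:
            # Block temp dataset purging while datasets may be marked/linked
            self._is_fulfilling_requirements += 1
            await asyncio.sleep(0.5)  # stand-in for derive/link/check work
            print(f"{job_id}: requirements fulfilled, datasets linked")
        finally:
            self._is_fulfilling_requirements -= 1

    async def manage_temp_datasets(self, cycles=3):
        for _ in range(cycles):
            # Don't mark-and-sweep while anything is fulfilling requirements
            while self._is_fulfilling_requirements > 0:
                await asyncio.sleep(0.1)
            print("mark-and-sweep safe to run")
            await asyncio.sleep(0.2)  # the real service sleeps far longer between passes

async def main():
    svc = TempDatasetGuardSketch()
    await asyncio.gather(svc.fulfill_requirements("job-1"), svc.manage_temp_datasets())

asyncio.run(main())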
robertbartel authored and aaraney committed Jan 30, 2024
1 parent e9ae795 commit 9d27ae8
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions python/services/dataservice/dmod/dataservice/service.py
@@ -228,6 +228,8 @@ def __init__(self, job_util: JobUtil, *args, **kwargs):
derive_util=self._data_derive_util)
self._marked_expired_datasets: Set[str] = set()
""" Names of expired datasets marked for deletion the next time through ::method:`manage_temp_datasets`. """
self._is_fulfilling_requirements: int = 0
""" Internal flag of whether any searches are being performed to fulfill requirements (to block purging). """
#self._dataset_users: Dict[UUID, DatasetUser] = dict()
#""" Dataset user records maintained in memory by the instance (e.g., when a job has a dataset provisioned). """

@@ -917,15 +919,18 @@ async def manage_data_provision(self):
for job in [j for j in self._job_util.get_all_active_jobs() if j.status_step == JobExecStep.AWAITING_DATA]:
logging.debug("Managing provisioning for job {} that is awaiting data.".format(job.job_id))
try:
# Block temp dataset purging and maintenance while we handle things here
self._is_fulfilling_requirements += 1
# Derive any datasets as required
reqs_w_derived_datasets = await self._data_derive_util.derive_datasets(job)
logging.info('Job {} had {} datasets derived.'.format(job.job_id, len(reqs_w_derived_datasets)))

# Create or retrieve dataset user job wrapper instance and then link associated datasets
# Create or retrieve dataset user job wrapper instance and link associated, newly-derived datasets
# It should be the case that pre-existing datasets were linked when found to be fulfilling
known_ds = self.get_known_datasets()
job_uuid = UUID(job.job_id)
job_ds_user = prior_users[job_uuid] if job_uuid in prior_users else JobDatasetUser(job_uuid)
for ds in (known_ds[req.fulfilled_by] for req in job.data_requirements if req.fulfilled_by):
for ds in (known_ds[req.fulfilled_by] for req in reqs_w_derived_datasets if req.fulfilled_by):
job_ds_user.link_to_dataset(ds)

# Initialize dataset Docker volumes required for a job
@@ -935,6 +940,8 @@
job.set_status_step(JobExecStep.DATA_FAILURE)
self._job_util.save_job(job)
continue
finally:
self._is_fulfilling_requirements -= 1

job.set_status_step(JobExecStep.AWAITING_SCHEDULING)
self._job_util.save_job(job)
@@ -950,17 +957,23 @@ async def perform_checks_for_job(self, job: Job) -> bool:
Async task for managing temporary datasets, including updating expire times and purging of expired datasets.
"""
while True:
# Ensure that mark and sweep doesn't proceed while something is potentially linking datasets
while self._is_fulfilling_requirements > 0:
await asyncio.sleep(10)
self._temp_dataset_mark_and_sweep()
await asyncio.sleep(3600)

async def perform_checks_for_job(self, job: Job) -> bool:
"""
Check whether all requirements for this job can be fulfilled, setting the fulfillment associations.
Check whether all requirements for this job can be fulfilled, setting fulfillment associations and usage links.
Check whether all the requirements for the provided job can be fulfilled, such that the job can move on to the
next successful step in the execution workflow. As part of the check, also update the ::class:`DataRequirement`
objects with the name of the fulfilling dataset and the location at which the dataset will be accessible to the
job.
next successful step in the execution workflow. Potentially do some other tracking and linking steps as well,
depending on whether the fulfilling dataset already exists.
When an existing dataset is found to fulfill a ::class:`DataRequirement`, update the requirement object with the
name of the fulfilling dataset and the location at which the dataset will be accessible to the job.
Additionally, link the fulfilling dataset to a ::class:`JobDatasetUser` for the provided job.
Parameters
----------
@@ -978,13 +991,23 @@ async def perform_checks_for_job(self, job: Job) -> bool:
"""
# TODO: (later) should we check whether any 'fulfilled_by' datasets exist, or handle this differently?
try:
# Ensure here that we block the mark-and-sweep routine for temporary datasets
self._is_fulfilling_requirements += 1

# Create/lookup dataset user job wrapper instance for this job
existing_ds_users: Dict[UUID, DatasetUser] = self._get_ds_users()
job_uuid = UUID(job.job_id)
job_ds_user = existing_ds_users.get(job_uuid, JobDatasetUser(job_uuid))

for requirement in [req for req in job.data_requirements if req.fulfilled_by is None]:
can_fulfill, dataset = await self._dataset_inquery_util.can_be_fulfilled(requirement=requirement,
job=job)
if not can_fulfill:
logging.error("Cannot fulfill '{}' category data requirement".format(requirement.category.name))
return False
elif dataset is not None:
# Link user when we've found an existing fulfilling dataset
job_ds_user.link_to_dataset(dataset=dataset)
# TODO: (later) in the future, whether the job is running via Docker needs to be checked
# TODO: also, whatever is done here needs to align with what is done within _create_output_dataset,
# when creating the output data DataRequirement
Expand All @@ -1000,3 +1023,6 @@ async def perform_checks_for_job(self, job: Job) -> bool:
msg = "Encountered {} checking if job {} data requirements could be fulfilled - {}"
logging.error(msg.format(e.__class__.__name__, job.job_id, str(e)))
return False
finally:
# Unblock mark and sweep
self._is_fulfilling_requirements -= 1
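
One property the change relies on, visible in the try/finally blocks above, is that the guard counter is always decremented even when a requirement check raises, so the purge loop can never be blocked indefinitely by a failure. (And since these coroutines share one asyncio event loop, the bare integer increments are not subject to thread races; that is an observation about asyncio, not something stated in the commit.) A small standalone demonstration of the always-released behavior, independent of the DMOD code:

# Toy check that the try/finally pattern always restores the counter
counter = 0

def guarded_check(fail: bool) -> bool:
    global counter
    try:
        counter += 1  # block purging while "checking"
        if fail:
            raise RuntimeError("simulated failure while checking requirements")
        return True
    finally:
        counter -= 1  # always released, even on the failure path

for should_fail in (False, True):
    try:
        guarded_check(should_fail)
    except RuntimeError:
        pass
    assert counter == 0

print("guard released after both the success and failure paths")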
