Update dataservice to support purging temp datasets.
Updating service manager for dataservice with functions to support
purging expired temporary datasets.
robertbartel committed Jan 25, 2024
1 parent a58a009 commit 4deb382
Showing 1 changed file with 95 additions and 3 deletions.
98 changes: 95 additions & 3 deletions python/services/dataservice/dmod/dataservice/service.py
@@ -3,15 +3,17 @@
import json
import os
from time import sleep as time_sleep
from datetime import datetime, timedelta
from docker.types import Healthcheck, RestartPolicy, ServiceMode
from dmod.communication import DatasetManagementMessage, DatasetManagementResponse, ManagementAction, WebSocketInterface
from dmod.communication.dataset_management_message import DatasetQuery, QueryType
from dmod.communication.data_transmit_message import DataTransmitMessage, DataTransmitResponse
from dmod.core.meta_data import DataCategory, DataDomain, DataFormat, DataRequirement, DiscreteRestriction, \
StandardDatasetIndex
from dmod.core.dataset import Dataset, DatasetManager, DatasetUser, DatasetType
from dmod.core.serializable import ResultIndicator, BasicResultIndicator
from dmod.core.exception import DmodRuntimeError
- from dmod.modeldata.data.object_store_manager import Dataset, DatasetManager, DatasetType, ObjectStoreDatasetManager
+ from dmod.modeldata.data.object_store_manager import ObjectStoreDatasetManager
from dmod.modeldata.data.filesystem_manager import FilesystemDatasetManager
from dmod.scheduler import SimpleDockerUtil
from dmod.scheduler.job import Job, JobExecStep, JobUtil
@@ -21,6 +23,7 @@
from websockets import WebSocketServerProtocol
from .dataset_inquery_util import DatasetInqueryUtil
from .data_derive_util import DataDeriveUtil
from .dataset_user_impl import JobDatasetUser

import logging

@@ -223,6 +226,10 @@ def __init__(self, job_util: JobUtil, *args, **kwargs):
self._data_derive_util: DataDeriveUtil = DataDeriveUtil(data_mgrs_by_ds_type=self._all_data_managers)
self._dataset_inquery_util: DatasetInqueryUtil = DatasetInqueryUtil(data_mgrs_by_ds_type=self._all_data_managers,
derive_util=self._data_derive_util)
self._marked_expired_datasets: Set[str] = set()
""" Names of expired datasets marked for deletion the next time through ::method:`manage_temp_datasets`. """
#self._dataset_users: Dict[UUID, DatasetUser] = dict()
#""" Dataset user records maintained in memory by the instance (e.g., when a job has a dataset provisioned). """

def _add_manager(self, manager: DatasetManager):
"""
@@ -509,6 +516,18 @@ def _determine_dataset_type(self, message: DatasetManagementMessage) -> DatasetT
# TODO: (later) implement this correctly
return DatasetType.OBJECT_STORE

def _get_ds_users(self) -> Dict[UUID, DatasetUser]:
"""
Get dataset users from all associated managers of the service.
Returns
-------
Dict[UUID, DatasetUser]
Map of all ::class:`DatasetUser` from all associated managers of the service, keyed by the UUID of each.
"""
return {uuid: mgr.get_dataset_user(uuid)
for mgr in self._all_data_managers.values() for uuid in mgr.get_dataset_user_ids()}

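The comprehension above flattens per-manager user ids into a single map. A minimal sketch of the same shape, using a hypothetical stub that assumes only the two manager methods called above:

from uuid import uuid4

class _StubManager:
    """Hypothetical stand-in exposing only the two methods the comprehension relies on."""
    def __init__(self, user_ids):
        self._users = {uid: f"user-{uid}" for uid in user_ids}

    def get_dataset_user_ids(self):
        return set(self._users)

    def get_dataset_user(self, uuid):
        return self._users[uuid]

managers = {"object_store": _StubManager({uuid4()}), "filesystem": _StubManager({uuid4()})}
all_users = {uid: mgr.get_dataset_user(uid)
             for mgr in managers.values() for uid in mgr.get_dataset_user_ids()}
assert len(all_users) == 2  # one user record per manager in this toy setup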
def _process_dataset_create(self, message: DatasetManagementMessage) -> DatasetManagementResponse:
"""
As part of the communication protocol for the service, handle incoming messages that request dataset creation.
@@ -643,6 +662,59 @@ def _process_query(self, message: DatasetManagementMessage) -> DatasetManagement
reason = 'Unsupported {} Query Type - {}'.format(DatasetQuery.__class__.__name__, query_type.name)
return DatasetManagementResponse(action=message.management_action, success=False, reason=reason)

def _temp_dataset_mark_and_sweep(self):
"""
Encapsulation of the required behavior for a single iteration through ::method:`manage_temp_datasets`.
Method scans for in-use temporary datasets, updating the expiration time of each. It then deletes any expired
datasets already marked for deletion within the instance's private field. Finally, it updates that field with
any datasets that are now expired (and thus will be removed on the next invocation).
"""
# Get these upfront, since no meaningful changes can happen during the course of an iteration
temp_datasets = {ds_name: ds for ds_name, ds in self.get_known_datasets().items() if ds.is_temporary}

# Account for any in-use temporary datasets by potentially unmarking and updating expire time
for ds in (ds for ds in temp_datasets.values() if ds.manager.get_user_ids_for_dataset(ds)):
if ds.name in self._marked_expired_datasets:
self._marked_expired_datasets.remove(ds.name)

assert ds.expires is not None
ds.extend_life(timedelta(days=1) if ds.expires < (datetime.now()+timedelta(days=1)) else timedelta(hours=1))

# Delete any datasets previously marked for deletion, then drop the now-stale marks
for ds in (temp_datasets.pop(ds_name) for ds_name in self._marked_expired_datasets):
ds.manager.delete(dataset=ds)
self._marked_expired_datasets.clear()

# Mark any now-expired datasets for deletion on the next iteration
self._marked_expired_datasets.update(n for n, ds in temp_datasets.items() if ds.expires is not None and ds.expires < datetime.now())

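Deletion here is deliberately deferred by one pass: a dataset is removed only after surviving a full cycle in the marked set without being unmarked by a user. A self-contained sketch of that two-pass pattern, using a toy dict of hypothetical expiration times rather than the service's own types:

from datetime import datetime, timedelta

datasets = {"stale": datetime.now() - timedelta(hours=1),  # already expired
            "fresh": datetime.now() + timedelta(hours=1)}  # still valid
marked = set()

def mark_and_sweep_once():
    # Sweep: delete whatever a previous pass marked, then drop the stale marks
    for name in list(marked):
        datasets.pop(name, None)
    marked.clear()
    # Mark: record what is expired now; actual deletion waits until the next pass
    marked.update(n for n, expires in datasets.items() if expires < datetime.now())

mark_and_sweep_once()  # marks "stale"
mark_and_sweep_once()  # deletes "stale"; "fresh" is untouched
assert "stale" not in datasets and "fresh" in datasets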
def _unlink_finished_jobs(self, ds_users: Dict[UUID, DatasetUser]) -> Set[UUID]:
"""
Unlink dataset use for any provided job-based users for which the job is no longer active.
Unlink dataset use for any of the given dataset users that are specifically ::class:`JobDatasetUser` and are
associated with a job that is not in the set of active jobs.
Parameters
----------
ds_users: Dict[UUID, DatasetUser]
A mapping of applicable dataset user objects, mapped by uuid; for the subset of interest, which are
::class:`JobDatasetUser` instances, the key will also be the id of the associated job (as a ::class:`UUID`).
Returns
-------
Set[UUID]
Set of the UUIDs of all users associated with finished jobs for which unlinking was performed.
"""
finished_job_users: Set[UUID] = set()
active_job_ids = {UUID(j.job_id) for j in self._job_util.get_all_active_jobs()}
for user in (u for uid, u in ds_users.items() if isinstance(u, JobDatasetUser) and uid not in active_job_ids):
for ds in (self.get_known_datasets()[ds_name] for ds_name in user.datasets_and_managers):
user.unlink_to_dataset(ds)
finished_job_users.add(user.uuid)
return finished_job_users

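A hedged note on the calling convention: for the `JobDatasetUser` subset, the map key is the job's id, which is what lets the method compare keys against active job ids. A small sketch under that assumption (hypothetical id; construction mirrors the use site below):

from uuid import uuid4
from dmod.dataservice.dataset_user_impl import JobDatasetUser

job_uuid = uuid4()
user = JobDatasetUser(job_uuid)   # constructed from the job's id, as in the loop below
ds_users = {user.uuid: user}      # assumes user.uuid is the job's id, per the docstring above
assert job_uuid in ds_users
# Inside the service: finished = self._unlink_finished_jobs(ds_users=ds_users)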
def get_known_datasets(self) -> Dict[str, Dataset]:
"""
Get real-time mapping of all datasets known to this instance via its managers, in a map keyed by dataset name.
@@ -831,21 +903,31 @@ async def manage_required_data_checks(self):

async def manage_data_provision(self):
"""
- Task method to periodically associate and (when needed) generate required datasets with/for jobs.
+ Task method to periodically associate, un-associate, and (when needed) generate required datasets with/for jobs.
"""
logging.debug("Starting task loop for performing data provisioning for requested jobs.")
while True:
lock_id = str(uuid4())
while not self._job_util.lock_active_jobs(lock_id):
await asyncio.sleep(2)

# Get any previously existing dataset users linked to any of the managers
prior_users: Dict[UUID, DatasetUser] = self._get_ds_users()

for job in [j for j in self._job_util.get_all_active_jobs() if j.status_step == JobExecStep.AWAITING_DATA]:
logging.debug("Managing provisioning for job {} that is awaiting data.".format(job.job_id))
try:
# Derive any datasets as required
reqs_w_derived_datasets = await self._data_derive_util.derive_datasets(job)
logging.info('Job {} had {} datasets derived.'.format(job.job_id, len(reqs_w_derived_datasets)))

# Create or retrieve the job's dataset user wrapper instance, then link its associated datasets
known_ds = self.get_known_datasets()
job_uuid = UUID(job.job_id)
job_ds_user = prior_users[job_uuid] if job_uuid in prior_users else JobDatasetUser(job_uuid)
for ds in (known_ds[req.fulfilled_by] for req in job.data_requirements if req.fulfilled_by):
job_ds_user.link_to_dataset(ds)

# Initialize dataset Docker volumes required for a job
logging.debug('Initializing any required S3FS dataset volumes for {}'.format(job.job_id))
self._docker_s3fs_helper.init_volumes(job=job)
@@ -857,9 +939,20 @@
job.set_status_step(JobExecStep.AWAITING_SCHEDULING)
self._job_util.save_job(job)

# Also, unlink usage for any previously existing, job-based users for which the job is no longer active ...
self._unlink_finished_jobs(ds_users=prior_users)

self._job_util.unlock_active_jobs(lock_id)
await asyncio.sleep(5)

async def manage_temp_datasets(self):
"""
Async task method for managing temporary datasets, including updating expiration times and purging expired datasets.
"""
while True:
self._temp_dataset_mark_and_sweep()
await asyncio.sleep(3600)

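Wiring this loop into the running service is outside this diff; a minimal sketch, assuming the three task methods are simply gathered on one event loop (the entry-point name is illustrative, not the service's actual startup code):

import asyncio

async def run_service_tasks(service) -> None:
    # Illustrative only: run the checking, provisioning, and temp-dataset loops concurrently
    await asyncio.gather(
        service.manage_required_data_checks(),
        service.manage_data_provision(),
        service.manage_temp_datasets(),
    )

# asyncio.run(run_service_tasks(service_manager_instance))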
async def perform_checks_for_job(self, job: Job) -> bool:
"""
Check whether all requirements for this job can be fulfilled, setting the fulfillment associations.
@@ -907,4 +1000,3 @@ async def perform_checks_for_job(self, job: Job) -> bool:
msg = "Encountered {} checking if job {} data requirements could be fulfilled - {}"
logging.error(msg.format(e.__class__.__name__, job.job_id, str(e)))
return False
