diff --git a/python/services/dataservice/dmod/dataservice/__main__.py b/python/services/dataservice/dmod/dataservice/__main__.py index 1d1d84bd8..f4ed16c17 100644 --- a/python/services/dataservice/dmod/dataservice/__main__.py +++ b/python/services/dataservice/dmod/dataservice/__main__.py @@ -6,14 +6,131 @@ ) import argparse -from . import name as package_name -from .service import ServiceManager -from .service_settings import ServiceSettings -from .dataset_manager_collection import DatasetManagerCollection -from dmod.scheduler.job import DefaultJobUtilFactory from pathlib import Path from socket import gethostname +from dmod.scheduler.job import DefaultJobUtilFactory +from dmod.modeldata.data.filesystem_manager import FilesystemDatasetManager +from dmod.modeldata.data.object_store_manager import ObjectStoreDatasetManager +from dmod.core.dataset import DatasetType + +from . import name as package_name +from .data_derive_util import DataDeriveUtil +from .dataset_inquery_util import DatasetInqueryUtil +from .dataset_manager_collection import DatasetManagerCollection +from .service import ( + ActiveOperationTracker, + DataProvisionManager, + RequiredDataChecksManager, + ServiceManager, + TempDataTaskManager, + DockerS3FSPluginHelper, +) +from .service_settings import ServiceSettings + + +def main(): + args = _handle_args() + + if args.pycharm_debug: + _setup_remote_debugging(args) + else: + logging.info("Skipping data service remote debugging setup.") + + listen_host = gethostname() if args.host is None else args.host + # Flip this here to be less confusing + use_obj_store = not args.no_obj_store + # TODO: DataProvisionManager requires object store, so this is currently required. Add functionality to allow usage + # without object store. fail fast for now. 
+ assert use_obj_store, "Object store is currently required" + + secrets_dir = Path('/run/secrets') + + # Figure out Redis password, trying for a Docker secret first + if args.redis_pass_secret is not None: + redis_pass_secret_file = secrets_dir.joinpath(args.redis_pass_secret) + redis_pass = redis_pass_secret_file.read_text().strip() + else: + redis_pass = args.redis_pass + + # Initialize objects that will be injected and shared by service subsystems + service_settings = ServiceSettings() + # Initialize a job util via the default factory, which requires some Redis params + job_util = DefaultJobUtilFactory.factory_create(redis_host=args.redis_host, redis_port=args.redis_port, + redis_pass=redis_pass) + # Datasets creation and access go through this object + dataset_manager_collection = DatasetManagerCollection() + data_derive_util = DataDeriveUtil(dataset_manager_collection=dataset_manager_collection) + dataset_inquery_util = DatasetInqueryUtil(dataset_manager_collection=dataset_manager_collection, derive_util=data_derive_util) + + # TODO: need to adjust arg groupings to allow for this to be cleaned up some + access_key = ( + None + if args.obj_store_access_key is None + else (secrets_dir / args.obj_store_access_key).read_text().strip() + ) + secret_key = ( + None + if args.obj_store_secret_key is None + else (secrets_dir / args.obj_store_secret_key).read_text().strip() + ) + object_store_dataset_manager = _init_object_store_dataset_manager( + obj_store_host=args.obj_store_host, + port=args.obj_store_port, + access_key=access_key, + secret_key=secret_key, + ) + dataset_manager_collection.add(object_store_dataset_manager) + + docker_s3fs_plugin_helper = _init_docker_s3fs_plugin_helper( + dataset_manager_collection=dataset_manager_collection, + access_key=access_key, + secret_key=secret_key, + settings=service_settings, + ) + + if args.file_dataset_config_dir is not None: + filesystem_dataset_manager = _init_filesystem_dataset_manager( + Path(args.file_dataset_config_dir) + ) + dataset_manager_collection.add(filesystem_dataset_manager) + + + # count is used to signal when it is okay to remove temporary datasets + count = ActiveOperationTracker() + + # initialize background task objects + required_data_checks_manager = RequiredDataChecksManager( + job_util=job_util, + dataset_manager_collection=dataset_manager_collection, + checks_underway_tracker=count, + dataset_inquery_util=dataset_inquery_util, + ) + data_provision_manager = DataProvisionManager(job_util=job_util, + dataset_manager_collection=dataset_manager_collection, + docker_s3fs_helper=docker_s3fs_plugin_helper, + data_derive_util=data_derive_util, + provision_underway_tracker=count, + ) + temp_datasets_manager = TempDataTaskManager(dataset_manager_collection=dataset_manager_collection, safe_to_exec_tracker=count) + + # Handles websocket communication and async task loop + service_manager = ServiceManager( + job_util=job_util, + listen_host=listen_host, + port=args.port, + ssl_dir=Path(args.ssl_dir), + dataset_manager_collection=dataset_manager_collection, + dataset_inquery_util=dataset_inquery_util, + ) + + # Setup other required async tasks + service_manager.add_async_task(required_data_checks_manager.start()) + service_manager.add_async_task(data_provision_manager.start()) + service_manager.add_async_task(temp_datasets_manager.start()) + + service_manager.run() + def _handle_args(): parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -117,62 +234,70 @@ def _setup_remote_debugging(args: 
argparse.Namespace): msg = 'Warning: could not set debugging trace to {} on {} due to {} - {}' print(msg.format(args.remote_debug_host, args.remote_debug_port, error.__class__.__name__, str(error))) -def main(): - args = _handle_args() - - if args.pycharm_debug: - _setup_remote_debugging(args) - else: - logging.info("Skipping data service remote debugging setup.") - listen_host = gethostname() if args.host is None else args.host - # Flip this here to be less confusing - use_obj_store = not args.no_obj_store +def _init_filesystem_dataset_manager( + file_dataset_config_dir: Path, +) -> FilesystemDatasetManager: + logging.info( + "Initializing manager for {} type datasets".format(DatasetType.FILESYSTEM.name) + ) + mgr = FilesystemDatasetManager(serialized_files_directory=file_dataset_config_dir) + logging.info( + "{} initialized with {} existing datasets".format( + mgr.__class__.__name__, len(mgr.datasets) + ) + ) + return mgr - secrets_dir = Path('/run/secrets') - # Figure out Redis password, trying for a Docker secret first - if args.redis_pass_secret is not None: - redis_pass_secret_file = secrets_dir.joinpath(args.redis_pass_secret) - redis_pass = redis_pass_secret_file.read_text().strip() - else: - redis_pass = args.redis_pass +def _init_object_store_dataset_manager( + obj_store_host: str, access_key: str, secret_key: str, port: int = 9000 +) -> ObjectStoreDatasetManager: + host_str = "{}:{}".format(obj_store_host, port) + logging.info("Initializing object store dataset manager at {}".format(host_str)) + mgr = ObjectStoreDatasetManager( + obj_store_host_str=host_str, access_key=access_key, secret_key=secret_key + ) + logging.info( + "Object store dataset manager initialized with {} existing datasets".format( + len(mgr.datasets) + ) + ) + return mgr - service_settings = ServiceSettings() - # Initialize a job util via the default factory, which requires some Redis params - job_util = DefaultJobUtilFactory.factory_create(redis_host=args.redis_host, redis_port=args.redis_port, - redis_pass=redis_pass) +def _init_docker_s3fs_plugin_helper( + dataset_manager_collection: DatasetManagerCollection, + access_key: str, + secret_key: str, + settings: ServiceSettings, + *args, + **kwargs +) -> DockerS3FSPluginHelper: + s3fs_url_proto = settings.s3fs_url_protocol + s3fs_url_host = settings.s3fs_url_host + s3fs_url_port = settings.s3fs_url_port + if s3fs_url_host is not None: + s3fs_helper_url = "{}://{}:{}/".format( + s3fs_url_proto, s3fs_url_host, s3fs_url_port + ) + else: + s3fs_helper_url = None - dataset_manager_collection = DatasetManagerCollection() - # Initiate a service manager WebsocketHandler implementation for primary messaging and async task loops - service_manager = ServiceManager( - job_util=job_util, - listen_host=listen_host, - port=args.port, - ssl_dir=Path(args.ssl_dir), - settings=service_settings, - dataset_manager_collection=dataset_manager_collection, + docker_s3fs_helper = DockerS3FSPluginHelper( + service_manager=dataset_manager_collection, + obj_store_access=access_key, + obj_store_secret=secret_key, + docker_image_name=settings.s3fs_vol_image_name, + docker_image_tag=settings.s3fs_vol_image_tag, + docker_networks=[settings.s3fs_helper_network], + docker_plugin_alias=settings.s3fs_plugin_alias, + obj_store_url=s3fs_helper_url, + *args, + **kwargs ) + return docker_s3fs_helper - # If we are set to use the object store ... 
- if use_obj_store: - # TODO: need to adjust arg groupings to allow for this to be cleaned up some - access_key_file = None if args.obj_store_access_key is None else secrets_dir.joinpath(args.obj_store_access_key) - secret_key_file = None if args.obj_store_secret_key is None else secrets_dir.joinpath(args.obj_store_secret_key) - service_manager.init_object_store_dataset_manager(obj_store_host=args.obj_store_host, - port=args.obj_store_port, - access_key=access_key_file.read_text().strip(), - secret_key=secret_key_file.read_text().strip()) - if args.file_dataset_config_dir is not None: - service_manager.init_filesystem_dataset_manager(Path(args.file_dataset_config_dir)) - - # Setup other required async tasks - service_manager.add_async_task(service_manager.manage_required_data_checks()) - service_manager.add_async_task(service_manager.manage_data_provision()) - service_manager.add_async_task(service_manager.manage_temp_datasets()) - - service_manager.run() if __name__ == '__main__': diff --git a/python/services/dataservice/dmod/dataservice/_version.py b/python/services/dataservice/dmod/dataservice/_version.py index ef72cc0f1..4ca39e7ce 100644 --- a/python/services/dataservice/dmod/dataservice/_version.py +++ b/python/services/dataservice/dmod/dataservice/_version.py @@ -1 +1 @@ -__version__ = '0.8.1' +__version__ = '0.8.2' diff --git a/python/services/dataservice/dmod/dataservice/service.py b/python/services/dataservice/dmod/dataservice/service.py index d0f89c526..44c3316e8 100644 --- a/python/services/dataservice/dmod/dataservice/service.py +++ b/python/services/dataservice/dmod/dataservice/service.py @@ -191,7 +191,6 @@ def to_nanoseconds(seconds: int): retries=5, start_period=to_nanoseconds(seconds=5)) - class ServiceManager(WebSocketInterface): """ Primary service management class. @@ -216,25 +215,14 @@ def __init__( self, job_util: JobUtil, *args, - settings: ServiceSettings, dataset_manager_collection: DatasetManagerCollection, + dataset_inquery_util: DatasetInqueryUtil, **kwargs ): super().__init__(*args, **kwargs) self._job_util = job_util - self._settings: ServiceSettings = settings self._managers: DatasetManagerCollection = dataset_manager_collection - """ Map of dataset class type (key), to service's dataset manager (value) for handling that dataset type. """ - self._docker_s3fs_helper = None - self._data_derive_util: DataDeriveUtil = DataDeriveUtil(dataset_manager_collection=self._managers) - self._dataset_inquery_util: DatasetInqueryUtil = DatasetInqueryUtil(dataset_manager_collection=self._managers, - derive_util=self._data_derive_util) - self._marked_expired_datasets: Set[str] = set() - """ Names of expired datasets marked for deletion the next time through ::method:`manage_temp_datasets`. """ - self._is_fulfilling_requirements: int = 0 - """ Internal flag of whether any searches are being performed to fulfill requirements (to block purging). """ - #self._dataset_users: Dict[UUID, DatasetUser] = dict() - #""" Dataset user records maintained in memory by the instance (e.g., when a job has a dataset provisioned). 
""" + self._dataset_inquery_util: DatasetInqueryUtil = dataset_inquery_util async def _async_process_add_data(self, dataset_name: str, dest_item_name: str, message: DataTransmitMessage, manager: DatasetManager, is_temp: bool = False) -> Union[DataTransmitResponse, @@ -414,56 +402,6 @@ async def _async_process_query(self, message: DatasetManagementMessage) -> Datas """ return self._process_query(message) - def _create_output_datasets(self, job: Job): - """ - Create output datasets and associated requirements for this job, based on its ::method:`Job.output_formats`. - - Create empty output datasets and the associated ::class:`DataRequirement` instances for this job, corresponding - to the output dataset formats listed in the job's ::method:`Job.output_formats` property. The values in this - property are iterated through by list index to be able to reuse the index value for dataset name, as noted - below. - - Datasets will be named as ``job--output-``, where ```` is the index of the - corresponding value in ::method:`Job.output_formats`. - - Parameters - ---------- - job : Job - The job for which to create output datasets. - """ - for i in range(len(job.model_request.output_formats)): - - id_restrict = DiscreteRestriction(variable=StandardDatasetIndex.ELEMENT_ID, values=[]) - - time_range = None - for data_domain in [req.domain for req in job.data_requirements if req.category == DataCategory.FORCING]: - time_restrictions = [r for k, r in data_domain.continuous_restrictions.items() if r.variable == 'Time'] - if len(time_restrictions) > 0: - time_range = time_restrictions[0] - break - - # TODO: (later) more intelligently determine type - mgr = self._managers.manager(DatasetType.OBJECT_STORE) - dataset = mgr.create(name='job-{}-output-{}'.format(job.job_id, i), - is_read_only=False, - category=DataCategory.OUTPUT, - domain=DataDomain(data_format=job.model_request.output_formats[i], - continuous_restrictions=None if time_range is None else [time_range], - discrete_restrictions=[id_restrict])) - # TODO: (later) in the future, whether the job is running via Docker needs to be checked - # TODO: also, whatever is done here needs to align with what is done within perform_checks_for_job, when - # setting the fulfilled_access_at for the DataRequirement - is_job_run_in_docker = True - if is_job_run_in_docker: - output_access_at = dataset.docker_mount - else: - msg = "Could not determine proper access location for new output dataset of type {} by non-Docker job {}." - raise DmodRuntimeError(msg.format(dataset.__class__.__name__, job.job_id)) - # Create a data requirement for the job, fulfilled by the new dataset - requirement = DataRequirement(domain=dataset.data_domain, is_input=False, category=DataCategory.OUTPUT, - fulfilled_by=dataset.name, fulfilled_access_at=output_access_at) - job.data_requirements.append(requirement) - def _determine_dataset_type(self, message: DatasetManagementMessage) -> DatasetType: """ Determine the right type of dataset for this situation. @@ -482,18 +420,6 @@ def _determine_dataset_type(self, message: DatasetManagementMessage) -> DatasetT # TODO: (later) implement this correctly return DatasetType.OBJECT_STORE - def _get_ds_users(self) -> Dict[UUID, DatasetUser]: - """ - Get dataset users from all associated managers of the service. - - Returns - ------- - Dict[UUID, DatasetUser] - Map of all ::class:`DatasetUser` from all associated managers of the service, keyed by the UUID of each. 
- """ - return {uuid: mgr.get_dataset_user(uuid) - for _, mgr in self._managers.managers() for uuid in mgr.get_dataset_user_ids()} - def _process_dataset_create(self, message: DatasetManagementMessage) -> DatasetManagementResponse: """ As part of the communication protocol for the service, handle incoming messages that request dataset creation. @@ -650,96 +576,6 @@ def _process_query(self, message: DatasetManagementMessage) -> DatasetManagement reason = 'Unsupported {} Query Type - {}'.format(DatasetQuery.__class__.__name__, query_type.name) return DatasetManagementResponse(action=message.management_action, success=False, reason=reason) - def _temp_dataset_mark_and_sweep(self): - """ - Encapsulation of the required behavior for a single iteration through ::method:`manage_temp_datasets`. - - Method scans for in use temporary datasets, updating the expire times of each. It then deletes any expired - datasets marked for deletion already within the instance's private field. Finally, it updates the instance's - private field for marked-for-deletion datasets with any that are now expired (and thus will be removed on the - next invocation). - """ - # Get these upfront, since no meaningful changes can happen during the course of an iteration - temp_datasets = {ds_name: ds for ds_name, ds in self._managers.known_datasets().items() if ds.is_temporary} - - # Account for any in-use temporary datasets by potentially unmarking and updating expire time - for ds in (ds for _, ds in temp_datasets.items() if ds.manager.get_user_ids_for_dataset(ds)): - if ds.name in self._marked_expired_datasets: - self._marked_expired_datasets.remove(ds.name) - - assert ds.expires is not None - ds.extend_life(timedelta(days=1) if ds.expires < (datetime.now()+timedelta(days=1)) else timedelta(hours=1)) - - # Delete any datasets previously marked for deletion - for ds in (temp_datasets.pop(ds_name) for ds_name in self._marked_expired_datasets): - ds.manager.delete(dataset=ds) - - # Mark any expired datasets for deletion on the next iteration - self._marked_expired_datasets.update(n for n, ds in temp_datasets.items() if ds.expires > datetime.now()) - - def _unlink_finished_jobs(self, ds_users: Dict[UUID, DatasetUser]) -> Set[UUID]: - """ - Unlink dataset use for any provided job-based users for which the job is no longer active. - - Unlink dataset use for any of the given dataset users that are specifically ::class`JobDatasetUser` and are - associated with a job that is not in the set of active jobs. - - Parameters - ---------- - ds_users: Dict[UUID, DatasetUser] - A mapping of applicable dataset user objects, mapped by uuid; for the subset of interest, which are - ::class`JobDatasetUser` instances, the key will also be the id of the associated job (as a ::class:`UUID`). - - Returns - ------- - Set[UUID] - Set of the UUIDs of all users associated with finished jobs for which unlinking was performed. 
- """ - finished_job_users: Set[UUID] = set() - active_job_ids = {UUID(j.job_id) for j in self._job_util.get_all_active_jobs()} - for user in (u for uid, u in ds_users.items() if isinstance(u, JobDatasetUser) and uid not in active_job_ids): - for ds in (self._managers.known_datasets()[ds_name] for ds_name in user.datasets_and_managers): - user.unlink_to_dataset(ds) - finished_job_users.add(user.uuid) - return finished_job_users - - def init_filesystem_dataset_manager(self, file_dataset_config_dir: Path): - logging.info("Initializing manager for {} type datasets".format(DatasetType.FILESYSTEM.name)) - mgr = FilesystemDatasetManager(serialized_files_directory=file_dataset_config_dir) - logging.info("{} initialized with {} existing datasets".format(mgr.__class__.__name__, len(mgr.datasets))) - self._managers.add(mgr) - self._filesystem_data_mgr = mgr - - def init_object_store_dataset_manager(self, obj_store_host: str, access_key: str, secret_key: str, port: int = 9000, - *args, **kwargs): - host_str = '{}:{}'.format(obj_store_host, port) - logging.info("Initializing object store dataset manager at {}".format(host_str)) - mgr = ObjectStoreDatasetManager(obj_store_host_str=host_str, access_key=access_key, secret_key=secret_key) - logging.info("Object store dataset manager initialized with {} existing datasets".format(len(mgr.datasets))) - self._managers.add(mgr) - self._obj_store_data_mgr = mgr - - self._obj_store_access_key = access_key - self._obj_store_secret_key = secret_key - - s3fs_url_proto = self._settings.s3fs_url_protocol - s3fs_url_host = self._settings.s3fs_url_host - s3fs_url_port = self._settings.s3fs_url_port - if s3fs_url_host is not None: - s3fs_helper_url = '{}://{}:{}/'.format(s3fs_url_proto, s3fs_url_host, s3fs_url_port) - else: - s3fs_helper_url = None - - self._docker_s3fs_helper = DockerS3FSPluginHelper(service_manager=self, - obj_store_access=self._obj_store_access_key, - obj_store_secret=self._obj_store_secret_key, - docker_image_name=self._settings.s3fs_vol_image_name, - docker_image_tag=self._settings.s3fs_vol_image_tag, - docker_networks=[self._settings.s3fs_helper_network], - docker_plugin_alias=self._settings.s3fs_plugin_alias, - obj_store_url=s3fs_helper_url, - *args, **kwargs) - async def listener(self, websocket: WebSocketServerProtocol, path): """ Process incoming messages over the websocket and respond appropriately. @@ -765,6 +601,7 @@ async def listener(self, websocket: WebSocketServerProtocol, path): # TODO: need to refactor this to be cleaner # Write data to temporary, partial item name, then after the last one, combine all the temps in this # transmit series into a single file + assert dataset_manager is not None, "Dataset manager should not be 'None' at this point" partial_item_name = '{}.{}.{}'.format(transmit_series_uuid, dest_item_name, partial_indx) response = await self._async_process_add_data(dataset_name=dest_dataset_name, dest_item_name=partial_item_name, @@ -831,7 +668,40 @@ async def listener(self, websocket: WebSocketServerProtocol, path): except Exception as e: logging.error("Encountered error: {}".format(str(e))) - async def manage_required_data_checks(self): + + +class RequiredDataChecksManager: + """ + Async task that periodically examines whether required data for jobs is available. + Start the task by calling ::method:`start`. 
+ + Parameters + ---------- + job_util : JobUtil + Access and update jobs + dataset_manager_collection : DatasetManagerCollection + Facilitates creating and accessing Datasets + checks_underway_tracker : ActiveOperationTracker + Semaphore-like object for signaling that data checks are underway + dataset_inquery_util : DatasetInqueryUtil + Facilitates dataset detail queries and searches + """ + def __init__( + self, + job_util: JobUtil, + dataset_manager_collection: DatasetManagerCollection, + checks_underway_tracker: "ActiveOperationTracker", + dataset_inquery_util: DatasetInqueryUtil, + ): + self._job_util = job_util + self._managers = dataset_manager_collection + self._checks_underway_tracker = checks_underway_tracker + self._dataset_inquery_util: DatasetInqueryUtil = dataset_inquery_util + + async def start(self) -> NoReturn: + await self._manage_required_data_checks() + + async def _manage_required_data_checks(self): """ Task method to periodically examine whether required data for jobs is available. @@ -870,65 +740,56 @@ async def manage_required_data_checks(self): self._job_util.unlock_active_jobs(lock_id) await asyncio.sleep(5) - async def manage_data_provision(self): - """ - Task method to periodically associate, un-associate, and (when needed) generate required datasets with/for jobs. + def _create_output_datasets(self, job: Job): """ - logging.debug("Starting task loop for performing data provisioning for requested jobs.") - while True: - lock_id = str(uuid4()) - while not self._job_util.lock_active_jobs(lock_id): - await asyncio.sleep(2) - - # Get any previously existing dataset users linked to any of the managers - prior_users: Dict[UUID, DatasetUser] = self._get_ds_users() - - for job in [j for j in self._job_util.get_all_active_jobs() if j.status_step == JobExecStep.AWAITING_DATA]: - logging.debug("Managing provisioning for job {} that is awaiting data.".format(job.job_id)) - try: - # Block temp dataset purging and maintenance while we handle things here - self._is_fulfilling_requirements += 1 - # Derive any datasets as required - reqs_w_derived_datasets = await self._data_derive_util.derive_datasets(job) - logging.info('Job {} had {} datasets derived.'.format(job.job_id, len(reqs_w_derived_datasets))) + Create output datasets and associated requirements for this job, based on its ::method:`Job.output_formats`. - # Create or retrieve dataset user job wrapper instance and link associated, newly-derived datasets - # It should be the case that pre-existing datasets were linked when found to be fulfilling - known_ds = self._managers.known_datasets() - job_uuid = UUID(job.job_id) - job_ds_user = prior_users[job_uuid] if job_uuid in prior_users else JobDatasetUser(job_uuid) - for ds in (known_ds[req.fulfilled_by] for req in reqs_w_derived_datasets if req.fulfilled_by): - job_ds_user.link_to_dataset(ds) + Create empty output datasets and the associated ::class:`DataRequirement` instances for this job, corresponding + to the output dataset formats listed in the job's ::method:`Job.output_formats` property. The values in this + property are iterated through by list index to be able to reuse the index value for dataset name, as noted + below. 
- # Initialize dataset Docker volumes required for a job - logging.debug('Initializing any required S3FS dataset volumes for {}'.format(job.job_id)) - self._docker_s3fs_helper.init_volumes(job=job) - except Exception as e: - job.set_status_step(JobExecStep.DATA_FAILURE) - self._job_util.save_job(job) - continue - finally: - self._is_fulfilling_requirements -= 1 + Datasets will be named as ``job--output-``, where ```` is the index of the + corresponding value in ::method:`Job.output_formats`. - job.set_status_step(JobExecStep.AWAITING_SCHEDULING) - self._job_util.save_job(job) + Parameters + ---------- + job : Job + The job for which to create output datasets. + """ + # TODO: aaraney harden + for i in range(len(job.model_request.output_formats)): - # Also, unlink usage for any previously existing, job-based users for which the job is no longer active ... - self._unlink_finished_jobs(ds_users=prior_users) + id_restrict = DiscreteRestriction(variable=StandardDatasetIndex.ELEMENT_ID, values=[]) - self._job_util.unlock_active_jobs(lock_id) - await asyncio.sleep(5) + time_range = None + for data_domain in [req.domain for req in job.data_requirements if req.category == DataCategory.FORCING]: + time_restrictions = [r for k, r in data_domain.continuous_restrictions.items() if r.variable == 'Time'] + if len(time_restrictions) > 0: + time_range = time_restrictions[0] + break - async def manage_temp_datasets(self) -> NoReturn: - """ - Async task for managing temporary datasets, including updating expire times and purging of expired datasets. - """ - while True: - # Ensure that mark and sweep doesn't proceed while something is potentially to linking datasets - while self._is_fulfilling_requirements > 0: - await asyncio.sleep(10) - self._temp_dataset_mark_and_sweep() - await asyncio.sleep(3600) + # TODO: (later) more intelligently determine type + mgr = self._managers.manager(DatasetType.OBJECT_STORE) + dataset = mgr.create(name='job-{}-output-{}'.format(job.job_id, i), + is_read_only=False, + category=DataCategory.OUTPUT, + domain=DataDomain(data_format=job.model_request.output_formats[i], + continuous_restrictions=None if time_range is None else [time_range], + discrete_restrictions=[id_restrict])) + # TODO: (later) in the future, whether the job is running via Docker needs to be checked + # TODO: also, whatever is done here needs to align with what is done within perform_checks_for_job, when + # setting the fulfilled_access_at for the DataRequirement + is_job_run_in_docker = True + if is_job_run_in_docker: + output_access_at = dataset.docker_mount + else: + msg = "Could not determine proper access location for new output dataset of type {} by non-Docker job {}." + raise DmodRuntimeError(msg.format(dataset.__class__.__name__, job.job_id)) + # Create a data requirement for the job, fulfilled by the new dataset + requirement = DataRequirement(domain=dataset.data_domain, is_input=False, category=DataCategory.OUTPUT, + fulfilled_by=dataset.name, fulfilled_access_at=output_access_at) + job.data_requirements.append(requirement) async def perform_checks_for_job(self, job: Job) -> bool: """ @@ -957,12 +818,11 @@ async def perform_checks_for_job(self, job: Job) -> bool: ::method:`can_be_fulfilled` """ # TODO: (later) should we check whether any 'fulfilled_by' datasets exist, or handle this differently? 
+        # Ensure here that we block mark-and-sweep routing for temporary datasets
+        self._checks_underway_tracker.acquire()
         try:
-            # Ensure here that we block mark-and-sweep routing for temporary datasets
-            self._is_fulfilling_requirements += 1
-
             # Create/lookup dataset user job wrapper instance for this job
-            existing_ds_users: Dict[UUID, DatasetUser] = self._get_ds_users()
+            existing_ds_users: Dict[UUID, DatasetUser] = _get_ds_users(self._managers)
             job_uuid = UUID(job.job_id)
             job_ds_user = existing_ds_users.get(job_uuid, JobDatasetUser(job_uuid))
 
@@ -992,4 +852,220 @@ async def perform_checks_for_job(self, job: Job) -> bool:
             return False
         finally:
             # Unblock mark and sweep
-            self._is_fulfilling_requirements -= 1
+            self._checks_underway_tracker.release()
+
+class TempDataTaskManager:
+    """
+    Async task that purges and prolongs the expiration of temporary datasets.
+
+    Start the task by calling ::method:`start`.
+
+    Parameters
+    ----------
+    dataset_manager_collection : DatasetManagerCollection
+        Facilitates creating and accessing Datasets
+    safe_to_exec_tracker : ActiveOperationTracker
+        Used to determine if it is okay to purge or prolong temporary datasets
+    """
+
+    def __init__(self, dataset_manager_collection: DatasetManagerCollection, safe_to_exec_tracker: "ActiveOperationTracker"):
+        self._safe_to_exec_tracker = safe_to_exec_tracker
+        self._managers = dataset_manager_collection
+
+        self._marked_expired_datasets: Set[str] = set()
+        """ Names of expired datasets marked for deletion the next time through ::method:`manage_temp_datasets`. """
+
+    async def start(self) -> NoReturn:
+        await self._manage_temp_datasets()
+
+    async def _manage_temp_datasets(self) -> NoReturn:
+        """
+        Async task for managing temporary datasets, including updating expire times and purging of expired datasets.
+        """
+        while True:
+            # Ensure that mark and sweep doesn't proceed while something is potentially linking datasets
+            while self._safe_to_exec_tracker.value > 0:
+                await asyncio.sleep(10)
+            self._temp_dataset_mark_and_sweep()
+            await asyncio.sleep(3600)
+
+    def _temp_dataset_mark_and_sweep(self):
+        """
+        Encapsulation of the required behavior for a single iteration through ::method:`manage_temp_datasets`.
+
+        Method scans for in-use temporary datasets, updating the expire times of each. It then deletes any expired
+        datasets marked for deletion already within the instance's private field. Finally, it updates the instance's
+        private field for marked-for-deletion datasets with any that are now expired (and thus will be removed on the
+        next invocation).
+        """
+        # Get these upfront, since no meaningful changes can happen during the course of an iteration
+        temp_datasets = {ds_name: ds for ds_name, ds in self._managers.known_datasets().items() if ds.is_temporary}
+
+        # Account for any in-use temporary datasets by potentially unmarking and updating expire time
+        for ds in (ds for _, ds in temp_datasets.items() if ds.manager.get_user_ids_for_dataset(ds)):
+            if ds.name in self._marked_expired_datasets:
+                self._marked_expired_datasets.remove(ds.name)
+
+            assert ds.expires is not None
+            ds.extend_life(timedelta(days=1) if ds.expires < (datetime.now()+timedelta(days=1)) else timedelta(hours=1))
+
+        # Delete any datasets previously marked for deletion
+        for ds in (temp_datasets.pop(ds_name) for ds_name in self._marked_expired_datasets):
+            ds.manager.delete(dataset=ds)
+
+        # Mark any expired datasets for deletion on the next iteration
+        self._marked_expired_datasets.update(n for n, ds in temp_datasets.items() if ds.expires > datetime.now())
+
+class DataProvisionManager:
+    """
+    Async task that periodically associates, un-associates, and (when needed) generates required datasets with/for jobs.
+    Start the task by calling ::method:`start`.
+
+    Parameters
+    ----------
+    job_util : JobUtil
+        Access and update jobs
+    dataset_manager_collection : DatasetManagerCollection
+        Facilitates creating and accessing Datasets
+    docker_s3fs_helper : DockerS3FSPluginHelper
+        Facilitates initializing Docker volumes for jobs
+    data_derive_util : DataDeriveUtil
+        Facilitates deriving data and datasets
+    provision_underway_tracker : ActiveOperationTracker
+        Semaphore-like object for signaling that provisioning is underway
+    """
+    def __init__(
+        self,
+        job_util: JobUtil,
+        dataset_manager_collection: DatasetManagerCollection,
+        docker_s3fs_helper: DockerS3FSPluginHelper,
+        data_derive_util: DataDeriveUtil,
+        provision_underway_tracker: "ActiveOperationTracker",
+    ):
+        self._job_util = job_util
+        self._managers = dataset_manager_collection
+        self._docker_s3fs_helper = docker_s3fs_helper
+        self._provision_underway_tracker = provision_underway_tracker
+        self._data_derive_util: DataDeriveUtil = data_derive_util
+
+    async def start(self) -> NoReturn:
+        await self._manage_data_provision()
+
+    async def _manage_data_provision(self):
+        """
+        Task method to periodically associate, un-associate, and (when needed) generate required datasets with/for jobs.
+ """ + logging.debug("Starting task loop for performing data provisioning for requested jobs.") + while True: + lock_id = str(uuid4()) + while not self._job_util.lock_active_jobs(lock_id): + await asyncio.sleep(2) + + # Get any previously existing dataset users linked to any of the managers + prior_users: Dict[UUID, DatasetUser] = _get_ds_users(self._managers) + + for job in [j for j in self._job_util.get_all_active_jobs() if j.status_step == JobExecStep.AWAITING_DATA]: + logging.debug("Managing provisioning for job {} that is awaiting data.".format(job.job_id)) + try: + # Block temp dataset purging and maintenance while we handle things here + self._provision_underway_tracker.acquire() + # Derive any datasets as required + reqs_w_derived_datasets = await self._data_derive_util.derive_datasets(job) + logging.info('Job {} had {} datasets derived.'.format(job.job_id, len(reqs_w_derived_datasets))) + + # Create or retrieve dataset user job wrapper instance and link associated, newly-derived datasets + # It should be the case that pre-existing datasets were linked when found to be fulfilling + known_ds = self._managers.known_datasets() + job_uuid = UUID(job.job_id) + job_ds_user = prior_users[job_uuid] if job_uuid in prior_users else JobDatasetUser(job_uuid) + for ds in (known_ds[req.fulfilled_by] for req in reqs_w_derived_datasets if req.fulfilled_by): + job_ds_user.link_to_dataset(ds) + + # Initialize dataset Docker volumes required for a job + logging.debug('Initializing any required S3FS dataset volumes for {}'.format(job.job_id)) + self._docker_s3fs_helper.init_volumes(job=job) + except Exception: + job.set_status_step(JobExecStep.DATA_FAILURE) + self._job_util.save_job(job) + continue + finally: + self._provision_underway_tracker.release() + + job.set_status_step(JobExecStep.AWAITING_SCHEDULING) + self._job_util.save_job(job) + + # Also, unlink usage for any previously existing, job-based users for which the job is no longer active ... + self._unlink_finished_jobs(ds_users=prior_users) + + self._job_util.unlock_active_jobs(lock_id) + await asyncio.sleep(5) + + def _unlink_finished_jobs(self, ds_users: Dict[UUID, DatasetUser]) -> Set[UUID]: + """ + Unlink dataset use for any provided job-based users for which the job is no longer active. + + Unlink dataset use for any of the given dataset users that are specifically ::class`JobDatasetUser` and are + associated with a job that is not in the set of active jobs. + + Parameters + ---------- + ds_users: Dict[UUID, DatasetUser] + A mapping of applicable dataset user objects, mapped by uuid; for the subset of interest, which are + ::class`JobDatasetUser` instances, the key will also be the id of the associated job (as a ::class:`UUID`). + + Returns + ------- + Set[UUID] + Set of the UUIDs of all users associated with finished jobs for which unlinking was performed. + """ + finished_job_users: Set[UUID] = set() + active_job_ids = {UUID(j.job_id) for j in self._job_util.get_all_active_jobs()} + for user in (u for uid, u in ds_users.items() if isinstance(u, JobDatasetUser) and uid not in active_job_ids): + for ds in (self._managers.known_datasets()[ds_name] for ds_name in user.datasets_and_managers): + user.unlink_to_dataset(ds) + finished_job_users.add(user.uuid) + return finished_job_users + +def _get_ds_users(managers: DatasetManagerCollection) -> Dict[UUID, DatasetUser]: + """ + Get dataset users from all associated managers of the service. 
+
+    Returns
+    -------
+    Dict[UUID, DatasetUser]
+        Map of all ::class:`DatasetUser` from all associated managers of the service, keyed by the UUID of each.
+    """
+    return {uuid: mgr.get_dataset_user(uuid)
+            for _, mgr in managers.managers() for uuid in mgr.get_dataset_user_ids()}
+
+# TODO: replace with RWLock implementation
+class ActiveOperationTracker:
+    """
+    Unbounded semaphore-like synchronization object for counting acquisitions and releases. Unlike a
+    semaphore, neither `acquire()` nor `release()` block. `acquire()` and `release()` increment and
+    decrement an internal value respectively. The internal count can be read via the `value`
+    property. Unlike most semaphore implementations, `value` < 0 does not throw.
+
+    Example:
+        def foo(tracker: ActiveOperationTracker):
+            tracker.acquire()
+            # do something e.g. contact data store
+            tracker.release()
+
+        def bar(tracker: ActiveOperationTracker):
+            while tracker.value > 0:
+                sleep(5)
+            # do something
+    """
+    def __init__(self, value: int = 0):
+        self._count = value
+
+    def acquire(self):
+        self._count += 1
+
+    def release(self):
+        self._count -= 1
+
+    @property
+    def value(self) -> int:
+        return self._count
diff --git a/python/services/dataservice/dmod/test/test_dataset_manager_collection.py b/python/services/dataservice/dmod/test/test_dataset_manager_collection.py
index f97279ec9..904557ddf 100644
--- a/python/services/dataservice/dmod/test/test_dataset_manager_collection.py
+++ b/python/services/dataservice/dmod/test/test_dataset_manager_collection.py
@@ -10,7 +10,7 @@
 from dmod.core.exception import DmodRuntimeError
 from dmod.dataservice.dataset_manager_collection import DatasetManagerCollection
 
-from .test_service_manager import MockDataset, MockDatasetManager
+from .test_required_data_checks_manager import MockDataset, MockDatasetManager
 
 
 class MockDatasetManagerSupportsMultipleTypes(MockDatasetManager):
diff --git a/python/services/dataservice/dmod/test/test_service_manager.py b/python/services/dataservice/dmod/test/test_required_data_checks_manager.py
similarity index 96%
rename from python/services/dataservice/dmod/test/test_service_manager.py
rename to python/services/dataservice/dmod/test/test_required_data_checks_manager.py
index cff1c1d61..9e38f1044 100644
--- a/python/services/dataservice/dmod/test/test_service_manager.py
+++ b/python/services/dataservice/dmod/test/test_required_data_checks_manager.py
@@ -4,14 +4,13 @@
 import os
 
 from ..dataservice.dataset_manager_collection import DatasetManagerCollection
-from ..dataservice.service import ServiceManager
-from ..dataservice.service_settings import ServiceSettings
+from ..dataservice.dataset_inquery_util import DatasetInqueryUtil
+from ..dataservice.service import ActiveOperationTracker, RequiredDataChecksManager
 from dmod.communication.client import get_or_create_eventloop
 from dmod.core.dataset import DataCategory, DataDomain, Dataset, DatasetManager, DatasetType
 from dmod.scheduler.job import RequestedJob
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Set, Tuple, Union
-from socket import gethostname
 
 
 class MockDataset(Dataset):
@@ -59,31 +58,7 @@ def supported_dataset_types(self) -> Set[DatasetType]:
         return {DatasetType.FILESYSTEM}
 
 
-class MockKnownDatasetsServiceManager(ServiceManager):
-    """
-    A mock extension of ::class:`ServiceManager`, with a mocked-up overrided of ::method:`get_known_datasets`.
- """ - - def __init__(self, dataset_files: List[Path], *args, **kwargs): - datasets: Dict[str, Dataset] = dict() - for d_file in dataset_files: - with d_file.open("r") as open_file: - dataset: Dataset = MockDataset.factory_init_from_deserialized_json(json.load(open_file)) - datasets[dataset.name] = dataset - dataset_manager_collection = DatasetManagerCollection() - dataset_manager_collection.add(MockDatasetManager(datasets=datasets)) - - # Should be able to get away with no job_util for what we are using this for - super().__init__( - job_util=None, - *args, - settings=ServiceSettings(), - dataset_manager_collection=dataset_manager_collection, - **kwargs - ) - - -class TestServiceManager(unittest.TestCase): +class TestRequiredDataChecksManager(unittest.TestCase): @classmethod def find_git_root_dir(cls, path: Optional[Path] = None) -> str: @@ -108,13 +83,10 @@ def find_git_root_dir(cls, path: Optional[Path] = None) -> str: ValueError : If the rev of the obtained ::class:`git.Repo` couldn't be parsed IndexError: If an invalid reflog index is specified. """ - if path is None: - path = Path('.') - git_repo = git.Repo(path, search_parent_directories=True) - return git_repo.git.rev_parse("--show-toplevel") + return git.Repo(__file__, search_parent_directories=True).working_dir def __init__(self, *args, **kwargs): - super(TestServiceManager, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) self._proj_root = None self._ssl_certs_dir = None @@ -134,10 +106,23 @@ def setUp(self) -> None: self.loop = get_or_create_eventloop() example_serial_datasets_dir = self.proj_root.joinpath('data').joinpath('serialized_dataset_examples') - dataset_files = [p for p in example_serial_datasets_dir.glob('*.json')] - self.manager = MockKnownDatasetsServiceManager(dataset_files=dataset_files, listen_host=gethostname(), - port=33015, ssl_dir=self.ssl_certs_dir) + datasets: Dict[str, Dataset] = dict() + for d_file in example_serial_datasets_dir.glob('*.json'): + with d_file.open("r") as open_file: + dataset: Dataset = MockDataset.factory_init_from_deserialized_json(json.load(open_file)) + datasets[dataset.name] = dataset + dataset_manager_collection = DatasetManagerCollection() + dataset_manager_collection.add(MockDatasetManager(datasets=datasets)) + + dataset_inquery_util = DatasetInqueryUtil(dataset_manager_collection=dataset_manager_collection) + + self.manager = RequiredDataChecksManager( + job_util=None, + dataset_manager_collection=dataset_manager_collection, + checks_underway_tracker=ActiveOperationTracker(), + dataset_inquery_util=dataset_inquery_util, + ) self.example_jobs = []