diff --git a/cwl_wes/app.py b/cwl_wes/app.py
index 0fc8fce..e2f4110 100644
--- a/cwl_wes/app.py
+++ b/cwl_wes/app.py
@@ -6,7 +6,6 @@
 from flask import current_app
 from foca import Foca
 
-from cwl_wes.tasks.register_celery import register_task_service
 from cwl_wes.ga4gh.wes.service_info import ServiceInfo
 from cwl_wes.exceptions import NotFound
 
@@ -25,7 +24,6 @@ def init_app() -> App:
     service_info.set_service_info(
         data=current_app.config.foca.custom.service_info.dict()
     )
-    register_task_service()
     return app
diff --git a/cwl_wes/config.yaml b/cwl_wes/config.yaml
index 95cf9fa..59e038d 100644
--- a/cwl_wes/config.yaml
+++ b/cwl_wes/config.yaml
@@ -118,13 +118,27 @@ custom:
     tmp_dir: '/data/tmp'
     remote_storage_url: 'ftp://ftp-private.ebi.ac.uk/upload/foivos'
   celery:
-    monitor:
-      timeout: 0.1
+    timeout: 0.1
     message_maxsize: 16777216
-  endpoint_params:
+  controller:
     default_page_size: 5
     timeout_cancel_run: 60
     timeout_run_workflow: null
+    tes_server:
+      url: 'http://62.217.122.249:31567/'
+      timeout: 5
+      status_query_params: 'FULL'
+    drs_server:
+      port: null  # use this port for resolving DRS URIs; set to `null` to use default (443)
+      base_path: null  # use this base path for resolving DRS URIs; set to `null` to use default (`ga4gh/drs/v1`)
+      use_http: False  # use `http` for resolving DRS URIs; set to `False` to use default (`https`)
+      file_types:  # extensions of files to scan for DRS URI resolution
+        - cwl
+        - yaml
+        - yml
+    runs_id:
+      length: 6
+      charset: string.ascii_uppercase + string.digits
   service_info:
     contact_info: 'https://github.com/elixir-cloud-aai/cwl-WES'
     auth_instructions_url: 'https://www.elixir-europe.org/services/compute/aai'
@@ -147,19 +161,3 @@ custom:
         default_value: '5'
     tags:
       known_tes_endpoints: 'https://tes.tsi.ebi.ac.uk/|https://tes-dev.tsi.ebi.ac.uk/|https://csc-tesk.c03.k8s-popup.csc.fi/|https://tesk.c01.k8s-popup.csc.fi/'
-      app_version: 0.15.0
-  tes_server:
-    url: 'https://csc-tesk.c03.k8s-popup.csc.fi/'
-    timeout: 5
-    status_query_params: 'FULL'
-  drs_server:
-    port: null  # use this port for resolving DRS URIs; set to `null` to use default (443)
-    base_path: null  # use this base path for resolving DRS URIs; set to `null` to use default (`ga4gh/drs/v1`)
-    use_http: False  # use `http` for resolving DRS URIs; set to `False` to use default (`https`)
-    file_types:  # extensions of files to scan for DRS URI resolution
-      - cwl
-      - yaml
-      - yml
-  runs_id:
-    length: 6
-    charset: string.ascii_uppercase + string.digits
diff --git a/cwl_wes/custom_config.py b/cwl_wes/custom_config.py
index 1358add..d66be5c 100644
--- a/cwl_wes/custom_config.py
+++ b/cwl_wes/custom_config.py
@@ -3,25 +3,61 @@
 from typing import Dict, List, Optional
 from foca.models.config import FOCABaseConfig
 
+
 class StorageConfig(FOCABaseConfig):
+    """Model for task run and storage configuration.
+
+    Args:
+        tmp_dir: Temporary run directory path.
+        permanent_dir: Permanent working directory path.
+        remote_storage_url: Remote file storage FTP endpoint.
+
+    Attributes:
+        tmp_dir: Temporary run directory path.
+        permanent_dir: Permanent working directory path.
+        remote_storage_url: Remote file storage FTP endpoint.
+
+    Raises:
+        pydantic.ValidationError: The class was instantiated with an illegal
+            data type.
+
+    Example:
+        >>> StorageConfig(
+        ...     tmp_dir='/data/tmp',
+        ...     permanent_dir='/data/output',
+        ...     remote_storage_url='ftp://ftp.private/upload'
+        ... )
+        StorageConfig(tmp_dir='/data/tmp', permanent_dir='/data/output', remote_storage_url='ftp://ftp.private/upload')
+    """
     permanent_dir: str = '/data/output'
     tmp_dir: str = '/data/tmp'
     remote_storage_url: str = 'ftp://ftp-private.ebi.ac.uk/upload/foivos'
 
 
-class MonitorConfig(FOCABaseConfig):
-    timeout: float = 0.1
+class CeleryConfig(FOCABaseConfig):
+    """Model for Celery configuration.
+
+    Args:
+        timeout: Celery task timeout.
+        message_maxsize: Maximum Celery message size.
 
-class CeleryConfig(FOCABaseConfig):
-    monitor: MonitorConfig = MonitorConfig()
-    message_maxsize: int = 16777216
+    Attributes:
+        timeout: Celery task timeout.
+        message_maxsize: Maximum Celery message size.
 
+    Raises:
+        pydantic.ValidationError: The class was instantiated with an illegal
+            data type.
 
-class EndpointConfig(FOCABaseConfig):
-    default_page_size: int = 5
-    timeout_cancel_run: int = 60
-    timeout_run_workflow: Optional[int] = None
+    Example:
+        >>> CeleryConfig(
+        ...     timeout=15,
+        ...     message_maxsize=1024
+        ... )
+        CeleryConfig(timeout=15, message_maxsize=1024)
+    """
+    timeout: float = 0.1
+    message_maxsize: int = 16777216
 
 
 class WorkflowTypeVersionConfig(FOCABaseConfig):
@@ -29,20 +65,48 @@ class WorkflowTypeVersionConfig(FOCABaseConfig):
     Args:
         workflow_type_version: List of one or more acceptable versions for
             the workflow type.
+
+    Attributes:
+        workflow_type_version: List of one or more acceptable versions for
+            the workflow type.
+
+    Raises:
+        pydantic.ValidationError: The class was instantiated with an illegal
+            data type.
+
+    Example:
+        >>> WorkflowTypeVersionConfig(
+        ...     workflow_type_version=['v1.0']
+        ... )
+        WorkflowTypeVersionConfig(workflow_type_version=['v1.0'])
     """
     workflow_type_version: Optional[List[str]] = []
 
 
 class DefaultWorkflowEngineParameterConfig(FOCABaseConfig):
     """Model for default workflow engine parameters.
+
     Args:
         name: Parameter name.
         type: Parameter type.
         default_value: Stringified version of default parameter.
+
     Attributes:
         name: Parameter name.
         type: Parameter type.
         default_value: Stringified version of default parameter.
+
+    Raises:
+        pydantic.ValidationError: The class was instantiated with an illegal
+            data type.
+
+    Example:
+        >>> DefaultWorkflowEngineParameterConfig(
+        ...     name='name',
+        ...     type='str',
+        ...     default_value='default'
+        ... )
+        DefaultWorkflowEngineParameterConfig(name='name', type='str', default_value='default')
     """
     name: Optional[str]
     type: Optional[str]
@@ -50,11 +114,87 @@ class DefaultWorkflowEngineParameterConfig(FOCABaseConfig):
 
 
 class TagsConfig(FOCABaseConfig):
+    """Model for service info tag configuration.
+
+    Args:
+        known_tes_endpoints: Valid TES endpoints.
+
+    Attributes:
+        known_tes_endpoints: Valid TES endpoints.
+
+    Raises:
+        pydantic.ValidationError: The class was instantiated with an illegal
+            data type.
+
+    Example:
+        >>> TagsConfig(
+        ...     known_tes_endpoints='https://tes.endpoint',
+        ... )
+        TagsConfig(known_tes_endpoints='https://tes.endpoint')
+    """
     known_tes_endpoints: str = 'https://tes.tsi.ebi.ac.uk/|https://tes-dev.tsi.ebi.ac.uk/|https://csc-tesk.c03.k8s-popup.csc.fi/|https://tesk.c01.k8s-popup.csc.fi/'
-    app_version: str = '0.15.0'
 
 
 class ServiceInfoConfig(FOCABaseConfig):
+    """Model for service info configuration.
+
+    Args:
+        contact_info: Email address/webpage URL with contact information.
+        auth_instructions_url: Web page URL with information about how to get
+            an authorization token necessary to use a specific endpoint.
+        supported_filesystem_protocols: Filesystem protocols supported by
+            this service.
+        supported_wes_versions: Version(s) of the WES schema supported by
+            this service.
+        workflow_type_versions: Map of workflow format type names to
+            `WorkflowTypeVersionConfig` objects, each containing an array of
+            one or more version strings.
+        workflow_engine_versions: Workflow engine(s) used by this WES
+            service.
+        default_workflow_engine_parameters: Additional parameters that each
+            workflow engine accepts.
+        tags: Key-value map of arbitrary, extended metadata outside the
+            scope of the above but useful to report back.
+
+    Attributes:
+        contact_info: Email address/webpage URL with contact information.
+        auth_instructions_url: Web page URL with information about how to get
+            an authorization token necessary to use a specific endpoint.
+        supported_filesystem_protocols: Filesystem protocols supported by
+            this service.
+        supported_wes_versions: Version(s) of the WES schema supported by
+            this service.
+        workflow_type_versions: Map of workflow format type names to
+            `WorkflowTypeVersionConfig` objects, each containing an array of
+            one or more version strings.
+        workflow_engine_versions: Workflow engine(s) used by this WES
+            service.
+        default_workflow_engine_parameters: Additional parameters that each
+            workflow engine accepts.
+        tags: Key-value map of arbitrary, extended metadata outside the
+            scope of the above but useful to report back.
+
+    Raises:
+        pydantic.ValidationError: The class was instantiated with an illegal
+            data type.
+
+    Example:
+        >>> ServiceInfoConfig(
+        ...     contact_info='https://contact.url',
+        ...     auth_instructions_url='https://auth.url',
+        ...     supported_filesystem_protocols=['ftp', 'https', 'local'],
+        ...     supported_wes_versions=['1.0.0'],
+        ...     workflow_type_versions={'CWL': WorkflowTypeVersionConfig(workflow_type_version=['v1.0'])},
+        ...     workflow_engine_versions={},
+        ...     default_workflow_engine_parameters=[],
+        ...     tags=TagsConfig(known_tes_endpoints='https://tes.endpoint/')
+        ... )
+        ServiceInfoConfig(contact_info='https://contact.url', auth_instructions_url='https\
+        ://auth.url', supported_filesystem_protocols=['ftp', 'https', 'local'], supported_\
+        wes_versions=['1.0.0'], workflow_type_versions={'CWL': WorkflowTypeVersionConfig(w\
+        orkflow_type_version=['v1.0'])}, workflow_engine_versions={}, default_workflow_eng\
+        ine_parameters=[], tags=TagsConfig(known_tes_endpoints='https://tes.endpoint/'))
+    """
     contact_info: str = 'https://github.com/elixir-cloud-aai/cwl-WES'
     auth_instructions_url: str = 'https://www.elixir-europe.org/services/compute/aai'
     supported_filesystem_protocols: List[str] = ['ftp', 'https', 'local']
@@ -70,12 +210,69 @@ class ServiceInfoConfig(FOCABaseConfig):
 
 
 class TesServerConfig(FOCABaseConfig):
+    """Model for TES server configuration.
+
+    Args:
+        url: TES endpoint URL.
+        timeout: Request timeout.
+        status_query_params: Request query parameters.
+
+    Attributes:
+        url: TES endpoint URL.
+        timeout: Request timeout.
+        status_query_params: Request query parameters.
+
+    Raises:
+        pydantic.ValidationError: The class was instantiated with an illegal
+            data type.
+
+    Example:
+        >>> TesServerConfig(
+        ...     url='https://tes.endpoint',
+        ...     timeout=5,
+        ...     status_query_params='FULL'
+        ... )
+        TesServerConfig(url='https://tes.endpoint', timeout=5, status_query_params='FULL')
+    """
     url: str = 'https://csc-tesk.c03.k8s-popup.csc.fi/'
     timeout: int = 5
     status_query_params: str = 'FULL'
 
 
 class DRSServerConfig(FOCABaseConfig):
+    """Model for DRS server configuration.
+
+    Args:
+        port: Port for resolving DRS URIs; set to `None` to use the default
+            (443).
+        base_path: Base path for resolving DRS URIs; set to `None` to use
+            the default (`ga4gh/drs/v1`).
+        use_http: Use `http` for resolving DRS URIs; set to `False` to use
+            the default (`https`).
+        file_types: Extensions of files to scan for DRS URI resolution.
+
+    Attributes:
+        port: Port for resolving DRS URIs; set to `None` to use the default
+            (443).
+        base_path: Base path for resolving DRS URIs; set to `None` to use
+            the default (`ga4gh/drs/v1`).
+        use_http: Use `http` for resolving DRS URIs; set to `False` to use
+            the default (`https`).
+        file_types: Extensions of files to scan for DRS URI resolution.
+
+    Raises:
+        pydantic.ValidationError: The class was instantiated with an illegal
+            data type.
+
+    Example:
+        >>> DRSServerConfig(
+        ...     port=443,
+        ...     base_path='ga4gh/drs/v1',
+        ...     use_http=False,
+        ...     file_types=['cwl', 'yaml', 'yml']
+        ... )
+        DRSServerConfig(port=443, base_path='ga4gh/drs/v1', use_http=False, file_types=['cwl', 'yaml', 'yml'])
+    """
     port: Optional[int] = None
     base_path: Optional[str] = None
     use_http: bool = False
@@ -110,12 +307,66 @@ class IdConfig(FOCABaseConfig):
     charset: str = string.ascii_uppercase + string.digits
 
 
+class ControllerConfig(FOCABaseConfig):
+    """Model for controller configuration.
+
+    Args:
+        default_page_size: Pagination page size.
+        timeout_cancel_run: Timeout for the `cancel_run` background task.
+        timeout_run_workflow: Timeout for the `run_workflow` background task.
+        tes_server: TES server config parameters.
+        drs_server: DRS server config parameters.
+        runs_id: Run identifier config parameters.
+
+    Attributes:
+        default_page_size: Pagination page size.
+        timeout_cancel_run: Timeout for the `cancel_run` background task.
+        timeout_run_workflow: Timeout for the `run_workflow` background task.
+        tes_server: TES server config parameters.
+        drs_server: DRS server config parameters.
+        runs_id: Run identifier config parameters.
+
+    Raises:
+        pydantic.ValidationError: The class was instantiated with an illegal
+            data type.
+
+    Example:
+        >>> ControllerConfig(
+        ...     default_page_size=5,
+        ...     timeout_cancel_run=60,
+        ...     timeout_run_workflow=None
+        ... )
+        ControllerConfig(default_page_size=5, timeout_cancel_run=60, timeout_run_workflow=\
+        None, tes_server=TesServerConfig(url='https://csc-tesk.c03.k8s-popup.csc.fi/', tim\
+        eout=5, status_query_params='FULL'), drs_server=DRSServerConfig(port=None, base_pa\
+        th=None, use_http=False, file_types=['cwl', 'yaml', 'yml']), runs_id=IdConfig(leng\
+        th=6, charset='ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'))
+    """
+    default_page_size: int = 5
+    timeout_cancel_run: int = 60
+    timeout_run_workflow: Optional[int] = None
+    tes_server: TesServerConfig = TesServerConfig()
+    drs_server: DRSServerConfig = DRSServerConfig()
+    runs_id: IdConfig = IdConfig()
+
+
 class CustomConfig(FOCABaseConfig):
+    """Model for custom configuration parameters.
+
+    Args:
+        storage: Storage config parameters.
+        celery: Celery config parameters.
+        controller: Controller config parameters.
+        service_info: Service info config parameters.
+
+    Attributes:
+        storage: Storage config parameters.
+        celery: Celery config parameters.
+        controller: Controller config parameters.
+        service_info: Service info config parameters.
+
+    Raises:
+        pydantic.ValidationError: The class was instantiated with an illegal
+            data type.
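Note for reviewers: a minimal sketch (not part of the patch) of how the restructured models nest, assuming FOCA validates the `custom` block of `config.yaml` against `CustomConfig`:

    from cwl_wes.custom_config import ControllerConfig, CustomConfig

    # Settings formerly under `custom.endpoint_params`, `custom.tes_server`,
    # `custom.drs_server` and `custom.runs_id` now all nest under
    # `custom.controller`:
    config = CustomConfig(
        controller=ControllerConfig(
            default_page_size=5,
            timeout_cancel_run=60,
        )
    )
    print(config.controller.tes_server.url)  # unset fields fall back to model defaults
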
+ """ storage: StorageConfig = StorageConfig() celery: CeleryConfig = CeleryConfig() - endpoint_params: EndpointConfig = EndpointConfig() + controller: ControllerConfig = ControllerConfig() service_info: ServiceInfoConfig = ServiceInfoConfig() - tes_server: TesServerConfig = TesServerConfig() - drs_server: DRSServerConfig = DRSServerConfig() - runs_id: IdConfig = IdConfig() - \ No newline at end of file + diff --git a/cwl_wes/ga4gh/wes/endpoints/cancel_run.py b/cwl_wes/ga4gh/wes/endpoints/cancel_run.py index 0050c66..6c5e7fa 100644 --- a/cwl_wes/ga4gh/wes/endpoints/cancel_run.py +++ b/cwl_wes/ga4gh/wes/endpoints/cancel_run.py @@ -62,7 +62,7 @@ def cancel_run( if document['api']['state'] in States.CANCELABLE: # Get timeout duration - timeout_duration = foca_config.custom.endpoint_params.timeout_cancel_run + timeout_duration = foca_config.custom.controller.timeout_cancel_run # Execute cancelation task in background task_id = uuid() diff --git a/cwl_wes/ga4gh/wes/endpoints/list_runs.py b/cwl_wes/ga4gh/wes/endpoints/list_runs.py index bd7b2f4..485443a 100644 --- a/cwl_wes/ga4gh/wes/endpoints/list_runs.py +++ b/cwl_wes/ga4gh/wes/endpoints/list_runs.py @@ -24,7 +24,7 @@ def list_runs( if 'page_size' in kwargs: page_size = kwargs['page_size'] else: - page_size = config.foca.custom.endpoint_params.default_page_size + page_size = config.foca.custom.controller.default_page_size # Extract/set page token if 'page_token' in kwargs: diff --git a/cwl_wes/ga4gh/wes/endpoints/run_workflow.py b/cwl_wes/ga4gh/wes/endpoints/run_workflow.py index b9a8479..d7f2842 100644 --- a/cwl_wes/ga4gh/wes/endpoints/run_workflow.py +++ b/cwl_wes/ga4gh/wes/endpoints/run_workflow.py @@ -188,8 +188,8 @@ def __create_run_environment( collection_runs: Collection = config.foca.db.dbs['cwl-wes-db'].collections['runs'].client out_dir = config.foca.custom.storage.permanent_dir tmp_dir = config.foca.custom.storage.tmp_dir - run_id_charset = eval(config.foca.custom.runs_id.charset) - run_id_length = config.foca.custom.runs_id.length + run_id_charset = eval(config.foca.custom.controller.runs_id.charset) + run_id_length = config.foca.custom.controller.runs_id.length # Keep on trying until a unique run id was found and inserted # TODO: If no more possible IDs => inf loop; fix (raise custom error; 500 @@ -256,7 +256,7 @@ def __create_run_environment( break # translate DRS URIs to access URLs - drs_server_conf = current_app.config.foca.custom.drs_server + drs_server_conf = current_app.config.foca.custom.controller.drs_server service_info_conf = current_app.config.foca.custom.service_info file_types: List[str] = drs_server_conf.file_types supported_access_methods: List[str] = service_info_conf.supported_filesystem_protocols @@ -502,7 +502,7 @@ def __run_workflow( **kwargs ) -> None: """Helper function `run_workflow()`.""" - tes_url = config.foca.custom.tes_server.url + tes_url = config.foca.custom.controller.tes_server.url remote_storage_url = config.foca.custom.storage.remote_storage_url run_id = document['run_id'] task_id = document['task_id'] @@ -546,7 +546,7 @@ def __run_workflow( # ] # Get timeout duration - timeout_duration = config.foca.custom.endpoint_params.timeout_run_workflow + timeout_duration = config.foca.custom.controller.timeout_run_workflow # Execute command as background task logger.info( diff --git a/cwl_wes/tasks/celery_task_monitor.py b/cwl_wes/tasks/celery_task_monitor.py deleted file mode 100644 index a47660a..0000000 --- a/cwl_wes/tasks/celery_task_monitor.py +++ /dev/null @@ -1,592 +0,0 @@ -"""Celery task monitor, 
diff --git a/cwl_wes/tasks/celery_task_monitor.py b/cwl_wes/tasks/celery_task_monitor.py
deleted file mode 100644
index a47660a..0000000
--- a/cwl_wes/tasks/celery_task_monitor.py
+++ /dev/null
@@ -1,592 +0,0 @@
-"""Celery task monitor, event handlers and related utility functions."""
-
-from ast import literal_eval
-from datetime import datetime
-import logging
-import os
-import re
-from shlex import quote
-from threading import Thread
-from time import sleep
-from typing import (Dict, List, Optional)
-
-from celery import Celery
-from celery.events import Event
-from celery.events.receiver import EventReceiver
-from kombu.connection import Connection  # noqa: F401
-from pymongo import collection as Collection
-import tes
-
-import cwl_wes.utils.db_utils as db_utils
-
-
-# Get logger instance
-logger = logging.getLogger(__name__)
-
-
-# Set string time format
-strf: str = '%Y-%m-%d %H:%M:%S.%f'
-
-
-class TaskMonitor():
-    """Celery task monitor."""
-
-    def __init__(
-        self,
-        celery_app: Celery,
-        collection: Collection,
-        tes_config: Dict[str, str],
-        timeout: float = 0,
-        authorization: bool = True,
-    ) -> None:
-        """Starts Celery task monitor daemon process."""
-        self.celery_app = celery_app
-        self.collection = collection
-        self.timeout = timeout
-        self.authorization = authorization
-        self.tes_config = tes_config
-
-        self.thread = Thread(target=self.run, args=())
-        self.thread.daemon = True
-        self.thread.start()
-
-        logger.debug('Celery task monitor daemon process started...')
-
-    def run(self) -> None:
-        """Daemon process for Celery task monitor."""
-        while True:
-
-            try:
-
-                with self.celery_app.connection() as \
-                        connection:  # type: Connection
-
-                    listener: EventReceiver = self.celery_app.events.Receiver(
-                        connection,
-                        handlers={
-                            'task-received':
-                                self.on_task_received,
-                            'task-started':
-                                self.on_task_started,
-                            'task-failed':
-                                self.on_task_failed,
-                            'task-succeeded':
-                                self.on_task_succeeded,
-                            'task-tes-task-update':
-                                self.on_task_tes_task_update,
-                        }
-                    )
-                    listener.capture(limit=None, timeout=None, wakeup=True)
-
-            except KeyboardInterrupt as e:
-                logger.exception(
-                    (
-                        'Task monitor interrupted. Execution aborted. '
-                        'Original error message: {type}: {msg}'
-                    ).format(
-                        type=type(e).__name__,
-                        msg=e,
-                    )
-                )
-                raise SystemExit
-
-            except Exception as e:
-                logger.exception(
-                    (
-                        'Unknown error in task monitor occurred. Original '
-                        'error message: {type}: {msg}'
-                    ).format(
-                        type=type(e).__name__,
-                        msg=e,
-                    )
-                )
-                pass
-
-            # Sleep for specified interval
-            sleep(self.timeout)
-
-    def on_task_received(
-        self,
-        event: Event,
-    ) -> None:
-        """Event handler for received Celery tasks."""
-        if not event['name'] == 'tasks.run_workflow':
-            return None
-        # Parse subprocess inputs
-        try:
-            kwargs = literal_eval(event['kwargs'])
-        except Exception as e:
-            logger.exception(
-                (
-                    "Field 'kwargs' in event message malformed. Original "
-                    'error message: {type}: {msg}'
-                ).format(
-                    type=type(e).__name__,
-                    msg=e,
-                )
-            )
-            pass
-
-        # Build command
-        if 'command_list' in kwargs:
-            if self.authorization:
-                kwargs['command_list'][3] = ''
-                kwargs['command_list'][5] = ''
-            command = ' '.join(
-                [quote(item) for item in kwargs['command_list']]
-            )
-        else:
-            command = 'N/A'
-
-        # Create dictionary for internal parameters
-        internal = dict()
-        internal['task_received'] = datetime.utcfromtimestamp(
-            event['timestamp']
-        )
-        internal['process_id_worker'] = event['pid']
-        internal['host'] = event['hostname']
-
-        # Update run document in database
-        try:
-            self.update_run_document(
-                event=event,
-                state='QUEUED',
-                internal=internal,
-                task_received=datetime.utcfromtimestamp(
-                    event['timestamp']
-                ).strftime(strf),
-                command=command,
-                utc_offset=event['utcoffset'],
-                max_retries=event['retries'],
-                expires=event['expires'],
-            )
-        except Exception as e:
-            logger.exception(
-                (
-                    'Database error. Could not update log information for '
-                    "task '{task}'. Original error message: {type}: {msg}"
-                ).format(
-                    task=event['uuid'],
-                    type=type(e).__name__,
-                    msg=e,
-                )
-            )
-
-    def on_task_started(
-        self,
-        event: Event,
-    ) -> None:
-        """Event handler for started Celery tasks."""
-        if not self.collection.find_one({'task_id': event['uuid']}):
-            return None
-        internal = dict()
-        internal['task_started'] = datetime.utcfromtimestamp(
-            event['timestamp']
-        )
-        # Update run document in database
-        try:
-            self.update_run_document(
-                event=event,
-                state='RUNNING',
-                internal=internal,
-                task_started=datetime.utcfromtimestamp(
-                    event['timestamp']
-                ).strftime(strf),
-            )
-        except Exception as e:
-            logger.exception(
-                (
-                    'Database error. Could not update log information for '
-                    "task '{task}'. Original error message: {type}: {msg}"
-                ).format(
-                    task=event['uuid'],
-                    type=type(e).__name__,
-                    msg=e,
-                )
-            )
-
-    def on_task_failed(
-        self,
-        event: Event,
-    ) -> None:
-        """Event handler for failed (system error) Celery tasks."""
-        if not self.collection.find_one({'task_id': event['uuid']}):
-            return None
-        # Create dictionary for internal parameters
-        internal = dict()
-        internal['task_finished'] = datetime.utcfromtimestamp(
-            event['timestamp']
-        )
-        internal['traceback'] = event['traceback']
-
-        # Update run document in databse
-        self.update_run_document(
-            event=event,
-            state='SYSTEM_ERROR',
-            internal=internal,
-            task_finished=datetime.utcfromtimestamp(
-                event['timestamp']
-            ).strftime(strf),
-            exception=event['exception'],
-        )
-
-    def on_task_succeeded(
-        self,
-        event: Event,
-    ) -> None:
-        """Event handler for successful, failed and canceled Celery
-        tasks."""
-        if not self.collection.find_one({'task_id': event['uuid']}):
-            return None
-        # Parse subprocess results
-        try:
-            (returncode, log, tes_ids, token) = literal_eval(event['result'])
-            log_list=log
-            log = os.linesep.join(log)
-        except Exception as e:
-            logger.exception(
-                (
-                    "Field 'result' in event message malformed. Original "
-                    'error message: {type}: {msg}'
-                ).format(
-                    type=type(e).__name__,
-                    msg=e,
-                )
-            )
-            pass
-
-        # Create dictionary for internal parameters
-        internal = dict()
-        internal['task_finished'] = datetime.utcfromtimestamp(
-            event['timestamp']
-        )
-
-        # Set final state to be set
-        document = self.collection.find_one(
-            filter={'task_id': event['uuid']},
-            projection={
-                'api.state': True,
-                '_id': False,
-            }
-        )
-        if document and document['api']['state'] == 'CANCELING':
-            state = 'CANCELED'
-        elif returncode:
-            state = 'EXECUTOR_ERROR'
-        else:
-            state = 'COMPLETE'
-
-        # Extract run outputs
-        #outputs = self.__cwl_tes_outputs_parser(log)
-        outputs = self.__cwl_tes_outputs_parser_list(log_list)
-
-        # Get task logs
-        task_logs = self.__get_tes_task_logs(
-            tes_ids=tes_ids,
-            token=token,
-        )
-
-        # Update run document in database
-        try:
-            self.update_run_document(
-                event=event,
-                state=state,
-                internal=internal,
-                outputs=outputs,
-                task_logs=task_logs,
-                task_finished=datetime.utcfromtimestamp(
-                    event['timestamp']
-                ).strftime(strf),
-                return_code=returncode,
-                stdout=log,
-                stderr='',
-            )
-        except Exception as e:
-            logger.exception(
-                (
-                    'Database error. Could not update log information for '
-                    "task '{task}'. Original error message: {type}: {msg}"
-                ).format(
-                    task=event['uuid'],
-                    type=type(e).__name__,
-                    msg=e,
-                )
-            )
-            pass
-
-    def on_task_tes_task_update(
-        self,
-        event: Event,
-    ) -> None:
-        """Event handler for TES task state changes."""
-        # If TES task is new, add task log to database
-        if not event['tes_state']:
-            tes_log = self.__get_tes_task_log(
-                tes_id=event['tes_id'],
-                token=event['token'],
-            )
-            try:
-                db_utils.append_to_tes_task_logs(
-                    collection=self.collection,
-                    task_id=event['uuid'],
-                    tes_log=tes_log,
-                )
-            except Exception as e:
-                logger.exception(
-                    (
-                        'Database error. Could not update log information for '
-                        "task '{task}'. Original error message: {type}: {msg}"
-                    ).format(
-                        task=event['uuid'],
-                        type=type(e).__name__,
-                        msg=e,
-                    )
-                )
-                pass
-
-        # Otherwise only update state
-        else:
-            try:
-                db_utils.update_tes_task_state(
-                    collection=self.collection,
-                    task_id=event['uuid'],
-                    tes_id=event['tes_id'],
-                    state=event['tes_state'],
-                )
-                logger.info(
-                    (
-                        "State of TES task '{tes_id}' of run with task ID "
-                        "'{task_id}' changed to '{state}'."
-                    ).format(
-                        task_id=event['uuid'],
-                        tes_id=event['tes_id'],
-                        state=event['tes_state'],
-                    )
-                )
-            except Exception as e:
-                logger.exception(
-                    (
-                        'Database error. Could not update log information for '
-                        "task '{task}'. Original error message: {type}: {msg}"
-                    ).format(
-                        task=event['uuid'],
-                        type=type(e).__name__,
-                        msg=e,
-                    )
-                )
-                pass
-
-    def update_run_document(
-        self,
-        event: Event,
-        state: Optional[str] = None,
-        internal: Optional[Dict] = None,
-        outputs: Optional[Dict] = None,
-        task_logs: Optional[List[Dict]] = None,
-        **run_log_params
-    ):
-        """Updates state, internal and run log parameters in database
-        document.
-        """
-        # TODO: Minimize db ops; try to compile entire object & update once
-        # Update internal parameters
-        if internal:
-            document = db_utils.upsert_fields_in_root_object(
-                collection=self.collection,
-                task_id=event['uuid'],
-                root='internal',
-                **internal,
-            )
-
-        # Update outputs
-        if outputs:
-            document = db_utils.upsert_fields_in_root_object(
-                collection=self.collection,
-                task_id=event['uuid'],
-                root='api.outputs',
-                **outputs,
-            )
-
-        # Update task logs
-        if task_logs:
-            document = db_utils.upsert_fields_in_root_object(
-                collection=self.collection,
-                task_id=event['uuid'],
-                root='api',
-                task_logs=task_logs,
-            )
-
-        # Update run log parameters
-        if run_log_params:
-            document = db_utils.upsert_fields_in_root_object(
-                collection=self.collection,
-                task_id=event['uuid'],
-                root='api.run_log',
-                **run_log_params,
-            )
-
-        # Calculate queue, execution and run time
-        if document and document['internal']:
-            run_log = document['internal']
-            durations = dict()
-
-            if 'task_started' in run_log_params:
-                if 'task_started' in run_log and 'task_received' in run_log:
-                    pass
-                durations['time_queue'] = (
-                    run_log['task_started'] - run_log['task_received']
-                ).total_seconds()
-
-            if 'task_finished' in run_log_params:
-                if 'task_finished' in run_log and 'task_started' in run_log:
-                    pass
-                durations['time_execution'] = (
-                    run_log['task_finished'] - run_log['task_started']
-                ).total_seconds()
-                if 'task_finished' in run_log and 'task_received' in run_log:
-                    pass
-                durations['time_total'] = (
-                    run_log['task_finished'] - run_log['task_received']
-                ).total_seconds()
-
-            if durations:
-                document = db_utils.upsert_fields_in_root_object(
-                    collection=self.collection,
-                    task_id=event['uuid'],
-                    root='api.run_log',
-                    **durations,
-                )
-
-        # Update state
-        if state:
-            try:
-                document = db_utils.update_run_state(
-                    collection=self.collection,
-                    task_id=event['uuid'],
-                    state=state,
-                )
-            except Exception:
-                raise
-
-        # Log info message
-        if document:
-            logger.info(
-                (
-                    "State of run '{run_id}' (task id: '{task_id}') changed "
-                    "to '{state}'."
-                ).format(
-                    run_id=document['run_id'],
-                    task_id=event['uuid'],
-                    state=state,
-                )
-            )
-
-        return document
-
-    @staticmethod
-    def __cwl_tes_outputs_parser(log: str) -> Dict:
-        """Parses outputs from cwl-tes log."""
-        # Find outputs object in log string
-        re_outputs = re.compile(
-            r'(^\{$\n^ {4}"\S+": [\[\{]$\n(^ {4,}.*$\n)*^ {4}[\]\}]$\n^\}$\n)',
-            re.MULTILINE
-        )
-        m = re_outputs.search(log)
-        if m:
-            return literal_eval(m.group(1))
-        else:
-            return dict()
-
-    @staticmethod
-    def __cwl_tes_outputs_parser_list(log: List) -> Dict:
-        """This function parses outputs from the cwl-tes log"""
-        """The outputs JSON starts at the line before last in the logs"""
-        """So unless the outputs are empty ({}), parse upward,"""
-        """until you find the beginning of the JSON containing the outputs"""
-
-        indices=range(len(log)-1,-1,-1)
-
-        start=-1
-        end=-1
-        for index in indices:
-            if log[index].rstrip()=='{}':
-                return dict()
-            elif log[index].rstrip()=='}':
-                end=index
-                break
-
-        # No valid JSON was found and the previous loop
-        # reached the end of the log
-        if end==0:
-            return dict()
-
-        indices=range(end-1,-1,-1)
-        for index in indices:
-            if log[index].rstrip()=='{':
-                start=index
-                break
-
-        json=os.linesep.join(log[start:end+1])
-
-        try:
-            return literal_eval(json)
-        except ValueError as verr:
-            logger.exception(
-                "ValueError when evaluation JSON: '%s'. Original error message: %s" % \
-                (json, verr)
-            )
-            return dict()
-        except SyntaxError as serr:
-            logger.exception(
-                "SyntaxError when evaluation JSON: '%s'. Original error message: %s" % \
-                (json, serr)
-            )
-            return dict()
-
-    def __get_tes_task_logs(
-        self,
-        tes_ids: List = list(),
-        token: Optional[str] = None,
-    ) -> List[Dict]:
-        """Gets multiple task logs from TES instance."""
-        task_logs = list()
-        for tes_id in tes_ids:
-            task_logs.append(
-                self.__get_tes_task_log(
-                    tes_id=tes_id,
-                    token=token,
-                )
-            )
-        return task_logs
-
-    def __get_tes_task_log(
-        self,
-        tes_id: str,
-        token: Optional[str] = None,
-    ) -> Dict:
-        """Gets task log from TES instance."""
-        tes_client = tes.HTTPClient(
-            url=self.tes_config['url'],
-            timeout=self.tes_config['timeout'],
-            token=token,
-        )
-
-        task_log = {}
-
-        try:
-            task_log = tes_client.get_task(
-                task_id=tes_id,
-                view=self.tes_config['query_params'],
-            ).as_dict()
-        except Exception as e:
-            # TODO: handle more robustly: only 400/Bad Request is okay;
-            # TODO: other errors (e.g. 500) should be dealt with
-            logger.warning(
-                "Could not obtain task log. Setting default. Original error "
-                f"message: {type(e).__name__}: {e}"
-            )
-            task_log = {}
-
-        logger.debug(f'Task log: {task_log}')
-
-        return task_log
diff --git a/cwl_wes/tasks/register_celery.py b/cwl_wes/tasks/register_celery.py
deleted file mode 100644
index e1856e1..0000000
--- a/cwl_wes/tasks/register_celery.py
+++ /dev/null
@@ -1,35 +0,0 @@
-"""Function to create Celery app instance and register task monitor."""
-
-from cwl_wes.worker import celery_app
-import logging
-import os
-
-from foca.models.config import Config
-from cwl_wes.tasks.celery_task_monitor import TaskMonitor
-
-
-# Get logger instance
-logger = logging.getLogger(__name__)
-
-
-def register_task_service() -> None:
-    """Instantiates Celery app and registers task monitor."""
-    # Ensure that code is executed only once when app reloader is used
-    if os.environ.get("WERKZEUG_RUN_MAIN") != 'true':
-
-        # Start task monitor daemon
-        foca_config: Config = celery_app.conf.foca
-        custom_config = foca_config.custom
-        TaskMonitor(
-            celery_app=celery_app,
-            collection=foca_config.db.dbs['cwl-wes-db'].collections['runs'].client,
-            tes_config={
-                'url': custom_config.tes_server.url,
-                'query_params': custom_config.tes_server.status_query_params,
-                'timeout': custom_config.tes_server.timeout
-            },
-            timeout=custom_config.celery.monitor.timeout,
-            authorization=foca_config.security.auth.required,
-        )
-        logger.info('Celery task monitor registered.')
-
-    return None
diff --git a/cwl_wes/tasks/tasks/cancel_run.py b/cwl_wes/tasks/tasks/cancel_run.py
index 7246865..3e4313f 100644
--- a/cwl_wes/tasks/tasks/cancel_run.py
+++ b/cwl_wes/tasks/tasks/cancel_run.py
@@ -52,11 +52,12 @@ def task__cancel_run(
     try:
         # Cancel individual TES tasks
+        tes_server_config = foca_config.custom.controller.tes_server
         __cancel_tes_tasks(
             collection=collection,
             run_id=run_id,
-            url=foca_config.custom.tes_server.tes_server.url,
-            timeout=foca_config.custom.tes_server.tes_server.timeout,
+            url=tes_server_config.url,
+            timeout=tes_server_config.timeout,
             token=token,
         )
     except SoftTimeLimitExceeded as e:
diff --git a/cwl_wes/tasks/tasks/cwl_log_processor.py b/cwl_wes/tasks/tasks/cwl_log_processor.py
new file mode 100644
index 0000000..c26c60f
--- /dev/null
+++ b/cwl_wes/tasks/tasks/cwl_log_processor.py
@@ -0,0 +1,297 @@
+import re
+import os
+import logging
+from io import TextIOWrapper
+from typing import (Dict, List, Optional, Tuple)
+from ast import literal_eval
+
+import tes
+from cwl_wes.worker import celery_app
+import cwl_wes.utils.db_utils as db_utils
+
+# Get logger instance
+logger = logging.getLogger(__name__)
+
+
+class CWLLogProcessor:
+    """Processes cwl-tes logs emitted during a workflow run."""
+
+    def __init__(self, tes_config, collection) -> None:
+        self.tes_config = tes_config
+        self.collection = collection
+
+    def process_cwl_logs(
+        self,
+        task: celery_app.Task,
+        stream: TextIOWrapper,
+        token: Optional[str] = None,
+    ) -> Tuple[List, List]:
+        """Parses combined cwl-tes STDOUT/STDERR and records TES task IDs
+        and state updates."""
+        stream_container: List = list()
+        tes_states: Dict = dict()
+
+        # Iterate over STDOUT/STDERR stream
+        for line in iter(stream.readline, ''):
+
+            line = line.rstrip()
+
+            # Replace single quote characters to avoid `literal_eval()` errors
+            line = line.replace("'", '"')
+
+            # Handle special cases
+            lines = self.process_tes_log(line)
+            for line in lines:
+                stream_container.append(line)
+                logger.info(f"[{task}] {line}")
+                continue
+
+            # Detect TES task state changes
+            (tes_id, tes_state) = self.extract_tes_state(line)
+            if tes_id:
+
+                # Handle new task
+                if tes_id not in tes_states:
+                    tes_states[tes_id] = tes_state
+                    self.capture_tes_task_update(
+                        task,
+                        tes_id=tes_id,
+                        token=token,
+                    )
+                # Handle state change
+                elif tes_states[tes_id] != tes_state:
+                    tes_states[tes_id] = tes_state
+                    self.capture_tes_task_update(
+                        task,
+                        tes_id=tes_id,
+                        tes_state=tes_state,
+                    )
+                logger.info(line)
+                continue
+
+            stream_container.append(line)
+            logger.info(line)
+
+        return (stream_container, list(tes_states.keys()))
+
+    def process_tes_log(self, line: str) -> List[str]:
+        """Handles irregularities arising from log parsing."""
+        lines: List = list()
+
+        # Handle special case where FTP and cwl-tes logs are on same line
+        re_ftp_cwl_tes = re.compile(
+            r'^(\*cmd\* .*)(\[step \w*\] produced output \{)$'
+        )
+        m = re_ftp_cwl_tes.match(line)
+        if m:
+            lines.append(m.group(1))
+
+        return lines
+
+    def extract_tes_state(
+        self,
+        line: str,
+    ) -> Tuple[Optional[str], Optional[str]]:
+        """Extracts task ID and state from cwl-tes log."""
+        task_id: Optional[str] = None
+        task_state: Optional[str] = None
+
+        # Extract new task ID
+        re_task_new = re.compile(r"^\[job [\w\-]*\] task id: (\S*)$")
+        m = re_task_new.match(line)
+        if m:
+            task_id = m.group(1)
+
+        # Extract task ID and state
+        re_task_state_poll = re.compile(
+            r'^\[job [\w\-]*\] POLLING "(\S*)", result: (\w*)'
+        )
+        m = re_task_state_poll.match(line)
+        if m:
+            task_id = m.group(1)
+            task_state = m.group(2)
+
+        return (task_id, task_state)
+
+    def capture_tes_task_update(
+        self,
+        task: celery_app.Task,
+        tes_id: str,
+        tes_state: Optional[str] = None,
+        token: Optional[str] = None,
+    ) -> None:
+        """Event handler for TES task state changes."""
+        # If TES task is new, add task log to database
+        logger.info(f"TES_STATE------------->{tes_state}")
+        cwl_tes_processor = CWLTesProcessor(tes_config=self.tes_config)
+        if not tes_state:
+            tes_log = cwl_tes_processor._get_tes_task_log(
+                tes_id=tes_id,
+                token=token,
+            )
+            logger.info(f"LOG------------->{tes_log}")
+            try:
+                db_utils.append_to_tes_task_logs(
+                    collection=self.collection,
+                    task_id=task.task_id,
+                    tes_log=tes_log,
+                )
+            except Exception as e:
+                logger.exception(
+                    (
+                        'Database error. Could not update log information for '
+                        "task '{task}'. Original error message: {type}: {msg}"
+                    ).format(
+                        task=task.task_id,
+                        type=type(e).__name__,
+                        msg=e,
+                    )
+                )
+                pass
+
+        # Otherwise only update state
+        else:
+            try:
+                db_utils.update_tes_task_state(
+                    collection=self.collection,
+                    task_id=task.task_id,
+                    tes_id=tes_id,
+                    state=tes_state,
+                )
+                logger.info(
+                    (
+                        "State of TES task '{tes_id}' of run with task ID "
+                        "'{task_id}' changed to '{state}'."
+                    ).format(
+                        task_id=task.task_id,
+                        tes_id=tes_id,
+                        state=tes_state,
+                    )
+                )
+            except Exception as e:
+                logger.exception(
+                    (
+                        'Database error. Could not update log information for '
+                        "task '{task}'. Original error message: {type}: {msg}"
+                    ).format(
+                        task=task.task_id,
+                        type=type(e).__name__,
+                        msg=e,
+                    )
+                )
+                pass
+
+
+class CWLTesProcessor:
+    """Parses run outputs and fetches task logs from the TES instance."""
+
+    def __init__(self, tes_config) -> None:
+        self.tes_config = tes_config
+
+    @staticmethod
+    def _cwl_tes_outputs_parser(log: str) -> Dict:
+        """Parses outputs from the cwl-tes log."""
+        # Find outputs object in log string
+        re_outputs = re.compile(
+            r'(^\{$\n^ {4}"\S+": [\[\{]$\n(^ {4,}.*$\n)*^ {4}[\]\}]$\n^\}$\n)',
+            re.MULTILINE
+        )
+        m = re_outputs.search(log)
+        if m:
+            return literal_eval(m.group(1))
+        else:
+            return dict()
+
+    @staticmethod
+    def _cwl_tes_outputs_parser_list(log: List) -> Dict:
+        """Parses outputs from the cwl-tes log, provided as a list of lines.
+
+        The outputs JSON starts at the line before last in the logs. So
+        unless the outputs are empty (`{}`), parse upward until the beginning
+        of the JSON containing the outputs is found.
+        """
+        indices = range(len(log) - 1, -1, -1)
+
+        start = -1
+        end = -1
+        for index in indices:
+            if log[index].rstrip() == '{}':
+                return dict()
+            elif log[index].rstrip() == '}':
+                end = index
+                break
+
+        # No valid JSON was found and the previous loop
+        # reached the end of the log
+        if end == 0:
+            return dict()
+
+        indices = range(end - 1, -1, -1)
+        for index in indices:
+            if log[index].rstrip() == '{':
+                start = index
+                break
+
+        json = os.linesep.join(log[start:end + 1])
+
+        try:
+            return literal_eval(json)
+        except ValueError as verr:
+            logger.exception(
+                "ValueError when evaluating JSON: '%s'. Original error message: %s" % \
+                (json, verr)
+            )
+            return dict()
+        except SyntaxError as serr:
+            logger.exception(
+                "SyntaxError when evaluating JSON: '%s'. Original error message: %s" % \
+                (json, serr)
+            )
+            return dict()
+
+    def _get_tes_task_logs(
+        self,
+        tes_ids: List = list(),
+        token: Optional[str] = None,
+    ) -> List[Dict]:
+        """Gets multiple task logs from TES instance."""
+        task_logs = list()
+        for tes_id in tes_ids:
+            task_logs.append(
+                self._get_tes_task_log(
+                    tes_id=tes_id,
+                    token=token,
+                )
+            )
+        return task_logs
+
+    def _get_tes_task_log(
+        self,
+        tes_id: str,
+        token: Optional[str] = None,
+    ) -> Dict:
+        """Gets task log from TES instance."""
+        tes_client = tes.HTTPClient(
+            url=self.tes_config['url'],
+            timeout=self.tes_config['timeout'],
+            token=token,
+        )
+
+        task_log = {}
+
+        try:
+            task_log = tes_client.get_task(
+                task_id=tes_id,
+                view=self.tes_config['query_params'],
+            ).as_dict()
+        except Exception as e:
+            # TODO: handle more robustly: only 400/Bad Request is okay;
+            # TODO: other errors (e.g. 500) should be dealt with
+            logger.warning(
+                "Could not obtain task log. Setting default. Original error "
+                f"message: {type(e).__name__}: {e}"
+            )
+            task_log = {}
+
+        logger.debug(f'Task log: {task_log}')
+
+        return task_log
\ No newline at end of file
diff --git a/cwl_wes/tasks/tasks/run_workflow.py b/cwl_wes/tasks/tasks/run_workflow.py
index aa3c94f..e910ea5 100644
--- a/cwl_wes/tasks/tasks/run_workflow.py
+++ b/cwl_wes/tasks/tasks/run_workflow.py
@@ -1,12 +1,10 @@
 """Celery background task to start workflow run."""
 
-from _io import TextIOWrapper
 import logging
-import re
-import subprocess
-from typing import (Dict, List, Optional, Tuple)
+from typing import (List, Optional, Tuple)
 
 from cwl_wes.worker import celery_app
+from cwl_wes.tasks.tasks.workflow_run_manager import WorkflowRunManager
 
 
 # Get logger instance
@@ -27,131 +25,11 @@ def task__run_workflow(
 ) -> Tuple[int, List[str], List[str], Optional[str]]:
     """Adds workflow run to task queue."""
     # Execute task in background
-    proc = subprocess.Popen(
-        command_list,
-        cwd=tmp_dir,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.STDOUT,
-        universal_newlines=True,
+    workflow_run_manager = WorkflowRunManager(
+        task=self,
+        command_list=command_list,
+        tmp_dir=tmp_dir,
+        token=token
     )
-
-    # Parse output in real-time
-    log, tes_ids = __process_cwl_logs(
-        self,
-        stream=proc.stdout,
-        token=token,
-    )
-
-    returncode = proc.wait()
-
-    return (returncode, log, tes_ids, token)
-
-
-def __process_cwl_logs(
-    task: celery_app.Task,
-    stream: TextIOWrapper,
-    token: Optional[str] = None,
-) -> Tuple[List, List]:
-    """Parses combinend cwl-tes STDOUT/STDERR and sends TES task IDs and state
-    updates to broker."""
-    stream_container: List = list()
-    tes_states: Dict = dict()
-
-    # Iterate over STDOUT/STDERR stream
-    for line in iter(stream.readline, ''):
-
-        line = line.rstrip()
-
-        # Replace single quote characters to avoid `literal_eval()` errors
-        line = line.replace("'", '"')
-
-        # Handle special cases
-        lines = __handle_cwl_tes_log_irregularities(line)
-        for line in lines:
-            stream_container.append(line)
-            logger.info(f"[{task}] {line}")
-            continue
-
-        # Detect TES task state changes
-        (tes_id, tes_state) = __extract_tes_task_state_from_cwl_tes_log(line)
-        if tes_id:
-
-            # Handle new task
-            if tes_id not in tes_states:
-                tes_states[tes_id] = tes_state
-                __send_event_tes_task_update(
-                    task,
-                    tes_id=tes_id,
-                    token=token,
-                )
-            # Handle state change
-            elif tes_states[tes_id] != tes_state:
-                tes_states[tes_id] = tes_state
-                __send_event_tes_task_update(
-                    task,
-                    tes_id=tes_id,
-                    tes_state=tes_state,
-                )
-            logger.info(line)
-            continue
-
-        stream_container.append(line)
-        logger.info(line)
-
-    return (stream_container, list(tes_states.keys()))
-
-
-def __handle_cwl_tes_log_irregularities(line: str) -> List[str]:
-    """Handles irregularities arising from log parsing."""
-    lines: List = list()
-
-    # Handle special case where FTP and cwl-tes logs are on same line
-    re_ftp_cwl_tes = re.compile(
-        r'^(\*cmd\* .*)(\[step \w*\] produced output \{)$'
-    )
-    m = re_ftp_cwl_tes.match(line)
-    if m:
-        lines.append(m.group(1))
-
-    return lines
-
-
-def __extract_tes_task_state_from_cwl_tes_log(
-    line: str,
-) -> Tuple[Optional[str], Optional[str]]:
-    """Extracts task ID and state from cwl-tes log."""
-    task_id: Optional[str] = None
-    task_state: Optional[str] = None
-
-    # Extract new task ID
-    re_task_new = re.compile(r"^\[job [\w\-]*\] task id: (\S*)$")
-    m = re_task_new.match(line)
-    if m:
-        task_id = m.group(1)
-
-    # Extract task ID and state
-    re_task_state_poll = re.compile(
-        r'^\[job [\w\-]*\] POLLING "(\S*)", result: (\w*)'
-    )
-    m = re_task_state_poll.match(line)
-    if m:
-        task_id = m.group(1)
-        task_state = m.group(2)
-
-    return (task_id, task_state)
-
-
-def __send_event_tes_task_update(
-    task: celery_app.Task,
-    tes_id: str,
-    tes_state: Optional[str] = None,
-    token: Optional[str] = None,
-) -> None:
-    """Sends custom event to inform about TES task state change."""
-    task.send_event(
-        'task-tes-task-update',
-        tes_id=tes_id,
-        tes_state=tes_state,
-        token=token,
-    )
-
-    return None
+    return_val = workflow_run_manager.run_workflow()
+    return return_val
diff --git a/cwl_wes/tasks/tasks/workflow_run_manager.py b/cwl_wes/tasks/tasks/workflow_run_manager.py
new file mode 100644
index 0000000..68ec64a
--- /dev/null
+++ b/cwl_wes/tasks/tasks/workflow_run_manager.py
@@ -0,0 +1,394 @@
+import os
+import logging
+import subprocess
+from typing import (Dict, List, Optional, Tuple)
+import time
+from datetime import datetime
+
+from foca.models.config import Config
+
+from cwl_wes.worker import celery_app
+from cwl_wes.tasks.tasks.cwl_log_processor import CWLLogProcessor, CWLTesProcessor
+import cwl_wes.utils.db_utils as db_utils
+
+# Get logger instance
+logger = logging.getLogger(__name__)
+
+
+class WorkflowRunManager:
+    """Workflow run manager."""
+
+    def __init__(
+        self,
+        command_list: List,
+        task: celery_app.Task,
+        tmp_dir: str,
+        token: Optional[str] = None
+    ) -> None:
+        """Initiate workflow run manager instance.
+
+        Args:
+            command_list: List of commands to be executed as part of the
+                workflow run.
+            task: Celery task instance for initiating the workflow run.
+            tmp_dir: Working directory passed to the child process execution
+                context.
+            token: JSON Web Token (JWT).
+
+        Attributes:
+            task: Celery task instance for initiating the workflow run.
+            task_id: Unique identifier of the workflow run task.
+            command_list: List of commands to be executed as part of the
+                workflow run.
+            tmp_dir: Working directory passed to the child process execution
+                context.
+            token: JSON Web Token (JWT).
+            foca_config: :py:class:`foca.models.config.Config` instance
+                describing configurations registered with `celery_app`.
+            controller_config: :py:class:`cwl_wes.custom_config.ControllerConfig`
+                instance describing cwl-WES-specific controller configuration.
+            collection: Collection client for saving task run progress.
+            tes_config: TES (Task Execution Service) endpoint configuration.
+            authorization: Whether the app is configured to require
+                authorization.
+            string_format: String time format for task timestamps.
+        """
+        self.task = task
+        self.task_id = self.task.request.id
+        self.command_list = command_list
+        self.tmp_dir = tmp_dir
+        self.token = token
+        self.foca_config: Config = celery_app.conf.foca
+        self.controller_config = self.foca_config.custom.controller
+        self.collection = self.foca_config.db.dbs['cwl-wes-db'].collections['runs'].client
+        self.tes_config = {
+            'url': self.controller_config.tes_server.url,
+            'query_params': self.controller_config.tes_server.status_query_params,
+            'timeout': self.controller_config.tes_server.timeout
+        }
+        self.authorization = self.foca_config.security.auth.required
+        self.string_format: str = '%Y-%m-%d %H:%M:%S.%f'
+
+    def trigger_task_start_events(self) -> None:
+        """Method to trigger task start events."""
+        if not self.collection.find_one({'task_id': self.task.request.id}):
+            return None
+        internal = dict()
+        current_ts = time.time()
+        internal['task_started'] = datetime.utcfromtimestamp(
+            current_ts
+        )
+        # Update run document in database
+        try:
+            self.update_run_document(
+                state='RUNNING',
+                internal=internal,
+                task_started=datetime.utcfromtimestamp(
+                    current_ts
+                ).strftime(self.string_format),
+            )
+        except Exception as e:
+            logger.exception(
+                (
+                    'Database error. Could not update log information for '
+                    "task '{task}'. Original error message: {type}: {msg}"
+                ).format(
+                    task=self.task_id,
+                    type=type(e).__name__,
+                    msg=e,
+                )
+            )
+
+    def trigger_task_failure_events(self, task_end_ts: float) -> None:
+        """Method to trigger task failure events."""
+        if not self.collection.find_one({'task_id': self.task_id}):
+            return None
+
+        # Create dictionary for internal parameters
+        internal = dict()
+        internal['task_finished'] = datetime.utcfromtimestamp(
+            task_end_ts
+        )
+        task_meta_data = celery_app.AsyncResult(id=self.task_id)
+        internal['traceback'] = task_meta_data.traceback
+
+        # Update run document in database
+        self.update_run_document(
+            state='SYSTEM_ERROR',
+            internal=internal,
+            task_finished=datetime.utcfromtimestamp(
+                task_end_ts
+            ).strftime(self.string_format),
+            exception=task_meta_data.result,
+        )
+
+    def trigger_task_success_events(
+        self,
+        returncode: int,
+        log: str,
+        tes_ids: List[str],
+        token: str,
+        task_end_ts: float
+    ) -> None:
+        """Method to trigger task success events.
+
+        Args:
+            returncode: Task completion status code.
+            log: Task run log.
+            tes_ids: TES task identifiers.
+            token: TES token.
+            task_end_ts: Task end timestamp.
+        """
+        if not self.collection.find_one({'task_id': self.task_id}):
+            return None
+
+        # Parse subprocess results
+        try:
+            log_list = log
+            log = os.linesep.join(log)
+        except Exception as e:
+            logger.exception(
+                (
+                    'Run log malformed. Original '
+                    'error message: {type}: {msg}'
+                ).format(
+                    type=type(e).__name__,
+                    msg=e,
+                )
+            )
+            pass
+
+        # Create dictionary for internal parameters
+        internal = dict()
+        internal['task_finished'] = datetime.utcfromtimestamp(
+            task_end_ts
+        )
+
+        # Set final state to be set
+        document = self.collection.find_one(
+            filter={'task_id': self.task_id},
+            projection={
+                'api.state': True,
+                '_id': False,
+            }
+        )
+        if document and document['api']['state'] == 'CANCELING':
+            state = 'CANCELED'
+        elif returncode:
+            state = 'EXECUTOR_ERROR'
+        else:
+            state = 'COMPLETE'
+
+        # Extract run outputs
+        cwl_tes_processor = CWLTesProcessor(tes_config=self.tes_config)
+        outputs = cwl_tes_processor._cwl_tes_outputs_parser_list(log=log_list)
+
+        # Get task logs
+        task_logs = cwl_tes_processor._get_tes_task_logs(
+            tes_ids=tes_ids,
+            token=token,
+        )
+
+        # Update run document in database
+        try:
+            self.update_run_document(
+                state=state,
+                internal=internal,
+                outputs=outputs,
+                task_logs=task_logs,
+                task_finished=datetime.utcfromtimestamp(
+                    task_end_ts
+                ).strftime(self.string_format),
+                return_code=returncode,
+                stdout=log,
+                stderr='',
+            )
+        except Exception as e:
+            logger.exception(
+                (
+                    'Database error. Could not update log information for '
+                    "task '{task}'. Original error message: {type}: {msg}"
+                ).format(
+                    task=self.task_id,
+                    type=type(e).__name__,
+                    msg=e,
+                )
+            )
+            pass
+
+    def trigger_task_end_events(
+        self,
+        returncode: int,
+        log: str,
+        tes_ids: List[str],
+        token: str
+    ) -> None:
+        """Method to trigger task completion events.
+
+        Args:
+            returncode: Task completion status code.
+            log: Task run log.
+            tes_ids: TES task identifiers.
+            token: TES token.
+        """
+        task_end_ts = time.time()
+        if returncode == 0:
+            self.trigger_task_success_events(
+                log=log, tes_ids=tes_ids, token=token,
+                task_end_ts=task_end_ts, returncode=returncode
+            )
+        else:
+            self.trigger_task_failure_events(task_end_ts=task_end_ts)
+
+    def update_run_document(
+        self,
+        state: Optional[str] = None,
+        internal: Optional[Dict] = None,
+        outputs: Optional[Dict] = None,
+        task_logs: Optional[List[Dict]] = None,
+        **run_log_params
+    ):
+        """Updates state, internal and run log parameters in database
+        document.
+
+        Args:
+            state: Task state.
+            internal: Task-specific internal parameters.
+            outputs: Task-specific output parameters.
+            task_logs: Task run logs.
+            run_log_params: Additional run log parameters.
+        """
+        # TODO: Minimize db ops; try to compile entire object & update once
+        # Update internal parameters
+        if internal:
+            document = db_utils.upsert_fields_in_root_object(
+                collection=self.collection,
+                task_id=self.task_id,
+                root='internal',
+                **internal,
+            )
+
+        # Update outputs
+        if outputs:
+            document = db_utils.upsert_fields_in_root_object(
+                collection=self.collection,
+                task_id=self.task_id,
+                root='api.outputs',
+                **outputs,
+            )
+
+        # Update task logs
+        if task_logs:
+            document = db_utils.upsert_fields_in_root_object(
+                collection=self.collection,
+                task_id=self.task_id,
+                root='api',
+                task_logs=task_logs,
+            )
+
+        # Update run log parameters
+        if run_log_params:
+            document = db_utils.upsert_fields_in_root_object(
+                collection=self.collection,
+                task_id=self.task_id,
+                root='api.run_log',
+                **run_log_params,
+            )
+
+        # Calculate queue, execution and run time
+        if document and document['internal']:
+            run_log = document['internal']
+            durations = dict()
+
+            if 'task_started' in run_log_params:
+                if 'task_started' in run_log and 'task_received' in run_log:
+                    durations['time_queue'] = (
+                        run_log['task_started'] - run_log['task_received']
+                    ).total_seconds()
+
+            if 'task_finished' in run_log_params:
+                if 'task_finished' in run_log and 'task_started' in run_log:
+                    durations['time_execution'] = (
+                        run_log['task_finished'] - run_log['task_started']
+                    ).total_seconds()
+                if 'task_finished' in run_log and 'task_received' in run_log:
+                    durations['time_total'] = (
+                        run_log['task_finished'] - run_log['task_received']
+                    ).total_seconds()
+
+            if durations:
+                document = db_utils.upsert_fields_in_root_object(
+                    collection=self.collection,
+                    task_id=self.task_id,
+                    root='api.run_log',
+                    **durations,
+                )
+
+        # Update state
+        if state:
+            try:
+                document = db_utils.update_run_state(
+                    collection=self.collection,
+                    task_id=self.task_id,
+                    state=state,
+                )
+            except Exception:
+                raise
+
+        # Log info message
+        if document:
+            logger.info(
+                (
+                    "State of run '{run_id}' (task id: '{task_id}') changed "
+                    "to '{state}'."
+                ).format(
+                    run_id=document['run_id'],
+                    task_id=self.task_id,
+                    state=state,
+                )
+            )
+
+        return document
+
+    def run_workflow(self) -> Tuple[int, List[str], List[str], Optional[str]]:
+        """Method to initiate workflow run.
+
+        Returns:
+            Tuple of subprocess return code, run log, TES task identifiers
+            and TES token.
+        """
+        self.trigger_task_start_events()
+        proc = subprocess.Popen(
+            self.command_list,
+            cwd=self.tmp_dir,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            universal_newlines=True,
+        )
+        # Parse output in real-time
+        cwl_log_processor = CWLLogProcessor(tes_config=self.tes_config, collection=self.collection)
+        log, tes_ids = cwl_log_processor.process_cwl_logs(
+            self.task,
+            stream=proc.stdout,
+            token=self.token,
+        )
+        returncode = proc.wait()
+        self.trigger_task_end_events(
+            token=self.token,
+            returncode=returncode,
+            log=log, tes_ids=tes_ids
+        )
+        return (returncode, log, tes_ids, self.token)
\ No newline at end of file