diff --git a/backend/core/tasks.py b/backend/core/tasks.py
index 6aae24e..253a617 100644
--- a/backend/core/tasks.py
+++ b/backend/core/tasks.py
@@ -11,104 +11,100 @@
 from django.conf import settings
 from django.utils import dateparse, timezone
 
-logger = logging.getLogger("beat")
+logger = logging.getLogger()
 
 
 @shared_task()
-def check_stopping():
-    logger.info("Checking processes stopping...")
+def check_processes():
+    """ Checks the processing status in Orchestration and updates the PZ Server
+    database with processing information.
 
-    procs_updated = []
-    monitoring_statuses = ["Stopping"]
-    procs_stopping = Process.objects.filter(status__in=monitoring_statuses)
-
-    for proc in procs_stopping:
-        logger.info(f"Consulting the {str(proc)} process status.")
-        proc_orches_id = proc.orchestration_process_id  # type: ignore
-
-        if not proc_orches_id:
-            message = f"Process {str(proc.pk)} without Orchestration ID."
+    Returns:
+        bool: True if there were processes to monitor, False otherwise.
+    """
+
+    monitoring_statuses = ["Stopping", "Pending", "Running"]
+    logger.info(f"Monitoring the following statuses: {monitoring_statuses}")
+
+    processes = Process.objects.filter(status__in=monitoring_statuses)
+
+    if not processes:
+        return False
+
+    maestro = Maestro(settings.ORCHEST_URL)
+
+    for process in processes:
+        logger.info(f"Consulting the {process} process status.")
+        process_orch_id = process.orchestration_process_id  # type: ignore
+
+        if not process_orch_id:
+            message = f"Process {str(process.pk)} without Orchestration ID."
             logger.error(message)
-            proc.status = "Failed"
-            proc = update_dates(proc, {})
-            proc.save()
+            process = update_process_info(process, "Failed", {})
             continue
 
-        maestro = Maestro(settings.ORCHEST_URL)
-        proc_orchest = maestro.status(proc_orches_id)
-        proc_orchest_status = proc_orchest.get("status")  # type: ignore
-
-        logger.info(f"-> Process orchestration ID: {proc_orches_id}")
-        logger.info(f"-> Status: {proc_orchest_status}")
-
-        if not proc_orchest_status in monitoring_statuses:
-            proc.status = proc_orchest_status
-            proc.save()
-            logger.info(f"-> Process {str(proc)} updated.")
-            procs_updated.append(proc_orches_id)
+        process_orch = maestro.status(process_orch_id)
+        process_orch_status = process_orch.get("status")  # type: ignore
 
-    return procs_updated
+        logger.debug(f"-> Process orchestration ID: {process_orch_id}")
+        logger.debug(f"-> Status: {process_orch_status}")
 
+        if process_orch_status == "Running" and process.status == "Pending":
+            started_at = process_orch.get("started_at", str(process.created_at))
+            process.started_at = dateparse.parse_datetime(started_at)
+            process.status = process_orch_status
+            process.save()
 
-@shared_task()
-def check_processes_finish():
-    logger.info("Checking running processes...")
-
-    procs_updated = []
-    active_statuses = ["Pending", "Running"]
-    procs_running = Process.objects.filter(status__in=active_statuses)
-
-    for proc in procs_running:
-        logger.info(f"Consulting the {str(proc)} process status.")
-        proc_orches_id = proc.orchestration_process_id  # type: ignore
+        if process_orch_status == "Successful":
+            register_outputs(process.pk)
+
+        if process_orch_status != process.status:
+            process = update_process_info(
+                process=process,
+                process_orch_status=process_orch_status,
+                data=process_orch
+            )
+            logger.info(
+                f"{process} has been updated (new status: {process.status})"
+            )
 
-        if not proc_orches_id:
-            message = f"Process {str(proc.pk)} without Orchestration ID."
-            logger.error(message)
-            proc.status = "Failed"
-            proc = update_dates(proc, {})
-            proc.save()
-            continue
+
+    return True
 
-        maestro = Maestro(settings.ORCHEST_URL)
-        proc_orchest = maestro.status(proc_orches_id)
-        proc_orchest_status = proc_orchest.get("status")  # type: ignore
 
-        logger.info(f"-> Process orchestration ID: {proc_orches_id}")
-        logger.info(f"-> Status: {proc_orchest_status}")
-
-        if proc_orchest_status == "Running" and proc.status == "Pending":
-            started_at = proc_orchest.get("started_at", str(proc.created_at))
-            proc.started_at = dateparse.parse_datetime(started_at)
-            proc.status = proc_orchest_status
-            proc.save()
-
-        if not proc_orchest_status in active_statuses:
-            proc.status = proc_orchest_status
-            proc = update_dates(proc, proc_orchest)
-            proc.save()
-            register_outputs(proc.pk)
-            logger.info(f"-> Process {str(proc)} updated.")
-            procs_updated.append(proc_orches_id)
-
-    return procs_updated
+def update_process_info(process, process_orch_status, data):
+    """ Updates basic process information
 
+    Args:
+        process (Process): process object
+        process_orch_status (str): process orchestration status
+        data (dict): process info
 
-def update_dates(process, data):
+    Returns:
+        Process: process object
+    """
     started_at = data.get("started_at", str(process.created_at))
     ended_at = data.get("ended_at", str(timezone.now()))
+
+    if not ended_at:
+        ended_at = str(timezone.now())
+
     process.started_at = dateparse.parse_datetime(started_at)
     process.ended_at = dateparse.parse_datetime(ended_at)
+    process.status = process_orch_status
+    process.save()
     return process
 
 
 def register_outputs(process_id):
-    """_summary_
+    """ Records the outputs in the database
 
     Args:
-        process_id (_type_): _description_
+        process_id (int): process ID
     """
+    logger.info(f"[process {process_id}]: starting upload registration...")
+
     file_roles = dict(FileRoles.choices)
     file_roles = {str(v).lower(): k for k, v in file_roles.items()}
 
@@ -116,6 +112,8 @@
     process_dir = pathlib.Path(settings.PROCESSING_DIR, process.path)
     process_file = process_dir.joinpath("process.yml")
 
+    logger.debug(f"[process {process_id}]: info filepath {process_file}")
+
     reg_product = RegistryProduct(process.upload.pk)
 
     process_file_dict = load_yaml(process_file)
@@ -135,17 +133,17 @@
         process.upload.status = 1  # Published status
         process.upload.save()
         process.save()
+        logger.info(f"[process {process_id}]: registration completed!")
     except Exception as _:
         process.upload.status = 9  # Failed status
         process.upload.save()
         process.save()
-        logger.exception("Failed to upload register!")
+        logger.exception(f"[process {process_id}]: Failed to register upload!")
 
 
 def copy_upload(filepath, upload_dir):
     filepath = pathlib.Path(filepath)
     new_filepath = pathlib.Path(settings.MEDIA_ROOT, upload_dir, filepath.name)
-    logger.debug("new_filepath -> %s", str(new_filepath))
     shutil.copyfile(str(filepath), str(new_filepath))
     return str(new_filepath)
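Merging `check_stopping` and `check_processes_finish` into a single `check_processes` task leaves one polling path to verify. Below is a minimal test sketch, assuming pytest-django and `unittest.mock`; the `Process.objects.create()` fields are a hypothetical minimal fixture (the real model may require more), and only the `status`/`started_at`/`ended_at` keys of the Maestro payload are read by the task.

```python
# Sketch only: a "Failed" payload keeps the test off the register_outputs
# filesystem path, while still differing from process.status so that
# update_process_info runs. Maestro is patched where the task imports it.
from unittest.mock import patch

import pytest

from core.models import Process
from core.tasks import check_processes


@pytest.mark.django_db
def test_check_processes_propagates_failure():
    process = Process.objects.create(  # hypothetical minimal fixture
        status="Running",
        orchestration_process_id=42,
    )
    payload = {
        "status": "Failed",
        "started_at": "2024-05-01T10:00:00Z",
        "ended_at": "2024-05-01T10:05:00Z",
    }
    with patch("core.tasks.Maestro") as maestro_cls:
        maestro_cls.return_value.status.return_value = payload
        assert check_processes() is True

    process.refresh_from_db()
    assert process.status == "Failed"
```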
diff --git a/backend/pzserver/celery.py b/backend/pzserver/celery.py
index 19d0ace..52687ab 100644
--- a/backend/pzserver/celery.py
+++ b/backend/pzserver/celery.py
@@ -15,14 +15,10 @@
 # https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html
 
 app.conf.beat_schedule = {
-    "check-finish": {
-        "task": "core.tasks.check_processes_finish",
+    "check-processes": {
+        "task": "core.tasks.check_processes",
         "schedule": 60.0,
     },
-    "check-stopping": {
-        "task": "core.tasks.check_stopping",
-        "schedule": 120.0,
-    },
 }
 
 app.conf.timezone = "UTC"
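With the two beat entries collapsed into one, the 60-second cadence is now hard-coded. If deployments ever need to tune it, the interval could be read from Django settings — a sketch for `backend/pzserver/celery.py` (where `app` is already defined), with `CHECK_PROCESSES_INTERVAL` being a hypothetical setting this PR does not add:

```python
# Hypothetical variation, not in this PR: let deployments tune the polling
# interval via an optional CHECK_PROCESSES_INTERVAL setting (seconds),
# falling back to the 60 s default used above.
from django.conf import settings

app.conf.beat_schedule = {
    "check-processes": {
        "task": "core.tasks.check_processes",
        "schedule": getattr(settings, "CHECK_PROCESSES_INTERVAL", 60.0),
    },
}
```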
diff --git a/backend/pzserver/settings.py b/backend/pzserver/settings.py
index cdd7909..2d84942 100644
--- a/backend/pzserver/settings.py
+++ b/backend/pzserver/settings.py
@@ -258,11 +258,6 @@
             "backupCount": 5,
             "formatter": "standard",
         },
-        "beat": {
-            "level": LOGGING_LEVEL,
-            "class": "logging.handlers.RotatingFileHandler",
-            "filename": os.path.join(LOG_DIR, "celerybeat.log"),
-        },
         "saml": {
             "level": LOGGING_LEVEL,
             "class": "logging.handlers.RotatingFileHandler",
@@ -296,9 +291,6 @@
             "level": LOGGING_LEVEL,
             "propagate": True,
         },
-        "beat": {
-            "handlers": ["beat"],
-        },
         "saml": {
             "handlers": ["saml"],
             "level": LOGGING_LEVEL,
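One side effect of dropping the `beat` logger: `logging.getLogger()` in `core/tasks.py` now returns the root logger, so task records go to whatever handlers the root logger has. If they should still land in a dedicated rotating file, a root entry can be appended to the same `LOGGING` dict — a sketch only; the `celery` handler name and the `maxBytes` value are assumptions, while `LOG_DIR`, `LOGGING_LEVEL`, and the `standard` formatter already exist in `settings.py`:

```python
# Sketch, not part of this PR: send root-logger records (which now include
# core.tasks messages) to a rotating file. LOG_DIR, LOGGING_LEVEL and the
# "standard" formatter are already defined in settings.py; the "celery"
# handler name and maxBytes value are illustrative.
LOGGING["handlers"]["celery"] = {
    "level": LOGGING_LEVEL,
    "class": "logging.handlers.RotatingFileHandler",
    "filename": os.path.join(LOG_DIR, "celery.log"),
    "maxBytes": 1024 * 1024 * 5,
    "backupCount": 5,
    "formatter": "standard",
}
LOGGING["root"] = {"handlers": ["celery"], "level": LOGGING_LEVEL}
```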