Skip to content

Commit

Permalink
Bug fix in process termination check
Browse files Browse the repository at this point in the history
  • Loading branch information
crisingulani committed Sep 24, 2024
1 parent 5415abd commit 64c20a4
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 87 deletions.
144 changes: 71 additions & 73 deletions backend/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,111 +11,109 @@
from django.conf import settings
from django.utils import dateparse, timezone

# Module-level logger: use the root logger so records flow through the
# project's default logging configuration.
logger = logging.getLogger()


@shared_task()
def check_processes():
    """ Checks the processing status in Orchestration and update the PZ Server
    database with processing information.

    Returns:
        bool: True, if an update was made. False, if no update was made.
    """

    monitoring_statuses = ["Stopping", "Pending", "Running"]
    logger.info(f"Monitoring the following statuses: {monitoring_statuses}")

    processes = Process.objects.filter(status__in=monitoring_statuses)

    # Nothing in-flight: skip the Orchestration round-trip entirely.
    if not processes:
        return False

    maestro = Maestro(settings.ORCHEST_URL)

    for process in processes:
        logger.info(f"Consulting the {process} process status.")
        process_orch_id = process.orchestration_process_id  # type: ignore

        # A monitored process without an Orchestration ID cannot be tracked:
        # mark it Failed and move on to the next one.
        if not process_orch_id:
            message = f"Process {str(process.pk)} without Orchestration ID."
            logger.error(message)
            process = update_process_info(process, "Failed", {})
            continue

        process_orch = maestro.status(process_orch_id)
        process_orch_status = process_orch.get("status")  # type: ignore

        logger.debug(f"-> Process orchestration ID: {process_orch_id}")
        logger.debug(f"-> Status: {process_orch_status}")

        # Pending -> Running transition: record the start time as soon as
        # Orchestration reports the process running.
        if process_orch_status == "Running" and process.status == "Pending":
            started_at = process_orch.get("started_at", str(process.created_at))
            process.started_at = dateparse.parse_datetime(started_at)
            process.status = process_orch_status
            process.save()

        # Register produced outputs once the run finished successfully.
        if process_orch_status == "Successful":
            register_outputs(process.pk)

        # Persist any remaining status divergence (e.g. terminal states)
        # together with the start/end timestamps reported by Orchestration.
        if process_orch_status != process.status:
            process = update_process_info(
                process=process,
                process_orch_status=process_orch_status,
                data=process_orch
            )
            logger.info(
                f"{process} has been updated (new status: {process.status})"
            )

    return True
def update_process_info(process, process_orch_status, data):
    """ Updates basic process information

    Args:
        process (Process): process object
        process_orch_status (str): process orchestration status
        data (dict): process info

    Returns:
        Process: process object
    """
    # Fall back to the record's own timestamps when Orchestration omits them.
    started_at = data.get("started_at", str(process.created_at))
    ended_at = data.get("ended_at", str(timezone.now()))

    # The key may be present but empty/None; normalize so parse_datetime
    # always receives a datetime string.
    if not ended_at:
        ended_at = str(timezone.now())

    process.started_at = dateparse.parse_datetime(started_at)
    process.ended_at = dateparse.parse_datetime(ended_at)
    process.status = process_orch_status
    process.save()
    return process


def register_outputs(process_id):
"""_summary_
""" Records the outputs in the database
Args:
process_id (_type_): _description_
process_id (int): process ID
"""

logger.info(f"[process {process_id}]: starting upload registration...")

file_roles = dict(FileRoles.choices)
file_roles = {str(v).lower(): k for k, v in file_roles.items()}

process = Process.objects.get(pk=process_id)
process_dir = pathlib.Path(settings.PROCESSING_DIR, process.path)
process_file = process_dir.joinpath("process.yml")

logger.debug(f"[process {process_id}]: info filepath {process_file}")

reg_product = RegistryProduct(process.upload.pk)

process_file_dict = load_yaml(process_file)
Expand All @@ -135,17 +133,17 @@ def register_outputs(process_id):
process.upload.status = 1 # Published status
process.upload.save()
process.save()
logger.info(f"[process {process_id}]: registration completed!")
except Exception as _:
process.upload.status = 9 # Failed status
process.upload.save()
process.save()
logger.exception("Failed to upload register!")
logger.exception(f"[process {process_id}]: Failed to upload register!")


def copy_upload(filepath, upload_dir):
    """Copy an uploaded file into MEDIA_ROOT under *upload_dir*.

    Args:
        filepath (str | pathlib.Path): source file path
        upload_dir (str): destination directory, relative to MEDIA_ROOT

    Returns:
        str: absolute path of the copied file
    """
    source = pathlib.Path(filepath)
    destination = pathlib.Path(settings.MEDIA_ROOT, upload_dir, source.name)
    logger.debug("new_filepath -> %s", str(destination))
    shutil.copyfile(str(source), str(destination))
    return str(destination)

Expand Down
8 changes: 2 additions & 6 deletions backend/pzserver/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@

# https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html
# Single periodic task: every 60s, reconcile Stopping/Pending/Running
# processes against Orchestration (replaces the former separate
# check-finish / check-stopping schedules).
app.conf.beat_schedule = {
    "check-processes": {
        "task": "core.tasks.check_processes",
        "schedule": 60.0,
    },
}
app.conf.timezone = "UTC"

Expand Down
8 changes: 0 additions & 8 deletions backend/pzserver/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,6 @@
"backupCount": 5,
"formatter": "standard",
},
"beat": {
"level": LOGGING_LEVEL,
"class": "logging.handlers.RotatingFileHandler",
"filename": os.path.join(LOG_DIR, "celerybeat.log"),
},
"saml": {
"level": LOGGING_LEVEL,
"class": "logging.handlers.RotatingFileHandler",
Expand Down Expand Up @@ -296,9 +291,6 @@
"level": LOGGING_LEVEL,
"propagate": True,
},
"beat": {
"handlers": ["beat"],
},
"saml": {
"handlers": ["saml"],
"level": LOGGING_LEVEL,
Expand Down

0 comments on commit 64c20a4

Please sign in to comment.