From 9282a46933f319eae6bc4388dcd439ede97e7559 Mon Sep 17 00:00:00 2001 From: Vladyslav Moisieienkov Date: Mon, 4 Apr 2022 12:45:51 +0200 Subject: [PATCH] job-status-consumer, log DB status of "not alive" workflows closes #437 --- reana_workflow_controller/consumer.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index 3f37027e..b1284865 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -87,22 +87,19 @@ def on_message(self, body, message): Session.query(Workflow) .filter( Workflow.id_ == workflow_uuid, - Workflow.status.in_(ALIVE_STATUSES), ) .one_or_none() ) - if workflow: + if workflow and workflow.status in ALIVE_STATUSES: next_status = body_dict.get("status") if next_status: next_status = RunStatus(next_status) logging.info( - " [x] Received workflow_uuid: {0} status: {1}".format( - workflow_uuid, next_status - ) + f" [x] Received workflow_uuid: {workflow_uuid} status: {next_status}" ) - logs = body_dict.get("logs") or "" if workflow.can_transition_to(next_status): + logs = body_dict.get("logs") or "" _update_workflow_status(workflow, next_status, logs) if "message" in body_dict and body_dict.get("message"): msg = body_dict["message"] @@ -119,17 +116,21 @@ def on_message(self, body, message): f" from status {workflow.status} to" f" {next_status}." ) - elif workflow_uuid: + elif workflow and workflow.status not in ALIVE_STATUSES: logging.warning( - "Event for not alive workflow {workflow_uuid} received:\n" - "{body}\n" - "Ignoring ...".format(workflow_uuid=workflow_uuid, body=body) + f"Event for not alive workflow {workflow.id_} with DB status {workflow.status} received:\n" + f"{body}\nIgnoring..." + ) + else: + logging.warning( + f"Event for workflow {workflow_uuid} that doesn't exist in DB received:\n" + f"{body}\nIgnoring..." ) except REANAWorkflowControllerError as rwce: logging.error(rwce, exc_info=True) except SQLAlchemyError as sae: logging.error( - f"Something went wrong while querying the database for workflow: {workflow.id_}" + f"Something went wrong while querying the database for workflow: {workflow_uuid}" ) logging.error(sae, exc_info=True) except Exception as e: