diff --git a/agent/job.py b/agent/job.py index b9c88055..a2ea737a 100644 --- a/agent/job.py +++ b/agent/job.py @@ -172,6 +172,7 @@ def wrapper(wrapped, instance: Base, args, kwargs): kwargs=kwargs, timeout=4 * 3600, result_ttl=24 * 3600, + job_id=str(instance.job_record.model.id), ) return instance.job_record.model.id diff --git a/agent/web.py b/agent/web.py index 1aa92355..f24be908 100644 --- a/agent/web.py +++ b/agent/web.py @@ -12,6 +12,8 @@ from flask import Flask, Response, jsonify, request from passlib.hash import pbkdf2_sha256 as pbkdf2 from playhouse.shortcuts import model_to_dict +from rq.exceptions import NoSuchJobError +from rq.job import Job, JobStatus from agent.builder import ImageBuilder, get_image_build_context_directory from agent.database import JSONEncoderForSQLQueryResult @@ -1123,10 +1125,38 @@ def proxysql_remove_user(username): return {"job": job} +def get_status_from_rq(job, redis): + RQ_STATUS_MAP = { + JobStatus.QUEUED: "Pending", + JobStatus.FINISHED: "Success", + JobStatus.FAILED: "Failure", + JobStatus.STARTED: "Running", + JobStatus.DEFERRED: "Pending", + JobStatus.SCHEDULED: "Pending", + JobStatus.STOPPED: "Failure", + JobStatus.CANCELED: "Failure", + } + status = None + try: + rq_status = Job.fetch(str(job["id"]), connection=redis).get_status() + status = RQ_STATUS_MAP.get(rq_status) + except NoSuchJobError: + # Handle jobs enqueued before we started setting job_id + pass + return status + + def to_dict(model): redis = connection() if isinstance(model, JobModel): job = model_to_dict(model, backrefs=True) + status_from_rq = get_status_from_rq(job, redis) + if status_from_rq: + # Override status from JobModel if rq says the job is already ended + TERMINAL_STATUSES = ["Success", "Failure"] + if job["status"] not in TERMINAL_STATUSES and status_from_rq in TERMINAL_STATUSES: + job["status"] = status_from_rq + job["data"] = json.loads(job["data"]) or {} job_key = f"agent:job:{job['id']}" job["commands"] = [json.loads(command) for command in redis.lrange(job_key, 0, -1)]