Skip to content

Commit

Permalink
Merge pull request #155 from frappe/fix-rq-timeout
Browse files Browse the repository at this point in the history
fix(rq): Use rq job status for timed-out and killed jobs
  • Loading branch information
adityahase authored Jan 20, 2025
2 parents a7e806f + 13406e8 commit 54561ec
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
1 change: 1 addition & 0 deletions agent/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def wrapper(wrapped, instance: Base, args, kwargs):
kwargs=kwargs,
timeout=4 * 3600,
result_ttl=24 * 3600,
job_id=str(instance.job_record.model.id),
)
return instance.job_record.model.id

Expand Down
30 changes: 30 additions & 0 deletions agent/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from flask import Flask, Response, jsonify, request
from passlib.hash import pbkdf2_sha256 as pbkdf2
from playhouse.shortcuts import model_to_dict
from rq.exceptions import NoSuchJobError
from rq.job import Job, JobStatus

from agent.builder import ImageBuilder, get_image_build_context_directory
from agent.database import JSONEncoderForSQLQueryResult
Expand Down Expand Up @@ -1123,10 +1125,38 @@ def proxysql_remove_user(username):
return {"job": job}


def get_status_from_rq(job, redis):
RQ_STATUS_MAP = {
JobStatus.QUEUED: "Pending",
JobStatus.FINISHED: "Success",
JobStatus.FAILED: "Failure",
JobStatus.STARTED: "Running",
JobStatus.DEFERRED: "Pending",
JobStatus.SCHEDULED: "Pending",
JobStatus.STOPPED: "Failure",
JobStatus.CANCELED: "Failure",
}
status = None
try:
rq_status = Job.fetch(str(job["id"]), connection=redis).get_status()
status = RQ_STATUS_MAP.get(rq_status)
except NoSuchJobError:
# Handle jobs enqueued before we started setting job_id
pass
return status


def to_dict(model):
redis = connection()
if isinstance(model, JobModel):
job = model_to_dict(model, backrefs=True)
status_from_rq = get_status_from_rq(job, redis)
if status_from_rq:
# Override status from JobModel if rq says the job is already ended
TERMINAL_STATUSES = ["Success", "Failure"]
if job["status"] not in TERMINAL_STATUSES and status_from_rq in TERMINAL_STATUSES:
job["status"] = status_from_rq

job["data"] = json.loads(job["data"]) or {}
job_key = f"agent:job:{job['id']}"
job["commands"] = [json.loads(command) for command in redis.lrange(job_key, 0, -1)]
Expand Down

0 comments on commit 54561ec

Please sign in to comment.