Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor job worker loop to prevent leaving orphaned errored jobs in the queue #1359

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 51 additions & 36 deletions angrmanagement/logic/jobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,52 +56,69 @@ def __init__(self, job_manager: JobManager, id_: int):

def run(self) -> None:
while True:
# Add a small delay when the queue is empty to avoid busy waiting
if self.job_manager.jobs_queue.empty():
gui_thread_schedule(GlobalInfo.main_window.status_bar.progress_done, args=())
time.sleep(0.1)
continue

if (
any(job.blocking for job in self.job_manager.jobs)
and GlobalInfo.main_window is not None
and GlobalInfo.main_window.workspace
):
gui_thread_schedule(GlobalInfo.main_window.status_bar._progress_dialog.hide, args=())

job = self.job_manager.jobs_queue.get()
gui_thread_schedule_async(GlobalInfo.main_window.status_bar.progress, args=("Working...", 0.0, True))

# Show progress dialog if any job is blocking and the main window is visible
if any(job.blocking for job in self.job_manager.jobs) and GlobalInfo.main_window.isVisible():
gui_thread_schedule(GlobalInfo.main_window.status_bar._progress_dialog.show, args=())

# If job cancelled, then skip it
if job.state == JobState.CANCELLED:
# Get the next job from the queue
self.current_job = self.job_manager.jobs_queue.get()

# If the job is cancelled, skip it
if self.current_job.state == JobState.CANCELLED:
self.job_manager.jobs.remove(self.current_job)
self.current_job = None
continue

# Indicate that the job is running in the status bar
gui_thread_schedule_async(GlobalInfo.main_window.status_bar.progress, args=("Working...", 0.0, True))
self.job_manager.callback_worker_new_job(self.current_job)
log.info('Job "%s" started', self.current_job.name)

# Set up the job context
ctx = JobContext(self.job_manager, self.current_job)
ctx.set_progress(0)
self.current_job.state = JobState.RUNNING
self.current_job.start_at = time.time()

# Run the job
try:
self.current_job = job
self.job_manager.callback_worker_new_job(self.current_job)
ctx = JobContext(self.job_manager, job)
ctx.set_progress(0)
job.state = JobState.RUNNING
log.info('Job "%s" started', job.name)
job.start_at = time.time()
result = job.run(ctx)
if job.state != JobState.CANCELLED:
self.job_manager.callback_job_complete(self.current_job)
now = time.time()
duration = now - job.start_at
log.info('Job "%s" completed after %.2f seconds', job.name, duration)
except (Exception, KeyboardInterrupt) as e: # pylint: disable=broad-except
result = self.current_job.run(ctx)

except KeyboardInterrupt:
# Handle cancellation

self.current_job.state = JobState.CANCELLED
log.info('Job "%s" cancelled', self.current_job.name)

except Exception as e: # pylint: disable=broad-except
# Handle exceptions

sys.last_traceback = e.__traceback__
self.current_job = None
if job.state != JobState.CANCELLED:
job.state = JobState.FAILED
log.exception('Exception while running job "%s":', job.name)

self.current_job.state = JobState.FAILED
log.exception('Exception while running job "%s":', self.current_job.name)
if self.job_manager.job_worker_exception_callback is not None:
self.job_manager.job_worker_exception_callback(job, e)
self.job_manager.job_worker_exception_callback(self.current_job, e)

else:
job.state = JobState.FINISHED
# Handle successful completion

duration = time.time() - self.current_job.start_at
if self.current_job.state != JobState.CANCELLED:
self.job_manager.callback_job_complete(self.current_job)
log.info('Job "%s" completed after %.2f seconds', self.current_job.name, duration)
self.current_job.state = JobState.FINISHED
gui_thread_schedule_async(self.current_job.finish, args=(result,))

finally:
self.job_manager.jobs.remove(self.current_job)
self.current_job = None
self.job_manager.jobs.remove(job)
gui_thread_schedule_async(job.finish, args=(result,))

def keyboard_interrupt(self) -> None:
"""Called from the GUI thread when the user presses Ctrl+C or presses a cancel button"""
Expand Down Expand Up @@ -242,5 +259,3 @@ def callback_job_complete(self, job: Job):
if self.workspace.view_manager.first_view_in_category("jobs") is not None:
jobs_view = self.workspace.view_manager.first_view_in_category("jobs")
gui_thread_schedule_async(jobs_view.qjobs.change_job_finish, args=[job])

# Private methods
Loading