From 638b12a7e74f98065eed103cd4251e161c5d8996 Mon Sep 17 00:00:00 2001 From: Kevin Phoenix Date: Wed, 4 Dec 2024 13:45:32 -0700 Subject: [PATCH] Refactor job worker loop to prevent leaving orphaned errored jobs in the queue --- angrmanagement/logic/jobmanager.py | 87 +++++++++++++++++------------- 1 file changed, 51 insertions(+), 36 deletions(-) diff --git a/angrmanagement/logic/jobmanager.py b/angrmanagement/logic/jobmanager.py index 647c81e403..371c441a1a 100644 --- a/angrmanagement/logic/jobmanager.py +++ b/angrmanagement/logic/jobmanager.py @@ -56,52 +56,69 @@ def __init__(self, job_manager: JobManager, id_: int): def run(self) -> None: while True: + # Add a small delay when the queue is empty to avoid busy waiting if self.job_manager.jobs_queue.empty(): gui_thread_schedule(GlobalInfo.main_window.status_bar.progress_done, args=()) + time.sleep(0.1) + continue - if ( - any(job.blocking for job in self.job_manager.jobs) - and GlobalInfo.main_window is not None - and GlobalInfo.main_window.workspace - ): - gui_thread_schedule(GlobalInfo.main_window.status_bar._progress_dialog.hide, args=()) - - job = self.job_manager.jobs_queue.get() - gui_thread_schedule_async(GlobalInfo.main_window.status_bar.progress, args=("Working...", 0.0, True)) - + # Show progress dialog if any job is blocking and the main window is visible if any(job.blocking for job in self.job_manager.jobs) and GlobalInfo.main_window.isVisible(): gui_thread_schedule(GlobalInfo.main_window.status_bar._progress_dialog.show, args=()) - # If job cancelled, then skip it - if job.state == JobState.CANCELLED: + # Get the next job from the queue + self.current_job = self.job_manager.jobs_queue.get() + + # If the job is cancelled, skip it + if self.current_job.state == JobState.CANCELLED: + self.job_manager.jobs.remove(self.current_job) + self.current_job = None continue + + # Indicate that the job is running in the status bar + gui_thread_schedule_async(GlobalInfo.main_window.status_bar.progress, args=("Working...", 0.0, True)) + self.job_manager.callback_worker_new_job(self.current_job) + log.info('Job "%s" started', self.current_job.name) + + # Set up the job context + ctx = JobContext(self.job_manager, self.current_job) + ctx.set_progress(0) + self.current_job.state = JobState.RUNNING + self.current_job.start_at = time.time() + + # Run the job try: - self.current_job = job - self.job_manager.callback_worker_new_job(self.current_job) - ctx = JobContext(self.job_manager, job) - ctx.set_progress(0) - job.state = JobState.RUNNING - log.info('Job "%s" started', job.name) - job.start_at = time.time() - result = job.run(ctx) - if job.state != JobState.CANCELLED: - self.job_manager.callback_job_complete(self.current_job) - now = time.time() - duration = now - job.start_at - log.info('Job "%s" completed after %.2f seconds', job.name, duration) - except (Exception, KeyboardInterrupt) as e: # pylint: disable=broad-except + result = self.current_job.run(ctx) + + except KeyboardInterrupt: + # Handle cancellation + + self.current_job.state = JobState.CANCELLED + log.info('Job "%s" cancelled', self.current_job.name) + + except Exception as e: # pylint: disable=broad-except + # Handle exceptions + sys.last_traceback = e.__traceback__ - self.current_job = None - if job.state != JobState.CANCELLED: - job.state = JobState.FAILED - log.exception('Exception while running job "%s":', job.name) + + self.current_job.state = JobState.FAILED + log.exception('Exception while running job "%s":', self.current_job.name) if self.job_manager.job_worker_exception_callback is not None: - self.job_manager.job_worker_exception_callback(job, e) + self.job_manager.job_worker_exception_callback(self.current_job, e) + else: - job.state = JobState.FINISHED + # Handle successful completion + + duration = time.time() - self.current_job.start_at + if self.current_job.state != JobState.CANCELLED: + self.job_manager.callback_job_complete(self.current_job) + log.info('Job "%s" completed after %.2f seconds', self.current_job.name, duration) + self.current_job.state = JobState.FINISHED + gui_thread_schedule_async(self.current_job.finish, args=(result,)) + + finally: + self.job_manager.jobs.remove(self.current_job) self.current_job = None - self.job_manager.jobs.remove(job) - gui_thread_schedule_async(job.finish, args=(result,)) def keyboard_interrupt(self) -> None: """Called from the GUI thread when the user presses Ctrl+C or presses a cancel button""" @@ -242,5 +259,3 @@ def callback_job_complete(self, job: Job): if self.workspace.view_manager.first_view_in_category("jobs") is not None: jobs_view = self.workspace.view_manager.first_view_in_category("jobs") gui_thread_schedule_async(jobs_view.qjobs.change_job_finish, args=[job]) - - # Private methods