@@ -182,7 +182,7 @@ def clean_registries(self) -> None:
182
182
def _install_signal_handlers (self ) -> None :
183
183
"""Installs signal handlers for handling SIGINT and SIGTERM gracefully."""
184
184
if threading .current_thread () is not threading .main_thread ():
185
- self .log (DEBUG , f "Running in a thread, skipping signal handlers installation" )
185
+ self .log (DEBUG , "Running in a thread, skipping signal handlers installation" )
186
186
return
187
187
signal .signal (signal .SIGINT , self .request_stop )
188
188
signal .signal (signal .SIGTERM , self .request_stop )
@@ -213,14 +213,14 @@ def work(self, max_jobs: Optional[int] = None, max_idle_time: Optional[int] = No
213
213
self .run_maintenance_tasks ()
214
214
215
215
if self ._model .shutdown_requested_date :
216
- self .log (INFO , f "stopping on request" )
216
+ self .log (INFO , "stopping on request" )
217
217
break
218
218
219
219
timeout = None if self .burst else (SCHEDULER_CONFIG .DEFAULT_WORKER_TTL - 15 )
220
220
job , queue = self .dequeue_job_and_maintain_ttl (timeout , max_idle_time )
221
221
if job is None or queue is None :
222
222
if self .burst :
223
- self .log (INFO , f "done, quitting" )
223
+ self .log (INFO , "done, quitting" )
224
224
break
225
225
elif max_idle_time is not None :
226
226
self .log (INFO , f"idle for { max_idle_time } seconds, quitting" )
@@ -239,13 +239,13 @@ def work(self, max_jobs: Optional[int] = None, max_idle_time: Optional[int] = No
239
239
return self ._model .completed_jobs > 0
240
240
241
241
except TimeoutErrorTypes :
242
- self .log (ERROR , f "Redis connection timeout, quitting..." )
242
+ self .log (ERROR , "Redis connection timeout, quitting..." )
243
243
except StopRequested :
244
- self .log (INFO , f "Worker was requested to stop, quitting" )
244
+ self .log (INFO , "Worker was requested to stop, quitting" )
245
245
except SystemExit : # Cold shutdown detected
246
246
raise
247
247
except Exception :
248
- self .log (ERROR , f "found an unhandled exception, quitting..." , exc_info = True )
248
+ self .log (ERROR , "found an unhandled exception, quitting..." , exc_info = True )
249
249
finally :
250
250
self .teardown ()
251
251
return False
@@ -264,10 +264,10 @@ def handle_job_failure(self, job: JobModel, queue: Queue, exc_string: str = "")
264
264
stopped_job_name = self ._model .get_field ("stopped_job_name" , self .connection )
265
265
self ._model .current_job_name = None
266
266
if stopped_job_name == job .name :
267
- self .log (DEBUG , f "Job was stopped, setting status to STOPPED" )
267
+ self .log (DEBUG , "Job was stopped, setting status to STOPPED" )
268
268
new_job_status = JobStatus .STOPPED
269
269
else :
270
- self .log (DEBUG , f "Job has failed, setting status to FAILED" )
270
+ self .log (DEBUG , "Job has failed, setting status to FAILED" )
271
271
new_job_status = JobStatus .FAILED
272
272
273
273
queue .job_handle_failure (new_job_status , job , exc_string )
@@ -309,12 +309,12 @@ def _check_for_suspension(self, burst: bool) -> None:
309
309
while self ._model .is_suspended :
310
310
if burst :
311
311
self .log (
312
- INFO , f "Suspended in burst mode, exiting, Note: There could still be unfinished jobs on the queue"
312
+ INFO , "Suspended in burst mode, exiting, Note: There could still be unfinished jobs on the queue"
313
313
)
314
314
raise StopRequested ()
315
315
316
316
if not notified :
317
- self .log (INFO , f "Worker suspended, trigger ResumeCommand" )
317
+ self .log (INFO , "Worker suspended, trigger ResumeCommand" )
318
318
before_state = self ._model .state
319
319
self ._model .set_field ("state" , WorkerStatus .SUSPENDED , connection = self .connection )
320
320
notified = True
@@ -332,14 +332,14 @@ def run_maintenance_tasks(self) -> None:
332
332
if not self .with_scheduler :
333
333
return
334
334
if self .scheduler is None and self .with_scheduler :
335
- self .log (DEBUG , f "Creating scheduler" )
335
+ self .log (DEBUG , "Creating scheduler" )
336
336
self .scheduler = WorkerScheduler (self .queues , worker_name = self .name , connection = self .connection )
337
337
if self .scheduler .status == SchedulerStatus .STOPPED :
338
- self .log (DEBUG , f "Starting scheduler thread" )
338
+ self .log (DEBUG , "Starting scheduler thread" )
339
339
self .scheduler .start ()
340
340
self ._model .has_scheduler = True
341
341
if self .burst :
342
- self .log (DEBUG , f "Stopping scheduler thread (burst mode)" )
342
+ self .log (DEBUG , "Stopping scheduler thread (burst mode)" )
343
343
self .scheduler .request_stop_and_wait ()
344
344
self ._model .has_scheduler = False
345
345
self ._model .save (connection = self .connection )
@@ -407,7 +407,7 @@ def _validate_name_uniqueness(self) -> None:
407
407
408
408
def worker_start (self ) -> None :
409
409
"""Registers its own birth."""
410
- self .log (DEBUG , f "Registering birth" )
410
+ self .log (DEBUG , "Registering birth" )
411
411
now = utcnow ()
412
412
self ._model .birth = now
413
413
self ._model .last_heartbeat = now
@@ -426,7 +426,7 @@ def _kill_job_execution_process(self, sig: signal.Signals = SIGKILL) -> None:
426
426
except OSError as e :
427
427
if e .errno != errno .ESRCH : # "No such process" is fine with us
428
428
raise
429
- self .log (DEBUG , f "Job execution process already dead" )
429
+ self .log (DEBUG , "Job execution process already dead" )
430
430
431
431
def _wait_for_job_execution_process (self ) -> Tuple [Optional [int ], Optional [int ]]:
432
432
"""Waits for the job execution process to complete.
@@ -449,10 +449,10 @@ def request_force_stop(self, signum: int, frame: Optional[FrameType]) -> None:
449
449
# when a user hits Ctrl+C. In this case, if we receive the second signal within 1 second, we ignore it.
450
450
shutdown_date = self ._model .shutdown_requested_date
451
451
if shutdown_date is not None and (utcnow () - shutdown_date ) < timedelta (seconds = 1 ):
452
- self .log (DEBUG , f "Shutdown signal ignored, received twice in less than 1 second" )
452
+ self .log (DEBUG , "Shutdown signal ignored, received twice in less than 1 second" )
453
453
return
454
454
455
- self .log (WARNING , f "Could shut down" )
455
+ self .log (WARNING , "Could shut down" )
456
456
457
457
# Take down the job execution process with the worker
458
458
if self ._model .job_execution_process_pid :
@@ -472,7 +472,7 @@ def request_stop(self, signum: int, frame: Optional[FrameType]) -> None:
472
472
signal .signal (signal .SIGINT , self .request_force_stop )
473
473
signal .signal (signal .SIGTERM , self .request_force_stop )
474
474
475
- self .log (INFO , f "warm shut down requested" )
475
+ self .log (INFO , "warm shut down requested" )
476
476
477
477
self .stop_scheduler ()
478
478
# If shutdown is requested in the middle of a job, wait until finish before shutting down and save the request.
@@ -481,7 +481,7 @@ def request_stop(self, signum: int, frame: Optional[FrameType]) -> None:
481
481
482
482
self .log (
483
483
DEBUG ,
484
- f "Stopping after current job execution process is finished. Press Ctrl+C again for a cold shutdown." ,
484
+ "Stopping after current job execution process is finished. Press Ctrl+C again for a cold shutdown." ,
485
485
)
486
486
else :
487
487
raise StopRequested ()
@@ -526,7 +526,7 @@ def stop_scheduler(self) -> None:
526
526
return
527
527
self .log (INFO , f"Stopping scheduler thread { self .scheduler .pid } " )
528
528
self .scheduler .request_stop_and_wait ()
529
- self .log (DEBUG , f "Scheduler thread stopped" )
529
+ self .log (DEBUG , "Scheduler thread stopped" )
530
530
self .scheduler = None
531
531
532
532
def refresh (self , update_queues : bool = False ) -> None :
@@ -626,10 +626,10 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
626
626
stopped_job_name = self ._model .get_field ("stopped_job_name" , self .connection )
627
627
628
628
if job_status is None :
629
- self .log (WARNING , f "Job status is None, completed and expired?" )
629
+ self .log (WARNING , "Job status is None, completed and expired?" )
630
630
return
631
631
elif stopped_job_name == job .name : # job execution process killed deliberately
632
- self .log (WARNING , f "Job stopped by user, moving job to failed-jobs-registry" )
632
+ self .log (WARNING , "Job stopped by user, moving job to failed-jobs-registry" )
633
633
job .call_stopped_callback ()
634
634
self .handle_job_failure (
635
635
job , queue = queue , exc_string = "Job stopped by user, job execution process terminated."
0 commit comments