@@ -281,7 +281,7 @@ def handle_job_failure(self, job: JobModel, queue: Queue, exc_string: str = "")
281
281
if job .status == JobStatus .FAILED :
282
282
self ._model .failed_job_count += 1
283
283
self ._model .completed_jobs += 1
284
- if job .started_at and job .ended_at :
284
+ if job .started_at is not None and job .ended_at is not None :
285
285
self ._model .total_working_time_ms += (job .ended_at - job .started_at ).microseconds / 1000.0
286
286
self ._model .save (connection = queue .connection )
287
287
@@ -613,9 +613,11 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
613
613
break
614
614
except JobExecutionMonitorTimeoutException :
615
615
# job execution process has not exited yet and is still running. Send a heartbeat to keep the worker alive.
616
- working_time = (utcnow () - job .started_at ).total_seconds ()
617
- self ._model .set_current_job_working_time (working_time , self .connection )
618
-
616
+ if job .started_at is not None :
617
+ working_time = (utcnow () - job .started_at ).total_seconds ()
618
+ self ._model .set_current_job_working_time (working_time , self .connection )
619
+ else :
620
+ logger .warning ("[Worker {self.name}/{self._pid}]: job.started_at is None, cannot set working time" )
619
621
# Kill the job from this side if something is really wrong (interpreter lock/etc).
620
622
if job .timeout != - 1 and self ._model .current_job_working_time > (job .timeout + 60 ):
621
623
self ._model .heartbeat (self .connection , self .job_monitoring_interval + 60 )
@@ -703,7 +705,7 @@ def execute_in_separate_process(self, job: JobModel, queue: Queue) -> None:
703
705
random .seed ()
704
706
self .setup_job_execution_process_signals ()
705
707
self ._is_job_execution_process = True
706
- job = JobModel .get (job .name , self .connection )
708
+ job = JobModel .get (job .name , queue .connection )
707
709
try :
708
710
self .perform_job (job , queue )
709
711
except : # noqa
0 commit comments