Skip to content

Commit 2605d0e

Browse files
committed
fix:timeout
1 parent 24c2d2b commit 2605d0e

File tree

1 file changed

+14
-14
lines changed

1 file changed

+14
-14
lines changed

scheduler/worker/worker.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -96,16 +96,16 @@ def from_model(cls, model: WorkerModel) -> Self:
9696
return res
9797

9898
def __init__(
99-
self,
100-
queues: Iterable[Union[str, Queue]],
101-
name: str,
102-
maintenance_interval: int = SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL,
103-
job_monitoring_interval: int = SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL,
104-
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
105-
fork_job_execution: bool = True,
106-
with_scheduler: bool = True,
107-
burst: bool = False,
108-
model: Optional[WorkerModel] = None,
99+
self,
100+
queues: Iterable[Union[str, Queue]],
101+
name: str,
102+
maintenance_interval: int = SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL,
103+
job_monitoring_interval: int = SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL,
104+
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
105+
fork_job_execution: bool = True,
106+
with_scheduler: bool = True,
107+
burst: bool = False,
108+
model: Optional[WorkerModel] = None,
109109
) -> None:
110110
self.fork_job_execution = fork_job_execution
111111
self.job_monitoring_interval: int = job_monitoring_interval
@@ -352,7 +352,7 @@ def run_maintenance_tasks(self) -> None:
352352
self._model.save(connection=self.connection)
353353

354354
def dequeue_job_and_maintain_ttl(
355-
self, timeout: Optional[int], max_idle_time: Optional[int] = None
355+
self, timeout: Optional[int], max_idle_time: Optional[int] = None
356356
) -> Tuple[Optional[JobModel], Optional[Queue]]:
357357
"""Dequeues a job while maintaining the TTL.
358358
:param timeout: The timeout for the dequeue operation.
@@ -523,7 +523,7 @@ def reorder_queues(self, reference_queue: Queue) -> None:
523523
return
524524
if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN:
525525
pos = self._ordered_queues.index(reference_queue)
526-
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
526+
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
527527
return
528528
if self._dequeue_strategy == DequeueStrategy.RANDOM:
529529
shuffle(self._ordered_queues)
@@ -607,7 +607,7 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
607607
while True:
608608
try:
609609
with SCHEDULER_CONFIG.DEATH_PENALTY_CLASS(
610-
self.job_monitoring_interval, JobExecutionMonitorTimeoutException
610+
self.job_monitoring_interval, JobExecutionMonitorTimeoutException
611611
):
612612
retpid, ret_val = self.wait_for_job_execution_process()
613613
break
@@ -835,7 +835,7 @@ class RoundRobinWorker(Worker):
835835

836836
def reorder_queues(self, reference_queue: Queue) -> None:
837837
pos = self._ordered_queues.index(reference_queue)
838-
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
838+
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
839839

840840

841841
class RandomWorker(Worker):

0 commit comments

Comments
 (0)