Skip to content

Commit 61cb395

Browse files
committed
fix:registries do not keep connections
1 parent c02517d commit 61cb395

File tree

6 files changed

+140
-127
lines changed

6 files changed

+140
-127
lines changed

docs/changelog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## v4.0.8 🌈
4+
5+
### 🐛 Bug Fixes
6+
7+
- fix:registries do not keep connections
8+
39
## v4.0.7 🌈
410

511
### 🐛 Bug Fixes

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "django-tasks-scheduler"
7-
version = "4.0.7"
7+
version = "4.0.8"
88
description = "An async job scheduler for django using redis/valkey brokers"
99
authors = [{ name = "Daniel Moran", email = "[email protected]" }]
1010
requires-python = ">=3.10"

scheduler/tests/test_worker/test_scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def test_scheduler_schedules_tasks(self):
2727
self.assertFalse(task.rqueue.queued_job_registry.exists(task.rqueue.connection, task.job_name))
2828
self.assertTrue(task.rqueue.scheduled_job_registry.exists(task.rqueue.connection, task.job_name))
2929

30-
scheduler = WorkerScheduler([task.rqueue], worker_name="fake-worker", connection=task.rqueue.connection)
30+
scheduler = WorkerScheduler([task.rqueue], worker_name="fake-worker")
3131

3232
# act
3333
traveller.move_to(50)

scheduler/worker/scheduler.py

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,19 @@
33
import traceback
44
from datetime import datetime
55
from enum import Enum
6+
from logging import DEBUG, INFO
67
from threading import Thread
78
from typing import List, Set, Optional, Sequence, Dict
89

910
import django
1011

1112
from scheduler.helpers.queues import Queue
1213
from scheduler.helpers.queues import get_queue
14+
from scheduler.helpers.queues.getters import get_queue_connection
1315
from scheduler.helpers.utils import current_timestamp
1416
from scheduler.models import Task
1517
from scheduler.redis_models import SchedulerLock, JobModel, ScheduledJobRegistry
1618
from scheduler.settings import SCHEDULER_CONFIG, logger
17-
from scheduler.types import ConnectionType
1819

1920

2021
class SchedulerStatus(str, Enum):
@@ -31,22 +32,15 @@ def _reschedule_tasks() -> None:
3132

3233

3334
class WorkerScheduler:
34-
def __init__(
35-
self,
36-
queues: Sequence[Queue],
37-
connection: ConnectionType,
38-
worker_name: str,
39-
interval: Optional[int] = None,
40-
) -> None:
41-
interval = interval or SCHEDULER_CONFIG.SCHEDULER_INTERVAL
35+
def __init__(self, queues: Sequence[Queue], worker_name: str, interval: Optional[int] = None) -> None:
4236
self._queues = queues
37+
if len(queues) == 0:
38+
raise ValueError("At least one queue must be provided to WorkerScheduler")
4339
self._scheduled_job_registries: List[ScheduledJobRegistry] = []
4440
self.lock_acquisition_time: Optional[datetime] = None
45-
self._pool_class = connection.connection_pool.connection_class
46-
self._pool_kwargs = connection.connection_pool.connection_kwargs.copy()
4741
self._locks: Dict[str, SchedulerLock] = dict()
48-
self.connection = connection
49-
self.interval = interval
42+
self.connection = get_queue_connection(queues[0].name)
43+
self.interval = interval or SCHEDULER_CONFIG.SCHEDULER_INTERVAL
5044
self._stop_requested = False
5145
self.status = SchedulerStatus.STOPPED
5246
self._thread: Optional[Thread] = None
@@ -57,6 +51,9 @@ def __init__(
5751
def pid(self) -> Optional[int]:
5852
return self._pid
5953

54+
def log(self, level: int, message: str, *args, **kwargs) -> None:
55+
logger.log(level, f"[Scheduler {self.worker_name}/{self._pid}]: {message}", *args, **kwargs)
56+
6057
def _should_reacquire_locks(self) -> bool:
6158
"""Returns True if lock_acquisition_time is longer than 10 minutes ago"""
6259
if not self.lock_acquisition_time:
@@ -70,12 +67,10 @@ def _acquire_locks(self) -> Set[str]:
7067
if self.pid is None:
7168
self._pid = os.getpid()
7269
queue_names = [queue.name for queue in self._queues]
73-
logger.debug(
74-
f"""[Scheduler {self.worker_name}/{self.pid}] Trying to acquire locks for {", ".join(queue_names)}"""
75-
)
70+
self.log(DEBUG, f"""Trying to acquire locks for {", ".join(queue_names)}""")
7671
for queue in self._queues:
7772
lock = SchedulerLock(queue.name)
78-
if lock.acquire(self.pid, connection=queue.connection, expire=self.interval + 60):
73+
if lock.acquire(self.pid, connection=self.connection, expire=self.interval + 60):
7974
self._locks[queue.name] = lock
8075
successful_locks.add(queue.name)
8176

@@ -85,7 +80,7 @@ def _acquire_locks(self) -> Set[str]:
8580
for queue_name in self._locks:
8681
queue = get_queue(queue_name)
8782
self._scheduled_job_registries.append(queue.scheduled_job_registry)
88-
logger.debug(f"[Scheduler {self.worker_name}/{self.pid}] Locks acquired for {', '.join(self._locks.keys())}")
83+
self.log(DEBUG, f"Locks acquired for {', '.join(self._locks.keys())}")
8984
return successful_locks
9085

9186
def start(self) -> None:
@@ -98,24 +93,22 @@ def start(self) -> None:
9893

9994
def request_stop_and_wait(self) -> None:
10095
"""Toggle self._stop_requested that's checked on every loop"""
101-
logger.debug(f"[Scheduler {self.worker_name}/{self.pid}] Stop Scheduler requested")
96+
self.log(DEBUG, f"Stop Scheduler requested")
10297
self._stop_requested = True
10398
if self._thread is not None:
10499
self._thread.join()
105100

106101
def heartbeat(self) -> None:
107102
"""Updates the TTL on scheduler keys and the locks"""
108103
lock_keys = ", ".join(self._locks.keys())
109-
logger.debug(f"[Scheduler {self.worker_name}/{self.pid}] Scheduler updating lock for queue {lock_keys}")
104+
self.log(DEBUG, f"Scheduler updating lock for queue {lock_keys}")
110105
with self.connection.pipeline() as pipeline:
111106
for lock in self._locks.values():
112107
lock.expire(self.connection, expire=self.interval + 60)
113108
pipeline.execute()
114109

115110
def stop(self) -> None:
116-
logger.info(
117-
f"[Scheduler {self.worker_name}/{self.pid}] Stopping scheduler, releasing locks for {', '.join(self._locks.keys())}..."
118-
)
111+
self.log(INFO, f"Stopping scheduler, releasing locks for {', '.join(self._locks.keys())}...")
119112
self.release_locks()
120113
self.status = SchedulerStatus.STOPPED
121114

@@ -128,7 +121,7 @@ def release_locks(self) -> None:
128121

129122
def work(self) -> None:
130123
queue_names = [queue.name for queue in self._queues]
131-
logger.info(f"""[Scheduler {self.worker_name}/{self.pid}] Scheduler for {", ".join(queue_names)} started""")
124+
self.log(INFO, f"""Scheduler for {", ".join(queue_names)} started""")
132125
django.setup()
133126

134127
while True:

scheduler/worker/worker.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,16 @@ def from_model(cls, model: WorkerModel) -> Self:
9797
return res
9898

9999
def __init__(
100-
self,
101-
queues: Iterable[Union[str, Queue]],
102-
name: str,
103-
maintenance_interval: int = SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL,
104-
job_monitoring_interval: int = SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL,
105-
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
106-
fork_job_execution: bool = True,
107-
with_scheduler: bool = True,
108-
burst: bool = False,
109-
model: Optional[WorkerModel] = None,
100+
self,
101+
queues: Iterable[Union[str, Queue]],
102+
name: str,
103+
maintenance_interval: int = SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL,
104+
job_monitoring_interval: int = SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL,
105+
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
106+
fork_job_execution: bool = True,
107+
with_scheduler: bool = True,
108+
burst: bool = False,
109+
model: Optional[WorkerModel] = None,
110110
) -> None:
111111
self.fork_job_execution = fork_job_execution
112112
self.job_monitoring_interval: int = job_monitoring_interval
@@ -290,7 +290,7 @@ def bootstrap(self) -> None:
290290
self.log(INFO, f"Worker {self.name} started with PID {os.getpid()}")
291291
self._command_listener.start()
292292
if self.with_scheduler:
293-
self.scheduler = WorkerScheduler(self.queues, worker_name=self.name, connection=self.connection)
293+
self.scheduler = WorkerScheduler(self.queues, worker_name=self.name)
294294
self.scheduler.start()
295295
self._model.has_scheduler = True
296296
self._model.save(connection=self.connection)
@@ -345,7 +345,7 @@ def run_maintenance_tasks(self) -> None:
345345
self._model.save(connection=self.connection)
346346

347347
def dequeue_job_and_maintain_ttl(
348-
self, timeout: Optional[int], max_idle_time: Optional[int] = None
348+
self, timeout: Optional[int], max_idle_time: Optional[int] = None
349349
) -> Tuple[Optional[JobModel], Optional[Queue]]:
350350
"""Dequeues a job while maintaining the TTL.
351351
:param timeout: The timeout for the dequeue operation.
@@ -504,7 +504,7 @@ def reorder_queues(self, reference_queue: Queue) -> None:
504504
return
505505
if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN:
506506
pos = self._ordered_queues.index(reference_queue)
507-
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
507+
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
508508
return
509509
if self._dequeue_strategy == DequeueStrategy.RANDOM:
510510
shuffle(self._ordered_queues)
@@ -584,7 +584,7 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
584584
while True:
585585
try:
586586
with SCHEDULER_CONFIG.DEATH_PENALTY_CLASS(
587-
self.job_monitoring_interval, JobExecutionMonitorTimeoutException
587+
self.job_monitoring_interval, JobExecutionMonitorTimeoutException
588588
):
589589
retpid, ret_val = self._wait_for_job_execution_process()
590590
break
@@ -801,7 +801,7 @@ class RoundRobinWorker(Worker):
801801

802802
def reorder_queues(self, reference_queue: Queue) -> None:
803803
pos = self._ordered_queues.index(reference_queue)
804-
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
804+
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
805805

806806

807807
class RandomWorker(Worker):

0 commit comments

Comments
 (0)