
Commit

Don't close a connection that is not there
sverhoeven committed Oct 8, 2024
1 parent 3737f5e commit 8e0010d
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions src/bartender/schedulers/arq.py
@@ -35,17 +35,17 @@ class ArqSchedulerConfig(BaseModel):
     type: Literal["arq"] = "arq"
     redis_dsn: RedisDsn = parse_obj_as(RedisDsn, "redis://localhost:6379")
     queue: str = "arq:queue"
-    max_jobs: PositiveInt = 10  # noqa: WPS462
-    """Maximum number of jobs to run at a time inside a single worker."""  # noqa: E501, WPS322, WPS428
-    job_timeout: Union[PositiveInt, timedelta] = 3600  # noqa: WPS462
+    max_jobs: PositiveInt = 10
+    """Maximum number of jobs to run at a time inside a single worker."""
+    job_timeout: Union[PositiveInt, timedelta] = 3600
     """Maximum job run time.
     Default is one hour.
     In seconds or string in `ISO 8601 duration format <https://en.wikipedia.org/wiki/ISO_8601#Durations>`_.
     For example, "PT12H" represents a max runtime of "twelve hours".
-    """  # noqa: E501, WPS428
+    """
 
     @property
     def redis_settings(self) -> RedisSettings:
@@ -72,19 +72,19 @@ def __init__(self, config: ArqSchedulerConfig) -> None:
         self.config: ArqSchedulerConfig = config
         self.connection: Optional[ArqRedis] = None
 
-    async def close(self) -> None:  # noqa: D102
-        pool = await self._pool()
-        await pool.close()
+    async def close(self) -> None:
+        if self.connection is not None:
+            await self.connection.close()
 
-    async def submit(self, description: JobDescription) -> str:  # noqa: D102
+    async def submit(self, description: JobDescription) -> str:
         pool = await self._pool()
         job = await pool.enqueue_job("_exec", description)
         if job is None:
             # TODO better error?
             raise RuntimeError("Job already exists")
         return job.job_id
 
-    async def state(self, job_id: str) -> State:  # noqa: D102
+    async def state(self, job_id: str) -> State:
         pool = await self._pool()
         job = Job(job_id, pool)
         arq_status = await job.status()
@@ -100,7 +100,7 @@ async def state(self, job_id: str) -> State:  # noqa: D102
             success = result.success
         return _map_arq_status(arq_status, success)
 
-    async def cancel(self, job_id: str) -> None:  # noqa: D102
+    async def cancel(self, job_id: str) -> None:
         pool = await self._pool()
         job = Job(job_id, pool)
         try:
@@ -129,7 +129,7 @@ class JobFailureError(Exception):
     """Error during job running."""
 
 
-async def _exec(  # noqa: WPS210
+async def _exec(
     ctx: dict[Any, Any],
     description: JobDescription,
 ) -> None:
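
The `close()` hunk is the substance of the commit. The old body fetched a pool via `self._pool()` and closed it; given the commit title and the `Optional[ArqRedis]` connection attribute, `_pool()` evidently creates the connection on demand, so closing a scheduler that was never used would first open a Redis connection only to tear it down again. The new body closes `self.connection` only when one exists. Below is a minimal sketch of that lazy-pool-plus-guarded-close pattern using arq's `create_pool` and `RedisSettings`; the `LazyArqClient` class and the `_pool()` body are illustrative assumptions, not the full bartender `ArqScheduler`.

from typing import Optional

from arq import create_pool
from arq.connections import ArqRedis, RedisSettings


class LazyArqClient:
    """Illustrative stand-in for the scheduler's connection handling."""

    def __init__(self, settings: RedisSettings) -> None:
        self.settings = settings
        self.connection: Optional[ArqRedis] = None

    async def _pool(self) -> ArqRedis:
        # Assumed behaviour: create the Redis connection on first use.
        if self.connection is None:
            self.connection = await create_pool(self.settings)
        return self.connection

    async def close(self) -> None:
        # Old: pool = await self._pool(); await pool.close()
        #      which opened a connection just to close it.
        # New: only close a connection that is actually there.
        if self.connection is not None:
            await self.connection.close()

With this guard, calling `close()` on a scheduler that never touched Redis is a no-op instead of a connect-then-disconnect round trip.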

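The first hunk also documents that `job_timeout` accepts either plain seconds or an ISO 8601 duration string. A small usage sketch of how the `Union[PositiveInt, timedelta]` field would behave, assuming pydantic v1 coercion (which the `parse_obj_as` default for `redis_dsn` suggests) and that `ArqSchedulerConfig` imports from the changed module:

from datetime import timedelta

from bartender.schedulers.arq import ArqSchedulerConfig

# Plain seconds stay an int; an ISO 8601 duration string becomes a timedelta.
two_hours = ArqSchedulerConfig(job_timeout=7200)
half_a_day = ArqSchedulerConfig(job_timeout="PT12H")

assert two_hours.job_timeout == 7200
assert half_a_day.job_timeout == timedelta(hours=12)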
