diff --git a/python/bullmq/flow_producer.py b/python/bullmq/flow_producer.py index fbdcfa2782..57f8674247 100644 --- a/python/bullmq/flow_producer.py +++ b/python/bullmq/flow_producer.py @@ -134,8 +134,8 @@ async def addBulk(self, flows: list[dict]): return result - def close(self): + async def close(self): """ Close the flow instance. """ - return self.redisConnection.close() + await self.redisConnection.close() diff --git a/python/bullmq/queue.py b/python/bullmq/queue.py index 44a54355ed..746ca76370 100644 --- a/python/bullmq/queue.py +++ b/python/bullmq/queue.py @@ -308,11 +308,11 @@ def sanitizeJobTypes(self, types): 'waiting-children' ] - def close(self): + async def close(self): """ Close the queue instance. """ - return self.redisConnection.close() + await self.redisConnection.close() def remove(self, job_id: str, opts: dict = {}): return self.scripts.remove(job_id, opts.get("removeChildren", True)) diff --git a/python/bullmq/redis_connection.py b/python/bullmq/redis_connection.py index cc92f993b6..b60ff5a0b5 100644 --- a/python/bullmq/redis_connection.py +++ b/python/bullmq/redis_connection.py @@ -50,11 +50,14 @@ def disconnect(self): """ return self.conn.disconnect() - def close(self): + async def close(self): """ Close the connection """ - return self.conn.close() + try: + await self.conn.aclose() + except AttributeError: + await self.conn.close() async def getRedisVersion(self): if self.version is not None: