diff --git a/README.md b/README.md index f683e32..6e1c0f7 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,12 @@ class NotifyHandler(Handler): ''' # users = await UserRepository.find_all([r.user_id for r in requests]) # await mailer.send_all([construct_message(user) for user in users]) + + def batch_size(self) -> None: + ''' + Optionally, you can ovveride global batch size per handler + ''' + return 128 ``` ## Usage example diff --git a/sharded_queue/__init__.py b/sharded_queue/__init__.py index f113015..2b0a9d8 100644 --- a/sharded_queue/__init__.py +++ b/sharded_queue/__init__.py @@ -47,6 +47,9 @@ async def handle(self, *requests: T) -> None: async def stop(self) -> None: pass + def batch_size(self) -> int: + return self.settings.batch_size + class Tube(NamedTuple): handler: type[Handler] @@ -180,12 +183,6 @@ async def acquire_tube( return None - def page_size(self, limit: Optional[int] = None) -> int: - if limit is None: - return self.settings.batch_size - - return min(limit, self.settings.batch_size) - async def loop( self, limit: Optional[int] = None, @@ -232,12 +229,12 @@ async def process(self, tube: Tube, limit: Optional[int] = None) -> int: limit is None or limit > processed_counter ): if tube.handler is RecurrentHandler: - page_size = self.settings.recurrent_tasks_limit + batch_size = self.settings.recurrent_tasks_limit else: - page_size = self.page_size(limit) + batch_size = min(limit, instance.batch_size()) processed = False for pipe in pipes: - msgs = await storage.range(pipe, page_size) + msgs = await storage.range(pipe, batch_size) if not len(msgs): continue