diff --git a/taskiq/brokers/inmemory_broker.py b/taskiq/brokers/inmemory_broker.py index 544289f..b0bcd37 100644 --- a/taskiq/brokers/inmemory_broker.py +++ b/taskiq/brokers/inmemory_broker.py @@ -87,7 +87,7 @@ async def set_progress( progress: TaskProgress[Any], ) -> None: """ - Set progress of task exection. + Set progress of task execution. :param task_id: task id :param progress: task execution progress diff --git a/taskiq/cli/scheduler/args.py b/taskiq/cli/scheduler/args.py index 59230db..1850f36 100644 --- a/taskiq/cli/scheduler/args.py +++ b/taskiq/cli/scheduler/args.py @@ -32,7 +32,10 @@ def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs": formatter_class=ArgumentDefaultsHelpFormatter, description="Subcommand to run scheduler", ) - parser.add_argument("scheduler", help="Path to scheduler") + parser.add_argument( + "scheduler", + help="Path to scheduler or scheduler factory function", + ) parser.add_argument( "modules", help="List of modules where to look for tasks.", diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py index 9155324..8c15fae 100644 --- a/taskiq/cli/scheduler/run.py +++ b/taskiq/cli/scheduler/run.py @@ -1,4 +1,5 @@ import asyncio +import inspect import sys from datetime import datetime, timedelta from logging import basicConfig, getLevelName, getLogger @@ -189,8 +190,11 @@ async def run_scheduler(args: SchedulerArgs) -> None: ), ) getLogger("taskiq").setLevel(level=getLevelName(args.log_level)) + if isinstance(args.scheduler, str): scheduler = import_object(args.scheduler) + if inspect.isfunction(scheduler): + scheduler = scheduler() else: scheduler = args.scheduler if not isinstance(scheduler, TaskiqScheduler): @@ -198,6 +202,7 @@ async def run_scheduler(args: SchedulerArgs) -> None: "Imported scheduler is not a subclass of TaskiqScheduler.", ) sys.exit(1) + scheduler.broker.is_scheduler_process = True import_tasks(args.modules, args.tasks_pattern, args.fs_discover) for source in scheduler.sources: diff --git a/taskiq/cli/worker/args.py b/taskiq/cli/worker/args.py index 7350bfa..d2580f9 100644 --- a/taskiq/cli/worker/args.py +++ b/taskiq/cli/worker/args.py @@ -61,7 +61,7 @@ def from_cli( parser.add_argument( "broker", help=( - "Where to search for broker. " + "Where to search for broker or broker factory function. " "This string must be specified in " "'module.module:variable' format." ), diff --git a/taskiq/cli/worker/run.py b/taskiq/cli/worker/run.py index 727a02a..d5d5cf5 100644 --- a/taskiq/cli/worker/run.py +++ b/taskiq/cli/worker/run.py @@ -1,4 +1,5 @@ import asyncio +import inspect import logging import os import signal @@ -121,9 +122,16 @@ def interrupt_handler(signum: int, _frame: Any) -> None: # broker is running as a worker. # We must set this field before importing tasks, # so broker will remember all tasks it's related to. + broker = import_object(args.broker) + if inspect.isfunction(broker): + broker = broker() if not isinstance(broker, AsyncBroker): - raise ValueError("Unknown broker type. Please use AsyncBroker instance.") + raise ValueError( + "Unknown broker type. Please use AsyncBroker instance " + "or pass broker factory function that returns an AsyncBroker instance.", + ) + broker.is_worker_process = True import_tasks(args.modules, args.tasks_pattern, args.fs_discover)