This repo adds integration between your aiogram application and taskiq.
It runs all startup and shutdown events of your application and adds three dependencies that you can use in your tasks:
- Dispatcher: the dispatcher that was used along with the executor;
- Bot: your main bot instance;
- List[Bot]: all your bots.
Define startup and shutdown events for your dispatcher. We use events because we need to identify what your bot wants to do on startup and shutdown.
It is also useful for bot developers to separate business logic from the startup of the bot.
Below is an example of how to integrate taskiq with your favorite bot framework.
import asyncio
import logging
import sys

from aiogram import Bot, Dispatcher

# Your taskiq broker
from your_project.tkq import broker

dp = Dispatcher()
bot = Bot(token="TOKEN")
bot2 = Bot(token="TOKEN")


@dp.startup()
async def setup_taskiq(bot: Bot, *_args, **_kwargs):
    # Here we check whether this is the client side,
    # because otherwise you're going to
    # create an infinite loop of startup events.
    if not broker.is_worker_process:
        logging.info("Setting up taskiq")
        await broker.startup()


@dp.shutdown()
async def shutdown_taskiq(bot: Bot, *_args, **_kwargs):
    if not broker.is_worker_process:
        logging.info("Shutting down taskiq")
        await broker.shutdown()


async def main():
    await dp.start_polling(bot, bot2)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    asyncio.run(main())
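To make the role of the is_worker_process guard concrete, here is a minimal sketch that runs without aiogram or taskiq installed. StandInBroker is a hypothetical class that only mimics the two broker members used in the handlers above; it is not part of taskiq.

```python
import asyncio


class StandInBroker:
    """Hypothetical stand-in for a taskiq broker (not the real class)."""

    def __init__(self, is_worker_process: bool) -> None:
        self.is_worker_process = is_worker_process
        self.startup_calls = 0

    async def startup(self) -> None:
        self.startup_calls += 1


async def setup_taskiq(broker: StandInBroker) -> None:
    # Same guard as in the startup handler above:
    # only the client side starts the broker.
    if not broker.is_worker_process:
        await broker.startup()


async def demo() -> tuple:
    client = StandInBroker(is_worker_process=False)
    worker = StandInBroker(is_worker_process=True)
    await setup_taskiq(client)
    await setup_taskiq(worker)
    return client.startup_calls, worker.startup_calls


client_calls, worker_calls = asyncio.run(demo())
print(client_calls, worker_calls)  # 1 0
```

On the client side the broker is started once; on the worker side the handler does nothing, so the startup event never triggers itself again.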
The only thing left is to add a few lines to your broker definition.
# Please use a real broker for taskiq.
from taskiq_broker import MyBroker

import taskiq_aiogram

broker = MyBroker()

# This line is going to initialize everything.
taskiq_aiogram.init(
    broker,
    "your_project.__main__:dp",
    "your_project.__main__:bot",
    "your_project.__main__:bot2",
)
That's it.
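The strings passed to taskiq_aiogram.init follow a "module.path:variable" pattern. The sketch below shows one way such a string could be turned into an object using only the standard library. resolve_path is a hypothetical helper written for illustration, and the library's own resolution logic may differ.

```python
import importlib
from typing import Any


def resolve_path(path: str) -> Any:
    """Resolve a 'package.module:variable' string to the object it names."""
    module_name, _, attribute = path.partition(":")
    if not module_name or not attribute:
        raise ValueError(f"expected 'module:variable', got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attribute)


# Demonstrated with a standard-library object so the sketch runs anywhere.
join = resolve_path("os.path:join")
print(join("your_project", "tkq.py"))
```

Passing import paths instead of objects presumably lets the worker import your dispatcher and bots lazily, inside its own process.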
Let's create some tasks! I created a task in a separate module named tasks.py.
import asyncio

from aiogram import Bot
from taskiq import TaskiqDepends

from your_project.tkq import broker


@broker.task(task_name="my_task")
async def my_task(chat_id: int, bot: Bot = TaskiqDepends()) -> None:
    print("I'm a task")
    await asyncio.sleep(4)
    await bot.send_message(chat_id, "task completed")
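The bot parameter above is filled in by taskiq based on its type annotation. As a rough illustration of the idea only, the following sketch injects parameters by annotation using the standard library. Marker, FakeBot and call_with_dependencies are hypothetical names invented for this example; taskiq's real dependency resolution is more involved and is not reproduced here.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict


class Marker:
    """Hypothetical placeholder default, playing the role of TaskiqDepends()."""


class FakeBot:
    """Hypothetical stand-in for aiogram's Bot that records sent messages."""

    def __init__(self) -> None:
        self.sent: list = []

    async def send_message(self, chat_id: int, text: str) -> None:
        self.sent.append((chat_id, text))


async def call_with_dependencies(
    func: Callable[..., Any], registry: Dict[type, Any], *args: Any
) -> Any:
    """Fill every parameter whose default is a Marker, looked up by annotation."""
    injected = {
        name: registry[param.annotation]
        for name, param in inspect.signature(func).parameters.items()
        if isinstance(param.default, Marker)
    }
    return await func(*args, **injected)


async def my_task(chat_id: int, bot: FakeBot = Marker()) -> None:
    await bot.send_message(chat_id, "task completed")


bot = FakeBot()
asyncio.run(call_with_dependencies(my_task, {FakeBot: bot}, 42))
print(bot.sent)  # [(42, 'task completed')]
```

The caller passes only chat_id; the bot instance is supplied from the registry, which mirrors how the task above never receives a bot from the code that calls it.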
Now let's call our new task somewhere in bot commands.
from aiogram import types
from aiogram.filters import Command

from tasks import my_task

# dp is the Dispatcher defined in your main module.


@dp.message(Command("task"))
async def message(message: types.Message):
    await my_task.kiq(message.chat.id)
To start the worker, please type:
taskiq worker your_project.tkq:broker --fs-discover
We use --fs-discover to find all tasks.py modules recursively and import all their tasks into the broker.
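To show what recursive discovery of tasks.py files amounts to, here is a small standard-library sketch. find_task_modules is a hypothetical helper, and the directory layout is made up for the demonstration; the actual --fs-discover implementation in taskiq may work differently. The sketch only locates the modules, while a real worker would also import them.

```python
import tempfile
from pathlib import Path
from typing import List


def find_task_modules(root: Path, filename: str = "tasks.py") -> List[str]:
    """Return dotted module names for every `filename` found under `root`."""
    modules = []
    for path in root.rglob(filename):
        relative = path.relative_to(root).with_suffix("")
        modules.append(".".join(relative.parts))
    return sorted(modules)


# Build a throwaway project tree to search.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "your_project" / "admin").mkdir(parents=True)
    (root / "your_project" / "tasks.py").touch()
    (root / "your_project" / "admin" / "tasks.py").touch()
    (root / "your_project" / "tkq.py").touch()
    found = find_task_modules(root)

print(found)  # ['your_project.admin.tasks', 'your_project.tasks']
```

Only files named tasks.py are picked up, so tkq.py is ignored even though it sits in the same package.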
Now we can fire the task and see everything in action.