Hi, I'm very interested in getting Taskiq set up for my FastAPI project, but I'm having trouble getting even a basic test to work. If anyone could take a look and tell me what I'm doing wrong, that would be very helpful.
I'm trying to use Redis both for queuing tasks and as the results backend. It works as a results backend when I use the InMemoryBroker to kiq, but the task doesn't get triggered when I use ListQueueBroker to kiq. I'm not sure what I'm doing wrong.
Here's the relevant code in my main.py:
    # doesn't work with this broker
    broker = ListQueueBroker(settings.redis_url).with_result_backend(
        RedisAsyncResultBackend(settings.redis_url)
    )

    # works with this broker
    # broker = InMemoryBroker().with_result_backend(
    #     RedisAsyncResultBackend(settings.redis_url)
    # )

    redis_client = Redis.from_url(settings.redis_url)
    app = FastAPI()
    taskiq_fastapi.init(broker, "app.main:app")

    @broker.task()
    async def add(x: int, y: int) -> int:
        logging.info("IN TASK")  # logged when using in memory broker, not when using list queue broker
        await asyncio.sleep(5)
        return x + y

    @app.on_event("startup")
    async def startup_event():
        await redis_client.flushall()  # clearing queue
        if not broker.is_worker_process:
            await broker.startup()
        task = await add.kiq(1, 2)
        current_keys = await redis_client.keys("*")
        logging.info(f"Current keys: {current_keys}")  # empty when inmemorybroker, but message when list broker not picked up
        results = await task.wait_result()  # completes when InMemoryBroker, gets hung when using list queue broker
        updated_keys = await redis_client.keys("*")
        print(f"Updated keys: {updated_keys}")  # shows results message for completed task when using in memory broker

    @app.on_event("shutdown")
    async def shutdown_event():
        if broker.is_worker_process:
            await broker.shutdown()
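One possible reading of the symptoms, not confirmed anywhere in this post: an in-memory broker runs the task inside the process that calls `kiq`, while a queue-backed broker only enqueues a message and depends on a separate consumer to run it. The sketch below models that difference with the standard library only. It does not use Taskiq or Redis, and `in_process_kiq`, `queued_kiq`, `worker`, and `demo` are hypothetical names made up for illustration.

```python
import asyncio


async def add(x: int, y: int) -> int:
    """The task body, same arithmetic as the task in the snippet above."""
    return x + y


async def in_process_kiq(x: int, y: int) -> int:
    # Models an in-memory broker: the task body runs in the calling process.
    return await add(x, y)


async def queued_kiq(queue: asyncio.Queue, x: int, y: int) -> asyncio.Future:
    # Models a queue-backed broker: "kiq" only enqueues a message and
    # hands back a handle that is resolved later by a consumer.
    result: asyncio.Future = asyncio.get_running_loop().create_future()
    await queue.put((result, x, y))
    return result


async def worker(queue: asyncio.Queue) -> None:
    # Stand-in for a separate worker process that consumes the queue.
    while True:
        result, x, y = await queue.get()
        result.set_result(await add(x, y))


async def demo() -> tuple[bool, int]:
    queue: asyncio.Queue = asyncio.Queue()

    # In-process execution finishes without any consumer.
    assert await in_process_kiq(1, 2) == 3

    # Queued execution with no consumer: waiting on the result times out.
    pending = await queued_kiq(queue, 1, 2)
    try:
        # shield() keeps the timeout from cancelling the pending result.
        await asyncio.wait_for(asyncio.shield(pending), timeout=0.1)
        hung = False
    except asyncio.TimeoutError:
        hung = True

    # Once a consumer is running, the same pending result resolves.
    consumer = asyncio.create_task(worker(queue))
    value = await asyncio.wait_for(pending, timeout=1)
    consumer.cancel()
    return hung, value


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In Taskiq the consumer is normally a separate worker process started from the command line, for example `taskiq worker app.main:broker`. The module path here is an assumption based on the `"app.main:app"` string in the snippet, so it may need adjusting. If no such worker is running, a hang at `wait_result()` would be consistent with this model.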
Here's my docker-compose for reference as well: