[Help] How to initialize Tortoise ORM in multiple workers? #1694
How about this:

```python
#!/usr/bin/env python
from datetime import datetime
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager
from pathlib import Path
from typing import AsyncGenerator

import uvicorn
from faker import Faker
from fastapi import FastAPI
from tortoise import Model
from tortoise.contrib.fastapi import RegisterTortoise
from tortoise.fields import IntField, TextField

concurrency_nums = multiprocessing.cpu_count()
API_TITLE = "main"


def set_loguru() -> None:
    pass


executor = ProcessPoolExecutor(max_workers=concurrency_nums, initializer=set_loguru)


class Group(Model):
    id = IntField(primary_key=True)
    name = TextField()


async def _initial_groups() -> None:
    faker = Faker()
    await Group.bulk_create([Group(name=faker.name()) for _ in range(10)])


async def do_govern_entry() -> None:
    print(f"Enter do_govern_entry@{datetime.now()}")
    # Re-enter the lifespan so this worker process gets its own Tortoise setup
    async with lifespan(FastAPI()):
        group = await Group.get(id=1)
        print(f"I'm do_govern_entry: {dict(group)}")
        group.name = str(datetime.now())
        await group.save()
        print(f"{group.id=} updated: {dict(group)}")


def do_async(func, *args, **kwargs) -> None:
    try:
        asyncio.run(func(*args, **kwargs))
    except Exception:
        import traceback

        traceback.print_exc()


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    async with RegisterTortoise(
        app,
        db_url="sqlite://db_one.sqlite3",
        modules={"models": [__name__]},
        generate_schemas=True,
    ):
        if not await Group.all().count():
            await _initial_groups()
        yield
    if app.title == API_TITLE:
        executor.shutdown(wait=False, cancel_futures=True)


app = FastAPI(title=API_TITLE, lifespan=lifespan)


@app.get("/groups")
async def group_list() -> list[dict]:
    objs = await Group.all()
    executor.submit(do_async, do_govern_entry)
    return [dict(i) for i in objs]


if __name__ == "__main__":
    uvicorn.run(f"{Path(__file__).stem}:app", reload=True)
```
Thanks for your response and the code demo. However, I see that Tortoise will still be registered more than once: once in the main process and again in each separate worker process. 🤔 In one sense, it seems similar to what I did, but yours is simpler to use. To be frank, I was actually going to ask whether there was something like a connection pool 😂 instead of doing it the way you and I are doing it.
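For what it's worth, Tortoise does expose connection pooling at the driver level through the backend credentials. A hedged sketch of such a config, assuming the MySQL backend (aiomysql) and with placeholder host/user/password values:

```python
# Sketch only: `minsize`/`maxsize` bound the per-process connection pool
# that Tortoise's MySQL backend passes down to aiomysql. Credential values
# below are placeholders, not taken from the thread.
TORTOISE_ORM = {
    "connections": {
        "default": {
            "engine": "tortoise.backends.mysql",
            "credentials": {
                "host": "localhost",
                "port": 3306,
                "user": "app",
                "password": "secret",
                "database": "mydb",
                "minsize": 1,   # pool lower bound
                "maxsize": 10,  # pool upper bound
            },
        }
    },
    "apps": {
        "models": {
            "models": ["app.models"],
            "default_connection": "default",
        }
    },
}
```

Note that a pool only helps with concurrency *within* one process; it cannot be shared across `ProcessPoolExecutor` workers, so each worker still needs its own `Tortoise.init`.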
As described above, I have a FastAPI application that uses `concurrent.futures.ProcessPoolExecutor` to perform CPU-intensive tasks and save the results to MySQL. However, I encountered errors when performing database query operations inside those worker processes.
Then I realized that Python's multiprocessing gives each process its own independent resources, so I initialized Tortoise separately within each worker process, and everything worked fine.
However, this approach does not feel quite right, so I wanted to ask if there is a better way to handle it.
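The pattern described here, giving each worker process its own database setup, can be expressed with the executor's `initializer` hook, which runs once per worker process. A minimal runnable sketch using stdlib `sqlite3` as a stand-in (with Tortoise, the initializer would instead call `run_async(Tortoise.init(...))`):

```python
import sqlite3
from concurrent.futures import ProcessPoolExecutor

# Process-local connection: each worker process gets its own copy of this
# global, set up exactly once by the initializer.
_conn = None


def init_worker(db_path: str) -> None:
    """Runs once in every worker process; with Tortoise you would
    initialize the ORM here instead of opening sqlite3 directly."""
    global _conn
    _conn = sqlite3.connect(db_path)


def query_one(sql: str):
    """Executed inside a worker; reuses the process-local connection."""
    return _conn.execute(sql).fetchone()


if __name__ == "__main__":
    with ProcessPoolExecutor(
        max_workers=2, initializer=init_worker, initargs=(":memory:",)
    ) as pool:
        print(pool.submit(query_one, "SELECT 40 + 2").result())  # (42,)
```

This avoids re-initializing on every task: the setup cost is paid once per worker, not once per submitted job.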