Skip to content

Latest commit

 

History

History
243 lines (163 loc) · 7.31 KB

README.zh.md

File metadata and controls

243 lines (163 loc) · 7.31 KB

简体中文 | English

SQLAlchemy-Database

SQLAlchemy-Database为SQLAlchemy ORM提供常见数据库操作的快捷方式.

Pytest codecov Package version Chat on Gitter 229036692

项目介绍

  • 支持SQLAlchemySQLModel,推荐使用SQLModel.

安装

pip install sqlalchemy-database

ORM模型

SQLAlchemy模型示例

import datetime

import sqlalchemy as sa
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "User"
    id = sa.Column(sa.Integer, primary_key=True)
    username = sa.Column(sa.String(30), unique=True, index=True, nullable=False)
    password = sa.Column(sa.String(30), default='')
    create_time = sa.Column(sa.DateTime, default=datetime.datetime.utcnow)

SQLModel模型示例

import datetime

from sqlmodel import SQLModel, Field


class User(SQLModel, table=True):
    id: int = Field(default=None, primary_key=True, nullable=False)
    username: str = Field(title='username', max_length=30, unique=True, index=True, nullable=False)
    password: str = Field(default='', title='Password')
    create_time: datetime = Field(default_factory=datetime.now, title='Create Time')

AsyncDatabase

创建连接

from sqlalchemy_database import AsyncDatabase

# 1.创建数据库客户端
db = AsyncDatabase.create('sqlite+aiosqlite:///amisadmin.db?check_same_thread=False')  # sqlite
# db = AsyncDatabase.create('mysql+aiomysql://root:[email protected]:3306/amisadmin?charset=utf8mb4')# mysql
# db = AsyncDatabase.create('postgresql+asyncpg://postgres:[email protected]:5432/amisadmin')# postgresql

Database

创建连接

from sqlalchemy_database import Database

# 1.创建数据库客户端
db = Database.create('sqlite:///amisadmin.db?check_same_thread=False')  # sqlite
# db = Database.create('mysql+pymysql://root:[email protected]:3306/amisadmin?charset=utf8mb4') # mysql
# db = Database.create('postgresql://postgres:[email protected]:5432/amisadmin') # postgresql
# db = Database.create('oracle+cx_oracle://scott:tiger@tnsname') # oracle
# db = Database.create('mssql+pyodbc://scott:tiger@mydsn') # SQL Server

AbcAsyncDatabase

当你在开发一个工具库的时候,你的python程序可能会需要对方提供一个数据库连接.

但是你不能确定对方个人倾向是使用同步连接,还是异步连接.你可以使用async_前缀的异步快捷函数.

AsyncDatabaseDatabase都继承自AbcAsyncDatabase,并且都实现了常用的以async_为前缀的异步快捷函数.

例如: async_execute,async_scalar,async_scalars,async_get,async_delete,async_run_sync.

说明: Databaseasync_前缀异步快捷函数,是通过在线程池中执行对应的同步快捷函数实现的.

异步兼容快捷函数

from sqlalchemy import insert, select, update, delete
from sqlalchemy_database import AsyncDatabase, Database


async def fast_execute(db: Union[AsyncDatabase, Database]):
    # update
    stmt = update(User).where(User.id == 1).values({'username': 'new_user'})
    result = await db.async_execute(stmt)

    # select
    stmt = select(User).where(User.id == 1)
    user = await db.async_execute(stmt, on_close_pre=lambda r: r.scalar())

    # insert
    stmt = insert(User).values({'username': 'User-6', 'password': 'password-6'})
    result = await db.async_execute(stmt)

    # delete
    stmt = delete(User).where(User.id == 6)
    result = await db.async_execute(stmt)

    # scalar
    user = await db.async_scalar(select(User).where(User.id == 1))

    # scalars
    stmt = select(User)
    result = await db.async_scalars(stmt)

    # get
    user = await db.async_get(User, 1)

    # delete
    user = User(id=1, name='test')
    await db.async_delete(user)

    # run_sync
    await db.async_run_sync(Base.metadata.create_all, is_session=False)

FastAPI中使用依赖

app = FastAPI()


# AsyncDatabase
@app.get("/user/{id}")
async def get_user(id: int, session: AsyncSession = Depends(db.session_generator)):
    return await session.get(User, id)


# Database
@app.get("/user/{id}")
def get_user(id: int, session: Session = Depends(db.session_generator)):
    return session.get(User, id)

FastAPI中使用中间件

app = FastAPI()

# Database
sync_db = Database.create("sqlite:///amisadmin.db?check_same_thread=False")

app.add_middleware(sync_db.asgi_middleware)


@app.get("/user/{id}")
def get_user(id: int):
    return sync_db.session.get(User, id)


# AsyncDatabase
async_db = AsyncDatabase.create("sqlite+aiosqlite:///amisadmin.db?check_same_thread=False")

app.add_middleware(async_db.asgi_middleware)


@app.get("/user/{id}")
async def get_user(id: int):
    return await async_db.session.get(User, id)

获取session对象

你可以在任何地方获取session对象,但是你需要自己管理session的生命周期.例如:

  • 1.在FastAPI中,你可以使用中间件,或者依赖来获取session对象.在路由函数中,调用的方法将自动获取上下文中的session对象.

  • 2.在局部工作单元中,你可以使用with语句来获取session对象.在with语句中,调用的方法将自动获取一个新的session对象.

graph LR
session[Get session] --> scopefunc{Read context var}
scopefunc -->|None| gSession[Return the global default session]
scopefunc -->|Not a Session object| sSession[Return the scoped session corresponding to the current context variable]
scopefunc -->|Is a Session object| cSession[Return session in the current context variable]
Loading

更多教程文档

SQLAlchemy-DatabaseSQLAlchemy的基础上添加拓展功能. 更多功能及复杂使用,请参考SQLAlchemy官方文档.

SQLAlchemy非常强大,几乎可以满足你的任何复杂需求.

推荐使用SQLModel定义ORM模型,请参考SQLModel官方文档.

SQLModel由FastAPI作者编写, 完美结合SQLAlchemy+Pydantic, 拥有SQLAlchemyPydantic的所有功能.

相关项目

许可协议

该项目遵循 Apache2.0 许可协议。