SQLAlchemy 2.0 完整指南
基本查詢
| SQLAlchemy 2.0 Async ORM | Raw SQL |
|---|---|
select(Station) | SELECT * FROM station |
select(Station.id, Station.name) | SELECT id, name FROM station |
select(func.count()).select_from(Station) | SELECT COUNT(*) FROM station |
select(func.count(Station.id)) | SELECT COUNT(id) FROM station |
WHERE 條件
| SQLAlchemy 2.0 Async ORM | Raw SQL |
|---|---|
select(Station).where(Station.id == 1) | SELECT * FROM station WHERE id = 1 |
select(Station).where(Station.name == "test") | SELECT * FROM station WHERE name = 'test' |
select(Station).where(Station.is_active.is_(True)) | SELECT * FROM station WHERE is_active IS TRUE |
select(Station).where(~Station.is_deleted) | SELECT * FROM station WHERE NOT is_deleted |
select(Station).where(Station.id > 5) | SELECT * FROM station WHERE id > 5 |
select(Station).where(Station.id.in_([1, 2, 3])) | SELECT * FROM station WHERE id IN (1, 2, 3) |
select(Station).where(Station.name.like("%test%")) | SELECT * FROM station WHERE name LIKE '%test%' |
select(Station).where(Station.value.is_(None)) | SELECT * FROM station WHERE value IS NULL |
select(Station).where(Station.value.is_not(None)) | SELECT * FROM station WHERE value IS NOT NULL |
多條件查詢
| SQLAlchemy 2.0 Async ORM | Raw SQL |
|---|---|
select(Station).where(Station.name == "test", Station.is_deleted.is_(False)) | SELECT * FROM station WHERE name = 'test' AND is_deleted IS FALSE |
select(Station).where(or_(Station.id == 1, Station.name == "test")) | SELECT * FROM station WHERE id = 1 OR name = 'test' |
select(Station).where(and_(Station.id > 5, Station.id < 10)) | SELECT * FROM station WHERE id > 5 AND id < 10 |
排序
| SQLAlchemy 2.0 Async ORM | Raw SQL |
|---|---|
select(Station).order_by(Station.id.asc()) | SELECT * FROM station ORDER BY id ASC |
select(Station).order_by(Station.id.desc()) | SELECT * FROM station ORDER BY id DESC |
分頁
| SQLAlchemy 2.0 Async ORM | Raw SQL |
|---|---|
select(Station).limit(10) | SELECT * FROM station LIMIT 10 |
select(Station).offset(20) | SELECT * FROM station OFFSET 20 |
select(Station).limit(10).offset(20) | SELECT * FROM station LIMIT 10 OFFSET 20 |
聚合
| SQLAlchemy 2.0 Async ORM | Raw SQL |
|---|---|
select(func.count()).select_from(Station) | SELECT COUNT(*) FROM station |
select(func.sum(Station.value)) | SELECT SUM(value) FROM station |
select(func.avg(Station.value)) | SELECT AVG(value) FROM station |
select(func.max(Station.value)) | SELECT MAX(value) FROM station |
select(func.min(Station.value)) | SELECT MIN(value) FROM station |
連接查詢
| SQLAlchemy 2.0 Async ORM | Raw SQL |
|---|---|
select(Station).join(Group, Station.group_id == Group.id) | SELECT station.* FROM station JOIN "group" ON station.group_id = "group".id |
select(Station).outerjoin(Group, Station.group_id == Group.id) | SELECT station.* FROM station LEFT OUTER JOIN "group" ON station.group_id = "group".id |
UPDATE
寫入操作建議用 Core API(可讀性高、效能好):
| SQLAlchemy 2.0 Core | Raw SQL |
|---|---|
update(Station).where(Station.id == station_id).values(group_id=0) | UPDATE station SET group_id = 0 WHERE id = :station_id |
Async SQLAlchemy
FastAPI 是 async-first 框架,推論服務通常需要高並發,所以推薦使用 AsyncSession;若在 async 路由中使用同步 Session,資料庫 I/O 會阻塞 event loop,把 async 的優勢全部抵消。
Engine 與 Session 設定
# database.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker
# NOTE: asyncpg driver -- the URL prefix is postgresql+asyncpg://
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/dbname"

# Async engine built from DATABASE_URL.
engine = create_async_engine(
    DATABASE_URL,
    echo=False,  # keep SQL echo off in production so the logs do not explode
    # Key settings for an inference service (see the connection-pool
    # section further down in this guide).
    pool_pre_ping=True,
    pool_size=10,
    max_overflow=20,
)
# Session factory bound to the async engine. async_sessionmaker is the
# SQLAlchemy 2.0 factory for AsyncSession objects (its class_ argument
# defaults to AsyncSession), replacing sessionmaker(..., class_=AsyncSession).
AsyncSessionLocal = async_sessionmaker(
    engine,
    # Important setting for an inference service (explained further down in
    # this guide): loaded ORM attributes are not expired on commit.
    expire_on_commit=False,
)
class Base(DeclarativeBase):
    """Declarative base; subclasses ``DeclarativeBase`` without adding anything."""
FastAPI Dependency Injection
# deps.py
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from .database import AsyncSessionLocal
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield an ``AsyncSession`` created by ``AsyncSessionLocal``.

    The session is committed once the consumer resumes the generator without
    error.  Any ``Exception`` -- including one raised by the commit itself --
    triggers a rollback and is re-raised.  The ``async with`` block is exited
    when the generator finishes.
    """
    async with AsyncSessionLocal() as db_session:
        try:
            yield db_session
            # The commit stays inside the try so a failed commit is rolled back too.
            await db_session.commit()
        except Exception:
            await db_session.rollback()
            raise
在 Route 中使用
# routes/inference.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from .deps import get_db
from .models import InferenceResult
router = APIRouter()
# GET /results/{job_id}: look up the InferenceResult whose job_id matches the
# path parameter and return it, or None when no row matches.
@router.get("/results/{job_id}")
async def get_result(
    job_id: str,
    db: AsyncSession = Depends(get_db),
):
    stmt = select(InferenceResult).where(InferenceResult.job_id == job_id)
    # scalars() executes the statement and yields the first column of each row.
    matches = await db.scalars(stmt)
    return matches.one_or_none()