FastAPI Multi-Database Architecture in Practice: PostgreSQL + Redis + MongoDB Working Together

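Introduction

Modern backend applications rarely rely on a single database. A relational database (PostgreSQL) handles transactional data and strongly consistent queries, a cache (Redis) absorbs high-concurrency reads and writes and manages sessions, and a document database (MongoDB) stores unstructured data and high-volume logs. Combining all three in one FastAPI application calls for a clear architecture.

Using the backend of an AI language-learning platform as the running example, this article walks through using PostgreSQL (SQLAlchemy async), Redis (redis-py), and MongoDB (Motor) together in FastAPI, covering connection management, dependency injection, transaction coordination, health checks, and production deployment. The code is intended to be usable in production projects.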

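Prerequisites

  • Python 3.11+
  • FastAPI basics (routing, dependency injection, Pydantic models)
  • Familiarity with basic SQLAlchemy 2.0 async usage
  • Docker and Docker Compose (to run the three databases locally)
  • Dependencies installed: pip install fastapi uvicorn "sqlalchemy[asyncio]" asyncpg redis motor pydantic-settings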

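1. Architecture Overview

1.1 Division of Responsibilities

| Database | Responsibility | Typical data | Access method |
|---|---|---|---|
| PostgreSQL | Core business data, relational queries, transactions | Users, orders, permissions | SQLAlchemy 2.0 async ORM |
| Redis | Caching, sessions, rate limiting, pub/sub | Session tokens, cached results, real-time messages | redis-py async |
| MongoDB | Unstructured data, logs, conversation history | Chat records, operation logs, analytics events | Motor async driver |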

1.2 Layered Architecture

┌─────────────────────────────────────────────┐
│                  API Layer                  │
│       FastAPI Routers / Dependencies        │
├─────────────────────────────────────────────┤
│                Service Layer                │
│  Business Logic / Transaction Coordinator   │
├──────────────┬──────────────┬───────────────┤
│  PostgreSQL  │    Redis     │    MongoDB    │
│  Repository  │    Cache     │  Repository   │
├──────────────┴──────────────┴───────────────┤
│            Database Connections             │
│  SQLAlchemy async  │  redis-py  │  Motor    │
└─────────────────────────────────────────────┘

2. Configuration Management

2.1 Environment Variables and Pydantic Settings

# config.py
from functools import lru_cache

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # PostgreSQL
    postgres_dsn: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/chatlingo"
    postgres_pool_size: int = 20
    postgres_max_overflow: int = 10

    # Redis (host and port match the docker-compose service in section 2.2)
    redis_dsn: str = "redis://localhost:6379"

    # MongoDB (supply the real password through MONGODB_DSN in .env)
    mongodb_dsn: str = "mongodb://root:***@localhost:27017"
    mongodb_database: str = "chatlingo"
    mongodb_max_pool_size: int = 10

    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}


@lru_cache
def get_settings() -> Settings:
    # Cached so that every caller shares one Settings instance
    return Settings()

2.2 Orchestrating the Three Databases with Docker Compose

# docker-compose.yml
version: "3.8"

services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: chatlingo
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 3s
      retries: 5

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redisdata:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 5

  mongo:
    image: mongo:7
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    ports:
      - "27017:27017"
    volumes:
      - mongodata:/data/db
    healthcheck:
      test: echo 'db.runCommand("ping").ok' | mongosh --quiet
      interval: 5s
      timeout: 3s
      retries: 5

volumes:
  pgdata:
  redisdata:
  mongodata:

3. Database Connection Management

3.1 PostgreSQL: SQLAlchemy 2.0 Async

# database/postgres.py
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase

from config import get_settings


class Base(DeclarativeBase):
    pass


engine = create_async_engine(
    get_settings().postgres_dsn,
    pool_size=get_settings().postgres_pool_size,
    max_overflow=get_settings().postgres_max_overflow,
    echo=False,
    pool_pre_ping=True,
)

async_session_factory = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)


async def get_db_session() -> AsyncIterator[AsyncSession]:
    # One session per request: commit on success, roll back on any error
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

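Key design decisions:

  • expire_on_commit=False: objects are not expired after commit, which avoids triggering a lazy load during serialization
  • pool_pre_ping=True: each connection is pinged when it is checked out of the pool, so a request is not handed a connection that has already dropped
  • async_session_factory is a shared factory; each request obtains its own session through get_db_session, since a single AsyncSession should not be used by concurrent tasks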

3.2 Redis: Connection Pool and Async Client

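# database/redis.py
from collections.abc import AsyncIterator

import redis.asyncio as aioredis

from config import get_settings


# One pool for the whole process; every client below borrows from it
redis_pool = aioredis.ConnectionPool.from_url(
    get_settings().redis_dsn,
    max_connections=20,
    decode_responses=True,
)


async def get_redis() -> AsyncIterator[aioredis.Redis]:
    # The client is a thin wrapper around the shared pool, so nothing is
    # closed per request; the pool itself is released at shutdown.
    yield aioredis.Redis(connection_pool=redis_pool)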

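Key point: the Redis connection pool is created once at application startup (when the module is first imported) and is shared by all requests. decode_responses=True makes the client return str instead of bytes.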

3.3 MongoDB: Motor Async Driver

# database/mongodb.py
from motor.motor_asyncio import AsyncIOMotorClient

from config import get_settings


mongo_client: AsyncIOMotorClient | None = None


async def init_mongo():
    # Called once from the application lifespan
    global mongo_client
    settings = get_settings()
    mongo_client = AsyncIOMotorClient(
        settings.mongodb_dsn,
        maxPoolSize=settings.mongodb_max_pool_size,
    )


async def close_mongo():
    global mongo_client
    if mongo_client:
        mongo_client.close()
        mongo_client = None


def get_database():
    if mongo_client is None:
        raise RuntimeError("MongoDB not initialized")
    return mongo_client[get_settings().mongodb_database]


async def get_mongo_db():
    # FastAPI dependency: yields the shared database handle
    yield get_database()

3.4 Application Lifecycle Management

# main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI

import models.user  # noqa: F401  (registers the users table on Base.metadata)
from database.mongodb import close_mongo, init_mongo
from database.postgres import Base, engine
from database.redis import redis_pool


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create tables and open the MongoDB client
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    await init_mongo()
    yield
    # Shutdown: release every connection pool
    await engine.dispose()
    await close_mongo()
    await redis_pool.disconnect()


app = FastAPI(lifespan=lifespan)
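
One thing the lifespan above leaves open is partial startup: if init_mongo() raises, the code after yield never runs and the SQLAlchemy engine is not disposed. The sketch below shows one way to register cleanup as each resource comes up, using contextlib.AsyncExitStack from the standard library. The resource() helper, the events list, and the fail_mongo flag are stand-ins invented for this illustration, not real drivers; in the application they would correspond to the engine, the Redis pool, and the Motor client.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events: list[str] = []  # records what happened, in order


@asynccontextmanager
async def resource(name: str, fail: bool = False):
    """Hypothetical stand-in for one database client."""
    if fail:
        raise RuntimeError(f"{name} failed to start")
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


@asynccontextmanager
async def lifespan(fail_mongo: bool = False):
    # Each resource is registered on the stack as soon as it is open, so an
    # error in a later step still closes the earlier ones, in reverse order.
    async with AsyncExitStack() as stack:
        await stack.enter_async_context(resource("postgres"))
        await stack.enter_async_context(resource("redis"))
        await stack.enter_async_context(resource("mongo", fail=fail_mongo))
        yield


async def demo(fail_mongo: bool) -> list[str]:
    events.clear()
    try:
        async with lifespan(fail_mongo):
            events.append("serving")
    except RuntimeError:
        events.append("startup aborted")
    return list(events)


normal_run = asyncio.run(demo(fail_mongo=False))
failed_run = asyncio.run(demo(fail_mongo=True))
```

In the failed run, the Redis and PostgreSQL stand-ins are still closed even though the MongoDB one never opened.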

4. Data Models and the Repository Pattern

4.1 PostgreSQL ORM Models

# models/user.py
import uuid
from datetime import datetime

from sqlalchemy import DateTime, String, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column

from database.postgres import Base


class User(Base):
    __tablename__ = "users"

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    username: Mapped[str] = mapped_column(
        String(50), unique=True, nullable=False, index=True
    )
    email: Mapped[str] = mapped_column(
        String(255), unique=True, nullable=False, index=True
    )
    hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
    language_preference: Mapped[str] = mapped_column(
        String(10), default="en", nullable=False
    )
    cefr_level: Mapped[str | None] = mapped_column(String(5), default="A1")
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )

4.2 MongoDB Document Models

# models/conversation.py
from datetime import datetime, timezone

from pydantic import BaseModel, Field


def utc_now() -> datetime:
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
    return datetime.now(timezone.utc)


class Message(BaseModel):
    role: str
    content: str
    timestamp: datetime = Field(default_factory=utc_now)
    metadata: dict = Field(default_factory=dict)


class Conversation(BaseModel):
    id: str = Field(alias="_id")
    user_id: str
    title: str = "New conversation"
    messages: list[Message] = Field(default_factory=list)
    created_at: datetime = Field(default_factory=utc_now)
    updated_at: datetime = Field(default_factory=utc_now)

4.3 Repository Pattern Implementation

# repositories/user_repo.py
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from models.user import User


class UserRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: UUID) -> User | None:
        result = await self.session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> User | None:
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def create(self, user: User) -> User:
        self.session.add(user)
        # flush() sends the INSERT without committing the transaction
        await self.session.flush()
        return user
# repositories/conversation_repo.py
from datetime import datetime, timezone

from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorDatabase


class ConversationRepository:
    def __init__(self, db: AsyncIOMotorDatabase):
        self.collection = db["conversations"]

    async def create(self, user_id: str, title: str = "New conversation") -> str:
        # Real timestamps are needed because get_recent sorts on updated_at
        now = datetime.now(timezone.utc)
        result = await self.collection.insert_one({
            "user_id": user_id,
            "title": title,
            "messages": [],
            "created_at": now,
            "updated_at": now,
        })
        return str(result.inserted_id)

    async def append_message(
        self, conversation_id: str, message: dict
    ) -> bool:
        result = await self.collection.update_one(
            {"_id": ObjectId(conversation_id)},
            {
                "$push": {"messages": message},
                "$set": {"updated_at": datetime.now(timezone.utc)},
            },
        )
        return result.modified_count > 0

    async def get_recent(
        self, user_id: str, limit: int = 20
    ) -> list[dict]:
        # Newest conversations first
        cursor = self.collection.find(
            {"user_id": user_id}
        ).sort("updated_at", -1).limit(limit)
        return await cursor.to_list(length=limit)
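
append_message grows the messages array without bound, and a single MongoDB document is limited to 16 MB. If only the most recent messages need to stay in the document, the update can cap the array with the $each and $slice modifiers of $push. The helper below only builds the update document, so it runs without a database; build_append_update and max_messages are names assumed for this sketch, not part of the original repository.

```python
from datetime import datetime, timezone
from typing import Optional


def build_append_update(
    message: dict,
    max_messages: int = 500,
    now: Optional[datetime] = None,
) -> dict:
    """Build an update that appends one message and keeps the newest N."""
    if max_messages <= 0:
        raise ValueError("max_messages must be positive")
    now = now or datetime.now(timezone.utc)
    return {
        "$push": {
            "messages": {
                "$each": [message],       # $slice is only valid together with $each
                "$slice": -max_messages,  # a negative value keeps the last N elements
            }
        },
        "$set": {"updated_at": now},
    }


update = build_append_update({"role": "user", "content": "Hola"}, max_messages=200)
```

The resulting dictionary would be passed as the second argument of update_one in append_message. Whether dropping older messages is acceptable depends on the product; archiving them to a separate collection is another option.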
# repositories/cache_repo.py
import json
from typing import Any, Optional

import redis.asyncio as aioredis


class CacheRepository:
    def __init__(self, redis: aioredis.Redis):
        self.redis = redis

    async def get(self, key: str) -> Optional[Any]:
        data = await self.redis.get(key)
        if data is None:
            return None
        return json.loads(data)

    async def set(self, key: str, value: Any, ttl: int = 300) -> None:
        # Values are stored as JSON strings with an expiry in seconds
        await self.redis.set(key, json.dumps(value), ex=ttl)

    async def delete(self, key: str) -> None:
        await self.redis.delete(key)

    async def exists(self, key: str) -> bool:
        return await self.redis.exists(key) > 0
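
CacheRepository.set passes the value straight to json.dumps, which raises TypeError for types such as UUID and datetime that appear on the User model. One option, sketched below with the standard library only, is a default hook that converts those types to strings before caching. The names to_jsonable and dumps_for_cache are assumptions made for this example.

```python
import json
from datetime import datetime, timezone
from uuid import UUID


def to_jsonable(obj):
    """Fallback used by json.dumps for types it cannot encode itself."""
    if isinstance(obj, UUID):
        return str(obj)
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Cannot cache value of type {type(obj).__name__}")


def dumps_for_cache(value) -> str:
    return json.dumps(value, default=to_jsonable)


payload = {
    "id": UUID("12345678-1234-5678-1234-567812345678"),
    "created_at": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
}
restored = json.loads(dumps_for_cache(payload))
```

Note that the values come back from the cache as plain strings, so callers that need a UUID or datetime have to parse them again.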

5. Service Layer: Coordinating the Three Databases

5.1 Transaction Coordinator

# services/user_service.py
from uuid import UUID

from motor.motor_asyncio import AsyncIOMotorDatabase
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncSession

from models.user import User
from repositories.cache_repo import CacheRepository
from repositories.conversation_repo import ConversationRepository
from repositories.user_repo import UserRepository
# hash_password is not defined in this article; the import path below is a
# hypothetical placeholder for the project's own password-hashing helper.
from security import hash_password


class UserService:
    def __init__(
        self,
        pg_session: AsyncSession,
        redis: Redis,
        mongo_db: AsyncIOMotorDatabase,
    ):
        self.user_repo = UserRepository(pg_session)
        self.cache_repo = CacheRepository(redis)
        self.conv_repo = ConversationRepository(mongo_db)

    async def register_user(self, user_data: dict) -> dict:
        # 1. PostgreSQL: flush() inside create() assigns user.id; the commit
        #    itself happens when get_db_session exits
        user = await self.user_repo.create(
            User(
                username=user_data["username"],
                email=user_data["email"],
                hashed_password=hash_password(user_data["password"]),
            )
        )
        # 2. Redis: warm the profile cache for one hour
        await self.cache_repo.set(
            f"user:{user.id}",
            {"id": str(user.id), "username": user.username, "email": user.email},
            ttl=3600,
        )
        # 3. MongoDB: create the welcome conversation
        conv_id = await self.conv_repo.create(
            user_id=str(user.id), title="Welcome"
        )
        return {
            "user_id": str(user.id),
            "welcome_conversation_id": conv_id,
        }

5.2 Cache Penetration Protection

# services/user_service.py (continued: a method of UserService)
# Requires: from fastapi import HTTPException

    async def get_user_profile(self, user_id: UUID) -> dict:
        # Cache first; fall back to PostgreSQL on a miss
        cache_key = f"user:{user_id}"
        cached = await self.cache_repo.get(cache_key)
        if cached is not None:
            return cached

        user = await self.user_repo.get_by_id(user_id)
        if user is None:
            raise HTTPException(status_code=404, detail="User not found")

        profile = {
            "id": str(user.id),
            "username": user.username,
            "email": user.email,
            "language": user.language_preference,
            "level": user.cefr_level,
        }
        await self.cache_repo.set(cache_key, profile, ttl=300)
        return profile
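
get_user_profile is a cache-aside read: hits are served from Redis and misses fall through to PostgreSQL. Lookups for ids that do not exist are never cached, so repeated requests for a missing id reach the database every time. A common way to close that gap is to cache a short-lived placeholder for misses. The sketch below uses in-memory stand-ins (FakeCache, FakeUserRepo) instead of the real repositories, and the MISSING sentinel and the 30-second TTL are assumptions chosen for illustration.

```python
import asyncio
from typing import Optional

MISSING = {"__missing__": True}  # placeholder cached for ids that do not exist


class FakeCache:
    """In-memory stand-in for CacheRepository (ttl is accepted but ignored)."""

    def __init__(self) -> None:
        self.data: dict = {}

    async def get(self, key: str):
        return self.data.get(key)

    async def set(self, key: str, value, ttl: int = 300) -> None:
        self.data[key] = value


class FakeUserRepo:
    """In-memory stand-in for UserRepository that counts database hits."""

    def __init__(self, users: dict) -> None:
        self.users = users
        self.calls = 0

    async def get_by_id(self, user_id: str):
        self.calls += 1
        return self.users.get(user_id)


async def get_profile(cache: FakeCache, repo: FakeUserRepo, user_id: str) -> Optional[dict]:
    key = f"user:{user_id}"
    cached = await cache.get(key)
    if cached == MISSING:
        return None  # known miss: answer without touching the database
    if cached is not None:
        return cached
    user = await repo.get_by_id(user_id)
    if user is None:
        await cache.set(key, MISSING, ttl=30)  # short TTL so a later signup shows up
        return None
    await cache.set(key, user, ttl=300)
    return user


async def demo() -> int:
    cache = FakeCache()
    repo = FakeUserRepo({"u1": {"id": "u1", "username": "alice"}})
    await get_profile(cache, repo, "ghost")
    await get_profile(cache, repo, "ghost")  # served from the cached placeholder
    return repo.calls


db_hits = asyncio.run(demo())
```

In the service, a cached placeholder would map to the same 404 response as a database miss, and registration would need to overwrite or delete the placeholder for the new id.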

6. FastAPI Dependency Injection

# dependencies.py
from fastapi import APIRouter, Depends
from motor.motor_asyncio import AsyncIOMotorDatabase
from pydantic import BaseModel
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncSession

from database.mongodb import get_mongo_db
from database.postgres import get_db_session
from database.redis import get_redis
from services.user_service import UserService

router = APIRouter()


class UserCreate(BaseModel):
    # Assumed request schema; the fields mirror what register_user reads
    username: str
    email: str
    password: str


class DatabaseHolder:
    # Bundles the three per-request handles so a route declares one dependency
    def __init__(
        self,
        pg: AsyncSession = Depends(get_db_session),
        redis: Redis = Depends(get_redis),
        mongo: AsyncIOMotorDatabase = Depends(get_mongo_db),
    ):
        self.pg = pg
        self.redis = redis
        self.mongo = mongo


@router.post("/users/register")
async def register(
    data: UserCreate,
    db: DatabaseHolder = Depends(),
):
    service = UserService(db.pg, db.redis, db.mongo)
    return await service.register_user(data.model_dump())

7. Health Check Endpoint

# routers/health.py
from fastapi import APIRouter, Depends
from motor.motor_asyncio import AsyncIOMotorDatabase
from redis.asyncio import Redis
from sqlalchemy import text

from database.mongodb import get_mongo_db
from database.postgres import get_db_session
from database.redis import get_redis

router = APIRouter(tags=["health"])


@router.get("/health")
async def health_check(
    pg_session=Depends(get_db_session),
    redis: Redis = Depends(get_redis),
    mongo_db: AsyncIOMotorDatabase = Depends(get_mongo_db),
):
    # Any single failure downgrades the overall status to "degraded"
    status = {"status": "ok", "databases": {}}

    try:
        await pg_session.execute(text("SELECT 1"))
        status["databases"]["postgresql"] = "connected"
    except Exception as e:
        status["databases"]["postgresql"] = f"error: {str(e)}"
        status["status"] = "degraded"

    try:
        await redis.ping()
        status["databases"]["redis"] = "connected"
    except Exception as e:
        status["databases"]["redis"] = f"error: {str(e)}"
        status["status"] = "degraded"

    try:
        await mongo_db.command("ping")
        status["databases"]["mongodb"] = "connected"
    except Exception as e:
        status["databases"]["mongodb"] = f"error: {str(e)}"
        status["status"] = "degraded"

    return status
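
The endpoint above probes the databases one after another and sets no time limit, so one hung connection can stall the whole health check. A possible refinement, sketched here with asyncio only, runs the probes concurrently and bounds each with a timeout. The probe and health functions and the three fake checks are hypothetical; in the real endpoint the checks would be the SELECT 1, redis.ping(), and mongo_db.command("ping") calls.

```python
import asyncio
from typing import Awaitable, Callable

Check = Callable[[], Awaitable[object]]


async def probe(name: str, check: Check, timeout: float) -> tuple[str, str]:
    """Run one check and turn any failure into a status string."""
    try:
        await asyncio.wait_for(check(), timeout)
        return name, "connected"
    except asyncio.TimeoutError:
        return name, "error: timeout"
    except Exception as exc:
        return name, f"error: {exc}"


async def health(checks: dict[str, Check], timeout: float = 1.0) -> dict:
    # gather() runs all probes concurrently and keeps the input order
    results = await asyncio.gather(
        *(probe(name, check, timeout) for name, check in checks.items())
    )
    databases = dict(results)
    healthy = all(state == "connected" for state in databases.values())
    return {"status": "ok" if healthy else "degraded", "databases": databases}


async def pg_ok() -> None:
    return None


async def redis_refused() -> None:
    raise ConnectionError("refused")


async def mongo_hangs() -> None:
    await asyncio.sleep(5)  # simulates a connection that never answers


report = asyncio.run(
    health(
        {"postgresql": pg_ok, "redis": redis_refused, "mongodb": mongo_hangs},
        timeout=0.05,
    )
)
```

With this shape the slowest probe, not the sum of all probes, determines how long the endpoint takes to answer.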

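8. Production Deployment Notes

8.1 Connection Pool Tuning

| Parameter | PostgreSQL | Redis | MongoDB |
|---|---|---|---|
| Pool size | CPU cores × 2 to 4 | Application instances × 10 | CPU cores × 2 |
| Timeout | 30 s connection timeout | 5 s operation timeout | 10 s operation timeout |
| Health check | pool_pre_ping=True | Built in | Built in |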
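
Pool sizes also have to fit within what the server accepts. Each application process can open up to pool_size + max_overflow PostgreSQL connections, so the peak is that sum multiplied by the number of processes. The arithmetic below uses the defaults from config.py (20 and 10). The limit of 100 is PostgreSQL's usual default for max_connections, and reserving 3 slots mirrors the default superuser_reserved_connections; both are assumptions about the server, and the function names are made up for this sketch.

```python
def peak_pg_connections(processes: int, pool_size: int, max_overflow: int) -> int:
    """Worst-case connections opened by all application processes together."""
    return processes * (pool_size + max_overflow)


def fits_server_limit(
    processes: int,
    pool_size: int,
    max_overflow: int,
    max_connections: int = 100,  # assumed server setting
    reserved: int = 3,           # assumed slots kept back for superusers
) -> bool:
    usable = max_connections - reserved
    return peak_pg_connections(processes, pool_size, max_overflow) <= usable


four_workers = peak_pg_connections(processes=4, pool_size=20, max_overflow=10)
print(four_workers)                    # 120
print(fits_server_limit(4, 20, 10))    # False: 120 > 97
print(fits_server_limit(3, 20, 10))    # True: 90 <= 97
```

Under these assumptions, four worker processes with the default settings could exceed the server limit, so either the pool sizes or the server's max_connections would need adjusting.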

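8.2 Transaction Boundaries

PostgreSQL transactions are managed by the commit/rollback logic in get_db_session. Redis and MongoDB cannot take part in that transaction, and there is no transaction that spans all three databases. Key principles:

  1. Commit PostgreSQL first: when an operation writes to PostgreSQL, commit that write before touching Redis or MongoDB
  2. Eventual consistency: when a Redis or MongoDB operation fails, recover through retries or a compensating action
  3. Idempotent design: make cross-database operations idempotent wherever possible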
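
The three principles can be sketched together as follows. This is an illustration with in-memory stand-ins rather than the article's repositories: FlakyCache simulates a Redis outage, the "pg commit" log entry stands in for await session.commit(), and the retry helper with its attempt count and delays is an assumption chosen for the example.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry(
    op: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.01,
) -> T:
    """Retry an idempotent operation with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return await op()
        except ConnectionError:
            if attempt == attempts:
                raise
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")


class FlakyCache:
    """Stand-in cache whose first `failures` writes raise ConnectionError."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.attempts = 0
        self.data: dict = {}

    async def set(self, key: str, value: dict) -> None:
        self.attempts += 1
        if self.attempts <= self.failures:
            raise ConnectionError("cache unavailable")
        self.data[key] = value  # same key and value on every try: idempotent


async def register(log: list, cache: FlakyCache, user: dict) -> None:
    log.append("pg commit")  # 1. the PostgreSQL write is committed first
    # 2 + 3. the secondary write is retried; repeating it is harmless
    await retry(lambda: cache.set(f"user:{user['id']}", user))
    log.append("cache written")


log: list = []
cache = FlakyCache(failures=2)
asyncio.run(register(log, cache, {"id": "u1", "username": "alice"}))
```

If the retries are exhausted, the exception propagates after the PostgreSQL commit, which is the point where a compensating action (for example, queueing the cache write for later) would apply.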

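9. Frequently Asked Questions

Q1: SQLAlchemy raises a MissingGreenlet error

Cause: in async mode, accessing a relationship attribute that has not been loaded triggers an implicit lazy load, which would perform I/O outside the greenlet context that SQLAlchemy's async layer sets up for awaited calls.

Solution: eager-load the relationship with selectinload or joinedload: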

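from sqlalchemy import select
from sqlalchemy.orm import selectinload

# Assumes User defines a `conversations` relationship (not part of the model in section 4.1)
stmt = select(User).options(selectinload(User.conversations))
result = await session.execute(stmt)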

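Q2: Redis connection pool exhausted

Cause: too many Redis client instances are created, and each one opens its own connections.

Solution: share one global connection pool: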

# ✅ Correct: share one connection pool
redis_pool = aioredis.ConnectionPool.from_url(DSN)

async def get_redis():
    return aioredis.Redis(connection_pool=redis_pool)

# ❌ Wrong: a new connection pool is created on every request
async def get_redis():
    return aioredis.from_url(DSN)

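Q3: MongoDB cursor timeout

Cause: a cursor that sits unconsumed for a long time (10 minutes of inactivity by default) is closed automatically by the MongoDB server.

Solution: pass no_cursor_timeout=True to find() and make sure the cursor is closed: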

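# no_cursor_timeout is a keyword argument of find(), not a cursor method
cursor = collection.find({}, no_cursor_timeout=True)
try:
    results = await cursor.to_list(length=1000)
finally:
    # Always release the server-side cursor, since it no longer times out
    await cursor.close()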

Q4: How do I run database migrations?

Use Alembic for PostgreSQL, and manage MongoDB index changes with scripts:

alembic init migrations
alembic revision --autogenerate -m "add user table"
alembic upgrade head

10. Complete Project Structure

project/
├── main.py                     # Application entry point + lifespan
├── config.py                   # Pydantic Settings
├── docker-compose.yml          # Orchestration for the three databases
├── .env                        # Environment variables
├── database/
│   ├── __init__.py
│   ├── postgres.py             # SQLAlchemy engine + session
│   ├── redis.py                # Redis connection pool
│   └── mongodb.py              # Motor client
├── models/
│   ├── __init__.py
│   ├── user.py                 # SQLAlchemy ORM
│   └── conversation.py         # Pydantic document models
├── repositories/
│   ├── __init__.py
│   ├── user_repo.py            # PostgreSQL repository
│   ├── conversation_repo.py    # MongoDB repository
│   └── cache_repo.py           # Redis repository
├── services/
│   ├── __init__.py
│   └── user_service.py         # Business logic + transaction coordination
├── routers/
│   ├── __init__.py
│   ├── health.py               # Health check
│   └── users.py                # User routes
└── dependencies.py             # FastAPI dependency injection

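Summary

A FastAPI multi-database architecture comes down to three things:

  1. Connection management: each database uses its own connection pool, with lifecycles managed through FastAPI's lifespan
  2. Repository pattern: each database has its own Repository, and the Service layer combines several Repositories to implement business logic
  3. Transaction coordination: define transaction boundaries explicitly; PostgreSQL provides strong consistency, while Redis and MongoDB provide performance and flexibility

This architecture has been validated in the ChatLingo AI language-learning platform, where it supports three cooperating data systems: user management (PostgreSQL), session caching (Redis), and conversation history storage (MongoDB).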