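FastAPI Multi-Database Architecture in Practice: PostgreSQL + Redis + MongoDB Working Together
Introduction
Modern backend applications rarely rely on a single database. A relational database (PostgreSQL) handles transactional data and strongly consistent queries, a cache (Redis) absorbs high-concurrency reads and writes and manages sessions, and a document database (MongoDB) stores unstructured data and high-volume logs. Bringing all three into one FastAPI application calls for a clear architectural design.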
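Using the backend of an AI language-learning platform as the running example, this article walks through using PostgreSQL (SQLAlchemy async), Redis (redis-py), and MongoDB (Motor) together in FastAPI, covering connection management, dependency injection, transaction coordination, health checks, and production deployment. The code is written with production use in mind.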
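Prerequisites
Python 3.11+
FastAPI basics (routing, dependency injection, Pydantic models)
Basic familiarity with SQLAlchemy 2.0 async
Docker and Docker Compose (to run the three databases locally)
Dependencies installed: pip install fastapi uvicorn "sqlalchemy[asyncio]" asyncpg redis motor pydantic-settings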
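1. Architecture Overview
1.1 Division of Responsibilities

| Database | Responsibility | Typical data | Access method |
| --- | --- | --- | --- |
| PostgreSQL | Core business data, relational queries, transactions | Users, orders, permissions | SQLAlchemy 2.0 async ORM |
| Redis | Caching, sessions, rate limiting, pub/sub | Session tokens, cached results, real-time messages | redis-py async |
| MongoDB | Unstructured data, logs, conversation history | Chat records, operation logs, analytics events | Motor async driver |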
1.2 Layered Architecture

    ┌─────────────────────────────────────────────┐
    │                  API Layer                  │
    │       FastAPI Routers / Dependencies        │
    ├─────────────────────────────────────────────┤
    │                Service Layer                │
    │  Business Logic / Transaction Coordinator   │
    ├────────┬────────┬───────────────────────────┤
    │   PG   │ Redis  │          MongoDB          │
    │  Repo  │ Cache  │           Repo            │
    ├────────┴────────┴───────────────────────────┤
    │            Database Connections             │
    │     SQLAlchemy async │ redis-py │ Motor     │
    └─────────────────────────────────────────────┘
2. Configuration Management
2.1 Environment Variables and Pydantic Settings

    # config.py
    from functools import lru_cache

    from pydantic_settings import BaseSettings


    class Settings(BaseSettings):
        # PostgreSQL
        postgres_dsn: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/chatlingo"
        postgres_pool_size: int = 20
        postgres_max_overflow: int = 10

        # Redis (host and port match the redis service in docker-compose.yml)
        redis_dsn: str = "redis://localhost:6379"

        # MongoDB (password masked here; it must match MONGO_INITDB_ROOT_PASSWORD)
        mongodb_dsn: str = "mongodb://root:***@localhost:27017"
        mongodb_database: str = "chatlingo"
        mongodb_max_pool_size: int = 10

        model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}


    @lru_cache
    def get_settings() -> Settings:
        # Cached so the .env file is parsed only once per process
        return Settings()
2.2 Running the Three Databases with Docker Compose

    # docker-compose.yml
    version: "3.8"

    services:
      postgres:
        image: postgres:16-alpine
        environment:
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: chatlingo
        ports:
          - "5432:5432"
        volumes:
          - pgdata:/var/lib/postgresql/data
        healthcheck:
          test: ["CMD-SHELL", "pg_isready -U postgres"]
          interval: 5s
          timeout: 3s
          retries: 5

      redis:
        image: redis:7-alpine
        ports:
          - "6379:6379"
        volumes:
          - redisdata:/data
        healthcheck:
          test: ["CMD", "redis-cli", "ping"]
          interval: 5s
          timeout: 3s
          retries: 5

      mongo:
        image: mongo:7
        environment:
          MONGO_INITDB_ROOT_USERNAME: root
          MONGO_INITDB_ROOT_PASSWORD: example
        ports:
          - "27017:27017"
        volumes:
          - mongodata:/data/db
        healthcheck:
          test: echo 'db.runCommand("ping").ok' | mongosh --quiet
          interval: 5s
          timeout: 3s
          retries: 5

    volumes:
      pgdata:
      redisdata:
      mongodata:
3. Database Connection Management
3.1 PostgreSQL: SQLAlchemy 2.0 Async

    # database/postgres.py
    from collections.abc import AsyncIterator

    from sqlalchemy.ext.asyncio import (
        AsyncSession,
        async_sessionmaker,
        create_async_engine,
    )
    from sqlalchemy.orm import DeclarativeBase

    from config import get_settings


    class Base(DeclarativeBase):
        pass


    settings = get_settings()

    engine = create_async_engine(
        settings.postgres_dsn,
        pool_size=settings.postgres_pool_size,
        max_overflow=settings.postgres_max_overflow,
        echo=False,
        pool_pre_ping=True,
    )

    async_session_factory = async_sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )


    async def get_db_session() -> AsyncIterator[AsyncSession]:
        """Yield one session per request; commit on success, roll back on error."""
        async with async_session_factory() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise
            # The async with block closes the session on exit.
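Key design decisions:
expire_on_commit=False: objects are not expired after commit, which avoids triggering lazy loads during serialization.
pool_pre_ping=True: each connection is pinged when it is checked out of the pool, so a stale connection is detected and replaced before use.
async_session_factory is a module-level factory shared by the whole application; each request gets its own session through get_db_session, because an AsyncSession must not be shared between concurrent tasks.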
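The commit-or-rollback flow of get_db_session can be exercised without a database. The sketch below is an illustration only: FakeSession and session_scope are hypothetical stand-ins (not part of SQLAlchemy or of the article's code) that reproduce the same control flow and record which calls were made.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records the calls it receives."""

    def __init__(self) -> None:
        self.calls = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


@asynccontextmanager
async def session_scope(session: FakeSession) -> AsyncIterator[FakeSession]:
    """Same control flow as get_db_session: commit, or roll back and re-raise."""
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()


async def run_request(fail: bool) -> list:
    session = FakeSession()
    try:
        async with session_scope(session):
            if fail:
                raise ValueError("handler failed")
    except ValueError:
        pass  # in FastAPI an unhandled exception would become an error response
    return session.calls


success_calls = asyncio.run(run_request(fail=False))
failure_calls = asyncio.run(run_request(fail=True))
print(success_calls)  # ['commit', 'close']
print(failure_calls)  # ['rollback', 'close']
```

A handler that raises never reaches commit, so a request either persists all of its PostgreSQL writes or none of them.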
3.2 Redis: Connection Pool and Async Client

    # database/redis.py
    from collections.abc import AsyncIterator

    import redis.asyncio as aioredis

    from config import get_settings

    redis_pool = aioredis.ConnectionPool.from_url(
        get_settings().redis_dsn,
        max_connections=20,
        decode_responses=True,
    )


    async def get_redis() -> AsyncIterator[aioredis.Redis]:
        # The client is a thin wrapper over the shared pool, so there is
        # nothing to close per request; the pool is released at shutdown.
        yield aioredis.Redis(connection_pool=redis_pool)
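Key point: the Redis connection pool is created once, when the module is first imported at application startup, and is shared by all requests. decode_responses=True makes the client return str instead of bytes.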
3.3 MongoDB: Motor Async Driver

    # database/mongodb.py
    from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase

    from config import get_settings

    mongo_client: AsyncIOMotorClient | None = None


    async def init_mongo() -> None:
        global mongo_client
        settings = get_settings()
        mongo_client = AsyncIOMotorClient(
            settings.mongodb_dsn,
            maxPoolSize=settings.mongodb_max_pool_size,
        )


    async def close_mongo() -> None:
        global mongo_client
        if mongo_client:
            mongo_client.close()
            mongo_client = None


    def get_database() -> AsyncIOMotorDatabase:
        if mongo_client is None:
            raise RuntimeError("MongoDB not initialized")
        return mongo_client[get_settings().mongodb_database]


    async def get_mongo_db():
        # FastAPI dependency: yields the database handle for the current request
        yield get_database()
3.4 Application Lifecycle Management

    # main.py
    from contextlib import asynccontextmanager

    from fastapi import FastAPI

    from database.mongodb import close_mongo, init_mongo
    from database.postgres import Base, engine
    from database.redis import redis_pool


    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Startup: create tables for every model already imported, then connect to MongoDB
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        await init_mongo()

        yield

        # Shutdown: release all connection pools
        await engine.dispose()
        await close_mongo()
        await redis_pool.disconnect()


    app = FastAPI(lifespan=lifespan)
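One thing a lifespan function has to get right is cleanup when startup fails halfway, for example when PostgreSQL and Redis are up but MongoDB is unreachable. The sketch below shows one possible approach using contextlib.AsyncExitStack from the standard library. The resource helper and its names are hypothetical placeholders for the real engine, pool, and client, and the lifespan here takes a flag instead of the FastAPI app so it can run standalone.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []


@asynccontextmanager
async def resource(name: str, fail_on_start: bool = False):
    """Hypothetical resource that records when it is opened and closed."""
    if fail_on_start:
        raise ConnectionError(f"{name} unavailable")
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


@asynccontextmanager
async def lifespan(fail_mongo: bool = False):
    # The exit stack closes resources in reverse order of registration,
    # including the case where a later resource fails to start.
    async with AsyncExitStack() as stack:
        await stack.enter_async_context(resource("postgres"))
        await stack.enter_async_context(resource("redis"))
        await stack.enter_async_context(resource("mongodb", fail_on_start=fail_mongo))
        yield


async def main() -> None:
    async with lifespan():
        events.append("serving")
    events.append("---")
    try:
        async with lifespan(fail_mongo=True):
            events.append("serving")  # never reached
    except ConnectionError:
        events.append("startup failed")


asyncio.run(main())
print(events)
```

In the second run the two resources that did start are still closed, which is the property a hand-written sequence of startup and shutdown calls does not give by default.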
4. Data Models and the Repository Pattern
4.1 PostgreSQL ORM Model

    # models/user.py
    import uuid
    from datetime import datetime

    from sqlalchemy import DateTime, String, func
    from sqlalchemy.dialects.postgresql import UUID
    from sqlalchemy.orm import Mapped, mapped_column

    from database.postgres import Base


    class User(Base):
        __tablename__ = "users"

        id: Mapped[uuid.UUID] = mapped_column(
            UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
        )
        username: Mapped[str] = mapped_column(
            String(50), unique=True, nullable=False, index=True
        )
        email: Mapped[str] = mapped_column(
            String(255), unique=True, nullable=False, index=True
        )
        hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
        language_preference: Mapped[str] = mapped_column(
            String(10), default="en", nullable=False
        )
        cefr_level: Mapped[str | None] = mapped_column(String(5), default="A1")
        created_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now()
        )
        updated_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
        )
4.2 MongoDB Document Model

    # models/conversation.py
    from datetime import datetime, timezone

    from pydantic import BaseModel, Field


    def utc_now() -> datetime:
        # Timezone-aware replacement for datetime.utcnow(), which is deprecated since Python 3.12
        return datetime.now(timezone.utc)


    class Message(BaseModel):
        role: str
        content: str
        timestamp: datetime = Field(default_factory=utc_now)
        metadata: dict = Field(default_factory=dict)


    class Conversation(BaseModel):
        id: str = Field(alias="_id")
        user_id: str
        title: str = "New conversation"
        messages: list[Message] = Field(default_factory=list)
        created_at: datetime = Field(default_factory=utc_now)
        updated_at: datetime = Field(default_factory=utc_now)
4.3 Repository Pattern Implementation

    # repositories/user_repo.py
    from uuid import UUID

    from sqlalchemy import select
    from sqlalchemy.ext.asyncio import AsyncSession

    from models.user import User


    class UserRepository:
        def __init__(self, session: AsyncSession):
            self.session = session

        async def get_by_id(self, user_id: UUID) -> User | None:
            result = await self.session.execute(
                select(User).where(User.id == user_id)
            )
            return result.scalar_one_or_none()

        async def get_by_email(self, email: str) -> User | None:
            result = await self.session.execute(
                select(User).where(User.email == email)
            )
            return result.scalar_one_or_none()

        async def create(self, user: User) -> User:
            self.session.add(user)
            # flush() sends the INSERT so defaults such as the ID are populated;
            # the commit is left to get_db_session
            await self.session.flush()
            return user

    # repositories/conversation_repo.py
    from datetime import datetime, timezone

    from bson.objectid import ObjectId
    from motor.motor_asyncio import AsyncIOMotorDatabase


    class ConversationRepository:
        def __init__(self, db: AsyncIOMotorDatabase):
            self.collection = db["conversations"]

        async def create(self, user_id: str, title: str = "New conversation") -> str:
            # Store real timestamps so that get_recent can sort on updated_at
            now = datetime.now(timezone.utc)
            result = await self.collection.insert_one({
                "user_id": user_id,
                "title": title,
                "messages": [],
                "created_at": now,
                "updated_at": now,
            })
            return str(result.inserted_id)

        async def append_message(self, conversation_id: str, message: dict) -> bool:
            result = await self.collection.update_one(
                {"_id": ObjectId(conversation_id)},
                {
                    "$push": {"messages": message},
                    "$set": {"updated_at": datetime.now(timezone.utc)},
                },
            )
            return result.modified_count > 0

        async def get_recent(self, user_id: str, limit: int = 20) -> list[dict]:
            # Most recently updated conversations first
            cursor = self.collection.find(
                {"user_id": user_id}
            ).sort("updated_at", -1).limit(limit)
            return await cursor.to_list(length=limit)

    # repositories/cache_repo.py
    import json
    from typing import Any, Optional

    import redis.asyncio as aioredis


    class CacheRepository:
        def __init__(self, redis: aioredis.Redis):
            self.redis = redis

        async def get(self, key: str) -> Optional[Any]:
            data = await self.redis.get(key)
            if data is None:
                return None
            return json.loads(data)

        async def set(self, key: str, value: Any, ttl: int = 300) -> None:
            # Values are stored as JSON and expire after ttl seconds
            await self.redis.set(key, json.dumps(value), ex=ttl)

        async def delete(self, key: str) -> None:
            await self.redis.delete(key)

        async def exists(self, key: str) -> bool:
            return await self.redis.exists(key) > 0
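CacheRepository.set serializes with json.dumps, which raises TypeError for values such as uuid.UUID or datetime. The service code in this article converts IDs with str() before caching, but a fallback encoder is another option. The following is a minimal sketch under that assumption; to_cache_json and _encode_extra are hypothetical helper names, not part of the article's code.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any


def _encode_extra(value: Any) -> str:
    """Fallback for types that json.dumps cannot serialize by itself."""
    if isinstance(value, uuid.UUID):
        return str(value)
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def to_cache_json(value: Any) -> str:
    # json.dumps calls the default hook only for objects it does not know
    return json.dumps(value, default=_encode_extra)


profile = {
    "id": uuid.UUID("12345678-1234-5678-1234-567812345678"),
    "created_at": datetime(2024, 1, 1, tzinfo=timezone.utc),
}
encoded = to_cache_json(profile)
decoded = json.loads(encoded)
print(decoded)
```

Note that the round trip is lossy: after json.loads the ID and the timestamp come back as plain strings, so callers that need typed values have to parse them again.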
5. Service Layer: Coordinating the Three Databases
5.1 Transaction Coordinator

    # services/user_service.py
    from uuid import UUID

    from motor.motor_asyncio import AsyncIOMotorDatabase
    from redis.asyncio import Redis
    from sqlalchemy.ext.asyncio import AsyncSession

    from models.user import User
    from repositories.cache_repo import CacheRepository
    from repositories.conversation_repo import ConversationRepository
    from repositories.user_repo import UserRepository

    # hash_password is not defined in this article; it is assumed to be
    # provided by the project's own password-hashing helper.


    class UserService:
        def __init__(
            self,
            pg_session: AsyncSession,
            redis: Redis,
            mongo_db: AsyncIOMotorDatabase,
        ):
            self.user_repo = UserRepository(pg_session)
            self.cache_repo = CacheRepository(redis)
            self.conv_repo = ConversationRepository(mongo_db)

        async def register_user(self, user_data: dict) -> dict:
            # 1. Write the user to PostgreSQL. flush() assigns the ID; the
            #    commit happens in get_db_session after the handler returns.
            user = await self.user_repo.create(
                User(
                    username=user_data["username"],
                    email=user_data["email"],
                    hashed_password=hash_password(user_data["password"]),
                )
            )

            # 2. Cache the basic profile in Redis
            await self.cache_repo.set(
                f"user:{user.id}",
                {"id": str(user.id), "username": user.username, "email": user.email},
                ttl=3600,
            )

            # 3. Create the welcome conversation in MongoDB
            conv_id = await self.conv_repo.create(
                user_id=str(user.id), title="Welcome"
            )

            return {
                "user_id": str(user.id),
                "welcome_conversation_id": conv_id,
            }
5.2 Cache Penetration Protection

    # services/user_service.py (continued)
    from fastapi import HTTPException


    class UserService:
        # ... __init__ and register_user as in 5.1 ...

        async def get_user_profile(self, user_id: UUID) -> dict:
            cache_key = f"user:{user_id}"

            # 1. Try the cache first
            cached = await self.cache_repo.get(cache_key)
            if cached is not None:
                return cached

            # 2. Cache miss: fall back to PostgreSQL
            user = await self.user_repo.get_by_id(user_id)
            if user is None:
                raise HTTPException(status_code=404, detail="User not found")

            profile = {
                "id": str(user.id),
                "username": user.username,
                "email": user.email,
                "language": user.language_preference,
                "level": user.cefr_level,
            }

            # 3. Populate the cache for subsequent reads
            await self.cache_repo.set(cache_key, profile, ttl=300)
            return profile
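The read path above caches only users that exist, so repeated requests for a nonexistent ID still reach PostgreSQL every time. A common way to close that gap is to cache the miss itself for a short period. The sketch below illustrates the idea with in-memory fakes; NOT_FOUND, FakeCache, and FakeUserRepo are hypothetical names introduced for this example, and the fake cache accepts a TTL but does not enforce it.

```python
import asyncio
from typing import Any, Optional

# Hypothetical sentinel stored in the cache for IDs that do not exist.
NOT_FOUND = {"__missing__": True}


class FakeCache:
    """In-memory stand-in for CacheRepository (the TTL is accepted but ignored)."""

    def __init__(self) -> None:
        self.data = {}

    async def get(self, key: str) -> Optional[Any]:
        return self.data.get(key)

    async def set(self, key: str, value: Any, ttl: int = 300) -> None:
        self.data[key] = value


class FakeUserRepo:
    """Stand-in for UserRepository that counts database lookups."""

    def __init__(self, users: dict) -> None:
        self.users = users
        self.queries = 0

    async def get_by_id(self, user_id: str) -> Optional[dict]:
        self.queries += 1
        return self.users.get(user_id)


async def get_user_profile(cache: FakeCache, repo: FakeUserRepo, user_id: str) -> Optional[dict]:
    key = f"user:{user_id}"
    cached = await cache.get(key)
    if cached == NOT_FOUND:
        return None  # known miss: answer without touching the database
    if cached is not None:
        return cached

    profile = await repo.get_by_id(user_id)
    if profile is None:
        # Cache the miss with a short TTL so a user created later shows up quickly
        await cache.set(key, NOT_FOUND, ttl=30)
        return None
    await cache.set(key, profile, ttl=300)
    return profile


async def demo():
    cache = FakeCache()
    repo = FakeUserRepo({"u1": {"username": "alice"}})
    for _ in range(3):
        assert await get_user_profile(cache, repo, "ghost") is None
    found = await get_user_profile(cache, repo, "u1")
    return repo.queries, found


queries, found = asyncio.run(demo())
print(queries, found)  # 2 {'username': 'alice'}
```

Three lookups of the missing ID cost one database query instead of three. Because register_user writes the user:{id} key on registration, a sentinel left behind for that ID would be overwritten at that point.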
6. FastAPI Dependency Injection Integration

    # dependencies.py and routers/users.py, shown together
    from fastapi import APIRouter, Depends
    from motor.motor_asyncio import AsyncIOMotorDatabase
    from pydantic import BaseModel
    from redis.asyncio import Redis
    from sqlalchemy.ext.asyncio import AsyncSession

    from database.mongodb import get_mongo_db
    from database.postgres import get_db_session
    from database.redis import get_redis
    from services.user_service import UserService

    router = APIRouter()


    class UserCreate(BaseModel):
        # The request schema is not shown in the article; these fields are
        # inferred from what register_user reads.
        username: str
        email: str
        password: str


    class DatabaseHolder:
        """Bundles the three database handles into one injectable dependency."""

        def __init__(
            self,
            pg: AsyncSession = Depends(get_db_session),
            redis: Redis = Depends(get_redis),
            mongo: AsyncIOMotorDatabase = Depends(get_mongo_db),
        ):
            self.pg = pg
            self.redis = redis
            self.mongo = mongo


    @router.post("/users/register")
    async def register(
        data: UserCreate,
        db: DatabaseHolder = Depends(),
    ):
        service = UserService(db.pg, db.redis, db.mongo)
        return await service.register_user(data.model_dump())
7. Health Check Endpoint

    # routers/health.py
    from fastapi import APIRouter, Depends
    from motor.motor_asyncio import AsyncIOMotorDatabase
    from redis.asyncio import Redis
    from sqlalchemy import text

    from database.mongodb import get_mongo_db
    from database.postgres import get_db_session
    from database.redis import get_redis

    router = APIRouter(tags=["health"])


    @router.get("/health")
    async def health_check(
        pg_session=Depends(get_db_session),
        redis: Redis = Depends(get_redis),
        mongo_db: AsyncIOMotorDatabase = Depends(get_mongo_db),
    ):
        status = {"status": "ok", "databases": {}}

        # PostgreSQL: run a trivial query
        try:
            await pg_session.execute(text("SELECT 1"))
            status["databases"]["postgresql"] = "connected"
        except Exception as e:
            status["databases"]["postgresql"] = f"error: {str(e)}"
            status["status"] = "degraded"

        # Redis: PING
        try:
            await redis.ping()
            status["databases"]["redis"] = "connected"
        except Exception as e:
            status["databases"]["redis"] = f"error: {str(e)}"
            status["status"] = "degraded"

        # MongoDB: ping command
        try:
            await mongo_db.command("ping")
            status["databases"]["mongodb"] = "connected"
        except Exception as e:
            status["databases"]["mongodb"] = f"error: {str(e)}"
            status["status"] = "degraded"

        return status
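The endpoint above probes the three databases one after another, so a single hanging backend delays the whole response. One possible refinement is to run the probes concurrently with a per-probe timeout. The sketch below uses only asyncio; run_check, check_all, and the three probe functions are hypothetical names, and the probes simulate a healthy, a failing, and a slow backend rather than calling real databases.

```python
import asyncio
from typing import Awaitable, Callable

Probe = Callable[[], Awaitable[object]]


async def run_check(name: str, probe: Probe, timeout: float):
    """Run one probe and translate its outcome into a status string."""
    try:
        await asyncio.wait_for(probe(), timeout=timeout)
        return name, "connected"
    except asyncio.TimeoutError:
        return name, "error: timeout"
    except Exception as exc:
        return name, f"error: {exc}"


async def check_all(probes: dict, timeout: float = 1.0) -> dict:
    # gather runs the probes concurrently and returns results in input order
    results = await asyncio.gather(
        *(run_check(name, probe, timeout) for name, probe in probes.items())
    )
    databases = dict(results)
    healthy = all(state == "connected" for state in databases.values())
    return {"status": "ok" if healthy else "degraded", "databases": databases}


# Simulated probes standing in for SELECT 1, PING, and the MongoDB ping command
async def pg_probe() -> None:
    await asyncio.sleep(0)


async def redis_probe() -> None:
    raise ConnectionError("connection refused")


async def mongo_probe() -> None:
    await asyncio.sleep(5)  # slower than the timeout used below


report = asyncio.run(
    check_all(
        {"postgresql": pg_probe, "redis": redis_probe, "mongodb": mongo_probe},
        timeout=0.05,
    )
)
print(report)
```

With this shape the total latency is bounded by the timeout rather than by the sum of the three probes.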
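8. Production Deployment Notes
8.1 Connection Pool Tuning

| Parameter | PostgreSQL | Redis | MongoDB |
| --- | --- | --- | --- |
| Pool size | CPU cores × 2 to 4 | Application instances × 10 | CPU cores × 2 |
| Timeout | 30 s connection timeout | 5 s operation timeout | 10 s operation timeout |
| Health check | pool_pre_ping=True | Built in | Built in |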
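Pool sizes multiply across processes, because every worker process creates its own engine and therefore its own pool. A quick calculation helps check the total against the server limit. In the sketch below, pool_size=20 and max_overflow=10 are the defaults from Settings in this article, while the deployment shape (2 instances with 2 worker processes each) and the helper name max_pg_connections are assumptions made for illustration.

```python
def max_pg_connections(
    instances: int, workers_per_instance: int, pool_size: int, max_overflow: int
) -> int:
    """Upper bound on PostgreSQL connections the application tier can open."""
    return instances * workers_per_instance * (pool_size + max_overflow)


# Hypothetical deployment: 2 instances, 2 worker processes each
needed = max_pg_connections(
    instances=2, workers_per_instance=2, pool_size=20, max_overflow=10
)
server_limit = 100  # PostgreSQL's default max_connections
fits = needed <= server_limit
print(needed, fits)  # 120 False
```

In this example the application could request up to 120 connections against a default limit of 100, so either the pool settings or max_connections would need adjusting.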
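8.2 Transaction Boundaries
PostgreSQL transactions are managed by the commit/rollback logic in get_db_session. There is no transaction that spans PostgreSQL, Redis, and MongoDB, so cross-database consistency has to be handled in application code. Key principles:
Commit PostgreSQL first: when an operation writes to PostgreSQL, commit that write before touching Redis or MongoDB.
Eventual consistency: when a Redis or MongoDB operation fails, recover through retries or a compensating action.
Idempotent design: make cross-database operations idempotent wherever possible.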
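These three principles can be sketched together: commit the source of truth first, then retry an idempotent secondary write with backoff, and degrade gracefully if it still fails. Everything in the example below is a simulation under stated assumptions; retry, commit_user, write_cache, and the failure counter are hypothetical names, and no real database is involved.

```python
import asyncio


async def retry(operation, attempts: int = 3, base_delay: float = 0.01):
    """Retry an idempotent async operation with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except ConnectionError:
            if attempt == attempts:
                raise
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


log = []
cache = {}
failures_left = 2  # simulated outage: the cache fails twice, then recovers


async def commit_user() -> str:
    log.append("pg commit")
    return "user-1"


async def write_cache(user_id: str) -> None:
    global failures_left
    if failures_left > 0:
        failures_left -= 1
        log.append("cache failed")
        raise ConnectionError("redis unavailable")
    # Setting the same key to the same value is idempotent, so retrying is safe
    cache[f"user:{user_id}"] = {"id": user_id}
    log.append("cache ok")


async def register() -> str:
    user_id = await commit_user()  # 1. source of truth first
    try:
        await retry(lambda: write_cache(user_id))  # 2. best-effort secondary write
    except ConnectionError:
        log.append("cache skipped")  # the read path can repopulate the cache later
    return user_id


result = asyncio.run(register())
print(log)  # ['pg commit', 'cache failed', 'cache failed', 'cache ok']
```

Applying this ordering with the article's get_db_session would require committing explicitly inside the service before the Redis and MongoDB writes, since the dependency otherwise commits only after the handler returns.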
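9. Frequently Asked Questions
Q1: SQLAlchemy raises a MissingGreenlet error
Cause: in async mode, accessing a relationship attribute that has not been loaded triggers a lazy load, which attempts database I/O outside the greenlet context SQLAlchemy uses for async calls.
Solution: eager-load the relationship with selectinload or joinedload:

    from sqlalchemy import select
    from sqlalchemy.orm import selectinload

    # User.conversations stands for any relationship attribute on the model
    stmt = select(User).options(selectinload(User.conversations))
    result = await session.execute(stmt)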
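Q2: Redis connection pool exhausted
Cause: too many Redis client instances are created, and each one opens its own connections.
Solution: share a single global connection pool:

    import redis.asyncio as aioredis

    # Correct: one pool created at import time and reused by every client
    redis_pool = aioredis.ConnectionPool.from_url(DSN)

    async def get_redis():
        return aioredis.Redis(connection_pool=redis_pool)

    # Wrong: every call builds a new client with its own pool
    async def get_redis():
        return aioredis.from_url(DSN)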
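Q3: MongoDB cursor timeout
Cause: the MongoDB server automatically closes cursors that stay idle for too long (10 minutes by default).
Solution: pass no_cursor_timeout=True to find() and make sure the cursor is closed:

    cursor = collection.find({}, no_cursor_timeout=True)
    try:
        results = await cursor.to_list(length=1000)
    finally:
        # Without the server-side timeout, the cursor must be closed explicitly
        await cursor.close()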
Q4: How should database migrations be handled?
Use Alembic for PostgreSQL, and manage MongoDB index changes with scripts:

    alembic init migrations
    alembic revision --autogenerate -m "add user table"
    alembic upgrade head
10. Complete Project Structure

    project/
    ├── main.py                    # Application entry point + lifespan
    ├── config.py                  # Pydantic Settings
    ├── docker-compose.yml         # The three databases
    ├── .env                       # Environment variables
    ├── database/
    │   ├── __init__.py
    │   ├── postgres.py            # SQLAlchemy engine + session
    │   ├── redis.py               # Redis connection pool
    │   └── mongodb.py             # Motor client
    ├── models/
    │   ├── __init__.py
    │   ├── user.py                # SQLAlchemy ORM
    │   └── conversation.py        # Pydantic document models
    ├── repositories/
    │   ├── __init__.py
    │   ├── user_repo.py           # PostgreSQL repository
    │   ├── conversation_repo.py   # MongoDB repository
    │   └── cache_repo.py          # Redis repository
    ├── services/
    │   ├── __init__.py
    │   └── user_service.py        # Business logic + transaction coordination
    ├── routers/
    │   ├── __init__.py
    │   ├── health.py              # Health check
    │   └── users.py               # User routes
    └── dependencies.py            # FastAPI dependency injection
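Summary
A FastAPI multi-database architecture comes down to three points:
Connection management: each database has its own connection pool, and the pools' lifecycle is managed through FastAPI's lifespan.
Repository pattern: each database has its own Repository, and the Service layer combines several Repositories to implement business logic.
Transaction coordination: define transaction boundaries explicitly; PostgreSQL provides strong consistency, while Redis and MongoDB provide performance and flexibility.
This architecture has been validated in the ChatLingo AI language-learning platform, where it supports user management (PostgreSQL), session caching (Redis), and conversation history storage (MongoDB) working together.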