Designing a Production LLM Pipeline: Provider Abstraction, Circuit Breaking, Semantic Caching, and Streaming Output

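Introduction

Integrating an LLM into a production application takes far more than calling an API. You have to handle switching between providers, circuit breaking when an API fails, caching of repeated queries, streaming responses, context-window management, and a range of other engineering problems. If that logic is scattered through business code, the project quickly becomes hard to maintain.

This article builds a production-grade LLM pipeline from scratch. It consists of a provider abstraction layer, a ModelRouter, a circuit breaker, a semantic cache, streaming output, and a context builder. All of the code comes from hands-on experience with an AI conversation product and is designed to be reused directly.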

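Prerequisites

  • Python 3.11+
  • FastAPI basics (routing, dependency injection)
  • Familiarity with async/await asynchronous programming
  • Experience calling the OpenAI / DeepSeek / Claude APIs
  • Installed packages: pip install openai redis numpy scikit-learn pydantic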

1. Architecture Overview

1.1 Pipeline Design Pattern

The LLM pipeline follows the Pipes and Filters pattern, in which each component handles one independent concern:

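User request
       │
       ▼
┌──────────────┐
│ Context      │  Builds the system prompt + conversation history + user input
│ Builder      │  → assembles the complete messages array
└──────┬───────┘
       ▼
┌──────────────┐
│ Semantic     │  Matches by semantic similarity; a hit returns the cached response
│ Cache        │  → cuts latency by 60-80% and saves token costs
└──────┬───────┘
       ▼
┌──────────────┐
│ Circuit      │  Tracks provider failures and trips automatically
│ Breaker      │  → prevents cascading failures
└──────┬───────┘
       ▼
┌──────────────┐
│ ModelRouter  │  Selects a provider + model according to a strategy
│              │  → primary/backup switching, cost-based routing, capability-based routing
└──────┬───────┘
       ▼
┌──────────────┐
│ LLM Provider │  Unified interface that hides API differences
│ Abstraction  │  → OpenAI / DeepSeek / Mock
└──────┬───────┘
       ▼
Response output (streaming / non-streaming)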
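
As a rough illustration of the Pipes and Filters idea in the diagram above, the sketch below chains a few async stages that each take and return a shared state dictionary. The stage names and the `state` keys are hypothetical and exist only for this example; the real components are built in the sections that follow.

```python
import asyncio
from typing import Awaitable, Callable

# A filter is any async callable that takes the pipeline state and returns it.
Filter = Callable[[dict], Awaitable[dict]]


async def build_context(state: dict) -> dict:
    # Hypothetical stage: wrap the raw input in a messages array.
    state["messages"] = [{"role": "user", "content": state["input"]}]
    return state


async def check_cache(state: dict) -> dict:
    # Hypothetical stage: pretend the cache missed.
    state["cached"] = None
    return state


async def call_provider(state: dict) -> dict:
    # Hypothetical stage: use the cached value if present, else "call" a model.
    state["output"] = state["cached"] or f"echo: {state['input']}"
    return state


async def run_pipeline(filters: list[Filter], state: dict) -> dict:
    """Pass the state through each filter in order."""
    for stage in filters:
        state = await stage(state)
    return state


final = asyncio.run(
    run_pipeline([build_context, check_cache, call_provider], {"input": "hi"})
)
print(final["output"])
```

Because every stage shares one signature, a stage can be replaced or removed without touching its neighbours, which is the property the rest of the article relies on.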

1.2 Core Interfaces

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import AsyncIterator, Optional


@dataclass
class LLMRequest:
    messages: list[dict]             # messages in OpenAI chat format
    model: str = "gpt-4o-mini"
    temperature: float = 0.7
    max_tokens: int = 2048
    stream: bool = False
    user_id: Optional[str] = None


@dataclass
class LLMResponse:
    content: str
    model: str
    provider: str
    usage: dict                      # {"prompt_tokens": N, "completion_tokens": N}
    cached: bool = False

2. Provider Abstraction Layer

2.1 Unified Interface

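# llm/providers/base.py
# Lives in the same module as the LLMRequest / LLMResponse dataclasses from
# section 1.2, whose imports (ABC, abstractmethod, AsyncIterator) it reuses.
class BaseLLMProvider(ABC):
    """Interface that every LLM provider must implement."""

    @abstractmethod
    async def chat(self, request: LLMRequest) -> LLMResponse:
        """Non-streaming chat."""
        ...

    @abstractmethod
    async def chat_stream(
        self, request: LLMRequest
    ) -> AsyncIterator[str]:
        """Streaming chat: yields the text chunk by chunk."""
        ...

    @property
    @abstractmethod
    def provider_name(self) -> str:
        """Provider identifier, e.g. 'openai' or 'deepseek'."""
        ...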

2.2 OpenAI Provider Implementation

# llm/providers/openai_provider.py
from typing import AsyncIterator, Optional

from openai import AsyncOpenAI

from .base import BaseLLMProvider, LLMRequest, LLMResponse


class OpenAIProvider(BaseLLMProvider):
    def __init__(self, api_key: str, base_url: Optional[str] = None):
        self.client = AsyncOpenAI(
            api_key=api_key,
            base_url=base_url,  # also works with OpenAI-compatible services such as DeepSeek
        )

    @property
    def provider_name(self) -> str:
        return "openai"

    async def chat(self, request: LLMRequest) -> LLMResponse:
        response = await self.client.chat.completions.create(
            model=request.model,
            messages=request.messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            stream=False,
        )
        choice = response.choices[0]
        return LLMResponse(
            content=choice.message.content or "",  # content can be None
            model=response.model,
            provider=self.provider_name,
            usage={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
            },
        )

    async def chat_stream(
        self, request: LLMRequest
    ) -> AsyncIterator[str]:
        stream = await self.client.chat.completions.create(
            model=request.model,
            messages=request.messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            stream=True,
        )
        async for chunk in stream:
            # Some chunks carry no choices or an empty delta; skip those.
            delta = chunk.choices[0].delta if chunk.choices else None
            if delta and delta.content:
                yield delta.content

2.3 Mock Provider (for Testing)

# llm/providers/mock_provider.py
import asyncio
import random
from typing import AsyncIterator

from .base import BaseLLMProvider, LLMRequest, LLMResponse


class MockProvider(BaseLLMProvider):
    """Simulates LLM responses for development and testing."""

    def __init__(self, delay: float = 0.1, fail_rate: float = 0.0):
        self.delay = delay
        self.fail_rate = fail_rate  # 0.0 to 1.0: simulated failure rate

    @property
    def provider_name(self) -> str:
        return "mock"

    async def chat(self, request: LLMRequest) -> LLMResponse:
        await asyncio.sleep(self.delay)
        if random.random() < self.fail_rate:
            raise RuntimeError("Simulated provider failure")

        last_msg = request.messages[-1]["content"]
        return LLMResponse(
            content=f"[Mock] Echo: {last_msg[:50]}...",
            model="mock-model",
            provider=self.provider_name,
            usage={"prompt_tokens": 50, "completion_tokens": 20},
        )

    async def chat_stream(
        self, request: LLMRequest
    ) -> AsyncIterator[str]:
        response = await self.chat(request)
        for char in response.content:
            yield char
            await asyncio.sleep(0.02)  # simulate streaming latency

3. ModelRouter: Smart Routing

3.1 Routing Strategies

# llm/router.py
import random

from .providers.base import BaseLLMProvider, LLMRequest


class RouteStrategy:
    """String constants naming the routing strategies."""
    PRIORITY = "priority"      # always use the highest-priority route
    FALLBACK = "fallback"      # preferred model first, otherwise the default route
    COST_FIRST = "cost_first"  # prefer the cheapest route
    RANDOM = "random"          # pick a route at random


class ModelRouter:
    """
    Model router: selects a suitable provider + model according to a strategy.
    """

    def __init__(self, strategy: str = RouteStrategy.FALLBACK):
        self.strategy = strategy
        self._routes: list[dict] = []

    def add_route(
        self,
        provider: BaseLLMProvider,
        model: str,
        priority: int = 0,
        cost_per_1k: float = 0.0,
    ):
        """Register a route. A lower priority number means the route is tried first."""
        self._routes.append({
            "provider": provider,
            "model": model,
            "priority": priority,
            "cost": cost_per_1k,
        })
        self._routes.sort(key=lambda r: r["priority"])

    async def route(
        self, request: LLMRequest
    ) -> tuple[BaseLLMProvider, str]:
        """Select a provider + model according to the strategy."""
        if not self._routes:
            raise RuntimeError("No routes registered")

        if self.strategy == RouteStrategy.PRIORITY:
            return self._route_by_priority()

        elif self.strategy == RouteStrategy.FALLBACK:
            return self._route_by_fallback(request.model)

        elif self.strategy == RouteStrategy.COST_FIRST:
            return self._route_by_cost()

        elif self.strategy == RouteStrategy.RANDOM:
            route = random.choice(self._routes)
            return route["provider"], route["model"]

        return self._routes[0]["provider"], self._routes[0]["model"]

    def _route_by_priority(self):
        route = self._routes[0]
        return route["provider"], route["model"]

    def _route_by_fallback(self, preferred_model: str):
        """Use the route for the requested model if one is registered;
        otherwise fall back to the highest-priority route.

        Note: this checks registration only, not provider health.
        """
        for route in self._routes:
            if route["model"] == preferred_model:
                return route["provider"], route["model"]
        # Fall back to the highest-priority route
        return self._routes[0]["provider"], self._routes[0]["model"]

    def _route_by_cost(self):
        route = min(self._routes, key=lambda r: r["cost"])
        return route["provider"], route["model"]
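
The router above is stateless: it has no way to know that a provider has just failed, so calling it again returns the same route. One possible way to support failover, shown here as a sketch rather than as part of the original design, is to let the caller pass a set of provider names to skip. `FailoverRouter`, `Route`, and the `exclude` parameter are hypothetical names introduced for this example.

```python
from dataclasses import dataclass, field


@dataclass(order=True)
class Route:
    priority: int                                  # only field used for ordering
    provider_name: str = field(compare=False)
    model: str = field(compare=False)


class FailoverRouter:
    """Priority router that can skip providers the caller marks as failed."""

    def __init__(self) -> None:
        self._routes: list[Route] = []

    def add_route(self, provider_name: str, model: str, priority: int = 0) -> None:
        self._routes.append(Route(priority, provider_name, model))
        self._routes.sort()                        # lowest priority number first

    def route(self, exclude: frozenset[str] = frozenset()) -> Route:
        """Return the best route whose provider is not in `exclude`."""
        for candidate in self._routes:
            if candidate.provider_name not in exclude:
                return candidate
        raise LookupError("no route left after exclusions")


router = FailoverRouter()
router.add_route("openai", "gpt-4o", priority=1)
router.add_route("deepseek", "deepseek-chat", priority=2)

first = router.route()
# Pretend the first provider failed and ask for the next-best route.
second = router.route(exclude=frozenset({first.provider_name}))
print(first.model, "->", second.model)
```

The caller owns the exclusion set, so the router itself stays free of per-request state and remains safe to share between concurrent requests.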

3.2 Route Configuration Example

# Initialize the providers
openai_provider = OpenAIProvider(api_key="sk-xxx")
deepseek = OpenAIProvider(
    api_key="sk-ds-xxx",
    base_url="https://api.deepseek.com/v1",
)
mock = MockProvider()

# Configure the routes
router = ModelRouter(strategy=RouteStrategy.FALLBACK)
router.add_route(openai_provider, "gpt-4o", priority=1, cost_per_1k=10.0)
router.add_route(deepseek, "deepseek-chat", priority=2, cost_per_1k=0.5)
router.add_route(mock, "mock-model", priority=3, cost_per_1k=0.0)

# Usage (inside an async function, with `request` being an LLMRequest)
provider, model = await router.route(request)
request.model = model  # send the model chosen by the router
response = await provider.chat(request)

4. Circuit Breaker

4.1 State Machine Implementation

# llm/circuit_breaker.py
import asyncio
import time
from enum import Enum
from typing import Optional


class CircuitBreakerOpenError(Exception):
    """Raised when the breaker rejects a call without invoking the provider."""


class CircuitState(Enum):
    CLOSED = "closed"        # normal: requests pass through
    OPEN = "open"            # tripped: requests are rejected immediately
    HALF_OPEN = "half_open"  # probing: a limited number of trial requests


class CircuitBreaker:
    """
    Circuit breaker: keeps a struggling LLM provider from being hammered
    with further requests.

    State transitions:
        CLOSED    → OPEN       (consecutive failures reach the threshold)
        OPEN      → HALF_OPEN  (recovery timeout has elapsed)
        HALF_OPEN → CLOSED     (probe request succeeds)
        HALF_OPEN → OPEN       (probe request fails)
    """

    def __init__(
        self,
        failure_threshold: int = 5,       # trip after N consecutive failures
        recovery_timeout: float = 30.0,   # how long the breaker stays open (seconds)
        half_open_max_requests: int = 1,  # probe requests allowed while half-open
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_requests = half_open_max_requests

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.half_open_requests = 0
        self._lock = asyncio.Lock()

    async def call(self, func, *args, **kwargs):
        """Call the protected coroutine function under breaker control."""
        async with self._lock:
            if self.state == CircuitState.OPEN:
                # Monotonic time is immune to wall-clock adjustments.
                if time.monotonic() - self.last_failure_time >= self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.half_open_requests = 0
                else:
                    raise CircuitBreakerOpenError(
                        f"Circuit breaker is OPEN for {self._remaining_cooldown():.0f}s"
                    )

            if self.state == CircuitState.HALF_OPEN:
                if self.half_open_requests >= self.half_open_max_requests:
                    raise CircuitBreakerOpenError(
                        "Circuit breaker is HALF_OPEN, max probe requests reached"
                    )
                self.half_open_requests += 1

        # The lock is released while the provider call is in flight.
        try:
            result = await func(*args, **kwargs)
        except Exception:
            async with self._lock:
                self.failure_count += 1
                self.last_failure_time = time.monotonic()
                # A failed probe reopens the breaker immediately.
                if (
                    self.state == CircuitState.HALF_OPEN
                    or self.failure_count >= self.failure_threshold
                ):
                    self.state = CircuitState.OPEN
            raise

        # Success
        async with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                # Probe succeeded: recover
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.half_open_requests = 0
            else:
                # Normal success: reset the failure count
                self.failure_count = 0

        return result

    def _remaining_cooldown(self) -> float:
        if self.last_failure_time is None:
            return 0.0
        elapsed = time.monotonic() - self.last_failure_time
        return max(0.0, self.recovery_timeout - elapsed)
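
The breaker above trips on consecutive failures, while the architecture overview describes it in terms of a provider's failure rate. A sliding window over recent outcomes is one way to measure a rate instead. The following is a minimal sketch of that alternative, not the article's implementation; `FailureRateWindow`, `window_size`, and `min_calls` are names assumed for illustration.

```python
from collections import deque


class FailureRateWindow:
    """Tracks the outcomes of the most recent calls and reports the failure rate."""

    def __init__(self, window_size: int = 20, min_calls: int = 5) -> None:
        # deque(maxlen=...) silently drops the oldest outcome when full.
        self._outcomes: deque[bool] = deque(maxlen=window_size)
        self._min_calls = min_calls

    def record(self, success: bool) -> None:
        self._outcomes.append(success)

    def failure_rate(self) -> float:
        if not self._outcomes:
            return 0.0
        failures = sum(1 for ok in self._outcomes if not ok)
        return failures / len(self._outcomes)

    def should_trip(self, threshold: float = 0.5) -> bool:
        # Require a minimum sample so one early failure cannot trip the breaker.
        enough_data = len(self._outcomes) >= self._min_calls
        return enough_data and self.failure_rate() >= threshold


window = FailureRateWindow(window_size=10, min_calls=4)
for ok in [True, False, True, False, False]:
    window.record(ok)
print(window.failure_rate(), window.should_trip())
```

A rate-based trigger tolerates an occasional success in the middle of an outage, which a consecutive-failure counter would treat as a full recovery.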

4.2 Using It in the Pipeline

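# Create an independent circuit breaker for each provider.
# Keys must match provider.provider_name. Note that OpenAIProvider reports
# "openai" even when its base_url points at DeepSeek.
circuit_breakers = {
    "openai": CircuitBreaker(failure_threshold=3, recovery_timeout=30),
    "deepseek": CircuitBreaker(failure_threshold=5, recovery_timeout=60),
}

async def safe_chat(provider: BaseLLMProvider, request: LLMRequest):
    cb = circuit_breakers[provider.provider_name]
    try:
        return await cb.call(provider.chat, request)
    except CircuitBreakerOpenError:
        # Breaker is open: re-raise so the caller can try the next provider
        raise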
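5. Semantic Cache

5.1 How the Cache Works

A semantic cache does not rely on exact key-value matching. Instead, it computes semantic similarity between text embedding vectors. When a user asks "What's the weather like today?", the cache can hit the entry stored earlier for "How's the weather today?".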

User input → embedding vector → vector similarity search → on a hit, return the cached response
                                          ↓ on a miss
                                 call the LLM → cache the result
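
To make the hit/miss decision in the flow above concrete, here is a small worked example of cosine similarity against a threshold. The three-dimensional vectors are toy values invented for illustration (real embeddings have hundreds or thousands of dimensions), and the 0.92 threshold matches the default used by the implementation in the next section.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """cos(a, b) = (a · b) / (|a| * |b|); returns 0.0 for a zero vector."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


THRESHOLD = 0.92
cached_query = [0.9, 0.1, 0.4]    # toy embedding of a cached question
paraphrase = [0.85, 0.15, 0.45]   # toy embedding of a paraphrase of it
unrelated = [0.1, 0.9, 0.2]       # toy embedding of an unrelated question

for label, vec in [("paraphrase", paraphrase), ("unrelated", unrelated)]:
    score = cosine_similarity(cached_query, vec)
    print(label, round(score, 3), "hit" if score >= THRESHOLD else "miss")
```

Vectors that point in nearly the same direction score close to 1.0, so the paraphrase clears the threshold while the unrelated query does not.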

5.2 Implementation

# llm/semantic_cache.py
import hashlib
import json
from typing import Optional

from sklearn.metrics.pairwise import cosine_similarity


class SemanticCache:
    """
    Semantic cache: similarity matching over embedding vectors.

    This is a simple brute-force implementation that compares the query
    against every cached entry. In production it can be replaced with
    Milvus / Pinecone / Qdrant.

    `redis_client` is expected to be an async client (redis.asyncio.Redis).
    """

    def __init__(
        self,
        redis_client,
        similarity_threshold: float = 0.92,
        ttl: int = 3600,
    ):
        self.redis = redis_client
        self.similarity_threshold = similarity_threshold
        self.ttl = ttl

    async def get(self, query: str, query_embedding: list[float]) -> Optional[str]:
        """Look up a semantically similar cached response."""
        best_score = 0.0
        best_response: Optional[str] = None

        # 1. Iterate over all cached keys. scan_iter follows the SCAN cursor
        #    to the end; a single SCAN call returns only one page of keys.
        async for key in self.redis.scan_iter(match="llm:cache:*"):
            # 2. Read the cached entry (it may have expired since the scan)
            cached_data = await self.redis.get(key)
            if cached_data is None:
                continue

            entry = json.loads(cached_data)
            cached_embedding = entry["embedding"]

            # 3. Compute cosine similarity
            score = cosine_similarity(
                [query_embedding], [cached_embedding]
            )[0][0]

            if score > best_score:
                best_score = score
                # Keep the response in memory instead of re-reading the key,
                # which could expire before a second read.
                best_response = entry["response"]

        # 4. A score at or above the threshold is a hit
        if best_score >= self.similarity_threshold:
            return best_response

        return None

    async def set(
        self,
        query: str,
        query_embedding: list[float],
        response: str,
    ) -> None:
        """Cache an LLM response."""
        key = f"llm:cache:{hashlib.md5(query.encode()).hexdigest()}"
        entry = {
            "query": query,
            "embedding": query_embedding,
            "response": response,
        }
        # Expiry is handled by the Redis TTL
        await self.redis.set(key, json.dumps(entry), ex=self.ttl)

    async def invalidate(self, pattern: str = "llm:cache:*") -> int:
        """Clear cached entries matching the pattern."""
        keys = [key async for key in self.redis.scan_iter(match=pattern)]
        if keys:
            return await self.redis.delete(*keys)
        return 0
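
The lookup above makes one Redis round trip and one similarity call per cached entry, which will not fit a tight latency budget once the cache grows. Short of adopting a vector database, one intermediate option is to hold normalized embeddings in a single NumPy matrix and score them all with one matrix-vector product. The sketch below assumes non-zero embeddings of a fixed dimension; `InMemoryVectorIndex` is a hypothetical name, and persistence and eviction are left out.

```python
import numpy as np


class InMemoryVectorIndex:
    """Brute-force cosine search over a matrix of L2-normalized embeddings."""

    def __init__(self, dim: int) -> None:
        self._vectors = np.empty((0, dim), dtype=np.float32)
        self._responses: list[str] = []

    def add(self, embedding: list[float], response: str) -> None:
        vec = np.asarray(embedding, dtype=np.float32)
        vec = vec / np.linalg.norm(vec)           # normalize once, at insert time
        self._vectors = np.vstack([self._vectors, vec])
        self._responses.append(response)

    def search(self, embedding: list[float], threshold: float = 0.92):
        """Return the best-matching response, or None if below the threshold."""
        if not self._responses:
            return None
        query = np.asarray(embedding, dtype=np.float32)
        query = query / np.linalg.norm(query)
        # For unit vectors the dot product equals the cosine similarity.
        scores = self._vectors @ query
        best = int(np.argmax(scores))
        return self._responses[best] if scores[best] >= threshold else None


index = InMemoryVectorIndex(dim=3)
index.add([0.9, 0.1, 0.4], "It is sunny today.")
index.add([0.1, 0.9, 0.2], "Your order has shipped.")
print(index.search([0.85, 0.15, 0.45]))
print(index.search([0.0, 0.0, 1.0]))
```

Normalizing at insert time means each lookup costs a single pass over the matrix, with no per-entry norm computation.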

5.3 Generating Embeddings

# llm/embeddings.py
from openai import AsyncOpenAI


class EmbeddingService:
    """Generates text embedding vectors."""

    def __init__(self, api_key: str, model: str = "text-embedding-3-small"):
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model

    async def embed(self, text: str) -> list[float]:
        response = await self.client.embeddings.create(
            model=self.model,
            input=text,
        )
        return response.data[0].embedding

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        response = await self.client.embeddings.create(
            model=self.model,
            input=texts,
        )
        return [data.embedding for data in response.data]

6. Context Builder

6.1 System Prompt Structure

# llm/context_builder.py
from typing import Optional


class ContextBuilder:
    """
    Builds the messages array for an LLM call.

    Supports a three-layer structure:
        1. System Prompt (role definition + behavior constraints)
        2. Conversation History
        3. Current Input (the current user input)
    """

    def __init__(self, system_prompt: str, max_context_tokens: int = 8000):
        self.system_prompt = system_prompt
        # Token budget. Not enforced yet: _trim_history below trims by
        # message count, not by tokens.
        self.max_context_tokens = max_context_tokens

    def build(
        self,
        user_input: str,
        history: Optional[list[dict]] = None,
        extra_context: Optional[dict] = None,
    ) -> list[dict]:
        messages = [{"role": "system", "content": self.system_prompt}]

        # Add extra context (user profile, knowledge-base snippets, etc.)
        if extra_context:
            context_block = self._format_context(extra_context)
            messages.append({
                "role": "system",
                "content": f"[Context]\n{context_block}",
            })

        # Add the conversation history (trimmed to the most recent messages)
        if history:
            messages.extend(self._trim_history(history))

        # Add the current input
        messages.append({"role": "user", "content": user_input})

        return messages

    def _format_context(self, ctx: dict) -> str:
        """Format the extra context."""
        parts = []
        if "user_profile" in ctx:
            p = ctx["user_profile"]
            parts.append(
                f"User: {p.get('name', 'Unknown')}\n"
                f"Language Level: {p.get('cefr_level', 'A1')}\n"
                f"Learning Goal: {p.get('goal', 'General')}"
            )
        if "knowledge" in ctx:
            parts.append(f"Reference:\n{ctx['knowledge']}")
        return "\n\n".join(parts)

    def _trim_history(
        self, history: list[dict], max_messages: int = 20
    ) -> list[dict]:
        """Trim the conversation history, keeping the most recent N messages."""
        return history[-max_messages:]
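
The builder accepts a `max_context_tokens` budget but trims history by message count. A token-aware trim could look like the sketch below. The 4-characters-per-token estimate is a crude assumption that is noticeably off for Chinese text; a real tokenizer would be more accurate. `estimate_tokens` and `trim_history_by_tokens` are hypothetical helpers, not part of the original class.

```python
def estimate_tokens(text: str) -> int:
    """Crude heuristic: about 4 characters per token. Not a real tokenizer."""
    return max(1, len(text) // 4)


def trim_history_by_tokens(history: list[dict], budget: int) -> list[dict]:
    """Keep the most recent messages whose estimated token total fits the budget."""
    kept: list[dict] = []
    used = 0
    for message in reversed(history):            # walk from newest to oldest
        cost = estimate_tokens(message["content"])
        if used + cost > budget:
            break                                # older messages are dropped
        kept.append(message)
        used += cost
    kept.reverse()                               # restore chronological order
    return kept


history = [
    {"role": "user", "content": "a" * 40},       # about 10 estimated tokens
    {"role": "assistant", "content": "b" * 40},  # about 10 estimated tokens
    {"role": "user", "content": "c" * 40},       # about 10 estimated tokens
]
trimmed = trim_history_by_tokens(history, budget=25)
print([m["content"][0] for m in trimmed])
```

Walking from the newest message backwards guarantees that the most recent turns survive when the budget is tight.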

6.2 Example: An Educational System Prompt

TEACHER_SYSTEM_PROMPT = """You are an English conversation tutor named Alex.

## Teaching Approach
- Adapt language complexity to the student's CEFR level (A1-C2)
- Correct errors naturally by modeling correct usage in your response
- Use the "sandwich method": acknowledge → correct → continue
- Ask follow-up questions to keep the conversation flowing

## Behavior Rules
- NEVER output translations unless explicitly asked
- NEVER use Chinese unless the student is at A1 level
- Keep responses concise: 2-3 sentences for A1, 3-5 for B1, full paragraphs for C1
- After every 3 exchanges, insert a teaching card with a grammar/vocabulary tip

## Error Correction
- For minor errors: model the correction naturally in your response
- For major errors: gently point it out and provide the correct form
- Never make the student feel embarrassed about mistakes

## Conversation Flow
1. Start with a greeting and a simple question
2. Listen and respond naturally
3. Gradually introduce new vocabulary
4. Periodically review previously learned words
5. End with a preview of the next topic"""

7. Pipeline Assembly: The Complete Pipeline

# llm/pipeline.py
from typing import AsyncIterator, Optional

from .providers.base import LLMRequest, LLMResponse
from .router import ModelRouter
from .circuit_breaker import CircuitBreaker
from .semantic_cache import SemanticCache
from .context_builder import ContextBuilder
from .embeddings import EmbeddingService


class LLMPipeline:
    """
    The complete LLM call pipeline.

    Call flow:
        1. ContextBuilder assembles the messages
        2. SemanticCache is checked
        3. CircuitBreaker guards the call
        4. ModelRouter selects a provider
        5. The provider executes the call
        6. The result is cached
    """

    def __init__(
        self,
        router: ModelRouter,
        context_builder: ContextBuilder,
        cache: Optional[SemanticCache] = None,
        embedding_service: Optional[EmbeddingService] = None,
        circuit_breaker: Optional[CircuitBreaker] = None,
    ):
        self.router = router
        self.context_builder = context_builder
        self.cache = cache
        self.embedding_service = embedding_service
        self.circuit_breaker = circuit_breaker

    async def chat(
        self,
        user_input: str,
        history: Optional[list[dict]] = None,
        context: Optional[dict] = None,
        stream: bool = False,
    ) -> LLMResponse | AsyncIterator[str]:
        """Run the complete LLM call pipeline."""

        # 1. Build the context
        messages = self.context_builder.build(user_input, history, context)

        # 2. Try the semantic cache (non-streaming only)
        use_cache = bool(self.cache and self.embedding_service and not stream)
        query_embedding = None
        if use_cache:
            query_embedding = await self.embedding_service.embed(user_input)
            cached = await self.cache.get(user_input, query_embedding)
            if cached is not None:
                return LLMResponse(
                    content=cached,
                    model="cache",
                    provider="cache",
                    usage={"prompt_tokens": 0, "completion_tokens": 0},
                    cached=True,
                )

        # 3. Create the request
        request = LLMRequest(
            messages=messages,
            stream=stream,
        )

        # 4. Routing + circuit-breaker protection.
        #    ModelRouter.route() keeps no record of failures, so the pipeline
        #    starts with the routed choice and then walks the router's
        #    remaining routes (read from its internal table).
        first = await self.router.route(request)
        candidates = [first] + [
            (r["provider"], r["model"])
            for r in self.router._routes
            if r["provider"] is not first[0]
        ]

        last_error = None
        for provider, model in candidates[:3]:  # try at most 3 providers
            request.model = model
            try:
                if stream:
                    # chat_stream is an async generator, so it cannot be awaited
                    # through CircuitBreaker.call(). Errors surface while the
                    # caller iterates, so there is no failover in this mode.
                    return provider.chat_stream(request)

                if self.circuit_breaker:
                    response = await self.circuit_breaker.call(
                        provider.chat, request
                    )
                else:
                    response = await provider.chat(request)

                # 5. Cache the result (non-streaming)
                if use_cache:
                    await self.cache.set(
                        user_input,
                        query_embedding,
                        response.content,
                    )

                return response

            except Exception as e:  # includes CircuitBreakerOpenError
                last_error = e
                # Move on to the next candidate provider
                continue

        raise RuntimeError(
            f"All providers failed. Last error: {last_error}"
        )

    async def chat_stream(
        self,
        user_input: str,
        history: Optional[list[dict]] = None,
        context: Optional[dict] = None,
    ) -> AsyncIterator[str]:
        """Streaming version."""
        result = await self.chat(
            user_input, history, context, stream=True
        )
        async for chunk in result:
            yield chunk

8. FastAPI Integration

# routers/chat.py
from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
from llm.pipeline import LLMPipeline

# get_pipeline is not shown in this article. It is assumed to be a dependency
# defined elsewhere in the app that returns the shared LLMPipeline instance.

router = APIRouter(prefix="/chat", tags=["chat"])


# Non-streaming API
@router.post("/message")
async def send_message(
    input: str,
    history: list[dict] = [],
    pipeline: LLMPipeline = Depends(get_pipeline),
):
    response = await pipeline.chat(input, history)
    return {
        "content": response.content,
        "model": response.model,
        "cached": response.cached,
        "usage": response.usage,
    }


# WebSocket streaming
@router.websocket("/ws")
async def chat_websocket(
    websocket: WebSocket,
    pipeline: LLMPipeline = Depends(get_pipeline),
):
    await websocket.accept()
    history = []  # per-connection conversation history

    try:
        while True:
            data = await websocket.receive_json()
            user_input = data["message"]

            # Send the start marker
            await websocket.send_json({"type": "start"})

            # Stream the output
            full_response = ""
            async for chunk in pipeline.chat_stream(
                user_input, history
            ):
                full_response += chunk
                await websocket.send_json({
                    "type": "chunk",
                    "content": chunk,
                })

            # Send the completion marker
            await websocket.send_json({
                "type": "done",
                "content": full_response,
            })

            # Update the history
            history.append({"role": "user", "content": user_input})
            history.append({"role": "assistant", "content": full_response})

    except WebSocketDisconnect:
        pass

9. Testing and Verification

# tests/test_pipeline.py
# Requires pytest and pytest-asyncio (for the @pytest.mark.asyncio marker).
import pytest
from llm.pipeline import LLMPipeline
from llm.providers.mock_provider import MockProvider
from llm.router import ModelRouter
from llm.context_builder import ContextBuilder


@pytest.fixture
def pipeline():
    router = ModelRouter()
    router.add_route(MockProvider(delay=0.05), "mock-model")
    builder = ContextBuilder("You are a helpful assistant.")
    return LLMPipeline(router=router, context_builder=builder)


@pytest.mark.asyncio
async def test_basic_chat(pipeline):
    response = await pipeline.chat("Hello!")
    assert response.content.startswith("[Mock]")
    assert response.provider == "mock"


@pytest.mark.asyncio
async def test_streaming(pipeline):
    chunks = []
    async for chunk in pipeline.chat_stream("Hello!"):
        chunks.append(chunk)
    assert len(chunks) > 1
    assert "".join(chunks).startswith("[Mock]")


@pytest.mark.asyncio
async def test_circuit_breaker_fallback():
    """Tests automatic fallback when the first provider fails."""
    router = ModelRouter()
    failing = MockProvider(fail_rate=1.0)  # fails 100% of the time
    working = MockProvider(delay=0.05)
    router.add_route(failing, "failing", priority=1)
    router.add_route(working, "working", priority=2)

    pipeline = LLMPipeline(
        router=router,
        context_builder=ContextBuilder("test"),
    )

    # `failing` always raises, so getting any response means fallback worked.
    response = await pipeline.chat("test")
    assert response.provider == "mock"  # fallback succeeded

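10. Production Deployment Notes

10.1 Performance Targets

Component          Latency budget    Optimization focus
Context Builder    < 5 ms            Precomputation; cached token counts
Semantic Cache     < 50 ms           Vector database; index tuning
Circuit Breaker    < 1 ms            Pure in-memory operation
ModelRouter        < 1 ms            Local routing table
Provider call      500 ms - 10 s     Streaming output; timeout control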

10.2 Monitoring Metrics

# Key monitoring metrics
METRICS = {
    "llm_request_total": "Counter: total number of requests",
    "llm_request_duration_seconds": "Histogram: request latency",
    "llm_cache_hit_ratio": "Gauge: cache hit ratio",
    "llm_circuit_breaker_state": "Gauge: circuit breaker state (0=CLOSED, 1=OPEN, 2=HALF_OPEN)",
    "llm_provider_failures": "Counter: failures per provider",
    "llm_token_usage": "Counter: token consumption",
}
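
To show how two of these metrics could be derived, the sketch below keeps counters and gauges in plain Python. It is only an illustration using the standard library; a real deployment would normally export the values through a metrics client. `PipelineMetrics` and the helper counter `llm_cache_hit_total` are hypothetical names that do not appear in the list above.

```python
from collections import Counter

# Numeric encoding from the llm_circuit_breaker_state description above.
STATE_CODES = {"closed": 0, "open": 1, "half_open": 2}


class PipelineMetrics:
    """In-process counters and gauges for a few of the pipeline metrics."""

    def __init__(self) -> None:
        self.counters: Counter[str] = Counter()
        self.gauges: dict[str, float] = {}

    def record_request(self, cache_hit: bool) -> None:
        self.counters["llm_request_total"] += 1
        if cache_hit:
            # Hypothetical helper counter used to derive the hit ratio.
            self.counters["llm_cache_hit_total"] += 1
        self.gauges["llm_cache_hit_ratio"] = (
            self.counters["llm_cache_hit_total"]
            / self.counters["llm_request_total"]
        )

    def record_breaker_state(self, state: str) -> None:
        self.gauges["llm_circuit_breaker_state"] = STATE_CODES[state]


metrics = PipelineMetrics()
for hit in [False, False, True, False]:
    metrics.record_request(cache_hit=hit)
metrics.record_breaker_state("half_open")
print(metrics.gauges)
```

Deriving the ratio from two counters keeps the gauge consistent with the raw counts at every update.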

10.3 Timeout Control

# Set different timeouts (in seconds) for each provider
TIMEOUTS = {
    "openai": {
        "connect": 10,
        "read": 30,
        "write": 10,
    },
    "deepseek": {
        "connect": 5,
        "read": 60,  # APIs hosted in mainland China may be slower
        "write": 10,
    },
}
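
The connect/read/write values above belong in the HTTP client's configuration. A coarser but complementary technique is an overall deadline per call, enforced with `asyncio.wait_for`. The sketch below demonstrates that second technique only. `TOTAL_TIMEOUTS` and `call_with_deadline` are hypothetical names, and the deadlines are tiny so the demo finishes quickly.

```python
import asyncio

# Hypothetical overall deadlines in seconds (deliberately tiny for the demo).
TOTAL_TIMEOUTS = {"openai": 0.05, "deepseek": 0.2}


async def call_with_deadline(provider_name: str, coro_factory):
    """Run one provider call, cancelling it if it exceeds the provider's deadline."""
    deadline = TOTAL_TIMEOUTS[provider_name]
    return await asyncio.wait_for(coro_factory(), timeout=deadline)


async def slow_call():
    # Stand-in for a provider call that takes 0.1 s.
    await asyncio.sleep(0.1)
    return "ok"


async def demo():
    results = {}
    for name in TOTAL_TIMEOUTS:
        try:
            results[name] = await call_with_deadline(name, slow_call)
        except asyncio.TimeoutError:
            results[name] = "timeout"
    return results


results = asyncio.run(demo())
print(results)
```

A timeout raised this way is an ordinary exception, so it can feed the circuit breaker's failure count like any other provider error.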

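FAQ

Q1: What if the semantic cache is not accurate enough?

Adjust the similarity_threshold parameter. The default of 0.92 is a fairly conservative value that suits QA-style scenarios. For creative-writing scenarios, consider lowering it to 0.85 or below, or turning the cache off entirely.

Q2: What if the circuit breaker trips too often on false alarms?

Increase failure_threshold or recovery_timeout. A good approach is to observe the provider's error rate for a week first, then set the threshold to about twice the P99 error rate.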

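Q3: How do you cache in streaming mode?

Caching is of limited use in streaming mode because users expect to see output in real time. A suggested strategy:

  • First request: stream the output and cache the complete response in the background
  • Subsequent requests: serve the cache hit without streaming and simulate a typing effect in the frontend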
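
The first half of that strategy can be sketched as an async generator that forwards chunks while accumulating them, then stores the full text when the stream ends. `stream_and_cache` and the `store` callback are hypothetical names. This version stores inline after the last chunk rather than in a separate background task.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def stream_and_cache(
    chunks: AsyncIterator[str],
    store: Callable[[str], Awaitable[None]],
) -> AsyncIterator[str]:
    """Yield chunks as they arrive, then store the full text once the stream ends."""
    parts: list[str] = []
    async for chunk in chunks:
        parts.append(chunk)
        yield chunk
    # Reached only if the stream completed, so partial responses are not cached.
    await store("".join(parts))


async def demo():
    cache: dict[str, str] = {}

    async def fake_llm_stream():
        # Stand-in for a provider's chat_stream output.
        for piece in ["Hel", "lo", "!"]:
            await asyncio.sleep(0)
            yield piece

    async def store(text: str) -> None:
        cache["hello"] = text

    received = [c async for c in stream_and_cache(fake_llm_stream(), store)]
    return received, cache


received, cache = asyncio.run(demo())
print(received, cache)
```

If the client disconnects mid-stream, the generator never reaches the store step, which avoids caching a truncated answer.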

Q4: How do you support more providers?

Implement the BaseLLMProvider interface, then register the provider with the ModelRouter:

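class ClaudeProvider(BaseLLMProvider):
    def __init__(self, api_key: str):
        self.api_key = api_key

    @property
    def provider_name(self) -> str:
        return "claude"

    async def chat(self, request): ...         # call the Claude API here
    async def chat_stream(self, request): ...  # stream chunks from the Claude API here

router.add_route(ClaudeProvider(api_key="sk-ant-xxx"), "claude-sonnet-4")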

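Summary

The core design principles of a production-grade LLM pipeline:

  1. Abstract the provider interface: hide the differences between APIs so that switching and testing are easy
  2. Circuit-breaker protection: keep a provider failure from cascading through the whole system
  3. Semantic caching: reduce latency and cost; best suited to conversations with a high share of repeated queries
  4. Smart routing: choose models flexibly by cost, priority, and capability
  5. Pipeline architecture: each component is independently replaceable, which simplifies testing and evolution

This architecture has been validated in the ChatLingo AI language-learning platform, where it supports multi-provider switching, a semantic cache hit rate of about 35%, and zero false circuit-breaker trips. Overall P95 latency dropped from 3.2s to 1.8s (cache hits included).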