1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
import asyncio
import json
import logging
import os
import time
import uuid
from typing import Optional

import httpx
import redis.asyncio as aioredis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()

# Credentials and model selection are read from the environment so the API key
# does not have to live in source control; the literal fallbacks preserve the
# previous behaviour when the variables are unset.
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", "your-api-key")
MODEL = os.environ.get("DEEPSEEK_MODEL", "deepseek-chat")
MAX_CONNECTIONS = 50  # upper bound on concurrently registered WebSocket clients
RATE_LIMIT = 10  # max messages accepted per client within a 1-second window
class ConnectionManager:
    """Register live WebSocket clients and enforce a per-client message rate limit.

    At most ``MAX_CONNECTIONS`` clients hold a slot at once, and each client may
    send at most ``RATE_LIMIT`` messages in any rolling one-second window.
    """

    def __init__(self):
        # client_id -> socket, for every connection currently holding a slot.
        self.connections: dict[str, WebSocket] = {}
        # client_id -> time.monotonic() stamps of recently accepted messages.
        self.rate_limits: dict[str, list[float]] = {}

    async def connect(self, client_id: str, websocket: WebSocket):
        """Accept *websocket* and register it under *client_id* if a slot is free.

        Returns:
            True if the handshake was accepted and the client registered;
            False if the server is full, in which case the socket is closed
            with code 1008.
        """
        if len(self.connections) >= MAX_CONNECTIONS:
            await websocket.close(code=1008, reason="服务器繁忙")
            return False
        # Claim the slot *before* awaiting the handshake: otherwise several
        # concurrent connects can all pass the capacity check above while
        # suspended in accept() and push the total past MAX_CONNECTIONS.
        self.connections[client_id] = websocket
        try:
            await websocket.accept()
        except BaseException:
            # Handshake failed or the task was cancelled: give the slot back.
            self.connections.pop(client_id, None)
            raise
        return True

    def disconnect(self, client_id: str):
        """Release *client_id*'s slot and drop its rate-limit history (safe to repeat)."""
        self.connections.pop(client_id, None)
        self.rate_limits.pop(client_id, None)

    def check_rate_limit(self, client_id: str) -> bool:
        """Record one message for *client_id* if it is within the rate limit.

        Returns:
            True if fewer than ``RATE_LIMIT`` messages were accepted in the last
            second (the new one is then recorded); False otherwise, in which
            case nothing is recorded.
        """
        # monotonic() is unaffected by wall-clock adjustments (NTP, manual
        # changes) that would otherwise stretch or collapse the window.
        now = time.monotonic()
        recent = [t for t in self.rate_limits.get(client_id, []) if now - t < 1]
        if len(recent) >= RATE_LIMIT:
            return False
        recent.append(now)
        self.rate_limits[client_id] = recent
        return True


manager = ConnectionManager()
class SessionStore:
    """Persist per-session chat history in Redis as one JSON-encoded list.

    History for a session is stored under the key ``session:{session_id}``.
    Only the newest ``max_messages`` entries are kept, and the key is given a
    fresh ``ttl``-second expiry on every write.

    Args:
        url: Redis connection URL passed to ``redis.asyncio.from_url``.
        ttl: Expiry in seconds applied to the session key on each write.
        max_messages: Maximum number of messages retained per session.
    """

    def __init__(
        self,
        url: str = "redis://localhost:6379",
        ttl: int = 7200,
        max_messages: int = 50,
    ):
        self.url = url
        self.ttl = ttl
        self.max_messages = max_messages
        # Redis client; stays None until init() is awaited.
        self.redis = None

    async def init(self):
        """Create the Redis client. Must be awaited before any other method."""
        self.redis = await aioredis.from_url(self.url)

    def _require_client(self):
        """Return the Redis client, failing clearly if init() was never awaited."""
        if self.redis is None:
            raise RuntimeError("SessionStore.init() must be awaited before use")
        return self.redis

    async def get_messages(self, session_id: str) -> list[dict]:
        """Return the stored history for *session_id*, or [] if there is none."""
        data = await self._require_client().get(f"session:{session_id}")
        return json.loads(data) if data else []

    async def add_message(self, session_id: str, message: dict):
        """Append *message* to the session history, trim it and refresh the TTL.

        NOTE(review): this is a read-modify-write without a transaction, so two
        connections writing the same session concurrently can drop a message.
        """
        messages = await self.get_messages(session_id)
        messages.append(message)
        # Trim by absolute start index: a negative slice such as messages[-0:]
        # would keep everything when max_messages is 0.
        overflow = len(messages) - self.max_messages
        if overflow > 0:
            messages = messages[overflow:]
        await self._require_client().setex(
            f"session:{session_id}", self.ttl, json.dumps(messages)
        )


store = SessionStore()
logger = logging.getLogger(__name__)

# Maximum number of non-system messages kept in the in-memory prompt. Matches
# the 50-message cap SessionStore applies to persisted history, so a long
# conversation cannot grow the request (and process memory) without bound.
MAX_CONTEXT_MESSAGES = 50


async def stream_llm(
    websocket: WebSocket,
    messages: list[dict],
    session_id: Optional[str] = None,
) -> str:
    """Stream a chat completion and forward each content delta to the client.

    Sends one ``{"type": "ai_stream_chunk", "content": ...}`` per non-empty
    delta, then a single ``{"type": "ai_stream_end", "session_id": ...}`` once
    the upstream stream finishes, whether it ends with ``[DONE]`` or the
    connection simply closes.

    Args:
        websocket: Accepted client socket that receives the chunks.
        messages: Prompt messages, sent verbatim as the request's ``messages``.
        session_id: Value echoed in ``ai_stream_end``. When omitted, falls back
            to a ``"session_id"`` key on ``messages[0]``, else ``""``.

    Returns:
        The full assistant reply (all deltas concatenated).

    Raises:
        httpx.HTTPStatusError: The API answered with a non-2xx status.
        httpx.HTTPError: Transport failures such as timeouts or refused
            connections.
    """
    if session_id is None:
        session_id = messages[0].get("session_id", "") if messages else ""

    parts: list[str] = []
    async with httpx.AsyncClient(timeout=120) as client:
        async with client.stream(
            "POST",
            "https://api.deepseek.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {DEEPSEEK_API_KEY}"},
            json={"model": MODEL, "messages": messages, "stream": True},
        ) as response:
            # An error response is a plain body, not an SSE stream; without this
            # check the loop below finds no "data: " lines and the caller
            # silently receives an empty reply.
            response.raise_for_status()
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    break
                try:
                    chunk = json.loads(data_str)
                    content = chunk["choices"][0]["delta"].get("content")
                except (json.JSONDecodeError, KeyError, IndexError, TypeError, AttributeError):
                    # Skip malformed events and ones without a delta (e.g. an
                    # empty "choices" list) instead of crashing the handler.
                    continue
                if content:
                    parts.append(content)
                    await websocket.send_json(
                        {"type": "ai_stream_chunk", "content": content}
                    )

    await websocket.send_json({"type": "ai_stream_end", "session_id": session_id})
    return "".join(parts)


@app.websocket("/ai-chat/{session_id}")
async def ai_chat(websocket: WebSocket, session_id: str):
    """Serve one chat session over a WebSocket.

    On connect, replays stored history as ``{"type": "history"}``, then loops:
      * ``{"type": "user_message", "content": str}``: persist it, send
        ``typing``, stream the model reply and persist that too.
      * ``{"type": "ping"}``: reply with ``pong``.
    Rate-limited or malformed frames get ``{"type": "error"}``; other message
    types are ignored.

    The connection slot and rate-limit state are released however the handler
    exits (client disconnect, upstream failure, storage error, ...).
    """
    client_id = str(uuid.uuid4())[:8]
    if not await manager.connect(client_id, websocket):
        return

    try:
        history = await store.get_messages(session_id)
        if history:
            await websocket.send_json({"type": "history", "messages": history})

        system_prompt = {
            "role": "system",
            "content": "你是一个友好的 AI 助手。请用中文回复,保持对话自然流畅。",
        }
        messages = [system_prompt] + history

        while True:
            try:
                data = await websocket.receive_json()
            except ValueError:
                # Frame was not valid JSON (json.JSONDecodeError is a ValueError).
                data = None

            if not manager.check_rate_limit(client_id):
                await websocket.send_json({
                    "type": "error",
                    "content": "消息发送太频繁,请稍后再试",
                })
                continue

            if not isinstance(data, dict) or "type" not in data:
                await websocket.send_json({"type": "error", "content": "消息格式无效"})
                continue

            if data["type"] == "user_message":
                content = data.get("content")
                if not isinstance(content, str) or not content:
                    await websocket.send_json({"type": "error", "content": "消息格式无效"})
                    continue

                user_msg = {"role": "user", "content": content}
                messages.append(user_msg)
                await store.add_message(session_id, user_msg)
                # Keep the system prompt plus only the newest messages.
                if len(messages) > MAX_CONTEXT_MESSAGES + 1:
                    messages[1:] = messages[-MAX_CONTEXT_MESSAGES:]

                await websocket.send_json({"type": "typing"})
                try:
                    full_reply = await stream_llm(
                        websocket, messages, session_id=session_id
                    )
                except httpx.HTTPError:
                    logger.exception(
                        "LLM request failed (client %s, session %s)",
                        client_id,
                        session_id,
                    )
                    await websocket.send_json({
                        "type": "error",
                        "content": "AI 服务暂时不可用,请稍后再试",
                    })
                    continue

                # Never persist an empty assistant turn.
                if full_reply:
                    ai_msg = {"role": "assistant", "content": full_reply}
                    messages.append(ai_msg)
                    await store.add_message(session_id, ai_msg)

            elif data["type"] == "ping":
                await websocket.send_json({"type": "pong"})
    except WebSocketDisconnect:
        pass
    finally:
        # Runs on every exit path so a crashed handler cannot leak its slot and
        # eventually exhaust MAX_CONNECTIONS.
        manager.disconnect(client_id)
        print(f"客户端 {client_id} 断开,会话 {session_id} 已保存")
@app.on_event("startup")
async def startup():
    """Initialise the module-level session store (creates its Redis client)."""
    session_store = store
    await session_store.init()


if __name__ == "__main__":
    # Direct-run entry point: serve the app on all interfaces, port 8000.
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")
|