1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
| import asyncio from typing import AsyncIterator, Optional from .providers.base import LLMRequest, LLMResponse from .router import ModelRouter from .circuit_breaker import CircuitBreaker, CircuitBreakerOpenError from .semantic_cache import SemanticCache from .context_builder import ContextBuilder from .embeddings import EmbeddingService
class LLMPipeline:
    """End-to-end LLM call pipeline.

    Call flow:
        1. ContextBuilder assembles the messages.
        2. SemanticCache is checked for a previous answer (non-streaming only,
           and only when both a cache and an embedding service are configured).
        3. ModelRouter selects a provider and model.
        4. The provider is called, guarded by the CircuitBreaker when one is
           configured. Steps 3-4 are retried up to ``max_attempts`` times.
        5. A fresh non-streaming result is stored in the cache (best effort).
    """

    def __init__(
        self,
        router: ModelRouter,
        context_builder: ContextBuilder,
        cache: Optional[SemanticCache] = None,
        embedding_service: Optional[EmbeddingService] = None,
        circuit_breaker: Optional[CircuitBreaker] = None,
        max_attempts: int = 3,
    ):
        """Wire up the pipeline's collaborators.

        Args:
            router: Chooses the (provider, model) pair for each attempt.
            context_builder: Turns user input, history and context into messages.
            cache: Optional semantic cache; only used when ``embedding_service``
                is also provided.
            embedding_service: Optional embedder used to key the semantic cache.
            circuit_breaker: Optional breaker wrapping every provider call.
            max_attempts: Number of route-and-call attempts ``chat`` makes
                before giving up (default 3, the previously hard-coded value).

        Raises:
            ValueError: If ``max_attempts`` is less than 1.
        """
        if max_attempts < 1:
            raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")
        self.router = router
        self.context_builder = context_builder
        self.cache = cache
        self.embedding_service = embedding_service
        self.circuit_breaker = circuit_breaker
        self.max_attempts = max_attempts

    async def chat(
        self,
        user_input: str,
        history: Optional[list[dict]] = None,
        context: Optional[dict] = None,
        stream: bool = False,
    ) -> LLMResponse | AsyncIterator[str]:
        """Run the full pipeline for one user turn.

        Args:
            user_input: The user's latest message.
            history: Prior turns, forwarded to ``ContextBuilder.build``.
            context: Extra context, forwarded to ``ContextBuilder.build``.
            stream: When True, return the provider's stream instead of a
                complete ``LLMResponse``; the cache is bypassed in that case.

        Returns:
            An ``LLMResponse`` (served from the cache when possible) when
            ``stream`` is False, otherwise the provider's stream object.

        Raises:
            RuntimeError: If every attempt fails; the last underlying error is
                chained as ``__cause__``.
        """
        messages = self.context_builder.build(user_input, history, context)

        use_cache = (
            self.cache is not None
            and self.embedding_service is not None
            and not stream
        )
        query = None
        query_embedding = None
        if use_cache:
            # The cache is keyed on the final message as assembled by the
            # ContextBuilder; the same key and embedding are reused for the
            # write below so that reads and writes agree.
            query = messages[-1]["content"]
            query_embedding = await self.embedding_service.embed(query)
            cached = await self.cache.get(query, query_embedding)
            if cached is not None:
                return LLMResponse(
                    content=cached,
                    model="cache",
                    provider="cache",
                    usage={"prompt_tokens": 0, "completion_tokens": 0},
                    cached=True,
                )

        request = LLMRequest(messages=messages, stream=stream)

        last_error: Optional[BaseException] = None
        for _attempt in range(self.max_attempts):
            try:
                provider, model = await self.router.route(request)
                # NOTE(review): the request is mutated in place, so later
                # attempts are routed with the previous attempt's model set;
                # confirm the router does not pin to a model that just failed.
                request.model = model
                response = await self._call_provider(provider, request, stream)
            except Exception as e:  # also covers CircuitBreakerOpenError
                last_error = e
                continue

            # Kept outside the try: a cache problem must not be mistaken for
            # a provider failure and trigger another provider call.
            if use_cache and not response.cached:
                await self._store_in_cache(query, query_embedding, response.content)
            return response

        raise RuntimeError(
            f"All providers failed. Last error: {last_error}"
        ) from last_error

    async def _call_provider(self, provider, request: LLMRequest, stream: bool):
        """Make one provider call, through the circuit breaker when configured."""
        if self.circuit_breaker is not None:
            call = provider.chat_stream if stream else provider.chat
            # NOTE(review): for streams this awaits CircuitBreaker.call on
            # chat_stream, while the unguarded path below does not await
            # chat_stream; confirm which contract chat_stream actually has.
            return await self.circuit_breaker.call(call, request)
        if stream:
            return provider.chat_stream(request)
        return await provider.chat(request)

    async def _store_in_cache(self, query, query_embedding, content) -> None:
        """Write a response to the semantic cache; log instead of raising on failure."""
        import logging

        try:
            await self.cache.set(query, query_embedding, content)
        except Exception:
            # The provider answer is valid; a cache outage should not fail
            # the request or cause it to be re-issued.
            logging.getLogger(__name__).warning(
                "Semantic cache write failed; returning uncached response",
                exc_info=True,
            )

    async def chat_stream(
        self,
        user_input: str,
        history: Optional[list[dict]] = None,
        context: Optional[dict] = None,
    ) -> AsyncIterator[str]:
        """Streaming variant of ``chat``: yield chunks from the provider stream.

        Only obtaining the stream goes through ``chat``'s retry loop; an error
        raised while iterating the stream propagates to the caller.
        """
        result = await self.chat(user_input, history, context, stream=True)
        async for chunk in result:
            yield chunk
|