LangGraph Agentic RAG 完整教學:建構智能檢索代理系統 簡介 Agentic RAG(Retrieval-Augmented Generation)是一個強大的架構,結合了檢索增強生成和代理系統的概念。這個系統能夠智能地決定何時需要從知識庫中檢索資訊,何時可以直接回應用戶。本教學將逐步帶你了解如何使用 LangGraph 建構一個完整的 Agentic RAG 系統。 系統概述 核心功能 在這個教學中,我們將建立一個檢索代理,能夠: ✅ 智能判斷:自動決定是否需要從向量資料庫中檢索上下文 ✅ 相關性評估:評估檢索文件的相關性 ✅ 查詢最佳化:必要時重寫查詢以獲得更好的結果 ✅ 準確回答:基於檢索到的內容生成準確的回答 系統架構圖 graph TD A[開始] --> B[生成查詢或回應] B --> C{需要檢索?} C -->|是| D[檢索工具] C -->|否| E[結束] D --> F{文件相關?} F -->|是| G[生成答案] F -->|否| H[重寫問題] H --> B G --> E 環境設置 安裝必要套件 pip install -U --quiet langgraph "langchain[openai]" langchain-community langchain-text-splitters 設置 API 金鑰 import getpass import os def _set_env(key: str): if key not in os.environ: os.environ[key] = getpass.getpass(f"{key}: ") _set_env("OPENAI_API_KEY") 步驟 1:文件預處理 1.1 載入網路文件 使用 WebBaseLoader 從指定的 URL 載入文件: from langchain_community.document_loaders import WebBaseLoader # 定義要載入的文件 URL urls = [ "https://lilianweng.github.io/posts/2024-11-28-reward-hacking/", "https://lilianweng.github.io/posts/2024-07-07-hallucination/", "https://lilianweng.github.io/posts/2024-04-12-diffusion-video/", ] # 載入所有文件 docs = [WebBaseLoader(url).load() for url in urls] print(f"成功載入 {len(docs)} 個文件") 1.2 文件分割處理 將長文件分割成較小的區塊以便建立索引: from langchain_text_splitters import RecursiveCharacterTextSplitter # 展平文件列表 docs_list = [item for sublist in docs for item in sublist] # 配置文件分割器 text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder( chunk_size=100, # 每個區塊的大小 chunk_overlap=50 # 區塊間的重疊 ) # 執行分割 doc_splits = text_splitter.split_documents(docs_list) print(f"文件分割完成,共 {len(doc_splits)} 個區塊") 步驟 2:建立檢索工具 2.1 創建向量存儲 使用 OpenAI 嵌入模型創建記憶體向量存儲: from langchain_core.vectorstores import InMemoryVectorStore from langchain_openai import OpenAIEmbeddings # 初始化嵌入模型 embeddings = OpenAIEmbeddings() # 創建向量存儲 vectorstore = InMemoryVectorStore.from_documents( documents=doc_splits, embedding=embeddings ) # 創建檢索器 retriever = vectorstore.as_retriever( search_type="similarity", search_kwargs={"k": 5} # 返回最相似的 5 個文件 ) print("向量存儲建立完成") 2.2 包裝成檢索工具 將檢索器包裝成 LangChain 工具: from langchain.tools.retriever import create_retriever_tool retriever_tool = create_retriever_tool( retriever, "retrieve_blog_posts", "搜尋並返回關於 Lilian Weng 部落格文章的資訊。用於回答與機器學習、AI 研究相關的問題。", ) print("檢索工具創建完成") 步驟 3:定義圖形節點 3.1 生成查詢或回應節點 這是系統的主要決策節點,決定是否需要檢索外部資訊: from langgraph.graph import MessagesState from langchain.chat_models import init_chat_model # 初始化回應模型 response_model = init_chat_model("openai:gpt-4", temperature=0) def generate_query_or_respond(state: MessagesState): """ 呼叫模型生成回應。根據問題決定是否使用檢索工具。 如果問題需要特定的外部資訊,模型會選擇使用檢索工具。 如果問題可以直接回答,模型會直接生成回應。 """ print("🤖 正在分析查詢並決定是否需要檢索...") # 將檢索工具綁定到模型 model_with_tools = response_model.bind_tools([retriever_tool]) # 生成回應 response = model_with_tools.invoke(state["messages"]) return {"messages": [response]} 3.2 文件評分節點 評估檢索到的文件是否與查詢相關: from pydantic import BaseModel, Field from typing import Literal class GradeDocuments(BaseModel): """使用二元分數評分文件相關性""" binary_score: str = Field( description="相關性分數:'yes' 表示相關,'no' 表示不相關" ) # 初始化評分模型 grader_model = init_chat_model("openai:gpt-4", temperature=0) def grade_documents(state: MessagesState) -> Literal["generate_answer", "rewrite_question"]: """ 判斷檢索到的文件是否與問題相關 返回: - "generate_answer": 如果文件相關 - "rewrite_question": 如果文件不相關 """ print("📊 正在評估檢索文件的相關性...") # 提取問題和上下文 question = state["messages"][0].content last_message = state["messages"][-1] # 確保最後一條訊息包含檢索結果 if hasattr(last_message, 'content'): context = last_message.content else: context = str(last_message) # 評分提示 grading_prompt = f""" 用戶問題: {question} 檢索到的文件: {context} 請評估檢索到的文件是否與用戶問題相關。 如果文件包含回答問題所需的資訊,請回答 'yes'。 如果文件與問題不相關或無法提供有用資訊,請回答 'no'。 """ # 使用結構化輸出 structured_grader = grader_model.with_structured_output(GradeDocuments) score = structured_grader.invoke([{"role": "user", "content": grading_prompt}]) print(f"📋 相關性評分: {score.binary_score}") if score.binary_score == "yes": print("✅ 文件相關,準備生成答案") return "generate_answer" else: print("❌ 文件不相關,重寫問題") return "rewrite_question" 3.3 重寫問題節點 當檢索到的文件不相關時,重寫原始問題以改善檢索效果: def rewrite_question(state: MessagesState): """ 重寫原始用戶問題以改善檢索效果 """ print("✏️ 正在重寫問題以改善檢索效果...") messages = state["messages"] question = messages[0].content # 重寫提示 rewrite_prompt = f""" 原始問題: {question} 之前的檢索沒有找到相關資訊。請重寫這個問題,使其更適合從 Lilian Weng 的部落格文章中檢索資訊。 重寫要求: 1. 保持原始問題的核心意圖 2. 使用更具體的關鍵詞 3. 考慮 AI/ML 領域的術語 4. 讓問題更適合學術文章檢索 重寫後的問題: """ response = response_model.invoke([{"role": "user", "content": rewrite_prompt}]) print(f"🔄 重寫後的問題: {response.content}") return {"messages": [{"role": "user", "content": response.content}]} 3.4 生成答案節點 基於檢索到的相關內容生成最終答案: def generate_answer(state: MessagesState): """ 基於檢索到的上下文生成最終答案 """ print("📝 正在基於檢索內容生成答案...") # 提取問題和上下文 question = state["messages"][0].content last_message = state["messages"][-1] if hasattr(last_message, 'content'): context = last_message.content else: context = str(last_message) # 生成答案的提示 answer_prompt = f""" 基於以下檢索到的內容回答用戶問題。 用戶問題: {question} 檢索到的內容: {context} 請提供一個詳細、準確的答案。如果檢索內容不足以完全回答問題,請說明這一點。 答案: """ response = response_model.invoke([{"role": "user", "content": answer_prompt}]) print("✅ 答案生成完成") return {"messages": [response]} 步驟 4:組裝工作流程圖 4.1 建立圖形結構 from langgraph.graph import StateGraph, START, END from langgraph.prebuilt import ToolNode, tools_condition # 初始化狀態圖 workflow = StateGraph(MessagesState) # 添加節點 workflow.add_node("generate_query_or_respond", generate_query_or_respond) workflow.add_node("retrieve", ToolNode([retriever_tool])) workflow.add_node("rewrite_question", rewrite_question) workflow.add_node("generate_answer", generate_answer) print("🏗️ 工作流程節點添加完成") 4.2 定義邊和條件邊 # 添加起始邊 workflow.add_edge(START, "generate_query_or_respond") # 添加條件邊:決定是否需要檢索 workflow.add_conditional_edges( "generate_query_or_respond", tools_condition, { "tools": "retrieve", # 如果需要使用工具,去檢索節點 END: END, # 如果不需要工具,直接結束 }, ) # 添加條件邊:評估檢索結果 workflow.add_conditional_edges( "retrieve", grade_documents, { "generate_answer": "generate_answer", # 文件相關,生成答案 "rewrite_question": "rewrite_question", # 文件不相關,重寫問題 }, ) # 添加固定邊 workflow.add_edge("generate_answer", END) workflow.add_edge("rewrite_question", "generate_query_or_respond") print("🔗 工作流程邊定義完成") 4.3 編譯圖形 # 編譯工作流程圖 graph = workflow.compile() print("🎯 Agentic RAG 系統編譯完成!") 步驟 5:系統測試與運行 5.1 基本查詢測試 def test_agentic_rag(question: str): """測試 Agentic RAG 系統""" print(f"🔍 處理問題: {question}") print("=" * 50) # 執行查詢 for chunk in graph.stream( { "messages": [ { "role": "user", "content": question, } ] } ): for node, update in chunk.items(): print(f"📍 來自節點 '{node}' 的更新:") if "messages" in update and update["messages"]: last_message = update["messages"][-1] if hasattr(last_message, 'pretty_print'): last_message.pretty_print() else: print(last_message) print("-" * 30) # 測試不同類型的問題 test_questions = [ "Lilian Weng 對獎勵駭客的類型說了什麼?", "什麼是幻覺問題在大語言模型中?", "今天天氣如何?", # 這個問題不需要檢索 ] for question in test_questions: test_agentic_rag(question) print("\n" + "="*70 + "\n") 5.2 進階查詢範例 # 處理複雜查詢的完整示例 def run_complex_query(): """運行複雜查詢的範例""" complex_question = """ 請詳細解釋 Lilian Weng 文章中關於強化學習中獎勵駭客的主要觀點, 並說明這對 AI 安全有什麼影響? """ print("🚀 執行複雜查詢範例") print(f"問題: {complex_question}") print("=" * 70) # 執行查詢並收集結果 result = graph.invoke({ "messages": [ { "role": "user", "content": complex_question, } ] }) # 顯示最終答案 final_answer = result["messages"][-1].content print("🎯 最終答案:") print(final_answer) return result # 執行範例 complex_result = run_complex_query() 系統監控與除錯 添加詳細日誌 import logging from datetime import datetime # 設置日誌 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def log_node_execution(node_name: str, state: MessagesState): """記錄節點執行狀態""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") message_count = len(state.get("messages", [])) logger.info(f"[{timestamp}] 節點 '{node_name}' 執行,當前訊息數: {message_count}") # 在每個節點函數中添加日誌記錄 def enhanced_generate_query_or_respond(state: MessagesState): log_node_execution("generate_query_or_respond", state) return generate_query_or_respond(state) # 類似地增強其他節點... 性能監控 import time class PerformanceMonitor: def __init__(self): self.start_time = None self.node_times = {} def start_timing(self): self.start_time = time.time() def record_node_time(self, node_name: str): if self.start_time: elapsed = time.time() - self.start_time self.node_times[node_name] = elapsed def get_summary(self): total_time = sum(self.node_times.values()) return { "total_time": total_time, "node_breakdown": self.node_times, "average_per_node": total_time / len(self.node_times) if self.node_times else 0 } # 使用範例 monitor = PerformanceMonitor() monitor.start_timing() # 執行查詢... result = graph.invoke({"messages": [{"role": "user", "content": "測試問題"}]}) # 顯示性能報告 performance_summary = monitor.get_summary() print(f"執行總時間: {performance_summary['total_time']:.2f} 秒") 最佳實踐與最佳化 1. 文件處理最佳化 # 進階文件分割策略 def create_optimized_splitter(content_type: str = "blog"): """根據內容類型創建最佳化的分割器""" if content_type == "blog": return RecursiveCharacterTextSplitter.from_tiktoken_encoder( chunk_size=200, chunk_overlap=50, separators=["\n\n", "\n", ". ", " ", ""] ) elif content_type == "academic": return RecursiveCharacterTextSplitter.from_tiktoken_encoder( chunk_size=500, chunk_overlap=100, separators=["\n\n", "\n", ". ", " ", ""] ) else: return RecursiveCharacterTextSplitter.from_tiktoken_encoder( chunk_size=300, chunk_overlap=75 ) # 智能文件標記 def add_metadata_to_documents(docs, source_info): """為文件添加元數據以改善檢索""" for doc in docs: doc.metadata.update({ "source_type": source_info.get("type", "unknown"), "domain": source_info.get("domain", "general"), "language": source_info.get("language", "en"), "creation_date": datetime.now().isoformat() }) return docs 2. 檢索策略改進 # 混合檢索策略 from langchain.retrievers import EnsembleRetriever from langchain_community.retrievers import BM25Retriever def create_hybrid_retriever(doc_splits): """創建混合檢索器,結合語義和關鍵詞檢索""" # 語義檢索器 vectorstore = InMemoryVectorStore.from_documents( documents=doc_splits, embedding=OpenAIEmbeddings() ) semantic_retriever = vectorstore.as_retriever(search_kwargs={"k": 3}) # 關鍵詞檢索器 keyword_retriever = BM25Retriever.from_documents(doc_splits) keyword_retriever.k = 3 # 組合檢索器 ensemble_retriever = EnsembleRetriever( retrievers=[semantic_retriever, keyword_retriever], weights=[0.7, 0.3] # 70% 語義,30% 關鍵詞 ) return ensemble_retriever # 使用混合檢索器 hybrid_retriever = create_hybrid_retriever(doc_splits) enhanced_retriever_tool = create_retriever_tool( hybrid_retriever, "enhanced_retrieve_blog_posts", "使用混合檢索策略搜尋 Lilian Weng 部落格文章" ) 3. 快取機制 from functools import lru_cache import hashlib class QueryCache: def __init__(self, max_size=100): self.cache = {} self.max_size = max_size def _hash_query(self, query: str) -> str: return hashlib.md5(query.encode()).hexdigest() def get(self, query: str): query_hash = self._hash_query(query) return self.cache.get(query_hash) def set(self, query: str, result): if len(self.cache) >= self.max_size: # 移除最舊的條目 oldest_key = next(iter(self.cache)) del self.cache[oldest_key] query_hash = self._hash_query(query) self.cache[query_hash] = result def clear(self): self.cache.clear() # 全域快取實例 query_cache = QueryCache() # 在檢索函數中使用快取 def cached_retrieve(query: str): # 檢查快取 cached_result = query_cache.get(query) if cached_result: print(f"🎯 快取命中: {query}") return cached_result # 執行檢索 result = retriever.get_relevant_documents(query) # 儲存到快取 query_cache.set(query, result) return result 擴展功能 1. 多語言支援 from langchain.schema import Document def detect_and_translate(text: str, target_language: str = "en"): """檢測文本語言並翻譯""" # 這裡可以整合翻譯服務 # 例如 Google Translate API 或 Azure Translator pass def multilingual_processing(docs: list[Document]): """多語言文件處理""" processed_docs = [] for doc in docs: # 檢測語言 detected_lang = "zh" # 簡化示例 # 如果不是英文,進行翻譯 if detected_lang != "en": translated_content = detect_and_translate(doc.page_content, "en") # 創建新文件,保留原始內容 new_doc = Document( page_content=translated_content, metadata={ **doc.metadata, "original_language": detected_lang, "original_content": doc.page_content } ) processed_docs.append(new_doc) else: processed_docs.append(doc) return processed_docs 2. 知識圖譜整合 class KnowledgeGraphRetriever: def __init__(self): self.entities = {} self.relationships = {} def extract_entities(self, text: str): """從文本中提取實體""" # 使用 NER 模型提取實體 # 這裡是簡化示例 pass def build_graph(self, documents): """構建知識圖譜""" for doc in documents: entities = self.extract_entities(doc.page_content) # 建立實體關係 # ... def graph_enhanced_retrieval(self, query: str): """基於知識圖譜的增強檢索""" # 1. 常規向量檢索 vector_results = retriever.get_relevant_documents(query) # 2. 提取查詢實體 query_entities = self.extract_entities(query) # 3. 圖譜擴展 related_entities = self.find_related_entities(query_entities) # 4. 組合結果 enhanced_results = self.combine_results(vector_results, related_entities) return enhanced_results 3. 用戶回饋學習 class FeedbackLearner: def __init__(self): self.feedback_data = [] def collect_feedback(self, query: str, response: str, rating: int, comments: str = ""): """收集用戶回饋""" feedback = { "query": query, "response": response, "rating": rating, "comments": comments, "timestamp": datetime.now().isoformat() } self.feedback_data.append(feedback) def analyze_feedback(self): """分析回饋數據""" if not self.feedback_data: return {} ratings = [f["rating"] for f in self.feedback_data] avg_rating = sum(ratings) / len(ratings) low_rated_queries = [ f for f in self.feedback_data if f["rating"] < 3 ] return { "average_rating": avg_rating, "total_feedback": len(self.feedback_data), "low_rated_count": len(low_rated_queries), "low_rated_queries": low_rated_queries } def improve_system(self): """基於回饋改進系統""" analysis = self.analyze_feedback() # 識別需要改進的查詢類型 problematic_queries = analysis.get("low_rated_queries", []) # 生成改進建議 suggestions = [] for query_data in problematic_queries: suggestion = { "original_query": query_data["query"], "issue": query_data["comments"], "improvement_needed": "檢索策略調整" } suggestions.append(suggestion) return suggestions # 使用回饋學習 feedback_learner = FeedbackLearner() # 模擬用戶回饋 feedback_learner.collect_feedback( "什麼是 reward hacking?", "獎勵駭客是指...", 4, "回答很好但可以更詳細" ) 部署與監控 1. 生產環境配置 from langchain.callbacks import get_openai_callback import os class ProductionRAGSystem: def __init__(self, config_path: str = None): self.config = self.load_config(config_path) self.setup_monitoring() self.setup_error_handling() def load_config(self, config_path): """載入生產環境配置""" default_config = { "model_name": "gpt-4", "temperature": 0, "max_tokens": 1000, "chunk_size": 200, "chunk_overlap": 50, "max_retrieval_documents": 5, "cache_enabled": True, "logging_level": "INFO" } if config_path and os.path.exists(config_path): # 載入外部配置檔案 pass return default_config def setup_monitoring(self): """設置監控""" self.metrics = { "total_queries": 0, "successful_queries": 0, "failed_queries": 0, "average_response_time": 0, "token_usage": 0 } def setup_error_handling(self): """設置錯誤處理""" self.error_handlers = { "retrieval_error": self.handle_retrieval_error, "generation_error": self.handle_generation_error, "timeout_error": self.handle_timeout_error } def handle_retrieval_error(self, error): """處理檢索錯誤""" print(f"檢索錯誤: {error}") return "抱歉,檢索系統暫時無法使用,請稍後再試。" def handle_generation_error(self, error): """處理生成錯誤""" print(f"生成錯誤: {error}") return "抱歉,回答生成過程中發生錯誤,請重新提問。" def handle_timeout_error(self, error): """處理超時錯誤""" print(f"超時錯誤: {error}") return "請求處理時間過長,請簡化您的問題後重試。" def process_query_with_monitoring(self, query: str): """帶監控的查詢處理""" start_time = time.time() self.metrics["total_queries"] += 1 try: with get_openai_callback() as cb: result = graph.invoke({ "messages": [{"role": "user", "content": query}] }) # 更新指標 self.metrics["successful_queries"] += 1 self.metrics["token_usage"] += cb.total_tokens response_time = time.time() - start_time self.update_average_response_time(response_time) return result except Exception as e: self.metrics["failed_queries"] += 1 error_type = type(e).__name__ if "retrieval" in str(e).lower(): return self.handle_retrieval_error(e) elif "timeout" in str(e).lower(): return self.handle_timeout_error(e) else: return self.handle_generation_error(e) def update_average_response_time(self, new_time): """更新平均回應時間""" total_successful = self.metrics["successful_queries"] current_avg = self.metrics["average_response_time"] # 計算新的平均值 self.metrics["average_response_time"] = ( (current_avg * (total_successful - 1) + new_time) / total_successful ) def get_health_status(self): """獲取系統健康狀態""" total_queries = self.metrics["total_queries"] success_rate = ( self.metrics["successful_queries"] / total_queries * 100 if total_queries > 0 else 0 ) return { "status": "healthy" if success_rate > 95 else "degraded", "success_rate": success_rate, "average_response_time": self.metrics["average_response_time"], "total_queries": total_queries, "token_usage": self.metrics["token_usage"] } # 初始化生產系統 production_system = ProductionRAGSystem() 2. API 服務化 from fastapi import FastAPI, HTTPException from pydantic import BaseModel import uvicorn app = FastAPI(title="Agentic RAG API", version="1.0.0") class QueryRequest(BaseModel): question: str user_id: str = None session_id: str = None class QueryResponse(BaseModel): answer: str confidence: float sources: list[str] processing_time: float @app.post("/query", response_model=QueryResponse) async def process_query(request: QueryRequest): """處理查詢請求""" try: start_time = time.time() # 執行 RAG 查詢 result = production_system.process_query_with_monitoring(request.question) processing_time = time.time() - start_time # 提取答案和來源 answer = result["messages"][-1].content if result["messages"] else "無法生成回答" # 計算信心分數(簡化版) confidence = 0.8 if len(answer) > 100 else 0.6 # 提取來源 sources = ["Lilian Weng Blog"] # 簡化版 return QueryResponse( answer=answer, confidence=confidence, sources=sources, processing_time=processing_time ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/health") async def health_check(): """健康檢查端點""" return production_system.get_health_status() @app.get("/metrics") async def get_metrics(): """獲取系統指標""" return production_system.metrics # 啟動服務 if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000) 3. Docker 容器化 # Dockerfile FROM python:3.11-slim WORKDIR /app # 安裝系統依賴 RUN apt-get update && apt-get install -y \ gcc \ g++ \ && rm -rf /var/lib/apt/lists/* # 複製需求檔案 COPY requirements.txt . # 安裝 Python 依賴 RUN pip install --no-cache-dir -r requirements.txt # 複製應用程式碼 COPY . . # 暴露端口 EXPOSE 8000 # 設置環境變數 ENV PYTHONPATH=/app ENV PYTHONUNBUFFERED=1 # 啟動命令 CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] # docker-compose.yml version: '3.8' services: agentic-rag: build: . ports: - "8000:8000" environment: - OPENAI_API_KEY=${OPENAI_API_KEY} - LOG_LEVEL=INFO volumes: - ./logs:/app/logs - ./data:/app/data restart: unless-stopped redis: image: redis:alpine ports: - "6379:6379" volumes: - redis_data:/data restart: unless-stopped nginx: image: nginx:alpine ports: - "80:80" - "443:443" volumes: - ./nginx.conf:/etc/nginx/nginx.conf - ./ssl:/etc/nginx/ssl depends_on: - agentic-rag restart: unless-stopped volumes: redis_data: 測試與驗證 1. 單元測試 import unittest from unittest.mock import Mock, patch class TestAgenticRAG(unittest.TestCase): def setUp(self): """設置測試環境""" self.test_state = { "messages": [ {"role": "user", "content": "什麼是 reward hacking?"} ] } def test_generate_query_or_respond(self): """測試查詢生成節點""" with patch('langchain.chat_models.init_chat_model') as mock_model: # 模擬模型回應 mock_response = Mock() mock_response.content = "測試回應" mock_model.return_value.bind_tools.return_value.invoke.return_value = mock_response result = generate_query_or_respond(self.test_state) self.assertIn("messages", result) self.assertEqual(len(result["messages"]), 1) def test_grade_documents_relevant(self): """測試文件評分 - 相關情況""" test_state = { "messages": [ {"role": "user", "content": "什麼是 reward hacking?"}, Mock(content="Reward hacking 是指 AI 系統找到意外方法來最大化獎勵...") ] } with patch('langchain.chat_models.init_chat_model') as mock_model: mock_grader = Mock() mock_grader.binary_score = "yes" mock_model.return_value.with_structured_output.return_value.invoke.return_value = mock_grader result = grade_documents(test_state) self.assertEqual(result, "generate_answer") def test_grade_documents_irrelevant(self): """測試文件評分 - 不相關情況""" test_state = { "messages": [ {"role": "user", "content": "什麼是 reward hacking?"}, Mock(content="今天天氣很好...") ] } with patch('langchain.chat_models.init_chat_model') as mock_model: mock_grader = Mock() mock_grader.binary_score = "no" mock_model.return_value.with_structured_output.return_value.invoke.return_value = mock_grader result = grade_documents(test_state) self.assertEqual(result, "rewrite_question") # 運行測試 if __name__ == "__main__": unittest.main() 2. 整合測試 class IntegrationTest: def __init__(self): self.test_cases = [ { "question": "什麼是 reward hacking?", "expected_keywords": ["獎勵", "駭客", "AI", "強化學習"], "should_retrieve": True }, { "question": "今天天氣如何?", "expected_keywords": [], "should_retrieve": False }, { "question": "Lilian Weng 對 hallucination 的看法?", "expected_keywords": ["幻覺", "語言模型", "生成"], "should_retrieve": True } ] def run_integration_tests(self): """運行整合測試""" results = [] for i, test_case in enumerate(self.test_cases): print(f"執行測試案例 {i+1}: {test_case['question']}") try: # 執行查詢 result = graph.invoke({ "messages": [{"role": "user", "content": test_case["question"]}] }) # 驗證結果 answer = result["messages"][-1].content # 檢查是否包含預期關鍵詞 keywords_found = any( keyword.lower() in answer.lower() for keyword in test_case["expected_keywords"] ) test_result = { "test_case": i+1, "question": test_case["question"], "passed": keywords_found or not test_case["expected_keywords"], "answer_length": len(answer), "keywords_found": keywords_found } results.append(test_result) except Exception as e: results.append({ "test_case": i+1, "question": test_case["question"], "passed": False, "error": str(e) }) return results def generate_test_report(self, results): """生成測試報告""" passed_tests = sum(1 for r in results if r.get("passed", False)) total_tests = len(results) print(f"\n📊 測試報告") print(f"通過率: {passed_tests}/{total_tests} ({passed_tests/total_tests*100:.1f}%)") print("\n詳細結果:") for result in results: status = "✅ 通過" if result.get("passed", False) else "❌ 失敗" print(f" 測試 {result['test_case']}: {status}") if "error" in result: print(f" 錯誤: {result['error']}") # 執行整合測試 integration_test = IntegrationTest() test_results = integration_test.run_integration_tests() integration_test.generate_test_report(test_results) 3. 性能基準測試 import concurrent.futures import statistics class PerformanceBenchmark: def __init__(self, num_queries=100): self.num_queries = num_queries self.test_queries = [ "什麼是 reward hacking?", "解釋 AI hallucination 問題", "Diffusion model 的工作原理", "強化學習的挑戰", "語言模型的局限性" ] * (num_queries // 5) def single_query_benchmark(self, query): """單個查詢的基準測試""" start_time = time.time() try: result = graph.invoke({ "messages": [{"role": "user", "content": query}] }) end_time = time.time() return { "query": query, "response_time": end_time - start_time, "success": True, "answer_length": len(result["messages"][-1].content) } except Exception as e: return { "query": query, "response_time": time.time() - start_time, "success": False, "error": str(e) } def concurrent_benchmark(self, max_workers=5): """並發基準測試""" print(f"🚀 開始並發基準測試,{self.num_queries} 個查詢,{max_workers} 個並發") with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [ executor.submit(self.single_query_benchmark, query) for query in self.test_queries ] results = [] for future in concurrent.futures.as_completed(futures): result = future.result() results.append(result) if len(results) % 10 == 0: print(f"完成 {len(results)}/{self.num_queries} 個查詢") return results def analyze_results(self, results): """分析基準測試結果""" successful_results = [r for r in results if r["success"]] failed_results = [r for r in results if not r["success"]] if successful_results: response_times = [r["response_time"] for r in successful_results] answer_lengths = [r["answer_length"] for r in successful_results] analysis = { "total_queries": len(results), "successful_queries": len(successful_results), "failed_queries": len(failed_results), "success_rate": len(successful_results) / len(results) * 100, "avg_response_time": statistics.mean(response_times), "median_response_time": statistics.median(response_times), "min_response_time": min(response_times), "max_response_time": max(response_times), "avg_answer_length": statistics.mean(answer_lengths), "queries_per_second": len(successful_results) / sum(response_times) } return analysis else: return {"error": "所有查詢都失敗了"} def generate_benchmark_report(self, analysis): """生成基準測試報告""" print("\n📈 性能基準測試報告") print("=" * 50) print(f"總查詢數: {analysis['total_queries']}") print(f"成功查詢數: {analysis['successful_queries']}") print(f"成功率: {analysis['success_rate']:.2f}%") print(f"平均回應時間: {analysis['avg_response_time']:.2f} 秒") print(f"中位數回應時間: {analysis['median_response_time']:.2f} 秒") print(f"最快回應時間: {analysis['min_response_time']:.2f} 秒") print(f"最慢回應時間: {analysis['max_response_time']:.2f} 秒") print(f"平均答案長度: {analysis['avg_answer_length']:.0f} 字符") print(f"每秒查詢數 (QPS): {analysis['queries_per_second']:.2f}") # 執行性能基準測試 benchmark = PerformanceBenchmark(num_queries=50) benchmark_results = benchmark.concurrent_benchmark(max_workers=3) analysis = benchmark.analyze_results(benchmark_results) benchmark.generate_benchmark_report(analysis) 故障排除指南 常見問題與解決方案 1. 檢索質量問題 def diagnose_retrieval_issues(): """診斷檢索質量問題""" print("🔍 檢索質量診斷") # 測試查詢 test_query = "什麼是 reward hacking?" # 檢索文件 docs = retriever.get_relevant_documents(test_query) print(f"查詢: {test_query}") print(f"檢索到 {len(docs)} 個文件") # 分析檢索結果 for i, doc in enumerate(docs): print(f"\n文件 {i+1}:") print(f"內容長度: {len(doc.page_content)} 字符") print(f"內容預覽: {doc.page_content[:100]}...") # 檢查關鍵詞匹配 query_words = test_query.lower().split() doc_content = doc.page_content.lower() matches = [word for word in query_words if word in doc_content] print(f"關鍵詞匹配: {matches}") # 計算相似度分數(簡化版) similarity = len(matches) / len(query_words) print(f"相似度分數: {similarity:.2f}") # 執行診斷 diagnose_retrieval_issues() 2. 模型回應問題 def diagnose_model_issues(): """診斷模型回應問題""" print("🤖 模型回應診斷") # 測試不同溫度設置 temperatures = [0, 0.3, 0.7, 1.0] test_prompt = "請解釋什麼是人工智能。" for temp in temperatures: print(f"\n溫度設置: {temp}") test_model = init_chat_model("openai:gpt-4", temperature=temp) response = test_model.invoke([{"role": "user", "content": test_prompt}]) print(f"回應長度: {len(response.content)} 字符") print(f"回應預覽: {response.content[:100]}...") # 分析回應質量指標 sentences = response.content.split('。') avg_sentence_length = sum(len(s) for s in sentences) / len(sentences) print(f"平均句子長度: {avg_sentence_length:.1f} 字符") print(f"句子數量: {len(sentences)}") # 執行診斷 diagnose_model_issues() 3. 圖形流程問題 def diagnose_graph_flow(): """診斷圖形流程問題""" print("🔄 圖形流程診斷") # 追蹤每個節點的執行 class FlowTracker: def __init__(self): self.execution_log = [] def log_node_execution(self, node_name, input_data, output_data): self.execution_log.append({ "node": node_name, "timestamp": datetime.now().isoformat(), "input_size": len(str(input_data)), "output_size": len(str(output_data)) }) def print_execution_log(self): print("\n執行日誌:") for log_entry in self.execution_log: print(f" {log_entry['timestamp']}: 節點 '{log_entry['node']}'") print(f" 輸入大小: {log_entry['input_size']} 字符") print(f" 輸出大小: {log_entry['output_size']} 字符") # 使用流程追蹤器 flow_tracker = FlowTracker() # 測試查詢 test_query = "什麼是機器學習?" print(f"測試查詢: {test_query}") try: # 執行圖形並追蹤 for chunk in graph.stream({ "messages": [{"role": "user", "content": test_query}] }): for node_name, update in chunk.items(): flow_tracker.log_node_execution( node_name, {"messages": [{"role": "user", "content": test_query}]}, update ) print(f"✅ 節點 '{node_name}' 執行完成") flow_tracker.print_execution_log() except Exception as e: print(f"❌ 圖形執行失敗: {e}") flow_tracker.print_execution_log() # 執行診斷 diagnose_graph_flow() 總結 系統優勢 Agentic RAG 系統結合了智能決策和檢索增強生成的優勢,主要特點包括: 智能判斷能力 - 自動決定何時需要外部資訊 自我修正機制 - 能夠評估和改進檢索品質 靈活的工作流程 - 易於擴展和客製化 可觀測性 - 完整的監控和診斷能力 最佳實踐總結 文件處理 根據內容類型選擇合適的分割策略 為文件添加豐富的元數據 使用混合檢索策略提高準確性 模型配置 針對不同任務調整溫度參數 使用結構化輸出提高可靠性 實施適當的快取機制 系統監控 建立完整的指標收集系統 實施健康檢查和自動恢復 收集和分析用戶回饋 擴展性設計 模組化設計便於維護 支援水平擴展 實施適當的負載均衡 未來發展方向 技術改進 整合更先進的檢索技術 實施知識圖譜增強 添加多模態支援 用戶體驗 改善回應速度 提供更好的解釋性 支援對話式互動 企業級功能 增強安全和隱私保護 支援多租戶架構 實施完整的 RBAC 系統 資源與參考 LangGraph 官方文檔: https://langchain-ai.github.io/langgraph/ LangChain 文檔: https://docs.langchain.com/ OpenAI API 文檔: https://platform.openai.com/docs 向量資料庫比較: Pinecone, Weaviate, Chroma 希望這個完整的 Agentic RAG 教學能幫助您成功建立自己的智能檢索代理系統!如果您在實施過程中遇到任何問題,請參考故障排除指南或查閱相關文檔。