我用 LangGraph 做了個會說人話的 SQL Agent,再也不用寫查詢了 老闆:「欸,幫我查一下哪個部門最愛開會?」我:「好的」打開這個 Agent我:「哪個部門最愛開會?」Agent:「技術部開了 28 場會議,佔總數的 35%...」老闆:「...你怎麼這麼快?」我:😏 🤦♂️ 起因:又被 SQL 搞瘋了 你是不是也有過這種經歷: 手上有一堆會議記錄的 CSV,老闆突然要各種奇奇怪怪的數據分析。一下要「按部門統計」,一下要「找出會議之王」,一下又要「分析會議效率」。 每次都要: 打開 Excel/SQL 工具 想半天查詢語法 跑出結果發現格式不對 重新整理給老闆看 更慘的是,當老闆站在你旁邊等結果的時候... 那個壓力 😅 所以我想:能不能做一個會說人話的查詢工具? 直接問:「哪個部門最愛開會?」就得到答案,不用寫任何 SQL。 結果真的做出來了,而且比想像中還聰明。 💡 為什麼選 LangGraph 而不是普通 Agent? 剛開始我也想用 LangChain 的 ReActAgent 就好,簡單粗暴: # 看起來很簡單對吧? agent = create_react_agent(llm, tools, prompt) result = agent.invoke({"input": "哪個部門開會最多?"}) 但實際測試後發現一個嚴重問題:不可控。 有時候 Agent 會: 直接瞎猜答案(沒查資料庫) 跳過檢查表格結構的步驟 生成錯誤的 SQL 語法 回答跟問題完全無關的東西 這在 demo 時還好,但要給老闆用的話... 💀 LangGraph 的好處是可以強制執行工作流程: 必須先看資料庫有什麼表 ↓ 必須了解表格結構 ↓ 必須生成查詢 ↓ 必須檢查查詢安全性 ↓ 必須執行並格式化結果 每個步驟都不能跳過,可預測又可靠。 🛠️ 開工:設置環境 必要套件 pip install langchain langgraph langchain-openai langchain-community pip install pandas sqlite3 python-dotenv API Key 設置 去 OpenAI 申請一個 API key,然後: # 方法 1: 環境變數 export OPENAI_API_KEY="sk-your-key-here" # 方法 2: .env 檔案 echo "OPENAI_API_KEY=sk-your-key-here" > .env 測試資料準備 如果手邊沒有會議記錄,可以快速生成一些測試資料: import pandas as pd import random from datetime import datetime, timedelta def generate_meeting_data(n=100): departments = ['技術部', '行銷部', '人事部', '財務部', '產品部', '營運部'] meeting_types = ['專案會議', '週例會', '月報', 'Retro', '1on1', 'All hands'] hosts = ['Alice Chen', 'Bob Wang', 'Carol Liu', 'David Lin', 'Eve Wu'] priorities = ['高', '中', '低'] meetings = [] base_date = datetime.now() - timedelta(days=90) for i in range(n): meeting_date = base_date + timedelta(days=random.randint(0, 90)) duration = random.randint(30, 180) meeting = { '會議ID': f'MTG-{i+1:03d}', '會議主題': f'{random.choice(meeting_types)} - Week {i//7 + 1}', '會議類型': random.choice(meeting_types), '會議日期': meeting_date.strftime('%Y-%m-%d'), '開始時間': f'{random.randint(9,17)}:00', '結束時間': f'{random.randint(10,18)}:00', '會議時長_分鐘': duration, '主辦部門': random.choice(departments), '會議主持人': random.choice(hosts), '參與人數': random.randint(3, 20), '會議地點': f'{random.randint(1,5)}F-會議室{random.choice("ABCD")}', '會議優先級': random.choice(priorities), '會議狀態': random.choice(['已完成', '進行中', '已取消']), '會議費用': random.randint(0, 3000), '會議評分': round(random.uniform(2.0, 5.0), 1), '創建時間': meeting_date.strftime('%Y-%m-%d %H:%M:%S'), '最後更新時間': meeting_date.strftime('%Y-%m-%d %H:%M:%S') } meetings.append(meeting) return pd.DataFrame(meetings) # 生成測試資料 df = generate_meeting_data(100) df.to_csv('meeting_records.csv', index=False, encoding='utf-8') print(f"✅ 生成了 {len(df)} 筆測試資料") 🏗️ 核心架構:讓 Agent 有邏輯 整體設計思路 我設計的 SQL Agent 工作流程是這樣的: 用戶問問題 ↓ 列出所有資料表 (知道有什麼可以查) ↓ 獲取表格 schema (了解欄位結構) ↓ 生成 SQL 查詢 (根據問題和結構生成) ↓ 檢查查詢安全性 (避免危險操作) ↓ 執行查詢 (實際跑 SQL) ↓ 格式化結果 (讓答案更好看) ↓ 回傳給用戶 主要類別結構 import os import sqlite3 import pandas as pd import logging from typing import Literal from langchain_openai import ChatOpenAI from langchain_community.utilities import SQLDatabase from langchain_community.agent_toolkits import SQLDatabaseToolkit from langchain_core.messages import AIMessage, HumanMessage, SystemMessage from langgraph.graph import END, START, MessagesState, StateGraph from langgraph.prebuilt import ToolNode class MeetingSQLAgent: """會議記錄 SQL 查詢助手""" def __init__(self, csv_file_path: str, openai_api_key: str = None): # 設置 LLM if openai_api_key: os.environ["OPENAI_API_KEY"] = openai_api_key self.llm = ChatOpenAI( model="gpt-4o-mini", temperature=0, max_tokens=2000 ) # 建立資料庫 self._setup_database(csv_file_path) # 設置工具 self._setup_tools() # 建構工作流程 self._build_workflow() print("✅ SQL Agent 初始化完成") 資料庫設置 def _setup_database(self, csv_file_path): """將 CSV 轉換成 SQLite 資料庫""" try: # 讀取 CSV df = pd.read_csv(csv_file_path, encoding='utf-8') # 簡單的資料清理 df['會議日期'] = pd.to_datetime(df['會議日期'], errors='coerce') df['會議時長_分鐘'] = pd.to_numeric(df['會議時長_分鐘'], errors='coerce') df['會議評分'] = pd.to_numeric(df['會議評分'], errors='coerce') # 建立 SQLite 資料庫 self.db_path = "meeting_records.db" conn = sqlite3.connect(self.db_path) df.to_sql('meeting_records', conn, if_exists='replace', index=False) # 建立索引提升查詢效能 indexes = [ "CREATE INDEX IF NOT EXISTS idx_department ON meeting_records(主辦部門)", "CREATE INDEX IF NOT EXISTS idx_date ON meeting_records(會議日期)", "CREATE INDEX IF NOT EXISTS idx_host ON meeting_records(會議主持人)", "CREATE INDEX IF NOT EXISTS idx_type ON meeting_records(會議類型)" ] for idx_sql in indexes: conn.execute(idx_sql) conn.commit() conn.close() # 連接到 LangChain SQLDatabase self.db = SQLDatabase.from_uri(f"sqlite:///{self.db_path}") print(f"📊 成功載入 {len(df)} 筆會議記錄") except Exception as e: print(f"❌ 資料庫設置失敗: {e}") raise 🔧 工具設置:給 Agent 各種能力 def _setup_tools(self): """設置 SQL 相關工具""" toolkit = SQLDatabaseToolkit(db=self.db, llm=self.llm) self.tools = toolkit.get_tools() # 取得特定工具的參考 self.list_tables_tool = next(t for t in self.tools if t.name == "sql_db_list_tables") self.get_schema_tool = next(t for t in self.tools if t.name == "sql_db_schema") self.run_query_tool = next(t for t in self.tools if t.name == "sql_db_query") # 建立工具節點 self.get_schema_node = ToolNode([self.get_schema_tool], name="get_schema") self.run_query_node = ToolNode([self.run_query_tool], name="run_query") print("🔧 工具設置完成") 系統提示:給 Agent 個性和規則 這部分很重要,決定了 Agent 的行為模式: SYSTEM_PROMPT = """ 你是一個專門分析會議記錄的 SQL 助手。 ## 重要規則: 1. 我們使用 SQLite,不要用 MySQL 語法 (如 SHOW TABLES) 2. 中文欄位名要用反引號:`主辦部門`, `會議時長_分鐘` 3. 除非特別要求,否則限制結果:LIMIT 10 4. 絕對不要執行 DROP, DELETE, UPDATE 等危險操作 5. 查詢失敗時要給出有用的建議 ## 資料庫結構: meeting_records 表包含: - 基本資訊:會議ID, 會議主題, 會議類型, 會議日期 - 時間相關:開始時間, 結束時間, 會議時長_分鐘 - 人員相關:主辦部門, 會議主持人, 參與人數 - 評估相關:會議優先級, 會議狀態, 會議評分, 會議費用 ## 回答風格: - 用數據說話,提供具體數字 - 如果有趣的發現,主動提及 - 保持專業但友善的語調 記住:你的目標是幫助用戶快速獲得會議數據的洞察! """ 🎭 工作流程節點:每個步驟的實作 節點 1:列出資料表 def _list_tables(self, state: MessagesState): """列出資料庫中的所有表格""" tool_call = { "name": "sql_db_list_tables", "args": {}, "id": "list_tables_001", "type": "tool_call", } tool_call_message = AIMessage(content="", tool_calls=[tool_call]) tool_result = self.list_tables_tool.invoke(tool_call) response = AIMessage(f"📋 發現資料表:{tool_result.content}") return {"messages": [tool_call_message, tool_result, response]} 節點 2:獲取表格結構 def _call_get_schema(self, state: MessagesState): """強制呼叫 schema 工具獲取表格結構""" llm_with_tools = self.llm.bind_tools([self.get_schema_tool], tool_choice="any") instruction = SystemMessage(content=""" 請獲取 meeting_records 表的詳細結構資訊。 我們使用 SQLite,請用正確的語法獲取欄位資訊和樣本資料。 """) response = llm_with_tools.invoke([instruction] + state["messages"]) return {"messages": [response]} 節點 3:生成查詢 def _generate_query(self, state: MessagesState): """根據用戶問題生成 SQL 查詢""" system_message = SystemMessage(content=SYSTEM_PROMPT) llm_with_tools = self.llm.bind_tools([self.run_query_tool]) response = llm_with_tools.invoke([system_message] + state["messages"]) return {"messages": [response]} 節點 4:檢查查詢安全性 這一步很關鍵,避免執行危險的 SQL: def _check_query(self, state: MessagesState): """檢查 SQL 查詢的安全性和正確性""" safety_prompt = """ 作為 SQLite 專家,請檢查這個查詢是否安全且正確: 🚫 安全性檢查: - 是否使用了 SHOW 語句 (SQLite 不支援) - 是否有 DROP, DELETE, UPDATE 等危險操作 - 是否有 SQL 注入風險 ✅ 正確性檢查: - SQLite 語法是否正確 - 中文欄位名是否用反引號包圍 - 邏輯是否合理 如果有問題,請修正查詢。如果沒問題,保持原樣。 """ system_message = SystemMessage(content=safety_prompt) tool_call = state["messages"][-1].tool_calls[0] user_message = HumanMessage(content=f"請檢查:{tool_call['args']['query']}") llm_with_tools = self.llm.bind_tools([self.run_query_tool], tool_choice="any") response = llm_with_tools.invoke([system_message, user_message]) response.id = state["messages"][-1].id return {"messages": [response]} 節點 5:結果格式化 def _format_response(self, state: MessagesState): """格式化查詢結果,讓它更好看""" last_message = state["messages"][-1] if hasattr(last_message, 'content') and last_message.content: content = last_message.content # 處理錯誤訊息 if "Error" in content: formatted_content = f""" ❌ 查詢執行時出現錯誤: {content} 💡 建議: • 檢查問題描述是否清楚 • 試試更簡單的問題,如:「總共有多少場會議?」 • 確認要查詢的資料是否存在 """ else: # 美化正常結果 lines = content.split('\n') formatted_content = "📊 查詢結果:\n\n" for line in lines: if line.strip(): formatted_content += f"• {line.strip()}\n" formatted_message = AIMessage(content=formatted_content) return {"messages": [formatted_message]} return {"messages": []} 🔗 組裝工作流程 def _build_workflow(self): """建構 LangGraph 工作流程""" builder = StateGraph(MessagesState) # 添加所有節點 builder.add_node("list_tables", self._list_tables) builder.add_node("call_get_schema", self._call_get_schema) builder.add_node("get_schema", self.get_schema_node) builder.add_node("generate_query", self._generate_query) builder.add_node("check_query", self._check_query) builder.add_node("run_query", self.run_query_node) builder.add_node("format_response", self._format_response) # 定義工作流程 builder.add_edge(START, "list_tables") builder.add_edge("list_tables", "call_get_schema") builder.add_edge("call_get_schema", "get_schema") builder.add_edge("get_schema", "generate_query") builder.add_conditional_edges("generate_query", self._should_continue) builder.add_edge("check_query", "run_query") builder.add_edge("run_query", "format_response") builder.add_edge("format_response", END) # 編譯工作流程 self.agent = builder.compile() print("🔗 工作流程建構完成") def _should_continue(self, state: MessagesState) -> Literal[END, "check_query"]: """決定是否繼續執行""" messages = state["messages"] last_message = messages[-1] if not last_message.tool_calls: return END # 沒有工具呼叫,結束 else: return "check_query" # 有查詢需要檢查 主要查詢方法 def query(self, question: str) -> str: """執行查詢並返回結果""" try: print(f"🔍 處理問題:{question}") result = self.agent.invoke({ "messages": [HumanMessage(content=question)] }) final_message = result["messages"][-1] return final_message.content except Exception as e: return f"❌ 查詢失敗:{str(e)}" 🧪 實際測試:看看效果如何 基礎測試 def test_basic_queries(): """測試基本查詢功能""" # 初始化 Agent agent = MeetingSQLAgent("meeting_records.csv") test_cases = [ "總共有多少場會議?", "哪個部門開會最多?", "平均會議時間多長?", "誰主持最多會議?", "評分最高的 5 場會議是什麼?", "本月有多少場會議?" ] print("🧪 開始基礎測試...") for question in test_cases: print(f"\n❓ 問題:{question}") try: result = agent.query(question) print(f"✅ 回答:{result}") except Exception as e: print(f"❌ 失敗:{e}") print("-" * 50) if __name__ == "__main__": test_basic_queries() 進階測試 def test_complex_queries(): """測試複雜查詢""" agent = MeetingSQLAgent("meeting_records.csv") complex_questions = [ "分析各部門的會議效率(時長 vs 評分)", "找出可能需要優化的會議(時間長但評分低)", "比較不同會議類型的參與度", "統計每個月的會議趨勢", "找出最活躍的會議主持人 TOP 5" ] print("🔬 開始複雜查詢測試...") for question in complex_questions: print(f"\n🤔 複雜問題:{question}") result = agent.query(question) print(f"🧠 智能分析:{result}") print("=" * 60) 互動式測試 def interactive_test(): """互動式測試環境""" agent = MeetingSQLAgent("meeting_records.csv") print("🎯 歡迎使用會議記錄查詢助手!") print("💡 你可以用自然語言問任何關於會議數據的問題") print("🚪 輸入 'exit' 退出,'help' 查看範例") examples = [ "哪個部門最愛開會?", "會議平均多長時間?", "誰是開會之王?", "有什麼有趣的發現嗎?", "本週開了多少會議?" ] while True: user_input = input("\n❓ 你的問題:").strip() if user_input.lower() == 'exit': print("👋 掰掰!") break elif user_input.lower() == 'help': print("\n💡 範例問題:") for i, example in enumerate(examples, 1): print(f" {i}. {example}") continue elif not user_input: continue print("🤖 AI 思考中...") try: result = agent.query(user_input) print(f"📊 {result}") except Exception as e: print(f"💥 出錯了:{e}") # 執行互動測試 if __name__ == "__main__": interactive_test() 🐛 踩坑記錄:我遇到的問題 問題 1:SQLite vs MySQL 語法 錯誤現象: sqlite3.OperationalError: near "SHOW": syntax error [SQL: SHOW COLUMNS FROM meeting_records;] 原因:LLM 習慣生成 MySQL 語法,但我們用的是 SQLite 解決方案:在系統提示中明確指出使用 SQLite,並提供正確的語法範例: SQLITE_RULES = """ ⚠️ 重要:我們使用 SQLite,不是 MySQL! ❌ 錯誤語法: - SHOW TABLES - SHOW COLUMNS FROM table - LIMIT 0,10 ✅ 正確語法: - SELECT name FROM sqlite_master WHERE type='table' - PRAGMA table_info(table_name) - LIMIT 10 OFFSET 0 """ 問題 2:中文欄位名處理 錯誤現象: sqlite3.OperationalError: no such column: 主辦部門 解決方案:提醒 LLM 用反引號包圍中文欄位: CHINESE_FIELD_RULES = """ 🔤 中文欄位名稱處理: - 必須用反引號包圍:`主辦部門`, `會議時長_分鐘` - 不要用單引號或雙引號 - 欄位名稱要完全匹配,注意底線和空格 """ 問題 3:工作流程卡住 現象:Agent 有時會在某個節點卡住不動 排查方法: # 添加調試信息 def _debug_workflow_state(self, state, node_name): messages = state.get("messages", []) print(f"🔍 {node_name} 節點狀態:") print(f" 訊息數量: {len(messages)}") if messages: last_msg = messages[-1] print(f" 最後訊息類型: {type(last_msg)}") if hasattr(last_msg, 'tool_calls'): print(f" 工具呼叫: {len(last_msg.tool_calls) if last_msg.tool_calls else 0}") 解決方案:確保條件邊的邏輯正確,特別是 _should_continue 方法。 ⚡ 效能優化:讓它跑得更快 資料庫索引優化 def _create_performance_indexes(self, conn): """建立效能索引""" indexes = [ # 基礎索引 "CREATE INDEX IF NOT EXISTS idx_department ON meeting_records(`主辦部門`)", "CREATE INDEX IF NOT EXISTS idx_date ON meeting_records(`會議日期`)", "CREATE INDEX IF NOT EXISTS idx_host ON meeting_records(`會議主持人`)", # 複合索引 "CREATE INDEX IF NOT EXISTS idx_dept_date ON meeting_records(`主辦部門`, `會議日期`)", "CREATE INDEX IF NOT EXISTS idx_type_status ON meeting_records(`會議類型`, `會議狀態`)", # 數值範圍索引 "CREATE INDEX IF NOT EXISTS idx_duration ON meeting_records(`會議時長_分鐘`)", "CREATE INDEX IF NOT EXISTS idx_score ON meeting_records(`會議評分`)" ] for idx_sql in indexes: try: conn.execute(idx_sql) except Exception as e: print(f"⚠️ 索引建立失敗: {e}") conn.commit() print("🚀 效能索引建立完成") 查詢結果快取 from functools import lru_cache import hashlib class CachedMeetingSQLAgent(MeetingSQLAgent): """帶快取功能的 SQL Agent""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._query_cache = {} def _get_cache_key(self, question: str) -> str: """生成快取鍵""" return hashlib.md5(question.encode()).hexdigest() def query(self, question: str) -> str: """帶快取的查詢方法""" cache_key = self._get_cache_key(question) # 檢查快取 if cache_key in self._query_cache: print("⚡ 從快取回傳結果") return self._query_cache[cache_key] # 執行查詢 result = super().query(question) # 儲存到快取 self._query_cache[cache_key] = result return result LLM 呼叫優化 def _optimize_llm_calls(self): """優化 LLM 呼叫設定""" self.llm = ChatOpenAI( model="gpt-4o-mini", # 使用較快的模型 temperature=0, max_tokens=1500, # 減少 token 數量 timeout=30, # 設定超時 max_retries=2, # 減少重試次數 streaming=True # 啟用串流回應 ) 🛡️ 安全性:不讓 Agent 搞破壞 SQL 注入防護 ```pythondef _validate_query_safety(self, query: str) -> bool: """檢查查詢是否安全""" # 危險關鍵字檢查 dangerous_keywords = [ 'DROP', 'DELETE', 'UPDATE', 'INSERT',