🌊 LangGraph Streaming API:讓你的AI即時回應! 🎯 什麼是流式處理?為什麼你需要它? 想像一下,你在和ChatGPT聊天時,如果它要等到完全想好所有回答才一次性顯示出來,你會覺得等待時間很漫長。但實際上,你看到的是文字一個一個蹦出來,就像真人在思考並逐字回答一樣。這就是流式處理的魔力! LangGraph Streaming API 讓你的AI應用也能做到: 📝 逐字顯示:像打字機一樣逐漸顯示AI的回答 ⚡ 即時回饋:不用等待完整結果,立即看到進度 🎮 互動體驗:讓使用者感覺AI正在「思考」 📊 過程透明:可以看到AI處理的每個步驟 💡 重要提醒:LangGraph SDK 和 LangGraph Server 都是 LangGraph Platform 的一部分。 🚀 基本使用方法 核心概念解釋 在開始寫程式碼之前,讓我們先理解幾個重要概念: Client(客戶端):你的程式和 LangGraph 服務器溝通的橋樑 Thread(線程):一次對話的容器,可以保存對話歷史 Assistant(助手):你部署的AI圖表,有特定的名稱 Stream Mode(流式模式):決定你想要接收什麼樣的即時資料 Python 實作範例 from langgraph_sdk import get_client # 建立客戶端連線 client = get_client(url=, api_key=) # 使用名為 "agent" 的部署圖表 assistant_id = "agent" # 建立一個對話線程 thread = await client.threads.create() thread_id = thread["thread_id"] # 開始流式處理 async for chunk in client.runs.stream( thread_id, assistant_id, input=inputs, stream_mode="updates" ): print(chunk.data) JavaScript 實作範例 import { Client } from "@langchain/langgraph-sdk"; // 建立客戶端連線 const client = new Client({ apiUrl: , apiKey: }); // 使用名為 "agent" 的部署圖表 const assistantID = "agent"; // 建立一個對話線程 const thread = await client.threads.create(); const threadID = thread["thread_id"]; // 開始流式處理 const streamResponse = client.runs.stream( threadID, assistantID, { input, streamMode: "updates" } ); for await (const chunk of streamResponse) { console.log(chunk.data); } cURL 實作範例 建立線程: curl --request POST \ --url /threads \ --header 'Content-Type: application/json' \ --data '{}' 開始流式處理: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --header 'x-api-key: ' --data "{ \"assistant_id\": \"agent\", \"input\": , \"stream_mode\": \"updates\" }" 🎨 完整範例:建立一個會講笑話的AI 讓我們用一個具體的例子來理解流式處理的威力。我們要建立一個AI,它會先改善你給的主題,然後生成一個笑話。 圖表定義 首先,我們定義一個簡單的處理流程: # graph.py from typing import TypedDict from langgraph.graph import StateGraph, START, END class State(TypedDict): topic: str # 主題 joke: str # 笑話 def refine_topic(state: State): """改善主題,讓它更有趣""" return {"topic": state["topic"] + " and cats"} def generate_joke(state: State): """根據主題生成笑話""" return {"joke": f"This is a joke about {state['topic']}"} # 建立處理流程圖 graph = ( StateGraph(State) .add_node(refine_topic) # 添加改善主題節點 .add_node(generate_joke) # 添加生成笑話節點 .add_edge(START, "refine_topic") # 從開始到改善主題 .add_edge("refine_topic", "generate_joke") # 從改善主題到生成笑話 .add_edge("generate_joke", END) # 從生成笑話到結束 .compile() ) 使用流式處理觀察過程 現在讓我們看看如何即時觀察這個處理過程: Python 版本: from langgraph_sdk import get_client client = get_client(url=) assistant_id = "agent" # 建立對話線程 thread = await client.threads.create() thread_id = thread["thread_id"] # 使用流式處理觀察每個步驟 async for chunk in client.runs.stream( thread_id, assistant_id, input={"topic": "ice cream"}, # 輸入主題:冰淇淋 stream_mode="updates" # 只看狀態更新 ): print(chunk.data) JavaScript 版本: import { Client } from "@langchain/langgraph-sdk"; const client = new Client({ apiUrl: }); const assistantID = "agent"; // 建立對話線程 const thread = await client.threads.create(); const threadID = thread["thread_id"]; // 使用流式處理觀察每個步驟 const streamResponse = client.runs.stream( threadID, assistantID, { input: { topic: "ice cream" }, // 輸入主題:冰淇淋 streamMode: "updates" // 只看狀態更新 } ); for await (const chunk of streamResponse) { console.log(chunk.data); } cURL 版本: 建立線程: curl --request POST \ --url /threads \ --header 'Content-Type: application/json' \ --data '{}' 開始流式處理: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{ \"assistant_id\": \"agent\", \"input\": {\"topic\": \"ice cream\"}, \"stream_mode\": \"updates\" }" 預期輸出結果 當你運行上面的程式碼時,會看到類似這樣的輸出: {'run_id': '1f02c2b3-3cef-68de-b720-eec2a4a8e920', 'attempt': 1} {'refine_topic': {'topic': 'ice cream and cats'}} {'generate_joke': {'joke': 'This is a joke about ice cream and cats'}} 看到了嗎?你可以即時看到AI處理的每個步驟! 🎛️ 六種流式模式詳解 LangGraph 提供了六種不同的流式模式,每種都有不同的用途: 📊 流式模式對照表 values串流完整的圖表狀態想看到每步驟後的完整資料updates串流狀態更新只想看到每步驟的變化messages-tuple串流LLM令牌想看到AI逐字回答debug串流所有可能資訊需要除錯或深度分析custom串流自訂資料想要傳送特殊的自訂資訊events串流所有事件需要最詳細的執行資訊 🔄 同時使用多種模式 你可以同時使用多種模式來獲得更豐富的資訊: Python 版本: async for chunk in client.runs.stream( thread_id, assistant_id, input=inputs, stream_mode=["updates", "custom"] # 同時使用兩種模式 ): print(chunk) JavaScript 版本: const streamResponse = client.runs.stream( threadID, assistantID, { input, streamMode: ["updates", "custom"] // 同時使用兩種模式 } ); for await (const chunk of streamResponse) { console.log(chunk); } cURL 版本: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{ \"assistant_id\": \"agent\", \"input\": , \"stream_mode\": [ \"updates\", \"custom\" ] }" 📈 觀察圖表狀態變化 updates 模式:只看變化 這個模式只會告訴你每個步驟改變了什麼,非常適合追蹤處理進度。 Python 實作: async for chunk in client.runs.stream( thread_id, assistant_id, input={"topic": "ice cream"}, stream_mode="updates" # 只看狀態更新 ): print(chunk.data) JavaScript 實作: const streamResponse = client.runs.stream( threadID, assistantID, { input: { topic: "ice cream" }, streamMode: "updates" // 只看狀態更新 } ); for await (const chunk of streamResponse) { console.log(chunk.data); } cURL 實作: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{ \"assistant_id\": \"agent\", \"input\": {\"topic\": \"ice cream\"}, \"stream_mode\": \"updates\" }" values 模式:看完整狀態 這個模式會顯示每個步驟後的完整狀態,讓你瞭解目前所有的資料。 Python 實作: async for chunk in client.runs.stream( thread_id, assistant_id, input={"topic": "ice cream"}, stream_mode="values" # 看完整狀態 ): print(chunk.data) JavaScript 實作: const streamResponse = client.runs.stream( threadID, assistantID, { input: { topic: "ice cream" }, streamMode: "values" // 看完整狀態 } ); for await (const chunk of streamResponse) { console.log(chunk.data); } cURL 實作: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{ \"assistant_id\": \"agent\", \"input\": {\"topic\": \"ice cream\"}, \"stream_mode\": \"values\" }" 🏗️ 子圖表處理 當你的AI應用變得複雜,可能會有多層的處理結構(子圖表)。LangGraph 也能讓你看到子圖表的處理過程。 啟用子圖表串流 要包含子圖表的輸出,只需要設定 stream_subgraphs=True: for chunk in client.runs.stream( thread_id, assistant_id, input={"foo": "foo"}, stream_subgraphs=True, # 啟用子圖表串流 stream_mode="updates", ): print(chunk) 子圖表範例程式碼 讓我們看一個有子圖表的完整範例: 圖表定義: # graph.py from langgraph.graph import START, StateGraph from typing import TypedDict # 定義子圖表 class SubgraphState(TypedDict): foo: str # 與父圖表共享的鍵 bar: str def subgraph_node_1(state: SubgraphState): return {"bar": "bar"} def subgraph_node_2(state: SubgraphState): return {"foo": state["foo"] + state["bar"]} # 建立子圖表 subgraph_builder = StateGraph(SubgraphState) subgraph_builder.add_node(subgraph_node_1) subgraph_builder.add_node(subgraph_node_2) subgraph_builder.add_edge(START, "subgraph_node_1") subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2") subgraph = subgraph_builder.compile() # 定義父圖表 class ParentState(TypedDict): foo: str def node_1(state: ParentState): return {"foo": "hi! " + state["foo"]} # 建立父圖表 builder = StateGraph(ParentState) builder.add_node("node_1", node_1) builder.add_node("node_2", subgraph) # 將子圖表當作節點 builder.add_edge(START, "node_1") builder.add_edge("node_1", "node_2") graph = builder.compile() 使用子圖表串流: Python 版本: async for chunk in client.runs.stream( thread_id, assistant_id, input={"foo": "foo"}, stream_subgraphs=True, # 啟用子圖表串流 stream_mode="updates", ): print(chunk) JavaScript 版本: const streamResponse = client.runs.stream( threadID, assistantID, { input: { foo: "foo" }, streamSubgraphs: true, // 啟用子圖表串流 streamMode: "updates" } ); for await (const chunk of streamResponse) { console.log(chunk); } cURL 版本: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{ \"assistant_id\": \"agent\", \"input\": {\"foo\": \"foo\"}, \"stream_subgraphs\": true, \"stream_mode\": [\"updates\"] }" 🐛 除錯模式 當你需要詳細瞭解AI的處理過程時,debug 模式是你的最佳朋友。它會提供盡可能多的資訊。 Python 實作: async for chunk in client.runs.stream( thread_id, assistant_id, input={"topic": "ice cream"}, stream_mode="debug" # 除錯模式 ): print(chunk.data) JavaScript 實作: const streamResponse = client.runs.stream( threadID, assistantID, { input: { topic: "ice cream" }, streamMode: "debug" // 除錯模式 } ); for await (const chunk of streamResponse) { console.log(chunk.data); } cURL 實作: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{ \"assistant_id\": \"agent\", \"input\": {\"topic\": \"ice cream\"}, \"stream_mode\": \"debug\" }" 🤖 LLM 令牌串流 這是最酷的功能之一!你可以看到AI一字一字地產生回答,就像真人在打字一樣。 基本概念 messages-tuple 模式會回傳一個元組 (message_chunk, metadata): message_chunk:LLM產生的令牌或訊息片段 metadata:包含圖表節點和LLM調用詳細資訊的字典 範例圖表 首先,讓我們建立一個會調用LLM的圖表: from dataclasses import dataclass from langchain.chat_models import init_chat_model from langgraph.graph import StateGraph, START @dataclass class MyState: topic: str joke: str = "" # 初始化LLM llm = init_chat_model(model="openai:gpt-4o-mini") def call_model(state: MyState): """調用LLM生成關於主題的笑話""" llm_response = llm.invoke([ {"role": "user", "content": f"Generate a joke about {state.topic}"} ]) return {"joke": llm_response.content} # 建立圖表 graph = ( StateGraph(MyState) .add_node(call_model) .add_edge(START, "call_model") .compile() ) 串流LLM令牌 Python 實作: async for chunk in client.runs.stream( thread_id, assistant_id, input={"topic": "ice cream"}, stream_mode="messages-tuple", # LLM令牌模式 ): if chunk.event != "messages": continue message_chunk, metadata = chunk.data # 解包令牌和元資料 if message_chunk["content"]: print(message_chunk["content"], end="|", flush=True) JavaScript 實作: const streamResponse = client.runs.stream( threadID, assistantID, { input: { topic: "ice cream" }, streamMode: "messages-tuple" // LLM令牌模式 } ); for await (const chunk of streamResponse) { if (chunk.event !== "messages") { continue; } console.log(chunk.data[0]["content"]); // 印出令牌內容 } cURL 實作: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{ \"assistant_id\": \"agent\", \"input\": {\"topic\": \"ice cream\"}, \"stream_mode\": \"messages-tuple\" }" 🎨 自訂資料串流 有時候你想要傳送特殊的自訂資料,custom 模式就是為此而生的。 Python 實作: async for chunk in client.runs.stream( thread_id, assistant_id, input={"query": "example"}, stream_mode="custom" # 自訂資料模式 ): print(chunk.data) JavaScript 實作: const streamResponse = client.runs.stream( threadID, assistantID, { input: { query: "example" }, streamMode: "custom" // 自訂資料模式 } ); for await (const chunk of streamResponse) { console.log(chunk.data); } cURL 實作: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{ \"assistant_id\": \"agent\", \"input\": {\"query\": \"example\"}, \"stream_mode\": \"custom\" }" 🌍 事件串流 如果你需要最詳細的資訊,events 模式會串流所有事件,包括圖表的狀態。 Python 實作: async for chunk in client.runs.stream( thread_id, assistant_id, input={"topic": "ice cream"}, stream_mode="events" # 事件模式 ): print(chunk.data) JavaScript 實作: const streamResponse = client.runs.stream( threadID, assistantID, { input: { topic: "ice cream" }, streamMode: "events" // 事件模式 } ); for await (const chunk of streamResponse) { console.log(chunk.data); } cURL 實作: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{ \"assistant_id\": \"agent\", \"input\": {\"topic\": \"ice cream\"}, \"stream_mode\": \"events\" }" 🚀 無狀態執行 如果你不想要保存對話歷史(不使用檢查點),可以建立無狀態的執行: Python 實作: from langgraph_sdk import get_client client = get_client(url=, api_key=) async for chunk in client.runs.stream( None, # 傳入 None 而不是線程ID assistant_id, input=inputs, stream_mode="updates" ): print(chunk.data) JavaScript 實作: import { Client } from "@langchain/langgraph-sdk"; const client = new Client({ apiUrl: , apiKey: }); const streamResponse = client.runs.stream( null, // 傳入 null 而不是線程ID assistantID, { input, streamMode: "updates" } ); for await (const chunk of streamResponse) { console.log(chunk.data); } cURL 實作: curl --request POST \ --url /runs/stream \ --header 'Content-Type: application/json' \ --header 'x-api-key: ' --data "{ \"assistant_id\": \"agent\", \"input\": , \"stream_mode\": \"updates\" }" 🔗 加入並串流 LangGraph Platform 讓你可以加入一個正在執行的背景任務並串流其輸出。 Python 實作: from langgraph_sdk import get_client client = get_client(url=, api_key=) async for chunk in client.runs.join_stream( thread_id, run_id, # 你想要加入的現有執行ID ): print(chunk) JavaScript 實作: import { Client } from "@langchain/langgraph-sdk"; const client = new Client({ apiUrl: , apiKey: }); const streamResponse = client.runs.joinStream( threadID, runId // 你想要加入的現有執行ID ); for await (const chunk of streamResponse) { console.log(chunk); } cURL 實作: curl --request GET \ --url /threads//runs//stream \ --header 'Content-Type: application/json' \ --header 'x-api-key: ' ⚠️ 重要提醒:使用 join_stream 時,輸出不會被緩衝,所以在加入之前產生的任何輸出都不會收到。 🎯 實用建議與最佳實踐 🌟 何時使用哪種模式? 開發聊天應用:使用 messages-tuple 模式來獲得逐字回答效果 監控處理進度:使用 updates 模式來追蹤每個步驟的變化 除錯問題:使用 debug 模式來獲得最詳細的資訊 效能監控:使用 values 模式來查看完整狀態變化 特殊需求:使用 custom 模式來傳送自訂資料 💡 效能優化提示 如果不需要保存對話歷史,使用無狀態執行可以提高效能 選擇合適的串流模式,避免接收不必要的資料 使用多種模式時要考慮網路頻寬和處理能力 🔧 常見問題解決 串流中斷:檢查網路連線和API金鑰 資料格式錯誤:確認使用正確的串流模式 效能問題:考慮使用更輕量的串流模式 🎉 開始你的即時AI之旅! 現在你已經掌握了 LangGraph Streaming API 的所有精髓!無論是想要建立: 💬 即時聊天應用:讓AI像真人一樣逐字回答 📊 處理進度監控:即時查看AI的思考過程 🔍 除錯工具:深入瞭解AI的每個處理步驟 🎮 互動體驗:創造更生動的使用者介面 串流API都能幫你實現!記住,好的使用者體驗往往來自於這些即時的回饋和互動。現在就開始讓你的AI應用「活」起來吧! 🚀 📚 進階學習:想要了解更多API細節,可以參考 API參考文檔。