logo
Loading...

Langgraph core components 1 - streaming - AI Agent 開發特訓營:短期實現智能自動化 - Cupoy

🌊 LangGraph Streaming API:讓你的AI即時回應! 🎯 什麼是流式處理?為什麼你需要它? 想像一下,你在和ChatGPT聊天時,如果它要等到完全想好所有回答才一次性顯示出來,...

🌊 LangGraph Streaming API:讓你的AI即時回應! 🎯 什麼是流式處理?為什麼你需要它? 想像一下,你在和ChatGPT聊天時,如果它要等到完全想好所有回答才一次性顯示出來,你會覺得等待時間很漫長。但實際上,你看到的是文字一個一個蹦出來,就像真人在思考並逐字回答一樣。這就是流式處理的魔力! LangGraph Streaming API 讓你的AI應用也能做到: 📝 逐字顯示:像打字機一樣逐漸顯示AI的回答 ⚡ 即時回饋:不用等待完整結果,立即看到進度 🎮 互動體驗:讓使用者感覺AI正在「思考」 📊 過程透明:可以看到AI處理的每個步驟 💡 重要提醒:LangGraph SDK 和 LangGraph Server 都是 LangGraph Platform 的一部分。 🚀 基本使用方法 核心概念解釋 在開始寫程式碼之前,讓我們先理解幾個重要概念: Client(客戶端):你的程式和 LangGraph 服務器溝通的橋樑 Thread(線程):一次對話的容器,可以保存對話歷史 Assistant(助手):你部署的AI圖表,有特定的名稱 Stream Mode(流式模式):決定你想要接收什麼樣的即時資料 Python 實作範例 from langgraph_sdk import get_client # 建立客戶端連線 client = get_client(url=, api_key=) # 使用名為 "agent" 的部署圖表 assistant_id = "agent" # 建立一個對話線程 thread = await client.threads.create() thread_id = thread["thread_id"] # 開始流式處理 async for chunk in client.runs.stream(    thread_id,    assistant_id,    input=inputs,    stream_mode="updates" ):    print(chunk.data) JavaScript 實作範例 import { Client } from "@langchain/langgraph-sdk"; // 建立客戶端連線 const client = new Client({ apiUrl: , apiKey: }); // 使用名為 "agent" 的部署圖表 const assistantID = "agent"; // 建立一個對話線程 const thread = await client.threads.create(); const threadID = thread["thread_id"]; // 開始流式處理 const streamResponse = client.runs.stream(    threadID,    assistantID,    {        input,        streamMode: "updates"    } ); for await (const chunk of streamResponse) {    console.log(chunk.data); } cURL 實作範例 建立線程: curl --request POST \ --url /threads \ --header 'Content-Type: application/json' \ --data '{}' 開始流式處理: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --header 'x-api-key: ' --data "{  \"assistant_id\": \"agent\",  \"input\": ,  \"stream_mode\": \"updates\" }" 🎨 完整範例:建立一個會講笑話的AI 讓我們用一個具體的例子來理解流式處理的威力。我們要建立一個AI,它會先改善你給的主題,然後生成一個笑話。 圖表定義 首先,我們定義一個簡單的處理流程: # graph.py from typing import TypedDict from langgraph.graph import StateGraph, START, END class State(TypedDict):    topic: str  # 主題    joke: str   # 笑話 def refine_topic(state: State):    """改善主題,讓它更有趣"""    return {"topic": state["topic"] + " and cats"} def generate_joke(state: State):    """根據主題生成笑話"""    return {"joke": f"This is a joke about {state['topic']}"} # 建立處理流程圖 graph = (    StateGraph(State)    .add_node(refine_topic)           # 添加改善主題節點    .add_node(generate_joke)          # 添加生成笑話節點    .add_edge(START, "refine_topic")  # 從開始到改善主題    .add_edge("refine_topic", "generate_joke")  # 從改善主題到生成笑話    .add_edge("generate_joke", END)   # 從生成笑話到結束    .compile() ) 使用流式處理觀察過程 現在讓我們看看如何即時觀察這個處理過程: Python 版本: from langgraph_sdk import get_client client = get_client(url=) assistant_id = "agent" # 建立對話線程 thread = await client.threads.create() thread_id = thread["thread_id"] # 使用流式處理觀察每個步驟 async for chunk in client.runs.stream(    thread_id,    assistant_id,    input={"topic": "ice cream"},  # 輸入主題:冰淇淋    stream_mode="updates"          # 只看狀態更新 ):    print(chunk.data) JavaScript 版本: import { Client } from "@langchain/langgraph-sdk"; const client = new Client({ apiUrl: }); const assistantID = "agent"; // 建立對話線程 const thread = await client.threads.create(); const threadID = thread["thread_id"]; // 使用流式處理觀察每個步驟 const streamResponse = client.runs.stream(    threadID,    assistantID,    {        input: { topic: "ice cream" },  // 輸入主題:冰淇淋        streamMode: "updates"           // 只看狀態更新    } ); for await (const chunk of streamResponse) {    console.log(chunk.data); } cURL 版本: 建立線程: curl --request POST \ --url /threads \ --header 'Content-Type: application/json' \ --data '{}' 開始流式處理: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{  \"assistant_id\": \"agent\",  \"input\": {\"topic\": \"ice cream\"},  \"stream_mode\": \"updates\" }" 預期輸出結果 當你運行上面的程式碼時,會看到類似這樣的輸出: {'run_id': '1f02c2b3-3cef-68de-b720-eec2a4a8e920', 'attempt': 1} {'refine_topic': {'topic': 'ice cream and cats'}} {'generate_joke': {'joke': 'This is a joke about ice cream and cats'}} 看到了嗎?你可以即時看到AI處理的每個步驟! 🎛️ 六種流式模式詳解 LangGraph 提供了六種不同的流式模式,每種都有不同的用途: 📊 流式模式對照表 values串流完整的圖表狀態想看到每步驟後的完整資料updates串流狀態更新只想看到每步驟的變化messages-tuple串流LLM令牌想看到AI逐字回答debug串流所有可能資訊需要除錯或深度分析custom串流自訂資料想要傳送特殊的自訂資訊events串流所有事件需要最詳細的執行資訊 🔄 同時使用多種模式 你可以同時使用多種模式來獲得更豐富的資訊: Python 版本: async for chunk in client.runs.stream(    thread_id,    assistant_id,    input=inputs,    stream_mode=["updates", "custom"]  # 同時使用兩種模式 ):    print(chunk) JavaScript 版本: const streamResponse = client.runs.stream(    threadID,    assistantID,    {        input,        streamMode: ["updates", "custom"]  // 同時使用兩種模式    } ); for await (const chunk of streamResponse) {    console.log(chunk); } cURL 版本: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{  \"assistant_id\": \"agent\",  \"input\": ,  \"stream_mode\": [    \"updates\",    \"custom\"  ] }" 📈 觀察圖表狀態變化 updates 模式:只看變化 這個模式只會告訴你每個步驟改變了什麼,非常適合追蹤處理進度。 Python 實作: async for chunk in client.runs.stream(    thread_id,    assistant_id,    input={"topic": "ice cream"},    stream_mode="updates"  # 只看狀態更新 ):    print(chunk.data) JavaScript 實作: const streamResponse = client.runs.stream(    threadID,    assistantID,    {        input: { topic: "ice cream" },        streamMode: "updates"  // 只看狀態更新    } ); for await (const chunk of streamResponse) {    console.log(chunk.data); } cURL 實作: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{  \"assistant_id\": \"agent\",  \"input\": {\"topic\": \"ice cream\"},  \"stream_mode\": \"updates\" }" values 模式:看完整狀態 這個模式會顯示每個步驟後的完整狀態,讓你瞭解目前所有的資料。 Python 實作: async for chunk in client.runs.stream(    thread_id,    assistant_id,    input={"topic": "ice cream"},    stream_mode="values"  # 看完整狀態 ):    print(chunk.data) JavaScript 實作: const streamResponse = client.runs.stream(    threadID,    assistantID,    {        input: { topic: "ice cream" },        streamMode: "values"  // 看完整狀態    } ); for await (const chunk of streamResponse) {    console.log(chunk.data); } cURL 實作: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{  \"assistant_id\": \"agent\",  \"input\": {\"topic\": \"ice cream\"},  \"stream_mode\": \"values\" }" 🏗️ 子圖表處理 當你的AI應用變得複雜,可能會有多層的處理結構(子圖表)。LangGraph 也能讓你看到子圖表的處理過程。 啟用子圖表串流 要包含子圖表的輸出,只需要設定 stream_subgraphs=True: for chunk in client.runs.stream(    thread_id,    assistant_id,    input={"foo": "foo"},    stream_subgraphs=True,  # 啟用子圖表串流    stream_mode="updates", ):    print(chunk) 子圖表範例程式碼 讓我們看一個有子圖表的完整範例: 圖表定義: # graph.py from langgraph.graph import START, StateGraph from typing import TypedDict # 定義子圖表 class SubgraphState(TypedDict):    foo: str  # 與父圖表共享的鍵    bar: str def subgraph_node_1(state: SubgraphState):    return {"bar": "bar"} def subgraph_node_2(state: SubgraphState):    return {"foo": state["foo"] + state["bar"]} # 建立子圖表 subgraph_builder = StateGraph(SubgraphState) subgraph_builder.add_node(subgraph_node_1) subgraph_builder.add_node(subgraph_node_2) subgraph_builder.add_edge(START, "subgraph_node_1") subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2") subgraph = subgraph_builder.compile() # 定義父圖表 class ParentState(TypedDict):    foo: str def node_1(state: ParentState):    return {"foo": "hi! " + state["foo"]} # 建立父圖表 builder = StateGraph(ParentState) builder.add_node("node_1", node_1) builder.add_node("node_2", subgraph)  # 將子圖表當作節點 builder.add_edge(START, "node_1") builder.add_edge("node_1", "node_2") graph = builder.compile() 使用子圖表串流: Python 版本: async for chunk in client.runs.stream(    thread_id,    assistant_id,    input={"foo": "foo"},    stream_subgraphs=True,  # 啟用子圖表串流    stream_mode="updates", ):    print(chunk) JavaScript 版本: const streamResponse = client.runs.stream(    threadID,    assistantID,    {        input: { foo: "foo" },        streamSubgraphs: true,  // 啟用子圖表串流        streamMode: "updates"    } ); for await (const chunk of streamResponse) {    console.log(chunk); } cURL 版本: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{  \"assistant_id\": \"agent\",  \"input\": {\"foo\": \"foo\"},  \"stream_subgraphs\": true,  \"stream_mode\": [\"updates\"] }" 🐛 除錯模式 當你需要詳細瞭解AI的處理過程時,debug 模式是你的最佳朋友。它會提供盡可能多的資訊。 Python 實作: async for chunk in client.runs.stream(    thread_id,    assistant_id,    input={"topic": "ice cream"},    stream_mode="debug"  # 除錯模式 ):    print(chunk.data) JavaScript 實作: const streamResponse = client.runs.stream(    threadID,    assistantID,    {        input: { topic: "ice cream" },        streamMode: "debug"  // 除錯模式    } ); for await (const chunk of streamResponse) {    console.log(chunk.data); } cURL 實作: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{  \"assistant_id\": \"agent\",  \"input\": {\"topic\": \"ice cream\"},  \"stream_mode\": \"debug\" }" 🤖 LLM 令牌串流 這是最酷的功能之一!你可以看到AI一字一字地產生回答,就像真人在打字一樣。 基本概念 messages-tuple 模式會回傳一個元組 (message_chunk, metadata): message_chunk:LLM產生的令牌或訊息片段 metadata:包含圖表節點和LLM調用詳細資訊的字典 範例圖表 首先,讓我們建立一個會調用LLM的圖表: from dataclasses import dataclass from langchain.chat_models import init_chat_model from langgraph.graph import StateGraph, START @dataclass class MyState:    topic: str    joke: str = "" # 初始化LLM llm = init_chat_model(model="openai:gpt-4o-mini") def call_model(state: MyState):    """調用LLM生成關於主題的笑話"""    llm_response = llm.invoke([        {"role": "user", "content": f"Generate a joke about {state.topic}"}    ])    return {"joke": llm_response.content} # 建立圖表 graph = (    StateGraph(MyState)    .add_node(call_model)    .add_edge(START, "call_model")    .compile() ) 串流LLM令牌 Python 實作: async for chunk in client.runs.stream(    thread_id,    assistant_id,    input={"topic": "ice cream"},    stream_mode="messages-tuple",  # LLM令牌模式 ):    if chunk.event != "messages":        continue        message_chunk, metadata = chunk.data  # 解包令牌和元資料    if message_chunk["content"]:        print(message_chunk["content"], end="|", flush=True) JavaScript 實作: const streamResponse = client.runs.stream(    threadID,    assistantID,    {        input: { topic: "ice cream" },        streamMode: "messages-tuple"  // LLM令牌模式    } ); for await (const chunk of streamResponse) {    if (chunk.event !== "messages") {        continue;    }    console.log(chunk.data[0]["content"]);  // 印出令牌內容 } cURL 實作: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{  \"assistant_id\": \"agent\",  \"input\": {\"topic\": \"ice cream\"},  \"stream_mode\": \"messages-tuple\" }" 🎨 自訂資料串流 有時候你想要傳送特殊的自訂資料,custom 模式就是為此而生的。 Python 實作: async for chunk in client.runs.stream(    thread_id,    assistant_id,    input={"query": "example"},    stream_mode="custom"  # 自訂資料模式 ):    print(chunk.data) JavaScript 實作: const streamResponse = client.runs.stream(    threadID,    assistantID,    {        input: { query: "example" },        streamMode: "custom"  // 自訂資料模式    } ); for await (const chunk of streamResponse) {    console.log(chunk.data); } cURL 實作: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{  \"assistant_id\": \"agent\",  \"input\": {\"query\": \"example\"},  \"stream_mode\": \"custom\" }" 🌍 事件串流 如果你需要最詳細的資訊,events 模式會串流所有事件,包括圖表的狀態。 Python 實作: async for chunk in client.runs.stream(    thread_id,    assistant_id,    input={"topic": "ice cream"},    stream_mode="events"  # 事件模式 ):    print(chunk.data) JavaScript 實作: const streamResponse = client.runs.stream(    threadID,    assistantID,    {        input: { topic: "ice cream" },        streamMode: "events"  // 事件模式    } ); for await (const chunk of streamResponse) {    console.log(chunk.data); } cURL 實作: curl --request POST \ --url /threads//runs/stream \ --header 'Content-Type: application/json' \ --data "{  \"assistant_id\": \"agent\",  \"input\": {\"topic\": \"ice cream\"},  \"stream_mode\": \"events\" }" 🚀 無狀態執行 如果你不想要保存對話歷史(不使用檢查點),可以建立無狀態的執行: Python 實作: from langgraph_sdk import get_client client = get_client(url=, api_key=) async for chunk in client.runs.stream(    None,  # 傳入 None 而不是線程ID    assistant_id,    input=inputs,    stream_mode="updates" ):    print(chunk.data) JavaScript 實作: import { Client } from "@langchain/langgraph-sdk"; const client = new Client({ apiUrl: , apiKey: }); const streamResponse = client.runs.stream(    null,  // 傳入 null 而不是線程ID    assistantID,    {        input,        streamMode: "updates"    } ); for await (const chunk of streamResponse) {    console.log(chunk.data); } cURL 實作: curl --request POST \ --url /runs/stream \ --header 'Content-Type: application/json' \ --header 'x-api-key: ' --data "{  \"assistant_id\": \"agent\",  \"input\": ,  \"stream_mode\": \"updates\" }" 🔗 加入並串流 LangGraph Platform 讓你可以加入一個正在執行的背景任務並串流其輸出。 Python 實作: from langgraph_sdk import get_client client = get_client(url=, api_key=) async for chunk in client.runs.join_stream(    thread_id,    run_id,  # 你想要加入的現有執行ID ):    print(chunk) JavaScript 實作: import { Client } from "@langchain/langgraph-sdk"; const client = new Client({ apiUrl: , apiKey: }); const streamResponse = client.runs.joinStream(    threadID,    runId  // 你想要加入的現有執行ID ); for await (const chunk of streamResponse) {    console.log(chunk); } cURL 實作: curl --request GET \ --url /threads//runs//stream \ --header 'Content-Type: application/json' \ --header 'x-api-key: ' ⚠️ 重要提醒:使用 join_stream 時,輸出不會被緩衝,所以在加入之前產生的任何輸出都不會收到。 🎯 實用建議與最佳實踐 🌟 何時使用哪種模式? 開發聊天應用:使用 messages-tuple 模式來獲得逐字回答效果 監控處理進度:使用 updates 模式來追蹤每個步驟的變化 除錯問題:使用 debug 模式來獲得最詳細的資訊 效能監控:使用 values 模式來查看完整狀態變化 特殊需求:使用 custom 模式來傳送自訂資料 💡 效能優化提示 如果不需要保存對話歷史,使用無狀態執行可以提高效能 選擇合適的串流模式,避免接收不必要的資料 使用多種模式時要考慮網路頻寬和處理能力 🔧 常見問題解決 串流中斷:檢查網路連線和API金鑰 資料格式錯誤:確認使用正確的串流模式 效能問題:考慮使用更輕量的串流模式 🎉 開始你的即時AI之旅! 現在你已經掌握了 LangGraph Streaming API 的所有精髓!無論是想要建立: 💬 即時聊天應用:讓AI像真人一樣逐字回答 📊 處理進度監控:即時查看AI的思考過程 🔍 除錯工具:深入瞭解AI的每個處理步驟 🎮 互動體驗:創造更生動的使用者介面 串流API都能幫你實現!記住,好的使用者體驗往往來自於這些即時的回饋和互動。現在就開始讓你的AI應用「活」起來吧! 🚀 📚 進階學習:想要了解更多API細節,可以參考 API參考文檔。