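- Stream graph state: use the updates and values modes to get state updates or full state values.
- Stream subgraph outputs: include outputs from both the parent graph and any nested subgraphs.
- Stream LLM tokens: capture token streams from anywhere, including inside nodes, subgraphs, or tools.
- Stream custom data: send custom updates or progress signals directly from tool functions.
- Use multiple stream modes: choose from values (full state), updates (state deltas), messages (LLM tokens + metadata), custom (arbitrary user data), or debug (detailed traces).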
Supported stream modes
Pass one or more of the following stream modes to the stream() method, either as a single string or as an array:
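| Mode | Description |
|---|---|
| values | Streams the full value of the state after each step of the graph. |
| updates | Streams the updates to the state after each step of the graph. If multiple updates are made in the same step (for example, multiple nodes run), those updates are streamed separately. |
| custom | Streams custom data from inside your graph nodes. |
| messages | Streams 2-tuples (LLM token, metadata) from any graph node that invokes an LLM. |
| debug | Streams as much information as possible throughout the execution of the graph. |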
Basic usage example
LangGraph graphs expose the .stream() method, which yields streamed outputs that you consume as an async iterator.
for await (const chunk of await graph.stream(inputs, {
streamMode: "updates",
})) {
console.log(chunk);
}
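For tests or scripts it can be handy to drain a stream into an array instead of logging each chunk. A minimal sketch, assuming only what the loop above already relies on (the value returned by `await graph.stream(...)` is an async iterable); collectStream and fakeUpdatesStream are hypothetical names, and the fake stream stands in for a real graph:

```typescript
// Hypothetical helper (not part of LangGraph): drain any async iterable,
// such as the stream returned by `await graph.stream(...)`, into an array.
async function collectStream<T>(stream: AsyncIterable<T>): Promise<T[]> {
  const chunks: T[] = [];
  for await (const chunk of stream) {
    chunks.push(chunk);
  }
  return chunks;
}

// Stand-in for a real graph stream: yields two chunks shaped like the
// "updates" output of the extended example that follows.
async function* fakeUpdatesStream() {
  yield { refineTopic: { topic: "ice cream and cats" } };
  yield { generateJoke: { joke: "This is a joke about ice cream and cats" } };
}

// With a real compiled graph this would be:
// const chunks = await collectStream(await graph.stream(inputs, { streamMode: "updates" }));
```

Collecting everything defeats the purpose of streaming in a UI, so this is best kept for tests and batch jobs.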
Extended example: streaming updates
import { StateGraph, START, END } from "@langchain/langgraph";
import * as z from "zod";
const State = z.object({
topic: z.string(),
joke: z.string(),
});
const graph = new StateGraph(State)
.addNode("refineTopic", (state) => {
return { topic: state.topic + " and cats" };
})
.addNode("generateJoke", (state) => {
return { joke: `This is a joke about ${state.topic}` };
})
.addEdge(START, "refineTopic")
.addEdge("refineTopic", "generateJoke")
.addEdge("generateJoke", END)
.compile();
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
// Set streamMode: "updates" to stream only the updates to the graph state after each node
// Other stream modes are also available. See supported stream modes for details
{ streamMode: "updates" }
)) {
console.log(chunk);
}
{'refineTopic': {'topic': 'ice cream and cats'}}
{'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}
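Stream multiple modes
You can pass an array as the streamMode parameter to stream multiple modes at once.
The streamed outputs will be tuples [mode, chunk], where mode is the name of the stream mode and chunk is the data streamed by that mode.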
for await (const [mode, chunk] of await graph.stream(inputs, {
streamMode: ["updates", "custom"],
})) {
console.log(chunk);
}
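When several modes share one loop, it can help to dispatch each [mode, chunk] tuple to a per-mode handler instead of branching inline. A minimal sketch in plain TypeScript: routeChunk and ModeHandlers are hypothetical names, and the StreamMode union is hand-written from the table of supported modes rather than imported from the library:

```typescript
// The five modes from the supported-modes table (hand-written; not imported from LangGraph).
type StreamMode = "values" | "updates" | "messages" | "custom" | "debug";

// One optional handler per mode; modes without a handler are skipped.
type ModeHandlers = Partial<Record<StreamMode, (chunk: unknown) => void>>;

// Hypothetical helper: dispatch one [mode, chunk] tuple from a multi-mode
// stream to the matching handler. Returns true if a handler ran.
function routeChunk(
  [mode, chunk]: [StreamMode, unknown],
  handlers: ModeHandlers,
): boolean {
  const handler = handlers[mode];
  if (handler === undefined) return false;
  handler(chunk);
  return true;
}

// With a real graph:
// for await (const tuple of await graph.stream(inputs, { streamMode: ["updates", "custom"] })) {
//   routeChunk(tuple, { updates: (c) => console.log("state", c), custom: (c) => console.log("custom", c) });
// }
```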
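Stream graph state
Use the updates and values stream modes to stream the state of the graph as it executes.
- updates streams the updates to the state after each step of the graph.
- values streams the full value of the state after each step of the graph.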
import { StateGraph, START, END } from "@langchain/langgraph";
import * as z from "zod";
const State = z.object({
topic: z.string(),
joke: z.string(),
});
const graph = new StateGraph(State)
.addNode("refineTopic", (state) => {
return { topic: state.topic + " and cats" };
})
.addNode("generateJoke", (state) => {
return { joke: `This is a joke about ${state.topic}` };
})
.addEdge(START, "refineTopic")
.addEdge("refineTopic", "generateJoke")
.addEdge("generateJoke", END)
.compile();
updates: Use this to stream only the state updates returned by the nodes after each step. The streamed outputs include the name of the node as well as the update.
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "updates" }
)) {
console.log(chunk);
}
values: Use this to stream the full state of the graph after each step.
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "values" }
)) {
console.log(chunk);
}
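The difference between the two modes can be made concrete: if you stream only updates but still want the current state on the client, you can fold each updates chunk into a running snapshot. A minimal sketch, assuming every state key uses plain overwrite semantics (a key with a custom reducer, such as an appending message list, would need that reducer applied instead); applyUpdates is a hypothetical helper:

```typescript
type State = Record<string, unknown>;
// An "updates" chunk maps each node name to the partial state that node returned.
type UpdatesChunk = Record<string, State>;

// Fold one "updates" chunk into a snapshot of the state without mutating it.
// Assumption: every key is last-write-wins (no custom reducers).
function applyUpdates(state: State, chunk: UpdatesChunk): State {
  let next: State = { ...state };
  for (const partial of Object.values(chunk)) {
    next = { ...next, ...partial };
  }
  return next;
}

// Replaying the two updates produced by the refineTopic/generateJoke graph.
const initial: State = { topic: "ice cream" };
const afterRefine = applyUpdates(initial, { refineTopic: { topic: "ice cream and cats" } });
const afterJoke = applyUpdates(afterRefine, {
  generateJoke: { joke: "This is a joke about ice cream and cats" },
});
```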
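Stream subgraph outputs
To include outputs from subgraphs in the streamed outputs, set subgraphs: true in the .stream() method of the parent graph. This streams outputs from both the parent graph and any subgraphs.
The outputs are streamed as tuples [namespace, data], where namespace is an array with the path to the node where a subgraph is invoked, for example ["parent_node:<task_id>", "child_node:<task_id>"].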
for await (const chunk of await graph.stream(
{ foo: "foo" },
{
// Set subgraphs: true to stream outputs from subgraphs
subgraphs: true,
streamMode: "updates",
}
)) {
console.log(chunk);
}
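Because each namespace entry has the form "node:<task_id>", it can be split to recover which node invoked the subgraph and under which task. A small sketch; parseNamespace is a hypothetical helper that splits at the first ":" and treats an entry without one as a bare node name:

```typescript
interface NamespaceSegment {
  node: string;
  taskId: string;
}

// Hypothetical helper: split each "node:<task_id>" entry of a namespace.
// An empty namespace (output from the parent graph) yields an empty array.
function parseNamespace(namespace: string[]): NamespaceSegment[] {
  return namespace.map((entry) => {
    const sep = entry.indexOf(":");
    return sep === -1
      ? { node: entry, taskId: "" }
      : { node: entry.slice(0, sep), taskId: entry.slice(sep + 1) };
  });
}

// In the loop above: const [namespace, data] = chunk; parseNamespace(namespace);
```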
Extended example: streaming from subgraphs
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";
// Define subgraph
const SubgraphState = z.object({
foo: z.string(), // note that this key is shared with the parent graph state
bar: z.string(),
});
const subgraphBuilder = new StateGraph(SubgraphState)
.addNode("subgraphNode1", (state) => {
return { bar: "bar" };
})
.addNode("subgraphNode2", (state) => {
return { foo: state.foo + state.bar };
})
.addEdge(START, "subgraphNode1")
.addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();
// Define parent graph
const ParentState = z.object({
foo: z.string(),
});
const builder = new StateGraph(ParentState)
.addNode("node1", (state) => {
return { foo: "hi! " + state.foo };
})
.addNode("node2", subgraph)
.addEdge(START, "node1")
.addEdge("node1", "node2");
const graph = builder.compile();
for await (const chunk of await graph.stream(
{ foo: "foo" },
{
streamMode: "updates",
// Set subgraphs: true to stream outputs from subgraphs
subgraphs: true,
}
)) {
console.log(chunk);
}
[[], {'node1': {'foo': 'hi! foo'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode1': {'bar': 'bar'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode2': {'foo': 'hi! foobar'}}]
[[], {'node2': {'foo': 'hi! foobar'}}]
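Chunks with an empty namespace come from the parent graph, and all others come from a subgraph. A sketch that groups the update payloads by source, using the four chunks above as sample data. groupBySource is a hypothetical helper, and task ids are stripped so that updates from the same subgraph node end up in one group:

```typescript
type SubgraphChunk = [string[], Record<string, unknown>];

// "parent" for the root graph, otherwise the node path without task ids.
function sourceOf([namespace]: SubgraphChunk): string {
  if (namespace.length === 0) return "parent";
  return namespace.map((entry) => entry.split(":")[0]).join(" > ");
}

// Hypothetical helper: collect the update payloads per source, in arrival order.
function groupBySource(chunks: SubgraphChunk[]): Map<string, Record<string, unknown>[]> {
  const groups = new Map<string, Record<string, unknown>[]>();
  for (const chunk of chunks) {
    const key = sourceOf(chunk);
    const list = groups.get(key) ?? [];
    list.push(chunk[1]);
    groups.set(key, list);
  }
  return groups;
}

const sample: SubgraphChunk[] = [
  [[], { node1: { foo: "hi! foo" } }],
  [["node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2"], { subgraphNode1: { bar: "bar" } }],
  [["node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2"], { subgraphNode2: { foo: "hi! foobar" } }],
  [[], { node2: { foo: "hi! foobar" } }],
];
const groups = groupBySource(sample);
```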
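Debug
Use the debug stream mode to stream as much information as possible throughout the execution of the graph. The streamed outputs include the name of the node as well as the full state.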
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "debug" }
)) {
console.log(chunk);
}
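LLM tokens
Use the messages stream mode to stream Large Language Model (LLM) outputs token by token from any part of your graph, including nodes, tools, subgraphs, or tasks.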
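The output streamed in messages mode is a tuple [message_chunk, metadata], where:
- message_chunk: the token or message segment from the LLM.
- metadata: a dictionary containing details about the graph node and the LLM invocation.
If your LLM is not available as a LangChain integration, you can stream its outputs using custom mode instead. For details, see Use with any LLM.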
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";
const MyState = z.object({
topic: z.string(),
joke: z.string().default(""),
});
const model = new ChatOpenAI({ model: "gpt-4o-mini" });
const callModel = async (state: z.infer<typeof MyState>) => {
// Call the LLM to generate a joke about a topic
// Note that message events are emitted even when the LLM is run using .invoke rather than .stream
const modelResponse = await model.invoke([
{ role: "user", content: `Generate a joke about ${state.topic}` },
]);
return { joke: modelResponse.content };
};
const graph = new StateGraph(MyState)
.addNode("callModel", callModel)
.addEdge(START, "callModel")
.compile();
// The "messages" stream mode returns an iterator of tuples [messageChunk, metadata]
// where messageChunk is the token streamed by the LLM and metadata is a dictionary
// with information about the graph node where the LLM was called and other information
for await (const [messageChunk, metadata] of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "messages" }
)) {
if (messageChunk.content) {
console.log(messageChunk.content + "|");
}
}
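Each messages chunk carries only a fragment of the reply, so rebuilding the full text means concatenating the fragments in order. A sketch; the MessageChunkLike shape is a simplified assumption (content is either a string or an array of content blocks, of which only text blocks are kept), not the library's message class, and joinTokens is a hypothetical name:

```typescript
// Simplified stand-ins for the [messageChunk, metadata] tuples of "messages" mode.
type ContentBlock = { type: string; text?: string };
interface MessageChunkLike {
  content: string | ContentBlock[];
}
type MessageTuple = [MessageChunkLike, Record<string, unknown>];

// Text carried by one chunk; non-text blocks are ignored.
function chunkText(chunk: MessageChunkLike): string {
  if (typeof chunk.content === "string") return chunk.content;
  return chunk.content
    .filter((block) => block.type === "text" && typeof block.text === "string")
    .map((block) => block.text as string)
    .join("");
}

// Hypothetical helper: concatenate all streamed fragments into the full reply.
function joinTokens(tuples: MessageTuple[]): string {
  return tuples.map(([chunk]) => chunkText(chunk)).join("");
}
```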
Filter by LLM invocation
You can associate tags with LLM invocations to filter the streamed tokens by LLM invocation.
import { ChatOpenAI } from "@langchain/openai";
// model1 is tagged with "joke"
const model1 = new ChatOpenAI({
model: "gpt-4o-mini",
tags: ['joke']
});
// model2 is tagged with "poem"
const model2 = new ChatOpenAI({
model: "gpt-4o-mini",
tags: ['poem']
});
const graph = // ... define a graph that uses these LLMs
// The streamMode is set to "messages" to stream LLM tokens
// The metadata contains information about the LLM invocation, including the tags
for await (const [msg, metadata] of await graph.stream(
{ topic: "cats" },
{ streamMode: "messages" }
)) {
// Filter the streamed tokens by the tags field in the metadata to only include
// the tokens from the LLM invocation with the "joke" tag
if (metadata.tags?.includes("joke")) {
console.log(msg.content + "|");
}
}
Extended example: filtering by tags
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";
// The jokeModel is tagged with "joke"
const jokeModel = new ChatOpenAI({
model: "gpt-4o-mini",
tags: ["joke"]
});
// The poemModel is tagged with "poem"
const poemModel = new ChatOpenAI({
model: "gpt-4o-mini",
tags: ["poem"]
});
const State = z.object({
topic: z.string(),
joke: z.string(),
poem: z.string(),
});
const graph = new StateGraph(State)
.addNode("callModel", async (state) => {
const topic = state.topic;
console.log("Writing joke...");
const jokeResponse = await jokeModel.invoke([
{ role: "user", content: `Write a joke about ${topic}` }
]);
console.log("\n\nWriting poem...");
const poemResponse = await poemModel.invoke([
{ role: "user", content: `Write a short poem about ${topic}` }
]);
return {
joke: jokeResponse.content,
poem: poemResponse.content
};
})
.addEdge(START, "callModel")
.compile();
// The streamMode is set to "messages" to stream LLM tokens
// The metadata contains information about the LLM invocation, including the tags
for await (const [msg, metadata] of await graph.stream(
{ topic: "cats" },
{ streamMode: "messages" }
)) {
// Filter the streamed tokens by the tags field in the metadata to only include
// the tokens from the LLM invocation with the "joke" tag
if (metadata.tags?.includes("joke")) {
console.log(msg.content + "|");
}
}
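Instead of keeping only one tag, a single pass can separate the interleaved stream into one transcript per tag. A sketch with hypothetical names; TokenMetadata models only the tags field used above, while the real metadata object carries more fields:

```typescript
// Only the metadata field this sketch reads.
interface TokenMetadata {
  tags?: string[];
}
type TaggedTuple = [{ content: string }, TokenMetadata];

// Hypothetical helper: build one transcript per requested tag.
// Tokens whose metadata carries none of the requested tags are dropped.
function splitByTags(tuples: TaggedTuple[], tags: string[]): Record<string, string> {
  const out: Record<string, string> = {};
  for (const tag of tags) out[tag] = "";
  for (const [msg, metadata] of tuples) {
    for (const tag of tags) {
      if (metadata.tags?.includes(tag)) out[tag] += msg.content;
    }
  }
  return out;
}
```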
Filter by node
To stream tokens only from specific nodes, use streamMode: "messages" and filter the outputs by the langgraph_node field in the streamed metadata:
// The "messages" stream mode returns a tuple of [messageChunk, metadata]
// where messageChunk is the token streamed by the LLM and metadata is a dictionary
// with information about the graph node where the LLM was called and other information
for await (const [msg, metadata] of await graph.stream(
inputs,
{ streamMode: "messages" }
)) {
// Filter the streamed tokens by the langgraph_node field in the metadata
// to only include the tokens from the specified node
if (msg.content && metadata.langgraph_node === "some_node_name") {
// ...
}
}
Extended example: streaming LLM tokens from specific nodes
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";
const model = new ChatOpenAI({ model: "gpt-4o-mini" });
const State = z.object({
topic: z.string(),
joke: z.string(),
poem: z.string(),
});
const graph = new StateGraph(State)
.addNode("writeJoke", async (state) => {
const topic = state.topic;
const jokeResponse = await model.invoke([
{ role: "user", content: `Write a joke about ${topic}` }
]);
return { joke: jokeResponse.content };
})
.addNode("writePoem", async (state) => {
const topic = state.topic;
const poemResponse = await model.invoke([
{ role: "user", content: `Write a short poem about ${topic}` }
]);
return { poem: poemResponse.content };
})
// write both the joke and the poem concurrently
.addEdge(START, "writeJoke")
.addEdge(START, "writePoem")
.compile();
// The "messages" stream mode returns a tuple of [messageChunk, metadata]
// where messageChunk is the token streamed by the LLM and metadata is a dictionary
// with information about the graph node where the LLM was called and other information
for await (const [msg, metadata] of await graph.stream(
{ topic: "cats" },
{ streamMode: "messages" }
)) {
// Filter the streamed tokens by the langgraph_node field in the metadata
// to only include the tokens from the writePoem node
if (msg.content && metadata.langgraph_node === "writePoem") {
console.log(msg.content + "|");
}
}
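Because writeJoke and writePoem run concurrently, their tokens may arrive interleaved. Rather than discarding one node's tokens, you can buffer them per langgraph_node. A sketch with a hypothetical helper name; the tokens in the usage check are made-up sample data:

```typescript
type NodeTuple = [{ content: string }, { langgraph_node?: string }];

// Hypothetical helper: route interleaved tokens into one buffer per node,
// skipping empty chunks and chunks without a node name.
function tokensByNode(tuples: NodeTuple[]): Map<string, string> {
  const buffers = new Map<string, string>();
  for (const [msg, metadata] of tuples) {
    const node = metadata.langgraph_node;
    if (!msg.content || node === undefined) continue;
    buffers.set(node, (buffers.get(node) ?? "") + msg.content);
  }
  return buffers;
}
```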
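Stream custom data
To send custom user-defined data from inside a LangGraph node or tool, follow these steps:
- Use the writer from LangGraphRunnableConfig to emit custom data.
- Set streamMode: "custom" when calling .stream() to get the custom data in the stream. You can combine multiple modes (for example, ["updates", "custom"]), but at least one of them must be "custom".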
Emitting custom data from a node:
import { StateGraph, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";
const State = z.object({
query: z.string(),
answer: z.string(),
});
const graph = new StateGraph(State)
.addNode("node", async (state, config) => {
// Use the writer to emit a custom key-value pair (e.g., progress update)
// writer is optional on LangGraphRunnableConfig, hence the optional call
config.writer?.({ custom_key: "Generating custom data inside node" });
return { answer: "some data" };
})
.addEdge(START, "node")
.compile();
const inputs = { query: "example" };
// Set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) {
console.log(chunk);
}
Emitting custom data from a tool:
import { tool } from "@langchain/core/tools";
import { LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";
const queryDatabase = tool(
async (input, config: LangGraphRunnableConfig) => {
// Use the writer to emit a custom key-value pair (e.g., progress update)
config.writer?.({ data: "Retrieved 0/100 records", type: "progress" });
// perform query
// Emit another custom key-value pair
config.writer?.({ data: "Retrieved 100/100 records", type: "progress" });
return "some-answer";
},
{
name: "query_database",
description: "Query the database.",
schema: z.object({
query: z.string().describe("The query to execute."),
}),
}
);
const graph = // ... define a graph that uses this tool
// Set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) {
console.log(chunk);
}
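The progress events above are hard-coded. The sketch below shows a hypothetical helper that processes records in batches and emits one event per batch in the same { data, type: "progress" } shape, plus a type guard a consumer could use to pick those events out of a custom stream that also carries other payloads. The Writer type mirrors the optional writer on LangGraphRunnableConfig, which is why the helper calls it with ?.:

```typescript
// Same shape as the progress events emitted by the tool above.
interface ProgressEvent {
  type: "progress";
  data: string;
}

// Mirrors the optional writer on LangGraphRunnableConfig.
type Writer = ((chunk: unknown) => void) | undefined;

// Hypothetical helper: handle items in fixed-size batches and emit a progress
// event after each batch, e.g. "Retrieved 4/10 records".
function processInBatches<T>(
  items: T[],
  batchSize: number,
  handle: (batch: T[]) => void,
  writer: Writer,
): void {
  if (batchSize <= 0) throw new RangeError("batchSize must be positive");
  for (let start = 0; start < items.length; start += batchSize) {
    handle(items.slice(start, start + batchSize));
    const done = Math.min(start + batchSize, items.length);
    const event: ProgressEvent = {
      type: "progress",
      data: `Retrieved ${done}/${items.length} records`,
    };
    writer?.(event);
  }
}

// Consumer side: narrow an arbitrary "custom" chunk to a ProgressEvent.
function isProgressEvent(chunk: unknown): chunk is ProgressEvent {
  if (typeof chunk !== "object" || chunk === null) return false;
  const candidate = chunk as { type?: unknown; data?: unknown };
  return candidate.type === "progress" && typeof candidate.data === "string";
}

// Inside a tool: processInBatches(rows, 25, saveRows, config.writer);
// In the stream loop: if (isProgressEvent(chunk)) console.log(chunk.data);
```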
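Use with any LLM
You can use streamMode: "custom" to stream data from any LLM API, even if that API does not implement the LangChain chat model interface.
This lets you integrate raw LLM clients or external services that provide their own streaming interfaces, which makes LangGraph highly flexible for custom setups.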
import { StateGraph, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";
const State = z.object({
topic: z.string(),
result: z.string().default(""),
});
const callArbitraryModel = async (
state: z.infer<typeof State>,
config: LangGraphRunnableConfig
) => {
// Example node that calls an arbitrary model and streams the output.
// yourCustomStreamingClient is a placeholder for any streaming client that
// returns an async iterable of chunks; replace it with your own SDK call.
for await (const chunk of yourCustomStreamingClient(state.topic)) {
// Use the writer to send custom data to the stream
config.writer?.({ custom_llm_chunk: chunk });
}
return { result: "completed" };
};
const graph = new StateGraph(State)
.addNode("callArbitraryModel", callArbitraryModel)
.addEdge(START, "callArbitraryModel")
// Add other nodes and edges as needed
.compile();
// Set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(
{ topic: "cats" },
{ streamMode: "custom" }
)) {
// The chunk will contain the custom data streamed from the llm
console.log(chunk);
}
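The loop inside callArbitraryModel can be factored into a reusable adapter that forwards each chunk from any async source to the writer and returns the assembled text, which the node can then put in its final state update. A sketch; pipeToWriter and fakeClient are hypothetical, with fakeClient standing in for the placeholder yourCustomStreamingClient and assumed to yield plain strings:

```typescript
// Mirrors the optional writer on LangGraphRunnableConfig.
type Writer = ((chunk: unknown) => void) | undefined;

// Hypothetical adapter: forward every text chunk from an arbitrary async source
// to the writer under a fixed key, and return the full concatenated text.
async function pipeToWriter(
  source: AsyncIterable<string>,
  writer: Writer,
  key = "custom_llm_chunk",
): Promise<string> {
  let text = "";
  for await (const piece of source) {
    text += piece;
    writer?.({ [key]: piece });
  }
  return text;
}

// Stand-in for a real streaming client.
async function* fakeClient(): AsyncGenerator<string> {
  yield "Hello";
  yield ", ";
  yield "world";
}

// Inside the node:
// const text = await pipeToWriter(yourCustomStreamingClient(state.topic), config.writer);
// return { result: text };
```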
Extended example: streaming arbitrary chat model
import { StateGraph, START, MessagesZodMeta, LangGraphRunnableConfig } from "@langchain/langgraph";
import { BaseMessage } from "@langchain/core/messages";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";
import OpenAI from "openai";
import { tool } from "@langchain/core/tools";
const openaiClient = new OpenAI();
const modelName = "gpt-4o-mini";
async function* streamTokens(modelName: string, messages: any[]) {
const response = await openaiClient.chat.completions.create({
messages,
model: modelName,
stream: true,
});
let role: string | null = null;
for await (const chunk of response) {
const delta = chunk.choices[0]?.delta;
if (delta?.role) {
role = delta.role;
}
if (delta?.content) {
yield { role, content: delta.content };
}
}
}
// this is our tool
const getItems = tool(
async (input, config: LangGraphRunnableConfig) => {
let response = "";
for await (const msgChunk of streamTokens(
modelName,
[
{
role: "user",
content: `Can you tell me what kind of items i might find in the following place: '${input.place}'. List at least 3 such items separating them by a comma. And include a brief description of each item.`,
},
]
)) {
response += msgChunk.content;
config.writer?.(msgChunk);
}
return response;
},
{
name: "get_items",
description: "Use this tool to list items one might find in a place you're asked about.",
schema: z.object({
place: z.string().describe("The place to look up items for."),
}),
}
);
const State = z.object({
messages: z
.array(z.custom<BaseMessage>())
.register(registry, MessagesZodMeta),
});
const graph = new StateGraph(State)
// this is the tool-calling graph node
.addNode("callTool", async (state) => {
const aiMessage = state.messages.at(-1);
const toolCall = aiMessage.tool_calls?.at(-1);
const functionName = toolCall?.function?.name;
if (functionName !== "get_items") {
throw new Error(`Tool ${functionName} not supported`);
}
const functionArguments = toolCall?.function?.arguments;
const args = JSON.parse(functionArguments);
const functionResponse = await getItems.invoke(args);
const toolMessage = {
tool_call_id: toolCall.id,
role: "tool",
name: functionName,
content: functionResponse,
};
return { messages: [toolMessage] };
})
.addEdge(START, "callTool")
.compile();
// Invoke the graph with an AIMessage that contains a get_items tool call
const inputs = {
messages: [
{
content: null,
role: "assistant",
tool_calls: [
{
id: "1",
function: {
arguments: '{"place":"bedroom"}',
name: "get_items",
},
type: "function",
}
],
}
]
};
for await (const chunk of await graph.stream(
inputs,
{ streamMode: "custom" }
)) {
console.log(chunk.content + "|");
}
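Disable streaming for specific chat models
If your application mixes models that support streaming with models that do not, you may need to explicitly disable streaming for the models that do not support it. Set streaming: false when initializing the model.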
import { ChatOpenAI } from "@langchain/openai";
const model = new ChatOpenAI({
model: "o1-preview",
// Set streaming: false to disable streaming for the chat model
streaming: false,
});