Skip to main content
LangGraph实现了一个流式系统以呈现实时更新。流式传输对于增强基于LLM的应用程序的响应性至关重要。通过逐步显示输出,甚至在完整响应准备就绪之前,流式传输显著提升了用户体验(UX),尤其是在处理LLM的延迟时。 LangGraph流式传输可以实现什么?
  • 流图状态 — 使用 updatesvalues 模式获取状态更新/值。
  • 流子图输出 — 包含父图和任何嵌套子图的输出。
  • 流 LLM 令牌 — 从任何地方捕获令牌流:节点内部、子图或工具中。
  • 流自定义数据 — 直接从工具函数发送自定义更新或进度信号。
  • 使用多种流模式 — 从 values(完整状态)、updates(状态增量)、messages(LLM 令牌 + 元数据)、custom(任意用户数据)或 debug(详细跟踪)中选择。

支持的流模式

将以下流模式之一或多个作为列表传递给 stream() 方法:
模式描述
values在图的每一步之后流式传输状态的完整值。
updates在图的每一步之后流式传输状态更新。如果在同一步骤中进行了多次更新(例如,运行了多个节点),则这些更新将分别流式传输。
custom从您的图节点内部流式传输自定义数据。
messages从任何调用LLM的图节点流式传输2元组(LLM令牌,元数据)。
debug在图的执行过程中尽可能流式传输尽可能多的信息。

基本用法示例

LangGraph 图暴露了 .stream() 方法以生成迭代器形式的流式输出。
for await (const chunk of await graph.stream(inputs, {
  streamMode: "updates",
})) {
  console.log(chunk);
}
import { StateGraph, START, END } from "@langchain/langgraph";
import * as z from "zod";

const State = z.object({
  topic: z.string(),
  joke: z.string(),
});

const graph = new StateGraph(State)
  .addNode("refineTopic", (state) => {
    return { topic: state.topic + " and cats" };
  })
  .addNode("generateJoke", (state) => {
    return { joke: `This is a joke about ${state.topic}` };
  })
  .addEdge(START, "refineTopic")
  .addEdge("refineTopic", "generateJoke")
  .addEdge("generateJoke", END)
  .compile();

for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  // Set streamMode: "updates" to stream only the updates to the graph state after each node
  // Other stream modes are also available. See supported stream modes for details
  { streamMode: "updates" }
)) {
  console.log(chunk);
}
{'refineTopic': {'topic': 'ice cream and cats'}}
{'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}

流式传输多种模式

您可以将一个数组作为 streamMode 参数传递,以同时流式传输多个模式。 流出的输出将是包含 [mode, chunk] 的元组,其中 mode 是流模式的名称,chunk 是该模式流出的数据。
for await (const [mode, chunk] of await graph.stream(inputs, {
  streamMode: ["updates", "custom"],
})) {
  console.log(chunk);
}

流图状态

使用 updatesvalues 流模式以流的形式执行时传输图的状态。
  • updates 流出图每一步之后的 状态更新
  • values 流出图每一步之后的 状态全值
import { StateGraph, START, END } from "@langchain/langgraph";
import * as z from "zod";

const State = z.object({
  topic: z.string(),
  joke: z.string(),
});

const graph = new StateGraph(State)
  .addNode("refineTopic", (state) => {
    return { topic: state.topic + " and cats" };
  })
  .addNode("generateJoke", (state) => {
    return { joke: `This is a joke about ${state.topic}` };
  })
  .addEdge(START, "refineTopic")
  .addEdge("refineTopic", "generateJoke")
  .addEdge("generateJoke", END)
  .compile();
使用此功能仅流式传输节点在每一步返回的状态更新。流式传输的输出包括节点的名称以及更新内容。
for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "updates" }
)) {
  console.log(chunk);
}

流子图输出

要将 子图 的输出包含在流式输出中,您可以在父图的 .stream() 方法中设置 subgraphs: true。这将流式传输来自父图和任何子图的输出。 输出将以元组的形式流式传输 [namespace, data],其中 namespace 是一个包含子图调用节点路径的元组,例如 ["parent_node:<task_id>", "child_node:<task_id>"]
for await (const chunk of await graph.stream(
  { foo: "foo" },
  {
    // Set subgraphs: true to stream outputs from subgraphs
    subgraphs: true,
    streamMode: "updates",
  }
)) {
  console.log(chunk);
}
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";

// Define subgraph
const SubgraphState = z.object({
  foo: z.string(), // note that this key is shared with the parent graph state
  bar: z.string(),
});

const subgraphBuilder = new StateGraph(SubgraphState)
  .addNode("subgraphNode1", (state) => {
    return { bar: "bar" };
  })
  .addNode("subgraphNode2", (state) => {
    return { foo: state.foo + state.bar };
  })
  .addEdge(START, "subgraphNode1")
  .addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();

// Define parent graph
const ParentState = z.object({
  foo: z.string(),
});

const builder = new StateGraph(ParentState)
  .addNode("node1", (state) => {
    return { foo: "hi! " + state.foo };
  })
  .addNode("node2", subgraph)
  .addEdge(START, "node1")
  .addEdge("node1", "node2");
const graph = builder.compile();

for await (const chunk of await graph.stream(
  { foo: "foo" },
  {
    streamMode: "updates",
    // Set subgraphs: true to stream outputs from subgraphs
    subgraphs: true,
  }
)) {
  console.log(chunk);
}
[[], {'node1': {'foo': 'hi! foo'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode1': {'bar': 'bar'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode2': {'foo': 'hi! foobar'}}]
[[], {'node2': {'foo': 'hi! foobar'}}]
注意:我们不仅接收节点更新,还接收命名空间,这些命名空间告诉我们我们从哪个图(或子图)进行流式传输。

调试

使用 debug 流式模式,在整个图执行过程中尽可能多地传输信息。流出的输出包括节点名称以及完整状态。
for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "debug" }
)) {
  console.log(chunk);
}

大型语言模型令牌

使用 messages 流式模式从您的图中的任何部分(包括节点、工具、子图或任务)逐个标记地流式传输大型语言模型(LLM)的输出。 流式输出的结果来自 messages 模式,是一个元组 [message_chunk, metadata],其中:
  • message_chunk:来自LLM的标记或消息段。
  • metadata:包含关于图节点和LLM调用的详细信息的字典。
如果您的大语言模型(LLM)无法作为LangChain集成使用,您可以使用 custom 模式来流式传输其输出。有关详细信息,请参阅 与任何LLM一起使用
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";

const MyState = z.object({
  topic: z.string(),
  joke: z.string().default(""),
});

const model = new ChatOpenAI({ model: "gpt-4o-mini" });

const callModel = async (state: z.infer<typeof MyState>) => {
  // Call the LLM to generate a joke about a topic
  // Note that message events are emitted even when the LLM is run using .invoke rather than .stream
  const modelResponse = await model.invoke([
    { role: "user", content: `Generate a joke about ${state.topic}` },
  ]);
  return { joke: modelResponse.content };
};

const graph = new StateGraph(MyState)
  .addNode("callModel", callModel)
  .addEdge(START, "callModel")
  .compile();

// The "messages" stream mode returns an iterator of tuples [messageChunk, metadata]
// where messageChunk is the token streamed by the LLM and metadata is a dictionary
// with information about the graph node where the LLM was called and other information
for await (const [messageChunk, metadata] of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "messages" }
)) {
  if (messageChunk.content) {
    console.log(messageChunk.content + "|");
  }
}

通过LLM调用进行筛选

您可以关联 tags 与 LLM 调用来通过 LLM 调用过滤流出的标记。
import { ChatOpenAI } from "@langchain/openai";

// model1 is tagged with "joke"
const model1 = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ['joke']
});
// model2 is tagged with "poem"
const model2 = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ['poem']
});

const graph = // ... define a graph that uses these LLMs

// The streamMode is set to "messages" to stream LLM tokens
// The metadata contains information about the LLM invocation, including the tags
for await (const [msg, metadata] of await graph.stream(
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  // Filter the streamed tokens by the tags field in the metadata to only include
  // the tokens from the LLM invocation with the "joke" tag
  if (metadata.tags?.includes("joke")) {
    console.log(msg.content + "|");
  }
}
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";

// The jokeModel is tagged with "joke"
const jokeModel = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ["joke"]
});
// The poemModel is tagged with "poem"
const poemModel = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ["poem"]
});

const State = z.object({
  topic: z.string(),
  joke: z.string(),
  poem: z.string(),
});

const graph = new StateGraph(State)
  .addNode("callModel", (state) => {
    const topic = state.topic;
    console.log("Writing joke...");

    const jokeResponse = await jokeModel.invoke([
      { role: "user", content: `Write a joke about ${topic}` }
    ]);

    console.log("\n\nWriting poem...");
    const poemResponse = await poemModel.invoke([
      { role: "user", content: `Write a short poem about ${topic}` }
    ]);

    return {
      joke: jokeResponse.content,
      poem: poemResponse.content
    };
  })
  .addEdge(START, "callModel")
  .compile();

// The streamMode is set to "messages" to stream LLM tokens
// The metadata contains information about the LLM invocation, including the tags
for await (const [msg, metadata] of await graph.stream(
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  // Filter the streamed tokens by the tags field in the metadata to only include
  // the tokens from the LLM invocation with the "joke" tag
  if (metadata.tags?.includes("joke")) {
    console.log(msg.content + "|");
  }
}

按节点筛选

仅从特定节点流式传输令牌时,请使用 stream_mode="messages" 并通过流式元数据中的 langgraph_node 字段过滤输出:
// The "messages" stream mode returns a tuple of [messageChunk, metadata]
// where messageChunk is the token streamed by the LLM and metadata is a dictionary
// with information about the graph node where the LLM was called and other information
for await (const [msg, metadata] of await graph.stream(
  inputs,
  { streamMode: "messages" }
)) {
  // Filter the streamed tokens by the langgraph_node field in the metadata
  // to only include the tokens from the specified node
  if (msg.content && metadata.langgraph_node === "some_node_name") {
    // ...
  }
}
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";

const model = new ChatOpenAI({ model: "gpt-4o-mini" });

const State = z.object({
  topic: z.string(),
  joke: z.string(),
  poem: z.string(),
});

const graph = new StateGraph(State)
  .addNode("writeJoke", async (state) => {
    const topic = state.topic;
    const jokeResponse = await model.invoke([
      { role: "user", content: `Write a joke about ${topic}` }
    ]);
    return { joke: jokeResponse.content };
  })
  .addNode("writePoem", async (state) => {
    const topic = state.topic;
    const poemResponse = await model.invoke([
      { role: "user", content: `Write a short poem about ${topic}` }
    ]);
    return { poem: poemResponse.content };
  })
  // write both the joke and the poem concurrently
  .addEdge(START, "writeJoke")
  .addEdge(START, "writePoem")
  .compile();

// The "messages" stream mode returns a tuple of [messageChunk, metadata]
// where messageChunk is the token streamed by the LLM and metadata is a dictionary
// with information about the graph node where the LLM was called and other information
for await (const [msg, metadata] of await graph.stream(
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  // Filter the streamed tokens by the langgraph_node field in the metadata
  // to only include the tokens from the writePoem node
  if (msg.content && metadata.langgraph_node === "writePoem") {
    console.log(msg.content + "|");
  }
}

流式自定义数据

要从LangGraph节点或工具内部发送自定义用户定义数据,请按照以下步骤操作:
  1. 使用 LangGraphRunnableConfig 中的 writer 参数来发射自定义数据。
  2. 在调用 .stream() 时设置 streamMode: "custom" 以获取流中的自定义数据。您可以组合多个模式(例如,["updates", "custom"]),但至少必须有一个是 "custom"
import { StateGraph, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";

const State = z.object({
  query: z.string(),
  answer: z.string(),
});

const graph = new StateGraph(State)
  .addNode("node", async (state, config) => {
    // Use the writer to emit a custom key-value pair (e.g., progress update)
    config.writer({ custom_key: "Generating custom data inside node" });
    return { answer: "some data" };
  })
  .addEdge(START, "node")
  .compile();

const inputs = { query: "example" };

// Set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) {
  console.log(chunk);
}

与任何大型语言模型(LLM)一起使用

您可以使用 streamMode: "custom"任何 LLM API 流数据 —— 即使该 API 没有 实现 LangChain 聊天模型接口。 这使得您能够集成原始的LLM客户端或提供自身流式接口的外部服务,使LangGraph在定制设置中具有高度灵活性。
import { LangGraphRunnableConfig } from "@langchain/langgraph";

const callArbitraryModel = async (
  state: any,
  config: LangGraphRunnableConfig
) => {
  // Example node that calls an arbitrary model and streams the output
  // Assume you have a streaming client that yields chunks
  // Generate LLM tokens using your custom streaming client
  for await (const chunk of yourCustomStreamingClient(state.topic)) {
    // Use the writer to send custom data to the stream
    config.writer({ custom_llm_chunk: chunk });
  }
  return { result: "completed" };
};

const graph = new StateGraph(State)
  .addNode("callArbitraryModel", callArbitraryModel)
  // Add other nodes and edges as needed
  .compile();

// Set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(
  { topic: "cats" },
  { streamMode: "custom" }
)) {
  // The chunk will contain the custom data streamed from the llm
  console.log(chunk);
}
import { StateGraph, START, MessagesZodMeta, LangGraphRunnableConfig } from "@langchain/langgraph";
import { BaseMessage } from "@langchain/core/messages";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";
import OpenAI from "openai";

const openaiClient = new OpenAI();
const modelName = "gpt-4o-mini";

async function* streamTokens(modelName: string, messages: any[]) {
  const response = await openaiClient.chat.completions.create({
    messages,
    model: modelName,
    stream: true,
  });

  let role: string | null = null;
  for await (const chunk of response) {
    const delta = chunk.choices[0]?.delta;

    if (delta?.role) {
      role = delta.role;
    }

    if (delta?.content) {
      yield { role, content: delta.content };
    }
  }
}

// this is our tool
const getItems = tool(
  async (input, config: LangGraphRunnableConfig) => {
    let response = "";
    for await (const msgChunk of streamTokens(
      modelName,
      [
        {
          role: "user",
          content: `Can you tell me what kind of items i might find in the following place: '${input.place}'. List at least 3 such items separating them by a comma. And include a brief description of each item.`,
        },
      ]
    )) {
      response += msgChunk.content;
      config.writer?.(msgChunk);
    }
    return response;
  },
  {
    name: "get_items",
    description: "Use this tool to list items one might find in a place you're asked about.",
    schema: z.object({
      place: z.string().describe("The place to look up items for."),
    }),
  }
);

const State = z.object({
  messages: z
    .array(z.custom<BaseMessage>())
    .register(registry, MessagesZodMeta),
});

const graph = new StateGraph(State)
  // this is the tool-calling graph node
  .addNode("callTool", async (state) => {
    const aiMessage = state.messages.at(-1);
    const toolCall = aiMessage.tool_calls?.at(-1);

    const functionName = toolCall?.function?.name;
    if (functionName !== "get_items") {
      throw new Error(`Tool ${functionName} not supported`);
    }

    const functionArguments = toolCall?.function?.arguments;
    const args = JSON.parse(functionArguments);

    const functionResponse = await getItems.invoke(args);
    const toolMessage = {
      tool_call_id: toolCall.id,
      role: "tool",
      name: functionName,
      content: functionResponse,
    };
    return { messages: [toolMessage] };
  })
  .addEdge(START, "callTool")
  .compile();
让我们使用一个包含工具调用的 AIMessage 来调用图。
const inputs = {
  messages: [
    {
      content: null,
      role: "assistant",
      tool_calls: [
        {
          id: "1",
          function: {
            arguments: '{"place":"bedroom"}',
            name: "get_items",
          },
          type: "function",
        }
      ],
    }
  ]
};

for await (const chunk of await graph.stream(
  inputs,
  { streamMode: "custom" }
)) {
  console.log(chunk.content + "|");
}

禁用特定聊天模型的流式传输

如果您的应用程序混合了支持流式传输和不支持流式传输的模型,您可能需要显式禁用不支持流式传输的模型的流式传输功能。 在初始化模型时设置 streaming: false
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({
  model: "o1-preview",
  // Set streaming: false to disable streaming for the chat model
  streaming: false,
});