- 构建多智能体系统
- 在多个图中重用一组节点
- 分布式开发:当您希望不同的团队独立地工作在图的各个不同部分时,您可以定义每个部分为一个子图,只要尊重子图接口(输入和输出模式),就可以构建父图,而无需了解子图的任何细节
设置
npm install @langchain/langgraph
从节点调用图
实现子图的一种简单方法是在另一个图的节点内部调用图。在这种情况下,子图可以与父图具有完全不同的模式(没有共享键)。例如,您可能希望为多智能体系统中的每个智能体保留一个私有消息历史记录。 如果您的应用程序是这样的情况,您需要定义一个调用子图的节点函数。此函数需要在调用子图之前将输入(父)状态转换为子图状态,并在返回节点状态更新之前将结果转换回父状态。import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";
const SubgraphState = z.object({
bar: z.string(),
});
// Subgraph
const subgraphBuilder = new StateGraph(SubgraphState)
.addNode("subgraphNode1", (state) => {
return { bar: "hi! " + state.bar };
})
.addEdge(START, "subgraphNode1");
const subgraph = subgraphBuilder.compile();
// Parent graph
const State = z.object({
foo: z.string(),
});
// Transform the state to the subgraph state and back
const builder = new StateGraph(State)
.addNode("node1", async (state) => {
const subgraphOutput = await subgraph.invoke({ bar: state.foo });
return { foo: subgraphOutput.bar };
})
.addEdge(START, "node1");
const graph = builder.compile();
Full example: different state schemas
Full example: different state schemas
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";
// Define subgraph
const SubgraphState = z.object({
// note that none of these keys are shared with the parent graph state
bar: z.string(),
baz: z.string(),
});
const subgraphBuilder = new StateGraph(SubgraphState)
.addNode("subgraphNode1", (state) => {
return { baz: "baz" };
})
.addNode("subgraphNode2", (state) => {
return { bar: state.bar + state.baz };
})
.addEdge(START, "subgraphNode1")
.addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();
// Define parent graph
const ParentState = z.object({
foo: z.string(),
});
const builder = new StateGraph(ParentState)
.addNode("node1", (state) => {
return { foo: "hi! " + state.foo };
})
.addNode("node2", async (state) => {
const response = await subgraph.invoke({ bar: state.foo });
return { foo: response.bar };
})
.addEdge(START, "node1")
.addEdge("node1", "node2");
const graph = builder.compile();
for await (const chunk of await graph.stream(
{ foo: "foo" },
{ subgraphs: true }
)) {
console.log(chunk);
}
- 将状态转换为子图状态
- 将响应转换回父状态
[[], { node1: { foo: 'hi! foo' } }]
[['node2:9c36dd0f-151a-cb42-cbad-fa2f851f9ab7'], { subgraphNode1: { baz: 'baz' } }]
[['node2:9c36dd0f-151a-cb42-cbad-fa2f851f9ab7'], { subgraphNode2: { bar: 'hi! foobaz' } }]
[[], { node2: { foo: 'hi! foobaz' } }]
Full example: different state schemas (two levels of subgraphs)
Full example: different state schemas (two levels of subgraphs)
这是一个包含两层子图示例:父级 -> 子级 -> 孙级。
import { StateGraph, START, END } from "@langchain/langgraph";
import * as z from "zod";
// Grandchild graph
const GrandChildState = z.object({
myGrandchildKey: z.string(),
});
const grandchild = new StateGraph(GrandChildState)
.addNode("grandchild1", (state) => {
// NOTE: child or parent keys will not be accessible here
return { myGrandchildKey: state.myGrandchildKey + ", how are you" };
})
.addEdge(START, "grandchild1")
.addEdge("grandchild1", END);
const grandchildGraph = grandchild.compile();
// Child graph
const ChildState = z.object({
myChildKey: z.string(),
});
const child = new StateGraph(ChildState)
.addNode("child1", async (state) => {
// NOTE: parent or grandchild keys won't be accessible here
const grandchildGraphInput = { myGrandchildKey: state.myChildKey };
const grandchildGraphOutput = await grandchildGraph.invoke(grandchildGraphInput);
return { myChildKey: grandchildGraphOutput.myGrandchildKey + " today?" };
})
.addEdge(START, "child1")
.addEdge("child1", END);
const childGraph = child.compile();
// Parent graph
const ParentState = z.object({
myKey: z.string(),
});
const parent = new StateGraph(ParentState)
.addNode("parent1", (state) => {
// NOTE: child or grandchild keys won't be accessible here
return { myKey: "hi " + state.myKey };
})
.addNode("child", async (state) => {
const childGraphInput = { myChildKey: state.myKey };
const childGraphOutput = await childGraph.invoke(childGraphInput);
return { myKey: childGraphOutput.myChildKey };
})
.addNode("parent2", (state) => {
return { myKey: state.myKey + " bye!" };
})
.addEdge(START, "parent1")
.addEdge("parent1", "child")
.addEdge("child", "parent2")
.addEdge("parent2", END);
const parentGraph = parent.compile();
for await (const chunk of await parentGraph.stream(
{ myKey: "Bob" },
{ subgraphs: true }
)) {
console.log(chunk);
}
- 我们将状态从子状态通道(
myChildKey)转换为孙状态通道(myGrandchildKey) - 我们将状态从孙状态通道(
myGrandchildKey)转换回子状态通道(myChildKey) - 我们在这里传递一个函数,而不是仅仅编译后的图(
grandchildGraph) - 我们将状态从父状态通道(
myKey)转换为子状态通道(myChildKey) - 我们将状态从子状态通道(
myChildKey)转换回父状态通道(myKey) - 我们在这里传递一个函数,而不是仅仅一个编译后的图(
childGraph)
[[], { parent1: { myKey: 'hi Bob' } }]
[['child:2e26e9ce-602f-862c-aa66-1ea5a4655e3b', 'child1:781bb3b1-3971-84ce-810b-acf819a03f9c'], { grandchild1: { myGrandchildKey: 'hi Bob, how are you' } }]
[['child:2e26e9ce-602f-862c-aa66-1ea5a4655e3b'], { child1: { myChildKey: 'hi Bob, how are you today?' } }]
[[], { child: { myKey: 'hi Bob, how are you today?' } }]
[[], { parent2: { myKey: 'hi Bob, how are you today? bye!' } }]
添加一个图作为节点
当父图和子图可以通过共享状态键(通道)在schema中进行通信时,您可以将一个图作为node添加到另一个图中。例如,在multi-agent系统中,智能体通常通过共享消息键进行通信。
- 定义子图工作流程(以下示例中的
subgraphBuilder)并编译它 - 在定义父图工作流程时,将编译后的子图传递给
.addNode方法
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";
const State = z.object({
foo: z.string(),
});
// Subgraph
const subgraphBuilder = new StateGraph(State)
.addNode("subgraphNode1", (state) => {
return { foo: "hi! " + state.foo };
})
.addEdge(START, "subgraphNode1");
const subgraph = subgraphBuilder.compile();
// Parent graph
const builder = new StateGraph(State)
.addNode("node1", subgraph)
.addEdge(START, "node1");
const graph = builder.compile();
Full example: shared state schemas
Full example: shared state schemas
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";
// Define subgraph
const SubgraphState = z.object({
foo: z.string(),
bar: z.string(),
});
const subgraphBuilder = new StateGraph(SubgraphState)
.addNode("subgraphNode1", (state) => {
return { bar: "bar" };
})
.addNode("subgraphNode2", (state) => {
// note that this node is using a state key ('bar') that is only available in the subgraph
// and is sending update on the shared state key ('foo')
return { foo: state.foo + state.bar };
})
.addEdge(START, "subgraphNode1")
.addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();
// Define parent graph
const ParentState = z.object({
foo: z.string(),
});
const builder = new StateGraph(ParentState)
.addNode("node1", (state) => {
return { foo: "hi! " + state.foo };
})
.addNode("node2", subgraph)
.addEdge(START, "node1")
.addEdge("node1", "node2");
const graph = builder.compile();
for await (const chunk of await graph.stream({ foo: "foo" })) {
console.log(chunk);
}
- 此密钥与父图状态共享
- 此密钥仅属于
SubgraphState,对父图不可见
{ node1: { foo: 'hi! foo' } }
{ node2: { foo: 'hi! foobar' } }
添加持久化
您只需在编译父图时提供检查点器。LangGraph会自动将检查点器传播到子子图中。import { StateGraph, START, MemorySaver } from "@langchain/langgraph";
import * as z from "zod";
const State = z.object({
foo: z.string(),
});
// Subgraph
const subgraphBuilder = new StateGraph(State)
.addNode("subgraphNode1", (state) => {
return { foo: state.foo + "bar" };
})
.addEdge(START, "subgraphNode1");
const subgraph = subgraphBuilder.compile();
// Parent graph
const builder = new StateGraph(State)
.addNode("node1", subgraph)
.addEdge(START, "node1");
const checkpointer = new MemorySaver();
const graph = builder.compile({ checkpointer });
const subgraphBuilder = new StateGraph(...)
const subgraph = subgraphBuilder.compile({ checkpointer: true });
查看子图状态
当您启用 持久化 时,您可以通过适当的方法 检查图状态(检查点)。要查看子图状态,您可以使用子图选项。 您可以通过graph.getState(config) 检查图状态。要查看子图状态,您可以使用 graph.getState(config, { subgraphs: true })。
仅在中断时可用
子图状态只能在子图被中断时查看。一旦您恢复图,您将无法访问子图状态。
View interrupted subgraph state
View interrupted subgraph state
import { StateGraph, START, MemorySaver, interrupt, Command } from "@langchain/langgraph";
import * as z from "zod";
const State = z.object({
foo: z.string(),
});
// Subgraph
const subgraphBuilder = new StateGraph(State)
.addNode("subgraphNode1", (state) => {
const value = interrupt("Provide value:");
return { foo: state.foo + value };
})
.addEdge(START, "subgraphNode1");
const subgraph = subgraphBuilder.compile();
// Parent graph
const builder = new StateGraph(State)
.addNode("node1", subgraph)
.addEdge(START, "node1");
const checkpointer = new MemorySaver();
const graph = builder.compile({ checkpointer });
const config = { configurable: { thread_id: "1" } };
await graph.invoke({ foo: "" }, config);
const parentState = await graph.getState(config);
const subgraphState = (await graph.getState(config, { subgraphs: true })).tasks[0].state;
// resume the subgraph
await graph.invoke(new Command({ resume: "bar" }), config);
流子图输出
要将子图输出包含在流输出中,您可以在父图的流方法中设置子图选项。这将流式传输来自父图和任何子图的输出。for await (const chunk of await graph.stream(
{ foo: "foo" },
{
subgraphs: true,
streamMode: "updates",
}
)) {
console.log(chunk);
}
subgraphs: true 设置为从子图中流式传输输出。
Stream from subgraphs
Stream from subgraphs
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";
// Define subgraph
const SubgraphState = z.object({
foo: z.string(),
bar: z.string(),
});
const subgraphBuilder = new StateGraph(SubgraphState)
.addNode("subgraphNode1", (state) => {
return { bar: "bar" };
})
.addNode("subgraphNode2", (state) => {
// note that this node is using a state key ('bar') that is only available in the subgraph
// and is sending update on the shared state key ('foo')
return { foo: state.foo + state.bar };
})
.addEdge(START, "subgraphNode1")
.addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();
// Define parent graph
const ParentState = z.object({
foo: z.string(),
});
const builder = new StateGraph(ParentState)
.addNode("node1", (state) => {
return { foo: "hi! " + state.foo };
})
.addNode("node2", subgraph)
.addEdge(START, "node1")
.addEdge("node1", "node2");
const graph = builder.compile();
for await (const chunk of await graph.stream(
{ foo: "foo" },
{
streamMode: "updates",
subgraphs: true,
}
)) {
console.log(chunk);
}
- 将
subgraphs: true设置为从子图中流式传输输出。
[[], { node1: { foo: 'hi! foo' } }]
[['node2:e58e5673-a661-ebb0-70d4-e298a7fc28b7'], { subgraphNode1: { bar: 'bar' } }]
[['node2:e58e5673-a661-ebb0-70d4-e298a7fc28b7'], { subgraphNode2: { foo: 'hi! foobar' } }]
[[], { node2: { foo: 'hi! foobar' } }]