For conceptual information about the functional API, see Functional API.
Create a simple workflow
When defining an entrypoint, the input is restricted to the function's first argument. To pass multiple inputs, you can use an object.
const checkpointer = new MemorySaver();
const myWorkflow = entrypoint(
{ checkpointer, name: "myWorkflow" },
async (inputs: { value: number; anotherValue: number }) => {
const value = inputs.value;
const anotherValue = inputs.anotherValue;
// ...
}
);
await myWorkflow.invoke({ value: 1, anotherValue: 2 });
Extended example: simple workflow
import { v4 as uuidv4 } from "uuid";
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";
// Task that checks if a number is even
const isEven = task("isEven", async (number: number) => {
return number % 2 === 0;
});
// Task that formats a message
const formatMessage = task("formatMessage", async (isEven: boolean) => {
return isEven ? "The number is even." : "The number is odd.";
});
// Create a checkpointer for persistence
const checkpointer = new MemorySaver();
const workflow = entrypoint(
{ checkpointer, name: "workflow" },
async (inputs: { number: number }) => {
// Simple workflow to classify a number
const even = await isEven(inputs.number);
return await formatMessage(even);
}
);
// Run the workflow with a unique thread ID
const config = { configurable: { thread_id: uuidv4() } };
const result = await workflow.invoke({ number: 7 }, config);
console.log(result);
Extended example: Compose an essay with an LLM
This example shows how to use the task and entrypoint functions. Since a checkpointer is provided, the workflow results will be persisted in the checkpoint.
import { v4 as uuidv4 } from "uuid";
import { ChatOpenAI } from "@langchain/openai";
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";
const model = new ChatOpenAI({ model: "gpt-3.5-turbo" });
// Task: generate essay using an LLM
const composeEssay = task("composeEssay", async (topic: string) => {
// Generate an essay about the given topic
const response = await model.invoke([
{ role: "system", content: "You are a helpful assistant that writes essays." },
{ role: "user", content: `Write an essay about ${topic}.` }
]);
return response.content as string;
});
// Create a checkpointer for persistence
const checkpointer = new MemorySaver();
const workflow = entrypoint(
{ checkpointer, name: "workflow" },
async (topic: string) => {
// Simple workflow that generates an essay with an LLM
return await composeEssay(topic);
}
);
// Execute the workflow
const config = { configurable: { thread_id: uuidv4() } };
const result = await workflow.invoke("the history of flight", config);
console.log(result);
Parallel execution
Tasks can be executed in parallel by invoking them concurrently and awaiting the results. This is useful for improving performance in IO-bound tasks (e.g., API calls to LLMs).
const addOne = task("addOne", async (number: number) => {
return number + 1;
});
const graph = entrypoint(
{ checkpointer, name: "graph" },
async (numbers: number[]) => {
return await Promise.all(numbers.map(addOne));
}
);
Extended example: parallel LLM calls
This example demonstrates how to run multiple LLM calls in parallel using task. Each call generates a paragraph on a different topic, and the results are joined into a single text output. This leverages LangGraph's concurrency model to improve execution time, especially for tasks involving I/O such as LLM completions.
import { v4 as uuidv4 } from "uuid";
import { ChatOpenAI } from "@langchain/openai";
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";
// Initialize the LLM model
const model = new ChatOpenAI({ model: "gpt-3.5-turbo" });
// Task that generates a paragraph about a given topic
const generateParagraph = task("generateParagraph", async (topic: string) => {
const response = await model.invoke([
{ role: "system", content: "You are a helpful assistant that writes educational paragraphs." },
{ role: "user", content: `Write a paragraph about ${topic}.` }
]);
return response.content as string;
});
// Create a checkpointer for persistence
const checkpointer = new MemorySaver();
const workflow = entrypoint(
{ checkpointer, name: "workflow" },
async (topics: string[]) => {
// Generates multiple paragraphs in parallel and combines them
const paragraphs = await Promise.all(topics.map(generateParagraph));
return paragraphs.join("\n\n");
}
);
// Run the workflow
const config = { configurable: { thread_id: uuidv4() } };
const result = await workflow.invoke(["quantum computing", "climate change", "history of aviation"], config);
console.log(result);
Calling graphs
The functional API and the Graph API can be used together in the same application, as they share the same underlying runtime.
import { entrypoint } from "@langchain/langgraph";
import { StateGraph } from "@langchain/langgraph";
const builder = new StateGraph(/* ... */);
// ...
const someGraph = builder.compile();
const someWorkflow = entrypoint(
{ name: "someWorkflow" },
async (someInput: Record<string, any>) => {
// Call a graph defined using the graph API
const result1 = await someGraph.invoke(/* ... */);
// Call another graph defined using the graph API
const result2 = await anotherGraph.invoke(/* ... */);
return {
result1,
result2,
};
}
);
Extended example: calling a simple graph from the functional API
import { v4 as uuidv4 } from "uuid";
import { entrypoint, MemorySaver } from "@langchain/langgraph";
import { StateGraph } from "@langchain/langgraph";
import * as z from "zod";
// Define the shared state type
const State = z.object({
foo: z.number(),
});
// Build the graph using the Graph API
const builder = new StateGraph(State)
.addNode("double", (state) => {
return { foo: state.foo * 2 };
})
.addEdge("__start__", "double");
const graph = builder.compile();
// Define the functional API workflow
const checkpointer = new MemorySaver();
const workflow = entrypoint(
{ checkpointer, name: "workflow" },
async (x: number) => {
const result = await graph.invoke({ foo: x });
return { bar: result.foo };
}
);
// Execute the workflow
const config = { configurable: { thread_id: uuidv4() } };
console.log(await workflow.invoke(5, config)); // Output: { bar: 10 }
Calling other entrypoints
You can invoke other entrypoints from within an entrypoint or a task.
// Will automatically use the checkpointer from the parent entrypoint
const someOtherWorkflow = entrypoint(
{ name: "someOtherWorkflow" },
async (inputs: { value: number }) => {
return inputs.value;
}
);
const myWorkflow = entrypoint(
{ checkpointer, name: "myWorkflow" },
async (inputs: { value: number }) => {
const value = await someOtherWorkflow.invoke({ value: 1 });
return value;
}
);
Extended example: calling another entrypoint
import { v4 as uuidv4 } from "uuid";
import { entrypoint, MemorySaver } from "@langchain/langgraph";
// Initialize a checkpointer
const checkpointer = new MemorySaver();
// A reusable sub-workflow that multiplies a number
const multiply = entrypoint(
{ name: "multiply" },
async (inputs: { a: number; b: number }) => {
return inputs.a * inputs.b;
}
);
// Main workflow that invokes the sub-workflow
const main = entrypoint(
{ checkpointer, name: "main" },
async (inputs: { x: number; y: number }) => {
const result = await multiply.invoke({ a: inputs.x, b: inputs.y });
return { product: result };
}
);
// Execute the main workflow
const config = { configurable: { thread_id: uuidv4() } };
console.log(await main.invoke({ x: 6, y: 7 }, config)); // Output: { product: 42 }
Streaming
The functional API uses the same streaming mechanism as the Graph API. See the streaming guide for more details. Example of using the streaming API to stream both updates and custom data:
import {
entrypoint,
MemorySaver,
LangGraphRunnableConfig,
} from "@langchain/langgraph";
const checkpointer = new MemorySaver();
const main = entrypoint(
{ checkpointer, name: "main" },
async (
inputs: { x: number },
config: LangGraphRunnableConfig
): Promise<number> => {
config.writer?.("Started processing");
const result = inputs.x * 2;
config.writer?.(`Result is ${result}`);
return result;
}
);
const config = { configurable: { thread_id: "abc" } };
for await (const [mode, chunk] of await main.stream(
{ x: 5 },
{ streamMode: ["custom", "updates"], ...config }
)) {
console.log(`${mode}: ${JSON.stringify(chunk)}`);
}
- Emit custom data before starting the computation.
- Emit another custom message after computing the result.
- Use .stream() to process the streamed output.
- Specify which stream modes to use.
custom: "Started processing"
custom: "Result is 10"
updates: {"main":10}
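Conceptually, multi-mode streaming multiplexes tagged [mode, chunk] pairs through a single async iterator. The following is a dependency-free sketch of that shape, illustrative only, not LangGraph's implementation:

```typescript
type StreamChunk = [mode: "custom" | "updates", chunk: unknown];

// Yields custom writer messages interleaved with node updates,
// mirroring the workflow above for x = 5.
async function* stream(x: number): AsyncGenerator<StreamChunk> {
  yield ["custom", "Started processing"];
  const result = x * 2;
  yield ["custom", `Result is ${result}`];
  yield ["updates", { main: result }];
}

const seen: StreamChunk[] = [];
for await (const [mode, chunk] of stream(5)) {
  seen.push([mode, chunk]);
  console.log(`${mode}: ${JSON.stringify(chunk)}`);
}
```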
Retry policy
import {
MemorySaver,
entrypoint,
task,
RetryPolicy,
} from "@langchain/langgraph";
// This variable is just used for demonstration purposes to simulate a network failure.
// It's not something you will have in your actual code.
let attempts = 0;
// Let's configure the RetryPolicy to retry on any Error.
// The default RetryPolicy is optimized for retrying specific network errors.
const retryPolicy: RetryPolicy = { retryOn: (error) => error instanceof Error };
const getInfo = task(
{
name: "getInfo",
retry: retryPolicy,
},
() => {
attempts += 1;
if (attempts < 2) {
throw new Error("Failure");
}
return "OK";
}
);
const checkpointer = new MemorySaver();
const main = entrypoint(
{ checkpointer, name: "main" },
async (inputs: Record<string, any>) => {
return await getInfo();
}
);
const config = {
configurable: {
thread_id: "1",
},
};
await main.invoke({ any_input: "foobar" }, config);
'OK'
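The retry behavior above can be sketched generically: a wrapper re-invokes a function while a retryOn predicate matches, up to a maximum number of attempts. This is a simplified, dependency-free illustration of the semantics, not LangGraph's actual implementation:

```typescript
// Generic retry helper illustrating RetryPolicy-style semantics.
// `retryOn` decides whether a thrown error is retryable.
async function withRetry<T>(
  fn: () => Promise<T>,
  opts: { maxAttempts: number; retryOn: (error: unknown) => boolean }
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= opts.maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      // Give up on non-retryable errors or when attempts are exhausted.
      if (!opts.retryOn(error) || attempt === opts.maxAttempts) throw error;
    }
  }
  throw lastError;
}

// Simulated flaky call: fails once, then succeeds.
let attempts = 0;
const flaky = async () => {
  attempts += 1;
  if (attempts < 2) throw new Error("Failure");
  return "OK";
};

const result = await withRetry(flaky, {
  maxAttempts: 3,
  retryOn: (error) => error instanceof Error,
});
console.log(result); // "OK" after one retry
```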
Caching tasks
import {
InMemoryCache,
entrypoint,
task,
CachePolicy,
} from "@langchain/langgraph";
const slowAdd = task(
{
name: "slowAdd",
cache: { ttl: 120 },
},
async (x: number) => {
await new Promise((resolve) => setTimeout(resolve, 1000));
return x * 2;
}
);
const main = entrypoint(
{ cache: new InMemoryCache(), name: "main" },
async (inputs: { x: number }) => {
const result1 = await slowAdd(inputs.x);
const result2 = await slowAdd(inputs.x);
return { result1, result2 };
}
);
for await (const chunk of await main.stream(
{ x: 5 },
{ streamMode: "updates" }
)) {
console.log(chunk);
}
//> { slowAdd: 10 }
//> { slowAdd: 10, '__metadata__': { cached: true } }
//> { main: { result1: 10, result2: 10 } }
ttl is specified in seconds. The cache will be invalidated after this time has elapsed.
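As an illustration of the TTL semantics, an entry is served until its deadline passes and is invalidated afterwards. This is a generic sketch, not the internals of InMemoryCache:

```typescript
// Minimal TTL cache sketch (illustrative only).
class TtlCache<V> {
  private store = new Map<string, { value: V; expiresAt: number }>();
  constructor(private ttlSeconds: number) {}

  get(key: string): V | undefined {
    const entry = this.store.get(key);
    if (!entry) return undefined;
    if (Date.now() >= entry.expiresAt) {
      this.store.delete(key); // expired: invalidate the entry
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: V): void {
    this.store.set(key, {
      value,
      expiresAt: Date.now() + this.ttlSeconds * 1000,
    });
  }
}

const cache = new TtlCache<number>(120); // ttl of 120 seconds, as above
cache.set("slowAdd:5", 10);
console.log(cache.get("slowAdd:5")); // 10 (within the ttl window)
```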
Resuming after an error
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";
// This variable is just used for demonstration purposes to simulate a network failure.
// It's not something you will have in your actual code.
let attempts = 0;
const getInfo = task("getInfo", async () => {
/**
* Simulates a task that fails once before succeeding.
* Throws an exception on the first attempt, then returns "OK" on subsequent tries.
*/
attempts += 1;
if (attempts < 2) {
throw new Error("Failure"); // Simulate a failure on the first attempt
}
return "OK";
});
// Initialize an in-memory checkpointer for persistence
const checkpointer = new MemorySaver();
const slowTask = task("slowTask", async () => {
/**
* Simulates a slow-running task by introducing a 1-second delay.
*/
await new Promise((resolve) => setTimeout(resolve, 1000));
return "Ran slow task.";
});
const main = entrypoint(
{ checkpointer, name: "main" },
async (inputs: Record<string, any>) => {
/**
* Main workflow function that runs the slowTask and getInfo tasks sequentially.
*
* Parameters:
* - inputs: Record<string, any> containing workflow input values.
*
* The workflow first executes `slowTask` and then attempts to execute `getInfo`,
* which will fail on the first invocation.
*/
const slowTaskResult = await slowTask(); // Blocking call to slowTask
await getInfo(); // Exception will be raised here on the first attempt
return slowTaskResult;
}
);
// Workflow execution configuration with a unique thread identifier
const config = {
configurable: {
thread_id: "1", // Unique identifier to track workflow execution
},
};
// This invocation will take ~1 second due to the slowTask execution
try {
// First invocation will raise an exception due to the `getInfo` task failing
await main.invoke({ any_input: "foobar" }, config);
} catch (err) {
// Handle the failure gracefully
}
When we resume the workflow, we won't need to re-run slowTask, since its result is already saved in the checkpoint.
await main.invoke(null, config);
'Ran slow task.'
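The resume behavior relies on task results being persisted per thread: on re-invocation, completed tasks are replayed from the saved results instead of executed again. A minimal dependency-free sketch of that idea (not LangGraph's actual checkpointing):

```typescript
// Per-thread store of completed task results (stand-in for a checkpointer).
const completed = new Map<string, Map<string, unknown>>();

let runs = 0; // counts real executions of slowTask

async function runTask<T>(
  threadId: string,
  taskName: string,
  fn: () => Promise<T>
): Promise<T> {
  const thread = completed.get(threadId) ?? new Map<string, unknown>();
  completed.set(threadId, thread);
  if (thread.has(taskName)) {
    return thread.get(taskName) as T; // replay saved result, skip execution
  }
  const result = await fn();
  thread.set(taskName, result); // persist before moving on
  return result;
}

const slowTask = () => {
  runs += 1;
  return Promise.resolve("Ran slow task.");
};

let attempts = 0;
const getInfo = () => {
  attempts += 1;
  if (attempts < 2) return Promise.reject(new Error("Failure"));
  return Promise.resolve("OK");
};

async function main(threadId: string) {
  const slow = await runTask(threadId, "slowTask", slowTask);
  await runTask(threadId, "getInfo", getInfo);
  return slow;
}

try {
  await main("1"); // first run: slowTask succeeds, getInfo throws
} catch {}
const result = await main("1"); // resume: slowTask is replayed, not re-run
console.log(result, runs); // "Ran slow task." 1
```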
Human-in-the-loop
The functional API supports human-in-the-loop workflows using the interrupt function and the Command primitive.
Basic human-in-the-loop workflow
We will create three tasks:
- Append "bar".
- Pause for human input. On resume, append the human input.
- Append "qux".
import { entrypoint, task, interrupt, Command } from "@langchain/langgraph";
const step1 = task("step1", async (inputQuery: string) => {
// Append bar
return `${inputQuery} bar`;
});
const humanFeedback = task("humanFeedback", async (inputQuery: string) => {
// Append user input
const feedback = interrupt(`Please provide feedback: ${inputQuery}`);
return `${inputQuery} ${feedback}`;
});
const step3 = task("step3", async (inputQuery: string) => {
// Append qux
return `${inputQuery} qux`;
});
import { MemorySaver } from "@langchain/langgraph";
const checkpointer = new MemorySaver();
const graph = entrypoint(
{ checkpointer, name: "graph" },
async (inputQuery: string) => {
const result1 = await step1(inputQuery);
const result2 = await humanFeedback(result1);
const result3 = await step3(result2);
return result3;
}
);
The results of previously completed tasks, in this case step1, are persisted, so they are not run again following the interrupt.
Let's send in a query string:
const config = { configurable: { thread_id: "1" } };
for await (const event of await graph.stream("foo", config)) {
console.log(event);
console.log("\n");
}
Execution paused at the interrupt after step1. The interrupt provides instructions on how to resume. To resume, we issue a Command containing the data expected by the humanFeedback task.
// Continue execution
for await (const event of await graph.stream(
new Command({ resume: "baz" }),
config
)) {
console.log(event);
console.log("\n");
}
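Conceptually, interrupt raises a control-flow signal on the first pass and, on resume, the task is replayed from its start with the injected value returned in place of the interrupt. A rough dependency-free sketch of that replay idea (not the actual implementation):

```typescript
// Signal thrown when execution must pause for human input.
class InterruptSignal extends Error {
  constructor(public payload: unknown) {
    super("interrupt");
  }
}

// Resume value for the pending interrupt, if any.
let resumeValue: string | undefined;

function interrupt(payload: unknown): string {
  if (resumeValue !== undefined) {
    const v = resumeValue;
    resumeValue = undefined;
    return v; // resumed: the injected value becomes the return value
  }
  throw new InterruptSignal(payload); // first pass: pause execution
}

function humanFeedback(input: string): string {
  const feedback = interrupt(`Please provide feedback: ${input}`);
  return `${input} ${feedback}`;
}

let paused = false;
try {
  humanFeedback("foo bar");
} catch (e) {
  if (e instanceof InterruptSignal) paused = true; // execution paused here
}

resumeValue = "baz"; // stand-in for Command({ resume: "baz" })
const result = humanFeedback("foo bar"); // replayed from the start of the task
console.log(paused, result); // true "foo bar baz"
```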
Review tool calls
To review tool calls before execution, we add a reviewToolCall function that calls interrupt. When this function is called, execution pauses until we issue a command to resume.
Given a tool call, our function will interrupt for human review. At that point we can:
- Accept the tool call
- Revise the tool call and continue
- Generate a custom tool message (e.g., instructing the model to re-format its tool call)
import { ToolCall } from "@langchain/core/messages/tool";
import { ToolMessage } from "@langchain/core/messages";
function reviewToolCall(toolCall: ToolCall): ToolCall | ToolMessage {
// Review a tool call, returning a validated version
const humanReview = interrupt({
question: "Is this correct?",
tool_call: toolCall,
});
const reviewAction = humanReview.action;
const reviewData = humanReview.data;
if (reviewAction === "continue") {
return toolCall;
} else if (reviewAction === "update") {
const updatedToolCall = { ...toolCall, args: reviewData };
return updatedToolCall;
} else if (reviewAction === "feedback") {
return new ToolMessage({
content: reviewData,
name: toolCall.name,
tool_call_id: toolCall.id,
});
}
throw new Error(`Unknown review action: ${reviewAction}`);
}
We can now update our agent loop to accept either a validated ToolCall or a ToolMessage. The results of prior tasks, in this case the initial model call, are persisted, so they are not run again following the interrupt.
import {
MemorySaver,
entrypoint,
interrupt,
Command,
addMessages,
} from "@langchain/langgraph";
import { ToolMessage, AIMessage, BaseMessage } from "@langchain/core/messages";
import { ToolCall } from "@langchain/core/messages/tool";
// Assumes `callModel` and `callTool` are defined as tasks elsewhere.
const checkpointer = new MemorySaver();
const agent = entrypoint(
{ checkpointer, name: "agent" },
async (
messages: BaseMessage[],
previous?: BaseMessage[]
): Promise<BaseMessage> => {
if (previous !== undefined) {
messages = addMessages(previous, messages);
}
let modelResponse = await callModel(messages);
while (true) {
if (!modelResponse.tool_calls?.length) {
break;
}
// Review tool calls
const toolResults: ToolMessage[] = [];
const toolCalls: ToolCall[] = [];
for (let i = 0; i < modelResponse.tool_calls.length; i++) {
const review = reviewToolCall(modelResponse.tool_calls[i]);
if (review instanceof ToolMessage) {
toolResults.push(review);
} else {
// is a validated tool call
toolCalls.push(review);
if (review !== modelResponse.tool_calls[i]) {
modelResponse.tool_calls[i] = review; // update message
}
}
}
// Execute remaining tool calls
const remainingToolResults = await Promise.all(
toolCalls.map((toolCall) => callTool(toolCall))
);
// Append to message list
messages = addMessages(messages, [
modelResponse,
...toolResults,
...remainingToolResults,
]);
// Call model again
modelResponse = await callModel(messages);
}
// Generate final response
messages = addMessages(messages, modelResponse);
return entrypoint.final({ value: modelResponse, save: messages });
}
);
Short-term memory
Short-term memory allows storing information across different invocations that share the same thread id. For details, see short-term memory.
Manage checkpoints
You can view and delete the information stored by the checkpointer.
View thread state
const config = {
configurable: {
thread_id: "1",
// optionally provide an ID for a specific checkpoint,
// otherwise the latest checkpoint is shown
// checkpoint_id: "1f029ca3-1f5b-6704-8004-820c16b69a5a"
},
};
await graph.getState(config);
StateSnapshot {
values: {
messages: [
HumanMessage { content: "hi! I'm bob" },
AIMessage { content: "Hi Bob! How are you doing today?" },
HumanMessage { content: "what's my name?" },
AIMessage { content: "Your name is Bob." }
]
},
next: [],
config: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1f029ca3-1f5b-6704-8004-820c16b69a5a' } },
metadata: {
source: 'loop',
writes: { call_model: { messages: AIMessage { content: "Your name is Bob." } } },
step: 4,
parents: {},
thread_id: '1'
},
createdAt: '2025-05-05T16:01:24.680462+00:00',
parentConfig: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1f029ca3-1790-6b0a-8003-baf965b6a38f' } },
tasks: [],
interrupts: []
}
View thread history
const config = {
configurable: {
thread_id: "1",
},
};
const history = [];
for await (const state of graph.getStateHistory(config)) {
history.push(state);
}
[
StateSnapshot {
values: {
messages: [
HumanMessage { content: "hi! I'm bob" },
AIMessage { content: "Hi Bob! How are you doing today? Is there anything I can help you with?" },
HumanMessage { content: "what's my name?" },
AIMessage { content: "Your name is Bob." }
]
},
next: [],
config: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1f029ca3-1f5b-6704-8004-820c16b69a5a' } },
metadata: { source: 'loop', writes: { call_model: { messages: AIMessage { content: "Your name is Bob." } } }, step: 4, parents: {}, thread_id: '1' },
createdAt: '2025-05-05T16:01:24.680462+00:00',
parentConfig: { configurable: { thread_id: '1', checkpoint_ns: '', checkpoint_id: '1f029ca3-1790-6b0a-8003-baf965b6a38f' } },
tasks: [],
interrupts: []
},
// ... more state snapshots
]
Decouple return value from saved value
Use entrypoint.final to decouple what is returned to the caller from what is saved in the checkpoint. This is useful when:
- You want to return a computed result (e.g., a summary or status) but save a different internal value for use on the next invocation.
- You need to control what gets passed to the previous parameter on the next run.
import { entrypoint, MemorySaver } from "@langchain/langgraph";
const checkpointer = new MemorySaver();
const accumulate = entrypoint(
{ checkpointer, name: "accumulate" },
async (n: number, previous?: number) => {
const prev = previous || 0;
const total = prev + n;
// Return the *previous* value to the caller but save the *new* total to the checkpoint.
return entrypoint.final({ value: prev, save: total });
}
);
const config = { configurable: { thread_id: "my-thread" } };
console.log(await accumulate.invoke(1, config)); // 0
console.log(await accumulate.invoke(2, config)); // 1
console.log(await accumulate.invoke(3, config)); // 3
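The value/save split can be sketched generically: each run receives the previously saved state and produces both a caller-facing value and the state to persist for next time. This is an illustrative sketch of the semantics, not the library internals:

```typescript
type Final<V, S> = { value: V; save: S };

// Simulated per-thread storage for the `previous` parameter.
const saved = new Map<string, number>();

function runAccumulate(threadId: string, n: number): number {
  const prev = saved.get(threadId) ?? 0;
  // Return the *previous* value, save the *new* total.
  const final: Final<number, number> = { value: prev, save: prev + n };
  saved.set(threadId, final.save); // persisted for the next invocation
  return final.value; // returned to the caller
}

const out = [
  runAccumulate("my-thread", 1),
  runAccumulate("my-thread", 2),
  runAccumulate("my-thread", 3),
];
console.log(out); // [0, 1, 3]
```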
Chatbot example
A simple chatbot example using the functional API and a MemorySaver checkpointer. The bot can remember the previous conversation and continue from where it left off.
import { BaseMessage } from "@langchain/core/messages";
import {
addMessages,
entrypoint,
task,
MemorySaver,
} from "@langchain/langgraph";
import { ChatAnthropic } from "@langchain/anthropic";
const model = new ChatAnthropic({ model: "claude-sonnet-4-5" });
const callModel = task(
"callModel",
async (messages: BaseMessage[]): Promise<BaseMessage> => {
const response = await model.invoke(messages);
return response;
}
);
const checkpointer = new MemorySaver();
const workflow = entrypoint(
{ checkpointer, name: "workflow" },
async (
inputs: BaseMessage[],
previous?: BaseMessage[]
): Promise<BaseMessage> => {
let messages = inputs;
if (previous) {
messages = addMessages(previous, inputs);
}
const response = await callModel(messages);
return entrypoint.final({
value: response,
save: addMessages(messages, response),
});
}
);
const config = { configurable: { thread_id: "1" } };
const inputMessage = { role: "user", content: "hi! I'm bob" };
for await (const chunk of await workflow.stream([inputMessage], {
...config,
streamMode: "values",
})) {
console.log(chunk.content);
}
const inputMessage2 = { role: "user", content: "what's my name?" };
for await (const chunk of await workflow.stream([inputMessage2], {
...config,
streamMode: "values",
})) {
console.log(chunk.content);
}
Long-term memory
Long-term memory allows storing information across different thread IDs. This can be used to learn information about a specific user in one conversation and use it in another.
Workflows
- See the workflows and agents guide for more examples of how to build workflows with the functional API.
Integrate with other libraries
- Add LangGraph features to other frameworks using the functional API: add features such as persistence, memory, and streaming to agent frameworks that do not provide them out of the box.