Skip to main content
功能API 允许您以最小的现有代码更改,将 LangGraph 的关键特性——持久化记忆人机交互流式传输——添加到您的应用程序中。 它旨在将这些功能集成到可能使用标准语言原语进行分支和控制流的现有代码中,例如 if 语句、for 循环和函数调用。与许多需要将代码重构为显式管道或DAG的数据编排框架不同,功能API允许您在不强制执行严格执行模型的情况下,将这些功能融入其中。 功能API使用两个关键构建块:
  • entrypoint – 入口点封装工作流程逻辑并管理执行流程,包括处理长时间运行的任务和中断。
  • task – 表示离散的工作单元,例如API调用或数据处理步骤,可以在入口点内异步执行。任务返回类似未来的对象,可以等待或同步解决。
这为构建具有状态管理和流式传输的工作流程提供了一个最小化抽象。
有关如何使用功能API的信息,请参阅使用功能API

功能API与图API

对于偏好声明式方法的用户,LangGraph的图API允许您使用图范式定义工作流程。这两个API共享相同的底层运行时,因此您可以在同一应用程序中同时使用它们。 以下是一些关键区别:
  • 控制流:功能API不需要考虑图结构。您可以使用标准的Python结构来定义工作流程。这通常会减少您需要编写的代码量。
  • 短期记忆GraphAPI需要声明一个状态,可能还需要定义reducer来管理图状态的更新。@entrypoint@tasks不需要显式状态管理,因为它们的状态仅限于函数内部,并且不会在函数之间共享。
  • 检查点:两个API都生成并使用检查点。在Graph API中,每次超级步骤之后都会生成一个新的检查点。在Functional API中,当任务执行时,它们的输出会保存到与给定入口点相关联的现有检查点中,而不是创建一个新的检查点。
  • 可视化:Graph API使得将工作流程可视化为图变得容易,这对于调试、理解工作流程和与他人分享都很有用。功能API不支持可视化,因为图是在运行时动态生成的。

示例

以下我们展示了一个简单的应用程序,该程序可以撰写文章并中断以请求人工审核。
import { MemorySaver, entrypoint, task, interrupt } from "@langchain/langgraph";

const writeEssay = task("writeEssay", async (topic: string) => {
  // A placeholder for a long-running task.
  await new Promise((resolve) => setTimeout(resolve, 1000));
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      // Any json-serializable payload provided to interrupt as argument.
      // It will be surfaced on the client side as an Interrupt when streaming data
      // from the workflow.
      essay, // The essay we want reviewed.
      // We can add any additional information that we need.
      // For example, introduce a key called "action" with some instructions.
      action: "Please approve/reject the essay",
    });

    return {
      essay, // The essay that was generated
      isApproved, // Response from HIL
    };
  }
);
此工作流程将撰写一篇关于“猫”主题的论文,然后暂停以获取人类的评审。工作流程可以在提供评审之前无限期地中断。当工作流恢复时,它将从起点开始执行,但由于 writeEssay 任务的输出已经被保存,任务结果将直接从检查点加载,而不是重新计算。
import { v4 as uuidv4 } from "uuid";
import { MemorySaver, entrypoint, task, interrupt } from "@langchain/langgraph";

const writeEssay = task("writeEssay", async (topic: string) => {
  // This is a placeholder for a long-running task.
  await new Promise(resolve => setTimeout(resolve, 1000));
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      // Any json-serializable payload provided to interrupt as argument.
      // It will be surfaced on the client side as an Interrupt when streaming data
      // from the workflow.
      essay, // The essay we want reviewed.
      // We can add any additional information that we need.
      // For example, introduce a key called "action" with some instructions.
      action: "Please approve/reject the essay",
    });

    return {
      essay, // The essay that was generated
      isApproved, // Response from HIL
    };
  }
);

const threadId = uuidv4();

const config = {
  configurable: {
    thread_id: threadId
  }
};

for await (const item of workflow.stream("cat", config)) {
  console.log(item);
}
{ writeEssay: 'An essay about topic: cat' }
{
  __interrupt__: [{
    value: { essay: 'An essay about topic: cat', action: 'Please approve/reject the essay' },
    resumable: true,
    ns: ['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'],
    when: 'during'
  }]
}
一篇论文已经撰写完毕,准备进行审阅。一旦审阅完成,我们就可以继续工作流程:
import { Command } from "@langchain/langgraph";

// Get review from a user (e.g., via a UI)
// In this case, we're using a bool, but this can be any json-serializable value.
const humanReview = true;

for await (const item of workflow.stream(new Command({ resume: humanReview }), config)) {
  console.log(item);
}
{ workflow: { essay: 'An essay about topic: cat', isApproved: true } }
工作流程已完成,并已将审阅添加到文章中。

入口点

智能体函数 entrypoint 可以用于从函数创建工作流。它封装工作流逻辑并管理执行流程,包括处理 长时间运行的任务中断

定义

一个 入口点 通过调用 entrypoint 函数并传入配置和一个函数来定义。 该函数必须接受一个位置参数,该参数作为工作流程输入。如果您需要传递多个数据项,请将对象用作第一个参数的输入类型。 使用函数创建一个入口点将生成一个工作流实例,这有助于管理工作流的执行(例如,处理流式传输、恢复和检查点)。 您通常会希望将 检查点器 传递给 entrypoint 函数以启用持久性并使用 人机交互 等功能。
import { entrypoint } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (someInput: Record<string, any>): Promise<number> => {
    // some logic that may involve long-running tasks like API calls,
    // and may be interrupted for human-in-the-loop
    return result;
  }
);
序列化 入口点的 输入输出 必须是可JSON序列化的,以支持检查点。请参阅 序列化 部分,获取更多详细信息。

执行

使用entrypoint函数将返回一个可以使用invokestream方法执行的对象。
const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};
await myWorkflow.invoke(someInput, config); // Wait for the result

恢复

在执行中断后恢复执行可以通过向 Command 原语传递一个 resume 值来实现。
import { Command } from "@langchain/langgraph";

const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};

await myWorkflow.invoke(new Command({ resume: someResumeValue }), config);
错误后恢复 在错误后恢复,运行 entrypoint 并使用 null 以及相同的 线程 ID(配置)。 这假设底层错误已被解决,执行可以成功进行。
const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};

await myWorkflow.invoke(null, config);

短期记忆

当使用 checkpointer 定义 entrypoint 时,它会在 检查点 中存储在相同 线程 ID 的连续调用之间的信息。 这允许使用 getPreviousState 函数访问前一次调用的状态。 默认情况下,getPreviousState 函数返回前一次调用的返回值。
import { entrypoint, getPreviousState } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (number: number) => {
    const previous = getPreviousState<number>() ?? 0;
    return number + previous;
  }
);

const config = {
  configurable: {
    thread_id: "some_thread_id",
  },
};

await myWorkflow.invoke(1, config); // 1 (previous was undefined)
await myWorkflow.invoke(2, config); // 3 (previous was 1 from the previous invocation)

entrypoint.final

entrypoint.final 是一种特殊的原始类型,可以从入口点返回,并允许将 保存在检查点中的值入口点的返回值 解耦 第一个值是入口点的返回值,第二个值是将在检查点中保存的值。
import { entrypoint, getPreviousState } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (number: number) => {
    const previous = getPreviousState<number>() ?? 0;
    // This will return the previous value to the caller, saving
    // 2 * number to the checkpoint, which will be used in the next invocation
    // for the `previous` parameter.
    return entrypoint.final({
      value: previous,
      save: 2 * number,
    });
  }
);

const config = {
  configurable: {
    thread_id: "1",
  },
};

await myWorkflow.invoke(3, config); // 0 (previous was undefined)
await myWorkflow.invoke(1, config); // 6 (previous was 3 * 2 from the previous invocation)

任务

一个 任务 代表一个离散的工作单元,例如API调用或数据处理步骤。它有两个关键特征:
  • 异步执行:任务被设计为异步执行,允许多个操作同时运行而不会阻塞。
  • 检查点:任务结果被保存到检查点,使得可以从最后保存的状态恢复工作流程。(有关更多详细信息,请参阅persistence)。

定义

任务使用 task 函数定义,该函数包装了一个常规函数。
import { task } from "@langchain/langgraph";

const slowComputation = task("slowComputation", async (inputValue: any) => {
  // Simulate a long-running operation
  return result;
});
序列化 任务的输出必须是JSON可序列化的,以支持检查点保存。

执行

任务 只能在 入口点、另一个 任务状态图节点 内部调用。 任务 不能 从主应用程序代码中直接调用。 当你调用一个 任务 时,它返回一个可以被等待的 Promise。
const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (someInput: number): Promise<number> => {
    return await slowComputation(someInput);
  }
);

何时使用任务

任务在以下场景中非常有用:
  • 检查点: 当您需要将长时间运行的操作的结果保存到检查点时,以便在恢复工作流程时无需重新计算。
  • 人工干预: 如果您正在构建需要人工干预的工作流程,您必须使用任务来封装任何随机性(例如API调用),以确保工作流程可以正确恢复。有关更多详细信息,请参阅确定性部分。
  • 并行执行: 对于I/O密集型任务,任务可以启用并行执行,允许多个操作同时运行而不会阻塞(例如,调用多个API)。
  • 可观察性: 使用任务封装操作提供了一种跟踪工作流程进度和监控单个操作执行的方式,使用LangSmith
  • 可重试工作: 当工作需要重试以处理失败或不一致性时,任务提供了一种封装和管理重试逻辑的方式。

序列化

LangGraph中的序列化有两个关键方面:
  1. entrypoint 的输入和输出必须是可 JSON 序列化的。
  2. task 的输出必须是可 JSON 序列化的。
这些要求对于启用检查点和工作流程恢复是必要的。使用对象、数组、字符串、数字和布尔值等基本类型,以确保您的输入和输出可序列化。 序列化确保工作流状态,如任务结果和中间值,可以可靠地保存和恢复。这对于实现人工参与、容错性和并行执行至关重要。 提供不可序列化的输入或输出,当工作流配置了检查点器时,将导致运行时错误。

决定性

为了利用如人机交互等特性,任何随机性都应该封装在任务中。这保证了当执行被暂停(例如,为了人机交互)然后恢复时,它将遵循相同的_步骤序列_,即使任务的结果是非确定性的。 LangGraph通过在执行过程中持久化任务子图的结果来实现这种行为。一个精心设计的流程确保了恢复执行遵循相同的步骤序列,从而可以正确检索先前计算的结果,而无需重新执行它们。这对于长时间运行的任务或具有非确定性结果的任务特别有用,因为它避免了重复之前已完成的工作,并允许从本质上相同的状态恢复。 尽管工作流的每次运行可能产生不同的结果,但恢复特定运行时,应始终遵循记录的步骤序列。这允许LangGraph高效地查找在图被中断之前执行的任务子图结果,从而避免重新计算它们。

一致性

幂等性确保多次执行相同的操作会产生相同的结果。这有助于防止重复的API调用和冗余处理,如果由于失败而重新执行某个步骤。始终将API调用放在任务函数中进行检查点设置,并设计它们在重新执行时保持幂等性。如果任务开始但未成功完成,则可能发生重新执行。然后,如果工作流恢复,任务将再次运行。使用幂等性键或验证现有结果以避免重复。

常见陷阱

处理副作用

将副作用(例如,写入文件、发送电子邮件)封装在任务中,以确保在恢复工作流时不会多次执行。
在本例中,一个副作用(写入文件)被直接包含在工作流程中,因此当恢复工作流程时,它将被执行第二次。
import { entrypoint, interrupt } from "@langchain/langgraph";
import fs from "fs";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow },
  async (inputs: Record<string, any>) => {
    // This code will be executed a second time when resuming the workflow.
    // Which is likely not what you want.
    fs.writeFileSync("output.txt", "Side effect executed");
    const value = interrupt("question");
    return value;
  }
);

非确定性控制流

可能每次都会产生不同结果的操作(如获取当前时间或随机数)应封装在任务中,以确保在恢复时返回相同的结果。
  • 在任务中:获取随机数(5)→ 中断 → 恢复 →(再次返回5)→ …
  • 不在任务中:获取随机数(5)→ 中断 → 恢复 → 获取新的随机数(7)→ …
这在使用人机交互工作流程和多个中断调用时尤为重要。LangGraph为每个任务/入口点保留一个恢复值的列表。当遇到中断时,它会与相应的恢复值进行匹配。这种匹配是严格基于索引的,因此恢复值的顺序应与中断的顺序相匹配。 如果在恢复执行时未保持执行顺序,一个 interrupt 调用可能与错误的 resume 值匹配,从而导致结果不正确。 请阅读关于确定性的部分以获取更多详细信息。
在本例中,工作流程使用当前时间来确定要执行哪个任务。这是非确定性的,因为工作流程的结果取决于执行的时间。
import { entrypoint, interrupt } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (inputs: { t0: number }) => {
    const t1 = Date.now();

    const deltaT = t1 - inputs.t0;

    if (deltaT > 1000) {
      const result = await slowTask(1);
      const value = interrupt("question");
      return { result, value };
    } else {
      const result = await slowTask(2);
      const value = interrupt("question");
      return { result, value };
    }
  }
);