Skip to main content
持久执行是一种技术,其中某个进程或工作流在关键点保存其进度,允许它暂停并在稍后从上次停止的地方继续执行。这在需要人工干预的场景中特别有用,用户可以在继续之前检查、验证或修改流程,以及在可能遇到中断或错误的长时间运行的任务中(例如,调用LLM超时)。通过保留已完成的工作,持久执行使进程能够在不重新处理之前步骤的情况下恢复——即使是在显著延迟之后(例如,一周后)。 LangGraph 内置的 持久化 层为工作流提供持久的执行,确保每个执行步骤的状态都保存到持久存储中。这一功能保证,如果工作流被中断——无论是由于系统故障还是 人机交互 ——它可以从最后记录的状态恢复。
如果您在使用带有检查点的LangGraph,您已经启用了持久执行。您可以在任何时刻暂停和恢复工作流程,即使在中断或失败之后。 为了充分利用持久执行,请确保您的工作流程被设计为确定性幂等性,并将任何副作用或非确定性操作封装在任务中。您可以从StateGraph (Graph API)功能API中使用任务

需求

为了利用LangGraph中的持久执行,您需要:
  1. 在您的流程中启用persistence,通过指定一个checkpointer来保存流程进度。
  2. 在执行流程时指定一个线程标识符。这将跟踪特定流程实例的执行历史。
将任何非确定性操作(例如,随机数生成)或具有副作用的操作(例如,文件写入、API调用)放在@[task]内,以确保在恢复工作流时,这些操作不会在特定运行中重复,而是从持久化层检索其结果。有关更多信息,请参阅确定性及一致重放

确定性及一致回放

当您恢复工作流运行时,代码不会从停止执行的同一条代码行继续;相反,它将识别一个合适的 起始点,从该点开始继续执行。这意味着工作流将从 起始点 重新播放所有步骤,直到达到停止的位置。 因此,在编写用于持久执行的流程时,您必须将任何非确定性操作(例如,随机数生成)以及任何具有副作用的操作(例如,文件写入、API调用)封装在任务节点中。 为确保您的流程是确定性的并且可以一致性地重放,请遵循以下指南:
  • 避免重复工作:如果一个 节点 包含多个具有副作用(例如,日志记录、文件写入或网络调用)的操作,请将每个操作包裹在一个单独的 任务 中。这确保了当工作流程恢复时,操作不会被重复执行,并且可以从持久化层检索其结果。
  • 封装非确定性操作:将可能产生非确定性结果(例如,随机数生成)的任何代码包裹在 任务节点 中。这确保了在工作流程恢复后,工作流程将遵循记录的精确步骤序列,并产生相同的结果。
  • 使用幂等操作:当可能时,确保副作用(例如,API 调用、文件写入)是幂等的。这意味着如果操作在工作流程失败后重试,它将产生与第一次执行相同的效果。这对于导致数据写入的操作尤为重要。如果 任务 开始但未能成功完成,工作流程的恢复将重新运行该 任务,依靠记录的结果来保持一致性。使用幂等性键或验证现有结果以避免意外重复,确保工作流程执行平稳且可预测。
以下是一些需要避免的陷阱的示例,请参阅功能API中的常见陷阱部分,该部分展示了如何使用任务来构建您的代码以避免这些问题。同样的原则也适用于状态图(Graph API)

耐久度模式

LangGraph 支持三种耐用模式,允许您根据应用程序的需求在性能和数据一致性之间进行平衡。耐用模式从最不耐用到最耐用依次如下: 更高的耐用性模式会增加工作流程执行的额外开销。
自 v0.6.0 版本开始添加 使用 durability 参数代替 checkpoint_during(自 v0.6.0 版本已弃用)进行持久化策略管理:
  • durability="async" 替换 checkpoint_during=True
  • durability="exit" 替换 checkpoint_during=False
用于持久性策略管理,具有以下映射:
  • checkpoint_during=True -> durability="async"
  • checkpoint_during=False -> durability="exit"

"exit"

更改仅在图执行完成后(无论是成功还是出错)才会持久化。这为长时间运行的图提供了最佳性能,但意味着中间状态不会被保存,因此您无法从执行中途的失败中恢复或中断图执行。

"async"

更改在执行下一步时异步持久化。这提供了良好的性能和持久性,但存在一个小的风险,如果在执行过程中进程崩溃,检查点可能不会被写入。

"sync"

在下一步开始之前,更改被同步持久化。这确保在继续执行之前每个检查点都已写入,以牺牲一些性能开销为代价,提供高可靠性。 您可以在调用任何图执行方法时指定耐用模式:
graph.stream(
    {"input": "test"},
    durability="sync"
)

在节点中使用任务

如果一个 节点 包含多个操作,您可能会发现将每个操作转换为 任务 而不是将操作重构为单独的节点更容易一些。
from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
import requests

# Define a TypedDict to represent the state
class State(TypedDict):
    url: str
    result: NotRequired[str]

def call_api(state: State):
    """Example node that makes an API request."""
    result = requests.get(state['url']).text[:100]  # Side-effect  #
    return {
        "result": result
    }

# Create a StateGraph builder and add a node for the call_api function
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# Connect the start and end nodes to the call_api node
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# Specify a checkpointer
checkpointer = InMemorySaver()

# Compile the graph with the checkpointer
graph = builder.compile(checkpointer=checkpointer)

# Define a config with a thread ID.
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# Invoke the graph
graph.invoke({"url": "https://www.example.com"}, config)

恢复工作流

一旦您在您的流程中启用了持久执行,您可以恢复以下场景的执行:
  • 暂停和恢复工作流程: 使用中断函数在特定点暂停工作流程,并使用Command原语以更新状态恢复它。有关更多详细信息,请参阅中断
  • 从失败中恢复: 在异常(例如,LLM 提供商中断)后,自动从最后一个成功的检查点恢复工作流程。这涉及到通过提供 None 作为输入值以相同的线程标识符执行工作流程(请参阅使用功能 API 的此 示例)。

工作流程恢复的起点

  • 如果您正在使用 StateGraph (Graph API),起始点将是执行停止的 节点 的开始处。
  • 如果您在节点内部进行子图调用,起始点将是调用已停止的子图的 父节点。 在子图内部,起始点将是执行停止的特定 节点
  • 如果您正在使用功能API,起始点将是执行停止的 入口点 的开始处。