---
title: 中断机制
---
中断允许你在特定点暂停图执行,并在继续之前等待外部输入。这实现了需要外部输入才能继续的“人在回路”模式。当触发中断时,LangGraph 会使用其[持久化](/oss/python/langgraph/persistence)层保存图状态,并无限期等待,直到你恢复执行。
中断通过在图的节点中的任意位置调用 `interrupt()` 函数来工作。该函数接受任何可 JSON 序列化的值,该值会暴露给调用者。当你准备好继续时,通过使用 `Command` 重新调用图来恢复执行,该值随后成为节点内部 `interrupt()` 调用的返回值。
与静态断点(在特定节点之前或之后暂停)不同,中断是**动态的**——它们可以放在代码中的任何位置,并且可以根据你的应用逻辑有条件地触发。
- **检查点保持你的位置:** 检查点写入确切的图状态,因此你可以在以后恢复,即使处于错误状态。
- **`thread_id` 是你的指针:** 设置 `config={"configurable": {"thread_id": ...}}` 来告诉检查点器加载哪个状态。
- **中断负载以 `__interrupt__` 形式暴露:** 你传递给 `interrupt()` 的值会在 `__interrupt__` 字段中返回给调用者,以便你知道图在等待什么。
你选择的 `thread_id` 实际上就是你的持久化游标。重用它会恢复同一个检查点;使用新值则会以空状态启动一个全新的线程。
## 使用 `interrupt` 暂停
`interrupt` 函数暂停图执行并向调用者返回一个值。当你在节点内调用 `interrupt` 时,LangGraph 会保存当前图状态并等待你提供输入以恢复执行。
要使用 `interrupt`,你需要:
1. 一个**检查点器**来持久化图状态(在生产环境中使用持久的检查点器)
2. 配置中的一个**线程 ID**,以便运行时知道从哪个状态恢复
3. 在你想要暂停的地方调用 `interrupt()`(负载必须是可 JSON 序列化的)
```python
from langgraph.types import interrupt
def approval_node(state: State):
# 暂停并请求批准
approved = interrupt("你批准这个操作吗?")
# 当你恢复时,Command(resume=...) 的值会在这里返回
return {"approved": approved}
当你调用 interrupt 时,会发生以下情况:
- 图执行在调用
interrupt 的确切点被挂起
- 状态被保存,使用检查点器,以便以后可以恢复执行。在生产环境中,这应该是一个持久化的检查点器(例如,由数据库支持)
- 值在
__interrupt__ 下返回给调用者;它可以是任何可 JSON 序列化的值(字符串、对象、数组等)
- 图无限期等待,直到你带着响应恢复执行
- 当你恢复时,响应被传递回节点,成为
interrupt() 调用的返回值
恢复中断
中断暂停执行后,你通过再次调用图并提供一个包含恢复值的 Command 来恢复图。恢复值被传递回 interrupt 调用,允许节点使用外部输入继续执行。
from langgraph.types import Command
# 初始运行 - 遇到中断并暂停
# thread_id 是持久化指针(在生产环境中存储稳定的 ID)
config = {"configurable": {"thread_id": "thread-1"}}
result = graph.invoke({"input": "data"}, config=config)
# 检查中断了什么
# __interrupt__ 包含传递给 interrupt() 的负载
print(result["__interrupt__"])
# > [Interrupt(value='你批准这个操作吗?')]
# 带着人工响应恢复
# resume 负载成为节点内部 interrupt() 的返回值
graph.invoke(Command(resume=True), config=config)
关于恢复的关键点:
- 恢复时必须使用与中断发生时相同的线程 ID
- 传递给
Command(resume=...) 的值成为 interrupt 调用的返回值
- 恢复时,节点从调用
interrupt 的节点的开头重新启动,因此 interrupt 之前的任何代码都会再次运行
- 你可以传递任何可 JSON 序列化的值作为恢复值
常见模式
中断解锁的关键能力是能够暂停执行并等待外部输入。这对于各种用例非常有用,包括:
- 审批工作流:在执行关键操作(API 调用、数据库更改、金融交易)之前暂停
- 审查和编辑状态:让人工在继续之前审查和修改 LLM 输出或工具调用
- 中断工具调用:在执行工具调用之前暂停,以便在执行前审查和编辑工具调用
- 验证人工输入:在继续下一步之前暂停以验证人工输入
批准或拒绝
中断最常见的用途之一是在关键操作之前暂停并请求批准。例如,你可能希望请求人工批准一个 API 调用、数据库更改或任何其他重要决策。
from typing import Literal
from langgraph.types import interrupt, Command
def approval_node(state: State) -> Command[Literal["proceed", "cancel"]]:
# 暂停执行;负载出现在 result["__interrupt__"] 下
is_approved = interrupt({
"question": "你想要继续这个操作吗?",
"details": state["action_details"]
})
# 根据响应路由
if is_approved:
return Command(goto="proceed") # 在提供恢复负载后运行
else:
return Command(goto="cancel")
当你恢复图时,传递 true 来批准或 false 来拒绝:
# 批准
graph.invoke(Command(resume=True), config=config)
# 拒绝
graph.invoke(Command(resume=False), config=config)
import sqlite3
from typing import Literal, Optional, TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt
class ApprovalState(TypedDict):
action_details: str
status: Optional[Literal["pending", "approved", "rejected"]]
def approval_node(state: ApprovalState) -> Command[Literal["proceed", "cancel"]]:
# 暴露详细信息,以便调用者可以在 UI 中呈现它们
decision = interrupt({
"question": "批准这个操作吗?",
"details": state["action_details"],
})
# 恢复后路由到适当的节点
return Command(goto="proceed" if decision else "cancel")
def proceed_node(state: ApprovalState):
return {"status": "approved"}
def cancel_node(state: ApprovalState):
return {"status": "rejected"}
builder = StateGraph(ApprovalState)
builder.add_node("approval", approval_node)
builder.add_node("proceed", proceed_node)
builder.add_node("cancel", cancel_node)
builder.add_edge(START, "approval")
builder.add_edge("approval", "proceed")
builder.add_edge("approval", "cancel")
builder.add_edge("proceed", END)
builder.add_edge("cancel", END)
# 在生产环境中使用更持久的检查点器
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "approval-123"}}
initial = graph.invoke(
{"action_details": "转账 $500", "status": "pending"},
config=config,
)
print(initial["__interrupt__"]) # -> [Interrupt(value={'question': ..., 'details': ...})]
# 带着决策恢复;True 路由到 proceed,False 路由到 cancel
resumed = graph.invoke(Command(resume=True), config=config)
print(resumed["status"]) # -> "approved"
审查和编辑状态
有时你希望让人工在继续之前审查并编辑部分图状态。这对于纠正 LLM、添加缺失信息或进行调整非常有用。
from langgraph.types import interrupt
def review_node(state: State):
# 暂停并显示当前内容以供审查(出现在 result["__interrupt__"] 中)
edited_content = interrupt({
"instruction": "审查并编辑此内容",
"content": state["generated_text"]
})
# 使用编辑后的版本更新状态
return {"generated_text": edited_content}
恢复时,提供编辑后的内容:
graph.invoke(
Command(resume="编辑和改进后的文本"), # 该值成为 interrupt() 的返回值
config=config
)
import sqlite3
from typing import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt
class ReviewState(TypedDict):
generated_text: str
def review_node(state: ReviewState):
# 请求审查者编辑生成的内容
updated = interrupt({
"instruction": "审查并编辑此内容",
"content": state["generated_text"],
})
return {"generated_text": updated}
builder = StateGraph(ReviewState)
builder.add_node("review", review_node)
builder.add_edge(START, "review")
builder.add_edge("review", END)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "review-42"}}
initial = graph.invoke({"generated_text": "初始草稿"}, config=config)
print(initial["__interrupt__"]) # -> [Interrupt(value={'instruction': ..., 'content': ...})]
# 带着审查者编辑后的文本恢复
final_state = graph.invoke(
Command(resume="审查后改进的草稿"),
config=config,
)
print(final_state["generated_text"]) # -> "审查后改进的草稿"
工具中的中断
你也可以直接将中断放在工具函数内部。这使得工具本身在每次被调用时都会暂停等待批准,并允许在执行前人工审查和编辑工具调用。
首先,定义一个使用 interrupt 的工具:
from langchain.tools import tool
from langgraph.types import interrupt
@tool
def send_email(to: str, subject: str, body: str):
"""发送邮件给收件人。"""
# 在发送前暂停;负载出现在 result["__interrupt__"] 下
response = interrupt({
"action": "send_email",
"to": to,
"subject": subject,
"body": body,
"message": "批准发送这封邮件吗?"
})
if response.get("action") == "approve":
# 恢复值可以在执行前覆盖输入
final_to = response.get("to", to)
final_subject = response.get("subject", subject)
final_body = response.get("body", body)
return f"邮件已发送给 {final_to},主题为 '{final_subject}'"
return "邮件已被用户取消"
当你希望批准逻辑与工具本身共存,使其可在图的不同部分重用时,这种方法很有用。LLM 可以自然地调用该工具,而中断会在工具被调用时暂停执行,允许你批准、编辑或取消操作。
import sqlite3
from typing import TypedDict
from langchain.tools import tool
from langchain_anthropic import ChatAnthropic
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt
class AgentState(TypedDict):
messages: list[dict]
@tool
def send_email(to: str, subject: str, body: str):
"""发送邮件给收件人。"""
# 在发送前暂停;负载出现在 result["__interrupt__"] 下
response = interrupt({
"action": "send_email",
"to": to,
"subject": subject,
"body": body,
"message": "批准发送这封邮件吗?",
})
if response.get("action") == "approve":
final_to = response.get("to", to)
final_subject = response.get("subject", subject)
final_body = response.get("body", body)
# 实际发送邮件(你的实现在这里)
print(f"[send_email] to={final_to} subject={final_subject} body={final_body}")
return f"邮件已发送给 {final_to}"
return "邮件已被用户取消"
model = ChatAnthropic(model="claude-sonnet-4-5").bind_tools([send_email])
def agent_node(state: AgentState):
# LLM 可能决定调用工具;中断在发送前暂停
result = model.invoke(state["messages"])
return {"messages": state["messages"] + [result]}
builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.add_edge(START, "agent")
builder.add_edge("agent", END)
checkpointer = SqliteSaver(sqlite3.connect("tool-approval.db"))
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "email-workflow"}}
initial = graph.invoke(
{
"messages": [
{"role": "user", "content": "发送邮件给 alice@example.com 讨论会议事宜"}
]
},
config=config,
)
print(initial["__interrupt__"]) # -> [Interrupt(value={'action': 'send_email', ...})]
# 带着批准和可选编辑的参数恢复
resumed = graph.invoke(
Command(resume={"action": "approve", "subject": "更新后的主题"}),
config=config,
)
print(resumed["messages"][-1]) # -> send_email 返回的工具结果
验证人工输入
有时你需要验证来自人工的输入,并在无效时再次询问。你可以使用循环中的多个 interrupt 调用来实现这一点。
from langgraph.types import interrupt
def get_age_node(state: State):
prompt = "你的年龄是多少?"
while True:
answer = interrupt(prompt) # 负载出现在 result["__interrupt__"] 中
# 验证输入
if isinstance(answer, int) and answer > 0:
# 有效输入 - 继续
break
else:
# 无效输入 - 用更具体的提示再次询问
prompt = f"'{answer}' 不是有效的年龄。请输入一个正数。"
return {"age": answer}
每次你带着无效输入恢复图时,它都会用更清晰的消息再次询问。一旦提供了有效输入,节点就完成并且图继续。
import sqlite3
from typing import TypedDict
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt
class FormState(TypedDict):
age: int | None
def get_age_node(state: FormState):
prompt = "你的年龄是多少?"
while True:
answer = interrupt(prompt) # 负载出现在 result["__interrupt__"] 中
if isinstance(answer, int) and answer > 0:
return {"age": answer}
prompt = f"'{answer}' 不是有效的年龄。请输入一个正数。"
builder = StateGraph(FormState)
builder.add_node("collect_age", get_age_node)
builder.add_edge(START, "collect_age")
builder.add_edge("collect_age", END)
checkpointer = SqliteSaver(sqlite3.connect("forms.db"))
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config=config)
print(first["__interrupt__"]) # -> [Interrupt(value='你的年龄是多少?', ...)]
# 提供无效数据;节点重新提示
retry = graph.invoke(Command(resume="三十"), config=config)
print(retry["__interrupt__"]) # -> [Interrupt(value="'三十' 不是有效的年龄...", ...)]
# 提供有效数据;循环退出并更新状态
final = graph.invoke(Command(resume=30), config=config)
print(final["age"]) # -> 30
中断规则
当你在节点内调用 interrupt 时,LangGraph 通过引发一个信号给运行时暂停的异常来挂起执行。这个异常通过调用栈向上传播,并被运行时捕获,该运行时通知图保存当前状态并等待外部输入。
当执行恢复时(在你提供请求的输入之后),运行时从节点的开头重新启动整个节点——它不会从调用 interrupt 的确切行恢复。这意味着在 interrupt 之前运行的任何代码都会再次执行。因此,在使用中断时需要遵循一些重要规则,以确保它们按预期运行。
不要将 interrupt 调用包装在 try/except 中
interrupt 通过在调用点抛出特殊异常来暂停执行。如果你将 interrupt 调用包装在 try/except 块中,你将捕获这个异常,并且中断不会传递回图。
- ✅ 将
interrupt 调用与易出错代码分开
- ✅ 在 try/except 块中使用特定的异常类型
def node_a(state: State):
# ✅ 好:先中断,然后单独处理错误条件
interrupt("你叫什么名字?")
try:
fetch_data() # 这可能失败
except Exception as e:
print(e)
return state
- 🔴 不要将
interrupt 调用包装在裸 try/except 块中
def node_a(state: State):
# ❌ 坏:将 interrupt 包装在裸 try/except 中会捕获中断异常
try:
interrupt("你叫什么名字?")
except Exception as e:
print(e)
return state
不要在节点内重新排序 interrupt 调用
在单个节点中使用多个中断很常见,但如果不小心处理,可能会导致意外行为。
当一个节点包含多个中断调用时,LangGraph 会为执行节点的任务维护一个特定的恢复值列表。每当执行恢复时,它从节点的开头开始。对于遇到的每个中断,LangGraph 检查任务恢复列表中是否存在匹配的值。匹配是严格基于索引的,因此节点内中断调用的顺序很重要。
- ✅ 在节点执行之间保持
interrupt 调用一致
def node_a(state: State):
# ✅ 好:中断调用每次都以相同的顺序发生
name = interrupt("你叫什么名字?")
age = interrupt("你的年龄是多少?")
city = interrupt("你在哪个城市?")
return {
"name": name,
"age": age,
"city": city
}
- 🔴 不要有条件地跳过节点内的
interrupt 调用
- 🔴 不要使用在执行之间非确定性的逻辑来循环
interrupt 调用
def node_a(state: State):
# ❌ 坏:有条件地跳过中断会改变顺序
name = interrupt("你叫什么名字?")
# 第一次运行时,这可能会跳过中断
# 恢复时,它可能不会跳过 - 导致索引不匹配
if state.get("needs_age"):
age = interrupt("你的年龄是多少?")
city = interrupt("你在哪个城市?")
return {"name": name, "city": city}
不要在 interrupt 调用中返回复杂值
根据使用的检查点器不同,复杂值可能无法序列化(例如,你无法序列化一个函数)。为了使你的图能够适应任何部署,最佳实践是仅使用可以合理序列化的值。
- ✅ 传递简单的、可 JSON 序列化的类型给
interrupt
- ✅ 传递包含简单值的字典/对象
def node_a(state: State):
# ✅ 好:传递可序列化的简单类型
name = interrupt("你叫什么名字?")
count = interrupt(42)
approved = interrupt(True)
return {"name": name, "count": count, "approved": approved}
- 🔴 不要将函数、类实例或其他复杂对象传递给
interrupt
def validate_input(value):
return len(value) > 0
def node_a(state: State):
# ❌ 坏:传递函数给 interrupt
# 函数无法序列化
response = interrupt({
"question": "你叫什么名字?",
"validator": validate_input # 这会失败
})
return {"name": response}
在 interrupt 之前调用的副作用必须是幂等的
因为中断通过重新运行它们被调用的节点来工作,所以在 interrupt 之前调用的副作用应该(理想情况下)是幂等的。上下文中的幂等性意味着相同的操作可以应用多次,而不会在初始执行之外改变结果。
例如,你可能有一个在节点内部更新记录的 API 调用。如果在进行该调用之后调用 interrupt,则在节点恢复时将多次重新运行它,可能会覆盖初始更新或创建重复记录。
- ✅ 在
interrupt 之前使用幂等操作
- ✅ 将副作用放在
interrupt 调用之后
- ✅ 尽可能将副作用分离到单独的节点中
def node_a(state: State):
# ✅ 好:使用幂等的 upsert 操作
# 多次运行此操作将具有相同的结果
db.upsert_user(
user_id=state["user_id"],
status="pending_approval"
)
approved = interrupt("批准此更改吗?")
return {"approved": approved}
- 🔴 不要在
interrupt 之前执行非幂等操作
- 🔴 不要在不检查是否存在的情况下创建新记录
def node_a(state: State):
# ❌ 坏:在中断前创建新记录
# 这将在每次恢复时创建重复记录
audit_id = db.create_audit_log({
"user_id": state["user_id"],
"action": "pending_approval",
"timestamp": datetime.now()
})
approved = interrupt("批准此更改吗?")
return {"approved": approved, "audit_id": audit_id}
与作为函数调用的子图一起使用
当在节点内调用子图时,父图将从调用子图并触发 interrupt 的节点的开头恢复执行。同样,子图也将从调用 interrupt 的节点的开头恢复。
def node_in_parent_graph(state: State):
some_code() # <-- 恢复时将重新执行
# 作为函数调用子图。
# 子图包含一个 `interrupt` 调用。
subgraph_result = subgraph.invoke(some_input)
async function node_in_subgraph(state: State) {
someOtherCode(); # <-- 恢复时也将重新执行
result = interrupt("你叫什么名字?")
...
}
使用中断进行调试
为了调试和测试图,你可以使用静态中断作为断点,逐步执行图执行,一次一个节点。静态中断在定义的点触发,要么在节点执行之前,要么在之后。你可以通过在编译图时指定 interrupt_before 和 interrupt_after 来设置这些。
静态中断不推荐用于人在回路工作流。请改用 interrupt 方法。
graph = builder.compile(
interrupt_before=["node_a"],
interrupt_after=["node_b", "node_c"],
checkpointer=checkpointer,
)
# 传递线程 ID 给图
config = {
"configurable": {
"thread_id": "some_thread"
}
}
# 运行图直到断点
graph.invoke(inputs, config=config)
# 恢复图
graph.invoke(None, config=config)
- 断点在
compile 期间设置。
interrupt_before 指定在执行节点之前应暂停执行的节点。
interrupt_after 指定在执行节点之后应暂停执行的节点。
- 需要检查点器来启用断点。
- 运行图直到遇到第一个断点。
- 通过为输入传递
None 来恢复图。这将运行图直到遇到下一个断点。
config = {
"configurable": {
"thread_id": "some_thread"
}
}
# 运行图直到断点
graph.invoke(
inputs,
interrupt_before=["node_a"],
interrupt_after=["node_b", "node_c"],
config=config,
)
# 恢复图
graph.invoke(None, config=config)
- 调用
graph.invoke 时带有 interrupt_before 和 interrupt_after 参数。这是一个运行时配置,可以在每次调用时更改。
interrupt_before 指定在执行节点之前应暂停执行的节点。
interrupt_after 指定在执行节点之后应暂停执行的节点。
- 运行图直到遇到第一个断点。
- 通过为输入传递
None 来恢复图。这将运行图直到遇到下一个断点。
使用 LangGraph Studio
你可以使用 LangGraph Studio 在运行图之前在 UI 中设置静态中断。你还可以使用 UI 检查执行中任意点的图状态。