Skip to main content
LangGraph内置了一个持久化层,通过检查点器实现。当你使用检查点器编译一个图时,检查点器会在每个超级步骤保存图的 checkpoint 状态。这些检查点被保存在一个 thread 中,图执行后可以访问。因为 threads 允许在执行后访问图的状态,所以包括人机交互、记忆、时间旅行和容错在内的几个强大功能都是可能的。下面,我们将更详细地讨论这些概念。 Checkpoints
LangGraph API 自动处理检查点 当使用 LangGraph API 时,您无需手动实现或配置检查点器。API 在幕后为您处理所有持久化基础设施。

线程

线程是检查点器为每个保存的检查点分配的唯一ID或线程标识符。它包含一系列运行的累积状态。当运行执行时,助手底层图的状态将被持久化到线程中。 在调用带有检查点的图时,您必须在配置的 configurable 部分指定一个 thread_id
{"configurable": {"thread_id": "1"}}
线程的当前和历史状态可以检索。为了持久化状态,必须在执行运行之前创建线程。LangSmith API提供了创建和管理线程及其状态的多个端点。有关详细信息,请参阅API参考

检查点

线程在特定时间点的状态被称为检查点。检查点是每个超级步骤保存的图状态的快照,并由以下关键属性表示的 StateSnapshot 对象表示:
  • config:与此检查点相关的配置。
  • metadata:与此检查点相关的元数据。
  • values:在此时间点状态通道的值。
  • next:执行图中的下一个节点名称的元组。
  • tasks:包含有关要执行的下个任务的信息的PregelTask对象的元组。如果步骤之前已尝试执行,它将包含错误信息。如果图在节点内部被中断 动态,任务将包含与中断相关的附加数据。
检查点被持久化,并且可以在稍后时间用于恢复线程的状态。 让我们看看当以如下方式调用一个简单的图时,保存了哪些检查点:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.runnables import RunnableConfig
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: str
    bar: Annotated[list[str], add]

def node_a(state: State):
    return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
    return {"foo": "b", "bar": ["b"]}


workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config: RunnableConfig = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)
在运行完图之后,我们期望看到恰好4个检查点:
  • 空检查点,以 START 作为下一个要执行的节点
  • 检查点,用户输入 {'foo': '', 'bar': []}node_a 作为下一个要执行的节点
  • 检查点,以 node_a {'foo': 'a', 'bar': ['a']}node_b 的输出作为下一个要执行的节点
  • 检查点,以 node_b {'foo': 'b', 'bar': ['a', 'b']} 的输出作为下一个要执行的节点,没有后续节点要执行
请注意,我们 bar 通道值包含来自两个节点的输出,因为我们有一个用于 bar 通道的reducer。

获取状态

当与保存的图状态交互时,您必须指定一个线程标识符。您可以通过调用 graph.get_state(config) 来查看图的_最新_状态。这将返回一个 StateSnapshot 对象,该对象对应于配置中提供的线程ID关联的最新检查点,或者如果提供了检查点ID,则对应于线程的检查点。
# get the latest state snapshot
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)

# get a state snapshot for a specific checkpoint_id
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)
在我们的示例中,get_state 的输出将如下所示:
StateSnapshot(
    values={'foo': 'b', 'bar': ['a', 'b']},
    next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
    metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
    created_at='2024-08-29T19:19:38.821749+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)

获取状态历史

您可以通过调用 graph.get_state_history(config) 获取给定线程的图执行完整历史记录。这将返回与配置中提供的线程 ID 相关的 StateSnapshot 对象列表。重要的是,检查点将按时间顺序排列,最新的检查点 / StateSnapshot 将列表中的第一个。
config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))
在我们的示例中,get_state_history 的输出将如下所示:
[
    StateSnapshot(
        values={'foo': 'b', 'bar': ['a', 'b']},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
        metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
        created_at='2024-08-29T19:19:38.821749+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        tasks=(),
    ),
    StateSnapshot(
        values={'foo': 'a', 'bar': ['a']},
        next=('node_b',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
        created_at='2024-08-29T19:19:38.819946+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'foo': '', 'bar': []},
        next=('node_a',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        metadata={'source': 'loop', 'writes': None, 'step': 0},
        created_at='2024-08-29T19:19:38.817813+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'bar': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
        created_at='2024-08-29T19:19:38.816205+00:00',
        parent_config=None,
        tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
    )
]
State

回放

它还可能播放回之前的图执行。如果我们 invoke 一个包含 thread_idcheckpoint_id 的图,那么我们将 重放 在对应于 checkpoint_id 的检查点之前的先前执行的步骤,并且只执行检查点之后的步骤。
  • thread_id 是线程的 ID。
  • checkpoint_id 是一个标识符,它指向线程中的特定检查点。
在调用图时,必须将这些参数传递作为配置的 configurable 部分的一部分:
config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
graph.invoke(None, config=config)
重要的是,LangGraph知道特定的步骤是否之前已经执行过。如果是的话,LangGraph只需在图中 重放 那个特定的步骤,而不重新执行该步骤,但仅限于在提供的 checkpoint_id 之前的步骤。所有在 checkpoint_id 之后的步骤都将被执行(即,一个新的分支),即使它们之前已经执行过。请参阅这个时间旅行指南,了解有关重放更多内容 Replay

更新状态

除了重新播放特定 checkpoints 中的图之外,我们还可以 编辑 图的状态。我们使用 graph.update_state() 来完成这项操作。此方法接受三个不同的参数:

config

配置应包含 thread_id 指定要更新的线程。当仅传递 thread_id 时,我们更新(或分叉)当前状态。可选地,如果我们包含 checkpoint_id 字段,那么我们将分叉所选的检查点。

values

这些值将用于更新状态。请注意,此更新与从节点接收的任何更新处理方式完全相同。这意味着如果图状态中的一些通道定义了reducer函数,这些值将被传递给这些函数。这意味着 update_state 不会自动覆盖每个通道的通道值,而只会覆盖没有reducers的通道。让我们通过一个例子来了解一下。 让我们假设您已经使用以下架构定义了您的图状态(参见上面的完整示例):
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]
让我们现在假设图当前的状
{"foo": 1, "bar": ["a"]}
如果您更新状态如下:
graph.update_state(config, {"foo": 2, "bar": ["b"]})
然后,图的新的状态将是:
{"foo": 2, "bar": ["a", "b"]}
foo 键(通道)被完全更改(因为未指定该通道的reducer,所以 update_state 会覆盖它)。然而,已指定 bar 键的reducer,因此它将 "b" 追加到 bar 的状态中。

as_node

在调用 update_state 时,您可以可选地指定最后一个 as_node。如果您提供了它,更新将像来自节点 as_node 一样应用。如果没有提供 as_node,它将设置为最后更新状态的节点,如果不存在歧义。这之所以重要,是因为接下来要执行的步骤取决于最后提供更新的节点,因此这可以用来控制哪个节点接下来执行。有关分叉状态的更多了解,请参阅这个 时间旅行指南 Update

记忆存储

共享状态模型 一个 状态模式 指定了一组在图执行过程中填充的键。如上所述,状态可以被检查点写入每个图步骤的线程中,从而实现状态持久化。 但是,如果我们想保留一些信息 跨线程 呢?考虑这样一个聊天机器人的情况,我们希望在整个与该用户的 所有 聊天对话(例如,线程)中保留关于该用户的具体信息! 仅使用检查点器,我们无法在线程之间共享信息。这促使我们需要Store接口。作为一个例子,我们可以定义一个InMemoryStore来存储跨线程的用户信息。我们只需像以前一样使用检查点器编译我们的图,并使用我们新的in_memory_store变量。
LangGraph API 自动处理存储 当使用 LangGraph API 时,您无需手动实现或配置存储。API 在幕后为您处理所有存储基础设施。

基本用法

首先,让我们在不使用LangGraph的情况下单独展示这个功能。
from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()
记忆通过一个 tuple 进行命名空间划分,在这个特定示例中将是 (<user_id>, "memories")。命名空间可以是任何长度,可以代表任何内容,不一定是用户特定的。
user_id = "1"
namespace_for_memory = (user_id, "memories")
我们使用 store.put 方法将记忆保存到存储中的命名空间。当我们这样做时,我们会指定命名空间,如上所述,以及一个记忆的键值对:键是记忆的简单唯一标识符 (memory_id),值(一个字典)则是记忆本身。
memory_id = str(uuid.uuid4())
memory = {"food_preference" : "I like pizza"}
in_memory_store.put(namespace_for_memory, memory_id, memory)
我们可以使用 store.search 方法在我们的命名空间中读取记忆,该方法将返回给定用户的全部记忆作为一个列表。最新的记忆位于列表的末尾。
memories = in_memory_store.search(namespace_for_memory)
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}
每种内存类型都是一个Python类([Item](https://langchain-ai.github.io/langgraph/reference/store/#langgraph.store.base.Item)),具有某些属性。我们可以通过上述的`.dict`将其转换为字典来访问。 该智能体具有以下属性:
  • value:此内存的值(本身也是一个字典)
  • key:在此命名空间中此内存的唯一键
  • namespace:字符串列表,此内存类型的命名空间
  • created_at:此内存创建时的戳记
  • updated_at:此内存更新时的戳记

语义搜索

除了简单的检索之外,该存储还支持语义搜索,允许您根据意义而非精确匹配来查找记忆。要启用此功能,请使用嵌入模型配置存储:
from langchain.embeddings import init_embeddings

store = InMemoryStore(
    index={
        "embed": init_embeddings("openai:text-embedding-3-small"),  # Embedding provider
        "dims": 1536,                              # Embedding dimensions
        "fields": ["food_preference", "$"]              # Fields to embed
    }
)
现在在搜索时,您可以使用自然语言查询来查找相关记忆:
# Find memories about food preferences
# (This can be done after putting memories into the store)
memories = store.search(
    namespace_for_memory,
    query="What does the user like to eat?",
    limit=3  # Return top 3 matches
)
您可以通过配置 fields 参数或在存储记忆时指定 index 参数来控制哪些记忆部分被嵌入:
# Store with specific fields to embed
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {
        "food_preference": "I love Italian cuisine",
        "context": "Discussing dinner plans"
    },
    index=["food_preference"]  # Only embed "food_preferences" field
)

# Store without embedding (still retrievable, but not searchable)
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {"system_info": "Last updated: 2024-01-01"},
    index=False
)

在 LangGraph 中使用

有了这一切,我们在LangGraph中使用in_memory_storein_memory_store与检查点器协同工作:检查点器将状态保存到线程中,如上所述,而in_memory_store允许我们在线程之间存储任意信息。我们使用检查点器和in_memory_store编译图,如下所示。
from langgraph.checkpoint.memory import InMemorySaver

# We need this because we want to enable threads (conversations)
checkpointer = InMemorySaver()

# ... Define the graph ...

# Compile the graph with the checkpointer and store
graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)
我们使用 thread_id 调用图,与之前一样,还使用 user_id,我们将用它来命名空间我们的记忆,以便为特定用户存储,正如上面所展示的。
# Invoke the graph
user_id = "1"
config = {"configurable": {"thread_id": "1", "user_id": user_id}}

# First let's just say hi to the AI
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi"}]}, config, stream_mode="updates"
):
    print(update)
我们可以通过传递 store: BaseStoreconfig: RunnableConfig 作为节点参数,在 任何节点 中访问 in_memory_storeuser_id。以下是我们在节点中使用语义搜索以查找相关记忆的方法:
def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):

    # Get the user id from the config
    user_id = config["configurable"]["user_id"]

    # Namespace the memory
    namespace = (user_id, "memories")

    # ... Analyze conversation and create a new memory

    # Create a new memory ID
    memory_id = str(uuid.uuid4())

    # We create a new memory
    store.put(namespace, memory_id, {"memory": memory})

如上所示,我们也可以在任何节点中访问存储,并使用 store.search 方法来获取记忆。回忆一下,记忆是以对象列表的形式返回的,这些对象可以转换为字典。
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}
我们可以访问记忆并使用它们在我们的模型调用中。
def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    # Get the user id from the config
    user_id = config["configurable"]["user_id"]

    # Namespace the memory
    namespace = (user_id, "memories")

    # Search based on the most recent message
    memories = store.search(
        namespace,
        query=state["messages"][-1].content,
        limit=3
    )
    info = "\n".join([d.value["memory"] for d in memories])

    # ... Use memories in the model call
如果我们创建一个新的线程,只要 user_id 相同,我们仍然可以访问相同的记忆。
# Invoke the graph
config = {"configurable": {"thread_id": "2", "user_id": "1"}}

# Let's say hi again
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi, tell me about my memories"}]}, config, stream_mode="updates"
):
    print(update)
当我们在本地使用LangSmith(例如,在Studio中)或托管在LangSmith上时,默认情况下可以使用基础存储,无需在图编译期间指定。然而,要启用语义搜索,您需要在您的langgraph.json文件中配置索引设置。例如:
{
    ...
    "store": {
        "index": {
            "embed": "openai:text-embeddings-3-small",
            "dims": 1536,
            "fields": ["$"]
        }
    }
}
查看部署指南以获取更多详细信息及配置选项。

检查点库

在底层,检查点功能由符合BaseCheckpointSaver接口的检查点对象提供支持。LangGraph提供了多个检查点实现,所有实现均通过独立的、可安装的库完成:

检查点接口

每个检查点保存器都遵循BaseCheckpointSaver接口,并实现了以下方法:
  • .put - 存储带有其配置和元数据的检查点。
  • .put_writes - 存储与检查点相关联的中间写入操作(即待写入操作)。
  • .get_tuple - 使用给定的配置(thread_idcheckpoint_id)获取检查点元组。这用于在 graph.get_state() 中填充 StateSnapshot
  • .list - 列出与给定配置和筛选标准匹配的检查点。这用于在 graph.get_state_history() 中填充状态历史记录。
如果检查点器与异步图执行一起使用(即通过 .ainvoke.astream.abatch 执行图),将使用上述方法的异步版本(.aput.aput_writes.aget_tuple.alist)。
为了异步运行您的图,您可以使用 InMemorySaver,或者 Sqlite/Postgres 检查点的异步版本 — AsyncSqliteSaver / AsyncPostgresSaver 检查点。
序列化器 当检查点保存图状态时,它们需要将状态中的通道值进行序列化。这是通过使用序列化对象来完成的。 langgraph_checkpoint 定义了 协议,用于实现序列化器,提供了一个默认实现 (JsonPlusSerializer),该实现可以处理多种类型,包括 LangChain 和 LangGraph 原语、日期时间、枚举等。

使用 pickle 进行序列化

默认序列化器 JsonPlusSerializer 在底层使用 ormsgpack 和 JSON,这并不适合所有类型的对象。 如果您想回退到pickle来处理我们msgpack编码器目前不支持的对象(例如Pandas数据框),您可以使用JsonPlusSerializerpickle_fallback参数:
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer

# ... Define the graph ...
graph.compile(
    checkpointer=InMemorySaver(serde=JsonPlusSerializer(pickle_fallback=True))
)

加密

检查点器可以选择加密所有持久化状态。要启用此功能,请将 EncryptedSerializer 的实例传递给任何 BaseCheckpointSaver 实现的 serde 参数。创建加密序列化器的最简单方法是使用 from_pycryptodome_aes,它从 LANGGRAPH_AES_KEY 环境变量(或接受一个 key 参数)中读取 AES 密钥:
import sqlite3

from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.sqlite import SqliteSaver

serde = EncryptedSerializer.from_pycryptodome_aes()  # reads LANGGRAPH_AES_KEY
checkpointer = SqliteSaver(sqlite3.connect("checkpoint.db"), serde=serde)
from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.postgres import PostgresSaver

serde = EncryptedSerializer.from_pycryptodome_aes()
checkpointer = PostgresSaver.from_conn_string("postgresql://...", serde=serde)
checkpointer.setup()
当在LangSmith上运行时,只要存在 LANGGRAPH_AES_KEY,加密就会自动启用,因此您只需提供环境变量。可以通过实现 CipherProtocol 并将其提供给 EncryptedSerializer 来使用其他加密方案。

功能

人工增强循环

首先,检查点器通过允许人类检查、中断和批准图步骤,促进了人机交互工作流程。这些工作流程需要检查点器,因为人类必须能够在任何时候查看图的状态,并且图必须在人类对状态进行任何更新后能够恢复执行。请参阅操作指南以获取示例。

记忆

其次,检查点器允许在交互之间进行”记忆”。在重复的人机交互(如对话)的情况下,任何后续消息都可以发送到该线程,该线程将保留之前交互的记忆。有关如何使用检查点器添加和管理对话记忆的信息,请参阅添加记忆

时间旅行

第三,检查点器允许进行”时间旅行”,使用户能够回放之前的图执行以审查和/或调试特定的图步骤。此外,检查点器使得在任意检查点分叉图状态成为可能,以探索替代轨迹。

容错性

最后,检查点机制还提供了容错和错误恢复功能:如果在某个超级步骤中一个或多个节点失败,您可以从最后一个成功的步骤重新启动您的图。此外,当图节点在给定超级步骤的执行过程中失败时,LangGraph会存储来自在该超级步骤中成功完成的任何其他节点的挂起检查点写入,这样当我们从该超级步骤恢复图执行时,我们不会重新运行成功的节点。

待写入

此外,当图节点在给定的超级步骤中执行过程中失败时,LangGraph会存储在该超级步骤中成功完成的任何其他节点的挂起检查点写入,这样,无论何时我们从该超级步骤恢复图执行,我们都不会重新运行成功的节点。