Skip to main content
LangGraph可以改变你对所构建智能体的看法。当你使用LangGraph构建智能体时,你首先将其分解成离散的步骤,称为节点。然后,你将为每个节点描述不同的决策和转换。最后,你将通过每个节点都可以读取和写入的共享状态将节点连接起来。在本教程中,我们将引导你通过LangGraph构建客户支持电子邮件智能体的思维过程。

从您想要自动化的流程开始

想象一下,你需要构建一个处理客户支持电子邮件的AI智能体。你的产品团队给出了以下要求: 智能体应:
  • 读取客户收件箱中的电子邮件
  • 根据紧急程度和主题进行分类
  • 搜索相关文档以回答问题
  • 起草适当的回复
  • 将复杂问题升级至人工智能体
  • 需要时安排后续跟进
示例场景处理:
  1. 简单的产品问题:“我如何重置我的密码?”
  2. 缺陷报告:“当我选择PDF格式时,导出功能崩溃了”
  3. 紧急账单问题:“我的订阅被重复收费了!”
  4. 功能请求:“你们能否为移动应用添加暗黑模式?”
  5. 复杂的技术问题:“我们的API集成偶尔会因504错误而失败”
在LangGraph中实现一个智能体,通常您会遵循相同的五个步骤。

步骤 1:将您的流程映射为离散步骤

首先,识别您流程中的不同步骤。每个步骤将变成一个节点(一个执行特定功能的函数)。然后绘制这些步骤如何相互连接的草图。 箭头显示了可能的路径,但实际选择哪条路径的决定发生在每个节点内部。 现在您已经识别了工作流程中的组件,让我们了解每个节点需要执行的操作:
  • 阅读邮件:提取并解析邮件内容
  • 分类意图:使用大型语言模型对紧急程度和主题进行分类,然后路由到相应的操作
  • 文档搜索:查询您的知识库以获取相关信息
  • 缺陷跟踪:在跟踪系统中创建或更新问题
  • 草稿回复:生成适当的回复
  • 人工审核:升级至人工智能体进行批准或处理
  • 发送回复:派发邮件响应
请注意,一些节点会决定下一步走向(分类意图、草拟回复、人工审核),而其他节点则始终进行到相同的下一步(阅读邮件总是走向分类意图,文档搜索总是走向草拟回复)。

步骤 2:确定每个步骤需要做什么

对于您图中的每个节点,确定它代表哪种操作以及它需要什么上下文才能正常工作。

LLM Steps

在需要理解、分析、生成文本或进行推理决策时使用

Data Steps

在需要从外部来源检索信息时使用

Action Steps

在需要执行外部操作时使用

User Input Steps

在需要人工干预时使用

LLM 步骤

当需要理解、分析、生成文本或进行推理决策的步骤时:
  • 静态上下文(提示):分类类别、紧急程度定义、响应格式
  • 动态上下文(来自状态):电子邮件内容、发件人信息
  • 预期结果:确定路由的结构化分类
  • 静态上下文(提示):语气指南、公司政策、响应模板
  • 动态上下文(来自状态):分类结果、搜索结果、客户历史
  • 预期结果:准备审查的专业电子邮件回复

数据步骤

当需要从外部源检索信息时:
  • 参数:由意图和主题构建的查询
  • 重试策略:是,对于短暂性故障使用指数退避
  • 缓存:可以缓存常见查询以减少API调用
  • 参数:来自状态的客户电子邮件或ID
  • 重试策略:是,但不可用时回退到基本信息
  • 缓存:是,带有生存时间以平衡新鲜度和性能

操作步骤

当步骤需要执行外部操作时:
  • 执行时机:在批准后(人工或自动化)
  • 重试策略:是,对于网络问题采用指数退避
  • 不应缓存:每次发送都是独特的操作
  • 执行时机:当意图为“bug”时始终执行
  • 重试策略:是,确保不丢失错误报告至关重要
  • 返回:包含在响应中的工单ID

用户输入步骤

当步骤需要人工干预时:
  • 决策背景:原始邮件、草稿回复、紧急程度、分类
  • 预期输入格式:审批布尔值加上可选的编辑后的回复
  • 触发条件:高度紧急、复杂问题或质量问题

步骤 3:设计你的状态

状态是您智能体中所有节点可访问的记忆。将其想象成您的智能体在处理过程中记录所学和所做决策的笔记本。

状态中应该包含什么?

请针对每份数据自问以下问题:

Include in State

是否需要在步骤间持久化?如果是,它将存储在状态中。

Don't Store

能否从其他数据中推导它?如果可以,则在需要时计算它,而不是将其存储在状态中。
为了我们的电子邮件智能体,我们需要跟踪:
  • 原始电子邮件和发件人信息(无法重建这些)
  • 分类结果(多个下游节点需要)
  • 搜索结果和客户数据(重新获取成本高昂)
  • 草稿回复(需要通过审查后持续存在)
  • 执行元数据(用于调试和恢复)

保持状态原始,按需格式化提示

一个关键原则:你的状态应该存储原始数据,而不是格式化文本。当你需要时,在节点内部格式化提示。
这个分离意味着:
  • 不同的节点可以根据自身需求以不同的方式格式化相同的数据
  • 您可以更改提示模板而不修改您的状态模式
  • 调试更清晰 - 您可以确切地看到每个节点接收到的数据
  • 您的智能体可以在不破坏现有状态的情况下进行进化
让我们定义我们的状态:
from typing import TypedDict, Literal

# Define the structure for email classification
class EmailClassification(TypedDict):
    intent: Literal["question", "bug", "billing", "feature", "complex"]
    urgency: Literal["low", "medium", "high", "critical"]
    topic: str
    summary: str

class EmailAgentState(TypedDict):
    # Raw email data
    email_content: str
    sender_email: str
    email_id: str

    # Classification result
    classification: EmailClassification | None

    # Raw search/API results
    search_results: list[str] | None  # List of raw document chunks
    customer_history: dict | None  # Raw customer data from CRM

    # Generated content
    draft_response: str | None
    messages: list[str] | None
请注意,状态中只包含原始数据 - 没有提示模板,没有格式化字符串,没有指令。分类输出以单个字典的形式存储,直接来自LLM。

步骤 4:构建您的节点

现在我们将每个步骤实现为一个函数。LangGraph中的一个节点只是一个Python函数,它接受当前状态并返回对其的更新。

适当地处理错误

不同的错误需要不同的处理策略:
错误类型谁来解决策略何时使用
临时错误(网络问题、速率限制)系统(自动)重试策略通常在重试后解决的临时故障
可由LLM恢复的错误(工具故障、解析问题)LLM在状态中存储错误并回环LLM可以看到错误并调整其方法
可由用户修复的错误(信息缺失、指示不明确)人类使用 interrupt() 暂停需要用户输入才能继续
意外错误开发者允许其冒泡需要调试的未知问题
添加重试策略以自动重试网络问题和速率限制:
from langgraph.types import RetryPolicy

workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0)
)

实现我们的电子邮件智能体节点

我们将每个节点实现为一个简单的函数。记住:节点接收状态、执行操作并返回更新。
from typing import Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command, RetryPolicy
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

llm = ChatOpenAI(model="gpt-4")

def read_email(state: EmailAgentState) -> dict:
    """Extract and parse email content"""
    # In production, this would connect to your email service
    return {
        "messages": [HumanMessage(content=f"Processing email: {state['email_content']}")]
    }

def classify_intent(state: EmailAgentState) -> Command[Literal["search_documentation", "human_review", "draft_response", "bug_tracking"]]:
    """Use LLM to classify email intent and urgency, then route accordingly"""

    # Create structured LLM that returns EmailClassification dict
    structured_llm = llm.with_structured_output(EmailClassification)

    # Format the prompt on-demand, not stored in state
    classification_prompt = f"""
    Analyze this customer email and classify it:

    Email: {state['email_content']}
    From: {state['sender_email']}

    Provide classification including intent, urgency, topic, and summary.
    """

    # Get structured response directly as dict
    classification = structured_llm.invoke(classification_prompt)

    # Determine next node based on classification
    if classification['intent'] == 'billing' or classification['urgency'] == 'critical':
        goto = "human_review"
    elif classification['intent'] in ['question', 'feature']:
        goto = "search_documentation"
    elif classification['intent'] == 'bug':
        goto = "bug_tracking"
    else:
        goto = "draft_response"

    # Store classification as a single dict in state
    return Command(
        update={"classification": classification},
        goto=goto
    )
def search_documentation(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """Search knowledge base for relevant information"""

    # Build search query from classification
    classification = state.get('classification', {})
    query = f"{classification.get('intent', '')} {classification.get('topic', '')}"

    try:
        # Implement your search logic here
        # Store raw search results, not formatted text
        search_results = [
            "Reset password via Settings > Security > Change Password",
            "Password must be at least 12 characters",
            "Include uppercase, lowercase, numbers, and symbols"
        ]
    except SearchAPIError as e:
        # For recoverable search errors, store error and continue
        search_results = [f"Search temporarily unavailable: {str(e)}"]

    return Command(
        update={"search_results": search_results},  # Store raw results or error
        goto="draft_response"
    )

def bug_tracking(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """Create or update bug tracking ticket"""

    # Create ticket in your bug tracking system
    ticket_id = "BUG-12345"  # Would be created via API

    return Command(
        update={
            "search_results": [f"Bug ticket {ticket_id} created"],
            "current_step": "bug_tracked"
        },
        goto="draft_response"
    )
def draft_response(state: EmailAgentState) -> Command[Literal["human_review", "send_reply"]]:
    """Generate response using context and route based on quality"""

    classification = state.get('classification', {})

    # Format context from raw state data on-demand
    context_sections = []

    if state.get('search_results'):
        # Format search results for the prompt
        formatted_docs = "\n".join([f"- {doc}" for doc in state['search_results']])
        context_sections.append(f"Relevant documentation:\n{formatted_docs}")

    if state.get('customer_history'):
        # Format customer data for the prompt
        context_sections.append(f"Customer tier: {state['customer_history'].get('tier', 'standard')}")

    # Build the prompt with formatted context
    draft_prompt = f"""
    Draft a response to this customer email:
    {state['email_content']}

    Email intent: {classification.get('intent', 'unknown')}
    Urgency level: {classification.get('urgency', 'medium')}

    {chr(10).join(context_sections)}

    Guidelines:
    - Be professional and helpful
    - Address their specific concern
    - Use the provided documentation when relevant
    """

    response = llm.invoke(draft_prompt)

    # Determine if human review needed based on urgency and intent
    needs_review = (
        classification.get('urgency') in ['high', 'critical'] or
        classification.get('intent') == 'complex'
    )

    # Route to appropriate next node
    goto = "human_review" if needs_review else "send_reply"

    return Command(
        update={"draft_response": response.content},  # Store only the raw response
        goto=goto
    )

def human_review(state: EmailAgentState) -> Command[Literal["send_reply", END]]:
    """Pause for human review using interrupt and route based on decision"""

    classification = state.get('classification', {})
    
    # interrupt() must come first - any code before it will re-run on resume
    human_decision = interrupt({
        "email_id": state.get('email_id',''),
        "original_email": state.get('email_content',''),
        "draft_response": state.get('draft_response',''),
        "urgency": classification.get('urgency'),
        "intent": classification.get('intent'),
        "action": "Please review and approve/edit this response"
    })

    # Now process the human's decision
    if human_decision.get("approved"):
        return Command(
            update={"draft_response": human_decision.get("edited_response", state.get('draft_response',''))},
            goto="send_reply"
        )
    else:
        # Rejection means human will handle directly
        return Command(update={}, goto=END)

def send_reply(state: EmailAgentState) -> dict:
    """Send the email response"""
    # Integrate with email service
    print(f"Sending reply: {state['draft_response'][:100]}...")
    return {}

步骤 5:连接起来

现在我们将我们的节点连接成一个工作图。由于我们的节点处理自己的路由决策,我们只需要少量基本边即可。 要启用 interrupt() 中的 人机交互,我们需要使用 检查点器 编译以在运行之间保存状态:

Graph compilation code

from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import RetryPolicy

# Create the graph
workflow = StateGraph(EmailAgentState)

# Add nodes with appropriate error handling
workflow.add_node("read_email", read_email)
workflow.add_node("classify_intent", classify_intent)

# Add retry policy for nodes that might have transient failures
workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3)
)
workflow.add_node("bug_tracking", bug_tracking)
workflow.add_node("draft_response", draft_response)
workflow.add_node("human_review", human_review)
workflow.add_node("send_reply", send_reply)

# Add only the essential edges
workflow.add_edge(START, "read_email")
workflow.add_edge("read_email", "classify_intent")
workflow.add_edge("send_reply", END)

# Compile with checkpointer for persistence, in case run graph with Local_Server --> Please compile without checkpointer
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
图结构是最小的,因为路由是通过节点内部的 Command 对象进行的。每个节点通过像 Command[Literal["node1", "node2"]] 这样的类型提示来声明它可以前往的位置,使得流程明确且可追踪。

尝试使用您的智能体

让我们运行我们的智能体,处理一个需要人工审核的紧急账单问题:
# Test with an urgent billing issue
initial_state = {
    "email_content": "I was charged twice for my subscription! This is urgent!",
    "sender_email": "customer@example.com",
    "email_id": "email_123",
    "messages": []
}

# Run with a thread_id for persistence
config = {"configurable": {"thread_id": "customer_123"}}
result = app.invoke(initial_state, config)
# The graph will pause at human_review
print(f"Draft ready for review: {result['draft_response'][:100]}...")

# When ready, provide human input to resume
from langgraph.types import Command

human_response = Command(
    resume={
        "approved": True,
        "edited_response": "We sincerely apologize for the double charge. I've initiated an immediate refund..."
    }
)

# Resume execution
final_result = app.invoke(human_response, config)
print(f"Email sent successfully!")
图在遇到 interrupt() 时暂停,将所有内容保存到检查点,并等待。它可以在几天后恢复,从上次停止的地方继续。thread_id 确保这次对话的所有状态都一起保存。

摘要及下一步行动

关键见解

构建这个电子邮件智能体让我们领略了LangGraph的思维方式:

Break into discrete steps

每个节点都擅长一项任务。这种分解使得可以流式传输进度更新,实现可暂停和恢复的持久执行,以及清晰的调试,因为您可以在步骤之间检查状态。

State is shared memory

存储原始数据,而非格式化文本。这使得不同的节点可以以不同的方式使用相同的信息。

Nodes are functions

它们接受状态、执行工作并返回更新。当它们需要做出路由决策时,它们会指定状态更新和下一个目的地。

Errors are part of the flow

临时故障会进行重试,LLM可恢复的错误会带上下文回环,用户可修复的问题会暂停以等待输入,意外错误会冒泡以供调试。

Human input is first-class

interrupt() 函数会无限期暂停执行,保存所有状态,并在您提供输入时从上次停止的地方恢复执行。当与其他节点操作结合使用时,它必须首先执行。

Graph structure emerges naturally

您定义了必要的连接,您的节点处理自己的路由逻辑。这使控制流明确且可追踪 - 您可以通过查看当前节点始终了解您的智能体接下来会做什么。

高级考虑因素

本节探讨了节点粒度设计的权衡。大多数应用程序可以跳过这部分,并使用上面展示的模式。
你可能想知道:为什么要把 Read EmailClassify Intent 合并成一个节点呢?或者为什么要把文档搜索和草稿回复分开呢?答案涉及在弹性和可观察性之间的权衡。弹性考虑因素: LangGraph的持久执行在节点边界创建检查点。当工作流在中断或失败后恢复时,它从执行停止的节点开始。节点越小,检查点出现得越频繁,这意味着如果出现问题,需要重复的工作就越少。如果您将多个操作组合成一个大的节点,那么接近末尾的失败意味着需要从该节点的开始重新执行一切。为什么我们选择了这种拆分方式用于电子邮件智能体:
  • 外部服务的隔离: 文档搜索和错误跟踪是独立的节点,因为它们调用外部API。如果搜索服务速度慢或失败,我们希望将其与LLM调用隔离开。我们可以为这些特定节点添加重试策略,而不会影响其他节点。
  • 中间可见性:Classify Intent 作为独立节点,让我们能够检查 LLM 在采取行动之前所做的决定。这对于调试和监控非常有价值——您可以清楚地看到智能体何时以及为何路由到人工审核。
  • 不同的故障模式: LLM 调用、数据库查找和电子邮件发送有不同的重试策略。独立的节点允许您独立配置这些策略。
  • 可重用性和测试: 节点越小,越容易在隔离状态下进行测试,并在其他工作流程中重用。
一种不同的有效方法:您可以将 Read EmailClassify Intent 合并成一个单独的节点。这样,您将失去在分类之前检查原始电子邮件的能力,并且在该节点失败时将重复执行这两个操作。对于大多数应用程序来说,单独节点带来的可观察性和调试优势值得这种权衡。应用级关注点:第2步(是否缓存搜索结果)中的缓存讨论是一个应用级决策,而不是LangGraph框架的功能。您可以根据具体需求在节点函数中实现缓存——LangGraph不对此做出规定。性能考虑:节点数量增加并不意味着执行速度变慢。LangGraph默认以异步持久化模式(异步持久化模式)在后台写入检查点,因此您的图继续运行,无需等待检查点完成。这意味着您将获得频繁的检查点,同时性能影响最小。如果需要,您可以调整此行为——使用 "exit" 模式仅在完成时进行检查点,或使用 "sync" 模式阻塞执行,直到每个检查点写入完成。

从这里去哪里

这是关于使用LangGraph构建智能体的思考介绍。您可以通过以下方式扩展这个基础:

Human-in-the-loop patterns

学习如何在执行前添加工具审批、批量审批以及其他模式
创建用于复杂多步操作的子图

Streaming

为用户添加流式传输以显示实时进度

Observability

使用LangSmith添加可观察性以进行调试和监控

Tool Integration

集成更多用于网络搜索、数据库查询和API调用的工具

Retry Logic

实现失败操作的指数退避重试逻辑