Skip to main content
import ChatModelTabsPy from '/snippets/chat-model-tabs.mdx';
import ChatModelTabsJS from '/snippets/chat-model-tabs-js.mdx';

概述

在本教程中,您将学习如何构建一个智能体,该智能体可以使用LangChain 智能体回答有关SQL数据库的问题。 在较高层面上,智能体将:
1

Fetch the available tables and schemas from the database

2

Decide which tables are relevant to the question

3

Fetch the schemas for the relevant tables

4

Generate a query based on the question and information from the schemas

5

Double-check the query for common mistakes using an LLM

6

Execute the query and return the results

7

Correct mistakes surfaced by the database engine until the query is successful

8

Formulate a response based on the results

构建基于SQL数据库的问答系统需要执行由模型生成的SQL查询。这样做存在固有的风险。请确保您的数据库连接权限始终尽可能窄地针对智能体的需求进行设置。这将减轻,但不会消除,构建模型驱动系统的风险。

概念

我们将介绍以下概念:

安装

安装

npm i langchain @langchain/core typeorm sqlite3 zod

LangSmith

设置 LangSmith 以检查您链或智能体内部发生的情况。然后设置以下环境变量:
export LANGSMITH_TRACING="true"
export LANGSMITH_API_KEY="..."

1. 选择一个大型语言模型

选择支持工具调用的模型: 以下示例中的输出使用了 OpenAI。

2. 配置数据库

您将为本次教程创建一个 SQLite数据库。SQLite是一种轻量级数据库,易于设置和使用。我们将加载chinook数据库,这是一个代表数字媒体店的示例数据库。 为了方便,我们将数据库(Chinook.db)托管在公共GCS存储桶中。
import fs from "node:fs/promises";
import path from "node:path";

const url = "https://storage.googleapis.com/benchmarks-artifacts/chinook/Chinook.db";
const localPath = path.resolve("Chinook.db");

async function resolveDbPath() {
  if (await fs.exists(localPath)) {
    return localPath;
  }
  const resp = await fetch(url);
  if (!resp.ok) throw new Error(`Failed to download DB. Status code: ${resp.status}`);
  const buf = Buffer.from(await resp.arrayBuffer());
  await fs.writeFile(localPath, buf);
  return localPath;
}

3. 添加数据库交互工具

使用在 langchain/sql_db 中可用的 SqlDatabase 包装器与数据库进行交互。该包装器提供了一个简单的接口来执行 SQL 查询并获取结果:
import { SqlDatabase } from "@langchain/classic/sql_db";
import { DataSource } from "typeorm";

let db: SqlDatabase | undefined;
async function getDb() {
  if (!db) {
    const dbPath = await resolveDbFile();
    const datasource = new DataSource({ type: "sqlite", database: dbPath });
    db = await SqlDatabase.fromDataSourceParams({ appDataSource: datasource });
  }
  return db;
}

async function getSchema() {
  const db = await getDb();
  return await db.getTableInfo();
}

6. 实施人工审核循环

在执行智能体的SQL查询之前检查其查询,以避免任何未预期的行为或低效,这是一种谨慎的做法。 LangChain智能体支持内置的人机协同中间件,以便对智能体工具调用进行监督。让我们配置智能体,在调用sql_db_query工具时暂停以供人工审核:
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware 
from langgraph.checkpoint.memory import InMemorySaver 


agent = create_agent(
    model,
    tools,
    system_prompt=system_prompt,
    middleware=[ 
        HumanInTheLoopMiddleware( 
            interrupt_on={"sql_db_query": True}, 
            description_prefix="Tool execution pending approval", 
        ), 
    ], 
    checkpointer=InMemorySaver(), 
)
我们为智能体添加了一个检查点,允许执行暂停和恢复。有关此功能以及可用的中间件配置的详细信息,请参阅人工干预指南
在运行智能体后,它现在将暂停以供审查,然后再执行 sql_db_query 工具:
question = "Which genre on average has the longest tracks?"
config = {"configurable": {"thread_id": "1"}} 

for step in agent.stream(
    {"messages": [{"role": "user", "content": question}]},
    config, 
    stream_mode="values",
):
    if "messages" in step:
        step["messages"][-1].pretty_print()
    elif "__interrupt__" in step: 
        print("INTERRUPTED:") 
        interrupt = step["__interrupt__"][0] 
        for request in interrupt.value: 
            print(request["description"]) 
    else:
        pass
...

INTERRUPTED:
Tool execution pending approval

Tool: sql_db_query
Args: {'query': 'SELECT g.Name AS Genre, AVG(t.Milliseconds) AS AvgTrackLength FROM Track t JOIN Genre g ON t.GenreId = g.GenreId GROUP BY g.Name ORDER BY AvgTrackLength DESC LIMIT 1;'}
我们可以通过使用 命令 来恢复执行,在这种情况下接受查询:
from langgraph.types import Command 

for step in agent.stream(
    Command(resume=[{"type": "accept"}]), 
    config,
    stream_mode="values",
):
    if "messages" in step:
        step["messages"][-1].pretty_print()
    elif "__interrupt__" in step:
        print("INTERRUPTED:")
        interrupt = step["__interrupt__"][0]
        for request in interrupt.value:
            print(request["description"])
    else:
        pass
================================== Ai Message ==================================
Tool Calls:
  sql_db_query (call_7oz86Epg7lYRqi9rQHbZPS1U)
 Call ID: call_7oz86Epg7lYRqi9rQHbZPS1U
  Args:
    query: SELECT Genre.Name, AVG(Track.Milliseconds) AS AvgDuration FROM Track JOIN Genre ON Track.GenreId = Genre.GenreId GROUP BY Genre.Name ORDER BY AvgDuration DESC LIMIT 5;
================================= Tool Message =================================
Name: sql_db_query

[('Sci Fi & Fantasy', 2911783.0384615385), ('Science Fiction', 2625549.076923077), ('Drama', 2575283.78125), ('TV Shows', 2145041.0215053763), ('Comedy', 1585263.705882353)]
================================== Ai Message ==================================

The genre with the longest average track length is "Sci Fi & Fantasy" with an average duration of about 2,911,783 milliseconds, followed by "Science Fiction" and "Drama."
请参阅人机交互指南以获取详细信息。

4. 执行 SQL 查询

在运行命令之前,请检查 _safe_sql 中由 LLM 生成的命令。

const DENY_RE = /\b(INSERT|UPDATE|DELETE|ALTER|DROP|CREATE|REPLACE|TRUNCATE)\b/i;
const HAS_LIMIT_TAIL_RE = /\blimit\b\s+\d+(\s*,\s*\d+)?\s*;?\s*$/i;

function sanitizeSqlQuery(q) {
  let query = String(q ?? "").trim();

  // block multiple statements (allow one optional trailing ;)
  const semis = [...query].filter((c) => c === ";").length;
  if (semis > 1 || (query.endsWith(";") && query.slice(0, -1).includes(";"))) {
    throw new Error("multiple statements are not allowed.")
  }
  query = query.replace(/;+\s*$/g, "").trim();

  // read-only gate
  if (!query.toLowerCase().startsWith("select")) {
    throw new Error("Only SELECT statements are allowed")
  }
  if (DENY_RE.test(query)) {
    throw new Error("DML/DDL detected. Only read-only queries are permitted.")
  }

  // append LIMIT only if not already present
  if (!HAS_LIMIT_TAIL_RE.test(query)) {
    query += " LIMIT 5";
  }
  return query;
}

然后,使用来自 SQLDatabaserun 来使用 execute_sql 工具执行命令:
import { tool } from "langchain"
import * as z from "zod";

const executeSql = tool(
  async ({ query }) => {
    const q = sanitizeSqlQuery(query);
    try {
      const result = await db.run(q);
      return typeof result === "string" ? result : JSON.stringify(result, null, 2);
    } catch (e) {
      throw new Error(e?.message ?? String(e))
    }
  },
  {
    name: "execute_sql",
    description: "Execute a READ-ONLY SQLite SELECT query and return results.",
    schema: z.object({
      query: z.string().describe("SQLite SELECT query to execute (read-only)."),
    }),
  }
);

5. 使用 createAgent

使用 createAgent 以最少的代码构建一个 ReAct 智能体。智能体会解析请求并生成一个 SQL 命令。工具将检查命令的安全性,然后尝试执行该命令。如果命令存在错误,错误信息将被返回给模型。然后模型可以检查原始请求和新的错误信息,并生成一个新的命令。这个过程可以持续进行,直到 LLM 成功生成命令或达到结束计数。向模型提供反馈(在这种情况下为错误信息)的模式非常强大。 使用描述性系统提示初始化智能体以自定义其行为:
import { SystemMessage } from "langchain";

const getSystemPrompt = async () => new SystemMessage(`You are a careful SQLite analyst.

Authoritative schema (do not invent columns/tables):
${await getSchema()}

Rules:
- Think step-by-step.
- When you need data, call the tool \`execute_sql\` with ONE SELECT query.
- Read-only only; no INSERT/UPDATE/DELETE/ALTER/DROP/CREATE/REPLACE/TRUNCATE.
- Limit to 5 rows unless user explicitly asks otherwise.
- If the tool returns 'Error:', revise the SQL and try again.
- Limit the number of attempts to 5.
- If you are not successful after 5 attempts, return a note to the user.
- Prefer explicit column lists; avoid SELECT *.
`);
现在,创建一个包含模型、工具和提示的智能体:
import { createAgent } from "langchain";

const agent = createAgent({
  model: "openai:gpt-5",
  tools: [executeSql],
  systemPrompt: getSystemPrompt,
});

6. 运行智能体

在样本查询上运行智能体并观察其行为:
const question = "Which genre, on average, has the longest tracks?";
const stream = await agent.stream(
  { messages: [{ role: "user", content: question }] },
  { streamMode: "values" }
);
for await (const step of stream) {
  const message = step.messages.at(-1);
  console.log(`${message.role}: ${JSON.stringify(message.content, null, 2)}`);
}
human: Which genre, on average, has the longest tracks?
ai:
tool: [{"Genre":"Sci Fi & Fantasy","AvgMilliseconds":2911783.0384615385}]
ai: Sci Fi & Fantasy — average track length ≈ 48.5 minutes (about 2,911,783 ms).
智能体正确地编写了一个查询,检查了该查询,并运行它以告知其最终响应。
您可以检查上述运行的各个方面,包括采取的步骤、调用的工具、LLM 看到的提示等,更多详情请参阅 LangSmith 追踪

(可选)使用 Studio

工作室 提供了“客户端”循环以及内存,因此您可以将其作为聊天界面运行并查询数据库。您可以提出诸如“告诉我数据库的架构”或“显示前5位客户的发票”等问题。您将看到生成的SQL命令和结果输出。如何开始此操作的详细信息如下。
除了之前提到的包之外,您还需要:
npm i -g langgraph-cli@latest
在您将要运行的目录中,您需要一个包含以下内容的 langgraph.json 文件:
{
  "dependencies": ["."],
  "graphs": {
      "agent": "./sqlAgent.ts:agent",
      "graph": "./sqlAgentLanggraph.ts:graph"
  },
  "env": ".env"
}
import fs from "node:fs/promises";
import path from "node:path";
import { SqlDatabase } from "@langchain/classic/sql_db";
import { DataSource } from "typeorm";
import { SystemMessage, createAgent, tool } from "langchain"
import * as z from "zod";

const url = "https://storage.googleapis.com/benchmarks-artifacts/chinook/Chinook.db";
const localPath = path.resolve("Chinook.db");

async function resolveDbPath() {
  if (await fs.exists(localPath)) {
    return localPath;
  }
  const resp = await fetch(url);
  if (!resp.ok) throw new Error(`Failed to download DB. Status code: ${resp.status}`);
  const buf = Buffer.from(await resp.arrayBuffer());
  await fs.writeFile(localPath, buf);
  return localPath;
}

let db: SqlDatabase | undefined;
async function getDb() {
  if (!db) {
    const dbPath = await resolveDbPath();
    const datasource = new DataSource({ type: "sqlite", database: dbPath });
    db = await SqlDatabase.fromDataSourceParams({ appDataSource: datasource });
  }
  return db;
}

async function getSchema() {
  const db = await getDb();
  return await db.getTableInfo();
}

const DENY_RE = /\b(INSERT|UPDATE|DELETE|ALTER|DROP|CREATE|REPLACE|TRUNCATE)\b/i;
const HAS_LIMIT_TAIL_RE = /\blimit\b\s+\d+(\s*,\s*\d+)?\s*;?\s*$/i;

function sanitizeSqlQuery(q) {
  let query = String(q ?? "").trim();

  // block multiple statements (allow one optional trailing ;)
  const semis = [...query].filter((c) => c === ";").length;
  if (semis > 1 || (query.endsWith(";") && query.slice(0, -1).includes(";"))) {
    throw new Error("multiple statements are not allowed.")
  }
  query = query.replace(/;+\s*$/g, "").trim();

  // read-only gate
  if (!query.toLowerCase().startsWith("select")) {
    throw new Error("Only SELECT statements are allowed")
  }
  if (DENY_RE.test(query)) {
    throw new Error("DML/DDL detected. Only read-only queries are permitted.")
  }

  // append LIMIT only if not already present
  if (!HAS_LIMIT_TAIL_RE.test(query)) {
    query += " LIMIT 5";
  }
  return query;
}

const executeSql = tool(
  async ({ query }) => {
    const q = sanitizeSqlQuery(query);
    try {
      const result = await db.run(q);
      return typeof result === "string" ? result : JSON.stringify(result, null, 2);
    } catch (e) {
      throw new Error(e?.message ?? String(e))
    }
  },
  {
    name: "execute_sql",
    description: "Execute a READ-ONLY SQLite SELECT query and return results.",
    schema: z.object({
      query: z.string().describe("SQLite SELECT query to execute (read-only)."),
    }),
  }
);

const getSystemPrompt = async () => new SystemMessage(`You are a careful SQLite analyst.

Authoritative schema (do not invent columns/tables):
${await getSchema()}

Rules:
- Think step-by-step.
- When you need data, call the tool \`execute_sql\` with ONE SELECT query.
- Read-only only; no INSERT/UPDATE/DELETE/ALTER/DROP/CREATE/REPLACE/TRUNCATE.
- Limit to 5 rows unless user explicitly asks otherwise.
- If the tool returns 'Error:', revise the SQL and try again.
- Limit the number of attempts to 5.
- If you are not successful after 5 attempts, return a note to the user.
- Prefer explicit column lists; avoid SELECT *.
`);

export const agent = createAgent({
  model: "openai:gpt-5",
  tools: [executeSql],
  systemPrompt: getSystemPrompt,
});

下一步

为了进行更深入的定制,请查看本教程,了解如何直接使用LangGraph原语实现SQL智能体。