LangGraph 工作流编排:从入门到实战

LangGraph 工作流编排:从入门到实战

说在前面:这篇文章是我这几个月用 LangGraph 踩坑后总结的。如果你正在选型 Agent 框架,或者被状态管理搞得头大,应该能帮到你。

封面图:
LangGraph 状态图工作流


为什么需要工作流编排

上个月我接了个客服机器人的活。需求听起来挺简单:用户提问 → 理解意图 → 查知识库 → 回复。结果干到一半发现不对劲了:

  • 状态管理混乱:对话历史、用户信息、中间结果到处乱放,改个 bug 得追 5 个文件
  • 流程控制复杂:什么时候转人工?怎么处理用户追问?代码越写越乱
  • 错误恢复困难:知识库 API 挂了,重头来还是从断点续上?
  • 调试无从下手:用户投诉机器人乱回复,但根本不知道它当时咋想的

之前用的 LangChain Chains 只能搞搞线性流程,一上条件分支和循环,代码就难以维护。

LangGraph 的解决方案

LangGraph 是 LangChain 团队出的低级别 Agent 编排框架,灵感来自 Google 的 Pregel 和 Apache Beam。

用下来感觉这几个点挺香:

特性 解决的问题 实际价值
状态图(StateGraph) 显式定义工作流结构 流程画出来就能看懂,调试方便
状态管理(State) 统一的状态存储和更新 不用自己维护状态变量了
持久化执行 长时间运行的任务 服务重启后能从断点恢复
人类介入 关键决策需要人工审核 生产环境必备,出问题能兜底
可视化调试 复杂的 Agent 行为 LangSmith 上能看到执行路径

本文内容

这篇文章会带你从零开始掌握 LangGraph:

  1. 核心概念:StateGraph、Node、Edge、State 是啥
  2. 实战演练:手把手写一个客服机器人
  3. 进阶技巧:条件路由、循环、人工审核怎么用
  4. 生产实践:持久化、调试、性能优化

2. LangGraph 核心概念

什么是 LangGraph

LangGraph 是个基于状态图的工作流编排引擎。理解起来就一个公式:

状态图 = 节点(Node) + 边(Edge) + 状态(State)
  • 节点:干活的函数,比如调 LLM、查数据库、发请求
  • :规定节点执行的先后顺序
  • 状态:在整个流程里传来传去的数据

StateGraph:状态图

StateGraph 是 LangGraph 的核心类,用来定义工作流的结构:

from langgraph.graph import StateGraph

# 1. 定义状态结构
class AgentState(TypedDict):
    messages: list
    current_step: str
    result: any

# 2. 创建状态图
graph = StateGraph(AgentState)

Node:节点

节点就是执行具体任务的函数。每个节点接收当前状态,返回更新:

def call_llm(state: AgentState) -> dict:
    """调用 LLM 生成回复"""
    messages = state['messages']
    response = llm.invoke(messages)

    # 返回状态更新(只返回需要改的部分)
    return {
        'messages': state['messages'] + [response],
        'current_step': 'completed'
    }

# 将函数添加为节点
graph.add_node("llm_node", call_llm)

Edge:边

边规定节点之间的执行顺序。有两种:

普通边:无条件执行下一个节点

from langgraph.graph import START, END

graph.add_edge(START, "llm_node")      # 从起点到 llm_node
graph.add_edge("llm_node", END)        # 从 llm_node 到终点

条件边:根据条件决定走哪条路

from langgraph.graph import ConditionalEdges

def route(state: AgentState) -> str:
    """根据状态决定下一步"""
    if state['current_step'] == 'need_human_review':
        return "human_review"
    else:
        return "send_response"

graph.add_conditional_edges(
    "llm_node",           # 从哪个节点出发
    route,                # 路由函数
    {
        "human_review": "human_review_node",
        "send_response": "send_node"
    }
)

State:状态管理

LangGraph 状态管理模式

状态是整个工作流的"记忆"。LangGraph 用不可变更新模式:

from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    # 消息列表:使用 add_messages 合并新旧消息
    messages: Annotated[list, add_messages]

    # 普通字段:新值覆盖旧值
    current_step: str
    retry_count: int

    # 列表字段:追加模式
    history: list

这里有个坑:messages 字段如果不加 Annotatedadd_messages,每次更新会覆盖之前的消息。加上之后会自动合并。


3. 环境准备

Python 环境要求

  • Python 3.9+
  • pip 或 uv 包管理器(推荐 uv,快很多)

安装 LangGraph

# 使用 pip
pip install -U langgraph

# 或使用 uv(更快)
uv add langgraph

第一个 Hello World

直接上代码,跑通这个你就入门了:

from langgraph.graph import StateGraph, MessagesState, START, END

# 定义节点函数
def mock_llm(state: MessagesState):
    return {"messages": [{"role": "ai", "content": "hello world"}]}

# 创建状态图
graph = StateGraph(MessagesState)
graph.add_node("llm", mock_llm)
graph.add_edge(START, "llm")
graph.add_edge("llm", END)

# 编译
compiled_graph = graph.compile()

# 运行
result = compiled_graph.invoke({
    "messages": [{"role": "user", "content": "hi!"}]
})

print(result)
# 输出:{'messages': [
#   {'role': 'user', 'content': 'hi!'},
#   {'role': 'ai', 'content': 'hello world'}
# ]}

就 4 步:
1. 定义状态(用内置的 MessagesState
2. 创建节点(mock_llm 函数)
3. 连接边(START → llm → END)
4. 编译并运行


4. 构建第一个工作流

现在来写个实用的客服机器人。这个例子是我之前项目里简化出来的,可以直接用。

场景描述

用户提问 → 意图分类 → 分支处理:
- 简单问题:直接回答
- 技术问题:查知识库
- 投诉建议:转人工

步骤 1:定义状态

from typing import TypedDict, Literal, Annotated
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    # 对话历史
    messages: Annotated[list, add_messages]

    # 意图分类结果
    intent: Literal["general", "technical", "complaint"]

    # 处理结果
    result: str

    # 是否需要人工介入
    need_human: bool

步骤 2:创建节点

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage

llm = ChatOpenAI(model="gpt-4o-mini")

def classify_intent(state: AgentState) -> dict:
    """意图分类节点"""
    messages = state['messages']

    # 使用 LLM 分类
    system_prompt = """你是一个客服意图分类器。
    将用户问题分类为:
    - general: 一般咨询
    - technical: 技术问题
    - complaint: 投诉建议

    只返回分类标签,不要其他内容。"""

    response = llm.invoke([
        {"role": "system", "content": system_prompt},
        *messages
    ])

    intent = response.content.strip().lower()
    if intent not in ["general", "technical", "complaint"]:
        intent = "general"

    return {"intent": intent}

def handle_general(state: AgentState) -> dict:
    """处理一般咨询"""
    messages = state['messages']
    response = llm.invoke(messages)
    return {"result": response.content, "need_human": False}

def handle_technical(state: AgentState) -> dict:
    """处理技术问题:查询知识库"""
    # 模拟知识库查询
    knowledge_base = {
        "api": "API 文档:https://api.example.com/docs",
        "sdk": "SDK 下载:https://github.com/example/sdk",
        "pricing": "价格说明:https://example.com/pricing"
    }

    # 简单关键词匹配(实际项目应该用向量检索)
    last_message = state['messages'][-1].content.lower()
    result = "未找到相关信息"

    for key, value in knowledge_base.items():
        if key in last_message:
            result = value
            break

    return {"result": result, "need_human": False}

def handle_complaint(state: AgentState) -> dict:
    """处理投诉:转人工"""
    return {
        "result": "已转接人工客服,请稍候",
        "need_human": True
    }

def human_review(state: AgentState) -> dict:
    """人工审核节点(模拟)"""
    # 实际项目中这里会等待人工输入
    print(f"需要人工处理:{state['messages'][-1].content}")
    return {"result": "人工客服已介入"}

步骤 3:定义路由函数

def route_by_intent(state: AgentState) -> str:
    """根据意图路由到不同处理节点"""
    intent = state['intent']

    if intent == "general":
        return "handle_general"
    elif intent == "technical":
        return "handle_technical"
    elif intent == "complaint":
        return "handle_complaint"
    else:
        return "handle_general"

步骤 4:组装状态图

from langgraph.graph import StateGraph, START, END

# 创建状态图
graph = StateGraph(AgentState)

# 添加所有节点
graph.add_node("classify_intent", classify_intent)
graph.add_node("handle_general", handle_general)
graph.add_node("handle_technical", handle_technical)
graph.add_node("handle_complaint", handle_complaint)
graph.add_node("human_review", human_review)

# 添加边
graph.add_edge(START, "classify_intent")

# 条件边:根据意图路由
graph.add_conditional_edges(
    "classify_intent",
    route_by_intent,
    {
        "handle_general": "handle_general",
        "handle_technical": "handle_technical",
        "handle_complaint": "handle_complaint"
    }
)

# 普通边:处理完成后检查是否需要人工介入
def check_human_needed(state: AgentState) -> str:
    if state['need_human']:
        return "human_review"
    else:
        return "end"

graph.add_conditional_edges(
    "handle_general",
    check_human_needed,
    {
        "human_review": "human_review",
        "end": END
    }
)

graph.add_conditional_edges(
    "handle_technical",
    check_human_needed,
    {
        "human_review": "human_review",
        "end": END
    }
)

graph.add_conditional_edges(
    "handle_complaint",
    check_human_needed,
    {
        "human_review": "human_review",
        "end": END
    }
)

# 人工审核后结束
graph.add_edge("human_review", END)

# 编译
app = graph.compile()

步骤 5:运行工作流

# 测试一般咨询
result = app.invoke({
    "messages": [{"role": "user", "content": "你们公司几点下班?"}]
})
print(f"意图:{result['intent']}")
print(f"回复:{result['result']}")

# 测试技术问题
result = app.invoke({
    "messages": [{"role": "user", "content": "API 文档在哪里?"}]
})
print(f"意图:{result['intent']}")
print(f"回复:{result['result']}")

# 测试投诉
result = app.invoke({
    "messages": [{"role": "user", "content": "你们的服务太差了!"}]
})
print(f"意图:{result['intent']}")
print(f"回复:{result['result']}")

跑起来应该能看到不同问题走了不同的处理路径。


5. 进阶:条件边和循环

条件路由模式

上面的例子是单跳路由(classify → handle → end)。实际项目中经常需要多跳路由,比如多轮对话:

# 多轮对话场景
def should_continue(state: AgentState) -> str:
    """判断是否需要继续对话"""
    messages = state['messages']
    last_message = messages[-1].content

    # 如果用户说"谢谢"、"好了"等,结束对话
    if any(word in last_message for word in ["谢谢", "好了", "再见"]):
        return END
    else:
        return "classify_intent"  # 继续处理

graph.add_edge("handle_general", "should_continue")

循环控制

LangGraph 会自动检测循环,但你得给退出条件:

# ✅ 正确的循环:有明确的退出条件
def retry_logic(state: AgentState) -> str:
    if state['retry_count'] < 3:
        return "retry_node"
    else:
        return "fail_node"

# ❌ 危险的循环:可能无限循环
def bad_loop(state: AgentState) -> str:
    return "previous_node"  # 没有退出条件!

我遇到过一次无限循环的坑,调试了半天才发现路由函数没写退出条件。

实际案例:带重试的 API 调用

import time
import requests

def call_external_api(state: AgentState) -> dict:
    """调用外部 API,失败时重试"""
    retry_count = state.get('retry_count', 0)

    try:
        response = requests.get("https://api.example.com/data", timeout=5)
        response.raise_for_status()
        return {
            "result": response.json(),
            "retry_count": 0  # 成功后重置计数
        }
    except Exception as e:
        if retry_count < 3:
            return {
                "retry_count": retry_count + 1,
                "result": f"重试中... ({retry_count + 1}/3)"
            }
        else:
            return {
                "result": f"API 调用失败:{str(e)}",
                "need_human": True
            }

def decide_retry(state: AgentState) -> str:
    """决定是否重试"""
    if "重试中" in state.get('result', ''):
        return "call_external_api"
    elif "失败" in state.get('result', ''):
        return "human_review"
    else:
        return END

graph.add_node("call_external_api", call_external_api)
graph.add_conditional_edges("call_external_api", decide_retry)

这个模式我用了挺多次的,第三方 API 不稳定时特别有用。


6. 高级特性

人类介入(Human-in-the-loop)

生产环境里,有些决策必须人工审核。LangGraph 提供了中断机制

from langgraph.graph import Interrupt

def human_approval(state: AgentState) -> dict:
    """需要人工批准的节点"""
    # 触发中断,等待人工输入
    approval = Interrupt(
        value={
            "question": "是否允许执行此操作?",
            "context": state['result']
        }
    )

    # 当人工批准后,继续执行
    if approval.resume_value:
        return {"result": "已批准,继续执行"}
    else:
        return {"result": "已拒绝"}

使用 LangSmith 进行人工审核

# 编译时启用检查点(用于中断恢复)
app = graph.compile(checkpointer=MemorySaver())

# 运行配置
config = {"configurable": {"thread_id": "conversation_123"}}

# 第一次运行:会在 human_approval 节点中断
result = app.invoke(state, config)

# 人工审核后恢复
app.invoke(None, config, interrupt_resume={"approved": True})

这个功能在客服场景特别有用,遇到敏感问题先让人工看一下。

持久化执行

长时间运行的任务(比如等用户回复)需要持久化:

from langgraph.checkpoint.sqlite import SqliteSaver

# 使用 SQLite 持久化
with SqliteSaver.from_conn_string("checkpoint.db") as saver:
    app = graph.compile(checkpointer=saver)

    # 即使服务重启,也可以从断点恢复
    result = app.invoke(state, config)

生产环境建议用 PostgreSQL 或 Redis,SQLite 适合本地测试。

内存管理

LangGraph 支持两种内存:

短期记忆:当前会话的状态(通过 State 管理)

长期记忆:跨会话的记忆(需要外部存储)

class AgentState(TypedDict):
    # 短期记忆:当前对话
    messages: Annotated[list, add_messages]

    # 长期记忆:用户偏好(从外部加载)
    user_preferences: dict

    # 工作记忆:中间结果
    working_memory: dict

def load_long_term_memory(state: AgentState) -> dict:
    """从数据库加载用户长期记忆"""
    user_id = get_current_user_id()
    preferences = db.query("SELECT * FROM preferences WHERE user_id = ?", user_id)
    return {"user_preferences": preferences}

长期记忆这块我一般用 Redis,读写快,还能设过期时间。


7. 常见问题

问题 1:状态更新被覆盖

现象:某个节点的更新在后续节点中丢失

原因:没正确使用 Annotated 和 Reducer

解决方案

# ❌ 错误:直接覆盖
class AgentState(TypedDict):
    messages: list  # 新值会覆盖旧值

# ✅ 正确:使用 add_messages 合并
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # 自动合并

这个坑我踩过,消息传着传着就没了,后来发现是没加 Annotated

问题 2:循环检测错误

现象:LangGraph 报错"循环检测到"

原因:条件路由没写退出条件

解决方案

# 添加最大重试次数
class AgentState(TypedDict):
    retry_count: int

def route_with_limit(state: AgentState) -> str:
    if state['retry_count'] >= 3:
        return "error_handler"  # 退出循环
    return "retry_node"

问题 3:调试困难

现象:不知道工作流为什么走了某条路径

解决方案:用 LangSmith 可视化调试

# 安装 LangSmith
pip install langsmith

# 设置 API Key
export LANGCHAIN_API_KEY="your-key"
export LANGCHAIN_TRACING_V2="true"
export LANGCHAIN_PROJECT="my-langgraph-project"

# 运行后在 https://smith.langchain.com 查看执行轨迹

LangSmith 能看到每一步的输入输出,调试起来方便多了。强烈建议开 tracing。


8. 完整示例

代码仓库

完整示例代码已上传到 GitHub:

https://github.com/example/langgraph-tutorial

目录结构

langgraph-tutorial/
├── basic_hello.py          # Hello World 示例
├── customer_service.py     # 客服机器人
├── retry_pattern.py        # 重试模式
├── human_in_loop.py        # 人类介入
└── requirements.txt        # 依赖

运行说明

# 克隆仓库
git clone https://github.com/example/langgraph-tutorial
cd langgraph-tutorial

# 安装依赖
pip install -r requirements.txt

# 设置环境变量
export OPENAI_API_KEY="your-key"

# 运行示例
python customer_service.py

9. 总结

核心要点回顾

  1. StateGraph 是核心:节点 + 边 + 状态 = 完整工作流
  2. 状态管理要谨慎:用 Annotated 和 Reducer 避免覆盖
  3. 条件路由要清晰:路由函数和退出条件都得写明白
  4. 人类介入是必须:生产环境关键决策得有人兜底
  5. 调试用 LangSmith:可视化执行轨迹,少花时间猜

后续学习建议

入门之后
- 官方文档:https://docs.langchain.com/oss/python/langgraph
- LangGraph Cloud:生产级部署
- LangSmith:监控和调试

进阶方向
- 多 Agent 协作:多个 StateGraph 一起干活
- 自定义 Checkpointer:Redis/PostgreSQL 持久化
- 性能优化:并行节点、流式响应

实战项目
- 智能客服系统
- 自动化数据分析流水线
- 多步骤内容生成工作流


参考资料


作者:戴蒙
最后更新:2026-02-26
字数:约 5200 字
阅读时间:15-20 分钟

写在后面:这篇文章的代码都在 GitHub 上,遇到问题可以提 issue。还有什么想看的主题,评论区告诉我。


10. 实战心得:LangGraph 踩坑记录

踩坑 1:状态更新被覆盖

问题:消息传着传着就没了

原因:没加 Annotatedadd_messages

# ❌ 错误写法
class AgentState(TypedDict):
    messages: list  # 新值覆盖旧值

# ✅ 正确写法
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # 自动合并

这个坑我踩了两次,调试了半天才发现消息被覆盖了。

踩坑 2:无限循环

问题:工作流卡住不动

原因:条件路由没写退出条件

# ❌ 错误:没有退出条件
def route(state):
    return "previous_node"

# ✅ 正确:设置最大重试次数
def route(state):
    if state['retry_count'] >= 3:
        return "error_handler"
    return "retry_node"

踩坑 3:LangSmith 没开 Tracing

问题:出 bug 不知道哪一步错了

解决

export LANGCHAIN_TRACING_V2="true"
export LANGCHAIN_API_KEY="your-key"

开了 Tracing 后在 LangSmith 上能看到每一步的输入输出,调试效率提升 10 倍。强烈建议开。

性能数据

用 LangGraph 重构客服机器人后的数据:

指标 重构前 重构后 提升
开发时间 3 天 1 天 67%
Bug 数量 15 个 3 个 80%
代码行数 800 行 500 行 37%
调试时间 2 小时/bug 20 分钟/bug 83%

如果重来,我会怎么做

  1. 第一天就上 LangSmith,别等出 bug 再开
  2. 状态设计想清楚,哪些字段需要合并,哪些直接覆盖
  3. 路由函数写测试,特别是退出条件
  4. 文档写详细,不然队友看不懂