开篇:从"Demo"到"能用的产品"有多远

2025 年,我见过太多这样的场景:

团队兴奋地宣布"我们做了一个 AI Agent!",然后 Demo 演示完美——Agent 流畅地聊天、调用工具、完成任务。所有人都在鼓掌。

三个月后,这个 Agent 被悄悄下线了。

原因?它在线上环境里 30% 的请求会返回奇怪的错误,5% 的对话会陷入死循环,用户投诉"它根本不理解我在说什么"。

这不是某个团队的问题。这是 AI Agent 从 Demo 到生产环境的普遍Gap。

这篇文章,我打算认真聊一下如何在生产环境中构建真正可用的 AI Agent。不是理论,不是"最佳实践清单",而是基于真实踩坑经验的结构化方法。


一、AI Agent 在生产环境中的核心挑战

在讨论具体设计之前,我们需要先理解为什么 Demo 环境和生产环境的差距如此之大。

1.1 挑战一:不确定性

LLM 的输出本质上是概率性的。同一个 prompt,两次调用可能产生完全不同的输出。这在 Demo 阶段不是问题——你可以选那个最好的结果展示。但生产环境需要处理所有可能的结果。

# Demo 阶段:只展示成功案例
def demo_response(user_input: str) -> str:
    response = llm.generate(user_input)
    print(f"Perfect response: {response}")  # 选择性展示

# 生产环境:必须处理所有可能的输出
def production_response(user_input: str) -> str:
    response = llm.generate(user_input)

    # 1. 是否为空?
    if not response:
        return "抱歉,我没有得到有效的响应,请重试。"

    # 2. 是否包含乱码或格式错误?
    if is_malformed(response):
        return "抱歉,响应格式异常,请重试。"

    # 3. 是否越界(说了一些不该说的)?
    if is_out_of_bounds(response):
        return "抱歉,这个问题我无法回答。"

    # 4. 是否调用了不该调用的工具?
    if has_invalid_tool_calls(response):
        return "抱歉,我遇到了一个问题,请重新描述您的需求。"

    # 只有通过所有检查才返回
    return response

Demo 阶段,你展示的是"它能做什么";生产阶段,你必须处理"它不能做什么时怎么办"。

1.2 挑战二:对话状态的复杂性

Demo 的对话通常是短期的、单轮的、或简单多轮。但生产环境的对话可能:

  • 持续数小时甚至数天
  • 包含大量历史上下文
  • 中途被打断,然后恢复
  • 需要跨会话记住用户偏好
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum

class ConversationState(Enum):
    INITIAL = "initial"
    IN_PROGRESS = "in_progress"
    WAITING_FOR_INPUT = "waiting_for_input"
    COMPLETED = "completed"
    FAILED = "failed"
    EXPIRED = "expired"


@dataclass
class ConversationContext:
    """对话上下文:完整记录一次会话的状态"""

    conversation_id: str
    user_id: str
    created_at: datetime
    last_updated: datetime
    state: ConversationState

    # 对话历史(用于 LLM 输入)
    messages: List[Dict] = field(default_factory=list)

    # 用户偏好(跨会话持久化)
    user_preferences: Dict = field(default_factory=dict)

    # 当前任务状态(如果有正在进行的多步骤任务)
    current_task: Optional[Dict] = None

    # 元数据
    metadata: Dict = field(default_factory=dict)

    def add_message(self, role: str, content: str):
        """添加消息到历史"""
        self.messages.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        })
        self.last_updated = datetime.now()

    def truncate_history(self, max_messages: int = 50):
        """截断过长的历史,防止 token 溢出"""
        if len(self.messages) > max_messages:
            # 保留最新的消息和系统提示
            system_messages = [m for m in self.messages if m["role"] == "system"]
            other_messages = [m for m in self.messages if m["role"] != "system"]

            # 保留最近的 max_messages - len(system_messages) 条
            kept_messages = other_messages[-max_messages + len(system_messages):]
            self.messages = system_messages + kept_messages

    def is_expired(self, ttl_hours: int = 24) -> bool:
        """检查对话是否过期"""
        if self.state == ConversationState.COMPLETED:
            return True

        elapsed = datetime.now() - self.last_updated
        return elapsed.total_seconds() > (ttl_hours * 3600)

1.3 挑战三:工具调用的可靠性

AI Agent 的核心能力是调用外部工具。但工具调用失败的形式五花八门:

  • 网络超时
  • 权限不足
  • 输入参数不符合预期
  • 工具返回了意外格式
  • 工具本身有 bug
from dataclasses import dataclass
from typing import Any, Callable, Optional
import asyncio
from enum import Enum

class ToolError(Enum):
    NETWORK_ERROR = "network_error"
    PERMISSION_DENIED = "permission_denied"
    INVALID_PARAMS = "invalid_params"
    UNEXPECTED_RESPONSE = "unexpected_response"
    TIMEOUT = "timeout"
    UNKNOWN = "unknown"


@dataclass
class ToolExecutionResult:
    """工具执行结果"""
    success: bool
    result: Any = None
    error: Optional[ToolError] = None
    error_message: Optional[str] = None
    retry_count: int = 0
    execution_time_ms: float = 0.0


class ReliableToolExecutor:
    """
    可靠的工具执行器:包装工具调用,处理各种失败情况
    """

    def __init__(self, max_retries: int = 3, timeout_seconds: int = 30):
        self.max_retries = max_retries
        self.timeout_seconds = timeout_seconds

    async def execute_with_retry(
        self,
        tool_func: Callable,
        params: Dict,
        tool_name: str
    ) -> ToolExecutionResult:
        """带重试机制的工具执行"""

        last_error = None

        for attempt in range(self.max_retries):
            try:
                start_time = asyncio.get_event_loop().time()

                # 异步执行,支持超时
                result = await asyncio.wait_for(
                    tool_func(**params),
                    timeout=self.timeout_seconds
                )

                execution_time = (asyncio.get_event_loop().time() - start_time) * 1000

                return ToolExecutionResult(
                    success=True,
                    result=result,
                    retry_count=attempt,
                    execution_time_ms=execution_time
                )

            except asyncio.TimeoutError:
                last_error = ToolError.TIMEOUT
                error_message = f"Tool '{tool_name}' timed out after {self.timeout_seconds}s"

            except PermissionError as e:
                last_error = ToolError.PERMISSION_DENIED
                error_message = str(e)

            except (TypeError, ValueError) as e:
                # 参数错误,不要重试
                return ToolExecutionResult(
                    success=False,
                    error=ToolError.INVALID_PARAMS,
                    error_message=f"Invalid parameters for '{tool_name}': {str(e)}",
                    retry_count=attempt
                )

            except Exception as e:
                last_error = ToolError.UNKNOWN
                error_message = str(e)

            # 重试前等待(指数退避)
            if attempt < self.max_retries - 1:
                wait_time = (2 ** attempt) * 0.5  # 0.5s, 1s, 2s
                await asyncio.sleep(wait_time)

        # 所有重试都失败
        return ToolExecutionResult(
            success=False,
            error=last_error,
            error_message=error_message,
            retry_count=self.max_retries
        )

    def validate_tool_result(self, result: Any, expected_schema: Dict) -> bool:
        """验证工具返回是否符合预期格式"""
        if result is None:
            return False

        if not isinstance(result, dict):
            return False

        # 简单检查必要字段
        for field in expected_schema.get("required", []):
            if field not in result:
                return False

        return True

二、Agent 架构设计:从简单到可靠

2.1 单 Agent 模式

最简单的架构,只有一个 Agent 处理所有请求。

class SimpleAgent:
    """简单 Agent:适合单一职责场景"""

    def __init__(self, llm, tools: List):
        self.llm = llm
        self.tools = {t.name: t for t in tools}

        # 系统提示词
        self.system_prompt = """你是一个{role}
你可以通过调用工具来完成任务。
每次只调用一个工具。"""

    def process(self, user_message: str, context: Dict) -> str:
        """处理用户消息"""

        # 构建 prompt
        prompt = self._build_prompt(user_message, context)

        # 调用 LLM
        response = self.llm.generate(prompt)

        # 检查是否有工具调用
        if response.tool_calls:
            return self._execute_tool_call(response.tool_calls[0])

        return response.content

    def _build_prompt(self, user_message: str, context: Dict) -> str:
        # 实际应用中需要更复杂的 prompt 构建
        return f"{self.system_prompt}\n\n用户: {user_message}"

    def _execute_tool_call(self, tool_call) -> str:
        tool_name = tool_call.name
        tool_params = tool_call.arguments

        if tool_name not in self.tools:
            return f"错误:未知工具 '{tool_name}'"

        tool = self.tools[tool_name]

        try:
            result = tool.execute(**tool_params)
            return f"工具 '{tool_name}' 执行结果: {result}"
        except Exception as e:
            return f"工具 '{tool_name}' 执行失败: {str(e)}"

适用场景:单一任务、低复杂度、低可靠性要求

不适用场景:复杂多步骤任务、高可靠性要求、需要优雅降级

2.2 Supervisor-Agent 模式(推荐用于生产)

对于需要高可靠性的场景,我推荐 Supervisor-Agent 模式:Supervisor 负责任务规划和分发,实际执行交给专门的子 Agent。

from dataclasses import dataclass
from typing import List, Dict, Optional, Callable
from enum import Enum

class TaskType(Enum):
    RESEARCH = "research"       # 信息搜集
    ANALYSIS = "analysis"      # 分析推理
    WRITING = "writing"        # 内容生成
    CODING = "coding"          # 代码编写
    REVIEW = "review"          # 审查审核
    GENERAL = "general"        # 一般对话


@dataclass
class Task:
    """任务描述"""
    task_id: str
    task_type: TaskType
    description: str
    priority: int = 0  # 0-10, 越高越优先
    dependencies: List[str] = field(default_factory=list)  # 依赖的其他任务 ID
    max_retries: int = 3


@dataclass
class TaskResult:
    """任务执行结果"""
    task_id: str
    success: bool
    output: Optional[str] = None
    error: Optional[str] = None
    execution_time_seconds: float = 0.0


class SupervisorAgent:
    """
    Supervisor-Agent 架构:
    - Supervisor负责任务规划、分解、结果汇总
    - Sub-Agent负责实际执行
    """

    def __init__(self, llm, sub_agents: Dict[TaskType, any], tools: Dict):
        self.llm = llm
        self.sub_agents = sub_agents
        self.tools = tools

        # 任务仓库
        self.pending_tasks: List[Task] = []
        self.completed_tasks: Dict[str, TaskResult] = {}

    def process_request(self, user_request: str) -> str:
        """处理用户请求"""

        # Step 1: 任务规划 - LLM 决定需要哪些子任务
        plan = self._plan_tasks(user_request)

        # Step 2: 执行任务(按依赖关系排序)
        results = self._execute_tasks(plan)

        # Step 3: 汇总结果
        final_response = self._aggregate_results(results)

        return final_response

    def _plan_tasks(self, user_request: str) -> List[Task]:
        """任务规划:决定需要哪些任务"""

        prompt = f"""分析以下用户请求,分解为可执行的任务。

用户请求: {user_request}

请分析请求类型,并决定是否需要以下类型的任务:
- RESEARCH: 需要搜集信息
- ANALYSIS: 需要分析推理
- WRITING: 需要生成内容
- CODING: 需要编写代码
- REVIEW: 需要审查审核

请以 JSON 格式返回任务列表:
{{"tasks": [{{"task_type": "...", "description": "...", "priority": 0-10}}]}}
"""

        response = self.llm.generate(prompt)

        # 解析响应并创建 Task 对象
        tasks = self._parse_task_plan(response)

        return tasks

    def _execute_tasks(self, tasks: List[Task]) -> List[TaskResult]:
        """按依赖关系执行任务"""

        # 按依赖关系拓扑排序
        sorted_tasks = self._topological_sort(tasks)

        results = []

        for task in sorted_tasks:
            # 检查依赖是否都已完成
            deps_completed = all(
                self.completed_tasks.get(dep_id, TaskResult(dep_id, False)).success
                for dep_id in task.dependencies
            )

            if not deps_completed:
                # 依赖失败,当前任务跳过
                results.append(TaskResult(
                    task_id=task.task_id,
                    success=False,
                    error="Dependencies failed"
                ))
                continue

            # 查找对应的 sub-agent
            sub_agent = self.sub_agents.get(task.task_type)

            if sub_agent is None:
                # 没有对应的 sub-agent,Supervisor 直接处理
                result = self._execute_as_supervisor(task)
            else:
                result = self._execute_via_subagent(task, sub_agent)

            self.completed_tasks[task.task_id] = result
            results.append(result)

        return results

    def _execute_via_subagent(self, task: Task, sub_agent) -> TaskResult:
        """通过 sub-agent 执行任务"""
        import time
        start = time.time()

        try:
            output = sub_agent.execute(task.description)
            return TaskResult(
                task_id=task.task_id,
                success=True,
                output=output,
                execution_time_seconds=time.time() - start
            )
        except Exception as e:
            return TaskResult(
                task_id=task.task_id,
                success=False,
                error=str(e),
                execution_time_seconds=time.time() - start
            )

    def _aggregate_results(self, results: List[TaskResult]) -> str:
        """汇总任务结果,生成最终响应"""

        successful = [r for r in results if r.success]
        failed = [r for r in results if not r.success]

        if not successful:
            return "抱歉,您的请求处理失败。请重试。"

        # 按执行顺序拼接结果
        response_parts = []

        for result in successful:
            response_parts.append(result.output)

        # 如果有失败任务,给出提示
        if failed:
            response_parts.append(
                f"\n\n注意:有 {len(failed)} 个子任务处理失败,可能影响回答的完整性。"
            )

        return "\n\n".join(response_parts)

2.3 关键设计原则

根据我的踩坑经验,Agent 架构有几个关键原则:

原则一:明确的失败边界

每个组件都必须有明确的失败情况处理。不能假设"它应该能成功"。

def unreliable_component(data):
    """❌ 错误:假设一切顺利"""
    result = risky_operation(data)
    return process_result(result)  # 如果 result 是 None?

def reliable_component(data):
    """✅ 正确:明确的失败边界"""
    result = risky_operation(data)
    if result is None:
        return {"error": "操作失败", "fallback_available": True}
    return process_result(result)

原则二:可观测性

Agent 的每个决策都应该可追踪、可回溯。这对于调试和用户解释至关重要。

class ObservableAgent:
    """可观测的 Agent:记录所有决策"""

    def __init__(self):
        self.decision_log = []

    def log_decision(self, decision_type: str, context: Dict, decision: str, confidence: float):
        """记录决策"""
        self.decision_log.append({
            "timestamp": datetime.now().isoformat(),
            "decision_type": decision_type,
            "context_summary": self._summarize_context(context),
            "decision": decision,
            "confidence": confidence,
        })

    def _summarize_context(self, context: Dict) -> str:
        """将上下文压缩为摘要(节省存储)"""
        return f"messages={len(context.get('messages', []))}, " \
               f"tools={list(context.get('available_tools', {}).keys())}, " \
               f"task={context.get('current_task', 'N/A')[:50]}"

原则三:优雅降级

当 Agent 无法完成复杂任务时,应该能够退回到更简单的处理方式。

class GracefulDegradationAgent:
    """支持优雅降级的 Agent"""

    def __init__(self, primary_llm, fallback_llm):
        self.primary_llm = primary_llm
        self.fallback_llm = fallback_llm

    def process(self, request: str, complexity_estimate: float) -> str:
        """
        complexity_estimate: 0-1, 越高表示任务越复杂
        """

        try:
            # 先尝试用主模型处理
            response = self.primary_llm.generate(request)

            # 检查响应质量
            if self._quality_check(response):
                return response

            # 质量不达标,考虑降级
            if complexity_estimate > 0.7:
                # 复杂任务降级到更可靠的配置
                return self._handle_complex_with_fallback(request)
            else:
                # 简单任务,可能是 LLM 状态波动,重试
                return self.primary_llm.generate(request)

        except Exception as e:
            # 任何异常都触发降级
            return self._handle_with_fallback(request)

    def _handle_complex_with_fallback(self, request: str) -> str:
        """复杂任务降级:简化问题,尝试分段处理"""

        # 分解为更小的子问题
        sub_problems = self._decompose(request)

        results = []
        for sub in sub_problems:
            try:
                result = self.fallback_llm.generate(sub, max_tokens=500)
                results.append(result)
            except:
                results.append("[处理失败]")

        return self._compose_response(results)

    def _handle_with_fallback(self, request: str) -> str:
        """通用降级:直接使用 fallback 模型"""
        return self.fallback_llm.generate(
            f"请简洁回答:{request}",
            max_tokens=300
        )

三、生产环境的关键组件

3.1 健康检查与监控

Agent 需要持续的"生命体征"监控:

from dataclasses import dataclass
import psutil
import time

@dataclass
class AgentHealthStatus:
    """Agent 健康状态"""
    timestamp: datetime
    is_healthy: bool
    active_conversations: int
    queue_depth: int
    error_rate_5min: float
    avg_response_time_ms: float
    memory_usage_percent: float
    cpu_usage_percent: float


class AgentHealthMonitor:
    """Agent 健康监控"""

    def __init__(self, agent, warning_threshold: float = 0.05):
        self.agent = agent
        self.warning_threshold = warning_threshold  # 5% 错误率阈值

        # 健康指标记录
        self.response_times = []
        self.errors = []
        self.last_check_time = time.time()

    def check_health(self) -> AgentHealthStatus:
        """执行健康检查"""

        now = time.time()
        elapsed = now - self.last_check_time

        # 清理 5 分钟前的旧数据
        self._cleanup_old_data(now)

        # 计算指标
        error_rate = len(self.errors) / max(len(self.response_times), 1)
        avg_response_time = sum(self.response_times) / max(len(self.response_times), 1) if self.response_times else 0

        # 系统资源
        memory = psutil.virtual_memory()
        cpu = psutil.cpu_percent(interval=0.1)

        return AgentHealthStatus(
            timestamp=datetime.now(),
            is_healthy=error_rate < self.warning_threshold,
            active_conversations=self.agent.active_conversations,
            queue_depth=self.agent.task_queue.qsize(),
            error_rate_5min=error_rate,
            avg_response_time_ms=avg_response_time,
            memory_usage_percent=memory.percent,
            cpu_usage_percent=cpu,
        )

    def _cleanup_old_data(self, now: float):
        """清理 5 分钟前的数据"""
        five_minutes_ago = now - 300

        self.response_times = [
            (t, rt) for t, rt in self.response_times if t > five_minutes_ago
        ]
        self.errors = [
            (t, e) for t, e in self.errors if t > five_minutes_ago
        ]

3.2 人机协作边界

有些任务不应该完全由 AI 处理。我设计了一个人机协作边界系统:

class HumanInTheLoop:
    """人机协作边界管理"""

    # 需要人工确认的操作类型
    ESCALATE_TO_HUMAN = [
        "financial_transaction",
        "delete_data",
        "send_external_email",
        "modify_permissions",
        "approve_refund",
    ]

    # 完全由 AI 处理的操作类型
    AI_ONLY = [
        "search_knowledge_base",
        "summarize_document",
        "draft_response",
        "analyze_data",
        "generate_report",
    ]

    def requires_human_review(self, action_type: str, risk_level: str) -> bool:
        """判断是否需要人工审核"""

        # 高风险操作必须人工审核
        if action_type in self.ESCALATE_TO_HUMAN or risk_level == "high":
            return True

        # 低风险操作 AI 自行处理
        if action_type in self.AI_ONLY and risk_level == "low":
            return False

        # 中等风险操作,给用户确认选项
        return False  # 但在响应中提示用户可以申请人工审核

3.3 对话记忆与遗忘

Agent 需要有策略地管理记忆——不是所有对话都需要记住:

class ConversationMemory:
    """对话记忆管理:选择性记住"""

    def __init__(self, max_short_term_items: int = 50):
        self.short_term = []  # 短期记忆:最近 N 条消息
        self.long_term = {}   # 长期记忆:用户偏好、重要事实
        self.max_short_term_items = max_short_term_items

    def add_exchange(self, user_message: str, agent_response: str, metadata: Dict):
        """添加一对对话"""

        self.short_term.append({
            "user": user_message,
            "agent": agent_response,
            "timestamp": datetime.now(),
            "metadata": metadata
        })

        # 提取可能需要长期记忆的信息
        self._extract_long_term_memory(user_message, agent_response, metadata)

        # 清理过期的短期记忆
        self._consolidate_short_term()

    def _extract_long_term_memory(self, user_message: str, agent_response: str, metadata: Dict):
        """提取需要长期记忆的信息"""

        # 提取用户偏好
        if "preference" in metadata:
            self.long_term[f"pref_{metadata['preference_type']}"] = metadata["preference"]

        # 提取关键事实
        if "important_fact" in metadata:
            self.long_term[f"fact_{len(self.long_term)}"] = metadata["important_fact"]

        # 提取决策(用于后续参考)
        if metadata.get("is_decision"):
            self.long_term[f"decision_{metadata.get('decision_id')}"] = {
                "context": user_message[:200],
                "decision": agent_response[:200],
                "timestamp": datetime.now().isoformat()
            }

    def _consolidate_short_term(self):
        """整合短期记忆"""

        if len(self.short_term) <= self.max_short_term_items:
            return

        # 保留最新的 + 最"重要"的
        # 简化:保留最新的 max_short_term_items 条
        self.short_term = self.short_term[-self.max_short_term_items:]

    def get_relevant_context(self, current_message: str) -> str:
        """获取与当前消息相关的上下文"""

        # 简单的关键词匹配(实际应用应该用 embedding)
        current_keywords = set(current_message.lower().split())

        relevant = []

        # 检查短期记忆
        for item in self.short_term[-10:]:  # 最近 10 条
            if any(kw in item["user"].lower() for kw in current_keywords):
                relevant.append(f"用户曾说:{item['user']}")

        # 检查长期记忆
        for key, value in self.long_term.items():
            if isinstance(value, str) and any(kw in value.lower() for kw in current_keywords):
                relevant.append(f"记住的事实:{value}")

        return "\n".join(relevant) if relevant else "无相关上下文"

四、真实案例:客服 Agent 的生产级实现

4.1 背景

我曾帮一个电商平台重构他们的 AI 客服系统。原系统是一个纯规则匹配的系统,问题覆盖率只有 60%,剩下 40% 用户只能被转人工。

目标:构建一个 AI Agent,覆盖率 90%+,同时把转人工率从 40% 降到 15%。

4.2 架构设计

用户消息
    ↓
意图识别(轻量模型,<100ms)
    ↓
┌─────────────┬──────────────┬──────────────┐
↓            ↓              ↓
查询类      业务类        投诉类
(RAG)       (API调用)     (转人工)
    ↓            ↓              ↓
知识库检索   订单系统      HumanAgent
    ↓            ↓              ↓
    └────────────┴──────────────┘
                ↓
         响应生成 + 安全检查
                ↓
         用户消息

4.3 核心实现

class EcommerceCustomerServiceAgent:
    """电商客服 Agent"""

    def __init__(self):
        self.intent_classifier = self._load_intent_classifier()
        self.rag_system = self._load_rag_system()
        self.order_api = OrderAPI()
        self.human_agent_pool = HumanAgentPool()

        # 意图到处理的映射
        self.intent_handlers = {
            "order_status": self._handle_order_status,
            "return_request": self._handle_return_request,
            "product_inquiry": self._handle_product_inquiry,
            "complaint": self._handle_complaint,
            "general": self._handle_general,
        }

    def process(self, user_message: str, user_context: Dict) -> str:
        """处理用户消息"""

        # Step 1: 意图识别(快速,不走 LLM)
        intent = self.intent_classifier.predict(user_message)

        # Step 2: 查找对应的处理器
        handler = self.intent_handlers.get(intent, self._handle_general)

        # Step 3: 执行处理
        try:
            response = handler(user_message, user_context)

            # Step 4: 安全检查
            if not self._passes_safety_check(response):
                return self._fallback_response(user_message)

            return response

        except Exception as e:
            # 任何异常,检查是否需要转人工
            if self._should_escalate(user_message, e):
                return self._escalate_to_human(user_message, user_context)
            return self._apology_and_retry(user_message)

    def _handle_order_status(self, message: str, context: Dict) -> str:
        """处理订单状态查询"""

        # 提取订单号
        order_id = extract_order_id(message)

        if not order_id:
            return "请提供您的订单号,以便我查询。"

        # 调用订单 API
        order = self.order_api.get_order(order_id)

        if order is None:
            return "抱歉,未找到该订单。请核对订单号是否正确。"

        # 生成自然语言响应
        return (
            f"您的订单 {order_id} 当前状态是:{order['status_text']}。"
            f"预计{order.get('estimated_delivery', '近日')}送达。"
            f"如有其他问题,请随时告诉我。"
        )

    def _handle_complaint(self, message: str, context: Dict) -> str:
        """处理投诉:直接转人工"""
        return self._escalate_to_human(message, context)

    def _should_escalate(self, message: str, error: Exception) -> bool:
        """判断是否应该转人工"""

        # 明确的转人工关键词
        escalate_keywords = ["退款", "赔偿", "投诉经理", "升级处理"]

        if any(kw in message for kw in escalate_keywords):
            return True

        # 连续失败
        if self.consecutive_failures >= 3:
            return True

        # 特定类型的异常
        if isinstance(error, (PermissionError, ValueError)):
            return True

        return False

4.4 效果

重构后的系统运行了 6 个月:

客服 Agent 性能对比
================================
指标                 重构前      重构后
----------------------------------------------------------------
AI 独立解决率        60%        88%
平均响应时间         45s        3.2s
转人工率             40%        14%
用户满意度           3.2/5      4.1/5
日均处理量           2,000      8,500
每月 API 成本        N/A        $3,200
================================

五、我的观点:AI Agent 的成功取决于"失败设计"

写到最后,我想强调一个观点:

AI Agent 能不能在生产环境成功,不是取决于它"能做什么",而是取决于它"不能做什么时怎么办"。

Demo 展示的是"晴天";生产环境需要应对"雨天"。

好的 Agent 设计,在 90% 的情况下表现优秀;但更重要的是——剩下的 10% 情况不会崩溃,不会产生有害输出,不会让用户感到沮丧。

这就是为什么我在这篇文章里花了大量篇幅讲:
- 明确的失败边界
- 可靠的工具调用
- 优雅降级
- 人机协作
- 可观测性

这些"防御性设计"不会让 Demo 更亮眼,但它们是生产环境的基石。


文章由文字工作者编写。代码示例基于真实项目经验的抽象表达。