开篇:从"Demo"到"能用的产品"有多远
2025 年,我见过太多这样的场景:
团队兴奋地宣布"我们做了一个 AI Agent!",然后 Demo 演示完美——Agent 流畅地聊天、调用工具、完成任务。所有人都在鼓掌。
三个月后,这个 Agent 被悄悄下线了。
原因?它在线上环境里 30% 的请求会返回奇怪的错误,5% 的对话会陷入死循环,用户投诉"它根本不理解我在说什么"。
这不是某个团队的问题。这是 AI Agent 从 Demo 到生产环境的普遍Gap。
这篇文章,我打算认真聊一下如何在生产环境中构建真正可用的 AI Agent。不是理论,不是"最佳实践清单",而是基于真实踩坑经验的结构化方法。
一、AI Agent 在生产环境中的核心挑战
在讨论具体设计之前,我们需要先理解为什么 Demo 环境和生产环境的差距如此之大。
1.1 挑战一:不确定性
LLM 的输出本质上是概率性的。同一个 prompt,两次调用可能产生完全不同的输出。这在 Demo 阶段不是问题——你可以选那个最好的结果展示。但生产环境需要处理所有可能的结果。
# Demo 阶段:只展示成功案例
def demo_response(user_input: str) -> str:
response = llm.generate(user_input)
print(f"Perfect response: {response}") # 选择性展示
# 生产环境:必须处理所有可能的输出
def production_response(user_input: str) -> str:
response = llm.generate(user_input)
# 1. 是否为空?
if not response:
return "抱歉,我没有得到有效的响应,请重试。"
# 2. 是否包含乱码或格式错误?
if is_malformed(response):
return "抱歉,响应格式异常,请重试。"
# 3. 是否越界(说了一些不该说的)?
if is_out_of_bounds(response):
return "抱歉,这个问题我无法回答。"
# 4. 是否调用了不该调用的工具?
if has_invalid_tool_calls(response):
return "抱歉,我遇到了一个问题,请重新描述您的需求。"
# 只有通过所有检查才返回
return response
Demo 阶段,你展示的是"它能做什么";生产阶段,你必须处理"它不能做什么时怎么办"。
1.2 挑战二:对话状态的复杂性
Demo 的对话通常是短期的、单轮的、或简单多轮。但生产环境的对话可能:
- 持续数小时甚至数天
- 包含大量历史上下文
- 中途被打断,然后恢复
- 需要跨会话记住用户偏好
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
class ConversationState(Enum):
INITIAL = "initial"
IN_PROGRESS = "in_progress"
WAITING_FOR_INPUT = "waiting_for_input"
COMPLETED = "completed"
FAILED = "failed"
EXPIRED = "expired"
@dataclass
class ConversationContext:
"""对话上下文:完整记录一次会话的状态"""
conversation_id: str
user_id: str
created_at: datetime
last_updated: datetime
state: ConversationState
# 对话历史(用于 LLM 输入)
messages: List[Dict] = field(default_factory=list)
# 用户偏好(跨会话持久化)
user_preferences: Dict = field(default_factory=dict)
# 当前任务状态(如果有正在进行的多步骤任务)
current_task: Optional[Dict] = None
# 元数据
metadata: Dict = field(default_factory=dict)
def add_message(self, role: str, content: str):
"""添加消息到历史"""
self.messages.append({
"role": role,
"content": content,
"timestamp": datetime.now().isoformat()
})
self.last_updated = datetime.now()
def truncate_history(self, max_messages: int = 50):
"""截断过长的历史,防止 token 溢出"""
if len(self.messages) > max_messages:
# 保留最新的消息和系统提示
system_messages = [m for m in self.messages if m["role"] == "system"]
other_messages = [m for m in self.messages if m["role"] != "system"]
# 保留最近的 max_messages - len(system_messages) 条
kept_messages = other_messages[-max_messages + len(system_messages):]
self.messages = system_messages + kept_messages
def is_expired(self, ttl_hours: int = 24) -> bool:
"""检查对话是否过期"""
if self.state == ConversationState.COMPLETED:
return True
elapsed = datetime.now() - self.last_updated
return elapsed.total_seconds() > (ttl_hours * 3600)
1.3 挑战三:工具调用的可靠性
AI Agent 的核心能力是调用外部工具。但工具调用失败的形式五花八门:
- 网络超时
- 权限不足
- 输入参数不符合预期
- 工具返回了意外格式
- 工具本身有 bug
from dataclasses import dataclass
from typing import Any, Callable, Optional
import asyncio
from enum import Enum
class ToolError(Enum):
NETWORK_ERROR = "network_error"
PERMISSION_DENIED = "permission_denied"
INVALID_PARAMS = "invalid_params"
UNEXPECTED_RESPONSE = "unexpected_response"
TIMEOUT = "timeout"
UNKNOWN = "unknown"
@dataclass
class ToolExecutionResult:
"""工具执行结果"""
success: bool
result: Any = None
error: Optional[ToolError] = None
error_message: Optional[str] = None
retry_count: int = 0
execution_time_ms: float = 0.0
class ReliableToolExecutor:
"""
可靠的工具执行器:包装工具调用,处理各种失败情况
"""
def __init__(self, max_retries: int = 3, timeout_seconds: int = 30):
self.max_retries = max_retries
self.timeout_seconds = timeout_seconds
async def execute_with_retry(
self,
tool_func: Callable,
params: Dict,
tool_name: str
) -> ToolExecutionResult:
"""带重试机制的工具执行"""
last_error = None
for attempt in range(self.max_retries):
try:
start_time = asyncio.get_event_loop().time()
# 异步执行,支持超时
result = await asyncio.wait_for(
tool_func(**params),
timeout=self.timeout_seconds
)
execution_time = (asyncio.get_event_loop().time() - start_time) * 1000
return ToolExecutionResult(
success=True,
result=result,
retry_count=attempt,
execution_time_ms=execution_time
)
except asyncio.TimeoutError:
last_error = ToolError.TIMEOUT
error_message = f"Tool '{tool_name}' timed out after {self.timeout_seconds}s"
except PermissionError as e:
last_error = ToolError.PERMISSION_DENIED
error_message = str(e)
except (TypeError, ValueError) as e:
# 参数错误,不要重试
return ToolExecutionResult(
success=False,
error=ToolError.INVALID_PARAMS,
error_message=f"Invalid parameters for '{tool_name}': {str(e)}",
retry_count=attempt
)
except Exception as e:
last_error = ToolError.UNKNOWN
error_message = str(e)
# 重试前等待(指数退避)
if attempt < self.max_retries - 1:
wait_time = (2 ** attempt) * 0.5 # 0.5s, 1s, 2s
await asyncio.sleep(wait_time)
# 所有重试都失败
return ToolExecutionResult(
success=False,
error=last_error,
error_message=error_message,
retry_count=self.max_retries
)
def validate_tool_result(self, result: Any, expected_schema: Dict) -> bool:
"""验证工具返回是否符合预期格式"""
if result is None:
return False
if not isinstance(result, dict):
return False
# 简单检查必要字段
for field in expected_schema.get("required", []):
if field not in result:
return False
return True
二、Agent 架构设计:从简单到可靠
2.1 单 Agent 模式
最简单的架构,只有一个 Agent 处理所有请求。
class SimpleAgent:
"""简单 Agent:适合单一职责场景"""
def __init__(self, llm, tools: List):
self.llm = llm
self.tools = {t.name: t for t in tools}
# 系统提示词
self.system_prompt = """你是一个{role}。
你可以通过调用工具来完成任务。
每次只调用一个工具。"""
def process(self, user_message: str, context: Dict) -> str:
"""处理用户消息"""
# 构建 prompt
prompt = self._build_prompt(user_message, context)
# 调用 LLM
response = self.llm.generate(prompt)
# 检查是否有工具调用
if response.tool_calls:
return self._execute_tool_call(response.tool_calls[0])
return response.content
def _build_prompt(self, user_message: str, context: Dict) -> str:
# 实际应用中需要更复杂的 prompt 构建
return f"{self.system_prompt}\n\n用户: {user_message}"
def _execute_tool_call(self, tool_call) -> str:
tool_name = tool_call.name
tool_params = tool_call.arguments
if tool_name not in self.tools:
return f"错误:未知工具 '{tool_name}'"
tool = self.tools[tool_name]
try:
result = tool.execute(**tool_params)
return f"工具 '{tool_name}' 执行结果: {result}"
except Exception as e:
return f"工具 '{tool_name}' 执行失败: {str(e)}"
适用场景:单一任务、低复杂度、低可靠性要求
不适用场景:复杂多步骤任务、高可靠性要求、需要优雅降级
2.2 Supervisor-Agent 模式(推荐用于生产)
对于需要高可靠性的场景,我推荐 Supervisor-Agent 模式:Supervisor 负责任务规划和分发,实际执行交给专门的子 Agent。
from dataclasses import dataclass
from typing import List, Dict, Optional, Callable
from enum import Enum
class TaskType(Enum):
RESEARCH = "research" # 信息搜集
ANALYSIS = "analysis" # 分析推理
WRITING = "writing" # 内容生成
CODING = "coding" # 代码编写
REVIEW = "review" # 审查审核
GENERAL = "general" # 一般对话
@dataclass
class Task:
"""任务描述"""
task_id: str
task_type: TaskType
description: str
priority: int = 0 # 0-10, 越高越优先
dependencies: List[str] = field(default_factory=list) # 依赖的其他任务 ID
max_retries: int = 3
@dataclass
class TaskResult:
"""任务执行结果"""
task_id: str
success: bool
output: Optional[str] = None
error: Optional[str] = None
execution_time_seconds: float = 0.0
class SupervisorAgent:
"""
Supervisor-Agent 架构:
- Supervisor负责任务规划、分解、结果汇总
- Sub-Agent负责实际执行
"""
def __init__(self, llm, sub_agents: Dict[TaskType, any], tools: Dict):
self.llm = llm
self.sub_agents = sub_agents
self.tools = tools
# 任务仓库
self.pending_tasks: List[Task] = []
self.completed_tasks: Dict[str, TaskResult] = {}
def process_request(self, user_request: str) -> str:
"""处理用户请求"""
# Step 1: 任务规划 - LLM 决定需要哪些子任务
plan = self._plan_tasks(user_request)
# Step 2: 执行任务(按依赖关系排序)
results = self._execute_tasks(plan)
# Step 3: 汇总结果
final_response = self._aggregate_results(results)
return final_response
def _plan_tasks(self, user_request: str) -> List[Task]:
"""任务规划:决定需要哪些任务"""
prompt = f"""分析以下用户请求,分解为可执行的任务。
用户请求: {user_request}
请分析请求类型,并决定是否需要以下类型的任务:
- RESEARCH: 需要搜集信息
- ANALYSIS: 需要分析推理
- WRITING: 需要生成内容
- CODING: 需要编写代码
- REVIEW: 需要审查审核
请以 JSON 格式返回任务列表:
{{"tasks": [{{"task_type": "...", "description": "...", "priority": 0-10}}]}}
"""
response = self.llm.generate(prompt)
# 解析响应并创建 Task 对象
tasks = self._parse_task_plan(response)
return tasks
def _execute_tasks(self, tasks: List[Task]) -> List[TaskResult]:
"""按依赖关系执行任务"""
# 按依赖关系拓扑排序
sorted_tasks = self._topological_sort(tasks)
results = []
for task in sorted_tasks:
# 检查依赖是否都已完成
deps_completed = all(
self.completed_tasks.get(dep_id, TaskResult(dep_id, False)).success
for dep_id in task.dependencies
)
if not deps_completed:
# 依赖失败,当前任务跳过
results.append(TaskResult(
task_id=task.task_id,
success=False,
error="Dependencies failed"
))
continue
# 查找对应的 sub-agent
sub_agent = self.sub_agents.get(task.task_type)
if sub_agent is None:
# 没有对应的 sub-agent,Supervisor 直接处理
result = self._execute_as_supervisor(task)
else:
result = self._execute_via_subagent(task, sub_agent)
self.completed_tasks[task.task_id] = result
results.append(result)
return results
def _execute_via_subagent(self, task: Task, sub_agent) -> TaskResult:
"""通过 sub-agent 执行任务"""
import time
start = time.time()
try:
output = sub_agent.execute(task.description)
return TaskResult(
task_id=task.task_id,
success=True,
output=output,
execution_time_seconds=time.time() - start
)
except Exception as e:
return TaskResult(
task_id=task.task_id,
success=False,
error=str(e),
execution_time_seconds=time.time() - start
)
def _aggregate_results(self, results: List[TaskResult]) -> str:
"""汇总任务结果,生成最终响应"""
successful = [r for r in results if r.success]
failed = [r for r in results if not r.success]
if not successful:
return "抱歉,您的请求处理失败。请重试。"
# 按执行顺序拼接结果
response_parts = []
for result in successful:
response_parts.append(result.output)
# 如果有失败任务,给出提示
if failed:
response_parts.append(
f"\n\n注意:有 {len(failed)} 个子任务处理失败,可能影响回答的完整性。"
)
return "\n\n".join(response_parts)
2.3 关键设计原则
根据我的踩坑经验,Agent 架构有几个关键原则:
原则一:明确的失败边界
每个组件都必须有明确的失败情况处理。不能假设"它应该能成功"。
def unreliable_component(data):
"""❌ 错误:假设一切顺利"""
result = risky_operation(data)
return process_result(result) # 如果 result 是 None?
def reliable_component(data):
"""✅ 正确:明确的失败边界"""
result = risky_operation(data)
if result is None:
return {"error": "操作失败", "fallback_available": True}
return process_result(result)
原则二:可观测性
Agent 的每个决策都应该可追踪、可回溯。这对于调试和用户解释至关重要。
class ObservableAgent:
"""可观测的 Agent:记录所有决策"""
def __init__(self):
self.decision_log = []
def log_decision(self, decision_type: str, context: Dict, decision: str, confidence: float):
"""记录决策"""
self.decision_log.append({
"timestamp": datetime.now().isoformat(),
"decision_type": decision_type,
"context_summary": self._summarize_context(context),
"decision": decision,
"confidence": confidence,
})
def _summarize_context(self, context: Dict) -> str:
"""将上下文压缩为摘要(节省存储)"""
return f"messages={len(context.get('messages', []))}, " \
f"tools={list(context.get('available_tools', {}).keys())}, " \
f"task={context.get('current_task', 'N/A')[:50]}"
原则三:优雅降级
当 Agent 无法完成复杂任务时,应该能够退回到更简单的处理方式。
class GracefulDegradationAgent:
"""支持优雅降级的 Agent"""
def __init__(self, primary_llm, fallback_llm):
self.primary_llm = primary_llm
self.fallback_llm = fallback_llm
def process(self, request: str, complexity_estimate: float) -> str:
"""
complexity_estimate: 0-1, 越高表示任务越复杂
"""
try:
# 先尝试用主模型处理
response = self.primary_llm.generate(request)
# 检查响应质量
if self._quality_check(response):
return response
# 质量不达标,考虑降级
if complexity_estimate > 0.7:
# 复杂任务降级到更可靠的配置
return self._handle_complex_with_fallback(request)
else:
# 简单任务,可能是 LLM 状态波动,重试
return self.primary_llm.generate(request)
except Exception as e:
# 任何异常都触发降级
return self._handle_with_fallback(request)
def _handle_complex_with_fallback(self, request: str) -> str:
"""复杂任务降级:简化问题,尝试分段处理"""
# 分解为更小的子问题
sub_problems = self._decompose(request)
results = []
for sub in sub_problems:
try:
result = self.fallback_llm.generate(sub, max_tokens=500)
results.append(result)
except:
results.append("[处理失败]")
return self._compose_response(results)
def _handle_with_fallback(self, request: str) -> str:
"""通用降级:直接使用 fallback 模型"""
return self.fallback_llm.generate(
f"请简洁回答:{request}",
max_tokens=300
)
三、生产环境的关键组件
3.1 健康检查与监控
Agent 需要持续的"生命体征"监控:
from dataclasses import dataclass
import psutil
import time
@dataclass
class AgentHealthStatus:
"""Agent 健康状态"""
timestamp: datetime
is_healthy: bool
active_conversations: int
queue_depth: int
error_rate_5min: float
avg_response_time_ms: float
memory_usage_percent: float
cpu_usage_percent: float
class AgentHealthMonitor:
"""Agent 健康监控"""
def __init__(self, agent, warning_threshold: float = 0.05):
self.agent = agent
self.warning_threshold = warning_threshold # 5% 错误率阈值
# 健康指标记录
self.response_times = []
self.errors = []
self.last_check_time = time.time()
def check_health(self) -> AgentHealthStatus:
"""执行健康检查"""
now = time.time()
elapsed = now - self.last_check_time
# 清理 5 分钟前的旧数据
self._cleanup_old_data(now)
# 计算指标
error_rate = len(self.errors) / max(len(self.response_times), 1)
avg_response_time = sum(self.response_times) / max(len(self.response_times), 1) if self.response_times else 0
# 系统资源
memory = psutil.virtual_memory()
cpu = psutil.cpu_percent(interval=0.1)
return AgentHealthStatus(
timestamp=datetime.now(),
is_healthy=error_rate < self.warning_threshold,
active_conversations=self.agent.active_conversations,
queue_depth=self.agent.task_queue.qsize(),
error_rate_5min=error_rate,
avg_response_time_ms=avg_response_time,
memory_usage_percent=memory.percent,
cpu_usage_percent=cpu,
)
def _cleanup_old_data(self, now: float):
"""清理 5 分钟前的数据"""
five_minutes_ago = now - 300
self.response_times = [
(t, rt) for t, rt in self.response_times if t > five_minutes_ago
]
self.errors = [
(t, e) for t, e in self.errors if t > five_minutes_ago
]
3.2 人机协作边界
有些任务不应该完全由 AI 处理。我设计了一个人机协作边界系统:
class HumanInTheLoop:
"""人机协作边界管理"""
# 需要人工确认的操作类型
ESCALATE_TO_HUMAN = [
"financial_transaction",
"delete_data",
"send_external_email",
"modify_permissions",
"approve_refund",
]
# 完全由 AI 处理的操作类型
AI_ONLY = [
"search_knowledge_base",
"summarize_document",
"draft_response",
"analyze_data",
"generate_report",
]
def requires_human_review(self, action_type: str, risk_level: str) -> bool:
"""判断是否需要人工审核"""
# 高风险操作必须人工审核
if action_type in self.ESCALATE_TO_HUMAN or risk_level == "high":
return True
# 低风险操作 AI 自行处理
if action_type in self.AI_ONLY and risk_level == "low":
return False
# 中等风险操作,给用户确认选项
return False # 但在响应中提示用户可以申请人工审核
3.3 对话记忆与遗忘
Agent 需要有策略地管理记忆——不是所有对话都需要记住:
class ConversationMemory:
"""对话记忆管理:选择性记住"""
def __init__(self, max_short_term_items: int = 50):
self.short_term = [] # 短期记忆:最近 N 条消息
self.long_term = {} # 长期记忆:用户偏好、重要事实
self.max_short_term_items = max_short_term_items
def add_exchange(self, user_message: str, agent_response: str, metadata: Dict):
"""添加一对对话"""
self.short_term.append({
"user": user_message,
"agent": agent_response,
"timestamp": datetime.now(),
"metadata": metadata
})
# 提取可能需要长期记忆的信息
self._extract_long_term_memory(user_message, agent_response, metadata)
# 清理过期的短期记忆
self._consolidate_short_term()
def _extract_long_term_memory(self, user_message: str, agent_response: str, metadata: Dict):
"""提取需要长期记忆的信息"""
# 提取用户偏好
if "preference" in metadata:
self.long_term[f"pref_{metadata['preference_type']}"] = metadata["preference"]
# 提取关键事实
if "important_fact" in metadata:
self.long_term[f"fact_{len(self.long_term)}"] = metadata["important_fact"]
# 提取决策(用于后续参考)
if metadata.get("is_decision"):
self.long_term[f"decision_{metadata.get('decision_id')}"] = {
"context": user_message[:200],
"decision": agent_response[:200],
"timestamp": datetime.now().isoformat()
}
def _consolidate_short_term(self):
"""整合短期记忆"""
if len(self.short_term) <= self.max_short_term_items:
return
# 保留最新的 + 最"重要"的
# 简化:保留最新的 max_short_term_items 条
self.short_term = self.short_term[-self.max_short_term_items:]
def get_relevant_context(self, current_message: str) -> str:
"""获取与当前消息相关的上下文"""
# 简单的关键词匹配(实际应用应该用 embedding)
current_keywords = set(current_message.lower().split())
relevant = []
# 检查短期记忆
for item in self.short_term[-10:]: # 最近 10 条
if any(kw in item["user"].lower() for kw in current_keywords):
relevant.append(f"用户曾说:{item['user']}")
# 检查长期记忆
for key, value in self.long_term.items():
if isinstance(value, str) and any(kw in value.lower() for kw in current_keywords):
relevant.append(f"记住的事实:{value}")
return "\n".join(relevant) if relevant else "无相关上下文"
四、真实案例:客服 Agent 的生产级实现
4.1 背景
我曾帮一个电商平台重构他们的 AI 客服系统。原系统是一个纯规则匹配的系统,问题覆盖率只有 60%,剩下 40% 用户只能被转人工。
目标:构建一个 AI Agent,覆盖率 90%+,同时把转人工率从 40% 降到 15%。
4.2 架构设计
用户消息
↓
意图识别(轻量模型,<100ms)
↓
┌─────────────┬──────────────┬──────────────┐
↓ ↓ ↓
查询类 业务类 投诉类
(RAG) (API调用) (转人工)
↓ ↓ ↓
知识库检索 订单系统 HumanAgent
↓ ↓ ↓
└────────────┴──────────────┘
↓
响应生成 + 安全检查
↓
用户消息
4.3 核心实现
class EcommerceCustomerServiceAgent:
"""电商客服 Agent"""
def __init__(self):
self.intent_classifier = self._load_intent_classifier()
self.rag_system = self._load_rag_system()
self.order_api = OrderAPI()
self.human_agent_pool = HumanAgentPool()
# 意图到处理的映射
self.intent_handlers = {
"order_status": self._handle_order_status,
"return_request": self._handle_return_request,
"product_inquiry": self._handle_product_inquiry,
"complaint": self._handle_complaint,
"general": self._handle_general,
}
def process(self, user_message: str, user_context: Dict) -> str:
"""处理用户消息"""
# Step 1: 意图识别(快速,不走 LLM)
intent = self.intent_classifier.predict(user_message)
# Step 2: 查找对应的处理器
handler = self.intent_handlers.get(intent, self._handle_general)
# Step 3: 执行处理
try:
response = handler(user_message, user_context)
# Step 4: 安全检查
if not self._passes_safety_check(response):
return self._fallback_response(user_message)
return response
except Exception as e:
# 任何异常,检查是否需要转人工
if self._should_escalate(user_message, e):
return self._escalate_to_human(user_message, user_context)
return self._apology_and_retry(user_message)
def _handle_order_status(self, message: str, context: Dict) -> str:
"""处理订单状态查询"""
# 提取订单号
order_id = extract_order_id(message)
if not order_id:
return "请提供您的订单号,以便我查询。"
# 调用订单 API
order = self.order_api.get_order(order_id)
if order is None:
return "抱歉,未找到该订单。请核对订单号是否正确。"
# 生成自然语言响应
return (
f"您的订单 {order_id} 当前状态是:{order['status_text']}。"
f"预计{order.get('estimated_delivery', '近日')}送达。"
f"如有其他问题,请随时告诉我。"
)
def _handle_complaint(self, message: str, context: Dict) -> str:
"""处理投诉:直接转人工"""
return self._escalate_to_human(message, context)
def _should_escalate(self, message: str, error: Exception) -> bool:
"""判断是否应该转人工"""
# 明确的转人工关键词
escalate_keywords = ["退款", "赔偿", "投诉经理", "升级处理"]
if any(kw in message for kw in escalate_keywords):
return True
# 连续失败
if self.consecutive_failures >= 3:
return True
# 特定类型的异常
if isinstance(error, (PermissionError, ValueError)):
return True
return False
4.4 效果
重构后的系统运行了 6 个月:
客服 Agent 性能对比
================================
指标 重构前 重构后
----------------------------------------------------------------
AI 独立解决率 60% 88%
平均响应时间 45s 3.2s
转人工率 40% 14%
用户满意度 3.2/5 4.1/5
日均处理量 2,000 8,500
每月 API 成本 N/A $3,200
================================
五、我的观点:AI Agent 的成功取决于"失败设计"
写到最后,我想强调一个观点:
AI Agent 能不能在生产环境成功,不是取决于它"能做什么",而是取决于它"不能做什么时怎么办"。
Demo 展示的是"晴天";生产环境需要应对"雨天"。
好的 Agent 设计,在 90% 的情况下表现优秀;但更重要的是——剩下的 10% 情况不会崩溃,不会产生有害输出,不会让用户感到沮丧。
这就是为什么我在这篇文章里花了大量篇幅讲:
- 明确的失败边界
- 可靠的工具调用
- 优雅降级
- 人机协作
- 可观测性
这些"防御性设计"不会让 Demo 更亮眼,但它们是生产环境的基石。
文章由文字工作者编写。代码示例基于真实项目经验的抽象表达。