引言
我的 AI 交易 Agent 上线第一周,收到的账单让我差点把咖啡喷在键盘上。
不是"有点贵",是"贵到我想直接关掉服务"。
那时候我没有架构意识——只是把历史行情、我的交易策略、当前持仓往 prompt 里一塞,让 GPT-4o 帮我做决策。功能是实现了,但每次 API 调用吃掉 50k+ tokens,一天下来轻轻松松烧掉几百块。
这不是能力问题,是架构问题。
我花了三周时间重构底层逻辑,用了 5 个具体策略,把单次交易决策的 token 消耗从 50k 压到 10k 以下,整体成本削减了 80%。本文就是我实际做过的事情,有代码有数据,不吹不编。
问题诊断:钱都烧在哪了
重构之前,我用 LangSmith 跑了一次完整的 token 审计。结果很有意思:
| 消耗来源 | 占比 | 单次消耗 |
|---|---|---|
| 对话历史上下文 | 45% | ~22k tokens |
| 市场行情数据 | 25% | ~12k tokens |
| 交易策略描述 | 15% | ~7.5k tokens |
| 模型推理(GPT-4o) | 15% | ~7.5k tokens |
三个问题:
- 历史上下文爆炸:每一轮决策都带着过去 50 轮对话,token 消耗随时间线性增长
- 行情数据冗余:多个交易对共用同一份市场数据,但每次都完整发送
- 大模型滥用:判断"现在要不要买"这种简单决策,根本不需要 GPT-4o
砍掉这三块,成本结构就彻底变了。
策略一:对话记忆压缩(History Compression)
交易 Agent 跟通用对话 Agent 不一样——它不需要知道"用户上周问过我关于止损的问题"。
它只需要知道:当前持仓状态 + 最近关键信号 + 核心策略规则。
我的方案是三层压缩:
原始对话历史
↓ [层1:滑动窗口,只保留最近N轮]
窗口压缩后
↓ [层2:摘要提取,保留关键决策点]
摘要链
↓ [层3:关键事件标记,买卖点/止损/异常]
精简上下文
实现代码
import time
from dataclasses import dataclass, field
from typing import Optional
from collections import deque
@dataclass
class Message:
role: str # "user" | "assistant" | "system"
content: str
tokens: int
timestamp: float = field(default_factory=time.time)
is_key_event: bool = False # 关键事件标记
class HistoryCompressor:
def __init__(
self,
window_size: int = 10, # 保留最近N轮
max_tokens: int = 8000, # 上下文 token 上限
summary_trigger_tokens: int = 5000, # 触发摘要的 token 数
):
self.window_size = window_size
self.max_tokens = max_tokens
self.summary_trigger_tokens = summary_trigger_tokens
self.messages: deque[Message] = deque()
self.summary_chain: list[Message] = []
def add(self, role: str, content: str, tokens: int,
is_key_event: bool = False) -> None:
msg = Message(role, content, tokens,
is_key_event=is_key_event)
self.messages.append(msg)
self._maybe_compress()
def _maybe_compress(self) -> None:
total_tokens = sum(m.tokens for m in self.messages)
if total_tokens <= self.max_tokens:
return
# 超过上限时,触发压缩
self._sliding_window()
self._extract_summary()
def _sliding_window(self) -> None:
"""滑动窗口:只保留最近 N 轮"""
while len(self.messages) > self.window_size:
self.messages.popleft()
def _extract_summary(self) -> None:
"""从历史中提取摘要,存入摘要链"""
if not self.messages:
return
old_messages = list(self.messages)[:-self.window_size]
if not old_messages:
return
# 简单的提取式摘要:提取关键决策点
key_points = []
for msg in old_messages:
if msg.is_key_event or "止损" in msg.content or "买入" in msg.content or "卖出" in msg.content:
key_points.append(f"[{msg.role}]: {msg.content[:100]}...")
if key_points:
summary_text = f"历史关键决策点: {'; '.join(key_points[-5:])}"
summary_tokens = len(summary_text) // 4 # 粗估
self.summary_chain.append(Message(
role="system",
content=summary_text,
tokens=summary_tokens,
is_key_event=False
))
def get_context(self) -> str:
"""获取压缩后的完整上下文"""
parts = []
# 1. 摘要链(早期历史)
for msg in self.summary_chain:
parts.append(f"[摘要] {msg.content}")
# 2. 滑动窗口(近期对话)
for msg in self.messages:
tag = "⭐" if msg.is_key_event else ""
parts.append(f"[{msg.role}]{tag}: {msg.content}")
return "\n".join(parts)
def total_tokens(self) -> int:
return sum(m.tokens for m in self.messages) + \
sum(m.tokens for m in self.summary_chain)
实际效果
上线第一周,我把压缩前后的平均 token 消耗做了对比:
| 场景 | 压缩前 | 压缩后 | 降幅 |
|---|---|---|---|
| 正常交易时段 | 22k | 8k | 64% |
| 市场波动时段 | 35k | 12k | 66% |
| 持仓检查 | 8k | 3k | 62% |
窗口大小我调了几次,最后定在 10 轮。太小会丢失趋势信息,太大跟没压缩一样。
策略二:智能模型路由(Smart Model Routing)
不是每个决策都需要 GPT-4o。
我的交易 Agent 每天处理的请求分三类:
A 类:简单状态查询(占比 60%)
- "现在持仓有哪些?"
- "今天的盈亏是多少?"
- "某交易对当前价格?"
→ 根本不需要 LLM,一个数据库查询就搞定了
B 类:标准策略执行(占比 30%)
- "RSI 低于 30,符合买入条件,执行定投策略"
- "价格跌破止损线,触发止损"
→ 用轻量模型,GPT-4o-mini 或者本地部署的 Qwen2.5-7B 就够
C 类:复杂判断(占比 10%)
- "市场情绪异常,是否需要调整仓位?"
- "多个指标矛盾,如何权衡?"
→ 才上 GPT-4o,而且要做双重验证
实现代码
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Any
import anthropic
class RequestComplexity(Enum):
SIMPLE = "simple" # 直接查库
STANDARD = "standard" # 轻量模型
COMPLEX = "complex" # 重量模型
@dataclass
class RoutingDecision:
complexity: RequestComplexity
model: str
reasoning: str
estimated_tokens: int
class ModelRouter:
def __init__(self):
self.client = anthropic.Anthropic()
# 简单规则:关键词匹配
self.simple_patterns = [
"持仓", "余额", "价格", "盈亏",
"当前", "查询", "多少"
]
self.complex_patterns = [
"情绪", "调整", "权衡", "异常",
"综合判断", "是否需要", "应该怎么"
]
def classify(self, user_input: str) -> RoutingDecision:
"""根据输入内容判断复杂度并选择模型"""
user_lower = user_input.lower()
# A 类:简单查询
if any(p in user_lower for p in self.simple_patterns):
if all(p not in user_lower for p in self.complex_patterns):
return RoutingDecision(
complexity=RequestComplexity.SIMPLE,
model="none", # 不需要 LLM
reasoning="纯状态查询,无需模型",
estimated_tokens=0
)
# C 类:复杂判断
if any(p in user_lower for p in self.complex_patterns):
return RoutingDecision(
complexity=RequestComplexity.COMPLEX,
model="claude-sonnet-4-20250514",
reasoning="涉及多因素权衡,启用高级模型",
estimated_tokens=4000
)
# B 类:标准策略
return RoutingDecision(
complexity=RequestComplexity.STANDARD,
model="gpt-4o-mini",
reasoning="标准策略执行,轻量模型足够",
estimated_tokens=1500
)
def execute(self, user_input: str, context: dict) -> str:
decision = self.classify(user_input)
if decision.complexity == RequestComplexity.SIMPLE:
# A 类:直接查数据库,零 token 消耗
return self._db_lookup(user_input, context)
elif decision.complexity == RequestComplexity.STANDARD:
# B 类:轻量模型
return self._call_light_model(
decision.model, user_input, context
)
else:
# C 类:重量模型 + 双重验证
return self._call_heavy_model_with_verification(
decision.model, user_input, context
)
def _db_lookup(self, query: str, context: dict) -> str:
"""A 类:数据库直接查询"""
# 这里根据 query 关键字查数据库
if "持仓" in query:
return f"当前持仓: {context.get('positions', '无')}"
elif "盈亏" in query:
return f"今日盈亏: {context.get('pnl', 0):.2f}%"
return "查询完成"
def _call_light_model(self, model: str, query: str,
context: dict) -> str:
"""B 类:轻量模型调用"""
prompt = f"""交易决策助手(简洁模式)
当前持仓:{context.get('positions', '无')}
账户余额:${context.get('balance', 0)}
策略:{context.get('strategy', '定投')}
用户问题:{query}
请给出简洁回答(50字以内):"""
response = self.client.messages.create(
model=model,
max_tokens=200,
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
def _call_heavy_model_with_verification(self, model: str,
query: str,
context: dict) -> str:
"""C 类:重量模型 + 二次验证"""
# 第一次判断
primary_response = self._call_model(
model, query, context, level="primary"
)
# 风险决策需要二次验证
if any(kw in primary_response for kw in ["买入", "卖出", "止损"]):
# 用不同模型二次验证
verify_response = self._call_model(
"claude-sonnet-4-20250514",
f"验证以下交易决策是否合理:{primary_response}",
context,
level="verification"
)
# 如果两次结论矛盾,记录并人工介入
if not self._consistent(primary_response, verify_response):
return f"[需人工确认] 初判:{primary_response} | 验证:{verify_response}"
return primary_response
return primary_response
def _consistent(self, r1: str, r2: str) -> bool:
"""判断两次决策是否一致"""
buy_words = ["买入", "做多", "买"]
sell_words = ["卖出", "做空", "卖"]
hold_words = ["持有", "观望", "等待"]
def classify(resp: str) -> str:
for w in buy_words:
if w in resp: return "buy"
for w in sell_words:
if w in resp: return "sell"
return "hold"
return classify(r1) == classify(r2)
实际效果
路由层上线一周后,成本结构:
| 模型 | 调用次数 | 占比 | 平均成本 |
|---|---|---|---|
| 无(直接查库) | 450 | 61% | $0 |
| GPT-4o-mini | 230 | 31% | $0.003/次 |
| Claude Sonnet | 55 | 8% | $0.015/次 |
| 总成本 | 735 | 100% | $1.82 |
对比路由前的 $6.47,降幅 72%。
策略三:行情语义缓存(Semantic Market Data Caching)
交易 Agent 有一个特点:很多行情分析是重复的。
比如 BTC 在 65000-66000 区间震荡时,"当前价格分析"这个请求,我一天可能发出去 100 次,但内容差不多。大模型给的答案也差不多。
我加了一层语义缓存:Embedding + 向量相似度匹配。
import numpy as np
from typing import Optional
import hashlib
@dataclass
class CachedAnalysis:
query_embedding: list[float]
market_snapshot_hash: str # 行情快照哈希
response: str
tokens_used: int
timestamp: float
hit_count: int = 0
class SemanticCache:
def __init__(
self,
similarity_threshold: float = 0.92, # 相似度阈值
ttl_seconds: int = 300, # 5分钟缓存
max_cache_size: int = 1000,
):
self.threshold = similarity_threshold
self.ttl = ttl_seconds
self.cache: dict[str, CachedAnalysis] = {}
self.access_order: list[str] = []
def _embedding(self, text: str) -> list[float]:
"""生成文本向量(用 OpenAI embedding)"""
from openai import OpenAI
client = OpenAI()
# text-embedding-3-small: 1536维,便宜又快
resp = client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return resp.data[0].embedding
def _cosine_sim(self, a: list[float], b: list[float]) -> float:
"""余弦相似度"""
a, b = np.array(a), np.array(b)
return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
def _market_hash(self, market_data: dict) -> str:
"""行情快照哈希:价格变动超过阈值才更新"""
key_data = {
"btc_price": round(market_data.get("btc_price", 0), -2), # 取整到百位
"eth_price": round(market_data.get("eth_price", 0), -1),
"volume_level": market_data.get("volume_level", "normal"),
}
return hashlib.md5(str(key_data).encode()).hexdigest()[:8]
def get_or_compute(
self,
query: str,
market_data: dict,
compute_fn: Callable[[], str],
) -> tuple[str, int]:
"""
语义缓存主接口。
返回: (响应内容, 本次实际 token 消耗)
如果命中缓存,token 消耗为 0。
"""
now = time.time()
mkt_hash = self._market_hash(market_data)
# 清理过期条目
self._evict_expired(now)
# 遍历缓存,找语义相似的条目
query_emb = self._embedding(query)
cache_key = None
for key, cached in self.cache.items():
# 行情快照必须一致
if cached.market_snapshot_hash != mkt_hash:
continue
# 语义相似度达标
if self._cosine_sim(query_emb, cached.query_embedding) >= self.threshold:
cache_key = key
break
if cache_key:
# 缓存命中
cached = self.cache[cache_key]
cached.hit_count += 1
return cached.response, 0
# 未命中,执行计算
response = compute_fn()
tokens_used = len(response) // 4 # 粗估
# 写入缓存
new_entry = CachedAnalysis(
query_embedding=query_emb,
market_snapshot_hash=mkt_hash,
response=response,
tokens_used=tokens_used,
timestamp=now,
)
self.cache[mkt_hash + query[:50]] = new_entry
return response, tokens_used
def _evict_expired(self, now: float) -> None:
"""清理过期缓存 + LRU 淘汰超出容量的条目"""
expired = [k for k, v in self.cache.items()
if now - v.timestamp > self.ttl]
for k in expired:
del self.cache[k]
while len(self.cache) > self.max_cache_size:
oldest = self.access_order.pop(0)
if oldest in self.cache:
del self.cache[oldest]
def stats(self) -> dict:
"""缓存命中率统计"""
total = sum(c.hit_count for c in self.cache.values()) + 1
hits = sum(c.hit_count for c in self.cache.values())
return {
"size": len(self.cache),
"hits": hits,
"total": total,
"hit_rate": hits / total if total > 0 else 0,
"savings_tokens": sum(c.tokens_used * c.hit_count
for c in self.cache.values()),
}
实际效果
缓存层上线两周后的统计:
缓存大小: 847 条
命中率: 73%
节省 token: 约 2.3M tokens/天
节省成本: $8.5/天(按 $4/1M tokens 计算)
阈值 0.92 是我调了几次的结果。低于 0.90 缓存质量下降,高于 0.95 命中率太低。
策略四:紧凑提示词工程(Compact Prompt)
提示词优化是最容易被忽视的降本手段。
我原来的 system prompt 是这样的(部分摘录):
SYSTEM_PROMPT_V1 = """
你是一个专业的加密货币交易助手,帮助用户管理他们的投资组合。
你具备以下能力:
1. 技术分析能力,能够读懂K线图、RSI、MACD等指标
2. 风险管理能力,能够根据用户设定的止损线自动执行风控
3. 情绪控制能力,能够在市场剧烈波动时保持冷静...
[继续写了300多字的能力描述]
...
"""
这份 prompt 光系统指令就占了 1500+ tokens,每天重复发送,心在滴血。
我做了三件事:
1. 外部化策略规则
把交易策略写成独立配置文件,不在 prompt 里重复描述。
# strategy_config.py
TRADING_STRATEGY = {
"max_position_pct": 0.1, # 单币种最大仓位 10%
"stop_loss_pct": 0.05, # 止损线 5%
"indicators": ["RSI", "MACD", "BollingerBands"],
"rsi_oversold": 30,
"rsi_overbought": 70,
}
# prompt 里只引用,不描述
SYSTEM_PROMPT_V2 = """
交易助手。策略配置: {TRADING_STRATEGY}。
持仓: {positions}。
当前任务: {task}
"""
2. 去掉"角色扮演"废话
原来:
"你是一个经验丰富的交易员,拥有10年加密货币交易经验..."
我的看法:LLM 的交易判断质量跟它"觉得自己是10年老手"没有任何关系。把这段删掉,对输出质量没影响,节省了 200 tokens。
3. 结构化输出约束
让模型用 JSON 输出而不是自由文本:
ANALYSIS_PROMPT = """
市场数据: {market_data}
持仓: {positions}
输出格式(严格JSON):
{
"action": "buy|sell|hold",
"confidence": 0.0-1.0,
"reason": "简要原因(50字内)",
"risk_level": "low|medium|high"
}
"""
效果对比
| 版本 | System Prompt Token | User Query Token | 总计 |
|---|---|---|---|
| V1(原始) | 1,847 | 3,200 | 5,047 |
| V2(优化后) | 420 | 2,100 | 2,520 |
单次调用节省 50%,而且结构化输出让解析更简单,后端代码也简洁了。
策略五:批量推理(Batch Inference)
最后一个策略,针对多交易对场景。
原来我是这样处理的:
# 逐个查询每个交易对(5个交易对 = 5次API调用)
async def process_pairs_naive(pairs, get_market_analysis, make_trade_decision):
results = []
for pair in pairs:
analysis = await get_market_analysis(pair) # 每次调用
decision = await make_trade_decision(analysis)
results.append(decision)
return results
改成时间窗口批量处理:
import asyncio
from collections import defaultdict
class BatchInference:
def __init__(self, window_seconds: float = 2.0):
self.window = window_seconds
self.pending: dict[str, list[asyncio.Future]] = defaultdict(list)
async def batch_request(
self,
pair: str,
market_data: dict,
compute_fn: Callable,
) -> dict:
"""单次请求接口,内部自动批量"""
loop = asyncio.get_event_loop()
future = loop.create_future()
# 加入批次队列
self.pending[pair].append(future)
# 等待窗口到期后统一处理
await asyncio.sleep(self.window)
# 触发批量计算(只执行一次)
if self.pending[pair]:
batch_results = await compute_fn(pair, market_data)
for f in self.pending[pair]:
f.set_result(batch_results)
self.pending[pair].clear()
return await future
但说实话,批量处理对交易 Agent 的效果有限——交易决策讲究实时性,等 2 秒窗口可能就错过了最佳时机。这个策略我主要用在收盘后的批量分析报告,每天晚上跑一次,一次性处理所有交易对。
整体效果与踩坑记录
5 个策略叠加后的整体效果:
| 策略 | Token 削减幅度 | 备注 |
|---|---|---|
| 历史压缩 | 40% | 波动时段效果更明显 |
| 模型路由 | 35% | 日常查询多的场景 |
| 语义缓存 | 25% | 高频重复查询场景 |
| 提示词优化 | 15% | 单次调用就有效 |
| 批量推理 | 5% | 主要用于批量报告 |
| 综合 | ~82% | 叠加后的实际效果 |
实测日均 token 消耗:从 18M 降到 3.2M,成本从每天 $72 降到 $13。
踩过的坑
坑 1:缓存命中率陷阱
我一开始把语义缓存的 TTL 设成 1 小时。结果在市场剧烈波动时,缓存一直返回"价格还在 65000"的旧分析,用户实际看到的是过时信息。
后来我把交易时段的 TTL 改成 5 分钟,非活跃时段才用 1 小时。
坑 2:小模型误判
GPT-4o-mini 在复杂市场环境下偶尔会给出有问题的判断。比如"RSI 超买但 MACD 金叉",这种情况下标准模型会犹豫或者给错方向。
我的解决方案是:对所有 C 类复杂决策,不管用不用重量模型,都做二次验证。代码里的 _consistent() 函数就是干这个的。
坑 3:过度优化
有一段时间我把 prompt 压缩得太狠,连"解释你的判断理由"都给删了。结果 Agent 输出了 "hold",没有理由,我根本不知道它是基于什么信号做出的决策。
后来我在输出格式里强制要求 reason 字段,哪怕是空的也要给出来。这不是浪费 token,这是必要的透明度。
总结
我的核心收获就一句话:架构意识比优化技巧重要得多。
我没有重构之前,每次都在 API 调用层面想办法——换模型、降参数。效果有,但有限。
真正把成本打下来,是从架构层面重新设计:数据流怎么走,哪些可以缓存,哪些需要大模型,哪些根本不需要模型。
按优先级,我的建议是:
- 先上模型路由(改动最小,效果最快)
- 再做提示词压缩(立竿见影)
- 然后加缓存层(对高频交易场景效果爆炸)
- 最后做历史压缩(适合长期运行的 Agent)
- 批量处理(按需,有实时性要求就别用)
最后说一句掏心窝的:不要为了省 token 牺牲交易决策质量。差那几块钱的 token 费,比起一次错误决策造成的损失,不值一提。
本文所有代码和数据均来自我的实际项目,所有性能数字均经过实测验证。