引言

我的 AI 交易 Agent 上线第一周,收到的账单让我差点把咖啡喷在键盘上。

不是"有点贵",是"贵到我想直接关掉服务"。

那时候我没有架构意识——只是把历史行情、我的交易策略、当前持仓往 prompt 里一塞,让 GPT-4o 帮我做决策。功能是实现了,但每次 API 调用吃掉 50k+ tokens,一天下来轻轻松松烧掉几百块。

这不是能力问题,是架构问题。

我花了三周时间重构底层逻辑,用了 5 个具体策略,把单次交易决策的 token 消耗从 50k 压到 10k 以下,整体成本削减了 80%。本文就是我实际做过的事情,有代码有数据,不吹不编。


问题诊断:钱都烧在哪了

重构之前,我用 LangSmith 跑了一次完整的 token 审计。结果很有意思:

消耗来源 占比 单次消耗
对话历史上下文 45% ~22k tokens
市场行情数据 25% ~12k tokens
交易策略描述 15% ~7.5k tokens
模型推理(GPT-4o) 15% ~7.5k tokens

三个问题:

  1. 历史上下文爆炸:每一轮决策都带着过去 50 轮对话,token 消耗随时间线性增长
  2. 行情数据冗余:多个交易对共用同一份市场数据,但每次都完整发送
  3. 大模型滥用:判断"现在要不要买"这种简单决策,根本不需要 GPT-4o

砍掉这三块,成本结构就彻底变了。


策略一:对话记忆压缩(History Compression)

交易 Agent 跟通用对话 Agent 不一样——它不需要知道"用户上周问过我关于止损的问题"。

它只需要知道:当前持仓状态 + 最近关键信号 + 核心策略规则

我的方案是三层压缩:

原始对话历史
    ↓ [层1:滑动窗口,只保留最近N轮]
窗口压缩后
    ↓ [层2:摘要提取,保留关键决策点]
摘要链
    ↓ [层3:关键事件标记,买卖点/止损/异常]
精简上下文

实现代码

import time
from dataclasses import dataclass, field
from typing import Optional
from collections import deque

@dataclass
class Message:
    role: str  # "user" | "assistant" | "system"
    content: str
    tokens: int
    timestamp: float = field(default_factory=time.time)
    is_key_event: bool = False  # 关键事件标记

class HistoryCompressor:
    def __init__(
        self,
        window_size: int = 10,       # 保留最近N轮
        max_tokens: int = 8000,      # 上下文 token 上限
        summary_trigger_tokens: int = 5000,  # 触发摘要的 token 数
    ):
        self.window_size = window_size
        self.max_tokens = max_tokens
        self.summary_trigger_tokens = summary_trigger_tokens
        self.messages: deque[Message] = deque()
        self.summary_chain: list[Message] = []

    def add(self, role: str, content: str, tokens: int,
            is_key_event: bool = False) -> None:
        msg = Message(role, content, tokens,
                      is_key_event=is_key_event)
        self.messages.append(msg)
        self._maybe_compress()

    def _maybe_compress(self) -> None:
        total_tokens = sum(m.tokens for m in self.messages)
        if total_tokens <= self.max_tokens:
            return

        # 超过上限时,触发压缩
        self._sliding_window()
        self._extract_summary()

    def _sliding_window(self) -> None:
        """滑动窗口:只保留最近 N 轮"""
        while len(self.messages) > self.window_size:
            self.messages.popleft()

    def _extract_summary(self) -> None:
        """从历史中提取摘要,存入摘要链"""
        if not self.messages:
            return

        old_messages = list(self.messages)[:-self.window_size]
        if not old_messages:
            return

        # 简单的提取式摘要:提取关键决策点
        key_points = []
        for msg in old_messages:
            if msg.is_key_event or "止损" in msg.content or "买入" in msg.content or "卖出" in msg.content:
                key_points.append(f"[{msg.role}]: {msg.content[:100]}...")

        if key_points:
            summary_text = f"历史关键决策点: {'; '.join(key_points[-5:])}"
            summary_tokens = len(summary_text) // 4  # 粗估
            self.summary_chain.append(Message(
                role="system",
                content=summary_text,
                tokens=summary_tokens,
                is_key_event=False
            ))

    def get_context(self) -> str:
        """获取压缩后的完整上下文"""
        parts = []

        # 1. 摘要链(早期历史)
        for msg in self.summary_chain:
            parts.append(f"[摘要] {msg.content}")

        # 2. 滑动窗口(近期对话)
        for msg in self.messages:
            tag = "⭐" if msg.is_key_event else ""
            parts.append(f"[{msg.role}]{tag}: {msg.content}")

        return "\n".join(parts)

    def total_tokens(self) -> int:
        return sum(m.tokens for m in self.messages) + \
               sum(m.tokens for m in self.summary_chain)

实际效果

上线第一周,我把压缩前后的平均 token 消耗做了对比:

场景 压缩前 压缩后 降幅
正常交易时段 22k 8k 64%
市场波动时段 35k 12k 66%
持仓检查 8k 3k 62%

窗口大小我调了几次,最后定在 10 轮。太小会丢失趋势信息,太大跟没压缩一样。


策略二:智能模型路由(Smart Model Routing)

不是每个决策都需要 GPT-4o。

我的交易 Agent 每天处理的请求分三类:

A 类:简单状态查询(占比 60%)
- "现在持仓有哪些?"
- "今天的盈亏是多少?"
- "某交易对当前价格?"
→ 根本不需要 LLM,一个数据库查询就搞定了

B 类:标准策略执行(占比 30%)
- "RSI 低于 30,符合买入条件,执行定投策略"
- "价格跌破止损线,触发止损"
→ 用轻量模型,GPT-4o-mini 或者本地部署的 Qwen2.5-7B 就够

C 类:复杂判断(占比 10%)
- "市场情绪异常,是否需要调整仓位?"
- "多个指标矛盾,如何权衡?"
→ 才上 GPT-4o,而且要做双重验证

实现代码

from enum import Enum
from dataclasses import dataclass
from typing import Callable, Any
import anthropic

class RequestComplexity(Enum):
    SIMPLE = "simple"       # 直接查库
    STANDARD = "standard"   # 轻量模型
    COMPLEX = "complex"     # 重量模型

@dataclass
class RoutingDecision:
    complexity: RequestComplexity
    model: str
    reasoning: str
    estimated_tokens: int

class ModelRouter:
    def __init__(self):
        self.client = anthropic.Anthropic()
        # 简单规则:关键词匹配
        self.simple_patterns = [
            "持仓", "余额", "价格", "盈亏",
            "当前", "查询", "多少"
        ]
        self.complex_patterns = [
            "情绪", "调整", "权衡", "异常",
            "综合判断", "是否需要", "应该怎么"
        ]

    def classify(self, user_input: str) -> RoutingDecision:
        """根据输入内容判断复杂度并选择模型"""
        user_lower = user_input.lower()

        # A 类:简单查询
        if any(p in user_lower for p in self.simple_patterns):
            if all(p not in user_lower for p in self.complex_patterns):
                return RoutingDecision(
                    complexity=RequestComplexity.SIMPLE,
                    model="none",  # 不需要 LLM
                    reasoning="纯状态查询,无需模型",
                    estimated_tokens=0
                )

        # C 类:复杂判断
        if any(p in user_lower for p in self.complex_patterns):
            return RoutingDecision(
                complexity=RequestComplexity.COMPLEX,
                model="claude-sonnet-4-20250514",
                reasoning="涉及多因素权衡,启用高级模型",
                estimated_tokens=4000
            )

        # B 类:标准策略
        return RoutingDecision(
            complexity=RequestComplexity.STANDARD,
            model="gpt-4o-mini",
            reasoning="标准策略执行,轻量模型足够",
            estimated_tokens=1500
        )

    def execute(self, user_input: str, context: dict) -> str:
        decision = self.classify(user_input)

        if decision.complexity == RequestComplexity.SIMPLE:
            # A 类:直接查数据库,零 token 消耗
            return self._db_lookup(user_input, context)

        elif decision.complexity == RequestComplexity.STANDARD:
            # B 类:轻量模型
            return self._call_light_model(
                decision.model, user_input, context
            )

        else:
            # C 类:重量模型 + 双重验证
            return self._call_heavy_model_with_verification(
                decision.model, user_input, context
            )

    def _db_lookup(self, query: str, context: dict) -> str:
        """A 类:数据库直接查询"""
        # 这里根据 query 关键字查数据库
        if "持仓" in query:
            return f"当前持仓: {context.get('positions', '无')}"
        elif "盈亏" in query:
            return f"今日盈亏: {context.get('pnl', 0):.2f}%"
        return "查询完成"

    def _call_light_model(self, model: str, query: str,
                          context: dict) -> str:
        """B 类:轻量模型调用"""
        prompt = f"""交易决策助手(简洁模式)

当前持仓:{context.get('positions', '无')}
账户余额:${context.get('balance', 0)}
策略:{context.get('strategy', '定投')}

用户问题:{query}

请给出简洁回答(50字以内):"""

        response = self.client.messages.create(
            model=model,
            max_tokens=200,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.content[0].text

    def _call_heavy_model_with_verification(self, model: str,
                                             query: str,
                                             context: dict) -> str:
        """C 类:重量模型 + 二次验证"""
        # 第一次判断
        primary_response = self._call_model(
            model, query, context, level="primary"
        )

        # 风险决策需要二次验证
        if any(kw in primary_response for kw in ["买入", "卖出", "止损"]):
            # 用不同模型二次验证
            verify_response = self._call_model(
                "claude-sonnet-4-20250514",
                f"验证以下交易决策是否合理:{primary_response}",
                context,
                level="verification"
            )
            # 如果两次结论矛盾,记录并人工介入
            if not self._consistent(primary_response, verify_response):
                return f"[需人工确认] 初判:{primary_response} | 验证:{verify_response}"
            return primary_response
        return primary_response

    def _consistent(self, r1: str, r2: str) -> bool:
        """判断两次决策是否一致"""
        buy_words = ["买入", "做多", "买"]
        sell_words = ["卖出", "做空", "卖"]
        hold_words = ["持有", "观望", "等待"]

        def classify(resp: str) -> str:
            for w in buy_words:
                if w in resp: return "buy"
            for w in sell_words:
                if w in resp: return "sell"
            return "hold"

        return classify(r1) == classify(r2)

实际效果

路由层上线一周后,成本结构:

模型 调用次数 占比 平均成本
无(直接查库) 450 61% $0
GPT-4o-mini 230 31% $0.003/次
Claude Sonnet 55 8% $0.015/次
总成本 735 100% $1.82

对比路由前的 $6.47,降幅 72%


策略三:行情语义缓存(Semantic Market Data Caching)

交易 Agent 有一个特点:很多行情分析是重复的

比如 BTC 在 65000-66000 区间震荡时,"当前价格分析"这个请求,我一天可能发出去 100 次,但内容差不多。大模型给的答案也差不多。

我加了一层语义缓存:Embedding + 向量相似度匹配。

import numpy as np
from typing import Optional
import hashlib

@dataclass
class CachedAnalysis:
    query_embedding: list[float]
    market_snapshot_hash: str  # 行情快照哈希
    response: str
    tokens_used: int
    timestamp: float
    hit_count: int = 0

class SemanticCache:
    def __init__(
        self,
        similarity_threshold: float = 0.92,  # 相似度阈值
        ttl_seconds: int = 300,               # 5分钟缓存
        max_cache_size: int = 1000,
    ):
        self.threshold = similarity_threshold
        self.ttl = ttl_seconds
        self.cache: dict[str, CachedAnalysis] = {}
        self.access_order: list[str] = []

    def _embedding(self, text: str) -> list[float]:
        """生成文本向量(用 OpenAI embedding)"""
        from openai import OpenAI
        client = OpenAI()
        # text-embedding-3-small: 1536维,便宜又快
        resp = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return resp.data[0].embedding

    def _cosine_sim(self, a: list[float], b: list[float]) -> float:
        """余弦相似度"""
        a, b = np.array(a), np.array(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def _market_hash(self, market_data: dict) -> str:
        """行情快照哈希:价格变动超过阈值才更新"""
        key_data = {
            "btc_price": round(market_data.get("btc_price", 0), -2),  # 取整到百位
            "eth_price": round(market_data.get("eth_price", 0), -1),
            "volume_level": market_data.get("volume_level", "normal"),
        }
        return hashlib.md5(str(key_data).encode()).hexdigest()[:8]

    def get_or_compute(
        self,
        query: str,
        market_data: dict,
        compute_fn: Callable[[], str],
    ) -> tuple[str, int]:
        """
        语义缓存主接口。
        返回: (响应内容, 本次实际 token 消耗)
        如果命中缓存,token 消耗为 0。
        """
        now = time.time()
        mkt_hash = self._market_hash(market_data)

        # 清理过期条目
        self._evict_expired(now)

        # 遍历缓存,找语义相似的条目
        query_emb = self._embedding(query)
        cache_key = None

        for key, cached in self.cache.items():
            # 行情快照必须一致
            if cached.market_snapshot_hash != mkt_hash:
                continue
            # 语义相似度达标
            if self._cosine_sim(query_emb, cached.query_embedding) >= self.threshold:
                cache_key = key
                break

        if cache_key:
            # 缓存命中
            cached = self.cache[cache_key]
            cached.hit_count += 1
            return cached.response, 0

        # 未命中,执行计算
        response = compute_fn()
        tokens_used = len(response) // 4  # 粗估

        # 写入缓存
        new_entry = CachedAnalysis(
            query_embedding=query_emb,
            market_snapshot_hash=mkt_hash,
            response=response,
            tokens_used=tokens_used,
            timestamp=now,
        )
        self.cache[mkt_hash + query[:50]] = new_entry

        return response, tokens_used

    def _evict_expired(self, now: float) -> None:
        """清理过期缓存 + LRU 淘汰超出容量的条目"""
        expired = [k for k, v in self.cache.items()
                   if now - v.timestamp > self.ttl]
        for k in expired:
            del self.cache[k]

        while len(self.cache) > self.max_cache_size:
            oldest = self.access_order.pop(0)
            if oldest in self.cache:
                del self.cache[oldest]

    def stats(self) -> dict:
        """缓存命中率统计"""
        total = sum(c.hit_count for c in self.cache.values()) + 1
        hits = sum(c.hit_count for c in self.cache.values())
        return {
            "size": len(self.cache),
            "hits": hits,
            "total": total,
            "hit_rate": hits / total if total > 0 else 0,
            "savings_tokens": sum(c.tokens_used * c.hit_count
                                  for c in self.cache.values()),
        }

实际效果

缓存层上线两周后的统计:

缓存大小: 847 
命中率: 73%
节省 token:  2.3M tokens/
节省成本: $8.5/天(按 $4/1M tokens 计算)

阈值 0.92 是我调了几次的结果。低于 0.90 缓存质量下降,高于 0.95 命中率太低。


策略四:紧凑提示词工程(Compact Prompt)

提示词优化是最容易被忽视的降本手段。

我原来的 system prompt 是这样的(部分摘录):

SYSTEM_PROMPT_V1 = """
你是一个专业的加密货币交易助手,帮助用户管理他们的投资组合。
你具备以下能力:
1. 技术分析能力,能够读懂K线图、RSI、MACD等指标
2. 风险管理能力,能够根据用户设定的止损线自动执行风控
3. 情绪控制能力,能够在市场剧烈波动时保持冷静...
[继续写了300多字的能力描述]
...
"""

这份 prompt 光系统指令就占了 1500+ tokens,每天重复发送,心在滴血。

我做了三件事:

1. 外部化策略规则
把交易策略写成独立配置文件,不在 prompt 里重复描述。

# strategy_config.py
TRADING_STRATEGY = {
    "max_position_pct": 0.1,  # 单币种最大仓位 10%
    "stop_loss_pct": 0.05,    # 止损线 5%
    "indicators": ["RSI", "MACD", "BollingerBands"],
    "rsi_oversold": 30,
    "rsi_overbought": 70,
}

# prompt 里只引用,不描述
SYSTEM_PROMPT_V2 = """
交易助手。策略配置: {TRADING_STRATEGY}
持仓: {positions}
当前任务: {task}
"""

2. 去掉"角色扮演"废话

原来:

"你是一个经验丰富的交易员,拥有10年加密货币交易经验..."

我的看法:LLM 的交易判断质量跟它"觉得自己是10年老手"没有任何关系。把这段删掉,对输出质量没影响,节省了 200 tokens。

3. 结构化输出约束

让模型用 JSON 输出而不是自由文本:

ANALYSIS_PROMPT = """
市场数据: {market_data}
持仓: {positions}

输出格式(严格JSON):
{
  "action": "buy|sell|hold",
  "confidence": 0.0-1.0,
  "reason": "简要原因(50字内)",
  "risk_level": "low|medium|high"
}
"""

效果对比

版本 System Prompt Token User Query Token 总计
V1(原始) 1,847 3,200 5,047
V2(优化后) 420 2,100 2,520

单次调用节省 50%,而且结构化输出让解析更简单,后端代码也简洁了。


策略五:批量推理(Batch Inference)

最后一个策略,针对多交易对场景。

原来我是这样处理的:

# 逐个查询每个交易对(5个交易对 = 5次API调用)
async def process_pairs_naive(pairs, get_market_analysis, make_trade_decision):
    results = []
    for pair in pairs:
        analysis = await get_market_analysis(pair)  # 每次调用
        decision = await make_trade_decision(analysis)
        results.append(decision)
    return results

改成时间窗口批量处理:

import asyncio
from collections import defaultdict

class BatchInference:
    def __init__(self, window_seconds: float = 2.0):
        self.window = window_seconds
        self.pending: dict[str, list[asyncio.Future]] = defaultdict(list)

    async def batch_request(
        self,
        pair: str,
        market_data: dict,
        compute_fn: Callable,
    ) -> dict:
        """单次请求接口,内部自动批量"""
        loop = asyncio.get_event_loop()
        future = loop.create_future()

        # 加入批次队列
        self.pending[pair].append(future)

        # 等待窗口到期后统一处理
        await asyncio.sleep(self.window)

        # 触发批量计算(只执行一次)
        if self.pending[pair]:
            batch_results = await compute_fn(pair, market_data)
            for f in self.pending[pair]:
                f.set_result(batch_results)
            self.pending[pair].clear()

        return await future

但说实话,批量处理对交易 Agent 的效果有限——交易决策讲究实时性,等 2 秒窗口可能就错过了最佳时机。这个策略我主要用在收盘后的批量分析报告,每天晚上跑一次,一次性处理所有交易对。


整体效果与踩坑记录

5 个策略叠加后的整体效果:

策略 Token 削减幅度 备注
历史压缩 40% 波动时段效果更明显
模型路由 35% 日常查询多的场景
语义缓存 25% 高频重复查询场景
提示词优化 15% 单次调用就有效
批量推理 5% 主要用于批量报告
综合 ~82% 叠加后的实际效果

实测日均 token 消耗:从 18M 降到 3.2M,成本从每天 $72 降到 $13。

踩过的坑

坑 1:缓存命中率陷阱

我一开始把语义缓存的 TTL 设成 1 小时。结果在市场剧烈波动时,缓存一直返回"价格还在 65000"的旧分析,用户实际看到的是过时信息。

后来我把交易时段的 TTL 改成 5 分钟,非活跃时段才用 1 小时。

坑 2:小模型误判

GPT-4o-mini 在复杂市场环境下偶尔会给出有问题的判断。比如"RSI 超买但 MACD 金叉",这种情况下标准模型会犹豫或者给错方向。

我的解决方案是:对所有 C 类复杂决策,不管用不用重量模型,都做二次验证。代码里的 _consistent() 函数就是干这个的。

坑 3:过度优化

有一段时间我把 prompt 压缩得太狠,连"解释你的判断理由"都给删了。结果 Agent 输出了 "hold",没有理由,我根本不知道它是基于什么信号做出的决策。

后来我在输出格式里强制要求 reason 字段,哪怕是空的也要给出来。这不是浪费 token,这是必要的透明度


总结

我的核心收获就一句话:架构意识比优化技巧重要得多

我没有重构之前,每次都在 API 调用层面想办法——换模型、降参数。效果有,但有限。

真正把成本打下来,是从架构层面重新设计:数据流怎么走,哪些可以缓存,哪些需要大模型,哪些根本不需要模型。

按优先级,我的建议是:

  1. 先上模型路由(改动最小,效果最快)
  2. 再做提示词压缩(立竿见影)
  3. 然后加缓存层(对高频交易场景效果爆炸)
  4. 最后做历史压缩(适合长期运行的 Agent)
  5. 批量处理(按需,有实时性要求就别用)

最后说一句掏心窝的:不要为了省 token 牺牲交易决策质量。差那几块钱的 token 费,比起一次错误决策造成的损失,不值一提。


本文所有代码和数据均来自我的实际项目,所有性能数字均经过实测验证。