OpenAI Tells You What You Spent. Not Where. So I Built a Dashboard.

上个月收到账单,$2,847。比上月多了 40%。

我翻遍了 usage 页面,看到的数字是:GPT-4o mini 调用了 180 万次 tokens,GPT-4o 调用了 50 万次 tokens。

但我不知道的是:
- 哪部分业务在用 GPT-4o mini?哪些在用 GPT-4o?
- 哪些用户/请求消耗了最多的 tokens?
- 什么时候出现了异常的峰值?
- 哪个 prompt 模板最费 tokens?

OpenAI 给我的只是一个总数,没有维度分解。

我花了两天做了一个 Dashboard,现在能看到:
- 按业务线分解的成本
- 按用户/请求分解的消耗
- 异常的请求模式
- Prompt 模板的效率分析

这篇文章讲的是:怎么把 OpenAI 的 usage 数据变成可操作的信息

OpenAI Usage API 的限制

先说清楚 OpenAI 给了什么。

OpenAI 的 Usage 页面(https://platform.openai.com/settings/usage)提供:

  1. 总量:每月的总 tokens 和总费用
  2. 日分解:每天的用量
  3. 模型分解:每个模型的用量

但没有:
- 按业务线/功能分解
- 按用户/账户分解
- 按请求时间序列(无法看日内波动)
- Prompt/Completion tokens 分开统计的详细数据
- 每个 API key 的详细使用记录(只给总数)

OpenAI 提供了 Usage API,但返回的数据维度有限:

import openai

client = openai.OpenAI()

# 调用 Usage API
response = client.with_raw_response.get(
    "https://api.openai.com/dashboard/billing/usage",
    params={
        "start_date": "2026-04-01",
        "end_date": "2026-04-30",
    }
)

usage_data = response.json()
print(usage_data)

返回的数据结构:

{
  "object": "list",
  "data": [
    {
      "id": "Usage-xxx",
      "object": "api_credit_grants",
      "line_item": {
        "type": "Tokens",
        "model": "gpt-4o",
        "price_type": "3150",
        "unit_price": "0.000015",
        "quantity": "500000"
      },
      "status": "succeeded",
      "created_date": "2026-04-15",
      "amount": "7.5"
    }
  ],
  "total_usage": "2847.00"
}

这个数据的限制:只有模型维度的聚合,没有请求维度的明细

数据收集方案

要构建有价值的 Dashboard,需要请求维度的明细数据。

解决方案:在调用 OpenAI API 的地方,统一记录每次请求的元数据

日志拦截层

我用一个 Python decorator 拦截所有 OpenAI 调用:

import functools
import json
import time
from datetime import datetime
from typing import Any, Callable
import openai

class OpenAIRequestLogger:
    """记录每次 OpenAI API 请求"""

    def __init__(self, db_path: str = "openai_usage.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """初始化 SQLite 数据库"""
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS openai_requests (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                request_id TEXT,
                model TEXT NOT NULL,
                prompt_tokens INTEGER,
                completion_tokens INTEGER,
                total_tokens INTEGER,
                cost_usd REAL,
                user_id TEXT,
                session_id TEXT,
                feature TEXT,
                prompt_hash TEXT,
                response_time_ms INTEGER,
                status TEXT,
                error_message TEXT
            )
        """)

        conn.commit()
        conn.close()

    def log_request(
        self,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
        total_tokens: int,
        cost_usd: float,
        user_id: str = None,
        session_id: str = None,
        feature: str = None,
        request_id: str = None,
        response_time_ms: int = None,
        status: str = "success",
        error_message: str = None
    ):
        """记录一次请求"""
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            INSERT INTO openai_requests
            (timestamp, request_id, model, prompt_tokens, completion_tokens,
             total_tokens, cost_usd, user_id, session_id, feature,
             response_time_ms, status, error_message)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            datetime.now().isoformat(),
            request_id,
            model,
            prompt_tokens,
            completion_tokens,
            total_tokens,
            cost_usd,
            user_id,
            session_id,
            feature,
            response_time_ms,
            status,
            error_message
        ))

        conn.commit()
        conn.close()

# 全局 logger 实例
logger = OpenAIRequestLogger()

def track_openai_cost(feature: str = None):
    """Decorator:自动记录 OpenAI 请求成本"""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()

            # 调用原始函数
            result = func(*args, **kwargs)

            # 计算耗时
            elapsed_ms = int((time.time() - start_time) * 1000)

            # 提取模型信息(从返回的 Usage 对象)
            if hasattr(result, 'usage') and result.usage:
                usage = result.usage
                model = result.model
                prompt_tokens = usage.prompt_tokens or 0
                completion_tokens = usage.completion_tokens or 0
                total_tokens = usage.total_tokens or 0

                # 估算成本(GPT-4o Mini 和 GPT-4o 的定价)
                cost_per_1k_prompt = {
                    "gpt-4o-mini": 0.00015,
                    "gpt-4o-mini-2024-07-18": 0.00015,
                    "gpt-4o": 0.0025,
                    "gpt-4o-2024-08-06": 0.0025,
                    "gpt-4o-2024-05-13": 0.005,
                }
                prompt_cost = (prompt_tokens / 1000) * cost_per_1k_prompt.get(model, 0.0015)
                completion_cost = (completion_tokens / 1000) * cost_per_1k_prompt.get(model, 0.0015) * 10
                cost_usd = prompt_cost + completion_cost

                # 记录日志
                logger.log_request(
                    model=model,
                    prompt_tokens=prompt_tokens,
                    completion_tokens=completion_tokens,
                    total_tokens=total_tokens,
                    cost_usd=cost_usd,
                    feature=feature,
                    response_time_ms=elapsed_ms
                )

            return result

        return wrapper
    return decorator

使用方式

# 在业务代码中使用
class AIService:
    def __init__(self):
        self.client = openai.OpenAI()

    @track_openai_cost(feature="chat_completion")
    def chat_completion(self, user_id: str, session_id: str, messages: list):
        """聊天接口"""
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            user=user_id  # OpenAI 建议:用 user 字段标识用户
        )

        # 记录额外信息
        logger.log_request(
            model=response.model,
            prompt_tokens=response.usage.prompt_tokens,
            completion_tokens=response.usage.completion_tokens,
            total_tokens=response.usage.total_tokens,
            cost_usd=self._estimate_cost(response),
            user_id=user_id,
            session_id=session_id,
            feature="chat_completion",
            request_id=response.id
        )

        return response

    def _estimate_cost(self, response):
        """估算单次请求成本"""
        # 这里用简化公式,实际应该用更精确的定价表
        return response.usage.total_tokens * 0.000001

Dashboard 实现

数据收集了 2 周后,我用 Streamlit 做了一个 Dashboard:

1. 成本概览

import streamlit as st
import sqlite3
import pandas as pd
import plotly.express as px

def render_cost_overview(db_path: str):
    """成本概览页面"""

    st.set_page_config(page_title="OpenAI Cost Dashboard", layout="wide")
    st.title("💰 OpenAI Cost Dashboard")

    conn = sqlite3.connect(db_path)
    df = pd.read_sql_query("""
        SELECT
            date(timestamp) as date,
            SUM(cost_usd) as total_cost,
            SUM(total_tokens) as total_tokens,
            COUNT(*) as request_count,
            AVG(response_time_ms) as avg_latency_ms
        FROM openai_requests
        WHERE timestamp >= date('now', '-30 days')
        GROUP BY date(timestamp)
        ORDER BY date
    """, conn)

    col1, col2, col3, col4 = st.columns(4)

    with col1:
        st.metric(
            "Total Cost (30d)",
            f"${df['total_cost'].sum():.2f}",
            delta=f"{(df['total_cost'].sum() / 2847 - 1) * 100:.1f}% vs last month"
        )

    with col2:
        st.metric("Total Tokens", f"{df['total_tokens'].sum()/1_000_000:.1f}M")

    with col3:
        st.metric("Total Requests", f"{df['request_count'].sum():,}")

    with col4:
        st.metric("Avg Latency", f"{df['avg_latency_ms'].mean():.0f}ms")

    # 成本趋势图
    fig = px.line(
        df,
        x='date',
        y='total_cost',
        title='Daily Cost Trend',
        labels={'date': 'Date', 'total_cost': 'Cost (USD)'}
    )
    st.plotly_chart(fig, use_container_width=True)

2. 按 Feature 分解

def render_feature_breakdown(db_path: str):
    """按功能线分解成本"""

    conn = sqlite3.connect(db_path)
    df = pd.read_sql_query("""
        SELECT
            feature,
            SUM(cost_usd) as total_cost,
            SUM(total_tokens) as total_tokens,
            COUNT(*) as request_count,
            AVG(total_tokens) as avg_tokens_per_request,
            AVG(response_time_ms) as avg_latency
        FROM openai_requests
        WHERE timestamp >= date('now', '-30 days')
        GROUP BY feature
        ORDER BY total_cost DESC
    """, conn)

    st.subheader("📊 Cost by Feature")

    # 成本柱状图
    fig = px.bar(
        df,
        x='feature',
        y='total_cost',
        color='feature',
        title='Cost by Feature'
    )
    st.plotly_chart(fig, use_container_width=True)

    # 详细表格
    df['cost_share'] = df['total_cost'] / df['total_cost'].sum() * 100
    df['avg_cost_per_request'] = df['total_cost'] / df['request_count']

    st.dataframe(
        df[['feature', 'total_cost', 'cost_share', 'request_count', 'avg_cost_per_request']],
        column_config={
            "total_cost": st.column_config.NumberColumn("Cost ($)", format="%.2f"),
            "cost_share": st.column_config.NumberColumn("Share (%)", format="%.1f"),
            "avg_cost_per_request": st.column_config.NumberColumn("Cost/Req ($)", format="%.4f"),
        },
        hide_index=True
    )

3. 用户维度分析

def render_user_analysis(db_path: str):
    """按用户维度分析"""

    conn = sqlite3.connect(db_path)
    df = pd.read_sql_query("""
        SELECT
            user_id,
            SUM(cost_usd) as total_cost,
            SUM(total_tokens) as total_tokens,
            COUNT(*) as request_count,
            AVG(total_tokens) as avg_tokens_per_request,
            MIN(timestamp) as first_request,
            MAX(timestamp) as last_request
        FROM openai_requests
        WHERE
            timestamp >= date('now', '-30 days')
            AND user_id IS NOT NULL
        GROUP BY user_id
        ORDER BY total_cost DESC
        LIMIT 50
    """, conn)

    st.subheader("👥 Cost by User (Top 50)")

    col1, col2 = st.columns(2)

    with col1:
        # 成本分布
        fig = px.pie(
            df.head(10),
            values='total_cost',
            names='user_id',
            title='Top 10 Users by Cost'
        )
        st.plotly_chart(fig, use_container_width=True)

    with col2:
        # 请求次数 vs 成本效率
        fig = px.scatter(
            df,
            x='request_count',
            y='total_cost',
            size='total_tokens',
            color='avg_tokens_per_request',
            hover_data=['user_id'],
            title='Request Count vs Cost (size = total tokens)'
        )
        st.plotly_chart(fig, use_container_width=True)

4. 异常检测

def detect_anomalies(db_path: str) -> list:
    """检测异常请求模式"""

    conn = sqlite3.connect(db_path)
    df = pd.read_sql_query("""
        SELECT
            user_id,
            date(timestamp) as date,
            hour(timestamp) as hour,
            SUM(cost_usd) as cost,
            SUM(total_tokens) as tokens,
            COUNT(*) as requests
        FROM openai_requests
        WHERE timestamp >= date('now', '-7 days')
        GROUP BY user_id, date(timestamp), hour(timestamp)
    """, conn)

    anomalies = []

    # 检测 1:单个用户单小时成本异常高
    hourly_avg = df.groupby('user_id')['cost'].mean()
    for _, row in df.iterrows():
        user_avg = hourly_avg.get(row['user_id'], 0)
        if user_avg > 0 and row['cost'] > user_avg * 5:
            anomalies.append({
                'type': 'high_cost',
                'user_id': row['user_id'],
                'date': row['date'],
                'hour': row['hour'],
                'cost': row['cost'],
                'expected': user_avg,
                'actual': row['cost'],
                'ratio': row['cost'] / user_avg
            })

    # 检测 2:单个请求 token 数异常高
    request_df = pd.read_sql_query("""
        SELECT * FROM openai_requests
        WHERE timestamp >= date('now', '-7 days')
    """, conn)

    avg_tokens = request_df['total_tokens'].mean()
    for _, row in request_df.iterrows():
        if row['total_tokens'] > avg_tokens * 10:
            anomalies.append({
                'type': 'high_tokens',
                'user_id': row['user_id'],
                'timestamp': row['timestamp'],
                'tokens': row['total_tokens'],
                'expected': avg_tokens,
                'model': row['model']
            })

    return anomalies

def render_anomaly_alerts(db_path: str):
    """异常告警页面"""

    anomalies = detect_anomalies(db_path)

    st.subheader("🚨 Anomaly Alerts")

    if not anomalies:
        st.success("No anomalies detected in the last 7 days")
        return

    # 按类型分组
    high_cost = [a for a in anomalies if a['type'] == 'high_cost']
    high_tokens = [a for a in anomalies if a['type'] == 'high_tokens']

    if high_cost:
        st.warning(f"Found {len(high_cost)} high-cost events")

        df_cost = pd.DataFrame(high_cost)
        st.dataframe(
            df_cost[['user_id', 'date', 'hour', 'cost', 'expected', 'ratio']],
            column_config={
                "cost": st.column_config.NumberColumn("Actual ($)", format="%.2f"),
                "expected": st.column_config.NumberColumn("Expected ($)", format="%.2f"),
                "ratio": st.column_config.NumberColumn("Multiplier", format="%.1f"),
            },
            hide_index=True
        )

实际发现

Dashboard 跑了 2 周后,我发现了一些有意思的数据:

发现 1:有一个功能占了我 60% 的成本

Dashboard 显示,文档摘要功能消耗了 60% 的成本,但只贡献了 15% 的用户价值(用会话数衡量)。

# Feature 成本分布
feature_cost = {
    "chat_completion": 0.25,      # 25%
    "doc_summarization": 0.60,    # 60%
    "code_generation": 0.10,     # 10%
    "other": 0.05                # 5%
}

原因:文档摘要用的是 GPT-4o,但实际场景不需要那么高的智能。换成 GPT-4o mini,质量和效果差别不大。

改完之后,同等功能,成本降了 70%。

发现 2:有一个用户消耗了 40% 的成本

用户维度分析显示,用户 user_3821 消耗了 40% 的成本,原因是它在做一个批量数据处理任务,调用频率异常高。

但这个用户是内部测试账号,不应该走生产计费。

结论:需要给内部测试账号加白名单,或者单独建一个 internal org。

发现 3:晚间有异常的 API 调用

发现凌晨 2-4 点有持续的 API 调用,但不是用户活跃时间。

调查后发现:有一个定时任务配置错误,同一个任务跑了 3 份副本,重复消耗了 3 倍的成本。

结论:需要加任务锁,避免重复执行。

Dashboard 的局限

做完之后,我发现了一些局限性:

局限 1:无法追踪历史数据

数据是从 Dashboard 搭建之后才开始收集的,历史数据拿不到。

OpenAI 的 Usage API 可以查 90 天内的数据,但只有日聚合,没有请求明细。

解决:对于历史数据,只能估算。可以根据日均 cost 和当时的功能分布推算,但精度有限。

局限 2:成本估算不精确

我用的成本公式是简化版,跟 OpenAI 的实际定价有出入。

OpenAI 的定价是按照 prompt tokens 和 completion tokens 分别计费的,而且有 volume discount。简化公式误差在 5-10% 左右。

解决:如果需要精确数据,可以导出 OpenAI 的 invoice 对账。

局限 3:多租户场景下数据不全

如果你的应用是多租户架构,只用 user 字段标识用户可能不够。但 OpenAI 有 rate limit,频繁请求会触发限制。

解决:可以用 session + user 的组合维度做分析。

总结

这个东西没有多复杂,但很有用。

核心价值就两点:

  1. 从"总共花了多少"到"花在什么地方"
  2. 从被动看账单到主动发现异常

$2,847 → $1,200,成本降了 58%,不是因为砍功能,是因为搞清楚钱花在哪,然后优化。

Dashboard 搭好了,代码在 GitHub 上(链接略),欢迎参考。


如果你也在管 OpenAI API 成本,建议先上日志记录,再上 Dashboard。不难,但很有用。