How I Used Gemini CLI to Orchestrate a Complex RAG Migration

三个月前,我接到了一个任务:把一个基于早期 LangChain 版本的自建 RAG 系统,迁移到新的技术栈。

这个系统跑了两年,积累了:
- 50 万条文档向量(存在 PostgreSQL 的 pgvector 里)
- 2 万行 Python 代码(LangChain v0.0.x)
- 三个独立的检索服务
- 无数 hardcode 的配置

迁移目标:
- 升级到 LangChain v0.2+
- 从 pgvector 迁移到 Pinecone(统一管理、更好的 ANN 性能)
- 重构检索服务,统一 API
- 支持多模态检索(未来要支持图片)

产品经理给的 deadline:6 周。

传统做法:两个人月的工作量,我一个人用 AI agent,4 周完成了。

这篇文章讲的是这个过程:怎么做、踩了什么坑、gemini CLI 的能力边界在哪里。

项目背景:现有的 RAG 系统

先交代一下背景。

我们 2022 年搭了一套 RAG 系统,当时 LangChain 还是 0.0.x 版本,向量数据库用的 pgvector。

核心架构是这样的:

# 旧版架构(LangChain 0.0.x)
from langchain import OpenAI, VectorDBQA
from pgvector.sqlalchemy import Vector
from sqlalchemy import create_engine

class OldRAGSystem:
    def __init__(self):
        self.llm = OpenAI(temperature=0)
        self.engine = create_engine("postgresql://localhost:5432/vectors")
        self.vectorstore = VectorDBQA.from_connector(
            name="docs",
            description="Company documentation",
            embedding_function=self._get_embedding,
            engine=self.engine,
            table_name="document_vectors"
        )

    def query(self, question: str) -> str:
        # 简单直接:检索 -> 直接返回
        docs = self.vectorstore.similarity_search(question, k=5)
        context = "\n".join([d.page_content for d in docs])
        return self.llm.call(f"Context: {context}\n\nQuestion: {question}")

这套系统有两个主要问题:

  1. LangChain API 大变:0.0.x 到 0.2.x 的 API 完全重写,老代码没法直接升级
  2. pgvector 性能瓶颈:50 万向量、768 维,ANN 查询延迟 200-300ms,P99 更高

迁移不是简单的代码翻译,是一次重构。

为什么选 Gemini CLI

迁移任务的特点:
- 代码量大(2 万行)
- 需要分阶段验证
- 要保持服务不中断
- 涉及多个组件的协调

我需要一个能处理复杂多步骤任务的 AI agent。选了 Gemini CLI,原因是:

  1. 长上下文:Gemini 2.5 Pro 的 100 万 token 上下文,能一次性处理整个模块的代码
  2. 多模态能力:未来要支持图片,需要能理解视觉内容的模型
  3. Code Execution:Gemini CLI 支持在对话中执行代码,能快速验证

实际使用下来,这三个能力的价值排序是:Code Execution > 长上下文 > 多模态

迁移策略:分阶段执行

我把整个迁移分成了 5 个阶段:

Phase 1: 现状分析(Gemini 完成)
Phase 2: 基础设施搭建(Gemini + 人工)
Phase 3: 核心逻辑迁移(Gemini 完成)
Phase 4: 数据迁移(人工 + Gemini 辅助脚本)
Phase 5: 灰度验证(人工 + Gemini 辅助)

Phase 1: 现状分析

这一步我让 Gemini CLI 帮我分析了现有的 2 万行代码。

# 启动 Gemini CLI 分析模式
gemini-cli chat --model gemini-2.5-pro

# 我的输入:
我需要你分析整个 /path/to/old_rag 目录的代码结构。
请输出:
1. 代码模块划分(每个模块的功能)
2. API 调用链路(从 query 到最终返回的完整流程)
3. 外部依赖(有哪些外部服务、数据库、API)
4. 配置管理(hardcode 的配置有哪些)
5. 测试覆盖(有哪些测试,测试覆盖了哪些场景)

Gemini 的分析结果:

## 代码模块划分

### 模块 1: vector_storage(向量存储层)
- `embedding_service.py`: 文本向量化
- `pgvector_store.py`: pgvector 存储和检索
- `chunking_strategy.py`: 文档分块策略

问题:embedding_service 使用的是 OpenAI ada-002,hardcode

### 模块 2: retrieval(检索层)
- `semantic_search.py`: 语义检索
- `keyword_search.py`: 关键词检索
- `hybrid_search.py`: 混合检索(但实现有问题)

问题:混合检索是简单 OR,没有真正融合 score

### 模块 3: generation(生成层)
- `llm_service.py`: LLM 调用封装
- `prompt_template.py`: Prompt 管理

问题:Prompt 散落在代码各处,没有统一管理

### 模块 4: api(对外接口)
- `fastapi_server.py`: API 服务
- `endpoints/`: 各端点定义

### 外部依赖
- PostgreSQL + pgvector
- OpenAI API( ada-002 + GPT-4)
- Redis(缓存)

### Hardcode 配置
- embedding model: text-embedding-ada-002
- chunk_size: 512
- top_k: 5
- temperature: 0

这个分析花了我 2 小时review,但帮了大忙——让我对整个系统有了全局视图,而不是一头扎进代码细节里。

Phase 2: 基础设施搭建

这一阶段是纯人工,Gemini 帮不上忙:

  1. 申请 Pinecone 账号和索引
  2. 搭新的开发环境(Python 3.11 + poetry)
  3. 配置新的 embedding 服务(换成 text-embedding-3-small,便宜 5 倍)
# 新的 embedding 配置
EMBEDDING_CONFIG = {
    "provider": "openai",
    "model": "text-embedding-3-small",  # 从 ada-002 升级
    "dimension": 1536,  # 从 1536 降到 1536(3-small 支持 256-3072)
    "batch_size": 100,
}

Phase 3: 核心逻辑迁移

这是 Gemini CLI 发力的地方。我用了一个策略:先让 Gemini 生成新代码,然后人工 review,最后人工执行

3.1 向量存储层迁移

# Gemini 生成的 Pinecone 存储层代码
from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore
from langchain_openai import OpenAIEmbeddings

class NewVectorStore:
    def __init__(self, api_key: str, index_name: str):
        self.pc = Pinecone(api_key=api_key)

        # 确保索引存在
        if index_name not in [idx.name for idx in self.pc.list_indexes()]:
            self.pc.create_index(
                name=index_name,
                dimension=1536,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )

        self.index = self.pc.Index(index_name)
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-small"
        )
        self.vectorstore = PineconeVectorStore(
            index=self.index,
            embedding=self.embeddings,
            text_key="text"
        )

    def add_documents(self, documents: list, ids: list = None):
        """批量添加文档"""
        return self.vectorstore.add_documents(documents=documents, ids=ids)

    def similarity_search(self, query: str, k: int = 5, filter: dict = None):
        """相似度检索"""
        return self.vectorstore.similarity_search(
            query=query,
            k=k,
            filter=filter
        )

    def similarity_search_with_score(self, query: str, k: int = 5):
        """带分数的检索"""
        return self.vectorstore.similarity_search_with_score(
            query=query,
            k=k
        )

Gemini 生成的代码有个问题:它用了 text-embedding-3-small,但我的配置里 embedding dimension 是 1536,而旧系统用 ada-002 是 1536 维,没问题。但我后来发现 3-small 其实可以设 1536,不需要改索引配置。

踩坑:Gemini 生成的代码里没有处理 filter 参数的边界情况,我 review 的时候加上了。

3.2 检索层迁移

这是最复杂的部分。旧系统的混合检索实现有问题,我要重新做。

# Gemini 生成的混合检索实现
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain_core.documents import Document
from rank_bm25 import BM25Okapi
import numpy as np

class HybridSearchRetriever:
    def __init__(self, vectorstore, documents: list):
        self.vectorstore = vectorstore

        # 构建 BM25 检索器
        self.doc_texts = [doc.page_content for doc in documents]
        self.bm25 = BM25Okapi(self.doc_texts)

    def retrieve(self, query: str, k: int = 10):
        """
        混合检索:向量检索 + BM25,RRF 融合
        RRF (Reciprocal Rank Fusion): 1/(rank + k),k=60 是经验值
        """
        # 向量检索
        vector_results = self.vectorstore.similarity_search_with_score(query, k=k)
        vector_docs = [doc for doc, score in vector_results]

        # BM25 检索
        tokenized_query = query.split()
        bm25_scores = self.bm25.get_scores(tokenized_query)
        top_k_indices = np.argsort(bm25_scores)[::-1][:k]
        bm25_docs = [Document(page_content=self.doc_texts[i])
                     for i in top_k_indices]

        # RRF 融合
        k_rrf = 60  # RRF 参数
        rrf_scores = {}

        for rank, (doc, score) in enumerate(vector_results):
            doc_id = doc.page_content[:100]  # 用前 100 字符做 ID
            rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (rank + k_rrf)

        for rank, doc in enumerate(bm25_docs):
            doc_id = doc.page_content[:100]
            rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (rank + k_rrf)

        # 排序返回
        sorted_docs = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
        final_docs = [Document(page_content=doc_id) for doc_id, _ in sorted_docs[:k]]

        return final_docs

这段代码里,RRF 融合的实现有一个 bug:用的是 doc.page_content[:100] 作为 doc_id,但如果有两条文档的前 100 个字符相同,就会被错误地当作同一条文档。

踩坑记录:我在测试的时候发现了这个问题,改成了用 document metadata 里的唯一 ID。

# 修正后的实现
def retrieve(self, query: str, k: int = 10, doc_ids: list = None):
    """
    混合检索 RRF 融合
    doc_ids: 文档唯一 ID 列表
    """
    # ... 向量检索和 BM25 检索同上 ...

    # 用 doc_id 而不是 page_content 前 100 字符
    for rank, (doc, score) in enumerate(vector_results):
        doc_id = doc.metadata.get('id') or doc_ids[rank]
        rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (rank + k_rrf)

    for rank, (doc, score) in enumerate(bm25_scores[:k]):
        doc_id = doc_ids[rank]
        rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (rank + k_rrf)

3.3 生成层迁移

# Gemini 生成的 LLM 服务封装
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

class NewLLMService:
    def __init__(self, model: str = "gpt-4o", temperature: float = 0):
        self.llm = ChatOpenAI(
            model=model,
            temperature=temperature
        )

    def create_chain(self, system_prompt: str, retrieval_chain):
        """创建 RAG chain"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", "{question}")
        ])

        chain = (
            {"context": retrieval_chain, "question": RunnablePassthrough()}
            | prompt
            | self.llm
            | StrOutputParser()
        )

        return chain

    def query(self, question: str, context: str) -> str:
        """直接调用(不走 retrieval chain)"""
        prompt = f"""Given the following context:
{context}

Answer the question:
{question}
"""
        return self.llm.invoke(prompt)

Phase 4: 数据迁移

这是最危险的部分:把 50 万条向量从 pgvector 迁移到 Pinecone。

我的策略:
1. 先迁移 1000 条,验证流程
2. 没问题的话,再批量迁移
3. 迁移过程中保持旧系统运行

# 数据迁移脚本
from tqdm import tqdm

class VectorMigrator:
    def __init__(self, old_pg_conn, new_pinecone_index):
        self.old_pg = old_pg_conn
        self.new_index = new_pinecone_index

    def migrate_batch(self, batch_size: int = 1000):
        """
        批量迁移向量数据
        """
        # 从 pgvector 读取
        query = f"""
        SELECT id, content, embedding, metadata
        FROM document_vectors
        WHERE migrated = FALSE
        ORDER BY id
        LIMIT {batch_size}
        """

        rows = self.old_pg.execute(query).fetchall()

        if not rows:
            print("No more rows to migrate")
            return 0

        # 转换成 Pinecone 格式
        vectors = []
        ids_to_update = []

        for row in rows:
            vectors.append({
                "id": str(row.id),
                "values": row.embedding,  # pgvector 是 list 类型
                "metadata": {
                    "content": row.content[:2000],  # Pinecone metadata 有 40KB 限制
                    **row.metadata
                }
            })
            ids_to_update.append(row.id)

        # 写入 Pinecone
        self.new_index.upsert(vectors)

        # 标记已迁移
        update_query = """
        UPDATE document_vectors
        SET migrated = TRUE, migrated_at = NOW()
        WHERE id = ANY(:ids)
        """
        self.old_pg.execute(update_query, {"ids": ids_to_update})

        return len(rows)

    def migrate_all(self, total: int):
        """
        完整迁移
        """
        migrated = 0
        batch_size = 1000

        with tqdm(total=total) as pbar:
            while True:
                count = self.migrate_batch(batch_size)
                if count == 0:
                    break
                migrated += count
                pbar.update(count)

        print(f"Migration complete: {migrated} vectors")

# 使用
migrator = VectorMigrator(old_pg, pinecone_index)
migrator.migrate_all(total=500000)

迁移 50 万条向量花了大约 8 小时,主要是网络 IO 等待。实际迁移过程中发现的问题:

  1. 旧数据有 2000 条是脏数据(embedding 维度不对),需要先清理
  2. 部分 content 超过 40KB,需要截断
  3. metadata 里有 Python object(pickle 序列化的),需要反序列化后重新序列化

这些问题都是我在测试阶段没发现,批量迁移时才暴露的。教训是:测试数据要足够大,至少要覆盖 1% 的真实数据量

Phase 5: 灰度验证

迁移完成后,需要验证新系统跟旧系统输出一致。

# 验证脚本
import json
from datetime import datetime

class RAGValidator:
    def __init__(self, old_rag, new_rag, test_queries: list):
        self.old_rag = old_rag
        self.new_rag = new_rag
        self.test_queries = test_queries
        self.results = []

    def run_validation(self):
        """运行验证测试"""
        for query in self.test_queries:
            old_result = self.old_rag.query(query)
            new_result = self.new_rag.query(query)

            # 用 embedding similarity 比较两个回答的语义相似度
            old_embedding = get_embedding(old_result)
            new_embedding = get_embedding(new_result)
            similarity = cosine_similarity(old_embedding, new_embedding)

            self.results.append({
                "query": query,
                "old_result": old_result,
                "new_result": new_result,
                "similarity": similarity,
                "passed": similarity > 0.85  # 阈值 0.85
            })

        return self.results

    def generate_report(self) -> dict:
        """生成验证报告"""
        passed = sum(1 for r in self.results if r["passed"])
        total = len(self.results)

        return {
            "timestamp": datetime.now().isoformat(),
            "total_queries": total,
            "passed": passed,
            "failed": total - passed,
            "pass_rate": f"{passed/total*100:.1f}%",
            "failed_queries": [r for r in self.results if not r["passed"]]
        }

我准备了 200 个测试 query,验证结果:

  • 185 个通过(similarity > 0.85)
  • 15 个未通过

15 个未通过的 case 里:
- 10 个是因为新系统的检索结果确实比旧系统好(similarity 0.75-0.85,问法不一样但答案更准确)
- 5 个是真的有问题(embedding 维度不一致导致的)

这 5 个问题定位后修复,最终验证通过率 100%。

Gemini CLI 的能力边界

用了 4 周,我对 Gemini CLI 的能力边界有了清晰认知。

能力强的部分

  1. 代码翻译:把一个框架的代码翻译成另一个框架,Gemini 做得很准确。LangChain 0.0.x → LangChain 0.2.x 的翻译,我让它做了 80%,review 花的时间比写的时间还少。

  2. 代码解释:让它解释一段旧代码的逻辑,比我自己读代码理解得更快。

  3. 生成测试用例:给定一个函数,让它生成边界测试用例,覆盖率比我手动写的高。

  4. 多步骤推理:复杂业务逻辑的推理,比如"这个迁移方案会影响哪些下游服务",Gemini 能给出比较完整的分析。

能力弱的部分

  1. 不知道你的上下文:Gemini CLI 的代码生成是通用模式,不了解你项目的特殊约束。比如我知道 pgvector 的向量维度必须固定,但 Gemini 生成的代码假设维度可以动态调整。

  2. 执行环境隔离:Gemini CLI 能执行代码,但执行环境跟生产环境不一样。生产环境的配置、依赖、网络策略,在 Gemini 的执行环境里都不存在。

  3. 无法处理超长代码:虽然 Gemini 支持 100 万 token 上下文,但超过 5000 行的代码文件,生成质量会明显下降。我后来强制自己把大文件拆成小文件再让 Gemini 处理。

  4. 不知道你的数据质量:迁移脚本在真实数据上跑之前,Gemini 不知道你的数据有多脏。它生成的迁移代码假设数据是干净的。

总结:AI Agent 做迁移的合适姿势

这次迁移能 4 周完成,AI agent 帮了大忙。但关键是知道怎么用它:

AI agent 适合的场景
- 代码翻译(框架升级、API 重写)
- 模式化的代码生成(CRUD、基础服务封装)
- 代码审查和解释
- 测试用例生成

AI agent 不适合的场景
- 需要了解业务上下文的决策(用 pgvector 还是 Pinecone?)
- 数据质量问题处理(脏数据、边界情况)
- 涉及多个外部系统的协调(这个 DB 连哪个?那个 API 怎么认证?)
- 最终验证(必须人来跑,必须人来判断)

核心经验是:AI agent 是执行层的加速器,不是决策层的替代品。我负责想清楚要迁移到哪里、怎么验证,AI agent 负责把"怎么做"快速执行出来。

这个分工让我的效率提升了 3-4 倍,但每个关键决策还是我自己做。4 周完成原本 2 个月的工作量,这个比例是合理的——不是 AI 替代了人,是人用对了 AI。


迁移的完整代码和脚本在 GitHub 上(链接略)。如果你们也在做类似的 RAG 迁移,欢迎交流经验。