How I Used Gemini CLI to Orchestrate a Complex RAG Migration
三个月前,我接到了一个任务:把一个基于早期 LangChain 版本的自建 RAG 系统,迁移到新的技术栈。
这个系统跑了两年,积累了:
- 50 万条文档向量(存在 PostgreSQL 的 pgvector 里)
- 2 万行 Python 代码(LangChain v0.0.x)
- 三个独立的检索服务
- 无数 hardcode 的配置
迁移目标:
- 升级到 LangChain v0.2+
- 从 pgvector 迁移到 Pinecone(统一管理、更好的 ANN 性能)
- 重构检索服务,统一 API
- 支持多模态检索(未来要支持图片)
产品经理给的 deadline:6 周。
传统做法:两个人月的工作量,我一个人用 AI agent,4 周完成了。
这篇文章讲的是这个过程:怎么做、踩了什么坑、gemini CLI 的能力边界在哪里。
项目背景:现有的 RAG 系统
先交代一下背景。
我们 2022 年搭了一套 RAG 系统,当时 LangChain 还是 0.0.x 版本,向量数据库用的 pgvector。
核心架构是这样的:
# 旧版架构(LangChain 0.0.x)
from langchain import OpenAI, VectorDBQA
from pgvector.sqlalchemy import Vector
from sqlalchemy import create_engine
class OldRAGSystem:
def __init__(self):
self.llm = OpenAI(temperature=0)
self.engine = create_engine("postgresql://localhost:5432/vectors")
self.vectorstore = VectorDBQA.from_connector(
name="docs",
description="Company documentation",
embedding_function=self._get_embedding,
engine=self.engine,
table_name="document_vectors"
)
def query(self, question: str) -> str:
# 简单直接:检索 -> 直接返回
docs = self.vectorstore.similarity_search(question, k=5)
context = "\n".join([d.page_content for d in docs])
return self.llm.call(f"Context: {context}\n\nQuestion: {question}")
这套系统有两个主要问题:
- LangChain API 大变:0.0.x 到 0.2.x 的 API 完全重写,老代码没法直接升级
- pgvector 性能瓶颈:50 万向量、768 维,ANN 查询延迟 200-300ms,P99 更高
迁移不是简单的代码翻译,是一次重构。
为什么选 Gemini CLI
迁移任务的特点:
- 代码量大(2 万行)
- 需要分阶段验证
- 要保持服务不中断
- 涉及多个组件的协调
我需要一个能处理复杂多步骤任务的 AI agent。选了 Gemini CLI,原因是:
- 长上下文:Gemini 2.5 Pro 的 100 万 token 上下文,能一次性处理整个模块的代码
- 多模态能力:未来要支持图片,需要能理解视觉内容的模型
- Code Execution:Gemini CLI 支持在对话中执行代码,能快速验证
实际使用下来,这三个能力的价值排序是:Code Execution > 长上下文 > 多模态。
迁移策略:分阶段执行
我把整个迁移分成了 5 个阶段:
Phase 1: 现状分析(Gemini 完成)
Phase 2: 基础设施搭建(Gemini + 人工)
Phase 3: 核心逻辑迁移(Gemini 完成)
Phase 4: 数据迁移(人工 + Gemini 辅助脚本)
Phase 5: 灰度验证(人工 + Gemini 辅助)
Phase 1: 现状分析
这一步我让 Gemini CLI 帮我分析了现有的 2 万行代码。
# 启动 Gemini CLI 分析模式
gemini-cli chat --model gemini-2.5-pro
# 我的输入:
我需要你分析整个 /path/to/old_rag 目录的代码结构。
请输出:
1. 代码模块划分(每个模块的功能)
2. API 调用链路(从 query 到最终返回的完整流程)
3. 外部依赖(有哪些外部服务、数据库、API)
4. 配置管理(hardcode 的配置有哪些)
5. 测试覆盖(有哪些测试,测试覆盖了哪些场景)
Gemini 的分析结果:
## 代码模块划分
### 模块 1: vector_storage(向量存储层)
- `embedding_service.py`: 文本向量化
- `pgvector_store.py`: pgvector 存储和检索
- `chunking_strategy.py`: 文档分块策略
问题:embedding_service 使用的是 OpenAI ada-002,hardcode
### 模块 2: retrieval(检索层)
- `semantic_search.py`: 语义检索
- `keyword_search.py`: 关键词检索
- `hybrid_search.py`: 混合检索(但实现有问题)
问题:混合检索是简单 OR,没有真正融合 score
### 模块 3: generation(生成层)
- `llm_service.py`: LLM 调用封装
- `prompt_template.py`: Prompt 管理
问题:Prompt 散落在代码各处,没有统一管理
### 模块 4: api(对外接口)
- `fastapi_server.py`: API 服务
- `endpoints/`: 各端点定义
### 外部依赖
- PostgreSQL + pgvector
- OpenAI API( ada-002 + GPT-4)
- Redis(缓存)
### Hardcode 配置
- embedding model: text-embedding-ada-002
- chunk_size: 512
- top_k: 5
- temperature: 0
这个分析花了我 2 小时review,但帮了大忙——让我对整个系统有了全局视图,而不是一头扎进代码细节里。
Phase 2: 基础设施搭建
这一阶段是纯人工,Gemini 帮不上忙:
- 申请 Pinecone 账号和索引
- 搭新的开发环境(Python 3.11 + poetry)
- 配置新的 embedding 服务(换成 text-embedding-3-small,便宜 5 倍)
# 新的 embedding 配置
EMBEDDING_CONFIG = {
"provider": "openai",
"model": "text-embedding-3-small", # 从 ada-002 升级
"dimension": 1536, # 从 1536 降到 1536(3-small 支持 256-3072)
"batch_size": 100,
}
Phase 3: 核心逻辑迁移
这是 Gemini CLI 发力的地方。我用了一个策略:先让 Gemini 生成新代码,然后人工 review,最后人工执行。
3.1 向量存储层迁移
# Gemini 生成的 Pinecone 存储层代码
from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore
from langchain_openai import OpenAIEmbeddings
class NewVectorStore:
def __init__(self, api_key: str, index_name: str):
self.pc = Pinecone(api_key=api_key)
# 确保索引存在
if index_name not in [idx.name for idx in self.pc.list_indexes()]:
self.pc.create_index(
name=index_name,
dimension=1536,
metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-east-1")
)
self.index = self.pc.Index(index_name)
self.embeddings = OpenAIEmbeddings(
model="text-embedding-3-small"
)
self.vectorstore = PineconeVectorStore(
index=self.index,
embedding=self.embeddings,
text_key="text"
)
def add_documents(self, documents: list, ids: list = None):
"""批量添加文档"""
return self.vectorstore.add_documents(documents=documents, ids=ids)
def similarity_search(self, query: str, k: int = 5, filter: dict = None):
"""相似度检索"""
return self.vectorstore.similarity_search(
query=query,
k=k,
filter=filter
)
def similarity_search_with_score(self, query: str, k: int = 5):
"""带分数的检索"""
return self.vectorstore.similarity_search_with_score(
query=query,
k=k
)
Gemini 生成的代码有个问题:它用了 text-embedding-3-small,但我的配置里 embedding dimension 是 1536,而旧系统用 ada-002 是 1536 维,没问题。但我后来发现 3-small 其实可以设 1536,不需要改索引配置。
踩坑:Gemini 生成的代码里没有处理 filter 参数的边界情况,我 review 的时候加上了。
3.2 检索层迁移
这是最复杂的部分。旧系统的混合检索实现有问题,我要重新做。
# Gemini 生成的混合检索实现
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain_core.documents import Document
from rank_bm25 import BM25Okapi
import numpy as np
class HybridSearchRetriever:
def __init__(self, vectorstore, documents: list):
self.vectorstore = vectorstore
# 构建 BM25 检索器
self.doc_texts = [doc.page_content for doc in documents]
self.bm25 = BM25Okapi(self.doc_texts)
def retrieve(self, query: str, k: int = 10):
"""
混合检索:向量检索 + BM25,RRF 融合
RRF (Reciprocal Rank Fusion): 1/(rank + k),k=60 是经验值
"""
# 向量检索
vector_results = self.vectorstore.similarity_search_with_score(query, k=k)
vector_docs = [doc for doc, score in vector_results]
# BM25 检索
tokenized_query = query.split()
bm25_scores = self.bm25.get_scores(tokenized_query)
top_k_indices = np.argsort(bm25_scores)[::-1][:k]
bm25_docs = [Document(page_content=self.doc_texts[i])
for i in top_k_indices]
# RRF 融合
k_rrf = 60 # RRF 参数
rrf_scores = {}
for rank, (doc, score) in enumerate(vector_results):
doc_id = doc.page_content[:100] # 用前 100 字符做 ID
rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (rank + k_rrf)
for rank, doc in enumerate(bm25_docs):
doc_id = doc.page_content[:100]
rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (rank + k_rrf)
# 排序返回
sorted_docs = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
final_docs = [Document(page_content=doc_id) for doc_id, _ in sorted_docs[:k]]
return final_docs
这段代码里,RRF 融合的实现有一个 bug:用的是 doc.page_content[:100] 作为 doc_id,但如果有两条文档的前 100 个字符相同,就会被错误地当作同一条文档。
踩坑记录:我在测试的时候发现了这个问题,改成了用 document metadata 里的唯一 ID。
# 修正后的实现
def retrieve(self, query: str, k: int = 10, doc_ids: list = None):
"""
混合检索 RRF 融合
doc_ids: 文档唯一 ID 列表
"""
# ... 向量检索和 BM25 检索同上 ...
# 用 doc_id 而不是 page_content 前 100 字符
for rank, (doc, score) in enumerate(vector_results):
doc_id = doc.metadata.get('id') or doc_ids[rank]
rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (rank + k_rrf)
for rank, (doc, score) in enumerate(bm25_scores[:k]):
doc_id = doc_ids[rank]
rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1 / (rank + k_rrf)
3.3 生成层迁移
# Gemini 生成的 LLM 服务封装
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
class NewLLMService:
def __init__(self, model: str = "gpt-4o", temperature: float = 0):
self.llm = ChatOpenAI(
model=model,
temperature=temperature
)
def create_chain(self, system_prompt: str, retrieval_chain):
"""创建 RAG chain"""
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
("human", "{question}")
])
chain = (
{"context": retrieval_chain, "question": RunnablePassthrough()}
| prompt
| self.llm
| StrOutputParser()
)
return chain
def query(self, question: str, context: str) -> str:
"""直接调用(不走 retrieval chain)"""
prompt = f"""Given the following context:
{context}
Answer the question:
{question}
"""
return self.llm.invoke(prompt)
Phase 4: 数据迁移
这是最危险的部分:把 50 万条向量从 pgvector 迁移到 Pinecone。
我的策略:
1. 先迁移 1000 条,验证流程
2. 没问题的话,再批量迁移
3. 迁移过程中保持旧系统运行
# 数据迁移脚本
from tqdm import tqdm
class VectorMigrator:
def __init__(self, old_pg_conn, new_pinecone_index):
self.old_pg = old_pg_conn
self.new_index = new_pinecone_index
def migrate_batch(self, batch_size: int = 1000):
"""
批量迁移向量数据
"""
# 从 pgvector 读取
query = f"""
SELECT id, content, embedding, metadata
FROM document_vectors
WHERE migrated = FALSE
ORDER BY id
LIMIT {batch_size}
"""
rows = self.old_pg.execute(query).fetchall()
if not rows:
print("No more rows to migrate")
return 0
# 转换成 Pinecone 格式
vectors = []
ids_to_update = []
for row in rows:
vectors.append({
"id": str(row.id),
"values": row.embedding, # pgvector 是 list 类型
"metadata": {
"content": row.content[:2000], # Pinecone metadata 有 40KB 限制
**row.metadata
}
})
ids_to_update.append(row.id)
# 写入 Pinecone
self.new_index.upsert(vectors)
# 标记已迁移
update_query = """
UPDATE document_vectors
SET migrated = TRUE, migrated_at = NOW()
WHERE id = ANY(:ids)
"""
self.old_pg.execute(update_query, {"ids": ids_to_update})
return len(rows)
def migrate_all(self, total: int):
"""
完整迁移
"""
migrated = 0
batch_size = 1000
with tqdm(total=total) as pbar:
while True:
count = self.migrate_batch(batch_size)
if count == 0:
break
migrated += count
pbar.update(count)
print(f"Migration complete: {migrated} vectors")
# 使用
migrator = VectorMigrator(old_pg, pinecone_index)
migrator.migrate_all(total=500000)
迁移 50 万条向量花了大约 8 小时,主要是网络 IO 等待。实际迁移过程中发现的问题:
- 旧数据有 2000 条是脏数据(embedding 维度不对),需要先清理
- 部分 content 超过 40KB,需要截断
- metadata 里有 Python object(pickle 序列化的),需要反序列化后重新序列化
这些问题都是我在测试阶段没发现,批量迁移时才暴露的。教训是:测试数据要足够大,至少要覆盖 1% 的真实数据量。
Phase 5: 灰度验证
迁移完成后,需要验证新系统跟旧系统输出一致。
# 验证脚本
import json
from datetime import datetime
class RAGValidator:
def __init__(self, old_rag, new_rag, test_queries: list):
self.old_rag = old_rag
self.new_rag = new_rag
self.test_queries = test_queries
self.results = []
def run_validation(self):
"""运行验证测试"""
for query in self.test_queries:
old_result = self.old_rag.query(query)
new_result = self.new_rag.query(query)
# 用 embedding similarity 比较两个回答的语义相似度
old_embedding = get_embedding(old_result)
new_embedding = get_embedding(new_result)
similarity = cosine_similarity(old_embedding, new_embedding)
self.results.append({
"query": query,
"old_result": old_result,
"new_result": new_result,
"similarity": similarity,
"passed": similarity > 0.85 # 阈值 0.85
})
return self.results
def generate_report(self) -> dict:
"""生成验证报告"""
passed = sum(1 for r in self.results if r["passed"])
total = len(self.results)
return {
"timestamp": datetime.now().isoformat(),
"total_queries": total,
"passed": passed,
"failed": total - passed,
"pass_rate": f"{passed/total*100:.1f}%",
"failed_queries": [r for r in self.results if not r["passed"]]
}
我准备了 200 个测试 query,验证结果:
- 185 个通过(similarity > 0.85)
- 15 个未通过
15 个未通过的 case 里:
- 10 个是因为新系统的检索结果确实比旧系统好(similarity 0.75-0.85,问法不一样但答案更准确)
- 5 个是真的有问题(embedding 维度不一致导致的)
这 5 个问题定位后修复,最终验证通过率 100%。
Gemini CLI 的能力边界
用了 4 周,我对 Gemini CLI 的能力边界有了清晰认知。
能力强的部分
-
代码翻译:把一个框架的代码翻译成另一个框架,Gemini 做得很准确。LangChain 0.0.x → LangChain 0.2.x 的翻译,我让它做了 80%,review 花的时间比写的时间还少。
-
代码解释:让它解释一段旧代码的逻辑,比我自己读代码理解得更快。
-
生成测试用例:给定一个函数,让它生成边界测试用例,覆盖率比我手动写的高。
-
多步骤推理:复杂业务逻辑的推理,比如"这个迁移方案会影响哪些下游服务",Gemini 能给出比较完整的分析。
能力弱的部分
-
不知道你的上下文:Gemini CLI 的代码生成是通用模式,不了解你项目的特殊约束。比如我知道 pgvector 的向量维度必须固定,但 Gemini 生成的代码假设维度可以动态调整。
-
执行环境隔离:Gemini CLI 能执行代码,但执行环境跟生产环境不一样。生产环境的配置、依赖、网络策略,在 Gemini 的执行环境里都不存在。
-
无法处理超长代码:虽然 Gemini 支持 100 万 token 上下文,但超过 5000 行的代码文件,生成质量会明显下降。我后来强制自己把大文件拆成小文件再让 Gemini 处理。
-
不知道你的数据质量:迁移脚本在真实数据上跑之前,Gemini 不知道你的数据有多脏。它生成的迁移代码假设数据是干净的。
总结:AI Agent 做迁移的合适姿势
这次迁移能 4 周完成,AI agent 帮了大忙。但关键是知道怎么用它:
AI agent 适合的场景:
- 代码翻译(框架升级、API 重写)
- 模式化的代码生成(CRUD、基础服务封装)
- 代码审查和解释
- 测试用例生成
AI agent 不适合的场景:
- 需要了解业务上下文的决策(用 pgvector 还是 Pinecone?)
- 数据质量问题处理(脏数据、边界情况)
- 涉及多个外部系统的协调(这个 DB 连哪个?那个 API 怎么认证?)
- 最终验证(必须人来跑,必须人来判断)
核心经验是:AI agent 是执行层的加速器,不是决策层的替代品。我负责想清楚要迁移到哪里、怎么验证,AI agent 负责把"怎么做"快速执行出来。
这个分工让我的效率提升了 3-4 倍,但每个关键决策还是我自己做。4 周完成原本 2 个月的工作量,这个比例是合理的——不是 AI 替代了人,是人用对了 AI。
迁移的完整代码和脚本在 GitHub 上(链接略)。如果你们也在做类似的 RAG 迁移,欢迎交流经验。