Lambda Just Got a File System. I Put AI Agents on It.
写过 S3 触发 Lambda 的人都知道那个流程。
S3 事件来了,Lambda 醒来,第一件事是下载文件到 /tmp,处理,上传结果,清理 /tmp,避免空间用尽。然后为下一个文件重复这个过程。
我做过这个 pattern 多少次了。下载,处理,上传,清理。下载,处理,上传,清理。每多一个 function 要操作同一份数据,就多一份下载的副本,多一份 /tmp 的管理,多一份文件不同步的焦虑。
然后 AWS 推出了 S3 Files。
S3 Files 把 S3 bucket 直接挂载成 Lambda 函数上的本地文件系统。你不需要再下载文件到 /tmp,不需要再上传结果,不需要再清理临时空间。代码直接用 open() 操作文件,S3 Files 在后台处理所有同步。
我做的第一件事:搭了一套 AI 代码审查系统,三个 Lambda 函数同时挂载同一个 S3 bucket,通过文件系统共享工作空间,orchestrator 写文件,agents 读文件。结果,整个项目里最无聊的部分就是文件读写代码。
这就是重点。
/tmp 税:每个 Lambda 开发者都交过
如果你在 Lambda 上处理过 S3 数据,一定写过这样的代码:
import boto3
import os
s3 = boto3.client("s3")
def lambda_handler(event, context):
bucket = event["bucket"]
key = event["key"]
# 第一件事:下载到 /tmp
# S3 不给你文件,只给你对象
local_path = f"/tmp/{key.split('/')[-1]}"
s3.download_file(bucket, key, local_path)
# 读文件,处理
with open(local_path) as f:
content = f.read()
result = process(content)
# 写回去,上传到 S3
s3.put_object(Bucket=bucket, Key=f"output/{key}", Body=result)
# 清理 /tmp,否则下次就没空间了
os.remove(local_path)
这只是"读一个文件然后写回去",却要这么多仪式。
但这还不是最糟糕的。当你有多个 function 需要操作同一份数据时,复杂性急剧上升:
场景一:串行处理
Function A 处理完文件,生成中间结果。Function B 需要读取这个中间结果,继续处理。A 把结果上传到 S3,B 需要再下载一次。每多一个步骤,就多一次上传下载。
场景二:并行处理
Function A 和 Function B 同时需要读取同一份原始数据。各自下载自己的副本到 /tmp。如果原数据有 500MB,两个 function 同时跑,/tmp 就去掉了 1GB。如果处理的是更大的数据集呢?
场景三:数据一致性问题
Function A 写了一个文件,Function B 需要立即读取这个文件。但 A 写完还要上传,B 才能下载。如果上传失败了,或者 B 在上传完成前就读了,那 B 拿到的是旧数据。
这就是我所谓的 /tmp 税:为每个文件操作额外付出的代价,包括下载的带宽、上传的时间、/tmp 的空间管理、以及协调多个 function 之间数据同步的复杂度。
现有方案的局限
有些人会说,用 s3fs 或 smart_open 这样的库可以缓解这个问题。它们把 S3 操作封装成了类似本地文件系统的接口。
import s3fs
fs = s3fs.S3FileSystem()
with fs.open(f"s3://my-bucket/data/file.txt", "r") as f:
content = f.read()
但它们底层还是在调用 SDK。代码跟 S3 的交互,依然是通过 boto3,不是通过文件系统。语义上有差异:错误处理模式不同、权限模型不同、文件系统的隐含保证(比如 close-to-open 一致性)不适用于 s3fs。这意味着你还是要小心处理那些在真正文件系统上不会遇到的问题。
这不是说这些库不好。它们是好的。只是它们解决的是"让 S3 操作更像本地文件"的问题,而 S3 Files 解决的是"让本地文件操作直接工作在 S3 上"的问题。这是本质不同的。
S3 Files for Lambda:直接用文件路径
S3 Files 是 AWS 2026 年 4 月推出的一项功能,它把你的 S3 bucket 挂载成 Lambda 函数上的本地文件系统。
用户 S3 事件触发 Lambda
↓
Lambda 挂载 S3 bucket 到 /mnt/workspace
↓
代码直接读写 /mnt/workspace/*
↓
S3 Files 处理后台同步到 S3
代码看起来是这样的:
from pathlib import Path
WORKSPACE = Path("/mnt/workspace")
def lambda_handler(event, context):
# 直接从挂载点读取,不需要下载
content = (WORKSPACE / "source" / "app.py").read_text()
result = process(content)
# 直接写到挂载点,不需要上传
(WORKSPACE / "output" / "result.json").write_text(result)
不需要 boto3 来读写文件。不需要管理 /tmp。不需要上传步骤。文件系统本身就是接口。
S3 Files 的底层原理
S3 Files 底层基于 Amazon EFS 构建。但它不是简单地把 EFS 暴露出来,而是做了一个深度的集成:
- 对常用数据:S3 Files 在高性能存储上缓存你的工作集,读延迟亚毫秒级
- 对大型顺序读取:直接从 S3 流式传输,不经缓存
- 写入的文件:自动在后台同步到 S3,更改在几分钟内出现在 S3 中
- S3 中的更改:会在几秒内出现在挂载的文件系统上
你得到的是文件系统语义(标准 open/read/write/close 操作)加上 S3 的持久力和经济性。
限制:VPC 是必须的
但有一个重要的注意事项:S3 Files 需要 VPC。你的 Lambda 函数需要与挂载目标处于同一 VPC,且需要 NAT 网关来提供出站互联网访问。
作为一个长期搞 serverless 的人,我一般回避 VPC。VPC 配置曾经意味着冷启动 penalty(10+ 秒),意味着网络配置的复杂性,意味着要维护更多的基础设施。
但 AWS 这些年已经解决掉了大多数障碍:
- VPC-attached Lambda 函数不再有过去的冷启动代价(我实测过,2 秒以内)
- 网络配置可以模板化,写一次然后复用
- AWS 提供 reusable VPC patterns,减少重复工作
为了 S3 Files 给你的东西(多个 Lambda 函数共享文件系统,不需要下载上传 ceremony),这个权衡值得。
我们要搭什么
我想用 S3 Files 测试点比"读个 CSV"更有意思的东西。
我搭了一个 serverless 代码审查系统。用户给它指一个公开的 GitHub 仓库 URL,然后三件事依次发生:
- 一个 durable orchestrator function 把仓库克隆到共享的 S3 Files 工作空间
- 一个安全审查 agent 和一个风格审查 agent 并行分析代码
- 结果以 JSON 文件形式落入同一工作空间,自动同步回 S3
关键设计:三个 Lambda 函数同时挂载同一个 S3 bucket
- Orchestrator 写文件(克隆的仓库)
- 两个 Agent 读文件(分析的代码)
- 结果写入同一工作空间,S3 Files 处理同步
函数之间不传递 S3 key。不下载到 /tmp。不需要任何应用层的协调代码。文件系统就是协调层。
Agents 使用 Strands Agents SDK,配合 Amazon Bedrock。每个 agent 有自定义的文件工具,操作挂载路径。Claude 决定读哪些文件、分析什么、写入什么。整个过程中,agents 完全不知道自己在用 S3——它们以为自己在一个本地文件系统上工作。
Orchestrator 使用 Lambda Durable Functions 编排工作流,自动做 checkpointing。如果执行中断,AWS 会从最后一个 checkpoint 恢复,不需要重新克隆仓库或重启已完成的工作。
这是 AI agent 工作流在 serverless 架构上真正可行的关键:执行时间不确定的 agent 可以运行任意时长,不会因为 Lambda 的 15 分钟超时限制而失败。
完整源码:github.com/singledigit/lambda-s3-files-example
架构图:Lambda + FS + AI Agent 交互
┌─────────────────────────────────────┐
│ S3 Bucket │
│ (持久化存储,系统 of record) │
│ 规模:EB 级别,跨 region 复制 │
└──────────────┬──────────────────────┘
│
┌──────────────▼──────────────────────┐
│ S3 Files FileSystem │
│ (基于 EFS,桥接 S3 和 NFS) │
│ │
│ - 挂载点:/mnt/workspace │
│ - 自动同步到 S3(分钟级) │
│ - S3 更改即时可见(秒级) │
│ - 亚毫秒读延迟(缓存热数据) │
└──────────────┬──────────────────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
┌──────────▼──────────┐ ┌──────▼──────┐ ┌────────▼────────┐
│ Orchestrator │ │ Agent A │ │ Agent B │
│ (Durable Function) │ │ (Security) │ │ (Style) │
│ │ │ │ │ │
│ - GitHub API │ │ - 读取代码 │ │ - 读取代码 │
│ - Clone repo │ │ - 安全分析 │ │ - 风格分析 │
│ - 启动并行 agents │ │ - 写结果 │ │ - 写结果 │
│ - Checkpoint 状态 │ │ JSON │ │ JSON │
└──────────────────────┘ └────────────┘ └─────────────────┘
数据流详解:
- 触发:用户 POST 请求,传入 GitHub 仓库 URL
- Step 1 - 克隆:Orchestrator 调用 GitHub API,克隆仓库到
/mnt/workspace/repo/ - Step 2 - 并行分析:Durable functions 同时启动 Agent A 和 Agent B
- Step 3 - 读取:两个 agent 都读取
/mnt/workspace/repo/中的代码文件 - Step 4 - 写入:各自把分析结果写入
/mnt/workspace/results/security.json和/mnt/workspace/results/style.json - Step 5 - 汇总:Orchestrator 读取两个结果文件,生成汇总报告,写入
/mnt/workspace/results/summary.json - 自动同步:S3 Files 在后台将所有更改同步到 S3 bucket
关键特性:文件系统作为协调层
注意这里没有任何显式的协调代码。没有 SQS 队列传递消息,没有 DynamoDB 存储状态,没有 Redis 做 pub/sub。两个 agent 通过文件系统共享数据:
# Agent A 写入中间状态(供 Agent B 读取)
(WORKSPACE / "state" / "agent_a_progress.json").write_text(
json.dumps({"step": 2, "files_analyzed": 15, "findings": [...]})
)
如果 Agent B 需要知道 Agent A 的进度,它直接读这个文件。文件系统承担了协调职责,代码只需要关注业务逻辑。
SAM 模板:基础设施即代码
这部分是花最多时间迭代的。S3 Files 是全新的功能,CloudFormation 资源类型还没进 linter。IDE 会报红,不用管,照写。
资源链:五个必需的组件
要让 S3 Files 在 Lambda 上跑起来,你需要五个资源,按依赖顺序:
S3 Bucket (启用了版本控制)
↓
IAM Role (S3 Files 用来访问 bucket)
↓
S3 Files FileSystem (桥接 bucket 和 NFS)
↓
Mount Targets (每个 AZ 一个,网络端点)
↓
Access Point (控制 Lambda 的 POSIX 身份)
↓
Lambda Function (挂载 access point)
资源类型是 AWS::S3Files::FileSystem、AWS::S3Files::MountTarget 和 AWS::S3Files::AccessPoint。记住:IDE 的 CloudFormation linter 还不认识它们,忽略红色波浪线,这不是代码问题。
IAM Role 的坑
S3 Files 的 IAM role 信任的是 elasticfilesystem.amazonaws.com,不是 s3files.amazonaws.com。这点坑了我两个小时。
为什么?因为 S3 Files 底层是 EFS,所以信任关系走的是 EFS 服务主体,不是 S3 Files 服务主体。
S3FilesRole:
Type: AWS::IAM::Role
Properties:
Path: /service-role/
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: AllowS3FilesAssumeRole
Effect: Allow
Principal:
Service: elasticfilesystem.amazonaws.com
Action: sts:AssumeRole
Condition:
StringEquals:
aws:SourceAccount: !Ref AWS::AccountId
ArnLike:
aws:SourceArn: !Sub 'arn:aws:s3files:${AWS::Region}:${AWS::AccountId}:file-system/*'
角色需要对 bucket 有读写权限,scope 到你的 bucket ARN:
S3FilesBucketPolicy:
Type: AWS::S3::BucketPolicy
Properties:
Bucket: !Ref S3FilesBucket
PolicyDocument:
Statement:
- Effect: Allow
Principal:
Service: elasticfilesystem.amazonaws.com
Action:
- s3:GetObject
- s3:PutObject
- s3:DeleteObject
- s3:ListBucket
Resource:
- !Sub "${S3FilesBucket.Arn}/*"
- !Sub "${S3FilesBucket.Arn}"
Condition:
StringEquals:
aws:ResourceAccount: !Ref AWS::AccountId
Access Point:最关键的部分
这是对 Lambda 最重要的一部分。Access Point 控制你的函数以什么 POSIX 身份运行,并创建一个可写的根目录。
没有它,Lambda 可以挂载文件系统但写不进去(权限被 root 阻塞)。
S3FilesAccessPoint:
Type: AWS::S3Files::AccessPoint
Properties:
FileSystemId: !GetAtt S3FileSystem.FileSystemId
PosixUser:
Uid: '1000'
Gid: '1000'
RootDirectory:
Path: /lambda
CreationPermissions:
OwnerUid: '1000'
OwnerGid: '1000'
Permissions: '755'
CreationPermissions 属性至关重要。它在客户端首次连接时自动创建 /lambda 目录并设置正确的所有权。如果没有这个属性,根目录属于 root(UID 0),Lambda(以 UID 1000 运行)无法创建子目录,function 会报 "Permission denied" 错误。
Lambda 配置
Lambda 那边的配置,FileSystemConfigs 接收 access point ARN(不是 file system ARN)和本地挂载路径:
OrchestratorFunction:
Type: AWS::Serverless::Function
DependsOn:
- MountTargetA
- MountTargetB
Properties:
FunctionName: !Sub "${AWS::StackName}-orchestrator"
Handler: orchestrator.handler
Runtime: python3.12
MemorySize: 1024
Timeout: 900 # 15 分钟,支持长运行的 durable 流程
Environment:
Variables:
WORKSPACE_PATH: /mnt/workspace
FileSystemConfigs:
- Arn: !GetAtt S3FilesAccessPoint.AccessPointArn
LocalMountPath: /mnt/workspace
VpcConfig:
SecurityGroupIds:
- !GetAtt NetworkingStack.Outputs.LambdaSGId
SubnetIds:
- !GetAtt NetworkingStack.Outputs.PrivateSubnetAId
- !GetAtt NetworkingStack.Outputs.PrivateSubnetBId
Policies:
- AmazonS3FilesClientReadWriteAccess
- AWSLambdaVPCAccessExecutionRole
DependsOn 对 mount targets 的依赖很重要。Lambda 在 mount targets 可用之前无法挂载文件系统,创建 mount targets 大约需要五分钟。如果不设置这个依赖,第一次调用会失败,直到 mount targets 真正就绪。
Agent 代码:自定义文件工具
agents 使用 Strands Agents SDK。每个 agent 有自定义的文件工具,操作挂载路径:
from strands import Agent
from strands.tools import tool
from pathlib import Path
import json
WORKSPACE = Path("/mnt/workspace")
@tool
def read_code_file(relative_path: str) -> str:
"""Read a code file from the shared workspace.
Args:
relative_path: Path relative to the repo root,
e.g., 'src/handlers/auth.py'
Returns:
File contents as string, or error message.
"""
full_path = WORKSPACE / "repo" / relative_path
if not full_path.exists():
return f"Error: File not found: {relative_path}"
if not full_path.is_file():
return f"Error: Not a file: {relative_path}"
return full_path.read_text()
@tool
def list_code_files(extension: str = ".py") -> list[str]:
"""List all code files in the repo with given extension.
Args:
extension: File extension to filter by (default: .py)
Returns:
List of relative file paths.
"""
repo_path = WORKSPACE / "repo"
return [
str(p.relative_to(repo_path))
for p in repo_path.rglob(f"*{extension}")
]
@tool
def write_analysis_result(analysis_type: str, content: str) -> str:
"""Write analysis results to shared workspace.
Args:
analysis_type: Type of analysis (e.g., 'security', 'style')
content: JSON string of analysis results
Returns:
Path to written file.
"""
result_path = WORKSPACE / "results" / f"{analysis_type}.json"
result_path.parent.mkdir(parents=True, exist_ok=True)
result_path.write_text(content)
return f"Written to {result_path}"
# 安全审查 agent
security_agent = Agent(
model="anthropic.claude-3-5-sonnet-20241022",
tools=[read_code_file, list_code_files, write_analysis_result],
systemPrompt="""You are a security review agent.
Your job is to analyze code for security vulnerabilities.
Focus areas:
- SQL injection risks (string concatenation in queries)
- XSS vulnerabilities (unescaped user input in output)
- Secrets hardcoded in code (API keys, passwords, tokens)
- Insecure deserialization (pickle, eval on user input)
- Authentication/authorization bypasses
- Path traversal vulnerabilities
Use list_code_files to find relevant files, read_code_file to
inspect them, and write_analysis_result to save your findings.
Findings should be structured as JSON with severity (HIGH/MEDIUM/LOW),
file path, line number if known, and description."""
)
# 代码风格审查 agent
style_agent = Agent(
model="anthropic.claude-3-5-sonnet-20241022",
tools=[read_code_file, list_code_files, write_analysis_result],
systemPrompt="""You are a code style review agent.
Your job is to analyze code for style issues and maintainability problems.
Focus areas:
- Code smells (long methods, deep nesting, duplicated code)
- Best practice violations (no type hints, magic numbers)
- Error handling missing or incorrect
- Naming conventions (unclear variable names, inconsistent casing)
- Documentation missing or inadequate
Use list_code_files to find relevant files, read_code_file to
inspect them, and write_analysis_result to save your findings.
Findings should be structured as JSON with severity (HIGH/MEDIUM/LOW),
file path, line number if known, and description."""
)
关键是:agents 完全不知道自己在用 S3。它们以为自己在一个本地文件系统上工作。文件系统的协调工作——多个 agent 读写同一份数据——完全由 S3 Files 处理,代码不需要为此做任何特殊处理。
这就是 S3 Files 的真正价值:把分布式系统的复杂性从应用层下沉到基础设施层。
实际案例:AI Agent 文件处理场景
场景一:多 Agent 并行代码审查(我们实际跑通的)
用户传入 GitHub 仓库 URL,orchestrator 克隆仓库到 /mnt/workspace/repo/,然后并行启动安全审查和风格审查两个 agent。
from aws_lambda_powertools import Logger
from amazon.lambda.durable import DeterministicUuid, when_all, when_any
from pathlib import Path
import json
logger = Logger()
@DeterministicUuid
def orchestration_id(event, context):
return event["repo_url"]
def lambda_handler(event, context):
orchestrator = CodeReviewOrchestrator()
return orchestrator.handle(event, context)
class CodeReviewOrchestrator:
def __init__(self):
self.workspace = Path("/mnt/workspace")
self.repo_path = self.workspace / "repo"
self.results_path = self.workspace / "results"
def handle(self, event, context):
repo_url = event["repo_url"]
execution_id = context.execution_id
logger.info(f"Starting code review", extra={"repo": repo_url, "execution": execution_id})
# Step 1: Clone repository to workspace
clone_result = self.clone_repo(repo_url)
if not clone_result["success"]:
return {"status": "error", "message": clone_result["error"]}
# Step 2: Launch agents in parallel
agent_tasks = self.launch_agents(execution_id)
# Step 3: Wait for all agents to complete
# (Durable functions handles checkpointing during wait)
agent_results = self.wait_for_agents(agent_tasks)
# Step 4: Compile results
summary = self.compile_results(agent_results)
return {
"status": "complete",
"execution_id": execution_id,
"summary": summary
}
def clone_repo(self, repo_url):
"""Clone GitHub repo to workspace."""
import subprocess
try:
self.repo_path.mkdir(parents=True, exist_ok=True)
subprocess.run(
["git", "clone", "--depth", "1", repo_url, str(self.repo_path)],
check=True,
capture_output=True
)
return {"success": True}
except Exception as e:
logger.error(f"Clone failed: {e}")
return {"success": False, "error": str(e)}
def launch_agents(self, execution_id):
"""Start security and style agents in parallel."""
# In practice, this uses Lambda durable functions to
# launch child executions for each agent
return [
{"agent": "security", "task_id": f"{execution_id}-security"},
{"agent": "style", "task_id": f"{execution_id}-style"}
]
def wait_for_agents(self, agent_tasks):
"""Wait for all agent tasks to complete."""
results = {}
for task in agent_tasks:
# Read result file written by agent
result_file = self.results_path / f"{task['agent']}.json"
if result_file.exists():
results[task['agent']] = json.loads(result_file.read_text())
else:
results[task['agent']] = {"status": "pending"}
return results
def compile_results(self, agent_results):
"""Compile individual results into summary."""
total_findings = sum(
len(r.get("findings", []))
for r in agent_results.values()
)
return {
"agents_run": len(agent_results),
"total_findings": total_findings,
"by_agent": {
agent: len(r.get("findings", []))
for agent, r in agent_results.items()
}
}
场景二:大型数据集预处理流水线
S3 Files 另一个强场景是数据预处理。对于需要读取大量文件、做转换、然后写回去的 pipeline,不再需要每个 step 都下载上传。
def lambda_handler(event, context):
workspace = Path("/mnt/workspace")
input_dir = workspace / "data" / "raw"
output_dir = workspace / "data" / "processed"
# 确保输出目录存在
output_dir.mkdir(parents=True, exist_ok=True)
# 处理每个 CSV 文件
processed_count = 0
for file_path in input_dir.glob("*.csv"):
try:
df = pd.read_csv(file_path)
# 数据清洗
cleaned = clean_data(df)
# 转换
transformed = transform_data(cleaned)
# 写出结果(直接写,自动同步到 S3)
output_path = output_dir / file_path.name
transformed.to_csv(output_path, index=False)
processed_count += 1
logger.info(f"Processed {file_path.name}")
except Exception as e:
logger.error(f"Failed to process {file_path}: {e}")
return {
"status": "complete",
"processed": processed_count
}
关键点:不再需要手动 s3.upload_file()。写入即在 S3 中可用。如果处理过程中 Lambda 崩溃,EFS 的缓存确保数据不丢失,S3 的最终一致性确保数据最终会持久化。
场景三:多 Agent 协作的共享状态
多 agent 协作时,agent 之间需要共享中间结果或状态。用 S3 Files,这变得异常简单:
# Agent A: 分析第一部分代码,写入进度
(WORKSPACE / "state" / "phase1_findings.json").write_text(
json.dumps({
"phase": 1,
"files_analyzed": 15,
"findings": [...]
})
)
# Agent B: 检查 Phase 1 是否完成,决定是否开始
phase1_state = (WORKSPACE / "state" / "phase1_findings.json")
if phase1_state.exists():
state = json.loads(phase1_state.read_text())
if state["phase"] == 1:
# 开始 Phase 2
pass
不再需要通过 SQS/DynamoDB/Redis 做协调。文件系统就是共享状态层。Agent A 写,Agent B 读,就这么简单。
Lambda Durable Functions:编排 AI 工作流
Lambda Durable Functions 是让这套架构真正 work 的另一块积木。它们让你用顺序代码写长运行的工作流,自动做 checkpointing 和故障恢复。
为什么需要它?
AI agent 的执行时间不确定。一个代码审查 agent 可能 30 秒完成,也可能 5 分钟。如果用普通 Lambda,你要在 15 分钟超时之前完成任务,否则失败重跑。
但更重要的是:如果 agent 执行到一半,Lambda 因为任何原因崩溃了怎么办?普通 Lambda 没有状态保留,崩溃即重来。
Lambda Durable Functions 解决了这两个问题:
- 超时不再是问题:Durable execution 可以运行长达 1 年(虽然我们的场景只需要几分钟)
- 状态自动 checkpoint:每一步执行完,状态自动保存。如果执行中断,AWS 从最后一个 checkpoint 恢复,不需要重新执行已完成的工作
from aws_lambda_powertools import Logger
from amazon.lambda.durable import (
DeterministicUuid,
orchestration_context,
current_step
)
import json
logger = Logger()
def lambda_handler(event, context):
"""Entry point for durable orchestration."""
orchestrator = CodeReviewOrchestrator()
return orchestrator.handle(event, context)
class CodeReviewOrchestrator:
def __init__(self):
self.workspace = Path("/mnt/workspace")
def handle(self, event, context):
repo_url = event["repo_url"]
# 获取当前 orchestrator 状态
state = orchestration_context()
current_step_name = current_step()
if current_step_name == "clone":
return self.step_clone(repo_url)
elif current_step_name == "analyze":
return self.step_analyze()
elif current_step_name == "summarize":
return self.step_summarize()
# 初始调用:从 Step 1 开始
return self.start_orchestration(repo_url)
def start_orchestration(self, repo_url):
"""Start the orchestration pipeline."""
return {
"steps": [
{"name": "clone", "action": "clone_repo", "params": {"repo_url": repo_url}},
{"name": "analyze", "action": "parallel_agents"},
{"name": "summarize", "action": "compile_results"}
]
}
def step_clone(self, repo_url):
"""Clone repository (could take a few minutes for large repos)."""
logger.info(f"Cloning {repo_url}")
# 这个 step 可能需要几分钟,durable function 会自动 checkpoint
clone_result = self.clone_repo(repo_url)
return {
"next_step": "analyze",
"checkpoint_data": {"clone_status": "complete"}
}
def step_analyze(self):
"""Launch agents and wait for completion."""
# 在实际实现中,这里会使用
# durable_function.activity() 来启动 agent 执行
# 并使用 when_all() 等待它们完成
return {
"next_step": "summarize",
"checkpoint_data": {"agents_status": "complete"}
}
def step_summarize(self):
"""Compile and return final results."""
results = self.read_results()
return {
"status": "complete",
"results": results
}
关键点:即使 orchestrator 等待 agent 完成(可能几分钟),整个执行状态都被 checkpoint。如果 Lambda 在等待期间超时或崩溃,AWS 从最后一个 checkpoint 恢复,重新进入等待状态,而不是重新执行整个流程。
个人观点:为什么这个特性重要,以及它的局限
为什么重要
1. 文件系统语义 + S3 规模
过去在 Lambda 上操作 S3 数据,你需要用 SDK,操心下载/上传/清理。现在可以直接用文件系统 API,同时拥有 S3 的规模和持久力。对于需要操作大量文件的 AI workload,这是本质改变。
不是"更好用的 S3",是"真正的本地文件系统,但数据在 S3"。
2. 多 Agent 共享工作空间
在没有 S3 Files 之前,多个 Lambda 函数操作同一份数据,需要通过 event 传递 S3 key,或者用 DynamoDB/Redis 做协调。这些方案都有效,但都需要额外的应用层代码来处理分布式系统的复杂性。
现在文件系统本身就是协调层。多个 agent 同时读写同一份数据,不需要任何应用层协调代码。S3 Files 的底层处理了所有一致性、冲突和同步的问题(close-to-open 一致性模型下)。
3. AI Agent 原生体验
AI agents 用文件、路径、本地脚本思考。这是它们的 mental model。S3 Files 让它们能把 exabyte 规模的 S3 bucket 当成本地硬盘用,不再受困于 API 调用开销。
不再需要告诉 agent:"你要先调用 S3 API 下载这个文件,然后处理,然后调用 S3 API 上传结果。"现在只需说:"文件在 /mnt/workspace/repo/ 下,去读吧。"
4. Serverless AI 工作流成为可能
Lambda Durable Functions + S3 Files = 可以在 Lambda 上跑真正的多步骤 AI 工作流:
- Durable functions 处理长运行的 orchestration 和 checkpointing
- S3 Files 提供 agent 之间的共享工作空间
- 无需管理服务器,无需预置容量,按实际执行时间付费
这是一个我一直想要的架构,但之前缺少关键组件。现在它完整了。
局限
1. 一致性模型:close-to-open
S3 Files 提供 close-to-open 一致性。这意味着:
- 如果 Function A 写了一个文件,Function B 立即去读,B 可能看不到最新版本
- 如果你在 S3 控制台上传了一个文件,Lambda function 立即去读,function 可能看不到这个文件
对于我的场景(orchestrator 先写,agents 后读,顺序是自然的),这不是问题。但如果你需要实时协调并发写入的 agent,比如两个 agent 同时写同一个文件的的不同部分,这个模型会出问题。
解决方案:在需要严格一致性的场景,在应用层做确认(比如写完后立即读回来验证),或者用一个 agent 做写入协调。
2. VPC 复杂度
S3 Files 需要 VPC,这是额外的配置门槛。虽然 AWS 已经大幅简化了 VPC-attached Lambda 的冷启动问题,但:
- 需要创建私有子网(至少两个,用于 HA)
- 需要配置 NAT 网关/网关终端节点
- 需要正确配置安全组(开放 2049 端口给 NFS)
- 需要给 Lambda 函数 VPC 权限
这不是每天都要做的事,但每开始一个新项目就要做一次。AWS 提供了一些 reusable patterns,但还是要花时间理解网络配置。
建议:把这个配置模板化,一次做好,之后复用。
3. 冷启动残留
虽然冷启动已经从过去的 10+ 秒降到了 2 秒左右(我实测过),但对于 latency 敏感的交互式场景,2 秒仍然值得考量。
如果你的用户期待毫秒级响应,S3 Files 可能不是最优选择。考虑用 Lambda 的预置并发(Provisioned Concurrency)来消除冷启动,但这会增加成本。
4. 成本评估
S3 Files 基于 EFS,EFS 的按读写次数计费模式在高频率文件操作场景下成本会比纯 S3 API 调用高。
对于我的场景(代码审查,每个文件读一次,结果写一次),文件操作频率相对低,成本可接受。但对于需要频繁读写(每秒数千次)的场景,需要仔细评估。
建议:用 S3 Files cost calculator 估算你的 workload 成本,和纯 S3 API 方案对比。
5. 调试复杂度
当多个 Lambda 函数通过文件系统交互时,调试变得更复杂:
- 文件系统状态不容易直观查看(不像 S3,可以用控制台浏览)
- 多个函数的日志需要关联才能理解完整的执行流
- 一致性问题可能在特定时序下才出现,难以复现
建议:在代码里加入详细的结构化日志,包括文件操作的时间戳和结果。
部署清单:如果想自己试
-
创建 VPC
- 至少两个私有子网(不同 AZ)
- NAT 网关(或 NAT Instance,用于私有子网的互联网访问)
- 安全组:入方向开放 2049 端口(NFS),出方向允许所有 -
创建 S3 Bucket
- 启用版本控制(必需)
- 记录 bucket ARN -
创建 S3 Files FileSystem
yaml S3FileSystem: Type: AWS::S3Files::FileSystem Properties: Name: my-lambda-workspace -
创建 Mount Targets
- 每个 AZ 创建一个
- 等待它们变为 available 状态(约 5 分钟) -
创建 Access Point
- 设置 PosixUser (Uid: 1000, Gid: 1000)
- 设置 RootDirectory 为 /lambda,CreationPermissions 为 755 -
配置 IAM Role
- 信任 elasticfilesystem.amazonaws.com
- 附加 AmazonS3FilesClientReadWriteAccess 策略 -
部署 Lambda 函数
- 指定 FileSystemConfigs (access point ARN + /mnt/workspace)
- 指定 VpcConfig (子网 + 安全组) -
测试
- 先用简单文件读写验证挂载正常
- 再跑你的 AI agent 逻辑
完整模板:github.com/singledigit/lambda-s3-files-example
结语
S3 Files 不是一个更好的 FUSE 方案。
FUSE 方案(比如 s3fs、gcsfuse)是"让对象存储看起来像文件系统",但底层的语义差异导致各种限制。S3 Files 是"让文件系统操作直接工作在对象存储上",这是本质不同。
文件系统和 S3 的边界正在模糊。对于 AI workload,这意味着:你可以在 Lambda 上跑真正的多步骤 AI 工作流,多个 agent 共享工作空间,文件系统承担协调职责。代码里没有 S3 SDK 的 ceremony,只有文件路径和业务逻辑。
局限是真实存在的——VPC 复杂度、close-to-open 一致性、成本评估——但对于有这类需求的人,这些局限换来的是巨大的架构简化。
如果你在 Lambda 上跑过需要操作 S3 数据的 AI agent,你会发现这个特性的价值。如果没跑过,可以先在 VPC 模板上花 10 分钟,然后用一个简单的文件读写验证挂载是否正常。从那里开始,你已经有了在 Lambda 上跑 AI agent 的基础设施。
剩下的,就是写你的业务逻辑。
延伸阅读