Lambda Just Got a File System. I Put AI Agents on It.

写过 S3 触发 Lambda 的人都知道那个流程。

S3 事件来了,Lambda 醒来,第一件事是下载文件到 /tmp,处理,上传结果,清理 /tmp,避免空间用尽。然后为下一个文件重复这个过程。

我做过这个 pattern 多少次了。下载,处理,上传,清理。下载,处理,上传,清理。每多一个 function 要操作同一份数据,就多一份下载的副本,多一份 /tmp 的管理,多一份文件不同步的焦虑。

然后 AWS 推出了 S3 Files。

S3 Files 把 S3 bucket 直接挂载成 Lambda 函数上的本地文件系统。你不需要再下载文件到 /tmp,不需要再上传结果,不需要再清理临时空间。代码直接用 open() 操作文件,S3 Files 在后台处理所有同步。

我做的第一件事:搭了一套 AI 代码审查系统,三个 Lambda 函数同时挂载同一个 S3 bucket,通过文件系统共享工作空间,orchestrator 写文件,agents 读文件。结果,整个项目里最无聊的部分就是文件读写代码。

这就是重点。


/tmp 税:每个 Lambda 开发者都交过

如果你在 Lambda 上处理过 S3 数据,一定写过这样的代码:

import boto3
import os

s3 = boto3.client("s3")

def lambda_handler(event, context):
    bucket = event["bucket"]
    key = event["key"]

    # 第一件事:下载到 /tmp
    # S3 不给你文件,只给你对象
    local_path = f"/tmp/{key.split('/')[-1]}"
    s3.download_file(bucket, key, local_path)

    # 读文件,处理
    with open(local_path) as f:
        content = f.read()
    result = process(content)

    # 写回去,上传到 S3
    s3.put_object(Bucket=bucket, Key=f"output/{key}", Body=result)

    # 清理 /tmp,否则下次就没空间了
    os.remove(local_path)

这只是"读一个文件然后写回去",却要这么多仪式。

但这还不是最糟糕的。当你有多个 function 需要操作同一份数据时,复杂性急剧上升:

场景一:串行处理

Function A 处理完文件,生成中间结果。Function B 需要读取这个中间结果,继续处理。A 把结果上传到 S3,B 需要再下载一次。每多一个步骤,就多一次上传下载。

场景二:并行处理

Function A 和 Function B 同时需要读取同一份原始数据。各自下载自己的副本到 /tmp。如果原数据有 500MB,两个 function 同时跑,/tmp 就去掉了 1GB。如果处理的是更大的数据集呢?

场景三:数据一致性问题

Function A 写了一个文件,Function B 需要立即读取这个文件。但 A 写完还要上传,B 才能下载。如果上传失败了,或者 B 在上传完成前就读了,那 B 拿到的是旧数据。

这就是我所谓的 /tmp 税:为每个文件操作额外付出的代价,包括下载的带宽、上传的时间、/tmp 的空间管理、以及协调多个 function 之间数据同步的复杂度。

现有方案的局限

有些人会说,用 s3fs 或 smart_open 这样的库可以缓解这个问题。它们把 S3 操作封装成了类似本地文件系统的接口。

import s3fs

fs = s3fs.S3FileSystem()
with fs.open(f"s3://my-bucket/data/file.txt", "r") as f:
    content = f.read()

但它们底层还是在调用 SDK。代码跟 S3 的交互,依然是通过 boto3,不是通过文件系统。语义上有差异:错误处理模式不同、权限模型不同、文件系统的隐含保证(比如 close-to-open 一致性)不适用于 s3fs。这意味着你还是要小心处理那些在真正文件系统上不会遇到的问题。

这不是说这些库不好。它们是好的。只是它们解决的是"让 S3 操作更像本地文件"的问题,而 S3 Files 解决的是"让本地文件操作直接工作在 S3 上"的问题。这是本质不同的。


S3 Files for Lambda:直接用文件路径

S3 Files 是 AWS 2026 年 4 月推出的一项功能,它把你的 S3 bucket 挂载成 Lambda 函数上的本地文件系统。

用户 S3 事件触发 Lambda
         ↓
Lambda 挂载 S3 bucket 到 /mnt/workspace
         ↓
代码直接读写 /mnt/workspace/*
         ↓
S3 Files 处理后台同步到 S3

代码看起来是这样的:

from pathlib import Path

WORKSPACE = Path("/mnt/workspace")

def lambda_handler(event, context):
    # 直接从挂载点读取,不需要下载
    content = (WORKSPACE / "source" / "app.py").read_text()
    result = process(content)

    # 直接写到挂载点,不需要上传
    (WORKSPACE / "output" / "result.json").write_text(result)

不需要 boto3 来读写文件。不需要管理 /tmp。不需要上传步骤。文件系统本身就是接口。

S3 Files 的底层原理

S3 Files 底层基于 Amazon EFS 构建。但它不是简单地把 EFS 暴露出来,而是做了一个深度的集成:

  • 对常用数据:S3 Files 在高性能存储上缓存你的工作集,读延迟亚毫秒级
  • 对大型顺序读取:直接从 S3 流式传输,不经缓存
  • 写入的文件:自动在后台同步到 S3,更改在几分钟内出现在 S3 中
  • S3 中的更改:会在几秒内出现在挂载的文件系统上

你得到的是文件系统语义(标准 open/read/write/close 操作)加上 S3 的持久力和经济性。

限制:VPC 是必须的

但有一个重要的注意事项:S3 Files 需要 VPC。你的 Lambda 函数需要与挂载目标处于同一 VPC,且需要 NAT 网关来提供出站互联网访问。

作为一个长期搞 serverless 的人,我一般回避 VPC。VPC 配置曾经意味着冷启动 penalty(10+ 秒),意味着网络配置的复杂性,意味着要维护更多的基础设施。

但 AWS 这些年已经解决掉了大多数障碍:

  • VPC-attached Lambda 函数不再有过去的冷启动代价(我实测过,2 秒以内)
  • 网络配置可以模板化,写一次然后复用
  • AWS 提供 reusable VPC patterns,减少重复工作

为了 S3 Files 给你的东西(多个 Lambda 函数共享文件系统,不需要下载上传 ceremony),这个权衡值得。


我们要搭什么

我想用 S3 Files 测试点比"读个 CSV"更有意思的东西。

我搭了一个 serverless 代码审查系统。用户给它指一个公开的 GitHub 仓库 URL,然后三件事依次发生:

  1. 一个 durable orchestrator function 把仓库克隆到共享的 S3 Files 工作空间
  2. 一个安全审查 agent 和一个风格审查 agent 并行分析代码
  3. 结果以 JSON 文件形式落入同一工作空间,自动同步回 S3

关键设计:三个 Lambda 函数同时挂载同一个 S3 bucket

  • Orchestrator 写文件(克隆的仓库)
  • 两个 Agent 读文件(分析的代码)
  • 结果写入同一工作空间,S3 Files 处理同步

函数之间不传递 S3 key。不下载到 /tmp。不需要任何应用层的协调代码。文件系统就是协调层。

Agents 使用 Strands Agents SDK,配合 Amazon Bedrock。每个 agent 有自定义的文件工具,操作挂载路径。Claude 决定读哪些文件、分析什么、写入什么。整个过程中,agents 完全不知道自己在用 S3——它们以为自己在一个本地文件系统上工作。

Orchestrator 使用 Lambda Durable Functions 编排工作流,自动做 checkpointing。如果执行中断,AWS 会从最后一个 checkpoint 恢复,不需要重新克隆仓库或重启已完成的工作。

这是 AI agent 工作流在 serverless 架构上真正可行的关键:执行时间不确定的 agent 可以运行任意时长,不会因为 Lambda 的 15 分钟超时限制而失败。

完整源码:github.com/singledigit/lambda-s3-files-example


架构图:Lambda + FS + AI Agent 交互

                    ┌─────────────────────────────────────┐
                               S3 Bucket                  
                       (持久化存储,系统 of record)        
                       规模:EB 级别,跨 region 复制        
                    └──────────────┬──────────────────────┘
                                   
                    ┌──────────────▼──────────────────────┐
                         S3 Files FileSystem              
                       (基于 EFS,桥接 S3  NFS)          
                                                          
                       - 挂载点:/mnt/workspace            
                       - 自动同步到 S3(分钟级)           
                       - S3 更改即时可见(秒级)           
                       - 亚毫秒读延迟(缓存热数据)         
                    └──────────────┬──────────────────────┘
                                   
              ┌────────────────────┼────────────────────┐
                                                      
   ┌──────────▼──────────┐  ┌──────▼──────┐  ┌────────▼────────┐
     Orchestrator            Agent A         Agent B       
     (Durable Function)     (Security)        (Style)      
                                                           
     - GitHub API           - 读取代码      - 读取代码      
     - Clone repo           - 安全分析      - 风格分析     
     - 启动并行 agents      - 写结果        - 写结果        
     - Checkpoint 状态        JSON           JSON          
   └──────────────────────┘  └────────────┘  └─────────────────┘

数据流详解:

  1. 触发:用户 POST 请求,传入 GitHub 仓库 URL
  2. Step 1 - 克隆:Orchestrator 调用 GitHub API,克隆仓库到 /mnt/workspace/repo/
  3. Step 2 - 并行分析:Durable functions 同时启动 Agent A 和 Agent B
  4. Step 3 - 读取:两个 agent 都读取 /mnt/workspace/repo/ 中的代码文件
  5. Step 4 - 写入:各自把分析结果写入 /mnt/workspace/results/security.json/mnt/workspace/results/style.json
  6. Step 5 - 汇总:Orchestrator 读取两个结果文件,生成汇总报告,写入 /mnt/workspace/results/summary.json
  7. 自动同步:S3 Files 在后台将所有更改同步到 S3 bucket

关键特性:文件系统作为协调层

注意这里没有任何显式的协调代码。没有 SQS 队列传递消息,没有 DynamoDB 存储状态,没有 Redis 做 pub/sub。两个 agent 通过文件系统共享数据:

# Agent A 写入中间状态(供 Agent B 读取)
(WORKSPACE / "state" / "agent_a_progress.json").write_text(
    json.dumps({"step": 2, "files_analyzed": 15, "findings": [...]})
)

如果 Agent B 需要知道 Agent A 的进度,它直接读这个文件。文件系统承担了协调职责,代码只需要关注业务逻辑。


SAM 模板:基础设施即代码

这部分是花最多时间迭代的。S3 Files 是全新的功能,CloudFormation 资源类型还没进 linter。IDE 会报红,不用管,照写。

资源链:五个必需的组件

要让 S3 Files 在 Lambda 上跑起来,你需要五个资源,按依赖顺序:

S3 Bucket (启用了版本控制)
    ↓
IAM Role (S3 Files 用来访问 bucket)
    ↓
S3 Files FileSystem (桥接 bucket 和 NFS)
    ↓
Mount Targets (每个 AZ 一个,网络端点)
    ↓
Access Point (控制 Lambda 的 POSIX 身份)
    ↓
Lambda Function (挂载 access point)

资源类型是 AWS::S3Files::FileSystemAWS::S3Files::MountTargetAWS::S3Files::AccessPoint。记住:IDE 的 CloudFormation linter 还不认识它们,忽略红色波浪线,这不是代码问题。

IAM Role 的坑

S3 Files 的 IAM role 信任的是 elasticfilesystem.amazonaws.com,不是 s3files.amazonaws.com。这点坑了我两个小时。

为什么?因为 S3 Files 底层是 EFS,所以信任关系走的是 EFS 服务主体,不是 S3 Files 服务主体。

S3FilesRole:
  Type: AWS::IAM::Role
  Properties:
    Path: /service-role/
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Sid: AllowS3FilesAssumeRole
          Effect: Allow
          Principal:
            Service: elasticfilesystem.amazonaws.com
          Action: sts:AssumeRole
          Condition:
            StringEquals:
              aws:SourceAccount: !Ref AWS::AccountId
            ArnLike:
              aws:SourceArn: !Sub 'arn:aws:s3files:${AWS::Region}:${AWS::AccountId}:file-system/*'

角色需要对 bucket 有读写权限,scope 到你的 bucket ARN:

S3FilesBucketPolicy:
  Type: AWS::S3::BucketPolicy
  Properties:
    Bucket: !Ref S3FilesBucket
    PolicyDocument:
      Statement:
        - Effect: Allow
          Principal:
            Service: elasticfilesystem.amazonaws.com
          Action:
            - s3:GetObject
            - s3:PutObject
            - s3:DeleteObject
            - s3:ListBucket
          Resource:
            - !Sub "${S3FilesBucket.Arn}/*"
            - !Sub "${S3FilesBucket.Arn}"
          Condition:
            StringEquals:
              aws:ResourceAccount: !Ref AWS::AccountId

Access Point:最关键的部分

这是对 Lambda 最重要的一部分。Access Point 控制你的函数以什么 POSIX 身份运行,并创建一个可写的根目录。

没有它,Lambda 可以挂载文件系统但写不进去(权限被 root 阻塞)。

S3FilesAccessPoint:
  Type: AWS::S3Files::AccessPoint
  Properties:
    FileSystemId: !GetAtt S3FileSystem.FileSystemId
    PosixUser:
      Uid: '1000'
      Gid: '1000'
    RootDirectory:
      Path: /lambda
      CreationPermissions:
        OwnerUid: '1000'
        OwnerGid: '1000'
        Permissions: '755'

CreationPermissions 属性至关重要。它在客户端首次连接时自动创建 /lambda 目录并设置正确的所有权。如果没有这个属性,根目录属于 root(UID 0),Lambda(以 UID 1000 运行)无法创建子目录,function 会报 "Permission denied" 错误。

Lambda 配置

Lambda 那边的配置,FileSystemConfigs 接收 access point ARN(不是 file system ARN)和本地挂载路径:

OrchestratorFunction:
  Type: AWS::Serverless::Function
  DependsOn:
    - MountTargetA
    - MountTargetB
  Properties:
    FunctionName: !Sub "${AWS::StackName}-orchestrator"
    Handler: orchestrator.handler
    Runtime: python3.12
    MemorySize: 1024
    Timeout: 900  # 15 分钟,支持长运行的 durable 流程
    Environment:
      Variables:
        WORKSPACE_PATH: /mnt/workspace
    FileSystemConfigs:
      - Arn: !GetAtt S3FilesAccessPoint.AccessPointArn
        LocalMountPath: /mnt/workspace
    VpcConfig:
      SecurityGroupIds:
        - !GetAtt NetworkingStack.Outputs.LambdaSGId
      SubnetIds:
        - !GetAtt NetworkingStack.Outputs.PrivateSubnetAId
        - !GetAtt NetworkingStack.Outputs.PrivateSubnetBId
    Policies:
      - AmazonS3FilesClientReadWriteAccess
      - AWSLambdaVPCAccessExecutionRole

DependsOn 对 mount targets 的依赖很重要。Lambda 在 mount targets 可用之前无法挂载文件系统,创建 mount targets 大约需要五分钟。如果不设置这个依赖,第一次调用会失败,直到 mount targets 真正就绪。


Agent 代码:自定义文件工具

agents 使用 Strands Agents SDK。每个 agent 有自定义的文件工具,操作挂载路径:

from strands import Agent
from strands.tools import tool
from pathlib import Path
import json

WORKSPACE = Path("/mnt/workspace")

@tool
def read_code_file(relative_path: str) -> str:
    """Read a code file from the shared workspace.

    Args:
        relative_path: Path relative to the repo root,
                       e.g., 'src/handlers/auth.py'
    Returns:
        File contents as string, or error message.
    """
    full_path = WORKSPACE / "repo" / relative_path
    if not full_path.exists():
        return f"Error: File not found: {relative_path}"
    if not full_path.is_file():
        return f"Error: Not a file: {relative_path}"
    return full_path.read_text()

@tool
def list_code_files(extension: str = ".py") -> list[str]:
    """List all code files in the repo with given extension.

    Args:
        extension: File extension to filter by (default: .py)
    Returns:
        List of relative file paths.
    """
    repo_path = WORKSPACE / "repo"
    return [
        str(p.relative_to(repo_path))
        for p in repo_path.rglob(f"*{extension}")
    ]

@tool
def write_analysis_result(analysis_type: str, content: str) -> str:
    """Write analysis results to shared workspace.

    Args:
        analysis_type: Type of analysis (e.g., 'security', 'style')
        content: JSON string of analysis results
    Returns:
        Path to written file.
    """
    result_path = WORKSPACE / "results" / f"{analysis_type}.json"
    result_path.parent.mkdir(parents=True, exist_ok=True)
    result_path.write_text(content)
    return f"Written to {result_path}"

# 安全审查 agent
security_agent = Agent(
    model="anthropic.claude-3-5-sonnet-20241022",
    tools=[read_code_file, list_code_files, write_analysis_result],
    systemPrompt="""You are a security review agent.

    Your job is to analyze code for security vulnerabilities.

    Focus areas:
    - SQL injection risks (string concatenation in queries)
    - XSS vulnerabilities (unescaped user input in output)
    - Secrets hardcoded in code (API keys, passwords, tokens)
    - Insecure deserialization (pickle, eval on user input)
    - Authentication/authorization bypasses
    - Path traversal vulnerabilities

    Use list_code_files to find relevant files, read_code_file to
    inspect them, and write_analysis_result to save your findings.

    Findings should be structured as JSON with severity (HIGH/MEDIUM/LOW),
    file path, line number if known, and description."""
)

# 代码风格审查 agent
style_agent = Agent(
    model="anthropic.claude-3-5-sonnet-20241022",
    tools=[read_code_file, list_code_files, write_analysis_result],
    systemPrompt="""You are a code style review agent.

    Your job is to analyze code for style issues and maintainability problems.

    Focus areas:
    - Code smells (long methods, deep nesting, duplicated code)
    - Best practice violations (no type hints, magic numbers)
    - Error handling missing or incorrect
    - Naming conventions (unclear variable names, inconsistent casing)
    - Documentation missing or inadequate

    Use list_code_files to find relevant files, read_code_file to
    inspect them, and write_analysis_result to save your findings.

    Findings should be structured as JSON with severity (HIGH/MEDIUM/LOW),
    file path, line number if known, and description."""
)

关键是:agents 完全不知道自己在用 S3。它们以为自己在一个本地文件系统上工作。文件系统的协调工作——多个 agent 读写同一份数据——完全由 S3 Files 处理,代码不需要为此做任何特殊处理。

这就是 S3 Files 的真正价值:把分布式系统的复杂性从应用层下沉到基础设施层。


实际案例:AI Agent 文件处理场景

场景一:多 Agent 并行代码审查(我们实际跑通的)

用户传入 GitHub 仓库 URL,orchestrator 克隆仓库到 /mnt/workspace/repo/,然后并行启动安全审查和风格审查两个 agent。

from aws_lambda_powertools import Logger
from amazon.lambda.durable import DeterministicUuid, when_all, when_any
from pathlib import Path
import json

logger = Logger()

@DeterministicUuid
def orchestration_id(event, context):
    return event["repo_url"]

def lambda_handler(event, context):
    orchestrator = CodeReviewOrchestrator()
    return orchestrator.handle(event, context)

class CodeReviewOrchestrator:
    def __init__(self):
        self.workspace = Path("/mnt/workspace")
        self.repo_path = self.workspace / "repo"
        self.results_path = self.workspace / "results"

    def handle(self, event, context):
        repo_url = event["repo_url"]
        execution_id = context.execution_id

        logger.info(f"Starting code review", extra={"repo": repo_url, "execution": execution_id})

        # Step 1: Clone repository to workspace
        clone_result = self.clone_repo(repo_url)
        if not clone_result["success"]:
            return {"status": "error", "message": clone_result["error"]}

        # Step 2: Launch agents in parallel
        agent_tasks = self.launch_agents(execution_id)

        # Step 3: Wait for all agents to complete
        # (Durable functions handles checkpointing during wait)
        agent_results = self.wait_for_agents(agent_tasks)

        # Step 4: Compile results
        summary = self.compile_results(agent_results)

        return {
            "status": "complete",
            "execution_id": execution_id,
            "summary": summary
        }

    def clone_repo(self, repo_url):
        """Clone GitHub repo to workspace."""
        import subprocess
        try:
            self.repo_path.mkdir(parents=True, exist_ok=True)
            subprocess.run(
                ["git", "clone", "--depth", "1", repo_url, str(self.repo_path)],
                check=True,
                capture_output=True
            )
            return {"success": True}
        except Exception as e:
            logger.error(f"Clone failed: {e}")
            return {"success": False, "error": str(e)}

    def launch_agents(self, execution_id):
        """Start security and style agents in parallel."""
        # In practice, this uses Lambda durable functions to
        # launch child executions for each agent
        return [
            {"agent": "security", "task_id": f"{execution_id}-security"},
            {"agent": "style", "task_id": f"{execution_id}-style"}
        ]

    def wait_for_agents(self, agent_tasks):
        """Wait for all agent tasks to complete."""
        results = {}
        for task in agent_tasks:
            # Read result file written by agent
            result_file = self.results_path / f"{task['agent']}.json"
            if result_file.exists():
                results[task['agent']] = json.loads(result_file.read_text())
            else:
                results[task['agent']] = {"status": "pending"}
        return results

    def compile_results(self, agent_results):
        """Compile individual results into summary."""
        total_findings = sum(
            len(r.get("findings", []))
            for r in agent_results.values()
        )
        return {
            "agents_run": len(agent_results),
            "total_findings": total_findings,
            "by_agent": {
                agent: len(r.get("findings", []))
                for agent, r in agent_results.items()
            }
        }

场景二:大型数据集预处理流水线

S3 Files 另一个强场景是数据预处理。对于需要读取大量文件、做转换、然后写回去的 pipeline,不再需要每个 step 都下载上传。

def lambda_handler(event, context):
    workspace = Path("/mnt/workspace")
    input_dir = workspace / "data" / "raw"
    output_dir = workspace / "data" / "processed"

    # 确保输出目录存在
    output_dir.mkdir(parents=True, exist_ok=True)

    # 处理每个 CSV 文件
    processed_count = 0
    for file_path in input_dir.glob("*.csv"):
        try:
            df = pd.read_csv(file_path)

            # 数据清洗
            cleaned = clean_data(df)

            # 转换
            transformed = transform_data(cleaned)

            # 写出结果(直接写,自动同步到 S3)
            output_path = output_dir / file_path.name
            transformed.to_csv(output_path, index=False)

            processed_count += 1
            logger.info(f"Processed {file_path.name}")

        except Exception as e:
            logger.error(f"Failed to process {file_path}: {e}")

    return {
        "status": "complete",
        "processed": processed_count
    }

关键点:不再需要手动 s3.upload_file()。写入即在 S3 中可用。如果处理过程中 Lambda 崩溃,EFS 的缓存确保数据不丢失,S3 的最终一致性确保数据最终会持久化。

场景三:多 Agent 协作的共享状态

多 agent 协作时,agent 之间需要共享中间结果或状态。用 S3 Files,这变得异常简单:

# Agent A: 分析第一部分代码,写入进度
(WORKSPACE / "state" / "phase1_findings.json").write_text(
    json.dumps({
        "phase": 1,
        "files_analyzed": 15,
        "findings": [...]
    })
)

# Agent B: 检查 Phase 1 是否完成,决定是否开始
phase1_state = (WORKSPACE / "state" / "phase1_findings.json")
if phase1_state.exists():
    state = json.loads(phase1_state.read_text())
    if state["phase"] == 1:
        # 开始 Phase 2
        pass

不再需要通过 SQS/DynamoDB/Redis 做协调。文件系统就是共享状态层。Agent A 写,Agent B 读,就这么简单。


Lambda Durable Functions:编排 AI 工作流

Lambda Durable Functions 是让这套架构真正 work 的另一块积木。它们让你用顺序代码写长运行的工作流,自动做 checkpointing 和故障恢复。

为什么需要它?

AI agent 的执行时间不确定。一个代码审查 agent 可能 30 秒完成,也可能 5 分钟。如果用普通 Lambda,你要在 15 分钟超时之前完成任务,否则失败重跑。

但更重要的是:如果 agent 执行到一半,Lambda 因为任何原因崩溃了怎么办?普通 Lambda 没有状态保留,崩溃即重来。

Lambda Durable Functions 解决了这两个问题:

  1. 超时不再是问题:Durable execution 可以运行长达 1 年(虽然我们的场景只需要几分钟)
  2. 状态自动 checkpoint:每一步执行完,状态自动保存。如果执行中断,AWS 从最后一个 checkpoint 恢复,不需要重新执行已完成的工作
from aws_lambda_powertools import Logger
from amazon.lambda.durable import (
    DeterministicUuid,
    orchestration_context,
    current_step
)
import json

logger = Logger()

def lambda_handler(event, context):
    """Entry point for durable orchestration."""
    orchestrator = CodeReviewOrchestrator()
    return orchestrator.handle(event, context)

class CodeReviewOrchestrator:
    def __init__(self):
        self.workspace = Path("/mnt/workspace")

    def handle(self, event, context):
        repo_url = event["repo_url"]

        # 获取当前 orchestrator 状态
        state = orchestration_context()
        current_step_name = current_step()

        if current_step_name == "clone":
            return self.step_clone(repo_url)

        elif current_step_name == "analyze":
            return self.step_analyze()

        elif current_step_name == "summarize":
            return self.step_summarize()

        # 初始调用:从 Step 1 开始
        return self.start_orchestration(repo_url)

    def start_orchestration(self, repo_url):
        """Start the orchestration pipeline."""
        return {
            "steps": [
                {"name": "clone", "action": "clone_repo", "params": {"repo_url": repo_url}},
                {"name": "analyze", "action": "parallel_agents"},
                {"name": "summarize", "action": "compile_results"}
            ]
        }

    def step_clone(self, repo_url):
        """Clone repository (could take a few minutes for large repos)."""
        logger.info(f"Cloning {repo_url}")
        # 这个 step 可能需要几分钟,durable function 会自动 checkpoint
        clone_result = self.clone_repo(repo_url)
        return {
            "next_step": "analyze",
            "checkpoint_data": {"clone_status": "complete"}
        }

    def step_analyze(self):
        """Launch agents and wait for completion."""
        # 在实际实现中,这里会使用
        # durable_function.activity() 来启动 agent 执行
        # 并使用 when_all() 等待它们完成
        return {
            "next_step": "summarize",
            "checkpoint_data": {"agents_status": "complete"}
        }

    def step_summarize(self):
        """Compile and return final results."""
        results = self.read_results()
        return {
            "status": "complete",
            "results": results
        }

关键点:即使 orchestrator 等待 agent 完成(可能几分钟),整个执行状态都被 checkpoint。如果 Lambda 在等待期间超时或崩溃,AWS 从最后一个 checkpoint 恢复,重新进入等待状态,而不是重新执行整个流程。


个人观点:为什么这个特性重要,以及它的局限

为什么重要

1. 文件系统语义 + S3 规模

过去在 Lambda 上操作 S3 数据,你需要用 SDK,操心下载/上传/清理。现在可以直接用文件系统 API,同时拥有 S3 的规模和持久力。对于需要操作大量文件的 AI workload,这是本质改变。

不是"更好用的 S3",是"真正的本地文件系统,但数据在 S3"。

2. 多 Agent 共享工作空间

在没有 S3 Files 之前,多个 Lambda 函数操作同一份数据,需要通过 event 传递 S3 key,或者用 DynamoDB/Redis 做协调。这些方案都有效,但都需要额外的应用层代码来处理分布式系统的复杂性。

现在文件系统本身就是协调层。多个 agent 同时读写同一份数据,不需要任何应用层协调代码。S3 Files 的底层处理了所有一致性、冲突和同步的问题(close-to-open 一致性模型下)。

3. AI Agent 原生体验

AI agents 用文件、路径、本地脚本思考。这是它们的 mental model。S3 Files 让它们能把 exabyte 规模的 S3 bucket 当成本地硬盘用,不再受困于 API 调用开销。

不再需要告诉 agent:"你要先调用 S3 API 下载这个文件,然后处理,然后调用 S3 API 上传结果。"现在只需说:"文件在 /mnt/workspace/repo/ 下,去读吧。"

4. Serverless AI 工作流成为可能

Lambda Durable Functions + S3 Files = 可以在 Lambda 上跑真正的多步骤 AI 工作流:

  • Durable functions 处理长运行的 orchestration 和 checkpointing
  • S3 Files 提供 agent 之间的共享工作空间
  • 无需管理服务器,无需预置容量,按实际执行时间付费

这是一个我一直想要的架构,但之前缺少关键组件。现在它完整了。

局限

1. 一致性模型:close-to-open

S3 Files 提供 close-to-open 一致性。这意味着:

  • 如果 Function A 写了一个文件,Function B 立即去读,B 可能看不到最新版本
  • 如果你在 S3 控制台上传了一个文件,Lambda function 立即去读,function 可能看不到这个文件

对于我的场景(orchestrator 先写,agents 后读,顺序是自然的),这不是问题。但如果你需要实时协调并发写入的 agent,比如两个 agent 同时写同一个文件的的不同部分,这个模型会出问题。

解决方案:在需要严格一致性的场景,在应用层做确认(比如写完后立即读回来验证),或者用一个 agent 做写入协调。

2. VPC 复杂度

S3 Files 需要 VPC,这是额外的配置门槛。虽然 AWS 已经大幅简化了 VPC-attached Lambda 的冷启动问题,但:

  • 需要创建私有子网(至少两个,用于 HA)
  • 需要配置 NAT 网关/网关终端节点
  • 需要正确配置安全组(开放 2049 端口给 NFS)
  • 需要给 Lambda 函数 VPC 权限

这不是每天都要做的事,但每开始一个新项目就要做一次。AWS 提供了一些 reusable patterns,但还是要花时间理解网络配置。

建议:把这个配置模板化,一次做好,之后复用。

3. 冷启动残留

虽然冷启动已经从过去的 10+ 秒降到了 2 秒左右(我实测过),但对于 latency 敏感的交互式场景,2 秒仍然值得考量。

如果你的用户期待毫秒级响应,S3 Files 可能不是最优选择。考虑用 Lambda 的预置并发(Provisioned Concurrency)来消除冷启动,但这会增加成本。

4. 成本评估

S3 Files 基于 EFS,EFS 的按读写次数计费模式在高频率文件操作场景下成本会比纯 S3 API 调用高。

对于我的场景(代码审查,每个文件读一次,结果写一次),文件操作频率相对低,成本可接受。但对于需要频繁读写(每秒数千次)的场景,需要仔细评估。

建议:用 S3 Files cost calculator 估算你的 workload 成本,和纯 S3 API 方案对比。

5. 调试复杂度

当多个 Lambda 函数通过文件系统交互时,调试变得更复杂:

  • 文件系统状态不容易直观查看(不像 S3,可以用控制台浏览)
  • 多个函数的日志需要关联才能理解完整的执行流
  • 一致性问题可能在特定时序下才出现,难以复现

建议:在代码里加入详细的结构化日志,包括文件操作的时间戳和结果。


部署清单:如果想自己试

  1. 创建 VPC
    - 至少两个私有子网(不同 AZ)
    - NAT 网关(或 NAT Instance,用于私有子网的互联网访问)
    - 安全组:入方向开放 2049 端口(NFS),出方向允许所有

  2. 创建 S3 Bucket
    - 启用版本控制(必需)
    - 记录 bucket ARN

  3. 创建 S3 Files FileSystem
    yaml S3FileSystem: Type: AWS::S3Files::FileSystem Properties: Name: my-lambda-workspace

  4. 创建 Mount Targets
    - 每个 AZ 创建一个
    - 等待它们变为 available 状态(约 5 分钟)

  5. 创建 Access Point
    - 设置 PosixUser (Uid: 1000, Gid: 1000)
    - 设置 RootDirectory 为 /lambda,CreationPermissions 为 755

  6. 配置 IAM Role
    - 信任 elasticfilesystem.amazonaws.com
    - 附加 AmazonS3FilesClientReadWriteAccess 策略

  7. 部署 Lambda 函数
    - 指定 FileSystemConfigs (access point ARN + /mnt/workspace)
    - 指定 VpcConfig (子网 + 安全组)

  8. 测试
    - 先用简单文件读写验证挂载正常
    - 再跑你的 AI agent 逻辑

完整模板:github.com/singledigit/lambda-s3-files-example


结语

S3 Files 不是一个更好的 FUSE 方案。

FUSE 方案(比如 s3fs、gcsfuse)是"让对象存储看起来像文件系统",但底层的语义差异导致各种限制。S3 Files 是"让文件系统操作直接工作在对象存储上",这是本质不同。

文件系统和 S3 的边界正在模糊。对于 AI workload,这意味着:你可以在 Lambda 上跑真正的多步骤 AI 工作流,多个 agent 共享工作空间,文件系统承担协调职责。代码里没有 S3 SDK 的 ceremony,只有文件路径和业务逻辑。

局限是真实存在的——VPC 复杂度、close-to-open 一致性、成本评估——但对于有这类需求的人,这些局限换来的是巨大的架构简化。

如果你在 Lambda 上跑过需要操作 S3 数据的 AI agent,你会发现这个特性的价值。如果没跑过,可以先在 VPC 模板上花 10 分钟,然后用一个简单的文件读写验证挂载是否正常。从那里开始,你已经有了在 Lambda 上跑 AI agent 的基础设施。

剩下的,就是写你的业务逻辑。


延伸阅读