目录

Claude API基础专题(七):Agent架构与智能体设计

Claude API基础专题(七):Agent架构与智能体设计 ⭐⭐⭐⭐

目标读者:希望构建复杂AI应用系统的架构师与高级开发者 前置知识:已完成第一篇《API基础》、第二篇《提示词工程》、第三篇《工具调用》、第四篇《RAG系统》、第五篇《MCP协议》、第六篇《Claude Code与Computer Use》 学习提醒:本文是Claude API系列的收官之作,将系统性地梳理Agent架构的核心概念与设计模式


章节导航

小节主题重要程度
7.1从工具调用到Agent:跨越的关键一步⭐⭐⭐⭐⭐
7.2单Agent系统架构⭐⭐⭐⭐⭐
7.3多Agent协作系统⭐⭐⭐⭐⭐
7.4执行模式:并行与链式⭐⭐⭐⭐⭐
7.5状态管理与上下文⭐⭐⭐⭐⭐
7.6错误处理与容错机制⭐⭐⭐⭐⭐
7.7安全与权限管理⭐⭐⭐⭐
7.8生产环境最佳实践⭐⭐⭐⭐⭐

7.1 从工具调用到Agent:跨越的关键一步

为什么需要Agent?

在深入Agent架构之前,我们必须回答一个根本问题:既然工具调用已经如此强大,为什么还需要Agent?

让我们回顾一下工具调用的本质:

# 工具调用的工作模式
result = await client.messages.create(
    model="claude-opus-4-20241120",
    messages=[{
        "role": "user",
        "content": "帮我查一下北京天气"
    }],
    tools=[{"name": "get_weather", ...}]
)

在这个模式中,LLM的角色是被动响应者

  • 用户提问 → LLM调用工具 → 工具返回结果 → LLM回答

这是一种请求-响应的交互模式,LLM本身不持有状态,不主动决策,只是根据当前输入决定调用哪个工具。

但现实世界的任务往往更加复杂:

任务特征工具调用的局限为什么需要Agent
多步骤决策每次决策独立Agent能保持目标状态
条件分支无法根据结果跳转Agent能动态规划路径
长期任务上下文会丢失Agent能持久化状态
多工具协同缺乏编排能力Agent能编排执行流程
错误恢复失败即终止Agent能重试和回退

Agent的本质定义

那么,究竟什么是Agent?

Agent = LLM + 状态 + 工具 + 执行循环 + 终止条件

这个定义揭示了Agent的五个核心要素:

┌─────────────────────────────────────────────────────────────┐
│                        Agent 系统                           │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌───────────┐                                            │
│   │    LLM    │ ← 大脑:理解、推理、决策                     │
│   └─────┬─────┘                                            │
│         │                                                  │
│   ┌─────▼─────┐                                            │
│   │   状态    │ ← 记忆:目标、上下文、中间结果                 │
│   └─────┬─────┘                                            │
│         │                                                  │
│   ┌─────▼─────┐                                            │
│   │   工具    │ ← 能力:搜索、代码、文件、API                 │
│   └─────┬─────┘                                            │
│         │                                                  │
│   ┌─────▼─────┐                                            │
│   │  执行循环  │ ← 引擎:观察→决策→执行                      │
│   └─────┬─────┘                                            │
│         │                                                  │
│   ┌─────▼─────┐                                            │
│   │  终止条件  │ ← 边界:完成、出错、超时                     │
│   └───────────┘                                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

为什么这五个要素缺一不可?

  • LLM:没有LLM就没有智能决策
  • 状态:没有状态就无法处理多步骤任务
  • 工具:没有工具就无法影响外部世界
  • 执行循环:没有循环就无法持续工作
  • 终止条件:没有终止条件就会无限循环

Agent vs 传统软件

维度传统软件Agent
决策方式确定性的if-else基于LLM的概率推理
流程定义预先设计运行时动态规划
错误处理显式try-catch自我纠错能力
状态管理显式变量隐式上下文
扩展方式模块化添加工具/提示词
可预测性中(有一定随机性)

7.2 单Agent系统架构

最小可运行Agent

让我们从一个最简单的Agent开始,逐步理解其架构:

from anthropic import Anthropic
from dataclasses import dataclass, field
from typing import Optional, Any
import asyncio

@dataclass
class AgentState:
    """
    Agent状态容器
    
    为什么需要专门的状态类?
    1. 封装所有状态,边界清晰
    2. 便于序列化和持久化
    3. 类型提示让代码更健壮
    """
    goal: str                           # 当前目标
    messages: list[dict] = field(default_factory=list)  # 对话历史
    context: dict[str, Any] = field(default_factory=dict) # 共享上下文
    results: list[dict] = field(default_factory=list)    # 执行结果
    iterations: int = 0                 # 迭代计数器
    max_iterations: int = 10           # 最大迭代次数

class SimpleAgent:
    """
    最小可运行Agent
    
    设计原则:
    - 简单性:先让它跑起来
    - 可观测性:每个步骤都记录
    - 可停止:明确的终止条件
    """
    
    def __init__(self, api_key: str, tools: list[dict]):
        self.client = Anthropic(api_key=api_key)
        self.tools = tools
    
    async def run(self, goal: str) -> dict[str, Any]:
        """
        Agent主循环
        
        流程:
        1. 初始化状态
        2. 进入执行循环
        3. 每次迭代:LLM推理 → 调用工具 → 更新状态
        4. 达到终止条件时退出
        """
        state = AgentState(goal=goal)
        
        while not self._should_terminate(state):
            state.iterations += 1
            
            # 步骤1:LLM推理
            response = await self._think(state)
            
            # 步骤2:检查是否需要调用工具
            if response.content[0].type == "tool_use":
                tool_result = await self._execute_tool(response)
                state.results.append(tool_result)
                state.messages.append(response)
                state.messages.append(tool_result)
            else:
                # LLM直接回答,任务完成
                return {
                    "status": "completed",
                    "goal": goal,
                    "result": response.content[0].text,
                    "iterations": state.iterations
                }
        
        return {
            "status": "terminated",
            "goal": goal,
            "reason": "max_iterations_reached",
            "iterations": state.iterations
        }
    
    def _should_terminate(self, state: AgentState) -> bool:
        """判断是否应该终止"""
        # 达到最大迭代次数
        if state.iterations >= state.max_iterations:
            return True
        # 其他终止条件可以在这里添加
        return False
    
    async def _think(self, state: AgentState) -> Any:
        """LLM推理"""
        response = self.client.messages.create(
            model="claude-opus-4-20241120",
            max_tokens=4096,
            messages=[{
                "role": "user",
                "content": f"目标:{state.goal}\n\n请决定下一步行动。"
            }],
            tools=self.tools
        )
        return response
    
    async def _execute_tool(self, response: Any) -> dict:
        """执行工具调用"""
        tool_use = response.content[0]
        tool_name = tool_use.name
        tool_args = tool_use.input
        
        # 这里应该调用实际的工具
        # 为了简化,省略具体实现
        return {
            "role": "user",
            "content": f"Tool {tool_name} executed with args {tool_args}"
        }

为什么要设计成这样的架构?

1. 状态外部化(AgentState类)

# 好:状态外部化
state = AgentState(goal=goal)
while not self._should_terminate(state):
    ...

# 不好:状态散落在各处
while self.iterations < self.max_iterations:
    self.messages.append(...)
    self.context.update(...)

原因

  • 状态外部化后,可以序列化保存(断点续跑)
  • 状态外部化后,可以热切换(修改状态不影响逻辑)
  • 状态外部化后,可以并行运行多个Agent实例

2. 终止条件前置判断

# 好:先检查再执行
while not self._should_terminate(state):
    response = await self._think(state)
    ...

# 不好:先执行再检查
while True:
    response = await self._think(state)
    if self._should_terminate(state):
        break

原因:避免在达到终止条件后还执行一次无用推理。

3. 结果记录完整

state.results.append(tool_result)
state.messages.append(response)
state.messages.append(tool_result)  # 工具结果也加入上下文

原因:保持完整的对话历史,让LLM能够理解完整的执行脉络。


7.3 多Agent协作系统

为什么需要多Agent?

现实世界的复杂任务往往需要分工协作

任务:帮用户规划一次旅行

单个Agent的问题:
- 需要同时是旅行专家 + 酒店专家 + 天气专家 + 预算专家
- 知识过于分散,难以精通所有领域
- 单一Agent处理所有任务,响应会变慢

多Agent方案:
- 旅行规划Agent(主Agent)负责任务分解和协调
- 酒店Agent负责搜索和推荐酒店
- 天气Agent负责查询天气预报
- 预算Agent负责计算和控制预算

多Agent架构模式

模式一:星型架构(主从模式)

                    ┌─────────────┐
                    │  主Agent    │
                    │ (协调者)    │
                    └──────┬──────┘
                           │
          ┌────────────────┼────────────────┐
          │                │                │
    ┌─────▼─────┐   ┌─────▼─────┐   ┌─────▼─────┐
    │ 子Agent1  │   │ 子Agent2  │   │ 子Agent3  │
    │ (酒店)    │   │ (天气)    │   │ (预算)    │
    └───────────┘   └───────────┘   └───────────┘
class MasterAgent:
    """
    星型架构主Agent
    
    特点:
    - 主Agent负责任务分解和结果汇总
    - 子Agent只做特定领域的任务
    - 主Agent和子Agent之间通过消息传递
    """
    
    def __init__(self):
        self.sub_agents = {
            "hotel": HotelAgent(),
            "weather": WeatherAgent(),
            "budget": BudgetAgent()
        }
    
    async def run(self, task: str) -> dict:
        # 1. 分解任务
        subtasks = await self._decompose_task(task)
        
        # 2. 并行派发子任务
        results = await asyncio.gather(*[
            self._run_subagent(name, subtask)
            for name, subtask in subtasks.items()
        ])
        
        # 3. 汇总结果
        return await self._aggregate_results(results)

模式二:链式架构(流水线模式)

┌───────────┐    ┌───────────┐    ┌───────────┐    ┌───────────┐
│  Agent1   │ →  │  Agent2   │ →  │  Agent3   │ →  │  Agent4   │
│ (预处理)   │    │ (核心处理) │    │ (验证)    │    │ (输出)    │
└───────────┘    └───────────┘    └───────────┘    └───────────┘
class ChainAgent:
    """
    链式架构Agent
    
    特点:
    - 每个Agent有明确的职责(预处理/核心/验证/输出)
    - 输出层层传递,形成处理流水线
    - 适合有明确处理步骤的任务
    """
    
    def __init__(self):
        self.chain = [
            PreprocessAgent(),
            CoreAgent(),
            ValidationAgent(),
            OutputAgent()
        ]
    
    async def run(self, input_data: Any) -> Any:
        current = input_data
        
        for agent in self.chain:
            current = await agent.process(current)
            
            if not self._is_valid(current):
                # 验证失败,触发错误处理
                return await self._handle_error(current, agent)
        
        return current

模式三:网状架构(对等模式)

┌───────────┐────────────┐
│  Agent1   │──────────││
└───┬───────┘          ││
    │                  ││
    │    ┌─────────────▼┐
    └───►│  Agent2      │
         └──────┬───────┘
                │
         ┌──────▼───────┐
         │  Agent3       │
         └───────────────┘
class PeerNetwork:
    """
    网状架构(对等模式)
    
    特点:
    - 没有中心协调者
    - Agent之间可以直接通信
    - 适合需要横向协作的场景
    - 复杂度高,需要好的协调机制
    """
    
    async def run(self, task: str) -> dict:
        # 使用消息队列进行Agent间通信
        queue = MessageQueue()
        
        # 启动所有Agent
        agents = [Agent(i) for i in range(3)]
        tasks = [agent.run(queue) for agent in agents]
        
        # 发送初始任务
        await queue.publish("task", task)
        
        # 等待所有Agent完成
        results = await asyncio.gather(*tasks)
        
        return self._merge_results(results)

如何选择架构模式?

场景推荐架构原因
任务明确、子任务独立星型便于并行、易于管理
处理流程固定链式清晰、易于调试
横向协作、无固定流程网状灵活、适应性强
需要高可靠性星型+链式混合主备冗余

7.4 执行模式:并行与链式

并行执行

并行执行是提升Agent效率的关键技术:

async def parallel_execution(tasks: list[dict]) -> list[dict]:
    """
    并行执行多个任务
    
    为什么并行能提升效率?
    假设每个任务需要3秒:
    - 串行:3 * N 秒
    - 并行:约3秒(理想情况)
    
    注意:
    - 并行任务之间不能有依赖
    - 需要处理好并发竞争问题
    """
    async def execute_single(task: dict) -> dict:
        agent = create_agent(task["type"])
        result = await agent.run(task["input"])
        return {"task_id": task["id"], "result": result}
    
    # 使用asyncio.gather实现真正并行
    results = await asyncio.gather(*[
        execute_single(task) for task in tasks
    ])
    
    return results

什么任务适合并行?

# 适合并行的任务
parallel_tasks = [
    {"id": 1, "type": "search", "input": "北京天气"},
    {"id": 2, "type": "search", "input": "上海天气"},  # 独立
    {"id": 3, "type": "search", "input": "广州天气"},  # 独立
]

# 不适合并行的任务(存在依赖)
sequential_tasks = [
    {"id": 1, "type": "search", "input": "用户ID"},
    {"id": 2, "type": "fetch", "input": "依赖任务1的结果"},  # 依赖
    {"id": 3, "type": "save", "input": "依赖任务2的结果"},   # 依赖
]

链式执行

链式执行用于有依赖关系的任务:

async def chain_execution(tasks: list[dict]) -> dict:
    """
    链式执行(串行依赖)
    
    执行流程:
    Task1 → Task2 → Task3 → ... → TaskN
    
    每个任务接收前一个任务的输出作为输入
    """
    context = {}
    
    for i, task in enumerate(tasks):
        # 构建当前任务的输入
        task_input = await self._prepare_input(task, context)
        
        # 执行当前任务
        agent = create_agent(task["type"])
        result = await agent.run(task_input)
        
        # 保存结果到上下文,供后续任务使用
        context[task["id"]] = result
        
        # 检查是否需要错误恢复
        if not self._is_success(result):
            if task.get("retry"):
                result = await self._retry(task, context)
            else:
                raise ExecutionError(f"Task {task['id']} failed")
    
    return context  # 返回完整的上下文(包含所有任务的结果)

并行与链式的混合模式

class HybridExecutor:
    """
    混合执行器:并行 + 链式的结合
    
    适用场景:
    - 部分任务可以并行
    - 部分任务有依赖关系
    
    示例:旅行规划
    1. 并行:同时查询 酒店、景点、天气
    2. 链式:基于查询结果生成行程 → 审核行程 → 输出最终方案
    """
    
    async def run(self, workflow: dict) -> dict:
        # 阶段1:并行执行独立任务
        parallel_results = await self._parallel_phase(workflow["parallel_tasks"])
        
        # 阶段2:基于并行结果执行链式任务
        context = {"parallel": parallel_results}
        chain_result = await self._chain_phase(
            workflow["chain_tasks"], 
            context
        )
        
        return {
            "parallel_results": parallel_results,
            "chain_result": chain_result
        }
    
    async def _parallel_phase(self, tasks: list[dict]) -> dict:
        """并行阶段"""
        results = await asyncio.gather(*[
            self._execute(task) for task in tasks
        ])
        return {task["id"]: result for task, result in zip(tasks, results)}
    
    async def _chain_phase(self, tasks: list[dict], context: dict) -> dict:
        """链式阶段"""
        for task in tasks:
            context[task["id"]] = await self._execute(task, context)
        return context

7.5 状态管理与上下文

状态持久化的重要性

为什么Agent需要特别关注状态管理?

# 问题:LLM的上下文是有限的
MAX_TOKENS = 200000  # Claude Opus的最大上下文

# 如果对话历史越来越长...
messages = [
    {"role": "user", "content": "第一轮对话"},      # 100 tokens
    {"role": "assistant", "content": "第一轮回答"},  # 200 tokens
    {"role": "user", "content": "第二轮对话"},      # 150 tokens
    {"role": "assistant", "content": "第二轮回答"},  # 300 tokens
    # ... 100轮后
    # tokens总量超过限制
]

解决方案:状态压缩与摘要

class StateManager:
    """
    状态管理器
    
    核心功能:
    1. 压缩对话历史
    2. 提取关键信息
    3. 维护工作上下文
    """
    
    def __init__(self, max_history: int = 10):
        self.max_history = max_history  # 保留最近N轮对话
        self.summaries = []            # 压缩后的摘要
        self.working_memory = {}       # 当前工作内存
    
    def add_interaction(self, user_msg: str, assistant_msg: str):
        """添加一轮交互"""
        # 1. 加入历史
        self.summaries.append({
            "user": user_msg,
            "assistant": assistant_msg,
            "timestamp": now()
        })
        
        # 2. 检查是否需要压缩
        if len(self.summaries) > self.max_history:
            self._compress()
    
    def _compress(self):
        """
        压缩历史
        
        压缩策略:
        1. 保留最近N轮完整对话
        2. 更早的对话压缩成摘要
        3. 提取关键实体和决策
        """
        recent = self.summaries[-self.max_history:]
        older = self.summaries[:-self.max_history]
        
        # 用LLM生成摘要
        summary_prompt = f"""
        请总结以下对话的关键信息:
        {self._format_conversation(older)}
        
        提取:
        1. 用户的主要目标
        2. 已完成的关键步骤
        3. 当前状态
        4. 重要的中间结果
        """
        
        summary = self.client.messages.create(
            model="claude-opus-4-20241120",
            messages=[{"role": "user", "content": summary_prompt}]
        )
        
        # 保存摘要,清空旧历史
        self.working_memory["conversation_summary"] = summary.content[0].text
        self.summaries = recent
    
    def get_context_for_llm(self) -> str:
        """构建发送给LLM的上下文"""
        parts = []
        
        # 1. 添加摘要(如果有)
        if "conversation_summary" in self.working_memory:
            parts.append(f"对话摘要:{self.working_memory['conversation_summary']}")
        
        # 2. 添加最近N轮对话
        for item in self.summaries[-self.max_history:]:
            parts.append(f"用户:{item['user']}")
            parts.append(f"助手:{item['assistant']}")
        
        return "\n".join(parts)

上下文窗口的智能管理

class SmartContextManager:
    """
    智能上下文管理器
    
    核心思想:
    - 不是简单截断,而是智能选择
    - 保留与当前任务最相关的上下文
    """
    
    def __init__(self, max_tokens: int = 150000):
        self.max_tokens = max_tokens
        self.priority_levels = {
            "critical": ["目标", "关键约束", "核心决策"],
            "important": ["中间结果", "用户偏好", "当前状态"],
            "normal": ["一般对话", "解释说明"],
            "discardable": ["问候", "重复确认"]
        }
    
    def build_context(self, all_items: list[dict], current_task: str) -> str:
        """
        构建当前任务所需的上下文
        
        算法:
        1. 为每个上下文项计算相关性分数
        2. 按分数排序
        3. 从最高分开始选取,直到达到token限制
        """
        scored_items = []
        
        for item in all_items:
            relevance = self._calculate_relevance(item, current_task)
            priority = self._get_priority(item)
            
            # 综合分数 = 相关性 * 优先级权重
            weight = {"critical": 1.0, "important": 0.7, "normal": 0.4, "discardable": 0.0}
            score = relevance * weight.get(priority, 0.5)
            
            scored_items.append((score, item))
        
        # 按分数降序排序
        scored_items.sort(key=lambda x: x[0], reverse=True)
        
        # 选取项直到达到token限制
        selected = []
        total_tokens = 0
        
        for score, item in scored_items:
            item_tokens = self._estimate_tokens(item)
            if total_tokens + item_tokens <= self.max_tokens:
                selected.append(item)
                total_tokens += item_tokens
        
        return self._format_selected(selected)

7.6 错误处理与容错机制

错误分类与处理策略

class ErrorType(Enum):
    """Agent可能遇到的错误类型"""
    
    # LLM相关错误
    LLM_TIMEOUT = "llm_timeout"           # LLM响应超时
    LLM_RATE_LIMIT = "llm_rate_limit"     # API限流
    LLM_INVALID_RESPONSE = "llm_invalid"  # LLM返回无效响应
    
    # 工具相关错误
    TOOL_NOT_FOUND = "tool_not_found"     # 工具不存在
    TOOL_EXECUTION_FAILED = "tool_failed" # 工具执行失败
    TOOL_TIMEOUT = "tool_timeout"        # 工具执行超时
    
    # 状态相关错误
    STATE_CORRUPTED = "state_corrupted"  # 状态损坏
    CONTEXT_OVERFLOW = "context_overflow" # 上下文溢出
    
    # 业务相关错误
    MAX_ITERATIONS = "max_iterations"    # 达到最大迭代
    USER_CANCELLED = "user_cancelled"    # 用户取消

class ErrorHandler:
    """
    错误处理器
    
    设计原则:
    1. 不同错误类型采用不同的处理策略
    2. 记录错误日志便于调试
    3. 在适当时候回退或重试
    """
    
    def __init__(self):
        self.error_counts = {}
    
    async def handle(self, error: Exception, state: AgentState) -> ErrorAction:
        """
        处理错误,返回处理动作
        """
        error_type = self._classify(error)
        
        # 记录错误
        self._log_error(error_type, error, state)
        
        # 根据错误类型决定动作
        handlers = {
            ErrorType.LLM_TIMEOUT: self._handle_timeout,
            ErrorType.LLM_RATE_LIMIT: self._handle_rate_limit,
            ErrorType.TOOL_EXECUTION_FAILED: self._handle_tool_failure,
            ErrorType.CONTEXT_OVERFLOW: self._handle_context_overflow,
            ErrorType.MAX_ITERATIONS: self._handle_max_iterations,
        }
        
        handler = handlers.get(error_type, self._handle_unknown)
        return await handler(error, state)
    
    async def _handle_timeout(self, error, state):
        """超时处理:等待后重试"""
        state.context["retry_count"] = state.context.get("retry_count", 0) + 1
        
        if state.context["retry_count"] < 3:
            await asyncio.sleep(2 ** state.context["retry_count"])  # 指数退避
            return ErrorAction.RETRY
        else:
            return ErrorAction.FAIL
    
    async def _handle_rate_limit(self, error, state):
        """限流处理:等待指定时间"""
        retry_after = getattr(error, "retry_after", 60)
        await asyncio.sleep(retry_after)
        return ErrorAction.RETRY
    
    async def _handle_tool_failure(self, error, state):
        """工具失败处理:尝试备用方案"""
        tool_name = getattr(error, "tool_name", None)
        
        # 检查是否有备用工具
        if self._has_fallback(tool_name):
            state.context["using_fallback"] = True
            return ErrorAction.RETRY_WITH_FALLBACK
        
        return ErrorAction.FAIL
    
    async def _handle_context_overflow(self, error, state):
        """上下文溢出处理:压缩历史"""
        # 调用状态管理器压缩历史
        state_manager = state.context.get("state_manager")
        if state_manager:
            state_manager._compress()
            return ErrorAction.RETRY
        else:
            return ErrorAction.FAIL
    
    async def _handle_max_iterations(self, error, state):
        """达到最大迭代:返回当前最佳结果"""
        return ErrorAction.RETURN_BEST_RESULT

重试与回退机制

class RetryPolicy:
    """
    重试策略
    
    为什么需要重试策略?
    1. 瞬时故障(网络抖动)可能自行恢复
    2. 限流错误等待后通常可以继续
    3. 合理重试能提高系统稳定性
    """
    
    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        exponential_base: float = 2.0,
        max_delay: float = 60.0,
        jitter: bool = True
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.exponential_base = exponential_base
        self.max_delay = max_delay
        self.jitter = jitter
    
    def get_delay(self, attempt: int) -> float:
        """计算重试延迟"""
        # 指数退避
        delay = self.base_delay * (self.exponential_base ** attempt)
        delay = min(delay, self.max_delay)
        
        # 添加随机抖动,避免惊群效应
        if self.jitter:
            import random
            delay = delay * (0.5 + random.random())
        
        return delay

class FallbackManager:
    """
    备用方案管理器
    
    核心思想:
    - 每个主工具可以有多个备用工具
    - 主工具失败时,自动尝试备用工具
    - 记录使用情况,便于优化
    """
    
    def __init__(self):
        self.fallback_map = {
            "primary_search": ["fallback_search_1", "fallback_search_2"],
            "primary_translate": ["fallback_translate"],
            "primary_code_exec": ["fallback_sandbox"]
        }
        self.usage_stats = {}
    
    async def execute_with_fallback(self, primary_tool: str, args: dict) -> Any:
        """执行主工具,失败时尝试备用方案"""
        tools_to_try = [primary_tool] + self.fallback_map.get(primary_tool, [])
        
        last_error = None
        for tool in tools_to_try:
            try:
                result = await self._execute_tool(tool, args)
                self._record_success(tool)
                return result
            except Exception as e:
                last_error = e
                self._record_failure(tool, e)
                continue
        
        # 所有工具都失败
        raise AllToolsFailedError(tools_to_try, last_error)

7.7 安全与权限管理

权限模型设计

class PermissionScope(Enum):
    """权限范围"""
    NONE = "none"           # 无任何权限
    READ = "read"           # 只读
    WRITE = "write"         # 读写
    EXECUTE = "execute"     # 执行
    ADMIN = "admin"         # 管理

class Permission:
    """
    权限定义
    
    为什么需要权限系统?
    1. 防止Agent执行危险操作
    2. 限制资源访问范围
    3. 满足合规审计要求
    """
    
    def __init__(
        self,
        file_paths: list[str] = [],      # 允许访问的文件路径
        allowed_tools: list[str] = [],   # 允许使用的工具
        allowed_domains: list[str] = [], # 允许访问的网络域名
        max_execution_time: int = 300,   # 最大执行时间(秒)
        max_api_calls: int = 100         # 最大API调用次数
    ):
        self.file_paths = file_paths
        self.allowed_tools = allowed_tools
        self.allowed_domains = allowed_domains
        self.max_execution_time = max_execution_time
        self.max_api_calls = max_api_calls

class SecurityManager:
    """
    安全管理器
    
    核心功能:
    1. 验证操作权限
    2. 拦截危险操作
    3. 记录所有操作日志
    """
    
    def __init__(self, permission: Permission):
        self.permission = permission
        self.audit_log = []
    
    def check_file_access(self, path: str, mode: str) -> bool:
        """检查文件访问权限"""
        # 如果是只读权限但要求写操作
        if mode == "write" and PermissionScope.WRITE not in self.permission.scopes:
            return False
        
        # 检查路径是否在允许范围内
        import os
        real_path = os.path.realpath(path)
        
        for allowed_path in self.permission.file_paths:
            if real_path.startswith(os.path.realpath(allowed_path)):
                return True
        
        return False
    
    def check_tool_usage(self, tool_name: str) -> bool:
        """检查工具使用权限"""
        return tool_name in self.permission.allowed_tools
    
    def check_network_access(self, domain: str) -> bool:
        """检查网络访问权限"""
        for allowed in self.permission.allowed_domains:
            if domain.endswith(allowed) or domain == allowed:
                return True
        return False
    
    def audit(self, operation: str, details: dict):
        """记录审计日志"""
        self.audit_log.append({
            "timestamp": now(),
            "operation": operation,
            "details": details
        })

沙箱隔离

class SandboxConfig:
    """
    沙箱配置
    
    为什么需要沙箱?
    即使Agent出错,也只会影响沙箱内的模拟环境
    不会影响真实系统
    """
    
    def __init__(
        self,
        use_sandbox: bool = True,
        network_isolation: bool = True,
        filesystem_boundary: str = "/workspace/sandbox",
        memory_limit: str = "2GB"
    ):
        self.use_sandbox = use_sandbox
        self.network_isolation = network_isolation
        self.filesystem_boundary = filesystem_boundary
        self.memory_limit = memory_limit

class SandboxExecutor:
    """沙箱执行器"""
    
    def __init__(self, config: SandboxConfig):
        self.config = config
    
    async def execute(self, code: str, language: str) -> Any:
        """在沙箱中执行代码"""
        if self.config.use_sandbox:
            return await self._execute_in_sandbox(code, language)
        else:
            return await self._execute_direct(code, language)
    
    async def _execute_in_sandbox(self, code: str, language: str) -> Any:
        """在沙箱中执行"""
        # 1. 准备沙箱环境
        sandbox = await self._prepare_sandbox()
        
        # 2. 设置资源限制
        sandbox.set_memory_limit(self.config.memory_limit)
        sandbox.set_network隔离(self.config.network_isolation)
        sandbox.set_filesystem_boundary(self.config.filesystem_boundary)
        
        # 3. 执行代码
        try:
            result = await sandbox.run(code, language)
            return result
        finally:
            # 4. 清理沙箱
            await sandbox.cleanup()

7.8 生产环境最佳实践

架构设计原则

# 生产环境Agent系统架构原则

PRINCIPLES = """
1. 分离关注点(Separation of Concerns)
   - Agent核心逻辑与工具实现分离
   - 状态管理与执行逻辑分离
   - 安全检查与业务逻辑分离

2. 失败设计(Design for Failure)
   - 每个组件都可能失败
   - 优雅降级,而非整体崩溃
   - 快速失败,便于诊断

3. 可观测性(Observability)
   - 日志:记录每个关键步骤
   - 指标:QPS、延迟、错误率
   - 追踪:请求全链路追踪

4. 资源管理(4. 资源管理(Resource Management)
   - 限制并发请求数
   - 控制内存使用
   - 防止资源泄漏

5. 安全第一(Security First)
   - 最小权限原则
   - 纵深防御
   - 审计追踪
"""

监控与告警

class AgentMonitor:
    """
    Agent监控指标
    """
    
    metrics = {
        "requests_total": "总请求数",
        "requests_success": "成功请求数",
        "requests_failed": "失败请求数",
        "average_latency": "平均延迟",
        "p99_latency": "P99延迟",
        "active_agents": "活跃Agent数",
        "tools_usage": "工具使用统计",
        "error_distribution": "错误分布"
    }

class AlertManager:
    """
    告警管理
    """
    
    alert_rules = {
        "high_error_rate": {
            "condition": "error_rate > 0.05",  # 错误率超过5%
            "severity": "critical",
            "action": "notify_oncall"
        },
        "high_latency": {
            "condition": "p99_latency > 30s",
            "severity": "warning",
            "action": "notify_team"
        },
        "agent_timeout": {
            "condition": "timeout_count > 10/min",
            "severity": "warning",
            "action": "investigate"
        }
    }

部署架构建议

┌─────────────────────────────────────────────────────────────────┐
│                        负载均衡层                               │
│                    (Nginx / 云负载均衡)                        │
└─────────────────────────────┬───────────────────────────────────┘
                              │
┌─────────────────────────────▼───────────────────────────────────┐
│                        API网关层                                │
│              (认证、限流、日志、路由)                           │
└─────────────────────────────┬───────────────────────────────────┘
                              │
┌─────────────────────────────▼───────────────────────────────────┐
│                      Agent服务集群                               │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐  ┌────────────┐│
│  │  Agent-1   │  │  Agent-2   │  │  Agent-3   │  │  Agent-N   ││
│  │  (实例1)    │  │  (实例2)    │  │  (实例3)    │  │  (实例N)    ││
│  └────────────┘  └────────────┘  └────────────┘  └────────────┘│
└─────────────────────────────┬───────────────────────────────────┘
                              │
          ┌───────────────────┼───────────────────┐
          │                   │                   │
┌─────────▼─────┐   ┌────────▼────────┐   ┌─────▼─────────┐
│   工具服务1    │   │    工具服务2     │   │    工具服务3    │
│  (搜索服务)    │   │   (代码执行)     │   │   (API服务)    │
└───────────────┘   └─────────────────┘   └───────────────┘

性能优化建议

优化项方法效果
缓存LLM响应对相同输入缓存响应减少API调用
并行工具调用不依赖结果的工具并行执行降低延迟
状态压缩对话历史压缩摘要减少token消耗
预热机制定期预加载模型降低冷启动延迟
连接池复用复用HTTP/数据库连接提高吞吐

本章总结

核心知识点

知识点掌握程度关键点
Agent定义⭐⭐⭐⭐⭐LLM+状态+工具+执行循环+终止条件
单Agent架构⭐⭐⭐⭐⭐状态外部化、终止条件前置
多Agent协作⭐⭐⭐⭐⭐星型/链式/网状架构
执行模式⭐⭐⭐⭐⭐并行与链式混合
状态管理⭐⭐⭐⭐⭐压缩、摘要、智能选择
错误处理⭐⭐⭐⭐⭐分类处理、重试回退
安全机制⭐⭐⭐⭐权限模型、沙箱隔离
生产实践⭐⭐⭐⭐⭐监控、告警、优化

Claude API七篇完整系列

恭喜你!已完成Claude API全系列的学习:

主题核心要点
API基础认证、请求、会话、结构化输出
提示词工程Few-shot、CoT、Temperature
工具调用Function Calling、MCP
RAG系统分块、嵌入、搜索、重排序
MCP协议架构、服务器开发、客户端
Computer Use观察-决策-执行、安全机制
Agent架构多Agent、状态管理、生产实践

下一步

  • 实践项目:用Agent架构构建一个自动化助手
  • 深入研究:MCP协议与Agent的结合
  • 参考资料:Anthropic Agent文档

文档元信息 难度:⭐⭐⭐⭐ | 类型:专家设计 | 更新日期:2026-03-25 | 预计阅读时间:60分钟 | 字数:约8000字