Max Team — Detailed Design

个人工作团队多 Agent 协作系统 — 详细设计文档
版本 v0.1 语言 Go 1.22+ LLM Claude API 2026-05-21

01项目结构

max-team/
├── cmd/
│ └── max-team/
│ └── main.go // 入口:解析参数、加载配置、启动 Runtime
├── internal/
│ ├── agent/
│ │ ├── agent.go // Agent 接口 + BaseAgent 基类
│ │ ├── max.go // Max: Team Lead 实现
│ │ ├── leo.go // Leo: Coder 实现
│ │ ├── ada.go // Ada: Architect 实现
│ │ ├── ray.go // Ray: Tester 实现
│ │ └── sam.go // Sam: Ops 实现
│ ├── runtime/
│ │ └── runtime.go // Agent 生命周期管理
│ ├── scheduler/
│ │ ├── scheduler.go // DAG 调度器
│ │ └── dag.go // DAG 数据结构 + 拓扑排序
│ ├── task/
│ │ ├── task.go // Task 结构体 + 状态机
│ │ └── manager.go // Task CRUD + 持久化
│ ├── bus/
│ │ └── bus.go // 消息总线
│ ├── llm/
│ │ ├── router.go // 模型路由
│ │ └── claude.go // Claude API 客户端封装
│ ├── tool/
│ │ ├── registry.go // 工具注册中心
│ │ ├── file.go // read_file / write_file / list_dir
│ │ ├── shell.go // run_shell (白名单)
│ │ ├── git.go // git_diff / git_commit / git_status
│ │ ├── test.go // run_test / coverage_report
│ │ ├── search.go // search_code (grep/ast)
│ │ └── ops.go // query_log / query_metrics
│ ├── mcp/
│ │ ├── client.go // MCP 客户端
│ │ └── protocol.go // JSON-RPC 协议类型
│ ├── gate/
│ │ └── approval.go // 用户审批门控
│ └── audit/
│ └── log.go // 审计日志
├── config/
│ └── agents.yaml // Agent 角色配置
├── docs/
│ ├── overview-design.html
│ └── detailed-design.html
├── go.mod
└── README.md

02配置系统

2.1 全局配置 config.yaml

server:
  port: 8080                          # Web UI 端口(Phase 3)
  mode: cli                           # cli | web

llm:
  provider: anthropic
  api_key: ${ANTHROPIC_API_KEY}       # 环境变量引用
  base_url: https://api.anthropic.com
  default_model: claude-sonnet-4-6
  max_retries: 3
  timeout: 120s

audit:
  enabled: true
  output: logs/audit.jsonl            # 结构化日志文件
  level: info                         # debug | info | warn | error

project:
  root: .                             # 项目根目录
  sandbox: true                       # 是否开启目录沙箱

2.2 Agent 配置 config/agents.yaml

agents:
  - name: max
    role: team_lead
    model: claude-sonnet-4-6
    system_prompt: |
      你是 Max,一个专业的项目经理和私人助理。
      你的职责是理解用户需求、拆解任务、制定执行方案、协调团队成员、追踪进度。
      你不直接操作文件或执行命令,而是通过派发任务给团队成员来完成工作。
      关键决策必须先征得用户确认。
    tools:
      - task_create
      - task_update
      - task_query
      - dispatch_task
      - ask_user
    mcp_servers: []
    constraints:
      max_tokens: 4096
      timeout: 60s

  - name: leo
    role: coder
    model: claude-opus-4-7
    system_prompt: |
      你是 Leo,一个资深 Go 开发工程师。
      你的职责是编写高质量的 Go 代码、修复 Bug、进行代码重构。
      你遵循 Go 的最佳实践,代码简洁、可读、安全。
      完成任务后提交代码变更(不自行 push)。
    tools:
      - read_file
      - write_file
      - list_dir
      - run_shell
      - git_diff
      - git_commit
      - git_status
      - search_code
    mcp_servers:
      - name: gitnexus
        command: npx
        args: ["gitnexus-mcp"]
    constraints:
      allowed_dirs:
        - "."
      blocked_patterns:
        - "*.env"
        - "*credentials*"
        - "*secret*"
      allowed_commands:
        - "go build"
        - "go test"
        - "go vet"
        - "go fmt"
        - "go mod tidy"
        - "git diff"
        - "git status"
        - "git log"
        - "grep"
        - "find"
      max_tokens: 8192
      timeout: 300s

  - name: ada
    role: architect
    model: claude-opus-4-7
    system_prompt: |
      你是 Ada,一个资深架构师。
      你的职责是设计技术方案、评审架构、进行技术选型、输出设计文档。
      你有丰富的系统设计经验,善于权衡取舍。
      你只读代码分析,不修改任何文件。
    tools:
      - read_file
      - list_dir
      - search_code
    mcp_servers:
      - name: gitnexus
        command: npx
        args: ["gitnexus-mcp"]
      - name: cooper
        command: npx
        args: ["cooper-mcp"]
    constraints:
      allowed_dirs:
        - "."
      max_tokens: 8192
      timeout: 180s

  - name: ray
    role: tester
    model: claude-sonnet-4-6
    system_prompt: |
      你是 Ray,一个测试工程师。
      你的职责是编写单元测试和集成测试、执行测试、分析覆盖率。
      你只在测试文件(*_test.go)中写入代码。
      你使用 Go 标准测试框架和 testify。
    tools:
      - read_file
      - write_file
      - list_dir
      - run_test
      - coverage_report
    mcp_servers: []
    constraints:
      allowed_dirs:
        - "."
      write_patterns:
        - "*_test.go"
      max_tokens: 4096
      timeout: 180s

  - name: sam
    role: ops
    model: claude-haiku-4-5
    system_prompt: |
      你是 Sam,一个运维工程师。
      你的职责是查询日志、检查监控指标、协助问题排查。
      你不修改任何文件,只做查询和分析。
    tools:
      - query_log
      - query_metrics
    mcp_servers:
      - name: grafana
        command: npx
        args: ["grafana-mcp"]
    constraints:
      max_tokens: 2048
      timeout: 60s

2.3 配置加载

package config

type Config struct {
    Server  ServerConfig  `yaml:"server"`
    LLM     LLMConfig     `yaml:"llm"`
    Audit   AuditConfig   `yaml:"audit"`
    Project ProjectConfig `yaml:"project"`
}

type AgentsConfig struct {
    Agents []AgentConfig `yaml:"agents"`
}

type AgentConfig struct {
    Name         string            `yaml:"name"`
    Role         string            `yaml:"role"`
    Model        string            `yaml:"model"`
    SystemPrompt string            `yaml:"system_prompt"`
    Tools        []string          `yaml:"tools"`
    MCPServers   []MCPServerConfig `yaml:"mcp_servers"`
    Constraints  Constraints       `yaml:"constraints"`
}

type Constraints struct {
    AllowedDirs     []string      `yaml:"allowed_dirs"`
    BlockedPatterns []string      `yaml:"blocked_patterns"`
    WritePatterns   []string      `yaml:"write_patterns"`
    AllowedCommands []string      `yaml:"allowed_commands"`
    MaxTokens       int           `yaml:"max_tokens"`
    Timeout         time.Duration `yaml:"timeout"`
}

type MCPServerConfig struct {
    Name    string            `yaml:"name"`
    Command string            `yaml:"command"`
    Args    []string          `yaml:"args"`
    Env     map[string]string `yaml:"env,omitempty"`
}

func Load(configPath, agentsPath string) (*Config, *AgentsConfig, error) {
    // 1. 读取 YAML
    // 2. 展开环境变量 ${VAR}
    // 3. 校验必填字段
    // 4. 设置默认值
}

03核心模块详细设计

3.1 Agent Runtime

管理所有 Agent 的生命周期,负责创建、启动、监控和停止。

package runtime

type Runtime struct {
    config     *config.Config
    agents     map[string]agent.Agent   // name → agent
    bus        *bus.MessageBus
    scheduler  *scheduler.Scheduler
    gate       *gate.ApprovalGate
    auditLog   *audit.Logger
    ctx        context.Context
    cancel     context.CancelFunc
    wg         sync.WaitGroup
}

// New 创建 Runtime 实例
func New(cfg *config.Config, agentsCfg *config.AgentsConfig) (*Runtime, error) {
    r := &Runtime{
        config: cfg,
        agents: make(map[string]agent.Agent),
    }
    r.ctx, r.cancel = context.WithCancel(context.Background())

    // 初始化基础设施
    r.bus = bus.New()
    r.auditLog = audit.New(cfg.Audit)
    r.gate = gate.New(r.bus)

    // 创建 LLM Router
    router := llm.NewRouter(cfg.LLM)

    // 创建 Tool Registry
    registry := tool.NewRegistry(cfg.Project)

    // 根据配置创建各 Agent
    for _, ac := range agentsCfg.Agents {
        a, err := agent.New(ac, router, registry, r.bus, r.auditLog)
        if err != nil {
            return nil, fmt.Errorf("create agent %s: %w", ac.Name, err)
        }
        r.agents[ac.Name] = a
    }

    // 创建 Scheduler
    taskMgr := task.NewManager()
    r.scheduler = scheduler.New(taskMgr, r.agents, r.bus, r.gate)

    return r, nil
}

// Start 启动所有 Agent 和调度器
func (r *Runtime) Start() error {
    // 1. 启动消息总线
    r.bus.Start(r.ctx)

    // 2. 启动各 Agent(每个 Agent 一个 goroutine)
    for name, a := range r.agents {
        r.wg.Add(1)
        go func(name string, a agent.Agent) {
            defer r.wg.Done()
            if err := a.Start(r.ctx); err != nil {
                r.auditLog.Error("agent_failed", "name", name, "err", err)
            }
        }(name, a)
    }

    // 3. 启动调度器
    r.wg.Add(1)
    go func() {
        defer r.wg.Done()
        r.scheduler.Run(r.ctx)
    }()

    return nil
}

// Stop 优雅关闭
func (r *Runtime) Stop() {
    r.cancel()
    r.wg.Wait()
    r.auditLog.Close()
}

// Run 主循环:读取用户输入 → 发送给 Max
func (r *Runtime) Run() error {
    if err := r.Start(); err != nil {
        return err
    }
    defer r.Stop()

    max := r.agents["max"]
    scanner := bufio.NewScanner(os.Stdin)

    fmt.Println("Max Team ready. Type your request:")
    for scanner.Scan() {
        input := scanner.Text()
        if input == "/quit" {
            break
        }
        // 用户输入 → 发送给 Max
        r.bus.Send(bus.Message{
            From:    "user",
            To:      "max",
            Type:    bus.MsgUserInput,
            Payload: input,
        })
        // 等待 Max 回复
        reply := r.bus.WaitForReply("max", "user")
        fmt.Println("\n" + reply.Payload.(string))
    }
    return nil
}

生命周期状态

状态说明转换条件
CreatedAgent 已创建,未启动New() 调用后
Starting初始化中(连接 MCP 等)Start() 调用
Ready等待任务初始化完成
Busy正在执行任务收到 TaskAssign
Stopped已停止Stop() 或 ctx 取消

3.2 Message Bus

基于 Go channel 的星型消息总线,所有消息经由中心路由。

package bus

type MessageType int

const (
    MsgUserInput    MessageType = iota  // 用户输入
    MsgUserReply                        // 回复用户
    MsgTaskAssign                       // 分配任务
    MsgTaskResult                       // 任务结果
    MsgTaskFailed                       // 任务失败
    MsgNeedClarify                      // 需要澄清
    MsgProgress                         // 进度更新
    MsgApprovalReq                      // 请求审批
    MsgApprovalResp                     // 审批结果
    MsgShutdown                         // 关闭信号
)

type Message struct {
    ID        string          // 消息唯一 ID
    From      string          // 发送方 agent name
    To        string          // 接收方 agent name
    Type      MessageType
    Payload   any             // 消息内容
    ReplyTo   string          // 关联的请求消息 ID(用于请求-响应配对)
    Timestamp time.Time
}

type MessageBus struct {
    mu       sync.RWMutex
    subs     map[string]chan Message   // agent name → inbox channel
    pending  map[string]chan Message   // 等待回复的 channel(按消息 ID)
    audit    *audit.Logger
}

func New() *MessageBus {
    return &MessageBus{
        subs:    make(map[string]chan Message),
        pending: make(map[string]chan Message),
    }
}

// Subscribe 注册一个 agent 的收件箱
func (b *MessageBus) Subscribe(name string) <-chan Message {
    b.mu.Lock()
    defer b.mu.Unlock()
    ch := make(chan Message, 32)     // 缓冲区防止阻塞
    b.subs[name] = ch
    return ch
}

// Send 发送消息到目标 agent
func (b *MessageBus) Send(msg Message) {
    if msg.ID == "" {
        msg.ID = uuid.New().String()
    }
    msg.Timestamp = time.Now()

    b.mu.RLock()
    ch, ok := b.subs[msg.To]
    b.mu.RUnlock()

    if !ok {
        b.audit.Warn("msg_dropped", "to", msg.To, "reason", "no subscriber")
        return
    }

    // 非阻塞发送,防止死锁
    select {
    case ch <- msg:
        b.audit.Debug("msg_sent", "from", msg.From, "to", msg.To, "type", msg.Type)
    default:
        b.audit.Warn("msg_dropped", "to", msg.To, "reason", "inbox full")
    }

    // 如果有人在等这个消息的回复
    if msg.ReplyTo != "" {
        b.mu.RLock()
        replyCh, ok := b.pending[msg.ReplyTo]
        b.mu.RUnlock()
        if ok {
            replyCh <- msg
        }
    }
}

// SendAndWait 发送消息并等待回复(同步调用)
func (b *MessageBus) SendAndWait(msg Message, timeout time.Duration) (Message, error) {
    replyCh := make(chan Message, 1)
    b.mu.Lock()
    b.pending[msg.ID] = replyCh
    b.mu.Unlock()

    defer func() {
        b.mu.Lock()
        delete(b.pending, msg.ID)
        b.mu.Unlock()
    }()

    b.Send(msg)

    select {
    case reply := <-replyCh:
        return reply, nil
    case <-time.After(timeout):
        return Message{}, fmt.Errorf("timeout waiting for reply to %s", msg.ID)
    }
}

3.3 Task Manager

管理任务的创建、查询、状态流转和持久化。

package task

type Status int

const (
    StatusCreated    Status = iota
    StatusPlanned
    StatusApproved
    StatusDispatched
    StatusRunning
    StatusDone
    StatusFailed
)

type Task struct {
    ID          string      `json:"id"`
    Title       string      `json:"title"`
    Description string      `json:"description"`
    AgentName   string      `json:"agent_name"`    // 目标 agent: "leo", "ada", "ray", "sam"
    DependsOn   []string    `json:"depends_on"`    // 前置任务 ID
    Artifacts   []Artifact  `json:"artifacts"`     // 上游产出物
    Status      Status      `json:"status"`
    Result      *Result     `json:"result,omitempty"`
    RetryCount  int         `json:"retry_count"`
    MaxRetries  int         `json:"max_retries"`
    CreatedAt   time.Time   `json:"created_at"`
    UpdatedAt   time.Time   `json:"updated_at"`
    StartedAt   *time.Time  `json:"started_at,omitempty"`
    FinishedAt  *time.Time  `json:"finished_at,omitempty"`
}

type Artifact struct {
    Name    string `json:"name"`     // "design_doc", "code_diff", "test_report"
    Type    string `json:"type"`     // "text", "file_path", "json"
    Content string `json:"content"`
}

type Result struct {
    Success   bool       `json:"success"`
    Output    string     `json:"output"`
    Artifacts []Artifact `json:"artifacts,omitempty"`
    Error     string     `json:"error,omitempty"`
}

// 状态转换合法性校验
var validTransitions = map[Status][]Status{
    StatusCreated:    {StatusPlanned},
    StatusPlanned:    {StatusApproved},
    StatusApproved:   {StatusDispatched},
    StatusDispatched: {StatusRunning},
    StatusRunning:    {StatusDone, StatusFailed},
    StatusFailed:     {StatusDispatched},          // 重试
}

type Manager struct {
    mu    sync.RWMutex
    tasks map[string]*Task
    // TODO: Phase 2 → SQLite 持久化
}

func NewManager() *Manager {
    return &Manager{tasks: make(map[string]*Task)}
}

func (m *Manager) Create(t *Task) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    t.ID = uuid.New().String()[:8]
    t.Status = StatusCreated
    t.CreatedAt = time.Now()
    t.UpdatedAt = time.Now()
    t.MaxRetries = 2
    m.tasks[t.ID] = t
    return nil
}

func (m *Manager) Transition(taskID string, to Status) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    t, ok := m.tasks[taskID]
    if !ok {
        return fmt.Errorf("task %s not found", taskID)
    }
    // 校验状态转换合法性
    allowed := validTransitions[t.Status]
    valid := false
    for _, s := range allowed {
        if s == to {
            valid = true
            break
        }
    }
    if !valid {
        return fmt.Errorf("invalid transition: %v → %v", t.Status, to)
    }
    t.Status = to
    t.UpdatedAt = time.Now()
    if to == StatusRunning {
        now := time.Now()
        t.StartedAt = &now
    }
    if to == StatusDone || to == StatusFailed {
        now := time.Now()
        t.FinishedAt = &now
    }
    return nil
}

// ReadyToDispatch 返回所有前置任务已完成且状态为 Approved 的任务
func (m *Manager) ReadyToDispatch() []*Task {
    m.mu.RLock()
    defer m.mu.RUnlock()
    var ready []*Task
    for _, t := range m.tasks {
        if t.Status != StatusApproved {
            continue
        }
        allDone := true
        for _, depID := range t.DependsOn {
            dep := m.tasks[depID]
            if dep == nil || dep.Status != StatusDone {
                allDone = false
                break
            }
        }
        if allDone {
            ready = append(ready, t)
        }
    }
    return ready
}

// InjectArtifacts 将上游任务的产出物注入下游任务
func (m *Manager) InjectArtifacts(taskID string) {
    m.mu.Lock()
    defer m.mu.Unlock()
    t := m.tasks[taskID]
    if t == nil {
        return
    }
    for _, depID := range t.DependsOn {
        dep := m.tasks[depID]
        if dep != nil && dep.Result != nil {
            t.Artifacts = append(t.Artifacts, dep.Result.Artifacts...)
        }
    }
}

3.4 Scheduler (DAG 调度器)

解析任务依赖图,按拓扑序调度。无依赖的任务并行派发。

package scheduler

type Scheduler struct {
    taskMgr  *task.Manager
    agents   map[string]agent.Agent
    bus      *bus.MessageBus
    gate     *gate.ApprovalGate
    audit    *audit.Logger
    inbox    <-chan bus.Message
}

func New(tm *task.Manager, agents map[string]agent.Agent,
    b *bus.MessageBus, g *gate.ApprovalGate) *Scheduler {
    s := &Scheduler{
        taskMgr: tm,
        agents:  agents,
        bus:     b,
        gate:    g,
    }
    s.inbox = b.Subscribe("scheduler")
    return s
}

// SubmitPlan Max 提交执行计划(一组 Task 构成的 DAG)
func (s *Scheduler) SubmitPlan(tasks []*task.Task) error {
    // 1. 验证 DAG 无环
    if err := s.validateDAG(tasks); err != nil {
        return fmt.Errorf("invalid DAG: %w", err)
    }

    // 2. 所有任务设为 Planned
    for _, t := range tasks {
        s.taskMgr.Create(t)
        s.taskMgr.Transition(t.ID, task.StatusPlanned)
    }

    // 3. 请求用户审批
    approved, err := s.gate.RequestApproval(tasks)
    if err != nil || !approved {
        return fmt.Errorf("plan rejected by user")
    }

    // 4. 审批通过,所有任务设为 Approved
    for _, t := range tasks {
        s.taskMgr.Transition(t.ID, task.StatusApproved)
    }

    // 5. 派发就绪任务
    s.dispatchReady()
    return nil
}

// dispatchReady 派发所有前置完成的任务
func (s *Scheduler) dispatchReady() {
    ready := s.taskMgr.ReadyToDispatch()
    for _, t := range ready {
        // 注入上游 Artifact
        s.taskMgr.InjectArtifacts(t.ID)
        s.taskMgr.Transition(t.ID, task.StatusDispatched)

        // 通过消息总线分配给目标 Agent
        s.bus.Send(bus.Message{
            From:    "scheduler",
            To:      t.AgentName,
            Type:    bus.MsgTaskAssign,
            Payload: t,
        })
    }
}

// Run 调度器主循环
func (s *Scheduler) Run(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case msg := <-s.inbox:
            switch msg.Type {
            case bus.MsgTaskResult:
                s.handleTaskResult(msg)
            case bus.MsgTaskFailed:
                s.handleTaskFailed(msg)
            }
        }
    }
}

func (s *Scheduler) handleTaskResult(msg bus.Message) {
    result := msg.Payload.(*task.Result)
    taskID := result.TaskID

    s.taskMgr.SetResult(taskID, result)
    s.taskMgr.Transition(taskID, task.StatusDone)

    // 检查是否触发下游任务
    s.dispatchReady()

    // 检查是否全部完成
    if s.taskMgr.AllDone() {
        s.bus.Send(bus.Message{
            From: "scheduler",
            To:   "max",
            Type: bus.MsgAllTasksDone,
        })
    }
}

func (s *Scheduler) handleTaskFailed(msg bus.Message) {
    taskID := msg.Payload.(string)
    t := s.taskMgr.Get(taskID)

    if t.RetryCount < t.MaxRetries {
        // 自动重试
        t.RetryCount++
        s.taskMgr.Transition(taskID, task.StatusDispatched)
        s.bus.Send(bus.Message{
            From:    "scheduler",
            To:      t.AgentName,
            Type:    bus.MsgTaskAssign,
            Payload: t,
        })
    } else {
        // 超过重试上限,通知 Max 决策
        s.bus.Send(bus.Message{
            From:    "scheduler",
            To:      "max",
            Type:    bus.MsgTaskFailed,
            Payload: t,
        })
    }
}

// validateDAG 拓扑排序检测环
func (s *Scheduler) validateDAG(tasks []*task.Task) error {
    // Kahn's algorithm
    inDegree := make(map[string]int)
    graph := make(map[string][]string)

    for _, t := range tasks {
        inDegree[t.ID] = len(t.DependsOn)
        for _, dep := range t.DependsOn {
            graph[dep] = append(graph[dep], t.ID)
        }
    }

    var queue []string
    for _, t := range tasks {
        if inDegree[t.ID] == 0 {
            queue = append(queue, t.ID)
        }
    }

    visited := 0
    for len(queue) > 0 {
        node := queue[0]
        queue = queue[1:]
        visited++
        for _, next := range graph[node] {
            inDegree[next]--
            if inDegree[next] == 0 {
                queue = append(queue, next)
            }
        }
    }

    if visited != len(tasks) {
        return fmt.Errorf("cycle detected in task dependencies")
    }
    return nil
}

3.5 LLM Router

封装 Claude API 调用,支持多模型路由和 tool use 循环。

package llm

type Router struct {
    client  *anthropic.Client
    config  config.LLMConfig
    audit   *audit.Logger
}

func NewRouter(cfg config.LLMConfig) *Router {
    client := anthropic.NewClient(cfg.APIKey)
    return &Router{client: client, config: cfg}
}

type ChatRequest struct {
    Model        string
    System       string
    Messages     []anthropic.MessageParam
    Tools        []anthropic.ToolParam
    MaxTokens    int
}

type ChatResponse struct {
    Content   string
    ToolCalls []ToolCall
    StopReason string
    Usage     Usage
}

type ToolCall struct {
    ID     string
    Name   string
    Input  json.RawMessage
}

type Usage struct {
    InputTokens  int
    OutputTokens int
    CacheRead    int
    CacheWrite   int
}

// Chat 单次 LLM 调用
func (r *Router) Chat(ctx context.Context, req ChatRequest) (*ChatResponse, error) {
    model := req.Model
    if model == "" {
        model = r.config.DefaultModel
    }

    resp, err := r.client.Messages.New(ctx, anthropic.MessageNewParams{
        Model:     model,
        System:    anthropic.NewTextBlock(req.System),
        Messages:  req.Messages,
        Tools:     req.Tools,
        MaxTokens: int64(req.MaxTokens),
    })
    if err != nil {
        return nil, fmt.Errorf("claude API error: %w", err)
    }

    return r.parseResponse(resp), nil
}

// RunToolLoop Agent 的核心循环:LLM 调用 → 工具执行 → 结果回传 → 继续
func (r *Router) RunToolLoop(ctx context.Context, req ChatRequest,
    executor func(ToolCall) (string, error)) (*ChatResponse, error) {

    messages := req.Messages

    for {
        resp, err := r.Chat(ctx, ChatRequest{
            Model:     req.Model,
            System:    req.System,
            Messages:  messages,
            Tools:     req.Tools,
            MaxTokens: req.MaxTokens,
        })
        if err != nil {
            return nil, err
        }

        // 没有 tool call,返回最终结果
        if len(resp.ToolCalls) == 0 || resp.StopReason == "end_turn" {
            return resp, nil
        }

        // 执行所有 tool calls
        assistantMsg := anthropic.NewAssistantMessage(resp.rawContent...)
        messages = append(messages, assistantMsg)

        var toolResults []anthropic.ContentBlockParam
        for _, tc := range resp.ToolCalls {
            result, err := executor(tc)
            if err != nil {
                toolResults = append(toolResults, anthropic.NewToolResultBlock(
                    tc.ID, err.Error(), true,
                ))
            } else {
                toolResults = append(toolResults, anthropic.NewToolResultBlock(
                    tc.ID, result, false,
                ))
            }
        }
        messages = append(messages, anthropic.NewUserMessage(toolResults...))

        r.audit.Debug("tool_loop_iteration",
            "tool_calls", len(resp.ToolCalls),
            "total_messages", len(messages),
        )
    }
}

3.6 Tool Registry

Tool vs 基础设施:关键区分

系统中存在两种 I/O 操作,不可混淆:
Tool(工具)基础设施
调用者LLM 通过 tool_call 决策Go 代码直接调用
权限控制按 AgentConfig.Tools 白名单过滤所有 Agent 共享
示例read_file, write_file, run_shellAudit Logger, MessageBus, MCP Client
可审计每次调用都记录审计日志自身就是审计基础设施
例如:Max 没有 write_file 工具,LLM 无法写任意文件。但 Max 的 Go 代码仍可通过 Audit Logger 写日志——这是基础设施层面的 I/O,不受 Tool 权限约束。

工具注册中心,按 Agent 配置过滤可用工具。

package tool

// Tool 工具接口
type Tool interface {
    Name() string
    Description() string
    Parameters() json.RawMessage         // JSON Schema
    Execute(ctx context.Context, input json.RawMessage) (string, error)
}

// Registry 工具注册中心
type Registry struct {
    mu    sync.RWMutex
    tools map[string]Tool
}

func NewRegistry(projectCfg config.ProjectConfig) *Registry {
    r := &Registry{tools: make(map[string]Tool)}

    // 注册所有内置工具
    r.Register(NewReadFile(projectCfg))
    r.Register(NewWriteFile(projectCfg))
    r.Register(NewListDir(projectCfg))
    r.Register(NewRunShell())
    r.Register(NewGitDiff())
    r.Register(NewGitCommit())
    r.Register(NewGitStatus())
    r.Register(NewSearchCode())
    r.Register(NewRunTest())
    r.Register(NewCoverageReport())
    r.Register(NewQueryLog())
    r.Register(NewQueryMetrics())
    // Max 专用工具
    r.Register(NewTaskCreate())
    r.Register(NewTaskUpdate())
    r.Register(NewTaskQuery())
    r.Register(NewDispatchTask())
    r.Register(NewAskUser())

    return r
}

func (r *Registry) Register(t Tool) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.tools[t.Name()] = t
}

// ForAgent 返回指定 Agent 可用的工具列表
func (r *Registry) ForAgent(allowedNames []string) []Tool {
    r.mu.RLock()
    defer r.mu.RUnlock()

    allowed := make(map[string]bool)
    for _, name := range allowedNames {
        allowed[name] = true
    }

    var result []Tool
    for name, t := range r.tools {
        if allowed[name] {
            result = append(result, t)
        }
    }
    return result
}

// ToAnthropicTools 转换为 Claude API 的工具定义格式
func ToAnthropicTools(tools []Tool) []anthropic.ToolParam {
    var params []anthropic.ToolParam
    for _, t := range tools {
        params = append(params, anthropic.ToolParam{
            Name:        t.Name(),
            Description: anthropic.String(t.Description()),
            InputSchema: t.Parameters(),
        })
    }
    return params
}

内置工具实现示例

// ========== read_file ==========
type ReadFile struct {
    projectRoot string
    sandbox     bool
}

func (t *ReadFile) Name() string        { return "read_file" }
func (t *ReadFile) Description() string { return "读取指定路径的文件内容" }
func (t *ReadFile) Parameters() json.RawMessage {
    return json.RawMessage(`{
        "type": "object",
        "properties": {
            "path": {"type": "string", "description": "文件路径(相对于项目根目录)"}
        },
        "required": ["path"]
    }`)
}

func (t *ReadFile) Execute(ctx context.Context, input json.RawMessage) (string, error) {
    var params struct{ Path string `json:"path"` }
    json.Unmarshal(input, &params)

    absPath := filepath.Join(t.projectRoot, params.Path)
    // 沙箱检查
    if t.sandbox && !strings.HasPrefix(absPath, t.projectRoot) {
        return "", fmt.Errorf("access denied: path outside project root")
    }
    content, err := os.ReadFile(absPath)
    if err != nil {
        return "", err
    }
    return string(content), nil
}

// ========== write_file (带权限控制) ==========
type WriteFile struct {
    projectRoot     string
    blockedPatterns []string
    writePatterns   []string       // 空 = 不限制
}

func (t *WriteFile) Execute(ctx context.Context, input json.RawMessage) (string, error) {
    var params struct {
        Path    string `json:"path"`
        Content string `json:"content"`
    }
    json.Unmarshal(input, &params)

    // 沙箱检查
    absPath := filepath.Join(t.projectRoot, params.Path)
    if !strings.HasPrefix(absPath, t.projectRoot) {
        return "", fmt.Errorf("access denied: path outside project root")
    }
    // 黑名单检查
    for _, pattern := range t.blockedPatterns {
        if matched, _ := filepath.Match(pattern, filepath.Base(params.Path)); matched {
            return "", fmt.Errorf("access denied: %s matches blocked pattern %s", params.Path, pattern)
        }
    }
    // 白名单检查(如 Ray 只能写 *_test.go)
    if len(t.writePatterns) > 0 {
        allowed := false
        for _, pattern := range t.writePatterns {
            if matched, _ := filepath.Match(pattern, filepath.Base(params.Path)); matched {
                allowed = true
                break
            }
        }
        if !allowed {
            return "", fmt.Errorf("access denied: %s not in write whitelist", params.Path)
        }
    }

    if err := os.MkdirAll(filepath.Dir(absPath), 0755); err != nil {
        return "", err
    }
    if err := os.WriteFile(absPath, []byte(params.Content), 0644); err != nil {
        return "", err
    }
    return fmt.Sprintf("written %d bytes to %s", len(params.Content), params.Path), nil
}

// ========== run_shell (白名单) ==========
type RunShell struct {
    allowedCommands []string
}

func (t *RunShell) Execute(ctx context.Context, input json.RawMessage) (string, error) {
    var params struct{ Command string `json:"command"` }
    json.Unmarshal(input, &params)

    // 白名单检查
    allowed := false
    for _, prefix := range t.allowedCommands {
        if strings.HasPrefix(params.Command, prefix) {
            allowed = true
            break
        }
    }
    if !allowed {
        return "", fmt.Errorf("command not allowed: %s", params.Command)
    }

    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

    cmd := exec.CommandContext(ctx, "sh", "-c", params.Command)
    output, err := cmd.CombinedOutput()
    if err != nil {
        return string(output), fmt.Errorf("command failed: %w\noutput: %s", err, output)
    }
    return string(output), nil
}

3.7 MCP Manager

实现 MCP (Model Context Protocol) 客户端,通过 stdio 与 MCP Server 通信。

package mcp

// JSON-RPC 2.0 协议类型
type Request struct {
    JSONRPC string `json:"jsonrpc"`
    ID      int    `json:"id"`
    Method  string `json:"method"`
    Params  any    `json:"params,omitempty"`
}

type Response struct {
    JSONRPC string          `json:"jsonrpc"`
    ID      int             `json:"id"`
    Result  json.RawMessage `json:"result,omitempty"`
    Error   *RPCError       `json:"error,omitempty"`
}

type RPCError struct {
    Code    int    `json:"code"`
    Message string `json:"message"`
}

// MCP Client
type Client struct {
    name    string
    proc    *exec.Cmd
    stdin   io.WriteCloser
    stdout  *bufio.Scanner
    nextID  int
    mu      sync.Mutex
    tools   []tool.Tool
}

func NewClient(cfg config.MCPServerConfig) *Client {
    return &Client{name: cfg.Name}
}

// Connect 启动 MCP Server 进程并完成握手
func (c *Client) Connect(ctx context.Context, cfg config.MCPServerConfig) error {
    // 1. 启动进程
    c.proc = exec.CommandContext(ctx, cfg.Command, cfg.Args...)
    for k, v := range cfg.Env {
        c.proc.Env = append(os.Environ(), k+"="+v)
    }
    var err error
    c.stdin, err = c.proc.StdinPipe()
    if err != nil {
        return err
    }
    stdout, err := c.proc.StdoutPipe()
    if err != nil {
        return err
    }
    c.stdout = bufio.NewScanner(stdout)
    if err := c.proc.Start(); err != nil {
        return fmt.Errorf("start MCP server %s: %w", c.name, err)
    }

    // 2. Initialize handshake
    resp, err := c.call("initialize", map[string]any{
        "protocolVersion": "2024-11-05",
        "capabilities":    map[string]any{},
        "clientInfo": map[string]any{
            "name":    "max-team",
            "version": "0.1.0",
        },
    })
    if err != nil {
        return fmt.Errorf("MCP initialize failed: %w", err)
    }

    // 3. 发送 initialized 通知
    c.notify("notifications/initialized", nil)

    // 4. 获取工具列表
    return c.refreshTools()
}

// refreshTools 调用 tools/list 获取可用工具
func (c *Client) refreshTools() error {
    resp, err := c.call("tools/list", nil)
    if err != nil {
        return err
    }
    var result struct {
        Tools []struct {
            Name        string          `json:"name"`
            Description string          `json:"description"`
            InputSchema json.RawMessage `json:"inputSchema"`
        } `json:"tools"`
    }
    json.Unmarshal(resp, &result)

    c.tools = nil
    for _, t := range result.Tools {
        c.tools = append(c.tools, &MCPTool{
            client:      c,
            name:        t.Name,
            description: t.Description,
            schema:      t.InputSchema,
        })
    }
    return nil
}

// CallTool 调用 MCP 工具
func (c *Client) CallTool(name string, args json.RawMessage) (string, error) {
    resp, err := c.call("tools/call", map[string]any{
        "name":      name,
        "arguments": json.RawMessage(args),
    })
    if err != nil {
        return "", err
    }
    var result struct {
        Content []struct {
            Type string `json:"type"`
            Text string `json:"text"`
        } `json:"content"`
    }
    json.Unmarshal(resp, &result)
    if len(result.Content) > 0 {
        return result.Content[0].Text, nil
    }
    return "", nil
}

// Tools 返回发现的工具(实现 tool.Tool 接口)
func (c *Client) Tools() []tool.Tool { return c.tools }

// call JSON-RPC 调用
func (c *Client) call(method string, params any) (json.RawMessage, error) {
    c.mu.Lock()
    defer c.mu.Unlock()

    c.nextID++
    req := Request{
        JSONRPC: "2.0",
        ID:      c.nextID,
        Method:  method,
        Params:  params,
    }

    data, _ := json.Marshal(req)
    data = append(data, '\n')
    if _, err := c.stdin.Write(data); err != nil {
        return nil, err
    }

    // 读取响应
    if !c.stdout.Scan() {
        return nil, fmt.Errorf("MCP server closed connection")
    }
    var resp Response
    if err := json.Unmarshal(c.stdout.Bytes(), &resp); err != nil {
        return nil, err
    }
    if resp.Error != nil {
        return nil, fmt.Errorf("MCP error %d: %s", resp.Error.Code, resp.Error.Message)
    }
    return resp.Result, nil
}

// Close 关闭 MCP Server
func (c *Client) Close() error {
    c.stdin.Close()
    return c.proc.Wait()
}

// MCPTool 将 MCP 发现的工具包装为 tool.Tool 接口
type MCPTool struct {
    client      *Client
    name        string
    description string
    schema      json.RawMessage
}

func (t *MCPTool) Name() string                 { return t.name }
func (t *MCPTool) Description() string           { return t.description }
func (t *MCPTool) Parameters() json.RawMessage    { return t.schema }
func (t *MCPTool) Execute(ctx context.Context, input json.RawMessage) (string, error) {
    return t.client.CallTool(t.name, input)
}

3.8 Approval Gate

关键决策点暂停执行,等待用户确认。

package gate

type ApprovalGate struct {
    bus *bus.MessageBus
}

func New(b *bus.MessageBus) *ApprovalGate {
    return &ApprovalGate{bus: b}
}

// RequestApproval 向用户展示计划,等待确认
func (g *ApprovalGate) RequestApproval(tasks []*task.Task) (bool, error) {
    // 构建方案摘要
    summary := g.buildPlanSummary(tasks)

    // 通过 Max → User 发送审批请求
    reply, err := g.bus.SendAndWait(bus.Message{
        From:    "scheduler",
        To:      "user",
        Type:    bus.MsgApprovalReq,
        Payload: summary,
    }, 10*time.Minute)

    if err != nil {
        return false, err
    }

    resp := reply.Payload.(*ApprovalResponse)
    return resp.Approved, nil
}

type ApprovalResponse struct {
    Approved bool
    Feedback string    // 用户修改意见(如有)
}

func (g *ApprovalGate) buildPlanSummary(tasks []*task.Task) string {
    var sb strings.Builder
    sb.WriteString("=== 执行计划 ===\n\n")
    for i, t := range tasks {
        deps := "无"
        if len(t.DependsOn) > 0 {
            deps = strings.Join(t.DependsOn, ", ")
        }
        fmt.Fprintf(&sb, "%d. [%s] %s\n   负责人: %s\n   依赖: %s\n\n",
            i+1, t.ID, t.Title, t.AgentName, deps)
    }
    sb.WriteString("确认执行?(y/n): ")
    return sb.String()
}

3.9 Audit Log

结构化审计日志,记录所有关键操作。

package audit

type EventType string

const (
    EventLLMCall    EventType = "llm_call"
    EventToolExec   EventType = "tool_exec"
    EventMsgSent    EventType = "msg_sent"
    EventTaskUpdate EventType = "task_update"
    EventApproval   EventType = "approval"
    EventError      EventType = "error"
)

type Event struct {
    Timestamp time.Time         `json:"ts"`
    Type      EventType         `json:"type"`
    Agent     string            `json:"agent"`
    Data      map[string]any    `json:"data"`
}

type Logger struct {
    writer io.Writer
    mu     sync.Mutex
    level  string
}

func New(cfg config.AuditConfig) *Logger {
    if !cfg.Enabled {
        return &Logger{writer: io.Discard}
    }
    f, _ := os.OpenFile(cfg.Output, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
    return &Logger{writer: f, level: cfg.Level}
}

func (l *Logger) Record(evt Event) {
    l.mu.Lock()
    defer l.mu.Unlock()
    data, _ := json.Marshal(evt)
    l.writer.Write(append(data, '\n'))
}

// 便捷方法
func (l *Logger) LLMCall(agent, model string, inputTokens, outputTokens int) {
    l.Record(Event{
        Timestamp: time.Now(),
        Type:      EventLLMCall,
        Agent:     agent,
        Data: map[string]any{
            "model":         model,
            "input_tokens":  inputTokens,
            "output_tokens": outputTokens,
        },
    })
}

func (l *Logger) ToolExec(agent, toolName string, duration time.Duration, err error) {
    data := map[string]any{
        "tool":     toolName,
        "duration": duration.String(),
        "success":  err == nil,
    }
    if err != nil {
        data["error"] = err.Error()
    }
    l.Record(Event{
        Timestamp: time.Now(),
        Type:      EventToolExec,
        Agent:     agent,
        Data:      data,
    })
}

04Agent 详细设计

4.0 BaseAgent 基类

所有 Agent 共享的基础实现。

package agent

type BaseAgent struct {
    name       string
    config     config.AgentConfig
    router     *llm.Router
    tools      []tool.Tool
    mcpClients []*mcp.Client
    bus        *bus.MessageBus
    inbox      <-chan bus.Message
    audit      *audit.Logger
    status     AgentStatus
    mu         sync.RWMutex
}

type AgentStatus int

const (
    StatusCreated  AgentStatus = iota
    StatusStarting
    StatusReady
    StatusBusy
    StatusStopped
)

func NewBase(cfg config.AgentConfig, router *llm.Router,
    registry *tool.Registry, b *bus.MessageBus, auditLog *audit.Logger) *BaseAgent {
    return &BaseAgent{
        name:   cfg.Name,
        config: cfg,
        router: router,
        tools:  registry.ForAgent(cfg.Tools),
        bus:    b,
        inbox:  b.Subscribe(cfg.Name),
        audit:  auditLog,
        status: StatusCreated,
    }
}

// Start 启动 Agent:连接 MCP → 进入消息循环
func (a *BaseAgent) Start(ctx context.Context) error {
    a.setStatus(StatusStarting)

    // 连接 MCP Servers
    for _, mcpCfg := range a.config.MCPServers {
        client := mcp.NewClient(mcpCfg)
        if err := client.Connect(ctx, mcpCfg); err != nil {
            return fmt.Errorf("MCP connect %s: %w", mcpCfg.Name, err)
        }
        a.mcpClients = append(a.mcpClients, client)
        // 合并 MCP 工具
        a.tools = append(a.tools, client.Tools()...)
    }

    a.setStatus(StatusReady)
    return nil
}

// RunLLM 执行一次完整的 LLM tool-use 循环
func (a *BaseAgent) RunLLM(ctx context.Context, userMessage string) (string, error) {
    a.setStatus(StatusBusy)
    defer a.setStatus(StatusReady)

    messages := []anthropic.MessageParam{
        anthropic.NewUserMessage(anthropic.NewTextBlock(userMessage)),
    }

    resp, err := a.router.RunToolLoop(ctx, llm.ChatRequest{
        Model:     a.config.Model,
        System:    a.config.SystemPrompt,
        Messages:  messages,
        Tools:     tool.ToAnthropicTools(a.tools),
        MaxTokens: a.config.Constraints.MaxTokens,
    }, func(tc llm.ToolCall) (string, error) {
        // 查找并执行工具
        for _, t := range a.tools {
            if t.Name() == tc.Name {
                start := time.Now()
                result, err := t.Execute(ctx, tc.Input)
                a.audit.ToolExec(a.name, tc.Name, time.Since(start), err)
                return result, err
            }
        }
        return "", fmt.Errorf("unknown tool: %s", tc.Name)
    })

    if err != nil {
        return "", err
    }
    return resp.Content, nil
}

// Stop 清理资源
func (a *BaseAgent) Stop() error {
    a.setStatus(StatusStopped)
    for _, c := range a.mcpClients {
        c.Close()
    }
    return nil
}

4.1 Max (Team Lead)

核心职责:用户唯一对话入口,需求理解 → 任务拆解 → DAG 生成 → 方案确认 → 调度执行 → 结果汇总
package agent

type Max struct {
    *BaseAgent
    scheduler *scheduler.Scheduler
}

func NewMax(cfg config.AgentConfig, router *llm.Router,
    registry *tool.Registry, b *bus.MessageBus,
    sched *scheduler.Scheduler, auditLog *audit.Logger) *Max {
    return &Max{
        BaseAgent: NewBase(cfg, router, registry, b, auditLog),
        scheduler: sched,
    }
}

// Run Max 的主循环
func (m *Max) Run(ctx context.Context) error {
    if err := m.Start(ctx); err != nil {
        return err
    }
    defer m.Stop()

    for {
        select {
        case <-ctx.Done():
            return nil
        case msg := <-m.inbox:
            switch msg.Type {
            case bus.MsgUserInput:
                m.handleUserInput(ctx, msg)
            case bus.MsgTaskResult:
                m.handleTaskResult(ctx, msg)
            case bus.MsgTaskFailed:
                m.handleTaskFailed(ctx, msg)
            case bus.MsgNeedClarify:
                m.handleClarification(ctx, msg)
            case bus.MsgAllTasksDone:
                m.handleAllDone(ctx, msg)
            }
        }
    }
}

func (m *Max) handleUserInput(ctx context.Context, msg bus.Message) {
    userInput := msg.Payload.(string)

    // 1. 调用 LLM 理解需求并生成任务计划
    //    Max 的 system prompt 指导他输出结构化的任务列表
    prompt := fmt.Sprintf(`用户需求:%s

请分析这个需求,拆解为具体的执行任务。
每个任务指定负责人(leo/ada/ray/sam)和依赖关系。
输出 JSON 格式的任务列表。`, userInput)

    result, err := m.RunLLM(ctx, prompt)
    if err != nil {
        m.replyToUser("抱歉,我遇到了一个错误:" + err.Error())
        return
    }

    // 2. 解析 LLM 输出为任务列表
    tasks, err := m.parsePlan(result)
    if err != nil {
        // 非结构化输出,直接回复用户
        m.replyToUser(result)
        return
    }

    // 3. 提交给 Scheduler(会触发 Approval Gate)
    if err := m.scheduler.SubmitPlan(tasks); err != nil {
        m.replyToUser("计划未通过:" + err.Error())
        return
    }

    m.replyToUser("计划已确认,开始执行...")
}

func (m *Max) handleAllDone(ctx context.Context, msg bus.Message) {
    // 汇总所有任务结果
    summary := m.scheduler.GetSummary()

    // 调用 LLM 生成人类可读的汇总
    prompt := fmt.Sprintf("以下是所有任务的执行结果,请汇总为简洁的报告:\n%s", summary)
    result, _ := m.RunLLM(ctx, prompt)

    m.replyToUser(result)
}

func (m *Max) replyToUser(content string) {
    m.bus.Send(bus.Message{
        From:    "max",
        To:      "user",
        Type:    bus.MsgUserReply,
        Payload: content,
    })
}

4.2 Leo (Coder)

核心职责:编写/修改代码、修复 Bug、重构、Git 操作
package agent

type Leo struct {
    *BaseAgent
}

func NewLeo(cfg config.AgentConfig, router *llm.Router,
    registry *tool.Registry, b *bus.MessageBus, auditLog *audit.Logger) *Leo {
    return &Leo{BaseAgent: NewBase(cfg, router, registry, b, auditLog)}
}

func (l *Leo) Run(ctx context.Context) error {
    if err := l.Start(ctx); err != nil {
        return err
    }
    defer l.Stop()

    for {
        select {
        case <-ctx.Done():
            return nil
        case msg := <-l.inbox:
            if msg.Type == bus.MsgTaskAssign {
                l.handleTask(ctx, msg)
            }
        }
    }
}

func (l *Leo) handleTask(ctx context.Context, msg bus.Message) {
    t := msg.Payload.(*task.Task)

    // 构建 prompt,包含任务描述 + 上游 Artifact
    prompt := l.buildPrompt(t)

    // 通知 Max 开始执行
    l.bus.Send(bus.Message{
        From: "leo", To: "scheduler",
        Type: bus.MsgProgress,
        Payload: fmt.Sprintf("开始执行: %s", t.Title),
    })

    // 执行 LLM tool-use 循环(读代码、写代码、跑测试…)
    result, err := l.RunLLM(ctx, prompt)
    if err != nil {
        l.bus.Send(bus.Message{
            From: "leo", To: "scheduler",
            Type: bus.MsgTaskFailed,
            Payload: t.ID,
        })
        return
    }

    // 生成 git diff 作为产出物
    diffOutput, _ := exec.CommandContext(ctx, "git", "diff").Output()

    l.bus.Send(bus.Message{
        From: "leo", To: "scheduler",
        Type: bus.MsgTaskResult,
        Payload: &task.Result{
            TaskID:  t.ID,
            Success: true,
            Output:  result,
            Artifacts: []task.Artifact{
                {Name: "code_diff", Type: "text", Content: string(diffOutput)},
            },
        },
    })
}

func (l *Leo) buildPrompt(t *task.Task) string {
    var sb strings.Builder
    sb.WriteString("## 任务\n" + t.Description + "\n\n")
    if len(t.Artifacts) > 0 {
        sb.WriteString("## 上游参考\n")
        for _, a := range t.Artifacts {
            fmt.Fprintf(&sb, "### %s\n%s\n\n", a.Name, a.Content)
        }
    }
    sb.WriteString("请完成上述任务。使用工具读取、修改代码。完成后说明你做了什么。")
    return sb.String()
}

4.3 Ada (Architect)

核心职责:技术方案设计、架构评审、只读代码分析

结构与 Leo 类似,区别在于:

  • 只有只读工具(read_file, list_dir, search_code)
  • MCP 接入 gitnexus + cooper
  • 产出物为 design_doc(文本形式的设计文档)
  • System Prompt 引导输出结构化方案

4.4 Ray (Tester)

核心职责:编写测试、执行测试、覆盖率分析

区别:

  • write_file 限制为 *_test.go 文件
  • 专用工具 run_test 封装 go test -v -gcflags="all=-N -l"
  • 专用工具 coverage_report 封装 go test -coverprofile
  • 产出物为 test_report(测试结果 + 覆盖率)

4.5 Sam (Ops)

核心职责:日志查询、监控检查、问题排查辅助

区别:

  • 无文件读写权限
  • 工具为 query_log(对接日志系统 API)和 query_metrics(对接监控 API)
  • MCP 接入 grafana
  • 使用 Haiku 模型(查询类任务不需要强推理)
  • 产出物为 ops_report(日志分析 + 指标摘要)

05核心流程时序图

5.1 新功能开发完整流程

User Max Scheduler Ada Leo Ray 需求描述 LLM: 理解需求,生成 DAG SubmitPlan(tasks) ApprovalReq: 执行方案 Approved ✓ TaskAssign: T1 设计方案 LLM + read_file + gitnexus TaskResult: design_doc TaskAssign: T2 实现代码 (+design_doc) LLM + read/write_file + run_shell + git TaskResult: code_diff TaskAssign: T3 编写测试 (+code_diff) LLM + run_test TaskResult: test_report AllTasksDone 汇总报告

06错误处理与重试

错误类型处理策略最大重试
LLM API 调用失败(网络/限流) 指数退避重试:1s → 2s → 4s 3 次
工具执行失败(命令报错) 错误信息回传 LLM,让 Agent 自行修正 LLM 循环内处理
MCP Server 连接断开 自动重连,重连失败则降级(移除 MCP 工具) 3 次
任务执行超时 取消当前 LLM 调用,标记 Failed,通知 Max
任务执行失败(Agent 报错) Scheduler 自动重试,超过上限后上报 Max 决策 2 次
Max 决策失败的任务 可选:跳过 / 手动修复 / 调整方案重试 上报用户

LLM API 重试实现

func (r *Router) chatWithRetry(ctx context.Context, req ChatRequest) (*ChatResponse, error) {
    var lastErr error
    for i := 0; i < r.config.MaxRetries; i++ {
        resp, err := r.Chat(ctx, req)
        if err == nil {
            return resp, nil
        }
        lastErr = err

        // 判断是否可重试
        if !isRetryable(err) {
            return nil, err
        }

        // 指数退避
        backoff := time.Duration(1<<uint(i)) * time.Second
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(backoff):
        }
    }
    return nil, fmt.Errorf("max retries exceeded: %w", lastErr)
}

func isRetryable(err error) bool {
    // 429 (rate limit), 500, 502, 503 可重试
    // 400, 401, 403 不可重试
    var apiErr *anthropic.APIError
    if errors.As(err, &apiErr) {
        switch apiErr.StatusCode {
        case 429, 500, 502, 503:
            return true
        }
    }
    return false
}

07依赖与构建

7.1 Go 模块依赖

依赖用途版本
github.com/anthropics/anthropic-sdk-goClaude API 客户端latest
github.com/google/uuid消息/任务 ID 生成v1.6+
gopkg.in/yaml.v3YAML 配置解析v3
github.com/rs/zerolog结构化日志v1.33+

7.2 构建与运行

# 构建
go build -o max-team ./cmd/max-team

# 运行
export ANTHROPIC_API_KEY=sk-ant-...
./max-team --config config.yaml --agents config/agents.yaml

# 开发模式
go run ./cmd/max-team --config config.yaml --agents config/agents.yaml

7.3 入口文件

package main

import (
    "flag"
    "fmt"
    "os"
    "os/signal"
    "syscall"

    "github.com/horizon6666/max-team/internal/config"
    "github.com/horizon6666/max-team/internal/runtime"
)

func main() {
    configPath := flag.String("config", "config.yaml", "global config path")
    agentsPath := flag.String("agents", "config/agents.yaml", "agents config path")
    flag.Parse()

    // 加载配置
    cfg, agentsCfg, err := config.Load(*configPath, *agentsPath)
    if err != nil {
        fmt.Fprintf(os.Stderr, "config error: %v\n", err)
        os.Exit(1)
    }

    // 创建 Runtime
    rt, err := runtime.New(cfg, agentsCfg)
    if err != nil {
        fmt.Fprintf(os.Stderr, "runtime error: %v\n", err)
        os.Exit(1)
    }

    // 优雅关闭
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigCh
        fmt.Println("\nShutting down...")
        rt.Stop()
    }()

    // 运行
    if err := rt.Run(); err != nil {
        fmt.Fprintf(os.Stderr, "runtime error: %v\n", err)
        os.Exit(1)
    }
}