config.yaml
server:
  port: 8080 # Web UI 端口(Phase 3)
  mode: cli # cli | web
llm:
  provider: anthropic
  api_key: ${ANTHROPIC_API_KEY} # 环境变量引用
  base_url: https://api.anthropic.com
  default_model: claude-sonnet-4-6
  max_retries: 3
  timeout: 120s
audit:
  enabled: true
  output: logs/audit.jsonl # 结构化日志文件
  level: info # debug | info | warn | error
project:
  root: . # 项目根目录
  sandbox: true # 是否开启目录沙箱
config/agents.yaml
agents:
  - name: max
    role: team_lead
    model: claude-sonnet-4-6
    system_prompt: |
      你是 Max,一个专业的项目经理和私人助理。
      你的职责是理解用户需求、拆解任务、制定执行方案、协调团队成员、追踪进度。
      你不直接操作文件或执行命令,而是通过派发任务给团队成员来完成工作。
      关键决策必须先征得用户确认。
    tools:
      - task_create
      - task_update
      - task_query
      - dispatch_task
      - ask_user
    mcp_servers: []
    constraints:
      max_tokens: 4096
      timeout: 60s
  - name: leo
    role: coder
    model: claude-opus-4-7
    system_prompt: |
      你是 Leo,一个资深 Go 开发工程师。
      你的职责是编写高质量的 Go 代码、修复 Bug、进行代码重构。
      你遵循 Go 的最佳实践,代码简洁、可读、安全。
      完成任务后提交代码变更(不自行 push)。
    tools:
      - read_file
      - write_file
      - list_dir
      - run_shell
      - git_diff
      - git_commit
      - git_status
      - search_code
    mcp_servers:
      - name: gitnexus
        command: npx
        args: ["gitnexus-mcp"]
    constraints:
      allowed_dirs:
        - "."
      blocked_patterns:
        - "*.env"
        - "*credentials*"
        - "*secret*"
      allowed_commands:
        - "go build"
        - "go test"
        - "go vet"
        - "go fmt"
        - "go mod tidy"
        - "git diff"
        - "git status"
        - "git log"
        - "grep"
        - "find"
      max_tokens: 8192
      timeout: 300s
  - name: ada
    role: architect
    model: claude-opus-4-7
    system_prompt: |
      你是 Ada,一个资深架构师。
      你的职责是设计技术方案、评审架构、进行技术选型、输出设计文档。
      你有丰富的系统设计经验,善于权衡取舍。
      你只读代码分析,不修改任何文件。
    tools:
      - read_file
      - list_dir
      - search_code
    mcp_servers:
      - name: gitnexus
        command: npx
        args: ["gitnexus-mcp"]
      - name: cooper
        command: npx
        args: ["cooper-mcp"]
    constraints:
      allowed_dirs:
        - "."
      max_tokens: 8192
      timeout: 180s
  - name: ray
    role: tester
    model: claude-sonnet-4-6
    system_prompt: |
      你是 Ray,一个测试工程师。
      你的职责是编写单元测试和集成测试、执行测试、分析覆盖率。
      你只在测试文件(*_test.go)中写入代码。
      你使用 Go 标准测试框架和 testify。
    tools:
      - read_file
      - write_file
      - list_dir
      - run_test
      - coverage_report
    mcp_servers: []
    constraints:
      allowed_dirs:
        - "."
      write_patterns:
        - "*_test.go"
      max_tokens: 4096
      timeout: 180s
  - name: sam
    role: ops
    model: claude-haiku-4-5
    system_prompt: |
      你是 Sam,一个运维工程师。
      你的职责是查询日志、检查监控指标、协助问题排查。
      你不修改任何文件,只做查询和分析。
    tools:
      - query_log
      - query_metrics
    mcp_servers:
      - name: grafana
        command: npx
        args: ["grafana-mcp"]
    constraints:
      max_tokens: 2048
      timeout: 60s
package config
// Config is the root of config.yaml; each field maps to the top-level YAML
// section named in its tag.
type Config struct {
	Server ServerConfig `yaml:"server"`
	LLM LLMConfig `yaml:"llm"`
	Audit AuditConfig `yaml:"audit"`
	Project ProjectConfig `yaml:"project"`
}

// AgentsConfig is the root of config/agents.yaml: the list of agent
// definitions under the "agents" key.
type AgentsConfig struct {
	Agents []AgentConfig `yaml:"agents"`
}

// AgentConfig describes one agent: its identity, the model it talks to, the
// system prompt, the names of the tools it may use, the MCP servers it
// connects to, and its constraints.
type AgentConfig struct {
	Name string `yaml:"name"`
	Role string `yaml:"role"`
	Model string `yaml:"model"`
	SystemPrompt string `yaml:"system_prompt"`
	Tools []string `yaml:"tools"`
	MCPServers []MCPServerConfig `yaml:"mcp_servers"`
	Constraints Constraints `yaml:"constraints"`
}

// Constraints holds the per-agent limits declared under "constraints" in
// agents.yaml: directory and file-pattern rules, the shell command
// whitelist, the token budget and a timeout.
type Constraints struct {
	AllowedDirs []string `yaml:"allowed_dirs"`
	BlockedPatterns []string `yaml:"blocked_patterns"`
	WritePatterns []string `yaml:"write_patterns"`
	AllowedCommands []string `yaml:"allowed_commands"`
	MaxTokens int `yaml:"max_tokens"`
	Timeout time.Duration `yaml:"timeout"`
}

// MCPServerConfig describes how to launch one MCP server process: the
// command, its arguments and optional extra environment variables.
type MCPServerConfig struct {
	Name string `yaml:"name"`
	Command string `yaml:"command"`
	Args []string `yaml:"args"`
	Env map[string]string `yaml:"env,omitempty"`
}
// Load is intended to read the main configuration and the agent definitions
// from the two given paths and return them.
//
// NOTE(review): the body below is only an outline of the planned steps; it
// contains no statements and no return yet.
func Load(configPath, agentsPath string) (*Config, *AgentsConfig, error) {
	// 1. Read the YAML files.
	// 2. Expand ${VAR} environment-variable references.
	// 3. Validate required fields.
	// 4. Apply default values.
}
管理所有 Agent 的生命周期,负责创建、启动、监控和停止。
package runtime
// Runtime owns every long-lived component of the process: the agents, the
// message bus, the scheduler, the approval gate and the audit log, plus the
// context and wait group used to stop and await the goroutines it starts.
type Runtime struct {
	config *config.Config
	agents map[string]agent.Agent // name → agent
	bus *bus.MessageBus
	scheduler *scheduler.Scheduler
	gate *gate.ApprovalGate
	auditLog *audit.Logger
	ctx context.Context
	cancel context.CancelFunc
	wg sync.WaitGroup
}
// New builds a Runtime from the parsed configuration.
//
// It creates the shared infrastructure (message bus, audit log, approval
// gate), the LLM router and the tool registry, then one agent per entry in
// agentsCfg, and finally the scheduler. Nothing is started; call Start or Run.
//
// It returns an error when either configuration is nil, when two agents share
// a name, or when an agent cannot be created. On every error path the
// runtime's context is cancelled and the audit log is closed so that nothing
// created here is leaked.
func New(cfg *config.Config, agentsCfg *config.AgentsConfig) (*Runtime, error) {
	if cfg == nil || agentsCfg == nil {
		return nil, fmt.Errorf("runtime: nil configuration")
	}

	r := &Runtime{
		config: cfg,
		agents: make(map[string]agent.Agent, len(agentsCfg.Agents)),
	}
	r.ctx, r.cancel = context.WithCancel(context.Background())

	// Shared infrastructure.
	r.bus = bus.New()
	r.auditLog = audit.New(cfg.Audit)
	r.gate = gate.New(r.bus)

	// fail releases what has been acquired so far before reporting err.
	fail := func(err error) (*Runtime, error) {
		r.cancel()
		r.auditLog.Close()
		return nil, err
	}

	router := llm.NewRouter(cfg.LLM)
	registry := tool.NewRegistry(cfg.Project)

	// One agent per configuration entry.
	for _, ac := range agentsCfg.Agents {
		// A duplicate name would silently replace the earlier agent in the map.
		if _, dup := r.agents[ac.Name]; dup {
			return fail(fmt.Errorf("create agent %s: duplicate agent name", ac.Name))
		}
		a, err := agent.New(ac, router, registry, r.bus, r.auditLog)
		if err != nil {
			return fail(fmt.Errorf("create agent %s: %w", ac.Name, err))
		}
		r.agents[ac.Name] = a
	}

	taskMgr := task.NewManager()
	r.scheduler = scheduler.New(taskMgr, r.agents, r.bus, r.gate)
	return r, nil
}
// Start brings up the message bus, then launches one goroutine per agent and
// one for the scheduler. Every goroutine is tracked by the runtime's wait
// group so Stop can wait for it. A failing agent is reported to the audit log
// and does not abort the others. Start itself always returns nil.
func (r *Runtime) Start() error {
	r.bus.Start(r.ctx)

	// spawn runs fn on its own goroutine under the wait group.
	spawn := func(fn func()) {
		r.wg.Add(1)
		go func() {
			defer r.wg.Done()
			fn()
		}()
	}

	for name, a := range r.agents {
		agentName, ag := name, a
		spawn(func() {
			if err := ag.Start(r.ctx); err != nil {
				r.auditLog.Error("agent_failed", "name", agentName, "err", err)
			}
		})
	}

	spawn(func() { r.scheduler.Run(r.ctx) })
	return nil
}
// Stop shuts the runtime down gracefully: it cancels the shared context,
// waits for every goroutine registered on the wait group to return, and only
// then closes the audit log.
func (r *Runtime) Stop() {
	r.cancel()
	r.wg.Wait()
	r.auditLog.Close()
}
// Run is the interactive main loop: it starts the runtime, reads requests
// from standard input line by line, forwards each one to the "max" agent and
// prints the reply. The loop ends on "/quit" or end of input, and the runtime
// is stopped before Run returns.
//
// It returns an error when no agent named "max" is configured, when Start
// fails, or when reading standard input fails.
func (r *Runtime) Run() error {
	// Every request is addressed to "max"; fail fast if that agent is missing.
	if _, ok := r.agents["max"]; !ok {
		return fmt.Errorf("runtime: agent %q is not configured", "max")
	}
	if err := r.Start(); err != nil {
		return err
	}
	defer r.Stop()

	scanner := bufio.NewScanner(os.Stdin)
	fmt.Println("Max Team ready. Type your request:")
	for scanner.Scan() {
		input := scanner.Text()
		if input == "/quit" {
			break
		}
		// User input → Max.
		r.bus.Send(bus.Message{
			From:    "user",
			To:      "max",
			Type:    bus.MsgUserInput,
			Payload: input,
		})
		// Wait for Max's reply.
		reply := r.bus.WaitForReply("max", "user")
		text, ok := reply.Payload.(string)
		if !ok {
			// Payload is typed any; print whatever arrived instead of panicking.
			text = fmt.Sprint(reply.Payload)
		}
		fmt.Println("\n" + text)
	}
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("read user input: %w", err)
	}
	return nil
}
| 状态 | 说明 | 转换条件 |
|---|---|---|
| Created | Agent 已创建,未启动 | New() 调用后 |
| Starting | 初始化中(连接 MCP 等) | Start() 调用 |
| Ready | 等待任务 | 初始化完成 |
| Busy | 正在执行任务 | 收到 TaskAssign |
| Stopped | 已停止 | Stop() 或 ctx 取消 |
基于 Go channel 的星型消息总线,所有消息经由中心路由。
package bus
// MessageType identifies the kind of a bus message.
type MessageType int

// Message kinds. New kinds are appended at the end so the numeric values of
// the existing ones never change.
const (
	MsgUserInput    MessageType = iota // input typed by the user
	MsgUserReply                       // reply addressed to the user
	MsgTaskAssign                      // a task is assigned to an agent
	MsgTaskResult                      // a task finished successfully
	MsgTaskFailed                      // a task failed
	MsgNeedClarify                     // clarification is needed
	MsgProgress                        // progress update
	MsgApprovalReq                     // approval requested
	MsgApprovalResp                    // approval decision
	MsgShutdown                        // shutdown signal
	MsgAllTasksDone                    // every task of the plan is done (scheduler → Max)
)
// Message is the envelope carried by the bus.
type Message struct {
	ID string // unique message ID
	From string // sender agent name
	To string // recipient agent name
	Type MessageType
	Payload any // message content
	ReplyTo string // ID of the request this message answers (request/response pairing)
	Timestamp time.Time
}

// MessageBus routes messages between named participants. mu guards both maps.
type MessageBus struct {
	mu sync.RWMutex
	subs map[string]chan Message // agent name → inbox channel
	pending map[string]chan Message // reply channels of waiting callers, keyed by request message ID
	audit *audit.Logger
}
// New returns a MessageBus with empty subscriber and pending-reply tables.
//
// NOTE(review): the audit field is not set here, so it is nil on the returned
// bus — confirm it is wired up before the bus logs anything.
func New() *MessageBus {
	return &MessageBus{
		subs: make(map[string]chan Message),
		pending: make(map[string]chan Message),
	}
}
// Subscribe registers an inbox for the participant called name and returns
// its receive side. The inbox buffers up to 32 messages so senders do not
// block. Subscribing again under the same name replaces the previous inbox.
func (b *MessageBus) Subscribe(name string) <-chan Message {
	inbox := make(chan Message, 32)

	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[name] = inbox
	return inbox
}
// Send stamps msg with an ID (when it has none) and the current time, then
// delivers it.
//
// Delivery has two independent parts:
//   - if msg.ReplyTo names a request that a SendAndWait caller is waiting on,
//     the message is handed to that caller;
//   - if msg.To has a subscribed inbox, the message is queued there.
//
// Neither part ever blocks: a full inbox, or a waiter that already holds a
// reply, drops the message so that a slow receiver cannot stall the sender.
// Drops are reported to the audit log when one is configured.
func (b *MessageBus) Send(msg Message) {
	if msg.ID == "" {
		msg.ID = uuid.New().String()
	}
	msg.Timestamp = time.Now()

	// Take both lookups under one read lock.
	b.mu.RLock()
	inbox, hasInbox := b.subs[msg.To]
	var waiter chan Message
	if msg.ReplyTo != "" {
		waiter = b.pending[msg.ReplyTo]
	}
	b.mu.RUnlock()

	// Hand the reply to the waiter first, and regardless of whether the
	// recipient has an inbox. The reply channel holds a single message, so a
	// blocking send here would hang forever on a second reply.
	if waiter != nil {
		select {
		case waiter <- msg:
		default:
		}
	}

	if !hasInbox {
		if waiter == nil && b.audit != nil {
			b.audit.Warn("msg_dropped", "to", msg.To, "reason", "no subscriber")
		}
		return
	}

	// Non-blocking send to avoid deadlocks.
	select {
	case inbox <- msg:
		if b.audit != nil {
			b.audit.Debug("msg_sent", "from", msg.From, "to", msg.To, "type", msg.Type)
		}
	default:
		if b.audit != nil {
			b.audit.Warn("msg_dropped", "to", msg.To, "reason", "inbox full")
		}
	}
}
// SendAndWait sends msg and blocks until a message whose ReplyTo equals
// msg.ID is sent on the bus, or until timeout elapses.
//
// The message ID is assigned here, before the waiter is registered, when the
// caller left it empty. Otherwise the waiter would be stored under the empty
// key while Send generated the real ID on its own copy, and no reply could
// ever be matched.
//
// It returns the reply, or an error on timeout.
func (b *MessageBus) SendAndWait(msg Message, timeout time.Duration) (Message, error) {
	if msg.ID == "" {
		msg.ID = uuid.New().String()
	}

	replyCh := make(chan Message, 1)
	b.mu.Lock()
	b.pending[msg.ID] = replyCh
	b.mu.Unlock()
	defer func() {
		b.mu.Lock()
		delete(b.pending, msg.ID)
		b.mu.Unlock()
	}()

	b.Send(msg)

	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer as soon as a reply arrives
	select {
	case reply := <-replyCh:
		return reply, nil
	case <-timer.C:
		return Message{}, fmt.Errorf("timeout waiting for reply to %s", msg.ID)
	}
}
管理任务的创建、查询、状态流转和持久化。
package task
// Status is the lifecycle state of a Task.
type Status int

// Task states, in the order a task normally passes through them. The legal
// moves between them are listed in validTransitions.
const (
	StatusCreated Status = iota
	StatusPlanned
	StatusApproved
	StatusDispatched
	StatusRunning
	StatusDone
	StatusFailed
)
// Task is one unit of work assigned to an agent, together with its
// dependencies, inputs, outcome and bookkeeping timestamps.
type Task struct {
	ID string `json:"id"`
	Title string `json:"title"`
	Description string `json:"description"`
	AgentName string `json:"agent_name"` // target agent: "leo", "ada", "ray", "sam"
	DependsOn []string `json:"depends_on"` // IDs of prerequisite tasks
	Artifacts []Artifact `json:"artifacts"` // outputs of upstream tasks
	Status Status `json:"status"`
	Result *Result `json:"result,omitempty"`
	RetryCount int `json:"retry_count"`
	MaxRetries int `json:"max_retries"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
	StartedAt *time.Time `json:"started_at,omitempty"`
	FinishedAt *time.Time `json:"finished_at,omitempty"`
}
// Artifact is a named piece of output produced by a task and handed to the
// tasks that depend on it.
type Artifact struct {
	Name    string `json:"name"` // "design_doc", "code_diff", "test_report"
	Type    string `json:"type"` // "text", "file_path", "json"
	Content string `json:"content"`
}

// Result is the outcome an agent reports for a task.
//
// TaskID identifies the task the result belongs to. The scheduler reads it
// from incoming results and the agents set it when reporting, so it has to be
// part of the type.
type Result struct {
	TaskID    string     `json:"task_id"`
	Success   bool       `json:"success"`
	Output    string     `json:"output"`
	Artifacts []Artifact `json:"artifacts,omitempty"`
	Error     string     `json:"error,omitempty"`
}
// validTransitions lists, for each state, the states a task may move to next.
// A state with no entry (StatusDone) has no outgoing transitions.
var validTransitions = map[Status][]Status{
	StatusCreated: {StatusPlanned},
	StatusPlanned: {StatusApproved},
	StatusApproved: {StatusDispatched},
	StatusDispatched: {StatusRunning},
	StatusRunning: {StatusDone, StatusFailed},
	StatusFailed: {StatusDispatched}, // retry
}
// Manager is the in-memory task store. mu guards the tasks map.
type Manager struct {
	mu sync.RWMutex
	tasks map[string]*Task
	// TODO: Phase 2 → SQLite persistence
}

// NewManager returns a Manager with an empty task table.
func NewManager() *Manager {
	return &Manager{tasks: make(map[string]*Task)}
}
// Create registers t with the manager and initialises its bookkeeping fields:
// Status is set to StatusCreated, CreatedAt and UpdatedAt to the current time
// and MaxRetries to 2.
//
// An ID supplied by the caller is kept, so that DependsOn references between
// the tasks of one plan stay valid. Only a task without an ID gets a
// generated one: an 8-character UUID prefix, regenerated if it is already in
// use.
//
// It returns an error when t is nil or when its ID is already registered.
func (m *Manager) Create(t *Task) error {
	if t == nil {
		return fmt.Errorf("task: cannot create a nil task")
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	if t.ID == "" {
		// The short prefix can collide; retry until it is unique in this store.
		for {
			id := uuid.New().String()[:8]
			if _, taken := m.tasks[id]; !taken {
				t.ID = id
				break
			}
		}
	} else if _, exists := m.tasks[t.ID]; exists {
		return fmt.Errorf("task %s already exists", t.ID)
	}

	now := time.Now()
	t.Status = StatusCreated
	t.CreatedAt = now
	t.UpdatedAt = now
	t.MaxRetries = 2
	m.tasks[t.ID] = t
	return nil
}
// Transition moves the task identified by taskID to the state to.
//
// The move must be listed in validTransitions for the task's current state.
// On success UpdatedAt is refreshed; entering StatusRunning records
// StartedAt, and entering StatusDone or StatusFailed records FinishedAt.
//
// It returns an error when the task does not exist or the move is not allowed.
func (m *Manager) Transition(taskID string, to Status) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	t, found := m.tasks[taskID]
	if !found {
		return fmt.Errorf("task %s not found", taskID)
	}

	// Check the requested move against the table.
	permitted := false
	for _, next := range validTransitions[t.Status] {
		if next == to {
			permitted = true
			break
		}
	}
	if !permitted {
		return fmt.Errorf("invalid transition: %v → %v", t.Status, to)
	}

	t.Status = to
	t.UpdatedAt = time.Now()
	switch to {
	case StatusRunning:
		started := time.Now()
		t.StartedAt = &started
	case StatusDone, StatusFailed:
		finished := time.Now()
		t.FinishedAt = &finished
	}
	return nil
}
// ReadyToDispatch returns every task that is in StatusApproved and whose
// dependencies are all registered and in StatusDone. The order of the result
// follows map iteration and is therefore unspecified.
func (m *Manager) ReadyToDispatch() []*Task {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var ready []*Task
candidates:
	for _, candidate := range m.tasks {
		if candidate.Status != StatusApproved {
			continue
		}
		// One missing or unfinished dependency disqualifies the task.
		for _, depID := range candidate.DependsOn {
			if dep := m.tasks[depID]; dep == nil || dep.Status != StatusDone {
				continue candidates
			}
		}
		ready = append(ready, candidate)
	}
	return ready
}
// InjectArtifacts appends the result artifacts of each dependency of the task
// identified by taskID to that task's own Artifacts. Dependencies that are
// unknown or have no result yet are skipped. An unknown taskID is a no-op.
func (m *Manager) InjectArtifacts(taskID string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	target := m.tasks[taskID]
	if target == nil {
		return
	}
	for _, depID := range target.DependsOn {
		upstream := m.tasks[depID]
		if upstream == nil || upstream.Result == nil {
			continue
		}
		target.Artifacts = append(target.Artifacts, upstream.Result.Artifacts...)
	}
}
解析任务依赖图,按拓扑序调度。无依赖的任务并行派发。
package scheduler
// Scheduler dispatches the tasks of an approved plan in dependency order and
// reacts to the results agents report on its inbox.
type Scheduler struct {
	taskMgr *task.Manager
	agents map[string]agent.Agent
	bus *bus.MessageBus
	gate *gate.ApprovalGate
	audit *audit.Logger
	inbox <-chan bus.Message
}
// New wires a Scheduler to the task manager, agent table, bus and approval
// gate, and subscribes it to the bus under the name "scheduler".
func New(tm *task.Manager, agents map[string]agent.Agent,
	b *bus.MessageBus, g *gate.ApprovalGate) *Scheduler {
	sched := &Scheduler{taskMgr: tm, agents: agents, bus: b, gate: g}
	sched.inbox = b.Subscribe("scheduler")
	return sched
}
// SubmitPlan takes an execution plan (a set of tasks forming a DAG) from Max,
// registers it, asks the user to approve it and, once approved, dispatches
// the tasks that are ready to run.
//
// It returns an error when the plan is empty or not a valid DAG, when a task
// cannot be registered or moved to the next state, when the approval request
// itself fails (for example on timeout), or when the user rejects the plan.
// A failed approval request and a rejection are reported as different errors.
func (s *Scheduler) SubmitPlan(tasks []*task.Task) error {
	if len(tasks) == 0 {
		return fmt.Errorf("invalid plan: no tasks")
	}

	// 1. The dependency graph must be acyclic.
	if err := s.validateDAG(tasks); err != nil {
		return fmt.Errorf("invalid DAG: %w", err)
	}

	// 2. Register every task and mark it Planned.
	for _, t := range tasks {
		if err := s.taskMgr.Create(t); err != nil {
			return fmt.Errorf("create task %q: %w", t.Title, err)
		}
		if err := s.taskMgr.Transition(t.ID, task.StatusPlanned); err != nil {
			return fmt.Errorf("plan task %s: %w", t.ID, err)
		}
	}

	// 3. Ask the user for approval.
	approved, err := s.gate.RequestApproval(tasks)
	if err != nil {
		return fmt.Errorf("request approval: %w", err)
	}
	if !approved {
		return fmt.Errorf("plan rejected by user")
	}

	// 4. Approved: mark every task Approved.
	for _, t := range tasks {
		if err := s.taskMgr.Transition(t.ID, task.StatusApproved); err != nil {
			return fmt.Errorf("approve task %s: %w", t.ID, err)
		}
	}

	// 5. Dispatch whatever is ready.
	s.dispatchReady()
	return nil
}
// dispatchReady assigns every task whose dependencies are complete to its
// target agent.
//
// A task is claimed by moving it to StatusDispatched first. If that
// transition fails — for instance because a concurrent caller claimed the
// task in the meantime — the task is skipped, so it is neither sent twice nor
// given the upstream artifacts twice.
func (s *Scheduler) dispatchReady() {
	for _, t := range s.taskMgr.ReadyToDispatch() {
		if err := s.taskMgr.Transition(t.ID, task.StatusDispatched); err != nil {
			continue
		}
		// Hand the upstream artifacts to the task before it is sent out.
		s.taskMgr.InjectArtifacts(t.ID)
		// Assign the task to its agent over the bus.
		s.bus.Send(bus.Message{
			From:    "scheduler",
			To:      t.AgentName,
			Type:    bus.MsgTaskAssign,
			Payload: t,
		})
	}
}
// Run is the scheduler's main loop. It handles task results and task failures
// arriving on the inbox, ignores every other message type, and returns when
// ctx is cancelled.
func (s *Scheduler) Run(ctx context.Context) {
	for {
		var incoming bus.Message
		select {
		case <-ctx.Done():
			return
		case incoming = <-s.inbox:
		}

		if incoming.Type == bus.MsgTaskResult {
			s.handleTaskResult(incoming)
		} else if incoming.Type == bus.MsgTaskFailed {
			s.handleTaskFailed(incoming)
		}
	}
}
// handleTaskResult records a successful task result, marks the task Done,
// dispatches any tasks this unblocks and tells Max when every task is done.
//
// A message whose payload is not a *task.Result is ignored. So is a result
// for a task that cannot legally move to Done (unknown, or already finished),
// which keeps a duplicate result from triggering dispatch twice.
func (s *Scheduler) handleTaskResult(msg bus.Message) {
	result, ok := msg.Payload.(*task.Result)
	if !ok || result == nil {
		return
	}
	taskID := result.TaskID

	// Store the result before the state change so that downstream tasks
	// dispatched afterwards can pick up its artifacts.
	s.taskMgr.SetResult(taskID, result)

	// Done is only reachable from Running, and the scheduler leaves the task
	// in Dispatched. Step through Running here; the error is ignored on
	// purpose because it only means the task was already Running.
	_ = s.taskMgr.Transition(taskID, task.StatusRunning)
	if err := s.taskMgr.Transition(taskID, task.StatusDone); err != nil {
		return
	}

	// Dispatch any downstream tasks this unblocks.
	s.dispatchReady()

	// Tell Max once everything has finished.
	if s.taskMgr.AllDone() {
		s.bus.Send(bus.Message{
			From: "scheduler",
			To:   "max",
			Type: bus.MsgAllTasksDone,
		})
	}
}
// handleTaskFailed reacts to a failure reported for a task: it marks the task
// Failed and either re-dispatches it to its agent, while retries remain, or
// escalates the failed task to Max.
//
// A message whose payload is not a task ID string, that names an unknown
// task, or that names a task which cannot legally move to Failed is ignored;
// the last case keeps a duplicate failure report from spending a second retry.
func (s *Scheduler) handleTaskFailed(msg bus.Message) {
	taskID, ok := msg.Payload.(string)
	if !ok {
		return
	}
	t := s.taskMgr.Get(taskID)
	if t == nil {
		return
	}

	// Failed is only reachable from Running, and the retry transition starts
	// from Failed. Step through Running first; the error is ignored on
	// purpose because it only means the task was already Running.
	_ = s.taskMgr.Transition(taskID, task.StatusRunning)
	if err := s.taskMgr.Transition(taskID, task.StatusFailed); err != nil {
		return
	}

	if t.RetryCount < t.MaxRetries {
		// Automatic retry.
		// NOTE(review): RetryCount is updated outside the manager's lock.
		t.RetryCount++
		if err := s.taskMgr.Transition(taskID, task.StatusDispatched); err != nil {
			return
		}
		s.bus.Send(bus.Message{
			From:    "scheduler",
			To:      t.AgentName,
			Type:    bus.MsgTaskAssign,
			Payload: t,
		})
		return
	}

	// Retries exhausted: let Max decide.
	s.bus.Send(bus.Message{
		From:    "scheduler",
		To:      "max",
		Type:    bus.MsgTaskFailed,
		Payload: t,
	})
}
// validateDAG checks that the dependencies between tasks form a directed
// acyclic graph, using Kahn's algorithm.
//
// It returns an error when the slice contains a nil task, when two tasks
// share the same non-empty ID, when a task depends on an ID that is not part
// of the plan, or when the dependencies contain a cycle. Tasks that have no
// ID yet are accepted as long as nothing depends on them.
func (s *Scheduler) validateDAG(tasks []*task.Task) error {
	// Collect the IDs of the plan so dangling references can be told apart
	// from real cycles.
	known := make(map[string]struct{}, len(tasks))
	for _, t := range tasks {
		if t == nil {
			return fmt.Errorf("plan contains a nil task")
		}
		if t.ID == "" {
			continue
		}
		if _, dup := known[t.ID]; dup {
			return fmt.Errorf("duplicate task id %q", t.ID)
		}
		known[t.ID] = struct{}{}
	}

	inDegree := make(map[string]int, len(tasks))
	graph := make(map[string][]string)
	for _, t := range tasks {
		inDegree[t.ID] = len(t.DependsOn)
		for _, dep := range t.DependsOn {
			if _, ok := known[dep]; !ok {
				return fmt.Errorf("task %q depends on unknown task %q", t.ID, dep)
			}
			graph[dep] = append(graph[dep], t.ID)
		}
	}

	// Start from the tasks that depend on nothing.
	var queue []string
	for _, t := range tasks {
		if inDegree[t.ID] == 0 {
			queue = append(queue, t.ID)
		}
	}

	visited := 0
	for len(queue) > 0 {
		node := queue[0]
		queue = queue[1:]
		visited++
		for _, next := range graph[node] {
			inDegree[next]--
			if inDegree[next] == 0 {
				queue = append(queue, next)
			}
		}
	}

	// Any task never reached sits on a cycle.
	if visited != len(tasks) {
		return fmt.Errorf("cycle detected in task dependencies")
	}
	return nil
}
封装 Claude API 调用,支持多模型路由和 tool use 循环。
package llm
// Router wraps the Anthropic client together with the LLM configuration.
type Router struct {
	client *anthropic.Client
	config config.LLMConfig
	audit *audit.Logger
}

// NewRouter builds a Router whose client is created from the API key in cfg.
//
// NOTE(review): the audit field is left nil here — confirm it is set before
// the router logs anything.
func NewRouter(cfg config.LLMConfig) *Router {
	client := anthropic.NewClient(cfg.APIKey)
	return &Router{client: client, config: cfg}
}
// ChatRequest is the input of one LLM call: the model to use (Chat falls back
// to the configured default when it is empty), the system prompt, the
// conversation so far, the tool definitions and the output token limit.
type ChatRequest struct {
	Model string
	System string
	Messages []anthropic.MessageParam
	Tools []anthropic.ToolParam
	MaxTokens int
}
// ChatResponse is the parsed result of one LLM call.
type ChatResponse struct {
	Content    string
	ToolCalls  []ToolCall
	StopReason string
	Usage      Usage

	// rawContent keeps the assistant's content blocks in API form.
	// RunToolLoop replays them as the assistant turn that precedes the tool
	// results.
	// NOTE(review): the element type mirrors the block type RunToolLoop uses
	// for tool results — confirm against the SDK version in use.
	rawContent []anthropic.ContentBlockParam
}
// ToolCall is one tool invocation requested by the model: the call ID, the
// tool name and the raw JSON input.
type ToolCall struct {
	ID string
	Name string
	Input json.RawMessage
}

// Usage holds the token counters reported for one LLM call.
type Usage struct {
	InputTokens int
	OutputTokens int
	CacheRead int
	CacheWrite int
}
// Chat performs a single LLM call. When req.Model is empty the configured
// default model is used. API failures are returned wrapped as
// "claude API error".
func (r *Router) Chat(ctx context.Context, req ChatRequest) (*ChatResponse, error) {
	model := r.config.DefaultModel
	if req.Model != "" {
		model = req.Model
	}

	params := anthropic.MessageNewParams{
		Model:     model,
		System:    anthropic.NewTextBlock(req.System),
		Messages:  req.Messages,
		Tools:     req.Tools,
		MaxTokens: int64(req.MaxTokens),
	}
	resp, err := r.client.Messages.New(ctx, params)
	if err == nil {
		return r.parseResponse(resp), nil
	}
	return nil, fmt.Errorf("claude API error: %w", err)
}
// RunToolLoop is the core agent loop: call the LLM, execute the tools it asks
// for, feed the results back, and repeat until the model stops requesting
// tools.
//
// executor runs one tool call and returns its textual result. An executor
// error is not fatal: it is sent back to the model as an error tool result so
// the model can react to it.
//
// The loop works on a private copy of req.Messages, so the caller's slice is
// never modified. Cancellation of ctx is checked before each tool execution.
//
// It returns the first response that has no tool calls or whose stop reason
// is "end_turn", or the first error from the LLM call or from ctx.
func (r *Router) RunToolLoop(ctx context.Context, req ChatRequest,
	executor func(ToolCall) (string, error)) (*ChatResponse, error) {
	// Copy: appending to req.Messages directly could write into spare
	// capacity of the caller's backing array.
	messages := append([]anthropic.MessageParam(nil), req.Messages...)

	for {
		resp, err := r.Chat(ctx, ChatRequest{
			Model:     req.Model,
			System:    req.System,
			Messages:  messages,
			Tools:     req.Tools,
			MaxTokens: req.MaxTokens,
		})
		if err != nil {
			return nil, err
		}

		// No tool calls: this is the final answer.
		if len(resp.ToolCalls) == 0 || resp.StopReason == "end_turn" {
			return resp, nil
		}

		// Replay the assistant turn, then answer each of its tool calls.
		messages = append(messages, anthropic.NewAssistantMessage(resp.rawContent...))

		toolResults := make([]anthropic.ContentBlockParam, 0, len(resp.ToolCalls))
		for _, tc := range resp.ToolCalls {
			if err := ctx.Err(); err != nil {
				return nil, err
			}
			result, err := executor(tc)
			if err != nil {
				toolResults = append(toolResults, anthropic.NewToolResultBlock(
					tc.ID, err.Error(), true,
				))
				continue
			}
			toolResults = append(toolResults, anthropic.NewToolResultBlock(
				tc.ID, result, false,
			))
		}
		messages = append(messages, anthropic.NewUserMessage(toolResults...))

		// NewRouter does not set the audit logger, so it may be nil.
		if r.audit != nil {
			r.audit.Debug("tool_loop_iteration",
				"tool_calls", len(resp.ToolCalls),
				"total_messages", len(messages),
			)
		}
	}
}
| | Tool(工具) | 基础设施 |
|---|---|---|
| 调用者 | LLM 通过 tool_call 决策 | Go 代码直接调用 |
| 权限控制 | 按 AgentConfig.Tools 白名单过滤 | 所有 Agent 共享 |
| 示例 | read_file, write_file, run_shell | Audit Logger, MessageBus, MCP Client |
| 可审计 | 每次调用都记录审计日志 | 自身就是审计基础设施 |
例如:Max 的工具白名单中没有 write_file 工具,因此 LLM 无法写任意文件。但 Max 的 Go 代码仍可通过 Audit Logger 写日志——这是基础设施层面的 I/O,不受 Tool 权限约束。
工具注册中心,按 Agent 配置过滤可用工具。
package tool
// Tool is the interface every tool implements: a name, a description, a JSON
// Schema describing its parameters, and the execution entry point, which
// takes the raw JSON input and returns the textual result.
type Tool interface {
	Name() string
	Description() string
	Parameters() json.RawMessage // JSON Schema
	Execute(ctx context.Context, input json.RawMessage) (string, error)
}

// Registry is the tool registry: a name → Tool table guarded by mu.
type Registry struct {
	mu sync.RWMutex
	tools map[string]Tool
}
// NewRegistry returns a registry pre-populated with every built-in tool: the
// file, shell, git, search, test and ops tools, followed by the tools meant
// for Max. The file tools are constructed from projectCfg.
func NewRegistry(projectCfg config.ProjectConfig) *Registry {
	reg := &Registry{tools: make(map[string]Tool)}

	builtins := []Tool{
		// General built-in tools.
		NewReadFile(projectCfg),
		NewWriteFile(projectCfg),
		NewListDir(projectCfg),
		NewRunShell(),
		NewGitDiff(),
		NewGitCommit(),
		NewGitStatus(),
		NewSearchCode(),
		NewRunTest(),
		NewCoverageReport(),
		NewQueryLog(),
		NewQueryMetrics(),
		// Tools meant for Max.
		NewTaskCreate(),
		NewTaskUpdate(),
		NewTaskQuery(),
		NewDispatchTask(),
		NewAskUser(),
	}
	for _, t := range builtins {
		reg.Register(t)
	}
	return reg
}
// Register stores t under its own name, replacing any tool already registered
// under that name.
func (r *Registry) Register(t Tool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.tools[t.Name()] = t
}
// ForAgent returns the registered tools whose names appear in allowedNames.
//
// The result follows the order of allowedNames, so an agent sees the same
// tool list on every run instead of one that depends on map iteration order.
// Names that are not registered are skipped and repeated names are returned
// once. The result is nil when nothing matches.
func (r *Registry) ForAgent(allowedNames []string) []Tool {
	r.mu.RLock()
	defer r.mu.RUnlock()

	var result []Tool
	seen := make(map[string]struct{}, len(allowedNames))
	for _, name := range allowedNames {
		if _, dup := seen[name]; dup {
			continue
		}
		seen[name] = struct{}{}
		if t, ok := r.tools[name]; ok {
			result = append(result, t)
		}
	}
	return result
}
// ToAnthropicTools converts tools into the tool-definition format of the
// Claude API, one definition per tool and in the same order. The result is
// nil for an empty input.
func ToAnthropicTools(tools []Tool) []anthropic.ToolParam {
	var defs []anthropic.ToolParam
	for i := range tools {
		current := tools[i]
		def := anthropic.ToolParam{
			Name:        current.Name(),
			Description: anthropic.String(current.Description()),
			InputSchema: current.Parameters(),
		}
		defs = append(defs, def)
	}
	return defs
}
// ========== read_file ==========

// ReadFile is the read_file tool. It reads files relative to projectRoot and,
// when sandbox is set, refuses any path that resolves outside that directory.
type ReadFile struct {
	projectRoot string
	sandbox     bool
}

// Name returns the tool name exposed to the model.
func (t *ReadFile) Name() string { return "read_file" }

// Description returns the tool description exposed to the model.
func (t *ReadFile) Description() string { return "读取指定路径的文件内容" }

// Parameters returns the JSON Schema of the tool input: an object with one
// required string property, "path".
func (t *ReadFile) Parameters() json.RawMessage {
	return json.RawMessage(`{
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径(相对于项目根目录)"}
},
"required": ["path"]
}`)
}

// Execute reads the file named by the "path" field of input and returns its
// content.
//
// The path is interpreted relative to the project root. With the sandbox on,
// both the root and the target are made absolute and the target must lie
// inside the root. A plain string-prefix comparison is not enough for that:
// it accepts sibling directories such as "<root>-secrets", and it accepts
// every "../" path when the root is ".".
//
// NOTE(review): symbolic links are not resolved, so a link inside the root
// that points elsewhere is still followed.
//
// It returns an error when input is not valid JSON, when the path is empty or
// escapes the sandbox, or when the file cannot be read.
func (t *ReadFile) Execute(ctx context.Context, input json.RawMessage) (string, error) {
	var params struct {
		Path string `json:"path"`
	}
	if err := json.Unmarshal(input, &params); err != nil {
		return "", fmt.Errorf("read_file: invalid input: %w", err)
	}
	if params.Path == "" {
		return "", fmt.Errorf("read_file: path is required")
	}

	absPath := filepath.Join(t.projectRoot, params.Path)

	// Sandbox check.
	if t.sandbox {
		root, err := filepath.Abs(t.projectRoot)
		if err != nil {
			return "", fmt.Errorf("read_file: resolve project root: %w", err)
		}
		target, err := filepath.Abs(absPath)
		if err != nil {
			return "", fmt.Errorf("read_file: resolve path: %w", err)
		}
		rel, err := filepath.Rel(root, target)
		if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
			return "", fmt.Errorf("access denied: path outside project root")
		}
	}

	content, err := os.ReadFile(absPath)
	if err != nil {
		return "", err
	}
	return string(content), nil
}
// ========== write_file (with permission control) ==========

// WriteFile is the write_file tool. Writes are confined to projectRoot, are
// refused for base names matching blockedPatterns and, when writePatterns is
// not empty, are allowed only for base names matching one of its patterns.
type WriteFile struct {
	projectRoot     string
	blockedPatterns []string
	writePatterns   []string // empty = unrestricted
}

// Execute writes the "content" field of input to the file named by its "path"
// field, creating parent directories as needed, and returns a short
// confirmation.
//
// The checks run in this order:
//  1. sandbox — the resolved path must lie inside the project root. Both
//     paths are made absolute and compared with filepath.Rel, because a
//     string-prefix test accepts sibling directories such as "<root>-evil".
//  2. blocklist — the base name must not match any blocked pattern.
//  3. whitelist — when configured, the base name must match one write pattern
//     (for example, Ray may only write *_test.go).
//
// A malformed pattern is reported as an error instead of being treated as
// "no match", so a typo in the blocklist cannot silently disable it.
//
// NOTE(review): patterns are matched against the base name only, and symbolic
// links are not resolved.
func (t *WriteFile) Execute(ctx context.Context, input json.RawMessage) (string, error) {
	var params struct {
		Path    string `json:"path"`
		Content string `json:"content"`
	}
	if err := json.Unmarshal(input, &params); err != nil {
		return "", fmt.Errorf("write_file: invalid input: %w", err)
	}
	if params.Path == "" {
		return "", fmt.Errorf("write_file: path is required")
	}

	// Sandbox check.
	absPath := filepath.Join(t.projectRoot, params.Path)
	root, err := filepath.Abs(t.projectRoot)
	if err != nil {
		return "", fmt.Errorf("write_file: resolve project root: %w", err)
	}
	target, err := filepath.Abs(absPath)
	if err != nil {
		return "", fmt.Errorf("write_file: resolve path: %w", err)
	}
	rel, err := filepath.Rel(root, target)
	if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", fmt.Errorf("access denied: path outside project root")
	}

	base := filepath.Base(params.Path)

	// Blocklist check.
	for _, pattern := range t.blockedPatterns {
		matched, err := filepath.Match(pattern, base)
		if err != nil {
			return "", fmt.Errorf("write_file: bad blocked pattern %q: %w", pattern, err)
		}
		if matched {
			return "", fmt.Errorf("access denied: %s matches blocked pattern %s", params.Path, pattern)
		}
	}

	// Whitelist check.
	if len(t.writePatterns) > 0 {
		allowed := false
		for _, pattern := range t.writePatterns {
			matched, err := filepath.Match(pattern, base)
			if err != nil {
				return "", fmt.Errorf("write_file: bad write pattern %q: %w", pattern, err)
			}
			if matched {
				allowed = true
				break
			}
		}
		if !allowed {
			return "", fmt.Errorf("access denied: %s not in write whitelist", params.Path)
		}
	}

	if err := os.MkdirAll(filepath.Dir(absPath), 0755); err != nil {
		return "", err
	}
	if err := os.WriteFile(absPath, []byte(params.Content), 0644); err != nil {
		return "", err
	}
	return fmt.Sprintf("written %d bytes to %s", len(params.Content), params.Path), nil
}
// ========== run_shell (whitelist) ==========

// RunShell is the run_shell tool. It only runs commands that begin with one
// of the allowedCommands entries.
type RunShell struct {
	allowedCommands []string
}

// Execute runs the "command" field of input through "sh -c" with a 30-second
// timeout and returns the combined stdout and stderr.
//
// Because the command line is interpreted by a shell, a bare prefix match
// against the whitelist is not sufficient: "go build; rm -rf ." starts with
// "go build". Two rules close that hole:
//   - the command must equal a whitelisted entry or continue it after a
//     space, so "grep" does not also allow "grepx";
//   - the command must not contain shell control characters (; & | < > $
//     backquote ( ) { } or a line break), so nothing can be chained,
//     substituted or redirected.
//
// NOTE(review): options of whitelisted programs are not inspected, so
// something like "find . -delete" still passes — tighten the whitelist if
// that matters.
//
// It returns an error when input is not valid JSON, when the command is
// rejected, or when it fails or times out; in the last case the output
// produced so far is returned as well.
func (t *RunShell) Execute(ctx context.Context, input json.RawMessage) (string, error) {
	const shellControlChars = ";&|<>$`(){}\n\r"

	var params struct {
		Command string `json:"command"`
	}
	if err := json.Unmarshal(input, &params); err != nil {
		return "", fmt.Errorf("run_shell: invalid input: %w", err)
	}
	command := strings.TrimSpace(params.Command)

	// Whitelist check.
	allowed := false
	if command != "" && !strings.ContainsAny(command, shellControlChars) {
		for _, prefix := range t.allowedCommands {
			prefix = strings.TrimSpace(prefix)
			if prefix == "" {
				continue
			}
			if command == prefix || strings.HasPrefix(command, prefix+" ") {
				allowed = true
				break
			}
		}
	}
	if !allowed {
		return "", fmt.Errorf("command not allowed: %s", params.Command)
	}

	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	cmd := exec.CommandContext(ctx, "sh", "-c", command)
	output, err := cmd.CombinedOutput()
	if err != nil {
		return string(output), fmt.Errorf("command failed: %w\noutput: %s", err, output)
	}
	return string(output), nil
}
实现 MCP (Model Context Protocol) 客户端,通过 stdio 与 MCP Server 通信。
package mcp
// JSON-RPC 2.0 protocol types.

// Request is a JSON-RPC 2.0 request.
type Request struct {
	JSONRPC string `json:"jsonrpc"`
	ID int `json:"id"`
	Method string `json:"method"`
	Params any `json:"params,omitempty"`
}

// Response is a JSON-RPC 2.0 response; Result and Error are both optional in
// the JSON encoding.
type Response struct {
	JSONRPC string `json:"jsonrpc"`
	ID int `json:"id"`
	Result json.RawMessage `json:"result,omitempty"`
	Error *RPCError `json:"error,omitempty"`
}

// RPCError is the error object of a JSON-RPC 2.0 response.
type RPCError struct {
	Code int `json:"code"`
	Message string `json:"message"`
}
// Client is an MCP client bound to one server process. It holds the process
// handle, the pipe used to write to the server, a line scanner over the
// server's stdout, the request ID counter, and the tools discovered from the
// server.
type Client struct {
	name string
	proc *exec.Cmd
	stdin io.WriteCloser
	stdout *bufio.Scanner
	nextID int
	mu sync.Mutex
	tools []tool.Tool
}

// NewClient returns a client that carries only the server's name; the process
// is started by Connect.
func NewClient(cfg config.MCPServerConfig) *Client {
	return &Client{name: cfg.Name}
}
// Connect starts the MCP server process described by cfg and completes the
// protocol handshake: it sends "initialize", then the
// "notifications/initialized" notification, and finally loads the server's
// tool list.
//
// Every entry of cfg.Env is added to the inherited environment. The stdout
// scanner gets an enlarged line limit, because a single JSON-RPC response can
// exceed bufio.Scanner's 64 KiB default.
//
// If the handshake or the tool listing fails after the process has started,
// the process is killed and reaped before the error is returned.
func (c *Client) Connect(ctx context.Context, cfg config.MCPServerConfig) error {
	// 1. Start the process.
	c.proc = exec.CommandContext(ctx, cfg.Command, cfg.Args...)
	if len(cfg.Env) > 0 {
		// Build the environment once; assigning it inside the loop would keep
		// only the last variable.
		env := os.Environ()
		for k, v := range cfg.Env {
			env = append(env, k+"="+v)
		}
		c.proc.Env = env
	}

	var err error
	c.stdin, err = c.proc.StdinPipe()
	if err != nil {
		return err
	}
	stdout, err := c.proc.StdoutPipe()
	if err != nil {
		return err
	}
	c.stdout = bufio.NewScanner(stdout)
	c.stdout.Buffer(make([]byte, 0, 64*1024), 16*1024*1024)

	if err := c.proc.Start(); err != nil {
		return fmt.Errorf("start MCP server %s: %w", c.name, err)
	}

	// abort tears the child process down after a failed handshake.
	abort := func() {
		c.stdin.Close()
		c.proc.Process.Kill()
		c.proc.Wait()
	}

	// 2. Initialize handshake.
	if _, err := c.call("initialize", map[string]any{
		"protocolVersion": "2024-11-05",
		"capabilities":    map[string]any{},
		"clientInfo": map[string]any{
			"name":    "max-team",
			"version": "0.1.0",
		},
	}); err != nil {
		abort()
		return fmt.Errorf("MCP initialize failed: %w", err)
	}

	// 3. Send the initialized notification.
	c.notify("notifications/initialized", nil)

	// 4. Fetch the tool list.
	if err := c.refreshTools(); err != nil {
		abort()
		return err
	}
	return nil
}
// refreshTools calls "tools/list" and replaces the client's tool set with the
// tools the server reports, each wrapped as an MCPTool.
//
// It returns an error when the call fails or the response cannot be decoded;
// in both cases the previous tool set is left untouched.
func (c *Client) refreshTools() error {
	resp, err := c.call("tools/list", nil)
	if err != nil {
		return err
	}

	var result struct {
		Tools []struct {
			Name        string          `json:"name"`
			Description string          `json:"description"`
			InputSchema json.RawMessage `json:"inputSchema"`
		} `json:"tools"`
	}
	if err := json.Unmarshal(resp, &result); err != nil {
		return fmt.Errorf("decode tools/list result from %s: %w", c.name, err)
	}

	var discovered []tool.Tool
	for _, t := range result.Tools {
		discovered = append(discovered, &MCPTool{
			client:      c,
			name:        t.Name,
			description: t.Description,
			schema:      t.InputSchema,
		})
	}
	c.tools = discovered
	return nil
}
// CallTool invokes the MCP tool called name with the JSON arguments args and
// returns the text of its result.
//
// All non-empty text blocks of the result are returned, joined by newlines;
// for the common single-block result that is exactly that block's text. Empty
// args are sent as an empty JSON object.
//
// It returns an error when the call fails, when the result cannot be decoded,
// or when the server marks the result as a tool error ("isError": true). In
// the last case the result text becomes the error message.
func (c *Client) CallTool(name string, args json.RawMessage) (string, error) {
	if len(args) == 0 {
		args = json.RawMessage(`{}`)
	}
	resp, err := c.call("tools/call", map[string]any{
		"name":      name,
		"arguments": args,
	})
	if err != nil {
		return "", err
	}

	var result struct {
		Content []struct {
			Type string `json:"type"`
			Text string `json:"text"`
		} `json:"content"`
		IsError bool `json:"isError"`
	}
	// A response without a result payload yields an empty string.
	if len(resp) > 0 {
		if err := json.Unmarshal(resp, &result); err != nil {
			return "", fmt.Errorf("decode tools/call result for %s: %w", name, err)
		}
	}

	text := ""
	for _, block := range result.Content {
		if block.Text == "" {
			continue
		}
		if text != "" {
			text += "\n"
		}
		text += block.Text
	}

	if result.IsError {
		return "", fmt.Errorf("MCP tool %s failed: %s", name, text)
	}
	return text, nil
}
// Tools returns the tools discovered from the server; each element implements
// the tool.Tool interface.
func (c *Client) Tools() []tool.Tool { return c.tools }
// call performs one JSON-RPC request/response exchange with the server.
//
// The request is written as a single line to the server's stdin. Lines are
// then read from its stdout until the response carrying this request's ID
// arrives. Blank lines, notifications and any other message with a different
// ID or a "method" field are skipped, so they are not mistaken for the reply.
// Calls are serialised by the client's mutex.
//
// It returns the raw result, or an error when the request cannot be encoded
// or written, when the stream ends or cannot be read, when a line is not
// valid JSON, or when the server answers with a JSON-RPC error.
func (c *Client) call(method string, params any) (json.RawMessage, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.nextID++
	req := Request{
		JSONRPC: "2.0",
		ID:      c.nextID,
		Method:  method,
		Params:  params,
	}
	data, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("encode %s request: %w", method, err)
	}
	data = append(data, '\n')
	if _, err := c.stdin.Write(data); err != nil {
		return nil, err
	}

	// Read until the matching response shows up.
	for {
		if !c.stdout.Scan() {
			if err := c.stdout.Err(); err != nil {
				return nil, fmt.Errorf("read MCP response: %w", err)
			}
			return nil, fmt.Errorf("MCP server closed connection")
		}
		line := c.stdout.Bytes()
		if len(line) == 0 {
			continue
		}

		var msg struct {
			Response
			Method string `json:"method"`
		}
		if err := json.Unmarshal(line, &msg); err != nil {
			return nil, err
		}
		// Request IDs start at 1, so a message without an ID (decoded as 0)
		// never matches.
		if msg.Method != "" || msg.ID != req.ID {
			continue
		}
		if msg.Error != nil {
			return nil, fmt.Errorf("MCP error %d: %s", msg.Error.Code, msg.Error.Message)
		}
		return msg.Result, nil
	}
}
// Close shuts the MCP server down: it closes the server's stdin, then waits
// for the process to exit and returns the result of that wait.
func (c *Client) Close() error {
	c.stdin.Close()
	return c.proc.Wait()
}
// MCPTool wraps a tool discovered over MCP so that it satisfies the tool.Tool
// interface.
type MCPTool struct {
	client *Client
	name string
	description string
	schema json.RawMessage
}

// Name returns the tool name reported by the server.
func (t *MCPTool) Name() string { return t.name }

// Description returns the tool description reported by the server.
func (t *MCPTool) Description() string { return t.description }

// Parameters returns the input schema reported by the server.
func (t *MCPTool) Parameters() json.RawMessage { return t.schema }

// Execute forwards the call to the owning client's CallTool. ctx is not
// passed on.
func (t *MCPTool) Execute(ctx context.Context, input json.RawMessage) (string, error) {
	return t.client.CallTool(t.name, input)
}
关键决策点暂停执行,等待用户确认。
package gate
// ApprovalGate asks the user, over the message bus, to confirm a plan.
type ApprovalGate struct {
	bus *bus.MessageBus
}

// New returns an ApprovalGate that talks over b.
func New(b *bus.MessageBus) *ApprovalGate {
	return &ApprovalGate{bus: b}
}
// RequestApproval shows the plan to the user and waits up to ten minutes for
// a decision.
//
// The request is sent on the bus from "scheduler" to "user", with a textual
// summary of tasks as payload. The reply payload must be an ApprovalResponse,
// either as a value or as a non-nil pointer.
//
// It returns the user's decision, or an error when no reply arrives in time
// or the reply payload has an unexpected type.
func (g *ApprovalGate) RequestApproval(tasks []*task.Task) (bool, error) {
	// Build the plan summary.
	summary := g.buildPlanSummary(tasks)

	reply, err := g.bus.SendAndWait(bus.Message{
		From:    "scheduler",
		To:      "user",
		Type:    bus.MsgApprovalReq,
		Payload: summary,
	}, 10*time.Minute)
	if err != nil {
		return false, err
	}

	// Payload is typed any: check it instead of panicking on a bad reply.
	switch resp := reply.Payload.(type) {
	case *ApprovalResponse:
		if resp == nil {
			return false, fmt.Errorf("approval reply has a nil payload")
		}
		return resp.Approved, nil
	case ApprovalResponse:
		return resp.Approved, nil
	default:
		return false, fmt.Errorf("approval reply has unexpected payload type %T", reply.Payload)
	}
}
// ApprovalResponse is the user's answer to an approval request.
type ApprovalResponse struct {
	Approved bool
	Feedback string // the user's change requests, if any
}
// buildPlanSummary renders tasks as the numbered text shown to the user: one
// entry per task with its ID, title, owner and dependencies, followed by the
// confirmation question.
func (g *ApprovalGate) buildPlanSummary(tasks []*task.Task) string {
	var out strings.Builder
	out.WriteString("=== 执行计划 ===\n\n")

	for idx := range tasks {
		item := tasks[idx]
		var deps string
		if len(item.DependsOn) == 0 {
			deps = "无"
		} else {
			deps = strings.Join(item.DependsOn, ", ")
		}
		out.WriteString(fmt.Sprintf("%d. [%s] %s\n 负责人: %s\n 依赖: %s\n\n",
			idx+1, item.ID, item.Title, item.AgentName, deps))
	}

	out.WriteString("确认执行?(y/n): ")
	return out.String()
}
结构化审计日志,记录所有关键操作。
package audit
// EventType names the category of an audit event.
type EventType string

// Audit event categories.
const (
	EventLLMCall EventType = "llm_call"
	EventToolExec EventType = "tool_exec"
	EventMsgSent EventType = "msg_sent"
	EventTaskUpdate EventType = "task_update"
	EventApproval EventType = "approval"
	EventError EventType = "error"
)

// Event is one audit record: when it happened, its category, the agent it
// concerns, and free-form details.
type Event struct {
	Timestamp time.Time `json:"ts"`
	Type EventType `json:"type"`
	Agent string `json:"agent"`
	Data map[string]any `json:"data"`
}
// Logger writes audit events to writer. mu serialises the writes, and level
// holds the configured log level.
type Logger struct {
	writer io.Writer
	mu sync.Mutex
	level string
}
// New returns the audit logger described by cfg.
//
// When auditing is disabled the logger discards everything. Otherwise it
// appends to the file cfg.Output, creating the file if needed.
//
// If that file cannot be opened, a warning is printed to standard error and
// the returned logger discards events. Ignoring the failure would leave a nil
// *os.File behind the writer, and every later write would fail without anyone
// noticing.
func New(cfg config.AuditConfig) *Logger {
	if !cfg.Enabled {
		return &Logger{writer: io.Discard}
	}
	f, err := os.OpenFile(cfg.Output, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		os.Stderr.WriteString("audit: cannot open " + cfg.Output + ": " + err.Error() + "; audit logging disabled\n")
		return &Logger{writer: io.Discard, level: cfg.Level}
	}
	return &Logger{writer: f, level: cfg.Level}
}
// Record appends evt to the log as one JSON line.
//
// If evt cannot be encoded — for example because Data holds a value JSON
// cannot represent — a reduced record with the same timestamp, type and agent
// plus the encoding error is written in its place, so the event is not
// silently replaced by a blank line.
func (l *Logger) Record(evt Event) {
	data, err := json.Marshal(evt)
	if err != nil {
		data, _ = json.Marshal(Event{
			Timestamp: evt.Timestamp,
			Type:      evt.Type,
			Agent:     evt.Agent,
			Data:      map[string]any{"marshal_error": err.Error()},
		})
	}

	l.mu.Lock()
	defer l.mu.Unlock()
	// Best effort: a failing audit write must not break the caller.
	_, _ = l.writer.Write(append(data, '\n'))
}
// Convenience methods.

// LLMCall records an "llm_call" event for agent, carrying the model name and
// the input and output token counts.
func (l *Logger) LLMCall(agent, model string, inputTokens, outputTokens int) {
	evt := Event{
		Timestamp: time.Now(),
		Type:      EventLLMCall,
		Agent:     agent,
	}
	evt.Data = map[string]any{
		"model":         model,
		"input_tokens":  inputTokens,
		"output_tokens": outputTokens,
	}
	l.Record(evt)
}
// ToolExec records a "tool_exec" event for agent: the tool name, how long the
// call took, whether it succeeded and, on failure, the error text.
func (l *Logger) ToolExec(agent, toolName string, duration time.Duration, err error) {
	succeeded := err == nil
	fields := map[string]any{
		"tool":     toolName,
		"duration": duration.String(),
		"success":  succeeded,
	}
	if !succeeded {
		fields["error"] = err.Error()
	}

	l.Record(Event{
		Timestamp: time.Now(),
		Type:      EventToolExec,
		Agent:     agent,
		Data:      fields,
	})
}
所有 Agent 共享的基础实现。
package agent
// BaseAgent is the implementation shared by all agents: identity and
// configuration, the LLM router, the tools the agent may use, its MCP
// clients, its bus inbox, the audit logger and its lifecycle status.
type BaseAgent struct {
	name string
	config config.AgentConfig
	router *llm.Router
	tools []tool.Tool
	mcpClients []*mcp.Client
	bus *bus.MessageBus
	inbox <-chan bus.Message
	audit *audit.Logger
	status AgentStatus
	mu sync.RWMutex
}

// AgentStatus is the lifecycle state of an agent.
type AgentStatus int

// Agent lifecycle states.
const (
	StatusCreated AgentStatus = iota
	StatusStarting
	StatusReady
	StatusBusy
	StatusStopped
)
// NewBase builds the shared part of an agent from its configuration. The tool
// set is the subset of the registry named in cfg.Tools, and the agent is
// subscribed to the bus under cfg.Name. The agent starts in StatusCreated.
func NewBase(cfg config.AgentConfig, router *llm.Router,
	registry *tool.Registry, b *bus.MessageBus, auditLog *audit.Logger) *BaseAgent {
	base := &BaseAgent{
		name:   cfg.Name,
		config: cfg,
		router: router,
		bus:    b,
		audit:  auditLog,
		status: StatusCreated,
	}
	base.tools = registry.ForAgent(cfg.Tools)
	base.inbox = b.Subscribe(cfg.Name)
	return base
}
// Start prepares the agent for work: it connects to every configured MCP
// server, adds the tools those servers expose to the agent's tool set, and
// moves the agent to StatusReady.
//
// If one connection fails, the clients connected so far are closed, the tools
// they contributed are removed again, the agent is marked StatusStopped and
// the error is returned. No server process is left running behind a failed
// start.
func (a *BaseAgent) Start(ctx context.Context) error {
	a.setStatus(StatusStarting)

	builtinCount := len(a.tools) // used to drop MCP tools again on failure

	for _, mcpCfg := range a.config.MCPServers {
		client := mcp.NewClient(mcpCfg)
		if err := client.Connect(ctx, mcpCfg); err != nil {
			for _, c := range a.mcpClients {
				c.Close()
			}
			a.mcpClients = nil
			a.tools = a.tools[:builtinCount]
			a.setStatus(StatusStopped)
			return fmt.Errorf("MCP connect %s: %w", mcpCfg.Name, err)
		}
		a.mcpClients = append(a.mcpClients, client)
		// Merge in the MCP tools.
		a.tools = append(a.tools, client.Tools()...)
	}

	a.setStatus(StatusReady)
	return nil
}
// RunLLM runs one complete LLM tool-use loop for userMessage and returns the
// model's final text.
//
// The agent is StatusBusy for the duration and StatusReady afterwards. The
// request uses the agent's model, system prompt, tool definitions and token
// limit. When the agent's constraints set a positive timeout, the whole loop
// — LLM calls and tool executions — runs under that deadline.
//
// Tool calls requested by the model are looked up by name among the agent's
// tools, executed, and recorded in the audit log. An unknown tool name is
// reported back to the model as a tool error.
func (a *BaseAgent) RunLLM(ctx context.Context, userMessage string) (string, error) {
	a.setStatus(StatusBusy)
	defer a.setStatus(StatusReady)

	// Enforce the per-agent timeout from the configuration.
	if limit := a.config.Constraints.Timeout; limit > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, limit)
		defer cancel()
	}

	messages := []anthropic.MessageParam{
		anthropic.NewUserMessage(anthropic.NewTextBlock(userMessage)),
	}

	resp, err := a.router.RunToolLoop(ctx, llm.ChatRequest{
		Model:     a.config.Model,
		System:    a.config.SystemPrompt,
		Messages:  messages,
		Tools:     tool.ToAnthropicTools(a.tools),
		MaxTokens: a.config.Constraints.MaxTokens,
	}, func(tc llm.ToolCall) (string, error) {
		// Find the tool and run it.
		for _, t := range a.tools {
			if t.Name() == tc.Name {
				start := time.Now()
				result, err := t.Execute(ctx, tc.Input)
				a.audit.ToolExec(a.name, tc.Name, time.Since(start), err)
				return result, err
			}
		}
		return "", fmt.Errorf("unknown tool: %s", tc.Name)
	})
	if err != nil {
		return "", err
	}
	return resp.Content, nil
}
// Stop releases the agent's resources: it marks the agent StatusStopped and
// closes every MCP client. Errors from closing a client are ignored, and Stop
// always returns nil.
func (a *BaseAgent) Stop() error {
	a.setStatus(StatusStopped)
	for _, c := range a.mcpClients {
		c.Close()
	}
	return nil
}
package agent
// Max is the team-lead agent: a BaseAgent plus a handle to the scheduler it
// submits plans to.
type Max struct {
	*BaseAgent
	scheduler *scheduler.Scheduler
}

// NewMax builds Max from its configuration, the shared infrastructure and the
// scheduler.
func NewMax(cfg config.AgentConfig, router *llm.Router,
	registry *tool.Registry, b *bus.MessageBus,
	sched *scheduler.Scheduler, auditLog *audit.Logger) *Max {
	return &Max{
		BaseAgent: NewBase(cfg, router, registry, b, auditLog),
		scheduler: sched,
	}
}
// Run is Max's main loop. It starts the agent, then dispatches each inbox
// message to the handler for its type until ctx is cancelled. Messages of
// other types are ignored. The agent is stopped before Run returns.
//
// It returns the error from Start, or nil once ctx is cancelled.
func (m *Max) Run(ctx context.Context) error {
	if err := m.Start(ctx); err != nil {
		return err
	}
	defer m.Stop()

	for {
		var msg bus.Message
		select {
		case <-ctx.Done():
			return nil
		case msg = <-m.inbox:
		}

		switch msg.Type {
		case bus.MsgUserInput:
			m.handleUserInput(ctx, msg)
		case bus.MsgTaskResult:
			m.handleTaskResult(ctx, msg)
		case bus.MsgTaskFailed:
			m.handleTaskFailed(ctx, msg)
		case bus.MsgNeedClarify:
			m.handleClarification(ctx, msg)
		case bus.MsgAllTasksDone:
			m.handleAllDone(ctx, msg)
		}
	}
}
// handleUserInput turns a user request into an execution plan.
//
// It asks the LLM to break the request into tasks, parses the answer into a
// task list and submits that list to the scheduler, which triggers the
// approval gate. The user gets a reply in every case: the error text when the
// LLM call fails, the raw LLM answer when it is not a parsable plan, the
// reason when the plan is not accepted, or a confirmation.
//
// A message whose payload is not a string is answered with an error reply
// instead of crashing the agent.
func (m *Max) handleUserInput(ctx context.Context, msg bus.Message) {
	userInput, ok := msg.Payload.(string)
	if !ok {
		m.replyToUser("抱歉,我遇到了一个错误:" + fmt.Sprintf("unexpected payload type %T", msg.Payload))
		return
	}

	// 1. Ask the LLM to understand the request and produce a task plan.
	// Max's system prompt steers it towards a structured task list.
	prompt := fmt.Sprintf(`用户需求:%s
请分析这个需求,拆解为具体的执行任务。
每个任务指定负责人(leo/ada/ray/sam)和依赖关系。
输出 JSON 格式的任务列表。`, userInput)
	result, err := m.RunLLM(ctx, prompt)
	if err != nil {
		m.replyToUser("抱歉,我遇到了一个错误:" + err.Error())
		return
	}

	// 2. Parse the LLM output into a task list.
	tasks, err := m.parsePlan(result)
	if err != nil {
		// Unstructured output: pass it to the user as is.
		m.replyToUser(result)
		return
	}

	// 3. Submit to the scheduler (this triggers the approval gate).
	if err := m.scheduler.SubmitPlan(tasks); err != nil {
		m.replyToUser("计划未通过:" + err.Error())
		return
	}
	m.replyToUser("计划已确认,开始执行...")
}
// handleAllDone runs once every task of the plan has finished. It fetches the
// scheduler's summary of the task results, asks the LLM to turn it into a
// concise human-readable report and sends that report to the user.
//
// If the LLM call fails, the user is told about the error instead of
// receiving an empty reply.
func (m *Max) handleAllDone(ctx context.Context, msg bus.Message) {
	// Collect all task results.
	summary := m.scheduler.GetSummary()

	// Ask the LLM for a human-readable report.
	prompt := fmt.Sprintf("以下是所有任务的执行结果,请汇总为简洁的报告:\n%s", summary)
	result, err := m.RunLLM(ctx, prompt)
	if err != nil {
		m.replyToUser("抱歉,我遇到了一个错误:" + err.Error())
		return
	}
	m.replyToUser(result)
}
// replyToUser sends content to the user as a MsgUserReply message from "max".
func (m *Max) replyToUser(content string) {
	m.bus.Send(bus.Message{
		From: "max",
		To: "user",
		Type: bus.MsgUserReply,
		Payload: content,
	})
}
package agent
// Leo is the coder agent; it adds task handling on top of BaseAgent.
type Leo struct {
	*BaseAgent
}

// NewLeo builds Leo from its configuration and the shared infrastructure.
func NewLeo(cfg config.AgentConfig, router *llm.Router,
	registry *tool.Registry, b *bus.MessageBus, auditLog *audit.Logger) *Leo {
	return &Leo{BaseAgent: NewBase(cfg, router, registry, b, auditLog)}
}
// Run is Leo's main loop. After a successful Start it waits on the inbox and
// passes every MsgTaskAssign message to handleTask; messages of any other
// type are ignored.
//
// It returns the error from Start when startup fails, and nil once ctx is
// cancelled. Stop is always called on the way out after a successful Start.
func (l *Leo) Run(ctx context.Context) error {
	if startErr := l.Start(ctx); startErr != nil {
		return startErr
	}
	defer l.Stop()

	for {
		select {
		case incoming := <-l.inbox:
			if incoming.Type != bus.MsgTaskAssign {
				continue
			}
			l.handleTask(ctx, incoming)
		case <-ctx.Done():
			return nil
		}
	}
}
// handleTask executes one assigned task and reports the outcome to the
// scheduler over the bus.
//
// The message payload is expected to be a non-nil *task.Task. The flow is:
//  1. build the prompt from the task description and its upstream artifacts;
//  2. send a MsgProgress message announcing the start;
//  3. run the LLM (tool-use loop) with that prompt;
//  4. on failure, send a MsgProgress message carrying the cause, then a
//     MsgTaskFailed message whose payload is the task ID;
//  5. on success, capture `git diff` and send a MsgTaskResult message with
//     the LLM output and the diff attached as the "code_diff" artifact.
//
// A payload of any other type (or a nil task) is reported as a progress
// message and otherwise ignored, so a malformed message cannot panic the
// agent loop.
func (l *Leo) handleTask(ctx context.Context, msg bus.Message) {
	t, ok := msg.Payload.(*task.Task)
	if !ok || t == nil {
		l.bus.Send(bus.Message{
			From:    "leo",
			To:      "scheduler",
			Type:    bus.MsgProgress,
			Payload: fmt.Sprintf("忽略无效的任务消息: unexpected payload type %T", msg.Payload),
		})
		return
	}

	// Build the prompt: task description plus upstream artifacts.
	prompt := l.buildPrompt(t)

	// Announce that work on the task has started.
	l.bus.Send(bus.Message{
		From:    "leo",
		To:      "scheduler",
		Type:    bus.MsgProgress,
		Payload: fmt.Sprintf("开始执行: %s", t.Title),
	})

	// Run the LLM tool-use loop (read code, write code, run tests, ...).
	result, err := l.RunLLM(ctx, prompt)
	if err != nil {
		// The MsgTaskFailed payload only carries the task ID, so surface the
		// cause separately instead of discarding it.
		l.bus.Send(bus.Message{
			From:    "leo",
			To:      "scheduler",
			Type:    bus.MsgProgress,
			Payload: fmt.Sprintf("执行失败: %s: %v", t.Title, err),
		})
		l.bus.Send(bus.Message{
			From:    "leo",
			To:      "scheduler",
			Type:    bus.MsgTaskFailed,
			Payload: t.ID,
		})
		return
	}

	// Capture git diff as the task's artifact. A failing diff does not fail
	// the task, but it is reported rather than silently ignored because the
	// attached artifact may then be empty or incomplete.
	diffOutput, diffErr := exec.CommandContext(ctx, "git", "diff").Output()
	if diffErr != nil {
		l.bus.Send(bus.Message{
			From:    "leo",
			To:      "scheduler",
			Type:    bus.MsgProgress,
			Payload: fmt.Sprintf("git diff 失败: %v", diffErr),
		})
	}

	l.bus.Send(bus.Message{
		From: "leo",
		To:   "scheduler",
		Type: bus.MsgTaskResult,
		Payload: &task.Result{
			TaskID:  t.ID,
			Success: true,
			Output:  result,
			Artifacts: []task.Artifact{
				{Name: "code_diff", Type: "text", Content: string(diffOutput)},
			},
		},
	})
}
// buildPrompt renders the LLM prompt for a task: a task section holding the
// description, an optional upstream-reference section with one sub-heading
// per artifact (name followed by content), and a closing instruction line.
// The upstream section is omitted when the task has no artifacts.
func (l *Leo) buildPrompt(t *task.Task) string {
	var prompt strings.Builder

	prompt.WriteString("## 任务\n")
	prompt.WriteString(t.Description)
	prompt.WriteString("\n\n")

	// The section heading is only emitted when there is something to list.
	if len(t.Artifacts) != 0 {
		prompt.WriteString("## 上游参考\n")
	}
	for _, artifact := range t.Artifacts {
		fmt.Fprintf(&prompt, "### %s\n%s\n\n", artifact.Name, artifact.Content)
	}

	prompt.WriteString("请完成上述任务。使用工具读取、修改代码。完成后说明你做了什么。")
	return prompt.String()
}
结构与 Leo 类似,区别在于:
- design_doc(文本形式的设计文档)

区别:
- *_test.go 文件
- run_test 封装 go test -v -gcflags="all=-N -l"
- coverage_report 封装 go test -coverprofile
- test_report(测试结果 + 覆盖率)

区别:
- query_log(对接日志系统 API)和 query_metrics(对接监控 API)
- ops_report(日志分析 + 指标摘要)

| 错误类型 | 处理策略 | 最大重试 |
|---|---|---|
| LLM API 调用失败(网络/限流) | 指数退避重试:1s → 2s → 4s | 3 次 |
| 工具执行失败(命令报错) | 错误信息回传 LLM,让 Agent 自行修正 | LLM 循环内处理 |
| MCP Server 连接断开 | 自动重连,重连失败则降级(移除 MCP 工具) | 3 次 |
| 任务执行超时 | 取消当前 LLM 调用,标记 Failed,通知 Max | — |
| 任务执行失败(Agent 报错) | Scheduler 自动重试,超过上限后上报 Max 决策 | 2 次 |
| Max 决策失败的任务 | 可选:跳过 / 手动修复 / 调整方案重试 | 上报用户 |
// chatWithRetry calls Chat and retries retryable failures with exponential
// backoff (1s, 2s, 4s, ...).
//
// It makes one initial attempt followed by at most config.MaxRetries retries,
// so a backoff is only ever waited out when another attempt follows it, and a
// MaxRetries of zero (or less) still performs a single attempt.
//
// Returns:
//   - the response of the first successful attempt;
//   - the error unchanged when it is not retryable (see isRetryable);
//   - ctx.Err() when ctx is cancelled while waiting between attempts;
//   - "max retries exceeded" wrapping the last error once retries run out.
func (r *Router) chatWithRetry(ctx context.Context, req ChatRequest) (*ChatResponse, error) {
	var lastErr error
	for attempt := 0; ; attempt++ {
		resp, err := r.Chat(ctx, req)
		if err == nil {
			return resp, nil
		}
		// Errors such as 400/401/403 will not improve on retry.
		if !isRetryable(err) {
			return nil, err
		}
		lastErr = err

		// Out of retries: give up now instead of sleeping for nothing.
		if attempt >= r.config.MaxRetries {
			break
		}

		// Exponential backoff, interruptible by ctx.
		backoff := time.Duration(1<<uint(attempt)) * time.Second
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, ctx.Err()
		case <-timer.C:
		}
	}
	return nil, fmt.Errorf("max retries exceeded: %w", lastErr)
}
// isRetryable reports whether err is worth retrying. Only an
// *anthropic.APIError with status 429 (rate limit), 500, 502 or 503 qualifies.
// Every other status (e.g. 400, 401, 403) and every error that is not an
// *anthropic.APIError is treated as permanent.
func isRetryable(err error) bool {
	var apiErr *anthropic.APIError
	if !errors.As(err, &apiErr) {
		return false
	}
	code := apiErr.StatusCode
	return code == 429 || code == 500 || code == 502 || code == 503
}
| 依赖 | 用途 | 版本 |
|---|---|---|
| github.com/anthropics/anthropic-sdk-go | Claude API 客户端 | latest |
| github.com/google/uuid | 消息/任务 ID 生成 | v1.6+ |
| gopkg.in/yaml.v3 | YAML 配置解析 | v3 |
| github.com/rs/zerolog | 结构化日志 | v1.33+ |
# 构建
go build -o max-team ./cmd/max-team
# 运行
export ANTHROPIC_API_KEY=sk-ant-...
./max-team --config config.yaml --agents config/agents.yaml
# 开发模式
go run ./cmd/max-team --config config.yaml --agents config/agents.yaml
package main
import (
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"github.com/horizon6666/max-team/internal/config"
"github.com/horizon6666/max-team/internal/runtime"
)
// main is the max-team entry point. It parses the --config and --agents
// flags, loads both configuration files, builds the runtime, and runs it.
// SIGINT or SIGTERM triggers rt.Stop from a background goroutine. Any
// configuration or runtime error is printed to stderr and the process exits
// with status 1.
func main() {
	var (
		configPath = flag.String("config", "config.yaml", "global config path")
		agentsPath = flag.String("agents", "config/agents.yaml", "agents config path")
	)
	flag.Parse()

	// Load configuration.
	cfg, agentsCfg, loadErr := config.Load(*configPath, *agentsPath)
	if loadErr != nil {
		fmt.Fprintf(os.Stderr, "config error: %v\n", loadErr)
		os.Exit(1)
	}

	// Create the runtime.
	rt, newErr := runtime.New(cfg, agentsCfg)
	if newErr != nil {
		fmt.Fprintf(os.Stderr, "runtime error: %v\n", newErr)
		os.Exit(1)
	}

	// Graceful shutdown: the first SIGINT/SIGTERM stops the runtime.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-signals
		fmt.Println("\nShutting down...")
		rt.Stop()
	}()

	// Run until the runtime returns.
	runErr := rt.Run()
	if runErr == nil {
		return
	}
	fmt.Fprintf(os.Stderr, "runtime error: %v\n", runErr)
	os.Exit(1)
}