构建一套基于 Go 的多 Agent 协作框架,模拟"私人工作团队"的运作方式:
| 模块 | 职责 | 关键接口 |
|---|---|---|
| Agent Runtime | Agent 生命周期管理:创建、启动、停止、心跳检测、超时处理 | AgentRuntime.Spawn() .Stop() |
| Scheduler | 解析任务 DAG,按拓扑序调度,并行派发无依赖任务 | Scheduler.Submit(dag) .OnTaskDone() |
| Task Manager | 任务 CRUD、状态流转、依赖追踪 | TaskManager.Create() .UpdateStatus() |
| Message Bus | Agent 间消息传递,基于 Go channel,星型拓扑 | Bus.Send(msg) Bus.Subscribe(agentID) |
| LLM Router | 模型调用路由,支持不同 Agent 使用不同模型/参数 | Router.Chat(model, messages, tools) |
| Tool Registry | 工具注册、发现、按 Agent 角色过滤 | Registry.Register(tool) .ForRole(role) |
| Skill Engine | Skill 加载、Agent 可用性校验、prompt 注入、I/O 契约验证 | SkillEngine.Execute() SkillRegistry.ForAgent() |
| MCP Manager | MCP Server 进程管理、协议通信、工具动态发现 | MCPManager.Connect(server) .CallTool() |
| Approval Gate | 关键决策点暂停执行,等待用户确认 | Gate.RequestApproval(plan) .OnApproved() |
| Audit Log | 记录所有 LLM 调用、工具执行、消息流转 | AuditLog.Record(event) |
Claude Sonnet
工具:task_create / dispatch / ask_user
无文件系统操作权限
Claude Opus
工具:read_file / write_file / run_shell(受限) / git_*
MCP:gitnexus
Claude Opus
工具:read_file(只读) / search_code
MCP:gitnexus / cooper
无写文件权限
Claude Sonnet
工具:read_file / write_file(仅 *_test.go) / run_test
Claude Haiku
工具:query_log / query_metrics
MCP:grafana
无文件写权限
新 Agent 类型通过配置 AgentConfig 即可接入,无需修改框架代码。
潜在扩展:Reviewer(代码审查)、DocWriter(文档生成)、DBA(数据库操作)等
Max 将用户需求拆解为多个 Task 节点,通过 DependsOn 字段声明依赖关系,形成 DAG:
Task{ID: "T1", Agent: "Ada", Desc: "设计接口方案"}
Task{ID: "T2", Agent: "Leo", Desc: "实现代码", DependsOn: ["T1"]}
Task{ID: "T3", Agent: "Ray", Desc: "编写测试", DependsOn: ["T2"]}
Task{ID: "T4", Agent: "Leo", Desc: "修复测试失败", DependsOn: ["T3"]} // 条件触发
Done 后自动触发| 类型 | 方向 | 说明 |
|---|---|---|
TaskAssign | Max → Worker | 分配任务,含任务描述 + 上游 Artifact |
TaskResult | Worker → Max | 任务完成,返回产出物 |
TaskFailed | Worker → Max | 任务失败,附错误信息 |
NeedClarify | Worker → Max | 需要更多信息,由 Max 决定是否上报用户 |
Progress | Worker → Max | 进度更新(可选) |
ApprovalReq | Max → User | 方案/关键决策需用户确认 |
ApprovalResp | User → Max | 用户确认/拒绝/修改 |
// 中心消息总线
type MessageBus struct {
inbox chan Message // 所有消息汇聚于此
subs map[string]chan Message // agentID → 该 agent 的收件箱
}
// 发送消息
func (b *MessageBus) Send(msg Message) {
b.subs[msg.To] <- msg
}
// Agent 监听自己的收件箱
func (a *Agent) listen() {
for msg := range a.inbox {
a.handleMessage(msg)
}
}
type Tool interface {
Name() string
Description() string
Parameters() json.RawMessage // JSON Schema
Execute(ctx context.Context, params json.RawMessage) (ToolResult, error)
}
type ToolResult struct {
Content string
IsError bool
}
| 工具 | 说明 | 可用角色 |
|---|---|---|
read_file | 读取文件内容 | Leo / Ada / Ray |
write_file | 写入文件内容 | Leo / Ray(受限) |
list_dir | 列出目录 | All Workers |
run_shell | 执行 shell 命令(白名单控制) | Leo |
run_test | 执行 go test | Ray |
git_diff | 查看 git diff | Leo |
git_commit | 提交代码 | Leo |
search_code | 代码搜索(grep/ast) | Ada / Leo |
query_log | 查询日志系统 | Sam |
query_metrics | 查询监控指标 | Sam |
coverage_report | 生成覆盖率报告 | Ray |
Skill 是 Tool 之上的可复用工作流模板,由 Prompt 模板 + 工具子集 + I/O 契约 组成。Agent 激活 skill 后,按专用 prompt 和受限工具集执行,产出结构化结果。
type Skill struct {
Name string // "code_review" / "bug_triage"
Description string
Prompt string // 针对该 skill 的专用 prompt 模板
RequiredTools []string // 需要的工具列表
InputSchema json.RawMessage // 输入参数 JSON Schema
OutputSchema json.RawMessage // 输出结构 JSON Schema
Timeout time.Duration
}
type SkillRegistry struct {
skills map[string]*Skill
}
func (r *SkillRegistry) Load(path string) error // 从 YAML 加载
func (r *SkillRegistry) ForAgent(agent *Agent) []*Skill // 返回 agent 可用的 skill
func (r *SkillRegistry) Get(name string) *Skill // 按名称查找
| 内置 Skill | 说明 | 所需工具 | 可用 Agent |
|---|---|---|---|
code_review | 代码审查,输出问题列表和修复建议 | read_file, git_diff, search_code | Ada / Leo |
bug_triage | Bug 分诊,定位根因并给出修复方案 | read_file, search_code, query_log | Ada(+ Sam 配合) |
test_coverage_analysis | 测试覆盖率分析,识别未覆盖路径 | run_test, coverage_report, read_file | Ray |
impact_analysis | 变更影响分析,评估改动波及范围 | read_file, search_code | Ada |
每个 Agent 可独立连接自己的 MCP Server,通过标准 MCP 协议(JSON-RPC over stdio)通信:
type MCPServer struct {
Name string
Command string // e.g. "npx @anthropic/gitnexus-mcp"
Args []string
Env map[string]string
}
type MCPClient struct {
proc *exec.Cmd
stdin io.WriteCloser
stdout *bufio.Reader
tools []Tool // 通过 tools/list 动态发现
}
// Agent 初始化时连接 MCP
func (a *Agent) initMCP() {
for _, srv := range a.config.MCPServers {
client := NewMCPClient(srv)
client.Connect()
// MCP 发现的工具自动合并到 agent 工具集
a.tools = append(a.tools, client.Tools()...)
}
}
run_shell 工具内置命令白名单:
AllowedCommands: [ "go build", "go test", "go vet", "git diff", "git status", "git log", "grep", "find", "wc" ]
write_file 拒绝写入 .env、credentials 等敏感文件。
type Agent interface {
// 基础信息
ID() string
Role() AgentRole
// 生命周期
Start(ctx context.Context) error
Stop() error
// 任务处理
HandleTask(task *Task) (*TaskResult, error)
// 消息处理
HandleMessage(msg Message) error
}
type AgentRole string
const (
RoleMax AgentRole = "max" // Team Lead
RoleLeo AgentRole = "leo" // Coder
RoleAda AgentRole = "ada" // Architect
RoleRay AgentRole = "ray" // Tester
RoleSam AgentRole = "sam" // Ops
)
type Task struct {
ID string
Type AgentRole // 应该由哪种角色处理
Title string
Description string
DependsOn []string // 前置任务 ID
Artifacts []Artifact // 上游产出物
Status TaskStatus
AssignedTo string // Agent ID
Result *TaskResult
CreatedAt time.Time
UpdatedAt time.Time
}
type TaskStatus int
const (
TaskCreated TaskStatus = iota
TaskPlanned // 已纳入 DAG
TaskApproved // 用户确认通过
TaskDispatched // 已分配给 Agent
TaskRunning // Agent 执行中
TaskDone // 执行完成
TaskFailed // 执行失败
)
type Artifact struct {
Name string // "design_doc" / "code_diff" / "test_report"
Type string // "text" / "file" / "json"
Content string
}
type AgentConfig struct {
Role AgentRole
SystemPrompt string
Model string // "claude-opus-4-7" / "claude-sonnet-4-6" / "claude-haiku-4-5"
Tools []Tool
MCPServers []MCPServerConfig
Constraints AgentConstraints
}
type AgentConstraints struct {
AllowedDirs []string // 可访问的目录
AllowedCommands []string // shell 白名单
MaxTokens int // 单次调用最大 token
Timeout time.Duration // 任务超时时间
}
type SkillEngine struct {
registry *SkillRegistry
}
// 激活 skill:注入 prompt、限制工具集、执行、校验输出
func (e *SkillEngine) Execute(ctx context.Context, agent *Agent, skillName string, input json.RawMessage) (*SkillResult, error)
type SkillResult struct {
SkillName string
Output json.RawMessage // 符合 OutputSchema 的结构化结果
ToolCalls []ToolCallRecord // 执行过程中的工具调用记录
Usage TokenUsage
}
type LLMRouter interface {
Chat(ctx context.Context, req ChatRequest) (*ChatResponse, error)
}
type ChatRequest struct {
Model string
Messages []Message
Tools []ToolDefinition
System string
}
type ChatResponse struct {
Content string
ToolCalls []ToolCall
Usage TokenUsage
}
type Scheduler struct {
tasks map[string]*Task
agents map[AgentRole]*Agent
bus *MessageBus
gate *ApprovalGate
}
func (s *Scheduler) Submit(dag *TaskDAG) error // 提交任务 DAG
func (s *Scheduler) OnTaskDone(taskID string) error // 任务完成回调,触发下游
func (s *Scheduler) GetStatus() *DAGStatus // 获取整体进度
| 组件 | 选型 | 理由 |
|---|---|---|
| 语言 | Go 1.22+ | 团队熟悉,并发性能好,goroutine 天然适合多 Agent |
| LLM | Claude API (Anthropic) | 统一后端,按角色选择 Opus/Sonnet/Haiku |
| LLM SDK | anthropic-go (官方) | 原生支持 tool use / streaming / system prompt |
| MCP 协议 | 自实现 JSON-RPC over stdio | MCP 协议简单,Go 无成熟库,自实现可控 |
| 消息通信 | Go channel | 单进程内足够,后续可换 NATS/Redis |
| 任务持久化 | SQLite (初期) → PostgreSQL | 初期单机,后续可扩展 |
| 配置管理 | YAML | Agent 角色、工具集、MCP 等通过配置文件定义 |
| 日志审计 | zerolog + 结构化日志 | 性能好,支持 JSON 输出 |
| 用户交互 | CLI (初期) → Web UI | 先实现核心功能,后续加 UI |