Hhermes架构解析
Hhermes架构解析
多Agent编排
hermes中,父Agent通过delegate_task工具来派生子Agent,支持单任务委派和批量并行委派两种方式:
-
delegate_task1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
def delegate_task( goal: Optional[str] = None, context: Optional[str] = None, toolsets: Optional[List[str]] = None, tasks: Optional[List[Dict[str, Any]]] = None, max_iterations: Optional[int] = None, acp_command: Optional[str] = None, acp_args: Optional[List[str]] = None, parent_agent=None, ) -> str: """ Spawn one or more child agents to handle delegated tasks. Supports two modes: - Single: provide goal (+ optional context, toolsets) - Batch: provide tasks array [{goal, context, toolsets}, ...] Returns JSON with results array, one entry per task. """
DELEGATE_TASK_SCHEMA = { "name": "delegate_task", "description": ( "Spawn one or more subagents to work on tasks in isolated contexts. " "Each subagent gets its own conversation, terminal session, and toolset. " "Only the final summary is returned -- intermediate tool results " "never enter your context window.\n\n" "TWO MODES (one of 'goal' or 'tasks' is required):\n" "1. Single task: provide 'goal' (+ optional context, toolsets)\n" "2. Batch (parallel): provide 'tasks' array with up to 3 items. " "All run concurrently and results are returned together.\n\n" "WHEN TO USE delegate_task:\n" "- Reasoning-heavy subtasks (debugging, code review, research synthesis)\n" "- Tasks that would flood your context with intermediate data\n" "- Parallel independent workstreams (research A and B simultaneously)\n\n" "WHEN NOT TO USE (use these instead):\n" "- Mechanical multi-step work with no reasoning needed -> use execute_code\n" "- Single tool call -> just call the tool directly\n" "- Tasks needing user interaction -> subagents cannot use clarify\n\n" "IMPORTANT:\n" "- Subagents have NO memory of your conversation. Pass all relevant " "info (file paths, error messages, constraints) via the 'context' field.\n" "- Subagents CANNOT call: delegate_task, clarify, memory, send_message, " "execute_code.\n" "- Each subagent gets its own terminal session (separate working directory and state).\n" "- Results are always returned as an array, one entry per task." ),
单任务委派
1
2
3
4
5
6
7
# Agent 内部调用
delegate_task(
goal="重构 auth 模块,把 JWT 逻辑抽到独立文件",
context="项目路径 /root/project,使用 FastAPI...",
toolsets=["terminal", "file", "web"],
max_iterations=50
)
多任务委派
1
2
3
4
5
6
7
delegate_task(
tasks=[
{"goal": "修复登录页CSS", "toolsets": ["terminal", "file"]},
{"goal": "更新API文档", "toolsets": ["terminal", "file", "web"]},
{"goal": "跑一遍测试套件", "toolsets": ["terminal"]}
]
)
当子Agent任务完成之后,返回一个执行摘要给父Agent
并发模型
-
当主Agent只派生一个子Agent的时候,其本质上仅仅是创建一个
AIAgent实例,也就是说,主Agent和子Agent是同一个进程,主Agent被子Agent所同步阻塞1 2 3 4 5 6 7 8
# 父 Agent 正在执行 delegate_task tool, 会被delegate_task tool阻塞 delegate_task(...) -> _build_child_agent(...) # 创建 AIAgent 对象 -> _run_single_child(...) -> child.run_conversation(user_message=goal) -> return result -> return json.dumps(...) # 父 Agent 收到 tool result,继续下一轮 LLM 调用
-
当主Agent派生多个子Agent的时候,会使用线程池来创建所有的子Agent, 多个子Agent线程并发执行,但是主Agent需要等待所有子Agent完成,最终汇总结果,依旧是同步阻塞
1 2 3 4 5 6 7 8 9 10 11 12
with ThreadPoolExecutor(max_workers=max_children) as executor: futures = { executor.submit(_run_single_child, ...): i for each child } while pending: done, pending = wait(pending, timeout=0.5, return_when=FIRST_COMPLETED) collect finished child results # 全部完成后 sort results return json.dumps({"results": results, ...})
隔离性设计
hermes中,做不到多Agent并发安全,实际上,hermes中抽象出了Environment的概念,每个Environment是环境变量, cwd等内容的集合体, 每个task_id对应一个Environment, 而每个Agent对应一个task_id
hermes中共包含6种Environment
LocalEnvironment # 本地bash
DockerEnvironment # docker bash
SSHEnvironment # ssh bash
ModalEnvironment
ManagedModalEnvironment
SingularityEnvironment
DaytonaEnvironment
比如这里的LocalEnvironment, 实际上就是会创建一个新的bash来执行terminal tool
心跳机制
由于主Agent在委派子Agent时是同步阻塞的,因此如果子Agent执行时间过长,父Agent无法得到任何反馈,无法判断子Agent是死锁了还是崩溃了,因此对于每个子Agent, 其被创建的时候,会同步在后台创建一个daemon后台进程,定期通过父Agent的回调函数来发送heartbeat信号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _run_single_child(child, parent, ...):
heartbeat_stop = threading.Event()
def _heartbeat():
"""定期通知父Agent'子Agent还在工作'"""
while not heartbeat_stop.is_set():
time.sleep(30) # 每30秒一次
if not heartbeat_stop.is_set():
# 通过父Agent 的进度回调发送心跳
if parent._tool_progress_callback:
parent._tool_progress_callback({
"type": "heartbeat",
"child": child_index,
"status": "still_running"
})
# 启动心跳线程
hb_thread = threading.Thread(target=_heartbeat, daemon=True)
hb_thread.start()
try:
result = child.run_conversation(...)
finally:
heartbeat_stop.set() # 停止心跳
hb_thread.join(timeout=5)
return result
本文由作者按照 CC BY 4.0 进行授权