
Hermes Architecture Analysis

Multi-Agent Orchestration

In Hermes, a parent agent spawns child agents through the delegate_task tool, which supports two modes: single-task delegation and batched parallel delegation:

  • delegate_task

    def delegate_task(
        goal: Optional[str] = None,
        context: Optional[str] = None,
        toolsets: Optional[List[str]] = None,
        tasks: Optional[List[Dict[str, Any]]] = None,
        max_iterations: Optional[int] = None,
        acp_command: Optional[str] = None,
        acp_args: Optional[List[str]] = None,
        parent_agent=None,
    ) -> str:
        """
        Spawn one or more child agents to handle delegated tasks.
      
        Supports two modes:
          - Single: provide goal (+ optional context, toolsets)
          - Batch:  provide tasks array [{goal, context, toolsets}, ...]
      
        Returns JSON with results array, one entry per task.
        """
    
    DELEGATE_TASK_SCHEMA = {
        "name": "delegate_task",
        "description": (
            "Spawn one or more subagents to work on tasks in isolated contexts. "
            "Each subagent gets its own conversation, terminal session, and toolset. "
            "Only the final summary is returned -- intermediate tool results "
            "never enter your context window.\n\n"
            "TWO MODES (one of 'goal' or 'tasks' is required):\n"
            "1. Single task: provide 'goal' (+ optional context, toolsets)\n"
            "2. Batch (parallel): provide 'tasks' array with up to 3 items. "
            "All run concurrently and results are returned together.\n\n"
            "WHEN TO USE delegate_task:\n"
            "- Reasoning-heavy subtasks (debugging, code review, research synthesis)\n"
            "- Tasks that would flood your context with intermediate data\n"
            "- Parallel independent workstreams (research A and B simultaneously)\n\n"
            "WHEN NOT TO USE (use these instead):\n"
            "- Mechanical multi-step work with no reasoning needed -> use execute_code\n"
            "- Single tool call -> just call the tool directly\n"
            "- Tasks needing user interaction -> subagents cannot use clarify\n\n"
            "IMPORTANT:\n"
            "- Subagents have NO memory of your conversation. Pass all relevant "
            "info (file paths, error messages, constraints) via the 'context' field.\n"
            "- Subagents CANNOT call: delegate_task, clarify, memory, send_message, "
            "execute_code.\n"
            "- Each subagent gets its own terminal session (separate working directory and state).\n"
            "- Results are always returned as an array, one entry per task."
        ),
        # ... parameter schema omitted in this excerpt
    }

Single-Task Delegation

# called from inside the parent agent
delegate_task(
    goal="Refactor the auth module: extract the JWT logic into a separate file",
    context="Project path is /root/project, built with FastAPI...",
    toolsets=["terminal", "file", "web"],
    max_iterations=50
)

Batch Delegation

delegate_task(
    tasks=[
        {"goal": "Fix the login page CSS", "toolsets": ["terminal", "file"]},
        {"goal": "Update the API docs", "toolsets": ["terminal", "file", "web"]},
        {"goal": "Run the test suite", "toolsets": ["terminal"]}
    ]
)

When a child agent finishes its task, an execution summary is returned to the parent agent.
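The docstring only guarantees a JSON string containing a results array with one entry per task; the concrete field names below are an assumption for illustration. On the parent side, consuming that payload is just a matter of decoding and iterating:

```python
import json

# Hypothetical payload -- "task", "status", and "summary" are assumed names;
# the source only guarantees one results entry per delegated task.
raw = json.dumps({
    "results": [
        {"task": 0, "status": "completed", "summary": "CSS fixed"},
        {"task": 1, "status": "completed", "summary": "docs updated"},
    ]
})

payload = json.loads(raw)
# Only these summaries ever enter the parent's context window.
summaries = [r["summary"] for r in payload["results"]]
```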

Concurrency Model

  • When the main agent spawns a single child agent, it essentially just creates an AIAgent instance: the parent and child run in the same process, and the parent is synchronously blocked until the child finishes.

      # the parent agent is executing the delegate_task tool and blocks on it
      delegate_task(...)
        -> _build_child_agent(...)   # create the AIAgent object
        -> _run_single_child(...)
             -> child.run_conversation(user_message=goal)
             -> return result
        -> return json.dumps(...)
      # the parent receives the tool result and continues with its next LLM call
    
  • When the main agent spawns multiple child agents, it creates them all on a thread pool. The child threads run concurrently, but the main agent still waits for every child to finish before aggregating the results, so the call remains synchronously blocking.

    with ThreadPoolExecutor(max_workers=max_children) as executor:
        futures = {
            executor.submit(_run_single_child, ...): i
            for i, child in enumerate(children)
        }
        pending = set(futures)
        while pending:
            done, pending = wait(pending, timeout=0.5, return_when=FIRST_COMPLETED)
            # collect the results of the children that just finished
            for fut in done:
                results[futures[fut]] = fut.result()

    # once every child has finished, results are back in task order
    return json.dumps({"results": results, ...})
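The fan-out/fan-in pattern above can be reproduced standalone. In this sketch a stub stands in for `_run_single_child`, and the goals and timings are invented; it shows how `wait(..., return_when=FIRST_COMPLETED)` lets the parent collect children as they finish while still blocking until all are done:

```python
import json
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_child(index: int, goal: str) -> str:
    """Stub for _run_single_child: pretend to work, then return a summary."""
    time.sleep(0.05 * (3 - index))  # children finish in reverse submission order
    return f"done: {goal}"


def delegate_batch(goals):
    results = [None] * len(goals)
    with ThreadPoolExecutor(max_workers=len(goals)) as executor:
        futures = {
            executor.submit(run_child, i, g): i for i, g in enumerate(goals)
        }
        pending = set(futures)
        while pending:
            # Wake up at least twice per second so the parent could emit
            # progress between completions.
            done, pending = wait(pending, timeout=0.5, return_when=FIRST_COMPLETED)
            for fut in done:
                results[futures[fut]] = fut.result()  # slot by task index
    # All children have finished; results are already in task order.
    return json.dumps({"results": results})


out = json.loads(delegate_batch(["fix CSS", "update docs", "run tests"]))
```

Indexing results by the submitted task number is what keeps the output array aligned with the input order even though the children complete in any order.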
    

Isolation Design

Hermes cannot make multiple agents fully concurrency-safe on shared state. Instead, it abstracts the notion of an Environment: each Environment bundles things like environment variables and the cwd, each task_id maps to exactly one Environment, and each agent maps to one task_id.

Hermes ships with seven kinds of Environment:

  LocalEnvironment   # local bash
  DockerEnvironment  # bash inside a Docker container
  SSHEnvironment     # bash over SSH
  ModalEnvironment
  ManagedModalEnvironment
  SingularityEnvironment
  DaytonaEnvironment

LocalEnvironment, for instance, simply spawns a fresh bash process to execute the terminal tool.
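As a rough illustration of the Environment idea (the class interface here is an assumption, not Hermes's actual API), an environment can be modeled as a per-task bundle of cwd plus environment variables that every command inherits, with a fresh shell spawned per execution:

```python
import os
import subprocess
from typing import Optional


class LocalEnvironment:
    """Sketch: per-task bundle of cwd + env vars; each call spawns a new shell."""

    def __init__(self, cwd: str = ".", env: Optional[dict] = None):
        self.cwd = cwd
        # Overlay task-specific variables on top of the inherited environment.
        self.env = {**os.environ, **(env or {})}

    def execute(self, command: str) -> str:
        # A new shell per call, mirroring "a fresh bash to run the terminal tool".
        proc = subprocess.run(
            command, shell=True, cwd=self.cwd, env=self.env,
            capture_output=True, text=True,
        )
        return proc.stdout.strip()


# Two tasks, two isolated environments: same command, different state.
env_a = LocalEnvironment(env={"TASK_ID": "task-a"})
env_b = LocalEnvironment(env={"TASK_ID": "task-b"})
```

Because each agent holds its own Environment, two children running the same command never observe each other's variables or working directory.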

Heartbeat Mechanism

Because the parent agent blocks synchronously while delegating, a long-running child leaves it with no feedback: the parent cannot tell whether the child is deadlocked or has crashed. So whenever a child agent is created, a daemon thread is started in the background alongside it, which periodically sends a heartbeat signal through the parent agent's progress callback.

def _run_single_child(child, parent, ...):
    heartbeat_stop = threading.Event()
    
    def _heartbeat():
        """Periodically tell the parent agent that the child is still working."""
        while not heartbeat_stop.is_set():
            time.sleep(30)  # once every 30 seconds
            if not heartbeat_stop.is_set():
                # send the heartbeat through the parent's progress callback
                if parent._tool_progress_callback:
                    parent._tool_progress_callback({
                        "type": "heartbeat",
                        "child": child_index,
                        "status": "still_running"
                    })
    
    # start the heartbeat thread
    hb_thread = threading.Thread(target=_heartbeat, daemon=True)
    hb_thread.start()
    
    try:
        result = child.run_conversation(...)
    finally:
        heartbeat_stop.set()  # stop the heartbeat
        hb_thread.join(timeout=5)
    
    return result
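The same stop-event handshake can be exercised standalone with a much shorter interval (the callback shape and timings below are illustrative, not Hermes's real values). One small twist: using `Event.wait(interval)` instead of `time.sleep` lets the heartbeat thread wake up the moment the stop flag is set:

```python
import threading
import time


def run_with_heartbeat(work, on_progress, interval: float = 0.05):
    """Run work() while a daemon thread pings on_progress every `interval` s."""
    stop = threading.Event()

    def _heartbeat():
        # wait() doubles as an interruptible sleep: returns True once stop is set
        while not stop.wait(interval):
            on_progress({"type": "heartbeat", "status": "still_running"})

    hb = threading.Thread(target=_heartbeat, daemon=True)
    hb.start()
    try:
        return work()
    finally:
        stop.set()            # unblocks the wait() immediately
        hb.join(timeout=5)


events = []
# Simulated child work: block for 0.2 s, then return a summary string.
result = run_with_heartbeat(lambda: (time.sleep(0.2), "done")[-1], events.append)
```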
This post is licensed under CC BY 4.0 by the author.