11.1 事件总线(Bus)

模型: claude-opus-4-6 (anthropic/claude-opus-4-6) 生成日期: 2026-02-17


在一个由多个模块组成的复杂系统中,模块之间的通信方式决定了系统的耦合度和可维护性。如果 Session 模块每次创建会话后都要直接调用 TUI 模块刷新界面、调用 Server 模块推送 SSE 事件、调用统计模块记录数据,那么 Session 就会依赖所有下游消费者,形成紧密耦合的"蜘蛛网"。

OpenCode 通过**事件总线(Event Bus)**解决这个问题——Session 模块只需发布一个 session.created 事件,所有关心这件事的模块自己订阅并处理。发布者不需要知道有哪些订阅者,订阅者也不需要知道事件从何而来。

衍生解释:发布-订阅模式(Pub/Sub Pattern)

发布-订阅模式(Publish-Subscribe Pattern,简称 Pub/Sub)是一种消息通信模式。在这种模式中:

  • 发布者(Publisher) 将消息发送到一个中间层(消息代理),而不是直接发送给接收者。

  • 订阅者(Subscriber) 向中间层注册自己感兴趣的消息类型。

  • 当发布者发布消息时,中间层负责将消息转发给所有匹配的订阅者。

与直接调用相比,Pub/Sub 的核心优势是解耦——发布者和订阅者互不知晓对方的存在。这使得:

  1. 新增消费者不需要修改发布者代码。

  2. 移除消费者不会影响其他部分。

  3. 同一事件可以有零个、一个或多个消费者。

Pub/Sub 在 GUI 框架(如 DOM 事件)、消息队列(如 Kafka、RabbitMQ)和微服务架构中都被广泛使用。OpenCode 的事件总线是一个进程内的轻量级 Pub/Sub 实现。

11.1.1 BusEvent.define()——类型安全的事件定义

事件总线的基础是事件的定义。OpenCode 使用 BusEvent.define() 工厂函数来创建类型安全的事件定义:

// bus/bus-event.ts
export namespace BusEvent {
  export type Definition = ReturnType<typeof define>

  // 全局事件注册表
  const registry = new Map<string, Definition>()

  export function define<Type extends string, Properties extends ZodType>(
    type: Type,
    properties: Properties
  ) {
    const result = {
      type,        // 事件类型标识符,如 "session.created"
      properties,  // Zod Schema,定义事件携带的数据结构
    }
    registry.set(type, result)
    return result
  }
}

这个设计有几个值得注意的特点:

1. 类型推断define() 使用泛型参数 <Type extends string, Properties extends ZodType>,TypeScript 编译器可以从调用处推断出具体的类型字面量。例如 define("session.created", ...) 的返回值类型中,type 不是 string,而是字面量类型 "session.created"。这为后续的订阅回调提供了精确的类型提示。

2. 全局注册表:每个通过 define() 创建的事件都会被自动注册到全局 registry Map 中。这个注册表支撑了一个重要的功能——payloads() 方法:

payloads() 将所有已注册的事件组合成一个 Zod 可辨识联合类型(Discriminated Union)。这意味着通过检查 type 字段的值,就可以自动推断出 properties 的具体类型。这个联合类型被用于 OpenAPI Schema 的自动生成——Server 的 SSE 推送端点可以据此生成精确的 API 文档。

3. 事件命名规范:所有事件类型都遵循 模块名.动作 的点分命名约定,例如:

事件类型
含义

session.created

会话被创建

message.updated

消息被更新

mcp.tools.changed

MCP 工具列表变化

file.watcher.updated

文件变更被检测到

permission.asked

权限询问被发起

11.1.2 Bus.publish()——事件发布

Bus.publish() 是事件发布的核心方法。它的实现揭示了 OpenCode 事件总线的双层广播机制:

发布一个事件时,会发生两件事:

  1. 本地广播:在当前 Instance 的订阅映射中查找匹配的订阅者,包括精确匹配(def.type)和通配符匹配("*")。所有匹配的回调函数被并发调用,返回值通过 Promise.all() 聚合。

  2. 全局广播:通过 GlobalBus.emit() 将事件转发到全局事件通道。GlobalBus 是一个简单的 Node.js EventEmitter,负责跨 Instance 的事件传播:

这种双层设计的目的是支持 多 Instance 并发 场景。当多个 Worktree 实例同时运行时,每个 Instance 内部的订阅者只接收本 Instance 的事件;而 GlobalBus 的监听者(如 Server 的 SSE 推送端点)则可以收到所有 Instance 的事件,并根据 directory 字段进行路由。

11.1.3 Bus.subscribe() / Bus.once()——事件订阅

订阅事件的 API 简洁而类型安全:

subscribe() 返回一个取消订阅函数,调用方可以在适当的时候取消订阅:

once() 是对 subscribe() 的封装——当回调返回 "done" 字符串时,自动取消订阅。这在"等待某个特定事件发生"的场景中非常有用。

11.1.4 通配符订阅("*")

subscribeAll() 函数允许订阅所有事件:

通配符订阅的典型用途是事件转发。例如,Server 的 SSE 端点需要将所有事件推送给客户端,不关心具体事件类型,只需要将整个事件 payload 序列化为 SSE 消息即可。

底层的 raw() 函数统一处理精确订阅和通配符订阅:

订阅映射使用 Map<string, Subscription[]> 存储,key 是事件类型字符串(或 "*"),value 是回调函数数组。取消订阅时通过 indexOf 找到并移除对应的回调。

11.1.5 GlobalBus:跨 Instance 的全局事件通道

GlobalBus 的实现极其简洁——仅仅是一个带类型参数的 EventEmitter

它只有一个事件类型 "event",每个事件携带 directory(发源 Instance 的工作目录)和 payload(原始事件数据)。

GlobalBus 的主要消费者包括:

  1. Server SSE 端点:将事件推送到连接的 Web 客户端。

  2. TUI 渲染层:监听全局事件以更新终端界面。

  3. 跨 Instance 协调:例如当一个 Instance 被销毁时,其他组件需要收到通知。

Bus(Instance 级别)不同,GlobalBus 是一个单例对象,不绑定任何 Instance 上下文。所有 Instance 的事件都汇聚到这里,消费者通过检查 directory 字段来过滤自己关心的事件。

11.1.6 实例销毁时的事件清理

事件总线的订阅映射使用 Instance.state() 管理,这意味着当 Instance 被销毁时,所有该 Instance 的订阅会被自动清理。同时,清理函数还会向通配符订阅者发送一个 InstanceDisposed 事件:

InstanceDisposed 事件本身也是通过 BusEvent.define() 定义的:

这种"自动清理 + 最终通知"的模式确保了:

  1. Instance 销毁后不会有残留的订阅者继续接收事件(防止内存泄漏)。

  2. 关心 Instance 生命周期的组件可以在销毁时执行清理逻辑。

11.1.7 小结

OpenCode 的事件总线是一个简洁但完备的进程内 Pub/Sub 实现,其设计可以总结为以下几个要点:

特性
实现方式

类型安全

BusEvent.define() + Zod Schema + TypeScript 泛型

事件注册

全局 registry Map,支持 OpenAPI 自动生成

双层广播

Instance 级 Bus + 全局 GlobalBus

通配符订阅

"*" key 匹配所有事件

取消订阅

返回 unsubscribe 函数

生命周期管理

Instance.state() 自动清理

并发安全

每个 Instance 独立的订阅映射

整个事件总线的实现不到 150 行代码(bus-event.ts 44 行 + index.ts 106 行 + global.ts 11 行),但有效地解耦了系统中 20+ 个模块之间的通信。

Last updated