event bus
在 Go 中实现一个 Event Bus(事件总线)是一种用于解耦应用组件的设计模式,它可以帮助我们管理和分发事件,使得事件的发送者与接收者之间不需要直接依赖关系,从而提高了代码的可维护性和扩展性。
以下是一个简单的事件总线实现,它支持:
- 注册事件监听器(订阅者)。
- 发布事件(发布者)。
- 执行事件处理(回调函数)。
步骤
- 定义事件总线(EventBus)结构体,存储事件与监听器的映射关系。
- 提供方法来注册事件监听器(
Subscribe),发布事件(Publish),和移除事件监听器(Unsubscribe)。 - 事件监听器可以是任何符合签名的函数,通常是接受事件数据并返回错误的函数。
实现代码
go
package main
import (
"fmt"
"sync"
)
// EventHandler 事件处理器类型,用于定义监听器函数签名
type EventHandler func(event interface{}) error
// EventBus 事件总线结构体,包含一个映射表,用于存储事件和监听器的关系
type EventBus struct {
// 用于存储每个事件的监听器队列
handlers map[string][]EventHandler
mu sync.RWMutex
}
// NewEventBus 创建并返回一个新的事件总线实例
func NewEventBus() *EventBus {
return &EventBus{
handlers: make(map[string][]EventHandler),
}
}
// Subscribe 注册事件监听器
func (eb *EventBus) Subscribe(event string, handler EventHandler) {
eb.mu.Lock()
defer eb.mu.Unlock()
// 将事件监听器添加到对应事件的处理队列中
eb.handlers[event] = append(eb.handlers[event], handler)
}
// Unsubscribe 移除事件监听器
func (eb *EventBus) Unsubscribe(event string, handler EventHandler) {
eb.mu.Lock()
defer eb.mu.Unlock()
// 获取该事件的所有监听器
handlers := eb.handlers[event]
for i, h := range handlers {
if fmt.Sprintf("%p", h) == fmt.Sprintf("%p", handler) {
// 移除匹配的监听器
eb.handlers[event] = append(handlers[:i], handlers[i+1:]...)
break
}
}
}
// Publish 发布事件,触发所有订阅该事件的监听器
func (eb *EventBus) Publish(event string, data interface{}) {
eb.mu.RLock()
defer eb.mu.RUnlock()
// 获取所有监听器
handlers, exists := eb.handlers[event]
if !exists {
return
}
// 执行所有监听器
for _, handler := range handlers {
if err := handler(data); err != nil {
fmt.Printf("Error handling event %s: %s\n", event, err)
}
}
}
func main() {
// 创建事件总线实例
eventBus := NewEventBus()
// 定义一个监听器
handler := func(event interface{}) error {
fmt.Printf("Received event: %v\n", event)
return nil
}
// 注册事件监听器
eventBus.Subscribe("user_created", handler)
eventBus.Subscribe("user_created", func(event interface{}) error {
fmt.Println("Another handler:", event)
return nil
})
// 发布事件
eventBus.Publish("user_created", "User ID: 123")
// 移除事件监听器
eventBus.Unsubscribe("user_created", handler)
// 再次发布事件,只有一个监听器会被触发
eventBus.Publish("user_created", "User ID: 456")
}实现解析
事件总线
EventBus:handlers是一个映射表,键是事件的名称(如"user_created"),值是该事件对应的监听器函数(EventHandler类型)。mu是一个sync.RWMutex,用于确保在并发环境下对handlers进行安全的读写操作。
方法:
Subscribe:订阅一个事件,并将处理函数(事件监听器)注册到该事件上。Unsubscribe:移除特定事件的监听器。通过比较函数指针来确定移除哪个监听器。Publish:发布一个事件,并将事件数据传递给所有已注册的监听器。
事件处理函数:
EventHandler类型的函数用于处理发布的事件,参数是事件的数据,返回error以便在处理时进行错误捕获。
并发安全:
- 使用
sync.RWMutex确保事件总线在并发环境下的线程安全。 - 通过
RLock和Lock来分别保证读取和写入时的同步。
- 使用
运行结果
bash
Received event: User ID: 123
Another handler: User ID: 123
Received event: User ID: 456- 当我们发布
user_created事件时,所有订阅该事件的监听器会被依次触发。 - 移除第一个事件监听器后,再次发布该事件时,只有剩余的监听器会被触发。
扩展功能
- 事件参数类型检查:可以通过引入泛型(Go 1.18 及以上版本)来对事件参数进行类型检查。
- 事件优先级:可以为不同的监听器设置优先级,并根据优先级决定执行顺序。
- 异步处理:可以将事件处理放入 goroutine 中,以便异步执行,避免阻塞事件的发布。
总结
实现一个事件总线(Event Bus)能够有效地解耦系统中的各个模块,通过发布/订阅机制提高系统的灵活性和扩展性。Go 语言通过并发和同步工具(如 sync.Mutex 和 sync.RWMutex)确保了事件总线在并发环境中的安全性。同时,事件总线可以非常灵活地支持多种功能,如事件处理器、事件参数、异步处理等。