Skip to content

event bus

在 Go 中实现一个 Event Bus(事件总线)是一种用于解耦应用组件的设计模式,它可以帮助我们管理和分发事件,使得事件的发送者与接收者之间不需要直接依赖关系,从而提高了代码的可维护性和扩展性。

以下是一个简单的事件总线实现,它支持:

  • 注册事件监听器(订阅者)。
  • 发布事件(发布者)。
  • 执行事件处理(回调函数)。

步骤

  1. 定义事件总线(EventBus)结构体,存储事件与监听器的映射关系。
  2. 提供方法来注册事件监听器(Subscribe),发布事件(Publish),和移除事件监听器(Unsubscribe)。
  3. 事件监听器可以是任何符合签名的函数,通常是接受事件数据并返回错误的函数。

实现代码

go
package main

import (
	"fmt"
	"sync"
)

// EventHandler 事件处理器类型,用于定义监听器函数签名
type EventHandler func(event interface{}) error

// EventBus 事件总线结构体,包含一个映射表,用于存储事件和监听器的关系
type EventBus struct {
	// 用于存储每个事件的监听器队列
	handlers map[string][]EventHandler
	mu       sync.RWMutex
}

// NewEventBus 创建并返回一个新的事件总线实例
func NewEventBus() *EventBus {
	return &EventBus{
		handlers: make(map[string][]EventHandler),
	}
}

// Subscribe 注册事件监听器
func (eb *EventBus) Subscribe(event string, handler EventHandler) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	// 将事件监听器添加到对应事件的处理队列中
	eb.handlers[event] = append(eb.handlers[event], handler)
}

// Unsubscribe 移除事件监听器
func (eb *EventBus) Unsubscribe(event string, handler EventHandler) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	// 获取该事件的所有监听器
	handlers := eb.handlers[event]
	for i, h := range handlers {
		if fmt.Sprintf("%p", h) == fmt.Sprintf("%p", handler) {
			// 移除匹配的监听器
			eb.handlers[event] = append(handlers[:i], handlers[i+1:]...)
			break
		}
	}
}

// Publish 发布事件,触发所有订阅该事件的监听器
func (eb *EventBus) Publish(event string, data interface{}) {
	eb.mu.RLock()
	defer eb.mu.RUnlock()

	// 获取所有监听器
	handlers, exists := eb.handlers[event]
	if !exists {
		return
	}

	// 执行所有监听器
	for _, handler := range handlers {
		if err := handler(data); err != nil {
			fmt.Printf("Error handling event %s: %s\n", event, err)
		}
	}
}

func main() {
	// 创建事件总线实例
	eventBus := NewEventBus()

	// 定义一个监听器
	handler := func(event interface{}) error {
		fmt.Printf("Received event: %v\n", event)
		return nil
	}

	// 注册事件监听器
	eventBus.Subscribe("user_created", handler)
	eventBus.Subscribe("user_created", func(event interface{}) error {
		fmt.Println("Another handler:", event)
		return nil
	})

	// 发布事件
	eventBus.Publish("user_created", "User ID: 123")

	// 移除事件监听器
	eventBus.Unsubscribe("user_created", handler)

	// 再次发布事件,只有一个监听器会被触发
	eventBus.Publish("user_created", "User ID: 456")
}

实现解析

  1. 事件总线 EventBus

    • handlers 是一个映射表,键是事件的名称(如 "user_created"),值是该事件对应的监听器函数(EventHandler 类型)。
    • mu 是一个 sync.RWMutex,用于确保在并发环境下对 handlers 进行安全的读写操作。
  2. 方法

    • Subscribe:订阅一个事件,并将处理函数(事件监听器)注册到该事件上。
    • Unsubscribe:移除特定事件的监听器。通过比较函数指针来确定移除哪个监听器。
    • Publish:发布一个事件,并将事件数据传递给所有已注册的监听器。
  3. 事件处理函数

    • EventHandler 类型的函数用于处理发布的事件,参数是事件的数据,返回 error 以便在处理时进行错误捕获。
  4. 并发安全

    • 使用 sync.RWMutex 确保事件总线在并发环境下的线程安全。
    • 通过 RLockLock 来分别保证读取和写入时的同步。

运行结果

bash
Received event: User ID: 123
Another handler: User ID: 123
Received event: User ID: 456
  • 当我们发布 user_created 事件时,所有订阅该事件的监听器会被依次触发。
  • 移除第一个事件监听器后,再次发布该事件时,只有剩余的监听器会被触发。

扩展功能

  • 事件参数类型检查:可以通过引入泛型(Go 1.18 及以上版本)来对事件参数进行类型检查。
  • 事件优先级:可以为不同的监听器设置优先级,并根据优先级决定执行顺序。
  • 异步处理:可以将事件处理放入 goroutine 中,以便异步执行,避免阻塞事件的发布。

总结

实现一个事件总线(Event Bus)能够有效地解耦系统中的各个模块,通过发布/订阅机制提高系统的灵活性和扩展性。Go 语言通过并发和同步工具(如 sync.Mutexsync.RWMutex)确保了事件总线在并发环境中的安全性。同时,事件总线可以非常灵活地支持多种功能,如事件处理器、事件参数、异步处理等。