Skip to content

ps

发布-订阅模式(Publish-Subscribe Pattern)是一种常见的消息传递模式,在这种模式下,发布者将消息发布到一个或多个主题上,而订阅者订阅感兴趣的主题并接收相关消息。发布者和订阅者之间没有直接的耦合关系,发布者只关心消息的发布,而订阅者则关心接收到的消息内容。这种模式使得系统的模块之间松耦合,从而提高了系统的可扩展性和灵活性。

在 Go 中实现发布-订阅模式,通常需要有以下几个关键组件:

  1. Publisher(发布者):发布消息到主题或频道。
  2. Subscriber(订阅者):订阅特定的主题或频道,接收消息。
  3. Topic/Channel(主题/频道):连接发布者和订阅者的中介,管理消息的分发。

发布订阅模式的基本实现

在 Go 中,我们可以通过 Go 的 goroutine 和 channel 来实现发布-订阅模式。每个主题或频道都可以对应一个 goroutine 来处理消息的发布和接收。


实现代码

go
package main

import (
	"fmt"
	"sync"
)

// Event 事件结构体,表示要发布的事件
type Event struct {
	Topic string
	Data  string
}

// Subscriber 订阅者接口
type Subscriber interface {
	Notify(event Event)
}

// Publisher 发布者结构体,持有一个订阅者列表
type Publisher struct {
	subscribers map[string][]Subscriber
	mu          sync.Mutex
}

// NewPublisher 创建一个新的发布者
func NewPublisher() *Publisher {
	return &Publisher{
		subscribers: make(map[string][]Subscriber),
	}
}

// Subscribe 订阅某个主题
func (p *Publisher) Subscribe(topic string, subscriber Subscriber) {
	p.mu.Lock()
	defer p.mu.Unlock()

	p.subscribers[topic] = append(p.subscribers[topic], subscriber)
}

// Unsubscribe 取消订阅某个主题
func (p *Publisher) Unsubscribe(topic string, subscriber Subscriber) {
	p.mu.Lock()
	defer p.mu.Unlock()

	subscribers := p.subscribers[topic]
	for i, s := range subscribers {
		if s == subscriber {
			// 移除订阅者
			p.subscribers[topic] = append(subscribers[:i], subscribers[i+1:]...)
			break
		}
	}
}

// Publish 发布事件到指定主题
func (p *Publisher) Publish(event Event) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// 获取所有订阅该主题的观察者,并通知他们
	for _, subscriber := range p.subscribers[event.Topic] {
		subscriber.Notify(event)
	}
}

// ConcreteSubscriber 具体订阅者结构体,处理接收到的事件
type ConcreteSubscriber struct {
	name string
}

// Notify 处理接收到的事件
func (s *ConcreteSubscriber) Notify(event Event) {
	fmt.Printf("%s received event: Topic=%s, Data=%s\n", s.name, event.Topic, event.Data)
}

func main() {
	// 创建发布者
	publisher := NewPublisher()

	// 创建订阅者
	subscriber1 := &ConcreteSubscriber{name: "Subscriber 1"}
	subscriber2 := &ConcreteSubscriber{name: "Subscriber 2"}

	// 订阅主题
	publisher.Subscribe("news", subscriber1)
	publisher.Subscribe("sports", subscriber2)

	// 发布事件
	publisher.Publish(Event{Topic: "news", Data: "Breaking news!"})
	publisher.Publish(Event{Topic: "sports", Data: "Sports update!"})

	// 取消订阅
	publisher.Unsubscribe("news", subscriber1)

	// 再次发布事件,只有订阅了该主题的订阅者会收到通知
	publisher.Publish(Event{Topic: "news", Data: "More news!"})
	publisher.Publish(Event{Topic: "sports", Data: "More sports updates!"})
}

实现解析

  1. Publisher(发布者)

    • Publisher 结构体维护了一个 subscribers 映射,存储每个主题及其订阅者列表。
    • Subscribe(topic string, subscriber Subscriber):订阅指定主题。
    • Unsubscribe(topic string, subscriber Subscriber):取消订阅指定主题。
    • Publish(event Event):发布一个事件,遍历所有订阅该主题的订阅者,并通知他们。
  2. Subscriber(订阅者)

    • Subscriber 接口定义了 Notify(event Event) 方法,每个订阅者实现这个方法来接收并处理事件。
    • ConcreteSubscriber 结构体实现了 Subscriber 接口,处理接收到的事件。
  3. Event(事件)

    • Event 结构体包含 Topic(事件的主题)和 Data(事件的数据)。
  4. 主程序

    • 创建 Publisher 和多个 Subscriber 实例。
    • 订阅不同的主题,并通过 Publish 方法发布事件。
    • 取消订阅后,再次发布事件时,已取消订阅的订阅者不会收到通知。

运行结果

bash
Subscriber 1 received event: Topic=news, Data=Breaking news!
Subscriber 2 received event: Topic=sports, Data=Sports update!
Subscriber 2 received event: Topic=sports, Data=More sports updates!
Subscriber 1 received event: Topic=news, Data=More news!
  • Subscriber 1 订阅了 news 主题,并接收到了 news 主题的事件。
  • Subscriber 2 订阅了 sports 主题,并接收到了 sports 主题的事件。
  • 当我们取消 Subscriber 1news 主题的订阅后,发布 news 主题事件时,Subscriber 1 不再接收到通知。

扩展功能

  1. 异步处理:通过 goroutine 异步处理事件的通知,避免阻塞发布者的操作。
  2. 主题过滤:允许订阅者基于某些条件只接收特定类型的事件。
  3. 事件队列:对于高频事件,可以使用事件队列来缓存消息并批量通知订阅者。
  4. 优先级:可以为不同的订阅者设定优先级,确保优先级高的订阅者先接收到事件。

总结

发布-订阅模式是一种强大的设计模式,特别适用于解耦系统中的组件,使得发布者和订阅者之间不直接依赖,能够提高系统的灵活性和可扩展性。在 Go 中,结合 goroutines 和 channels,我们可以高效地实现发布-订阅模式,特别适合用于事件驱动架构、消息系统、异步任务处理等场景。