| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- package publish
- import (
- "sync"
- )
- type Topic string
- const (
- Topic_Quote = "quote" // 实时行情
- Topic_Trading = "trading" // 交易服务通知
- )
- type Publisher struct {
- subscribers map[Topic][]chan interface{}
- mu sync.RWMutex
- }
- func NewPublisher() *Publisher {
- return &Publisher{
- subscribers: make(map[Topic][]chan interface{}),
- }
- }
- func (p *Publisher) Subscribe(topic Topic) chan interface{} {
- p.mu.Lock()
- defer p.mu.Unlock()
- ch := make(chan interface{}, 1)
- p.subscribers[topic] = append(p.subscribers[topic], ch)
- return ch
- }
- func (p *Publisher) Unsubscribe(topic Topic, ch chan interface{}) {
- p.mu.Lock()
- defer p.mu.Unlock()
- subs := p.subscribers[topic]
- for i := range subs {
- if subs[i] == ch {
- p.subscribers[topic] = append(subs[:i], subs[i+1:]...)
- break
- }
- }
- close(ch)
- }
- func (p *Publisher) Publish(topic Topic, data interface{}) {
- p.mu.RLock()
- defer p.mu.RUnlock()
- subs := p.subscribers[topic]
- for _, ch := range subs {
- select {
- case ch <- data:
- default:
- // 如果消息管道已满,直接跳过,不阻塞订阅者
- }
- }
- }
|