// Package publish provides a small in-process publish/subscribe hub that
// fans messages out to buffered channels grouped by topic.
package publish

import (
	"fmt"
	"sync"
)

// Topic identifies a message stream that subscribers can listen on.
type Topic string

// Topic names known to this package. They are untyped string constants, so
// they can be used wherever either a Topic or a plain string is expected.
const (
	Topic_Quote   = "quote"   // Real-time market quotes.
	Topic_Trading = "trading" // Trading service notifications.
)

// Publisher delivers published values to every channel subscribed to a topic.
// All methods may be called from multiple goroutines; the subscriber table is
// guarded by mu.
type Publisher struct {
	subscribers map[Topic][]chan interface{}
	mu          sync.RWMutex
}

// NewPublisher returns a Publisher with an empty subscriber table.
func NewPublisher() *Publisher {
	return &Publisher{
		subscribers: make(map[Topic][]chan interface{}),
	}
}

// Subscribe registers a new subscriber for topic and returns the channel on
// which published values will arrive. The channel has a buffer of 100
// messages and is closed by Unsubscribe.
func (p *Publisher) Subscribe(topic Topic) chan interface{} {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Allow a zero-value Publisher to be used without NewPublisher; writing
	// to a nil map would panic.
	if p.subscribers == nil {
		p.subscribers = make(map[Topic][]chan interface{})
	}

	ch := make(chan interface{}, 100)
	p.subscribers[topic] = append(p.subscribers[topic], ch)
	return ch
}

// Unsubscribe removes ch from topic's subscriber list and closes it.
//
// The channel is closed only when it was actually registered under topic.
// Calling Unsubscribe again with the same channel, with a nil channel, or
// with a channel that belongs to another topic is therefore a harmless no-op
// instead of a panic (double close / close of nil) or a close of a channel
// that Publish may still send on.
func (p *Publisher) Unsubscribe(topic Topic, ch chan interface{}) {
	p.mu.Lock()
	defer p.mu.Unlock()

	subs := p.subscribers[topic]
	for i := range subs {
		if subs[i] != ch {
			continue
		}

		// Shift the tail left and clear the vacated slot so the backing
		// array does not keep the removed channel reachable.
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil
		p.subscribers[topic] = subs[:last]

		// Safe to close here: the channel is no longer in the list and
		// Publish cannot run concurrently while the write lock is held.
		close(ch)
		return
	}
}

// Publish sends data to every subscriber of topic without blocking. A
// subscriber whose buffer is full misses the message. If the topic has never
// been subscribed to, a diagnostic line is printed and nothing is sent.
func (p *Publisher) Publish(topic Topic, data interface{}) {
	p.mu.RLock()
	defer p.mu.RUnlock()

	subs, ok := p.subscribers[topic]
	if !ok {
		fmt.Printf("Publish 获取订阅目标失败, topic: %v \n", topic)
		return
	}

	if topic == Topic_Trading {
		fmt.Printf("Publish-Topic_Trading 获取sub, topic: %v sub: %v \n", topic, subs)
	}

	for _, ch := range subs {
		select {
		case ch <- data:
		default:
			// The subscriber's buffer is full: drop the message rather
			// than block the publisher.
		}
	}
}