| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- package publish
- import (
- "sync"
- )
- type Topic string
- const (
- Topic_Quote = "quote" // 实时行情
- Topic_Trading = "trading" // 交易服务通知
- )
- type Publisher struct {
- subscribers map[Topic][]chan interface{}
- mu sync.RWMutex
- }
- func NewPublisher() *Publisher {
- return &Publisher{
- subscribers: make(map[Topic][]chan interface{}),
- }
- }
- func (p *Publisher) Subscribe(topic Topic) chan interface{} {
- p.mu.Lock()
- defer p.mu.Unlock()
- ch := make(chan interface{}, 100)
- p.subscribers[topic] = append(p.subscribers[topic], ch)
- return ch
- }
- func (p *Publisher) Unsubscribe(topic Topic, ch chan interface{}) {
- p.mu.Lock()
- defer p.mu.Unlock()
- subs := p.subscribers[topic]
- for i := range subs {
- if subs[i] == ch {
- p.subscribers[topic] = append(subs[:i], subs[i+1:]...)
- break
- }
- }
- close(ch)
- }
- func (p *Publisher) Publish(topic Topic, data interface{}) {
- // fmt.Printf("Publish++++++++++++ 1 \n")
- p.mu.RLock()
- defer p.mu.RUnlock()
- // fmt.Printf("Publish++++++++++++ 2 \n")
- if subs, ok := p.subscribers[topic]; ok {
- // if topic == Topic_Trading {
- // fmt.Printf("Publish-Topic_Trading 获取sub, topic: %v sub: %v \n", topic, subs)
- // }
- for _, ch := range subs {
- select {
- case ch <- data:
- default:
- // 如果消息管道已满,直接跳过,不阻塞订阅者
- }
- }
- } else {
- // fmt.Printf("Publish 获取订阅目标失败, topic: %v \n", topic)
- }
- // fmt.Printf("Publish++++++++++++ 3 \n")
- }
|