publish.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package publish
  2. import (
  3. "sync"
  4. )
  5. type Topic string
  6. const (
  7. Topic_Quote = "quote" // 实时行情
  8. Topic_Trading = "trading" // 交易服务通知
  9. )
  10. type Publisher struct {
  11. subscribers map[Topic][]chan interface{}
  12. mu sync.RWMutex
  13. }
  14. func NewPublisher() *Publisher {
  15. return &Publisher{
  16. subscribers: make(map[Topic][]chan interface{}),
  17. }
  18. }
  19. func (p *Publisher) Subscribe(topic Topic) chan interface{} {
  20. p.mu.Lock()
  21. defer p.mu.Unlock()
  22. ch := make(chan interface{}, 1)
  23. p.subscribers[topic] = append(p.subscribers[topic], ch)
  24. return ch
  25. }
  26. func (p *Publisher) Unsubscribe(topic Topic, ch chan interface{}) {
  27. p.mu.Lock()
  28. defer p.mu.Unlock()
  29. subs := p.subscribers[topic]
  30. for i := range subs {
  31. if subs[i] == ch {
  32. p.subscribers[topic] = append(subs[:i], subs[i+1:]...)
  33. break
  34. }
  35. }
  36. close(ch)
  37. }
  38. func (p *Publisher) Publish(topic Topic, data interface{}) {
  39. p.mu.RLock()
  40. defer p.mu.RUnlock()
  41. subs := p.subscribers[topic]
  42. for _, ch := range subs {
  43. select {
  44. case ch <- data:
  45. default:
  46. // 如果消息管道已满,直接跳过,不阻塞订阅者
  47. }
  48. }
  49. }