publish.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package publish
  2. import (
  3. "sync"
  4. )
  5. type Topic string
  6. const (
  7. Topic_Quote = "quote" // 实时行情
  8. Topic_Trading = "trading" // 交易服务通知
  9. )
  10. type Publisher struct {
  11. subscribers map[Topic][]chan interface{}
  12. mu sync.RWMutex
  13. }
  14. func NewPublisher() *Publisher {
  15. return &Publisher{
  16. subscribers: make(map[Topic][]chan interface{}),
  17. }
  18. }
  19. func (p *Publisher) Subscribe(topic Topic) chan interface{} {
  20. p.mu.Lock()
  21. defer p.mu.Unlock()
  22. ch := make(chan interface{}, 100)
  23. p.subscribers[topic] = append(p.subscribers[topic], ch)
  24. return ch
  25. }
  26. func (p *Publisher) Unsubscribe(topic Topic, ch chan interface{}) {
  27. p.mu.Lock()
  28. defer p.mu.Unlock()
  29. subs := p.subscribers[topic]
  30. for i := range subs {
  31. if subs[i] == ch {
  32. p.subscribers[topic] = append(subs[:i], subs[i+1:]...)
  33. break
  34. }
  35. }
  36. close(ch)
  37. }
  38. func (p *Publisher) Publish(topic Topic, data interface{}) {
  39. // fmt.Printf("Publish++++++++++++ 1 \n")
  40. p.mu.RLock()
  41. defer p.mu.RUnlock()
  42. // fmt.Printf("Publish++++++++++++ 2 \n")
  43. if subs, ok := p.subscribers[topic]; ok {
  44. // if topic == Topic_Trading {
  45. // fmt.Printf("Publish-Topic_Trading 获取sub, topic: %v sub: %v \n", topic, subs)
  46. // }
  47. for _, ch := range subs {
  48. select {
  49. case ch <- data:
  50. default:
  51. // 如果消息管道已满,直接跳过,不阻塞订阅者
  52. }
  53. }
  54. } else {
  55. // fmt.Printf("Publish 获取订阅目标失败, topic: %v \n", topic)
  56. }
  57. // fmt.Printf("Publish++++++++++++ 3 \n")
  58. }