publish.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package publish
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  // Topic names a message stream that subscribers register for and that
  // publishers send to.
  type Topic string

  // Topic names known to this package. They are declared as untyped string
  // constants, so they can be used wherever a Topic or a plain string is
  // expected.
  const (
  	// Topic_Quote carries real-time market quotes.
  	Topic_Quote = "quote"

  	// Topic_Trading carries trading-service notifications.
  	Topic_Trading = "trading"
  )
  11. type Publisher struct {
  12. subscribers map[Topic][]chan interface{}
  13. mu sync.RWMutex
  14. }
  15. func NewPublisher() *Publisher {
  16. return &Publisher{
  17. subscribers: make(map[Topic][]chan interface{}),
  18. }
  19. }
  20. func (p *Publisher) Subscribe(topic Topic) chan interface{} {
  21. p.mu.Lock()
  22. defer p.mu.Unlock()
  23. ch := make(chan interface{}, 100)
  24. p.subscribers[topic] = append(p.subscribers[topic], ch)
  25. return ch
  26. }
  27. func (p *Publisher) Unsubscribe(topic Topic, ch chan interface{}) {
  28. p.mu.Lock()
  29. defer p.mu.Unlock()
  30. subs := p.subscribers[topic]
  31. for i := range subs {
  32. if subs[i] == ch {
  33. p.subscribers[topic] = append(subs[:i], subs[i+1:]...)
  34. break
  35. }
  36. }
  37. close(ch)
  38. }
  39. func (p *Publisher) Publish(topic Topic, data interface{}) {
  40. fmt.Printf("Publish++++++++++++ 1 \n")
  41. p.mu.RLock()
  42. defer p.mu.RUnlock()
  43. fmt.Printf("Publish++++++++++++ 2 \n")
  44. if subs, ok := p.subscribers[topic]; ok {
  45. if topic == Topic_Trading {
  46. fmt.Printf("Publish-Topic_Trading 获取sub, topic: %v sub: %v \n", topic, subs)
  47. }
  48. for _, ch := range subs {
  49. select {
  50. case ch <- data:
  51. default:
  52. // 如果消息管道已满,直接跳过,不阻塞订阅者
  53. }
  54. }
  55. } else {
  56. fmt.Printf("Publish 获取订阅目标失败, topic: %v \n", topic)
  57. }
  58. fmt.Printf("Publish++++++++++++ 3 \n")
  59. }