quote_publish.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package quote_publish
  2. import (
  3. "mtp20access/global"
  4. "mtp20access/packet"
  5. "mtp20access/publish"
  6. "net"
  7. "sync"
  8. "time"
  9. "go.uber.org/zap"
  10. )
  11. var QuotePublishSer = QuotePublish{}
  12. type QuotePublish struct {
  13. conn net.Conn // 与行情发布服务的连接
  14. closeCh chan struct{} // 连接断开信息
  15. mtx sync.RWMutex // 锁
  16. status int // 连接状态 -1:连接已断开 0:初始状态 1:已连接 2:正在连接
  17. }
  18. func (r *QuotePublish) Run() {
  19. // 连接行情发布, 阻塞模式, 直接成功才返回
  20. r.connQuotePublish()
  21. // 读取行情发布连接tcp报文
  22. go r.readQuote()
  23. }
  24. func (r *QuotePublish) setStatus(status int) {
  25. r.mtx.Lock()
  26. defer r.mtx.Unlock()
  27. r.status = status
  28. }
  29. // connQuotePublish 连接行情发布服务
  30. func (r *QuotePublish) connQuotePublish() {
  31. r.closeCh = make(chan struct{})
  32. var err error
  33. for {
  34. if r.conn != nil {
  35. r.conn.Close()
  36. close(r.closeCh)
  37. }
  38. r.setStatus(2)
  39. global.M2A_LOG.Info("正在尝试连接行情发布服务", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
  40. if r.conn, err = net.Dial("tcp", global.M2A_CONFIG.System.QuotePublishAddr); err != nil {
  41. global.M2A_LOG.Error("尝试连接行情发布服务失败,将在3秒后重试", zap.Any("err", err))
  42. time.Sleep(time.Second * 3)
  43. continue
  44. }
  45. r.setStatus(1)
  46. global.M2A_LOG.Info("连接行情发布服务成功", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
  47. break
  48. }
  49. }
  50. // onDisconnected 连接断开事件
  51. func (r *QuotePublish) onDisconnected() {
  52. if r.status == 2 {
  53. return
  54. }
  55. global.M2A_LOG.Error("行情发布服务连接已断开,将会尝试重新连接行情发布服务", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
  56. close(r.closeCh)
  57. r.setStatus(-1)
  58. go func() {
  59. r.connQuotePublish()
  60. r.readQuote()
  61. }()
  62. }
  63. // readQuote 从行情发布服务读取行情信息
  64. func (r *QuotePublish) readQuote() {
  65. for {
  66. // Set a deadline for reading messages
  67. r.conn.SetReadDeadline(time.Now().Add(15 * time.Second))
  68. select {
  69. case <-r.closeCh:
  70. // Quit signal received, exit loop and clean up
  71. return
  72. default:
  73. // 从行情发布服务读取行情信息
  74. var p packet.MiQuotePacket
  75. if _, err := p.ReadMessage(&r.conn); err != nil {
  76. go r.onDisconnected()
  77. break
  78. }
  79. // 心跳包
  80. if p.BigType == 0x12 && p.Length <= 24 {
  81. global.M2A_LOG.Debug("接收到行情发页服务心跳回复")
  82. continue
  83. }
  84. // 分发给订阅者
  85. global.M2A_Publish.Publish(publish.Topic_Quote, &p)
  86. }
  87. }
  88. }