quote_publish.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package quote_publish
  2. import (
  3. "mtp20access/global"
  4. "mtp20access/packet"
  5. "mtp20access/publish"
  6. "net"
  7. "sync"
  8. "time"
  9. "go.uber.org/zap"
  10. )
  11. var QuotePublishSev = QuotePublish{}
  12. type QuotePublish struct {
  13. conn net.Conn // 与行情发布服务的连接
  14. mtx sync.RWMutex // 锁
  15. status int // 连接状态 -1:连接已断开 0:初始状态 1:已连接 2:正在连接
  16. }
  17. func (r *QuotePublish) Run() {
  18. // 连接行情发布, 阻塞模式, 直接成功才返回
  19. r.connQuotePublish()
  20. // 读取行情发布连接tcp报文
  21. go r.readQuote()
  22. }
  23. func (r *QuotePublish) setStatus(status int) {
  24. r.mtx.Lock()
  25. defer r.mtx.Unlock()
  26. r.status = status
  27. }
  28. // connQuotePublish 连接行情发布服务
  29. func (r *QuotePublish) connQuotePublish() {
  30. var err error
  31. for {
  32. r.setStatus(2)
  33. global.M2A_LOG.Info("正在尝试连接行情发布服务", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
  34. r.conn, err = net.Dial("tcp", global.M2A_CONFIG.System.QuotePublishAddr)
  35. if err != nil {
  36. global.M2A_LOG.Error("尝试连接行情发布服务失败,将在3秒后重试", zap.Any("err", err))
  37. time.Sleep(time.Second * 3)
  38. continue
  39. }
  40. r.setStatus(1)
  41. global.M2A_LOG.Info("连接行情发布服务成功", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
  42. break
  43. }
  44. }
  45. // onDisconnected 连接断开事件
  46. func (r *QuotePublish) onDisconnected() {
  47. if r.status == 2 {
  48. return
  49. }
  50. global.M2A_LOG.Error("行情发布服务连接已断开,将会尝试重新连接行情发布服务", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
  51. r.setStatus(-1)
  52. go func() {
  53. r.connQuotePublish()
  54. r.readQuote()
  55. }()
  56. }
  57. // readQuote 从行情发布服务读取行情信息
  58. func (r *QuotePublish) readQuote() {
  59. for {
  60. // 从行情发布服务读取行情信息
  61. var p packet.MiQuotePacket
  62. if _, err := p.ReadMessage(&r.conn); err != nil {
  63. go r.onDisconnected()
  64. break
  65. }
  66. // 心跳包
  67. if p.BigType == 0x12 && p.Length <= 24 {
  68. global.M2A_LOG.Debug("接收到行情发页服务心跳回复")
  69. continue
  70. }
  71. // 分发给订阅者
  72. global.M2A_Publish.Publish(publish.Topic_Quote, &p)
  73. }
  74. }