quote_publish.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package quote_publish
  2. import (
  3. "fmt"
  4. "mtp20access/global"
  5. "mtp20access/packet"
  6. "mtp20access/publish"
  7. "net"
  8. "sync"
  9. "time"
  10. "go.uber.org/zap"
  11. )
  12. var QuotePublishSev = QuotePublish{}
  13. type QuotePublish struct {
  14. conn net.Conn // 与行情发布服务的连接
  15. mtx sync.RWMutex // 锁
  16. status int // 连接状态 -1:连接已断开 0:初始状态 1:已连接 2:正在连接
  17. }
  18. func (r *QuotePublish) Run() {
  19. // 连接行情发布, 阻塞模式, 直接成功才返回
  20. r.connQuotePublish()
  21. // 读取行情发布连接tcp报文
  22. go r.readQuote()
  23. }
  24. func (r *QuotePublish) setStatus(status int) {
  25. r.mtx.Lock()
  26. defer r.mtx.Unlock()
  27. r.status = status
  28. }
  29. // connQuotePublish 连接行情发布服务
  30. func (r *QuotePublish) connQuotePublish() {
  31. var err error
  32. for {
  33. r.setStatus(2)
  34. global.M2A_LOG.Info("正在尝试连接行情发布服务", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
  35. r.conn, err = net.Dial("tcp", global.M2A_CONFIG.System.QuotePublishAddr)
  36. if err != nil {
  37. global.M2A_LOG.Error("尝试连接行情发布服务失败,将在3秒后重试", zap.Any("err", err))
  38. time.Sleep(time.Second * 3)
  39. continue
  40. }
  41. r.setStatus(1)
  42. global.M2A_LOG.Info("连接行情发布服务成功", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
  43. break
  44. }
  45. }
  46. // onDisconnected 连接断开事件
  47. func (r *QuotePublish) onDisconnected() {
  48. if r.status == 2 {
  49. return
  50. }
  51. global.M2A_LOG.Error("行情发布服务连接已断开,将会尝试重新连接行情发布服务", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
  52. r.setStatus(-1)
  53. go func() {
  54. r.connQuotePublish()
  55. r.readQuote()
  56. }()
  57. }
  58. // readQuote 从行情发布服务读取行情信息
  59. func (r *QuotePublish) readQuote() {
  60. for {
  61. // 从行情发布服务读取行情信息
  62. var p packet.MiQuotePacket
  63. if _, err := p.ReadMessage(&r.conn); err != nil {
  64. go r.onDisconnected()
  65. break
  66. }
  67. // 心跳包
  68. if p.BigType == 0x12 && p.Length <= 24 {
  69. global.M2A_LOG.Debug("接收到行情发页服务心跳回复")
  70. continue
  71. }
  72. fmt.Println(p.OriMsg)
  73. // 分发给订阅者
  74. global.M2A_Publish.Publish(publish.Topic_Quote, &p)
  75. }
  76. }