package quote_publish import ( "mtp20access/global" "mtp20access/packet" "mtp20access/publish" "net" "sync" "time" "go.uber.org/zap" ) var QuotePublishSev = QuotePublish{} type QuotePublish struct { conn net.Conn // 与行情发布服务的连接 mtx sync.RWMutex // 锁 status int // 连接状态 -1:连接已断开 0:初始状态 1:已连接 2:正在连接 } func (r *QuotePublish) Run() { // 连接行情发布, 阻塞模式, 直接成功才返回 r.connQuotePublish() // 读取行情发布连接tcp报文 go r.readQuote() } func (r *QuotePublish) setStatus(status int) { r.mtx.Lock() defer r.mtx.Unlock() r.status = status } // connQuotePublish 连接行情发布服务 func (r *QuotePublish) connQuotePublish() { var err error for { r.setStatus(2) global.M2A_LOG.Info("正在尝试连接行情发布服务", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr)) r.conn, err = net.Dial("tcp", global.M2A_CONFIG.System.QuotePublishAddr) if err != nil { global.M2A_LOG.Error("尝试连接行情发布服务失败,将在3秒后重试", zap.Any("err", err)) time.Sleep(time.Second * 3) continue } r.setStatus(1) global.M2A_LOG.Info("连接行情发布服务成功", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr)) break } } // onDisconnected 连接断开事件 func (r *QuotePublish) onDisconnected() { if r.status == 2 { return } global.M2A_LOG.Error("行情发布服务连接已断开,将会尝试重新连接行情发布服务", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr)) r.setStatus(-1) go func() { r.connQuotePublish() r.readQuote() }() } // readQuote 从行情发布服务读取行情信息 func (r *QuotePublish) readQuote() { for { // 从行情发布服务读取行情信息 var p packet.MiQuotePacket if _, err := p.ReadMessage(&r.conn); err != nil { go r.onDisconnected() break } // 心跳包 if p.BigType == 0x12 && p.Length <= 24 { global.M2A_LOG.Debug("接收到行情发页服务心跳回复") continue } // 分发给订阅者 global.M2A_Publish.Publish(publish.Topic_Quote, &p) } }