| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- package quote_publish
- import (
- "mtp20access/global"
- "mtp20access/packet"
- "mtp20access/publish"
- "net"
- "sync"
- "time"
- "go.uber.org/zap"
- )
- var QuotePublishSer = QuotePublish{}
- type QuotePublish struct {
- conn net.Conn // 与行情发布服务的连接
- closeCh chan struct{} // 连接断开信息
- mtx sync.RWMutex // 锁
- status int // 连接状态 -1:连接已断开 0:初始状态 1:已连接 2:正在连接
- }
- func (r *QuotePublish) Run() {
- // 连接行情发布, 阻塞模式, 直接成功才返回
- r.connQuotePublish()
- // 读取行情发布连接tcp报文
- go r.readQuote()
- }
- func (r *QuotePublish) setStatus(status int) {
- r.mtx.Lock()
- defer r.mtx.Unlock()
- r.status = status
- }
- // connQuotePublish 连接行情发布服务
- func (r *QuotePublish) connQuotePublish() {
- r.closeCh = make(chan struct{})
- var err error
- for {
- if r.conn != nil {
- r.conn.Close()
- close(r.closeCh)
- }
- r.setStatus(2)
- global.M2A_LOG.Info("正在尝试连接行情发布服务", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
- if r.conn, err = net.Dial("tcp", global.M2A_CONFIG.System.QuotePublishAddr); err != nil {
- global.M2A_LOG.Error("尝试连接行情发布服务失败,将在3秒后重试", zap.Any("err", err))
- time.Sleep(time.Second * 3)
- continue
- }
- r.setStatus(1)
- global.M2A_LOG.Info("连接行情发布服务成功", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
- break
- }
- }
- // onDisconnected 连接断开事件
- func (r *QuotePublish) onDisconnected() {
- if r.status == 2 {
- return
- }
- global.M2A_LOG.Error("行情发布服务连接已断开,将会尝试重新连接行情发布服务", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
- close(r.closeCh)
- r.setStatus(-1)
- go func() {
- r.connQuotePublish()
- r.readQuote()
- }()
- }
- // readQuote 从行情发布服务读取行情信息
- func (r *QuotePublish) readQuote() {
- for {
- // Set a deadline for reading messages
- r.conn.SetReadDeadline(time.Now().Add(15 * time.Second))
- select {
- case <-r.closeCh:
- // Quit signal received, exit loop and clean up
- return
- default:
- // 从行情发布服务读取行情信息
- var p packet.MiQuotePacket
- if _, err := p.ReadMessage(&r.conn); err != nil {
- go r.onDisconnected()
- break
- }
- // 心跳包
- if p.BigType == 0x12 && p.Length <= 24 {
- global.M2A_LOG.Debug("接收到行情发页服务心跳回复")
- continue
- }
- // 分发给订阅者
- global.M2A_Publish.Publish(publish.Topic_Quote, &p)
- }
- }
- }
|