|
|
@@ -14,9 +14,10 @@ import (
|
|
|
var QuotePublishSer = QuotePublish{}
|
|
|
|
|
|
type QuotePublish struct {
|
|
|
- conn net.Conn // 与行情发布服务的连接
|
|
|
- status int // 连接状态 -1:连接已断开 0:初始状态 1:已连接 2:正在连接
|
|
|
- mtx sync.RWMutex // 锁
|
|
|
+ conn net.Conn // 与行情发布服务的连接
|
|
|
+ closeCh chan struct{} // 连接断开信息
|
|
|
+ mtx sync.RWMutex // 锁
|
|
|
+ status int // 连接状态 -1:连接已断开 0:初始状态 1:已连接 2:正在连接
|
|
|
}
|
|
|
|
|
|
func (r *QuotePublish) Run() {
|
|
|
@@ -35,10 +36,13 @@ func (r *QuotePublish) setStatus(status int) {
|
|
|
|
|
|
// connQuotePublish 连接行情发布服务
|
|
|
func (r *QuotePublish) connQuotePublish() {
|
|
|
+ r.closeCh = make(chan struct{})
|
|
|
+
|
|
|
var err error
|
|
|
for {
|
|
|
if r.conn != nil {
|
|
|
r.conn.Close()
|
|
|
+ close(r.closeCh)
|
|
|
}
|
|
|
|
|
|
r.setStatus(2)
|
|
|
@@ -49,8 +53,8 @@ func (r *QuotePublish) connQuotePublish() {
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- global.M2A_LOG.Info("连接行情发布服务成功", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
|
|
|
r.setStatus(1)
|
|
|
+ global.M2A_LOG.Info("连接行情发布服务成功", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
@@ -62,6 +66,7 @@ func (r *QuotePublish) onDisconnected() {
|
|
|
}
|
|
|
global.M2A_LOG.Error("行情发布服务连接已断开,将会尝试重新连接行情发布服务", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
|
|
|
|
|
|
+ close(r.closeCh)
|
|
|
r.setStatus(-1)
|
|
|
go func() {
|
|
|
r.connQuotePublish()
|
|
|
@@ -72,19 +77,29 @@ func (r *QuotePublish) onDisconnected() {
|
|
|
// readQuote 从行情发布服务读取行情信息
|
|
|
func (r *QuotePublish) readQuote() {
|
|
|
for {
|
|
|
- var p packet.MiQuotePacket
|
|
|
- if _, err := p.ReadMessage(&r.conn); err != nil {
|
|
|
- go r.onDisconnected()
|
|
|
- break
|
|
|
- }
|
|
|
+ // Set a deadline for reading messages
|
|
|
+ r.conn.SetReadDeadline(time.Now().Add(15 * time.Second))
|
|
|
|
|
|
- // 心跳包
|
|
|
- if p.BigType == 0x12 && p.Length <= 24 {
|
|
|
- global.M2A_LOG.Debug("接收到行情发页服务心跳回复")
|
|
|
- continue
|
|
|
- }
|
|
|
+ select {
|
|
|
+ case <-r.closeCh:
|
|
|
+ // Quit signal received, exit loop and clean up
|
|
|
+ return
|
|
|
+ default:
|
|
|
+ // 从行情发布服务读取行情信息
|
|
|
+ var p packet.MiQuotePacket
|
|
|
+ if _, err := p.ReadMessage(&r.conn); err != nil {
|
|
|
+ go r.onDisconnected()
|
|
|
+ break
|
|
|
+ }
|
|
|
|
|
|
- // 分发给订阅者
|
|
|
- global.M2A_Publish.Publish(publish.Topic_Quote, &p)
|
|
|
+ // 心跳包
|
|
|
+ if p.BigType == 0x12 && p.Length <= 24 {
|
|
|
+ global.M2A_LOG.Debug("接收到行情发页服务心跳回复")
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ // 分发给订阅者
|
|
|
+ global.M2A_Publish.Publish(publish.Topic_Quote, &p)
|
|
|
+ }
|
|
|
}
|
|
|
}
|