@@ -56,6 +56,7 @@ func (r *Client) SetQuoteSubs(req request.QuoteSubscribeReq) {
r.quoteSubs = req.QuoteGoodses
+ // 从订阅中心订阅行情通知
if r.quoteChan != nil {
global.M2A_Publish.Unsubscribe(publish.Topic_Quote, r.quoteChan)
}
@@ -59,7 +59,7 @@ func main() {
// 连接行情发布服务
if global.M2A_CONFIG.System.NeedQuotePublish {
- go quote_publish.QuotePublishSer.Run()
+ go quote_publish.QuotePublishSev.Run()
// 启动Http API 服务
@@ -26,7 +26,7 @@ func (p *Publisher) Subscribe(topic Topic) chan interface{} {
p.mu.Lock()
defer p.mu.Unlock()
- ch := make(chan interface{}, 1)
+ ch := make(chan interface{}, 100)
p.subscribers[topic] = append(p.subscribers[topic], ch)
return ch
@@ -11,7 +11,7 @@ import (
"go.uber.org/zap"
)
-var QuotePublishSer = QuotePublish{}
+var QuotePublishSev = QuotePublish{}
type QuotePublish struct {
conn net.Conn // 与行情发布服务的连接
@@ -82,6 +82,7 @@ func SendMQ(c *gin.Context, req *request.MQBodyReq) (err error) {
global.M2A_LOG.Info("[C->S]", zap.Any("req", req.FunCodeReq), zap.Any("SessionId", packet.SessionId), zap.Any("data", req.Data))
// 阻塞线程等待总线回复或超时
+ // 这里make时没有指定长度,所以是发送者与接收者的同步操作,发送者会阻塞,所以一般会在下面使用超时来解锁。
asyncTask.Rsp = make(chan response.MQBodyRsp)
select {
case r := <-asyncTask.Rsp: // 总线回复