zhou.xiaoning 2 lat temu
rodzic
commit
a4b8f8e531
2 zmienionych plików z 43 dodań i 40 usunięć
  1. 42 39
      client/client.go
  2. 1 1
      client/msgRealQuote.go

+ 42 - 39
client/client.go

@@ -26,17 +26,16 @@ type Client struct {
 	curSerialNumber uint32                // 当前业务流水号
 	asyncTasks      map[string]*AsyncTask // key:SessionId_FuncodeRsp_SerialNumber
 
-	wsQuoteConn      *websocket.Conn      // 终端行情WebSocket连接
-	quoteWriteChan   chan []byte          // 推送队列 Access -> Client
-	quoteChan        chan interface{}     // 接收实时行情订阅channel QuoteServer -> Access
-	quoteSubs        []request.QuoteGoods // 当前已订阅行情的商品
-	unSubscribe      chan struct{}        // 取消订阅信号
-	wsQuoteCloseChan chan struct{}        // 终端WebSocket连接关闭信号
-
-	wsTradeConn      *websocket.Conn  // 终端交易WebSocket连接
-	tradeWriteChan   chan []byte      // 推送队列 Access -> Client
-	tradeChan        chan interface{} // 接收交易通知channel RabbitMQ -> Access
-	wsTradeCloseChan chan struct{}    // 终端WebSocket连接关闭信号
+	wsQuoteConn    *websocket.Conn      // 终端行情WebSocket连接
+	quoteWriteChan chan []byte          // 推送队列 Access -> Client
+	quoteChan      chan interface{}     // 接收实时行情订阅channel QuoteServer -> Access
+	quoteSubs      []request.QuoteGoods // 当前已订阅行情的商品
+	// wsQuoteCloseChan chan struct{}        // 终端WebSocket连接关闭信号
+
+	wsTradeConn    *websocket.Conn  // 终端交易WebSocket连接
+	tradeWriteChan chan []byte      // 推送队列 Access -> Client
+	tradeChan      chan interface{} // 接收交易通知channel RabbitMQ -> Access
+	// wsTradeCloseChan chan struct{}    // 终端WebSocket连接关闭信号
 }
 
 // GetSerialNumber 获取可用流水号
@@ -58,25 +57,19 @@ func (r *Client) SetQuoteSubs(req request.QuoteSubscribeReq) {
 	r.quoteSubs = req.QuoteGoodses
 
 	if r.quoteChan != nil {
-		// r.unSubscribe <- struct{}{}
 		global.M2A_Publish.Unsubscribe(publish.Topic_Quote, r.quoteChan)
 	}
 	r.quoteChan = global.M2A_Publish.Subscribe(publish.Topic_Quote)
 
 	go func() {
 		for {
-			select {
-			case msg, ok := <-r.quoteChan:
-				if !ok {
-					return // 管道已关闭,退出循环
-				}
-				// 向客户端发送行情信息
-				if p, ok := msg.(*packet.MiQuotePacket); ok {
-					DispatchRealQuote(p, r)
-				}
-			case <-r.unSubscribe:
-				// 取消订阅信息
-				return
+			msg, ok := <-r.quoteChan
+			if !ok {
+				return // 管道已关闭,退出循环
+			}
+			// 向客户端发送行情信息
+			if p, ok := msg.(*packet.MiQuotePacket); ok {
+				DispatchRealQuote(p, r)
 			}
 		}
 	}()
@@ -204,14 +197,25 @@ func (r *Client) readClientWsQuoteMessage() {
 // 所以由统一协程写入
 func (r *Client) writeClientWsQuoteMessage() {
 	// defer r.close()
+	// for {
+	// 	select {
+	// 	case buf := <-r.quoteWriteChan:
+	// 		err := r.wsQuoteConn.WriteMessage(websocket.BinaryMessage, buf)
+	// 		if err != nil {
+	// 			return
+	// 		}
+	// 	case <-r.wsQuoteCloseChan: // 与终端连接关闭信息
+	// 		return
+	// 	}
+	// }
 	for {
-		select {
-		case buf := <-r.quoteWriteChan:
-			err := r.wsQuoteConn.WriteMessage(websocket.BinaryMessage, buf)
-			if err != nil {
-				return
-			}
-		case <-r.wsQuoteCloseChan: // 与终端连接关闭信息
+		buf, ok := <-r.quoteWriteChan
+		if !ok {
+			// 通道已关闭
+			return
+		}
+		err := r.wsQuoteConn.WriteMessage(websocket.BinaryMessage, buf)
+		if err != nil {
 			return
 		}
 	}
@@ -290,15 +294,14 @@ func (r *Client) readClientWsTradeMessage() {
 // writeClientWsQuoteMessage 由于websocket非线程安全,
 // 所以由统一协程写入
 func (r *Client) writeClientWsTradeMessage() {
-	// defer r.close()
 	for {
-		select {
-		case buf := <-r.tradeWriteChan:
-			err := r.wsTradeConn.WriteMessage(websocket.BinaryMessage, buf)
-			if err != nil {
-				return
-			}
-		case <-r.wsTradeCloseChan: // 与终端连接关闭信息
+		buf, ok := <-r.tradeWriteChan
+		if !ok {
+			// 通道已关闭
+			return
+		}
+		err := r.wsTradeConn.WriteMessage(websocket.BinaryMessage, buf)
+		if err != nil {
 			return
 		}
 	}

+ 1 - 1
client/msgRealQuote.go

@@ -41,7 +41,7 @@ func DispatchRealQuote(p *packet.MiQuotePacket, clinet *Client) {
 		sendBuf := quote.EnPack()
 
 		// 发送给客户端
-		clinet.quoteChan <- sendBuf
+		clinet.quoteWriteChan <- sendBuf
 	}
 }