zhou.xiaoning 2 лет назад
Родитель
Сommit
c389784d14
3 измененных файлов с 84 добавлено и 39 удалено
  1. 66 35
      client/client.go
  2. 8 0
      client/msgRealQuote.go
  3. 10 4
      initialize/rabbitmq.go

+ 66 - 35
client/client.go

@@ -33,9 +33,9 @@ type Client struct {
 	quoteSubs      []request.QuoteGoods // 当前已订阅行情的商品
 	// wsQuoteCloseChan chan struct{}        // 终端WebSocket连接关闭信号
 
-	wsTradeConn    *websocket.Conn // 终端交易WebSocket连接
-	tradeWriteChan chan []byte     // 推送队列 Access -> Client
-	// tradeChan      chan interface{} // 接收交易通知channel RabbitMQ -> Access
+	wsTradeConn    *websocket.Conn  // 终端交易WebSocket连接
+	tradeWriteChan chan []byte      // 推送队列 Access -> Client
+	tradeChan      chan interface{} // 接收交易通知channel RabbitMQ -> Access
 	// wsTradeCloseChan chan struct{}    // 终端WebSocket连接关闭信号
 }
 
@@ -77,48 +77,76 @@ func (r *Client) SetQuoteSubs(req request.QuoteSubscribeReq) {
 	}()
 }
 
-// GetQuoteSubs 获取已订阅行情商品信息列表
-func (r *Client) GetQuoteSubs() (quoteSubs []request.QuoteGoods) {
+// SetQuoteSubs 设置交易通知订阅信息
+func (r *Client) SetTradeNft() {
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
 
-	return r.quoteSubs
-}
-
-// WriteWsBuf 向客户端发送实时行情
-func (r *Client) WriteQuoteWsBuf(buf []byte) (err error) {
-	if r.quoteWriteChan != nil {
-		r.quoteWriteChan <- buf
+	// 从订阅中心订阅交易通知
+	if r.tradeChan != nil {
+		global.M2A_Publish.Unsubscribe(publish.Topic_Trading, r.tradeChan)
 	}
+	r.tradeChan = global.M2A_Publish.Subscribe(publish.Topic_Trading)
 
-	return
+	go func() {
+		for {
+			msg, ok := <-r.tradeChan
+			if !ok {
+				return // 管道已关闭,退出循环
+			}
+			// 向客户端发送交易通知信息
+			if m, ok := msg.(map[string]*[]byte); ok {
+				// 判断是否自己的通知
+				if p, ok := m[r.SessionID]; ok {
+					DispatchNftTrade(p, r)
+				}
+			}
+		}
+	}()
 }
 
-func (r *Client) WriteTradeWsBuf(buf []byte) (err error) {
-	if r.tradeWriteChan != nil {
-		r.tradeWriteChan <- buf
-	} else {
-		global.M2A_LOG.Info("WriteTradeWsBuf::tradeWriteChan == nil", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID))
-	}
-
-	if r.wsTradeConn != nil {
-		// r.tradeWriteChan <- buf
-	} else {
-		global.M2A_LOG.Info("WriteTradeWsBuf::wsTradeConn == nil", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID))
-		global.M2A_LOG.Info("当前所有Client----")
-		for _, item := range Clients {
-			global.M2A_LOG.Info("client",
-				zap.Any("LoginID", item.LoginID),
-				zap.Any("Group", item.Group), zap.Any("SessionID", item.SessionID),
-				zap.Any("wsTradeConn", item.wsTradeConn), zap.Any("tradeWriteChan", item.tradeWriteChan),
-				zap.Any("wsQuoteConn", item.wsQuoteConn), zap.Any("quoteWriteChan", item.quoteWriteChan))
-		}
-		global.M2A_LOG.Info("----------------")
-	}
+// GetQuoteSubs 获取已订阅行情商品信息列表
+func (r *Client) GetQuoteSubs() (quoteSubs []request.QuoteGoods) {
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
 
-	return
+	return r.quoteSubs
 }
 
+// // WriteWsBuf 向客户端发送实时行情
+// func (r *Client) WriteQuoteWsBuf(buf []byte) (err error) {
+// 	if r.quoteWriteChan != nil {
+// 		r.quoteWriteChan <- buf
+// 	}
+
+// 	return
+// }
+
+// func (r *Client) WriteTradeWsBuf(buf []byte) (err error) {
+// 	if r.tradeWriteChan != nil {
+// 		r.tradeWriteChan <- buf
+// 	} else {
+// 		global.M2A_LOG.Info("WriteTradeWsBuf::tradeWriteChan == nil", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID))
+// 	}
+
+// 	if r.wsTradeConn != nil {
+// 		// r.tradeWriteChan <- buf
+// 	} else {
+// 		global.M2A_LOG.Info("WriteTradeWsBuf::wsTradeConn == nil", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID))
+// 		global.M2A_LOG.Info("当前所有Client----")
+// 		for _, item := range Clients {
+// 			global.M2A_LOG.Info("client",
+// 				zap.Any("LoginID", item.LoginID),
+// 				zap.Any("Group", item.Group), zap.Any("SessionID", item.SessionID),
+// 				zap.Any("wsTradeConn", item.wsTradeConn), zap.Any("tradeWriteChan", item.tradeWriteChan),
+// 				zap.Any("wsQuoteConn", item.wsQuoteConn), zap.Any("quoteWriteChan", item.quoteWriteChan))
+// 		}
+// 		global.M2A_LOG.Info("----------------")
+// 	}
+
+// 	return
+// }
+
 // GetAsyncTask 获取目标异步任务
 // key:SessionId_FuncodeRsp
 func (r *Client) GetAsyncTask(key string) (asyncTask *AsyncTask) {
@@ -295,6 +323,9 @@ func (r *Client) SetTradeWebSocket(ws *websocket.Conn) (err error) {
 	// 开始推送客户端信息循环
 	go r.writeClientWsTradeMessage()
 
+	// 设置交易通知订阅信息
+	r.SetTradeNft()
+
 	global.M2A_LOG.Info("交易长连客户端接入", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID))
 
 	return

+ 8 - 0
client/msgRealQuote.go

@@ -48,6 +48,14 @@ func DispatchRealQuote(p *packet.MiQuotePacket, clinet *Client) {
 	}
 }
 
+// 分发通知
+func DispatchNftTrade(p *[]byte, clinet *Client) {
+	// 发送给客户端
+	if clinet.tradeWriteChan != nil {
+		clinet.tradeWriteChan <- *p
+	}
+}
+
 // parseWareInfo 从报文中解析出所有报价商品
 func parseWareInfo(p *packet.MiQuotePacket) []wareInfo {
 	ware := make([]wareInfo, 0)

+ 10 - 4
initialize/rabbitmq.go

@@ -7,6 +7,7 @@ import (
 	"mtp20access/global"
 	rsp "mtp20access/model/mq/response"
 	"mtp20access/packet"
+	"mtp20access/publish"
 	"mtp20access/rabbitmq"
 	"mtp20access/res/pb"
 	accountSrv "mtp20access/service/account"
@@ -252,12 +253,17 @@ func (t *MQProc) onNtf(funcode uint32, sessionId uint32, bytes *[]byte) {
 		// 发送信息
 		for _, item := range clients {
 			// c := clients[i]
-			sessionId, _ := strconv.Atoi(item.SessionID)
-			c := client.Clients[sessionId]
-			c.WriteTradeWsBuf(b)
+			// sessionId, _ := strconv.Atoi(item.SessionID)
+			// c := client.Clients[sessionId]
+			// c.WriteTradeWsBuf(b)
+
+			// 分发给订阅者
+			m := make(map[string]*[]byte)
+			m[item.SessionID] = &b
+			global.M2A_Publish.Publish(publish.Topic_Trading, &b)
 
 			// 给客户端通知
-			global.M2A_LOG.Info("[S->C]给客户端通知", zap.Any("ntf", funcode), zap.Any("clients", c.LoginID), zap.Any("SessionID", c.SessionID), zap.Any("len", len(b)))
+			// global.M2A_LOG.Info("[S->C]给客户端通知", zap.Any("ntf", funcode), zap.Any("clients", c.LoginID), zap.Any("SessionID", c.SessionID), zap.Any("len", len(b)))
 		}
 	}
 }