zhou.xiaoning 2 年之前
父節點
當前提交
556df7f33b
共有 3 個文件被更改,包括 22 次插入20 次删除
  1. 15 13
      client/client.go
  2. 4 4
      config.yaml
  3. 3 3
      initialize/rabbitmq.go

+ 15 - 13
client/client.go

@@ -31,11 +31,13 @@ type Client struct {
 	quoteWriteChan chan []byte          // 推送队列 Access -> Client
 	quoteChan      chan interface{}     // 接收实时行情订阅channel QuoteServer -> Access
 	quoteSubs      []request.QuoteGoods // 当前已订阅行情的商品
+	qmtx           sync.RWMutex
 	// wsQuoteCloseChan chan struct{}        // 终端WebSocket连接关闭信号
 
 	wsTradeConn    *websocket.Conn  // 终端交易WebSocket连接
 	tradeWriteChan chan []byte      // 推送队列 Access -> Client
 	tradeChan      chan interface{} // 接收交易通知channel RabbitMQ -> Access
+	tmtx           sync.RWMutex
 	// wsTradeCloseChan chan struct{}    // 终端WebSocket连接关闭信号
 }
 
@@ -52,8 +54,8 @@ func (r *Client) GetSerialNumber() uint32 {
 
 // SetQuoteSubs 设置商品行情订阅信息
 func (r *Client) SetQuoteSubs(req request.QuoteSubscribeReq) {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
+	r.qmtx.Lock()
+	defer r.qmtx.Unlock()
 
 	r.quoteSubs = req.QuoteGoodses
 
@@ -79,8 +81,8 @@ func (r *Client) SetQuoteSubs(req request.QuoteSubscribeReq) {
 
 // SetQuoteSubs 设置交易通知订阅信息
 func (r *Client) SetTradeNft() {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
+	r.tmtx.Lock()
+	defer r.tmtx.Unlock()
 
 	// 从订阅中心订阅交易通知
 	if r.tradeChan != nil {
@@ -95,10 +97,10 @@ func (r *Client) SetTradeNft() {
 				return // 管道已关闭,退出循环
 			}
 			// 向客户端发送交易通知信息
-			if m, ok := msg.(map[string]*[]byte); ok {
+			if m, ok := msg.(map[string][]byte); ok {
 				// 判断是否自己的通知
 				if p, ok := m[r.SessionID]; ok {
-					DispatchNftTrade(p, r)
+					DispatchNftTrade(&p, r)
 				}
 			}
 		}
@@ -107,8 +109,8 @@ func (r *Client) SetTradeNft() {
 
 // GetQuoteSubs 获取已订阅行情商品信息列表
 func (r *Client) GetQuoteSubs() (quoteSubs []request.QuoteGoods) {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
+	r.qmtx.Lock()
+	defer r.qmtx.Unlock()
 
 	return r.quoteSubs
 }
@@ -193,8 +195,8 @@ func (r *Client) GetAllAsyncTask() *map[string]*AsyncTask {
 // **************** Quote WebSocket ****************
 
 func (r *Client) SetQuoteWebSocket(ws *websocket.Conn) (err error) {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
+	r.qmtx.Lock()
+	defer r.qmtx.Unlock()
 
 	// if r.wsQuoteConn != nil {
 	// 	r.wsQuoteConn.Close()
@@ -304,8 +306,8 @@ func (r *Client) clientToQuoteAgentMsg(msg []byte) error {
 // **************** Trade WebSocket ****************
 
 func (r *Client) SetTradeWebSocket(ws *websocket.Conn) (err error) {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
+	r.tmtx.Lock()
+	defer r.tmtx.Unlock()
 
 	// if r.wsTradeConn != nil {
 	// 	r.wsTradeConn.Close()
@@ -324,7 +326,7 @@ func (r *Client) SetTradeWebSocket(ws *websocket.Conn) (err error) {
 	go r.writeClientWsTradeMessage()
 
 	// 设置交易通知订阅信息
-	// r.SetTradeNft()
+	r.SetTradeNft()
 
 	global.M2A_LOG.Info("交易长连客户端接入", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID))
 

+ 4 - 4
config.yaml

@@ -20,7 +20,7 @@ jwt:
 # redis configuration
 redis:
   db: 0
-  addr: '192.168.31.171:5007'
+  addr: '192.168.31.204:5007'
   password: ''
 
 # system configuration
@@ -28,7 +28,7 @@ system:
   env: 'develop'  # "develop" & "public", Change to "develop" to skip authentication for development mode
   addr: 8888
   need-quote-publish: true # 是否需要连接行情发布服务
-  quote-publish-addr: '192.168.31.171:5004' # 行情发布服务地址
+  quote-publish-addr: '192.168.31.204:5004' # 行情发布服务地址
 
 # local configuration
 local:
@@ -41,14 +41,14 @@ oracle:
   address: '192.168.31.88'
   name: 'orcl'
   port: '1521'
-  user: 'mtp2_test171'
+  user: 'mtp2_test204'
   pwd: 'muchinfo'
   max-idle-conns: 10
   max-open-conns: 100
 
 # rabbitmq configuration
 rabbitmq:
-  url: 'amqp://guest:guest@192.168.31.171:5020/test'
+  url: 'amqp://guest:guest@192.168.31.204:5020/test'
   exchange: 'entry'
 
 # 跨域配置

+ 3 - 3
initialize/rabbitmq.go

@@ -258,9 +258,9 @@ func (t *MQProc) onNtf(funcode uint32, sessionId uint32, bytes *[]byte) {
 			// c.WriteTradeWsBuf(b)
 
 			// 分发给订阅者
-			m := make(map[string]*[]byte)
-			m[item.SessionID] = &b
-			global.M2A_Publish.Publish(publish.Topic_Trading, &b)
+			m := make(map[string][]byte)
+			m[item.SessionID] = b
+			global.M2A_Publish.Publish(publish.Topic_Trading, m)
 
 			// 给客户端通知
 			// global.M2A_LOG.Info("[S->C]给客户端通知", zap.Any("ntf", funcode), zap.Any("clients", c.LoginID), zap.Any("SessionID", c.SessionID), zap.Any("len", len(b)))