瀏覽代碼

支持交易通知推送

zhou.xiaoning 2 年之前
父節點
當前提交
08aec976ad
共有 10 個文件被更改,包括 82 次插入29 次删除
  1. 1 1
      api/v1/quote/quote.go
  2. 0 0
      api/v1/trade/trade.go
  3. 16 8
      client/client.go
  4. 1 1
      docs/docs.go
  5. 1 1
      docs/swagger.json
  6. 1 1
      docs/swagger.yaml
  7. 34 15
      initialize/rabbitmq.go
  8. 1 1
      router/trade.go
  9. 26 0
      service/account/login.go
  10. 1 1
      service/trade/trade.go

+ 1 - 1
api/v1/quote/quote.go

@@ -18,7 +18,7 @@ func Quote(c *gin.Context) {
 }
 }
 
 
 // SendMsgToMQ 订阅商品实时行情请求
 // SendMsgToMQ 订阅商品实时行情请求
-// @Summary  实时行情
+// @Summary  订阅商品实时行情请求
 // @Security ApiKeyAuth
 // @Security ApiKeyAuth
 // @accept   application/json
 // @accept   application/json
 // @Produce  application/json
 // @Produce  application/json

+ 0 - 0
api/trade/trade.go → api/v1/trade/trade.go


+ 16 - 8
client/client.go

@@ -32,9 +32,9 @@ type Client struct {
 	quoteSubs      []request.QuoteGoods // 当前已订阅行情的商品
 	quoteSubs      []request.QuoteGoods // 当前已订阅行情的商品
 	// wsQuoteCloseChan chan struct{}        // 终端WebSocket连接关闭信号
 	// wsQuoteCloseChan chan struct{}        // 终端WebSocket连接关闭信号
 
 
-	wsTradeConn    *websocket.Conn  // 终端交易WebSocket连接
-	tradeWriteChan chan []byte      // 推送队列 Access -> Client
-	tradeChan      chan interface{} // 接收交易通知channel RabbitMQ -> Access
+	wsTradeConn    *websocket.Conn // 终端交易WebSocket连接
+	tradeWriteChan chan []byte     // 推送队列 Access -> Client
+	// tradeChan      chan interface{} // 接收交易通知channel RabbitMQ -> Access
 	// wsTradeCloseChan chan struct{}    // 终端WebSocket连接关闭信号
 	// wsTradeCloseChan chan struct{}    // 终端WebSocket连接关闭信号
 }
 }
 
 
@@ -84,12 +84,20 @@ func (r *Client) GetQuoteSubs() (quoteSubs []request.QuoteGoods) {
 }
 }
 
 
 // WriteWsBuf 向客户端发送实时行情
 // WriteWsBuf 向客户端发送实时行情
-func (r *Client) WriteWsBuf(buf []byte) (err error) {
+func (r *Client) WriteQuoteWsBuf(buf []byte) (err error) {
 	r.quoteWriteChan <- buf
 	r.quoteWriteChan <- buf
 
 
 	return
 	return
 }
 }
 
 
+func (r *Client) WriteTradeWsBuf(buf []byte) (err error) {
+	if r.wsTradeConn != nil {
+		r.tradeWriteChan <- buf
+	}
+
+	return
+}
+
 // GetAsyncTask 获取目标异步任务
 // GetAsyncTask 获取目标异步任务
 // key:SessionId_FuncodeRsp
 // key:SessionId_FuncodeRsp
 func (r *Client) GetAsyncTask(key string) (asyncTask *AsyncTask) {
 func (r *Client) GetAsyncTask(key string) (asyncTask *AsyncTask) {
@@ -280,10 +288,10 @@ func (r *Client) readClientWsTradeMessage() {
 			_ = r.wsTradeConn.WriteMessage(mt, msg)
 			_ = r.wsTradeConn.WriteMessage(mt, msg)
 		case websocket.CloseMessage:
 		case websocket.CloseMessage:
 			return
 			return
-		case websocket.BinaryMessage:
-			if err := r.clientToQuoteAgentMsg(msg); err != nil {
-				return
-			}
+		// case websocket.BinaryMessage:
+		// 	if err := r.clientToQuoteAgentMsg(msg); err != nil {
+		// 		return
+		// 	}
 		case websocket.TextMessage:
 		case websocket.TextMessage:
 			fmt.Println(string(msg))
 			fmt.Println(string(msg))
 			_ = r.wsTradeConn.WriteMessage(mt, msg)
 			_ = r.wsTradeConn.WriteMessage(mt, msg)

+ 1 - 1
docs/docs.go

@@ -217,7 +217,7 @@ const docTemplate = `{
                 "tags": [
                 "tags": [
                     "实时行情"
                     "实时行情"
                 ],
                 ],
-                "summary": "实时行情",
+                "summary": "订阅商品实时行情请求",
                 "parameters": [
                 "parameters": [
                     {
                     {
                         "description": "入参1",
                         "description": "入参1",

+ 1 - 1
docs/swagger.json

@@ -208,7 +208,7 @@
                 "tags": [
                 "tags": [
                     "实时行情"
                     "实时行情"
                 ],
                 ],
-                "summary": "实时行情",
+                "summary": "订阅商品实时行情请求",
                 "parameters": [
                 "parameters": [
                     {
                     {
                         "description": "入参1",
                         "description": "入参1",

+ 1 - 1
docs/swagger.yaml

@@ -237,7 +237,7 @@ paths:
               type: object
               type: object
       security:
       security:
       - ApiKeyAuth: []
       - ApiKeyAuth: []
-      summary: 实时行情
+      summary: 订阅商品实时行情请求
       tags:
       tags:
       - 实时行情
       - 实时行情
 securityDefinitions:
 securityDefinitions:

+ 34 - 15
initialize/rabbitmq.go

@@ -9,6 +9,7 @@ import (
 	"mtp20access/packet"
 	"mtp20access/packet"
 	"mtp20access/rabbitmq"
 	"mtp20access/rabbitmq"
 	"mtp20access/res/pb"
 	"mtp20access/res/pb"
+	accountSrv "mtp20access/service/account"
 	"mtp20access/utils"
 	"mtp20access/utils"
 
 
 	// "github.com/golang/protobuf/proto"
 	// "github.com/golang/protobuf/proto"
@@ -52,10 +53,9 @@ func (t *MQProc) Process(topic, queuename string, msg *[]byte) {
 	if funcode, sessionId, bytes, serialNumber, err := t.getRspProtobuf(msg); err == nil && bytes != nil {
 	if funcode, sessionId, bytes, serialNumber, err := t.getRspProtobuf(msg); err == nil && bytes != nil {
 		if sessionId == 0 {
 		if sessionId == 0 {
 			// 通知类
 			// 通知类
-
+			t.onNtf(funcode, bytes)
 		} else {
 		} else {
-			// 请求回复
-
+			// 请求回复W
 			// 尝试获取对应异步任务
 			// 尝试获取对应异步任务
 			if client, exists := client.Clients[int(sessionId)]; exists {
 			if client, exists := client.Clients[int(sessionId)]; exists {
 				key := fmt.Sprintf("%v_%v_%v", sessionId, funcode, serialNumber)
 				key := fmt.Sprintf("%v_%v_%v", sessionId, funcode, serialNumber)
@@ -96,6 +96,36 @@ func (t *MQProc) Process(topic, queuename string, msg *[]byte) {
 	}
 	}
 }
 }
 
 
+func (t *MQProc) onNtf(funcode uint32, bytes *[]byte) {
+	switch int(funcode) {
+	case global.MoneyChangedNtf: // 资金变化通知
+		var p pb.MoneyChangedNtf
+		if err := proto.Unmarshal(*bytes, &p); err != nil {
+			global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
+			return
+		}
+
+		// 组装待发送给客户端的5.0报文包
+		b, err := packet.BuildPacket(funcode, 0, 0, *bytes, true)
+		if err != nil {
+			global.M2A_LOG.Error("组装5.0报文失败", zap.Error(err))
+			return
+		}
+
+		// 获取目标客户
+		clients, err := accountSrv.GetClientsByAccountID(*p.AccountID)
+		if err != nil {
+			return
+		}
+
+		// 发送信息
+		for i := range clients {
+			c := clients[i]
+			c.WriteTradeWsBuf(b)
+		}
+	}
+}
+
 // getRspProtobuf 将总线回复的数据反序列化为Protobuf
 // getRspProtobuf 将总线回复的数据反序列化为Protobuf
 func (t *MQProc) getRspProtobuf(msg *[]byte) (funcode uint32, sessionId uint32, bytes *[]byte, serialNumber uint32, err error) {
 func (t *MQProc) getRspProtobuf(msg *[]byte) (funcode uint32, sessionId uint32, bytes *[]byte, serialNumber uint32, err error) {
 	// 分解总线包信息
 	// 分解总线包信息
@@ -107,18 +137,7 @@ func (t *MQProc) getRspProtobuf(msg *[]byte) (funcode uint32, sessionId uint32,
 
 
 	switch int(funcode) {
 	switch int(funcode) {
 	case global.MoneyChangedNtf: // 资金变化通知
 	case global.MoneyChangedNtf: // 资金变化通知
-		var p pb.MoneyChangedNtf
-		if err = proto.Unmarshal(b, &p); err != nil {
-			global.M2A_LOG.Error("总线通知数据反序列化失败", zap.Error(err))
-			return
-		}
-		if bs, e := protojson.Marshal(&p); e != nil {
-			global.M2A_LOG.Error("总线通知数据反序列化失败", zap.Error(err))
-			return
-		} else {
-			bytes = &bs
-			serialNumber = p.GetHeader().GetRequestID()
-		}
+		bytes = &b
 	case global.ModifyPwdRsp: // 修改账户密码应答
 	case global.ModifyPwdRsp: // 修改账户密码应答
 		var p pb.ModifyPwdRsp
 		var p pb.ModifyPwdRsp
 		if err = proto.Unmarshal(b, &p); err != nil {
 		if err = proto.Unmarshal(b, &p); err != nil {

+ 1 - 1
router/trade.go

@@ -1,7 +1,7 @@
 package router
 package router
 
 
 import (
 import (
-	"mtp20access/api/trade"
+	"mtp20access/api/v1/trade"
 
 
 	"github.com/gin-gonic/gin"
 	"github.com/gin-gonic/gin"
 )
 )

+ 26 - 0
service/account/login.go

@@ -214,7 +214,33 @@ func RestoreLoginWithToken(loginID int, group int, token string) (err error) {
 	return
 	return
 }
 }
 
 
+// GetClientsByAccountID 通过资金账户获取所有的
 func GetClientsByAccountID(accountID uint64) (clients []*client.Client, err error) {
 func GetClientsByAccountID(accountID uint64) (clients []*client.Client, err error) {
+	clients = make([]*client.Client, 0)
+
+	loginIds := make([]string, 0)
+	if err = global.M2A_DB.Table("loginaccount t").
+		Select(`t.loginid`).
+		Join("INNER", "taaccount a", "a.userid = t.userid").
+		Where("a.accountid = ?", accountID).Find(&loginIds); err != nil {
+
+		global.M2A_LOG.Error("获取LoginID失败", zap.Error(err))
+		return
+	}
+
+	var mtx sync.RWMutex
+	mtx.Lock()
+	defer mtx.Unlock()
+	if len(loginIds) > 0 && len(client.Clients) > 0 {
+		for _, item := range loginIds {
+			for i := range client.Clients {
+				c := client.Clients[i]
+				if c.LoginID == item {
+					clients = append(clients, c)
+				}
+			}
+		}
+	}
 
 
 	return
 	return
 }
 }

+ 1 - 1
service/trade/trade.go

@@ -93,7 +93,7 @@ func TradeConn(c *gin.Context) (err error) {
 		return
 		return
 	}
 	}
 
 
-	client.SetQuoteWebSocket(ws)
+	client.SetTradeWebSocket(ws)
 
 
 	return
 	return
 }
 }