Преглед изворни кода

增加实时行情下发相关功能

zhou.xiaoning пре 2 година
родитељ
комит
d879785907

+ 39 - 0
api/v1/quote/quote.go

@@ -0,0 +1,39 @@
+package quote
+
+import (
+	"mtp20access/model/common/response"
+	"mtp20access/model/quote/request"
+	"mtp20access/service/quote"
+
+	"github.com/gin-gonic/gin"
+)
+
+func Quote(c *gin.Context) {
+	if err := quote.QuoteConn(c); err != nil {
+		response.FailWithMessage(err.Error(), c)
+		// return
+	}
+
+	// response.OkWithMessage("连接成功", c)
+}
+
+// SendMsgToMQ 订阅商品实时行情请求
+// @Summary  实时行情
+// @Security ApiKeyAuth
+// @accept   application/json
+// @Produce  application/json
+// @Param    data body     []request.QuoteSubscribeReq   true "入参"
+// @Success  200  {object} response.Response{msg=string} "出参"
+// @Router   /Quote/QuoteSubscribe [post]
+// @Tags     实时行情
+func QuoteSubscribe(c *gin.Context) {
+	var req []request.QuoteSubscribeReq
+	if err := c.ShouldBindJSON(&req); err != nil {
+		response.FailWithMessage("入参不正确", c)
+		return
+	}
+
+	if err := quote.QuoteSubscribe(c, req); err != nil {
+		response.FailWithMessage(err.Error(), c)
+	}
+}

+ 270 - 0
client/client.go

@@ -0,0 +1,270 @@
+package client
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"mtp20access/global"
+	rsp "mtp20access/model/mq/response"
+	"mtp20access/model/quote/request"
+	"mtp20access/packet"
+	"mtp20access/publish"
+	"sync"
+
+	"github.com/gorilla/websocket"
+	"github.com/mitchellh/mapstructure"
+)
+
+var Clients map[int]*Client // key:SessionID
+
+type Client struct {
+	LoginRedis
+
+	mtx             sync.RWMutex
+	curSerialNumber uint32                // 当前业务流水号
+	asyncTasks      map[string]*AsyncTask // key:SessionId_FuncodeRsp_SerialNumber
+
+	wsConn      *websocket.Conn             // 终端WebSocket连接
+	quoteSubs   []request.QuoteSubscribeReq // 当前已订阅行情的商品
+	ch          chan interface{}            // 接收实时行情订阅channel
+	unSubscribe chan struct{}               // 取消订阅信号
+	writeChan   chan []byte                 // 推送队列 QuoteServer -> Client
+	wsCloseChan chan struct{}               // 终端WebSocket连接关闭信号
+}
+
+// GetSerialNumber 获取可用流水号
+func (r *Client) GetSerialNumber() uint32 {
+	r.mtx.Lock()
+	defer func() {
+		r.mtx.Unlock()
+	}()
+	r.curSerialNumber += 1
+
+	return r.curSerialNumber
+}
+
+// SetQuoteSubs 设置商品行情订阅信息
+func (r *Client) SetQuoteSubs(req []request.QuoteSubscribeReq) {
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+
+	r.quoteSubs = req
+
+	if r.ch != nil {
+		r.unSubscribe <- struct{}{}
+		global.M2A_Publish.Unsubscribe(publish.Topic_Quote, r.ch)
+	}
+	r.ch = global.M2A_Publish.Subscribe(publish.Topic_Quote)
+
+	go func() {
+		for {
+			select {
+			case msg, ok := <-r.ch:
+				if !ok {
+					return // 管道已关闭,退出循环
+				}
+				// 向客户端发送行情信息
+				if p, ok := msg.(*packet.MiQuotePacket); ok {
+					DispatchRealQuote(p, r)
+				}
+			case <-r.unSubscribe:
+				// 取消订阅信息
+				return
+			}
+		}
+	}()
+}
+
+// WriteWsBuf 向客户端发送实时行情
+func (r *Client) WriteWsBuf(buf []byte) (err error) {
+	r.writeChan <- buf
+
+	return
+}
+
+// GetAsyncTask 获取目标异步任务
+// key:SessionId_FuncodeRsp
+func (r *Client) GetAsyncTask(key string) (asyncTask *AsyncTask) {
+	r.mtx.RLock()
+	defer func() {
+		r.mtx.RUnlock()
+	}()
+
+	asyncTask, ok := r.asyncTasks[key]
+	if ok {
+		return asyncTask
+	} else {
+		return nil
+	}
+}
+
+// SetAsyncTask 设置异步任务
+func (r *Client) SetAsyncTask(asyncTask *AsyncTask, key string) {
+	r.mtx.Lock()
+	defer func() {
+		r.mtx.Unlock()
+	}()
+
+	if r.asyncTasks == nil {
+		r.asyncTasks = make(map[string]*AsyncTask, 0)
+	}
+	delete(r.asyncTasks, key)
+	r.asyncTasks[key] = asyncTask
+}
+
+// DeleteAsyncTask 删除异步任务
+func (r *Client) DeleteAsyncTask(key string) {
+	r.mtx.Lock()
+	defer func() {
+		r.mtx.Unlock()
+	}()
+	delete(r.asyncTasks, key)
+}
+
+func (r *Client) GetAllAsyncTask() *map[string]*AsyncTask {
+	return &r.asyncTasks
+}
+
+func (r *Client) SetWebSocket(ws *websocket.Conn) (err error) {
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+
+	if r.wsConn != nil {
+		r.wsConn.Close()
+	}
+	r.wsConn = ws
+
+	// 开始读取客户端发送信息
+	go r.readClientWsMessage()
+	// 开始推送客户端信息循环
+	r.writeChan = make(chan []byte, 100)
+	go r.writeClientWsMessage()
+
+	return
+}
+
+// readClientWsMessage 处理终端发过来的websocket数据
+// 注意: 阻塞式, 直到websocket关闭才退出
+func (r *Client) readClientWsMessage() {
+	for {
+		mt, msg, err := r.wsConn.ReadMessage()
+		if err != nil {
+			fmt.Println(err)
+			break
+		}
+
+		switch mt {
+		case websocket.PingMessage:
+			_ = r.wsConn.WriteMessage(mt, msg)
+		case websocket.CloseMessage:
+			return
+		case websocket.BinaryMessage:
+			if err := r.clientToQuoteAgentMsg(msg); err != nil {
+				return
+			}
+		case websocket.TextMessage:
+			fmt.Println(string(msg))
+			_ = r.wsConn.WriteMessage(mt, msg)
+		}
+	}
+}
+
+// writeClientWsMessage 由于websocket非线程安全,
+// 所以由统一协程写入
+func (r *Client) writeClientWsMessage() {
+	// defer r.close()
+	for {
+		select {
+		case buf := <-r.writeChan:
+			err := r.wsConn.WriteMessage(websocket.BinaryMessage, buf)
+			if err != nil {
+				return
+			}
+		case <-r.wsCloseChan: // 与终端连接关闭信息
+			return
+		}
+	}
+}
+
+// clientToQuoteAgentMsg 处理客户端发上来的包
+func (r *Client) clientToQuoteAgentMsg(msg []byte) error {
+	var p packet.MiQuotePacket
+	err := p.UnPackHead(msg)
+	if err != nil {
+		// logger.Logger().Errorf("[%v c->s] invalid packet: %v", r.ProxyId, err)
+		return err
+	}
+	// 长度15 是心跳包, 目前只允许客户端上行心跳包
+	if p.Length == 15 {
+		// logger.Logger().Debugf("%v [%v c->s] %s", r.ClientIP, r.ProxyId, p.HeaderInfo())
+		// 将心跳发回给客户端
+		err = r.wsConn.WriteMessage(websocket.BinaryMessage, msg)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// MQPacket 与总线交互的数据体
+type MQPacket struct {
+	FunCode   uint32  // 功能码
+	SessionId uint32  // 数据包的sid
+	Data      *[]byte // 业务数据体
+}
+
+// AsyncTask 异步任务结构体
+type AsyncTask struct {
+	PacketRsp    chan MQPacket // 总线数据处理通道
+	FuncodeRsp   uint32        // 回复功能码
+	SerialNumber uint32        // 通信流水号
+	Own          *Client
+	IsEncrypted  bool // 是否加密
+
+	Rsp     chan rsp.MQBodyRsp // 回调
+	doClose sync.Once          // 仅关闭通道一次
+}
+
+// Finish 完成
+func (r *AsyncTask) Finish() {
+	r.doClose.Do(
+		func() {
+			close(r.Rsp)
+			key := fmt.Sprintf("%v_%v_%v", r.Own.SessionID, r.FuncodeRsp, r.SerialNumber)
+			r.Own.DeleteAsyncTask(key)
+		})
+}
+
+type LoginRedis struct {
+	LoginID   string `json:"loginId" redis:"loginId"`     // 登陆账号
+	UserID    string `json:"userId" redis:"userId"`       // 用户ID
+	SessionID string `json:"sessionId" redis:"sessionId"` // 终端sid
+	Token     string `json:"token" redis:"token"`         // 令牌
+	Group     string `json:"group" redis:"group"`         // 终端分组
+	Addr      string `json:"addr" redis:"addr"`           // 客户端地址信息 // FIXME: 由于本服务改用短连,所以每次提交请交请求可能会不一样,后期可判断是否在中间件中进行拦截
+}
+
+// FromMap Map to Struct
+func (r *LoginRedis) FromMap(val map[string]interface{}) error {
+	return mapstructure.Decode(val, r)
+}
+
+// ToMap Struct to Map
+func (r *LoginRedis) ToMap() (val map[string]interface{}, err error) {
+	if marshalContent, err := json.Marshal(r); err != nil {
+		return nil, err
+	} else {
+		d := json.NewDecoder(bytes.NewReader(marshalContent))
+		d.UseNumber() // 设置将float64转为一个number
+		if err := d.Decode(&val); err != nil {
+			fmt.Println(err)
+		} else {
+			for k, v := range val {
+				val[k] = v
+			}
+		}
+	}
+
+	return
+}

+ 82 - 0
client/msgRealQuote.go

@@ -0,0 +1,82 @@
+/**
+* @Author  : zou.yingbin
+* @Create  : 2022/3/26 14:52
+* @Modify  : 2022/3/26 14:52
+* @note    :
+ */
+
+package client
+
+import (
+	"mtp20access/packet"
+)
+
+// DispatchRealQuote 分发行情
+func DispatchRealQuote(p *packet.MiQuotePacket, clinet *Client) {
+	// TODO: 目前只实现了订阅发送模式, 未支持全部发送模式
+	// 解析接收到的商品
+	// ware := parseWareInfo(p)
+
+	// if len(ware) == 1 {
+	// 	// 只有一个商品, 不需要重新打包,提高效率
+	// 	v := ware[0]
+	// 	if s, ok := subMgr.getSession(v.exchId, v.goodsCode); ok {
+	// 		for i := range s {
+	// 			err := clinet.WriteBuf(p.OriMsg)
+	// 			if err != nil {
+	// 				// Logger().Errorf("%s push quote error:%v", s[i].info(), err)
+	// 			}
+	// 		}
+	// 	}
+	// } else {
+	// 	// 分发行情到链接
+	// 	for _, v := range ware {
+	// 		if s, ok := subMgr.getSession(v.exchId, v.goodsCode); ok {
+	// 			// 按商品重新打包
+	// 			quote := packet.MiQuotePacket{
+	// 				BigType:   p.BigType,
+	// 				SmallType: p.SmallType,
+	// 				SerialNum: p.SerialNum,
+	// 				Mode:      p.Mode,
+	// 			}
+	// 			quote.Msg = v.buf
+	// 			sendBuf := quote.EnPack()
+	// 			for i := range s {
+	// 				err := s[i].WriteBuf(sendBuf)
+	// 				if err != nil {
+	// 					// Logger().Errorf("%s push quote error:%v", s[i].info(), err)
+	// 				}
+	// 			}
+	// 		}
+	// 	}
+	// }
+	clinet.WriteWsBuf(p.OriMsg)
+}
+
+// parseWareInfo 从报文中解析出所有报价商品
+func parseWareInfo(p *packet.MiQuotePacket) []wareInfo {
+	data := p.OriMsg[14:]
+	ware := make([]wareInfo, 0)
+
+	nPos1, nPos2 := -1, -1
+	for i := 0; i < len(data); i++ {
+		// 报价包开始
+		if data[i] == 0x10 {
+			nPos1 = i
+		}
+		// 报价包结束
+		if data[i] == 0x11 {
+			nPos2 = i + 1
+		}
+		if nPos1 >= 0 && nPos2 > 0 {
+			v := wareInfo{buf: data[nPos1:nPos2]}
+			v.parseField()
+			//v.printInfo()
+			//v.debugPrintAllField()
+			ware = append(ware, v)
+			// 重置nPos1, nPos2 继续查找下一个报价包
+			nPos1, nPos2 = -1, -1
+		}
+	}
+	return ware
+}

+ 88 - 0
client/wareInfo.go

@@ -0,0 +1,88 @@
+/**
+* @Author  : zou.yingbin
+* @Create  : 2022/3/23 11:05
+* @Modify  : 2022/3/23 11:05
+* @note    :
+ */
+
+package client
+
+import (
+	"log"
+	"strings"
+)
+
+type wareInfo struct {
+	buf       []byte // 内容[报价开始(0x10):报价结束(0x11)]
+	exchId    string // 交易所id
+	goodsCode string // 商品代码
+	ask       string // 卖价
+	bid       string // 买价
+	last      string // 最新价
+	tm        string // 时间
+}
+
+func (r *wareInfo) getField(tag byte) string {
+	for i, v := range r.buf {
+		if v == tag {
+			for j, d := range r.buf[i:] {
+				if d == 0x01 || d == 0x11 {
+					buf := r.buf[i+2 : i+j]
+					return string(buf)
+				}
+			}
+		}
+	}
+	return ""
+}
+
+func (r *wareInfo) parseField() {
+	r.goodsCode = r.getField(0x21)
+	r.ask = r.getField(0x4c)
+	r.bid = r.getField(0x42)
+	r.last = r.getField(0x24)
+	r.tm = r.getField(0x41)
+	r.exchId = r.getField(0x56)
+	if r.exchId == "200" {
+		r.goodsCode = "200"
+	}
+}
+
+func (r *wareInfo) printInfo() {
+	if r.ask == "" {
+		return
+	}
+	log.Println(r.exchId, r.goodsCode, r.ask, r.bid, r.last, r.tm)
+}
+
+func (r *wareInfo) debugPrintAllField() {
+	if len(r.buf) <= 2 {
+		return
+	}
+	if r.buf[0] != 0x10 || r.buf[len(r.buf)-1] != 0x11 {
+		return
+	}
+	type Field struct {
+		Tag   byte
+		Value string
+	}
+	var sField []Field
+	tmp := string(r.buf[2 : len(r.buf)-1])
+	//0x01 字段分割
+	strS := strings.Split(tmp, string([]byte{0x01}))
+	for _, v := range strS {
+		//0x02 值分割
+		split := strings.Split(v, string([]byte{0x02}))
+		if len(split) != 2 {
+			break
+		}
+		d := Field{
+			Tag:   ([]byte(split[0]))[0],
+			Value: split[1],
+		}
+		sField = append(sField, d)
+	}
+	for _, v := range sField {
+		log.Printf("0x%0x %s\n", v.Tag, v.Value)
+	}
+}

+ 68 - 0
config.j2

@@ -0,0 +1,68 @@
+# zap logger configuration
+zap:
+  level: 'info'
+  prefix: '[MTP20_Access]'
+  format: 'console'
+  director: 'log'
+  encode-level: 'LowercaseColorLevelEncoder'
+  stacktrace-key: 'stacktrace'
+  max-age: 30 # 默认日志留存默认以天为单位
+  show-line: true
+  log-in-console: true
+
+# jwt configuration
+jwt:
+  signing-key: 'IVThJraI1R52mE7b'
+  expires-time: 604800
+  buffer-time: 86400
+  issuer: 'Muchinfo'
+
+# redis configuration
+redis:
+  db: 0
+  addr: {{redis_addr}}
+  password: ''
+
+# system configuration
+system:
+  env: 'develop'  # "develop" & "public", Change to "develop" to skip authentication for development mode
+  addr: 8888
+  need-quote-publish: false # 是否需要连接行情发布服务
+  quote-publish-addr: {{quote_publish_addr}} # 行情发布服务地址
+
+# local configuration
+local:
+  path: 'uploads/file' # 访问路径
+  store-path: 'uploads/file' # 存储路径
+
+# oracle configuration
+oracle:
+  driver: 'oci8'
+  address: '{{oracle_address}}'
+  name: 'orcl'
+  port: '{{oracle_port}}'
+  user: '{{oracle_user}}'
+  pwd: '{{oracle_pwd}}'
+  max-idle-conns: 10
+  max-open-conns: 100
+
+# rabbitmq configuration
+rabbitmq:
+  url: '{{rabbitmq_url}}'
+  exchange: 'entry'
+
+# 跨域配置
+# 需要配合 server/initialize/router.go#L32 使用
+cors:
+  mode: allow-all # 放行模式: allow-all, 放行全部; whitelist, 白名单模式, 来自白名单内域名的请求添加 cors 头; strict-whitelist 严格白名单模式, 白名单外的请求一律拒绝
+  whitelist:
+    - allow-origin: example1.com
+      allow-headers: content-type
+      allow-methods: GET, POST
+      expose-headers: Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers, Content-Type
+      allow-credentials: true # 布尔值
+    - allow-origin: example2.com
+      allow-headers: content-type
+      allow-methods: GET, POST
+      expose-headers: Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers, Content-Type
+      allow-credentials: true # 布尔值

+ 5 - 3
config.yaml

@@ -20,13 +20,15 @@ jwt:
 # redis configuration
 redis:
   db: 0
-  addr: '192.168.31.201:5007'
+  addr: '192.168.31.139:5007'
   password: ''
 
 # system configuration
 system:
   env: 'develop'  # "develop" & "public", Change to "develop" to skip authentication for development mode
   addr: 8888
+  need-quote-publish: false # 是否需要连接行情发布服务
+  quote-publish-addr: '192.168.31.139:5004' # 行情发布服务地址
 
 # local configuration
 local:
@@ -39,14 +41,14 @@ oracle:
   address: '192.168.31.88'
   name: 'orcl'
   port: '1521'
-  user: 'mtp2_test201'
+  user: 'mtp2_test139'
   pwd: 'muchinfo'
   max-idle-conns: 10
   max-open-conns: 100
 
 # rabbitmq configuration
 rabbitmq:
-  url: 'amqp://guest:guest@192.168.31.201:5020/test'
+  url: 'amqp://guest:guest@192.168.31.139:5020/test'
   exchange: 'entry'
 
 # 跨域配置

+ 4 - 2
config/system.go

@@ -1,6 +1,8 @@
 package config
 
 type System struct {
-	Env  string `mapstructure:"env" json:"env" yaml:"env"`    // 环境值
-	Addr int    `mapstructure:"addr" json:"addr" yaml:"addr"` // 端口值
+	Env              string `mapstructure:"env" json:"env" yaml:"env"`                                              // 环境值
+	Addr             int    `mapstructure:"addr" json:"addr" yaml:"addr"`                                           // 端口值
+	NeedQuotePublish bool   `mapstructure:"need-quote-publish" json:"need-quote-publish" yaml:"need-quote-publish"` // 是否需要连接行情发布服务
+	QuotePublishAddr string `mapstructure:"quote-publish-addr" json:"quote-publish-addr" yaml:"quote-publish-addr"` // 行情发布服务地址
 }

+ 78 - 0
docs/docs.go

@@ -200,6 +200,59 @@ const docTemplate = `{
                     }
                 }
             }
+        },
+        "/Quote/QuoteSubscribe": {
+            "post": {
+                "security": [
+                    {
+                        "ApiKeyAuth": []
+                    }
+                ],
+                "consumes": [
+                    "application/json"
+                ],
+                "produces": [
+                    "application/json"
+                ],
+                "tags": [
+                    "实时行情"
+                ],
+                "summary": "实时行情",
+                "parameters": [
+                    {
+                        "description": "入参",
+                        "name": "data",
+                        "in": "body",
+                        "required": true,
+                        "schema": {
+                            "type": "array",
+                            "items": {
+                                "$ref": "#/definitions/request.QuoteSubscribeReq"
+                            }
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "description": "出参",
+                        "schema": {
+                            "allOf": [
+                                {
+                                    "$ref": "#/definitions/response.Response"
+                                },
+                                {
+                                    "type": "object",
+                                    "properties": {
+                                        "msg": {
+                                            "type": "string"
+                                        }
+                                    }
+                                }
+                            ]
+                        }
+                    }
+                }
+            }
         }
     },
     "definitions": {
@@ -272,6 +325,31 @@ const docTemplate = `{
                 }
             }
         },
+        "request.QuoteGoods": {
+            "type": "object",
+            "properties": {
+                "exchangeId": {
+                    "description": "交易所代码,一般写死250",
+                    "type": "integer"
+                },
+                "goodsCode": {
+                    "description": "商品代码,全大写",
+                    "type": "string"
+                }
+            }
+        },
+        "request.QuoteSubscribeReq": {
+            "type": "object",
+            "properties": {
+                "quoteGoodses": {
+                    "description": "待订阅商品信息",
+                    "type": "array",
+                    "items": {
+                        "$ref": "#/definitions/request.QuoteGoods"
+                    }
+                }
+            }
+        },
         "response.LoginRsp": {
             "type": "object",
             "properties": {

+ 78 - 0
docs/swagger.json

@@ -191,6 +191,59 @@
                     }
                 }
             }
+        },
+        "/Quote/QuoteSubscribe": {
+            "post": {
+                "security": [
+                    {
+                        "ApiKeyAuth": []
+                    }
+                ],
+                "consumes": [
+                    "application/json"
+                ],
+                "produces": [
+                    "application/json"
+                ],
+                "tags": [
+                    "实时行情"
+                ],
+                "summary": "实时行情",
+                "parameters": [
+                    {
+                        "description": "入参",
+                        "name": "data",
+                        "in": "body",
+                        "required": true,
+                        "schema": {
+                            "type": "array",
+                            "items": {
+                                "$ref": "#/definitions/request.QuoteSubscribeReq"
+                            }
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "description": "出参",
+                        "schema": {
+                            "allOf": [
+                                {
+                                    "$ref": "#/definitions/response.Response"
+                                },
+                                {
+                                    "type": "object",
+                                    "properties": {
+                                        "msg": {
+                                            "type": "string"
+                                        }
+                                    }
+                                }
+                            ]
+                        }
+                    }
+                }
+            }
         }
     },
     "definitions": {
@@ -263,6 +316,31 @@
                 }
             }
         },
+        "request.QuoteGoods": {
+            "type": "object",
+            "properties": {
+                "exchangeId": {
+                    "description": "交易所代码,一般写死250",
+                    "type": "integer"
+                },
+                "goodsCode": {
+                    "description": "商品代码,全大写",
+                    "type": "string"
+                }
+            }
+        },
+        "request.QuoteSubscribeReq": {
+            "type": "object",
+            "properties": {
+                "quoteGoodses": {
+                    "description": "待订阅商品信息",
+                    "type": "array",
+                    "items": {
+                        "$ref": "#/definitions/request.QuoteGoods"
+                    }
+                }
+            }
+        },
         "response.LoginRsp": {
             "type": "object",
             "properties": {

+ 47 - 0
docs/swagger.yaml

@@ -49,6 +49,23 @@ definitions:
     - data
     - funCode
     type: object
+  request.QuoteGoods:
+    properties:
+      exchangeId:
+        description: 交易所代码,一般写死250
+        type: integer
+      goodsCode:
+        description: 商品代码,全大写
+        type: string
+    type: object
+  request.QuoteSubscribeReq:
+    properties:
+      quoteGoodses:
+        description: 待订阅商品信息
+        items:
+          $ref: '#/definitions/request.QuoteGoods'
+        type: array
+    type: object
   response.LoginRsp:
     properties:
       expiresAt:
@@ -195,6 +212,36 @@ paths:
       summary: 总线通知
       tags:
       - 总线业务
+  /Quote/QuoteSubscribe:
+    post:
+      consumes:
+      - application/json
+      parameters:
+      - description: 入参
+        in: body
+        name: data
+        required: true
+        schema:
+          items:
+            $ref: '#/definitions/request.QuoteSubscribeReq'
+          type: array
+      produces:
+      - application/json
+      responses:
+        "200":
+          description: 出参
+          schema:
+            allOf:
+            - $ref: '#/definitions/response.Response'
+            - properties:
+                msg:
+                  type: string
+              type: object
+      security:
+      - ApiKeyAuth: []
+      summary: 实时行情
+      tags:
+      - 实时行情
 securityDefinitions:
   ApiKeyAuth:
     in: header

+ 0 - 135
global/client.go

@@ -1,135 +0,0 @@
-package global
-
-import (
-	"bytes"
-	"encoding/json"
-	"fmt"
-	rsp "mtp20access/model/mq/response"
-	"sync"
-
-	"github.com/mitchellh/mapstructure"
-)
-
-type LoginRedis struct {
-	LoginID   string `json:"loginId" redis:"loginId"`     // 登陆账号
-	UserID    string `json:"userId" redis:"userId"`       // 用户ID
-	SessionID string `json:"sessionId" redis:"sessionId"` // 终端sid
-	Token     string `json:"token" redis:"token"`         // 令牌
-	Group     string `json:"group" redis:"group"`         // 终端分组
-	Addr      string `json:"addr" redis:"addr"`           // 客户端地址信息 // FIXME: 由于本服务改用短连,所以每次提交请交请求可能会不一样,后期可判断是否在中间件中进行拦截
-}
-
-// FromMap Map to Struct
-func (r *LoginRedis) FromMap(val map[string]interface{}) error {
-	return mapstructure.Decode(val, r)
-}
-
-// ToMap Struct to Map
-func (r *LoginRedis) ToMap() (val map[string]interface{}, err error) {
-	if marshalContent, err := json.Marshal(r); err != nil {
-		return nil, err
-	} else {
-		d := json.NewDecoder(bytes.NewReader(marshalContent))
-		d.UseNumber() // 设置将float64转为一个number
-		if err := d.Decode(&val); err != nil {
-			fmt.Println(err)
-		} else {
-			for k, v := range val {
-				val[k] = v
-			}
-		}
-	}
-
-	return
-}
-
-type Client struct {
-	LoginRedis
-
-	mtx             sync.RWMutex
-	curSerialNumber uint32                // 当前业务流水号
-	asyncTasks      map[string]*AsyncTask // key:SessionId_FuncodeRsp_SerialNumber
-}
-
-// GetSerialNumber 获取可用流水号
-func (r *Client) GetSerialNumber() uint32 {
-	r.mtx.Lock()
-	defer func() {
-		r.mtx.Unlock()
-	}()
-	r.curSerialNumber += 1
-
-	return r.curSerialNumber
-}
-
-// GetAsyncTask 获取目标异步任务
-// key:SessionId_FuncodeRsp
-func (r *Client) GetAsyncTask(key string) (asyncTask *AsyncTask) {
-	r.mtx.RLock()
-	defer func() {
-		r.mtx.RUnlock()
-	}()
-
-	asyncTask, ok := r.asyncTasks[key]
-	if ok {
-		return asyncTask
-	} else {
-		return nil
-	}
-}
-
-// SetAsyncTask 设置异步任务
-func (r *Client) SetAsyncTask(asyncTask *AsyncTask, key string) {
-	r.mtx.Lock()
-	defer func() {
-		r.mtx.Unlock()
-	}()
-
-	if r.asyncTasks == nil {
-		r.asyncTasks = make(map[string]*AsyncTask, 0)
-	}
-	delete(r.asyncTasks, key)
-	r.asyncTasks[key] = asyncTask
-}
-
-// DeleteAsyncTask 删除异步任务
-func (r *Client) DeleteAsyncTask(key string) {
-	r.mtx.Lock()
-	defer func() {
-		r.mtx.Unlock()
-	}()
-	delete(r.asyncTasks, key)
-}
-
-func (r *Client) GetAllAsyncTask() *map[string]*AsyncTask {
-	return &r.asyncTasks
-}
-
-// MQPacket 与总线交互的数据体
-type MQPacket struct {
-	FunCode   uint32  // 功能码
-	SessionId uint32  // 数据包的sid
-	Data      *[]byte // 业务数据体
-}
-
-// AsyncTask 异步任务结构体
-type AsyncTask struct {
-	PacketRsp    chan MQPacket // 总线数据处理通道
-	FuncodeRsp   uint32        // 回复功能码
-	SerialNumber uint32        // 通信流水号
-	Own          *Client
-	IsEncrypted  bool // 是否加密
-
-	Rsp     chan rsp.MQBodyRsp // 回调
-	doClose sync.Once          // 仅关闭通道一次
-}
-
-// Finish 完成
-func (r *AsyncTask) Finish() {
-	r.doClose.Do(
-		func() {
-			close(r.Rsp)
-			key := fmt.Sprintf("%v_%v_%v", r.Own.SessionID, r.FuncodeRsp, r.SerialNumber)
-			r.Own.DeleteAsyncTask(key)
-		})
-}

+ 3 - 1
global/global.go

@@ -2,6 +2,7 @@ package global
 
 import (
 	"mtp20access/config"
+	"mtp20access/publish"
 
 	"github.com/go-redis/redis/v8"
 	"github.com/spf13/viper"
@@ -25,6 +26,7 @@ var (
 	M2A_Concurrency_Control = &singleflight.Group{}
 
 	M2A_RABBITMQ     *RabbitMQ
-	M2A_Clients      map[int]*Client  // key:SessionID
 	M2A_FuncodeTopic map[string][]int // 总线主题与REQ Funcode的MAP, key:主题
+
+	M2A_Publish *publish.Publisher
 )

+ 1 - 0
go.mod

@@ -8,6 +8,7 @@ require (
 	github.com/go-redis/redis/v8 v8.11.5
 	github.com/golang-jwt/jwt/v4 v4.4.2
 	github.com/golang/protobuf v1.5.2
+	github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c
 	github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
 	github.com/mattn/go-oci8 v0.1.1
 	github.com/mitchellh/mapstructure v1.5.0

+ 1 - 0
go.sum

@@ -258,6 +258,7 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR
 github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
 github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
+github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c h1:Lh2aW+HnU2Nbe1gqD9SOJLJxW1jBMmQOktN2acDyJk8=
 github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
 github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=

+ 20 - 17
initialize/rabbitmq.go

@@ -3,16 +3,19 @@ package initialize
 import (
 	"encoding/base64"
 	"fmt"
+	"mtp20access/client"
 	"mtp20access/global"
 	rsp "mtp20access/model/mq/response"
 	"mtp20access/packet"
+	"mtp20access/rabbitmq"
 	"mtp20access/res/pb"
 	"mtp20access/utils"
 
-	"github.com/golang/protobuf/proto"
+	// "github.com/golang/protobuf/proto"
 	"github.com/streadway/amqp"
 	"go.uber.org/zap"
 	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
 )
 
 func RabbitMQ() *global.RabbitMQ {
@@ -48,7 +51,7 @@ func (t *MQProc) Process(topic, queuename string, msg *[]byte) {
 
 	if funcode, sessionId, bytes, serialNumber, err := t.getRspProtobuf(msg); err == nil && bytes != nil {
 		// 尝试获取对应异步任务
-		if client, exists := global.M2A_Clients[int(sessionId)]; exists {
+		if client, exists := client.Clients[int(sessionId)]; exists {
 			key := fmt.Sprintf("%v_%v_%v", sessionId, funcode, serialNumber)
 			// 银行服务相关的回复流水号是错误的,所以需要特殊处理
 			if int(funcode) == global.T2bBankSignRsp ||
@@ -520,57 +523,57 @@ func (t *MQProc) getRspProtobuf(msg *[]byte) (funcode uint32, sessionId uint32,
 // RabbitMQSubscribeTopic 订阅主题
 func RabbitMQSubscribeTopic() (err error) {
 	// 订阅需要的总线响应主题
-	if err = global.SubscribeTopic(global.TOPIC_RSP_QKERNEL); err != nil {
+	if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_QKERNEL); err != nil {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
-	if err = global.SubscribeTopic(global.TOPIC_MANAGE_RSP); err != nil {
+	if err = rabbitmq.SubscribeTopic(global.TOPIC_MANAGE_RSP); err != nil {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
-	if err = global.SubscribeTopic(global.TOPIC_RSP_BANK); err != nil {
+	if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_BANK); err != nil {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
-	if err = global.SubscribeTopic(global.TOPIC_RSP_MONEY); err != nil {
+	if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_MONEY); err != nil {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
-	if err = global.SubscribeTopic(global.TOPIC_PERFORMANCE_RSP); err != nil {
+	if err = rabbitmq.SubscribeTopic(global.TOPIC_PERFORMANCE_RSP); err != nil {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
-	if err = global.SubscribeTopic(global.TOPIC_WAREHOUSE_RECIEPT_RSP); err != nil {
+	if err = rabbitmq.SubscribeTopic(global.TOPIC_WAREHOUSE_RECIEPT_RSP); err != nil {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
-	if err = global.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_TRADE); err != nil {
+	if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_TRADE); err != nil {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
-	if err = global.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_TRADE_GZ); err != nil {
+	if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_TRADE_GZ); err != nil {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
-	if err = global.SubscribeTopic(global.TOPIC_RSP_CJBS_TRADE_GZ); err != nil {
+	if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_CJBS_TRADE_GZ); err != nil {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
 
 	// 铁合金
-	if err = global.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_PRESALE_THJ); err != nil {
+	if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_PRESALE_THJ); err != nil {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
-	if err = global.SubscribeTopic(global.TOPIC_MANAGER_THJ_NTF); err != nil {
+	if err = rabbitmq.SubscribeTopic(global.TOPIC_MANAGER_THJ_NTF); err != nil {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
-	if err = global.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_PRESALE_GZ); err != nil {
+	if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_PRESALE_GZ); err != nil {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
-	if err = global.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_CENTERPURCHASE_GZ); err != nil {
+	if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_CENTERPURCHASE_GZ); err != nil {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
@@ -585,8 +588,8 @@ func StartRabbitMQReceive() {
 	t := &MQProc{}
 
 	go func() {
-		for _, subinfo := range global.SubInfos {
-			global.Receive(subinfo.Topic, subinfo.QueueName, t)
+		for _, subinfo := range rabbitmq.SubInfos {
+			rabbitmq.Receive(subinfo.Topic, subinfo.QueueName, t)
 		}
 	}()
 }

+ 1 - 0
initialize/router.go

@@ -47,6 +47,7 @@ func Routers() *gin.Engine {
 	{
 		router.InitMQPrivateRouter(privateGroup)
 		router.InitAccountPrivateRouter(privateGroup)
+		router.InitQuotePrivateRouter(privateGroup)
 	}
 
 	return Router

+ 10 - 0
main.go

@@ -4,6 +4,8 @@ import (
 	"mtp20access/core"
 	"mtp20access/global"
 	"mtp20access/initialize"
+	"mtp20access/publish"
+	"mtp20access/quote_publish"
 
 	"go.uber.org/zap"
 )
@@ -52,6 +54,14 @@ func main() {
 	// 开始接收RabbitMQ信息
 	initialize.StartRabbitMQReceive()
 
+	// 初始化订阅器
+	global.M2A_Publish = publish.NewPublisher()
+
+	// 连接行情发布服务
+	if global.M2A_CONFIG.System.NeedQuotePublish {
+		go quote_publish.QuotePublishSer.Run()
+	}
+
 	// 启动Http API 服务
 	core.RunApiServer()
 	defer global.M2A_REDIS.Close()

+ 10 - 0
model/quote/request/quote.go

@@ -0,0 +1,10 @@
+package request
+
+type QuoteGoods struct {
+	ExchangeId int    `json:"exchangeId"` // 交易所代码,一般写死250
+	GoodsCode  string `json:"goodsCode"`  // 商品代码,全大写
+}
+
+type QuoteSubscribeReq struct {
+	QuoteGoodses []QuoteGoods // 待订阅商品信息
+}

+ 135 - 0
packet/quotePacket.go

@@ -0,0 +1,135 @@
+//const unsigned char MSG_SUBSCRIPTION_REQ            = 0x20;
+/************行情协议格式************************************
+下标  标识	        类型	字节数	内容	说明
+0    HeadTag	    byte	1	0xFF	头标记
+1    Length	        uint	4		包总长度
+4    TypeMain	    byte	1		大类
+6    TypeSecond	    []byte	2		小类
+8    SerialNumber	uint	4		通讯报文序号
+12   Mode	        byte	1		内容类型,0, 自定义格式,没什么用
+13   Version	    byte	1		版本号
+     数据体			业务结构体,自定义格式
+     FootTag	     byte	1	0x00	尾标记
+
+     数据体前,包头长度=14, 空包长度=15
+     行情订阅数据体格式
+14   accountid       []byte  8       账户id(loginid) bigEnd
+22   token           []byte  64      令牌
+86   goodscount      uint    4       商品数量  bigEnd
+     repeated{
+        istatus      byte    1       订阅状态
+        exchangeid   byte    1       交易所id
+        goodscode    []byte  64      商品代码
+     }
+*******************************************************/
+
+package packet
+
+import (
+	"fmt"
+	"io"
+	"mtp20access/utils"
+	"net"
+)
+
+// MiQuotePacket 行情协议结构体
+type MiQuotePacket struct {
+	Length    uint32
+	BigType   byte
+	SmallType uint16
+	SerialNum uint32
+	Mode      byte
+	Msg       []byte
+	OriMsg    []byte // 原始包数据
+}
+
+// EnPack 重新打包数据
+func (r *MiQuotePacket) EnPack() []byte {
+	r.Length = uint32(len(r.Msg)) + 15                           // 空包长度15
+	buf := make([]byte, 0)                                       // 缓存
+	buf = append(buf, byte(0xFF))                                // HeadTag
+	buf = append(buf, utils.UintTobytesBigEnd(r.Length)...)      // Length
+	buf = append(buf, r.BigType)                                 // TypeMain
+	buf = append(buf, utils.Uint16TobytesBigEnd(r.SmallType)...) // TypeSecond
+	buf = append(buf, utils.UintTobytesBigEnd(r.SerialNum)...)   // SerialNumber
+	buf = append(buf, byte(0))                                   // Mode
+	buf = append(buf, byte(0))                                   // version
+	buf = append(buf, r.Msg...)                                  // 数据体body
+	buf = append(buf, byte(0))                                   // FootTag
+
+	return buf
+}
+
+func (r *MiQuotePacket) HeaderLen() uint32 {
+	return 14
+}
+
+// BodyLen 数据体长度, 含FootTag
+func (r *MiQuotePacket) BodyLen() uint32 {
+	return r.Length - 14
+}
+
+// UnPackHead 解包头
+func (r *MiQuotePacket) UnPackHead(buf []byte) error {
+	if len(buf) < 14 {
+		return fmt.Errorf("行情包头长度错误")
+	}
+
+	if buf[0] != 0xFF {
+		return fmt.Errorf("行情包头长度错误")
+	}
+
+	r.Length = utils.BytesBigEndToUint32(buf[1:5])
+	r.BigType = buf[5]
+	r.SmallType = utils.BytesBigEndToUint16(buf[6:8])
+	r.SerialNum = utils.BytesBigEndToUint32(buf[8:12])
+	r.Mode = buf[12]
+
+	return nil
+}
+
+// SetOriMsg 保存原始数据包
+func (r *MiQuotePacket) SetOriMsg(arg ...[]byte) {
+	if r.OriMsg == nil {
+		r.OriMsg = make([]byte, 0)
+	}
+	for i := range arg {
+		r.OriMsg = append(r.OriMsg, arg[i]...)
+	}
+}
+
+// ReadMessage 从指定tcp链接读取一个协议包
+// @返回值 []byte 未解包的原始数据包, 如果需获取业务数据内容,
+// 请调用UnPack方法后取成员变量 Data 的内容
+func (r *MiQuotePacket) ReadMessage(conn *net.Conn) ([]byte, error) {
+	r.OriMsg = make([]byte, 0)
+	if conn == nil {
+		return r.OriMsg, fmt.Errorf("当前未连接行情发布服务")
+	}
+
+	headerBuf := make([]byte, r.HeaderLen())
+	nRead, err := io.ReadFull(*conn, headerBuf)
+	if err != nil || nRead != len(headerBuf) {
+		return r.OriMsg, fmt.Errorf("读取行情包头错误,可能连接已关闭:%v", err)
+	}
+
+	err = r.UnPackHead(headerBuf)
+	if err != nil {
+		return r.OriMsg, err
+	}
+
+	dataBuf := make([]byte, r.BodyLen())
+	nRead, err = io.ReadFull(*conn, dataBuf)
+	if err != nil || nRead != len(dataBuf) {
+		return r.OriMsg, fmt.Errorf("读取行情包数据错误,可能连接已关闭:%v", err)
+	}
+
+	r.SetOriMsg(headerBuf, dataBuf)
+	return r.OriMsg, nil
+}
+
+// HeaderInfo 头部信息, 功能号、sid、流水号、长度等
+func (r *MiQuotePacket) HeaderInfo() string {
+	return fmt.Sprintf("bigtype[%d] smalltype[%d] serial[%d] iLen:%d",
+		r.BigType, r.SmallType, r.SerialNum, r.Length)
+}

+ 60 - 0
publish/publish.go

@@ -0,0 +1,60 @@
+package publish
+
+import (
+	"sync"
+)
+
+type Topic string
+
+const (
+	Topic_Quote   = "quote"   // 实时行情
+	Topic_Trading = "trading" // 交易服务通知
+)
+
+type Publisher struct {
+	subscribers map[Topic][]chan interface{}
+	mu          sync.RWMutex
+}
+
+func NewPublisher() *Publisher {
+	return &Publisher{
+		subscribers: make(map[Topic][]chan interface{}),
+	}
+}
+
+func (p *Publisher) Subscribe(topic Topic) chan interface{} {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	ch := make(chan interface{}, 1)
+	p.subscribers[topic] = append(p.subscribers[topic], ch)
+	return ch
+}
+
+func (p *Publisher) Unsubscribe(topic Topic, ch chan interface{}) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	subs := p.subscribers[topic]
+	for i := range subs {
+		if subs[i] == ch {
+			p.subscribers[topic] = append(subs[:i], subs[i+1:]...)
+			break
+		}
+	}
+	close(ch)
+}
+
+func (p *Publisher) Publish(topic Topic, data interface{}) {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	subs := p.subscribers[topic]
+	for _, ch := range subs {
+		select {
+		case ch <- data:
+		default:
+			// 如果消息管道已满,直接跳过,不阻塞订阅者
+		}
+	}
+}

+ 90 - 0
quote_publish/quote_publish.go

@@ -0,0 +1,90 @@
+package quote_publish
+
+import (
+	"mtp20access/global"
+	"mtp20access/packet"
+	"mtp20access/publish"
+	"net"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+)
+
+var QuotePublishSer = QuotePublish{}
+
+type QuotePublish struct {
+	conn   net.Conn     // 与行情发布服务的连接
+	status int          // 连接状态 -1:连接已断开 0:初始状态 1:已连接 2:正在连接
+	mtx    sync.RWMutex // 锁
+}
+
+func (r *QuotePublish) Run() {
+	// 连接行情发布, 阻塞模式, 直接成功才返回
+	r.connQuotePublish()
+
+	// 读取行情发布连接tcp报文
+	go r.readQuote()
+}
+
+func (r *QuotePublish) setStatus(status int) {
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+	r.status = status
+}
+
+// connQuotePublish 连接行情发布服务
+func (r *QuotePublish) connQuotePublish() {
+	var err error
+	for {
+		if r.conn != nil {
+			r.conn.Close()
+		}
+
+		r.setStatus(2)
+		global.M2A_LOG.Info("正在尝试连接行情发布服务", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
+		if r.conn, err = net.Dial("tcp", global.M2A_CONFIG.System.QuotePublishAddr); err != nil {
+			global.M2A_LOG.Error("尝试连接行情发布服务失败,将在3秒后重试", zap.Any("err", err))
+			time.Sleep(time.Second * 3)
+			continue
+		}
+
+		global.M2A_LOG.Info("连接行情发布服务成功", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
+		r.setStatus(1)
+		break
+	}
+}
+
+// onDisconnected 连接断开事件
+func (r *QuotePublish) onDisconnected() {
+	if r.status == 2 {
+		return
+	}
+	global.M2A_LOG.Error("行情发布服务连接已断开,将会尝试重新连接行情发布服务", zap.Any("host", global.M2A_CONFIG.System.QuotePublishAddr))
+
+	r.setStatus(-1)
+	go func() {
+		r.connQuotePublish()
+		r.readQuote()
+	}()
+}
+
+// readQuote 从行情发布服务读取行情信息
+func (r *QuotePublish) readQuote() {
+	for {
+		var p packet.MiQuotePacket
+		if _, err := p.ReadMessage(&r.conn); err != nil {
+			go r.onDisconnected()
+			break
+		}
+
+		// 心跳包
+		if p.BigType == 0x12 && p.Length <= 24 {
+			global.M2A_LOG.Debug("接收到行情发页服务心跳回复")
+			continue
+		}
+
+		// 分发给订阅者
+		global.M2A_Publish.Publish(publish.Topic_Quote, &p)
+	}
+}

+ 16 - 14
global/rabbitmq.go → rabbitmq/rabbitmq.go

@@ -1,9 +1,11 @@
-package global
+package rabbitmq
 
 import (
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"mtp20access/client"
+	"mtp20access/global"
 
 	"github.com/streadway/amqp"
 	"go.uber.org/zap"
@@ -29,14 +31,14 @@ func SubscribeTopic(topic string) (err error) {
 	queuename := fmt.Sprintf("mtp20_access_%s", topic)
 
 	// 申明队列
-	if _, err = M2A_RABBITMQ.Channel.QueueDeclare(queuename, true, false, false, true, nil); err != nil {
-		M2A_LOG.Error("rabbitmq declear queue failed, err:", zap.Error(err))
+	if _, err = global.M2A_RABBITMQ.Channel.QueueDeclare(queuename, true, false, false, true, nil); err != nil {
+		global.M2A_LOG.Error("rabbitmq declear queue failed, err:", zap.Error(err))
 		return
 	}
 
 	// 绑定队列
-	if err = M2A_RABBITMQ.Channel.QueueBind(queuename, topic, "entry", false, nil); err != nil {
-		M2A_LOG.Error("rabbitmq bind queue failed, err:", zap.Error(err))
+	if err = global.M2A_RABBITMQ.Channel.QueueBind(queuename, topic, "entry", false, nil); err != nil {
+		global.M2A_LOG.Error("rabbitmq bind queue failed, err:", zap.Error(err))
 		return
 	}
 
@@ -47,10 +49,10 @@ func SubscribeTopic(topic string) (err error) {
 }
 
 // Publish 发送消息
-func Publish(topic string, msg *MQPacket) (err error) {
-	if M2A_RABBITMQ == nil || M2A_RABBITMQ.Connection.IsClosed() {
+func Publish(topic string, msg *client.MQPacket) (err error) {
+	if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
 		err = errors.New("rabbitmq is not connected")
-		M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
+		global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
 		return
 	}
 
@@ -64,11 +66,11 @@ func Publish(topic string, msg *MQPacket) (err error) {
 	data = append(data, sessionId...)
 	data = append(data, *msg.Data...)
 
-	if err = M2A_RABBITMQ.Channel.Publish("entry", topic, false, false, amqp.Publishing{
+	if err = global.M2A_RABBITMQ.Channel.Publish("entry", topic, false, false, amqp.Publishing{
 		ContentType: "text/plain",
 		Body:        data,
 	}); err != nil {
-		M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
+		global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
 		return
 	}
 
@@ -77,15 +79,15 @@ func Publish(topic string, msg *MQPacket) (err error) {
 
 // Receive 接收消息
 func Receive(topic, queuename string, processer MsgProcesser) (err error) {
-	if M2A_RABBITMQ == nil || M2A_RABBITMQ.Connection.IsClosed() {
+	if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
 		err = errors.New("rabbitmq is not connected")
-		M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
+		global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
 		return
 	}
 
-	msgList, err := M2A_RABBITMQ.Channel.Consume(queuename, "", false, false, false, false, nil)
+	msgList, err := global.M2A_RABBITMQ.Channel.Consume(queuename, "", false, false, false, false, nil)
 	if err != nil {
-		M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
+		global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
 		return
 	}
 

+ 15 - 0
router/quote.go

@@ -0,0 +1,15 @@
+package router
+
+import (
+	"mtp20access/api/v1/quote"
+
+	"github.com/gin-gonic/gin"
+)
+
+func InitQuotePrivateRouter(r *gin.RouterGroup) {
+	mqR := r.Group("Quote").Use()
+	{
+		mqR.GET("WS", quote.Quote)
+		mqR.POST("QuoteSubscribe", quote.QuoteSubscribe)
+	}
+}

+ 8 - 11
service/account/login.go

@@ -5,6 +5,7 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"mtp20access/client"
 	"mtp20access/global"
 	accountModel "mtp20access/model/account"
 	"mtp20access/model/account/request"
@@ -106,9 +107,7 @@ func getLoginAccount(userName string, password string) (loginaccount *accountMod
 // newSessionID 获取
 func newSessionID() int {
 	mtx.RLock()
-	defer func() {
-		mtx.RUnlock()
-	}()
+	defer mtx.RUnlock()
 	curSessionID += 1
 
 	return curSessionID
@@ -132,7 +131,7 @@ func buildRedisLoginInfo(loginaccount accountModel.Loginaccount, addr string, gr
 		return
 	}
 	expiresAt = claims.RegisteredClaims.ExpiresAt.Unix()
-	loginLogin := global.LoginRedis{
+	loginLogin := client.LoginRedis{
 		LoginID:   strconv.Itoa(int(loginaccount.LOGINID)),
 		UserID:    strconv.Itoa(int(loginaccount.USERID)),
 		SessionID: strconv.Itoa(sessionID),
@@ -166,14 +165,12 @@ func buildRedisLoginInfo(loginaccount accountModel.Loginaccount, addr string, gr
 
 	// 记录用户信息
 	mtx.Lock()
-	defer func() {
-		mtx.Unlock()
-	}()
-	if global.M2A_Clients == nil {
-		global.M2A_Clients = make(map[int]*global.Client, 0)
+	defer mtx.Unlock()
+	if client.Clients == nil {
+		client.Clients = make(map[int]*client.Client, 0)
 	}
-	delete(global.M2A_Clients, claims.SessionID)
-	global.M2A_Clients[claims.SessionID] = &global.Client{LoginRedis: loginLogin}
+	delete(client.Clients, claims.SessionID)
+	client.Clients[claims.SessionID] = &client.Client{LoginRedis: loginLogin}
 
 	return
 }

+ 12 - 10
service/mq/mq.go

@@ -3,11 +3,13 @@ package mq
 import (
 	"errors"
 	"fmt"
+	"mtp20access/client"
 	"mtp20access/global"
 	commonRequest "mtp20access/model/common/request"
 	rsp "mtp20access/model/common/response"
 	"mtp20access/model/mq/request"
 	"mtp20access/model/mq/response"
+	"mtp20access/rabbitmq"
 	"time"
 
 	"github.com/gin-gonic/gin"
@@ -26,7 +28,7 @@ func SendMQ(c *gin.Context, req *request.MQBodyReq) (err error) {
 	claims := s.(*commonRequest.CustomClaims)
 
 	// 获取登录账户信息
-	client, exists := global.M2A_Clients[claims.SessionID]
+	t, exists := client.Clients[claims.SessionID]
 	if !exists {
 		err = errors.New("获取登录账户信息异常")
 		global.M2A_LOG.Error(err.Error(), zap.Error(err))
@@ -34,7 +36,7 @@ func SendMQ(c *gin.Context, req *request.MQBodyReq) (err error) {
 	}
 
 	// 将客户端请求的数据转换成总线使用的Protobuf
-	serialNumber := client.GetSerialNumber()
+	serialNumber := t.GetSerialNumber()
 	bytes, err := req.GetProtoBytes(&serialNumber)
 	if err != nil {
 		return
@@ -45,21 +47,21 @@ func SendMQ(c *gin.Context, req *request.MQBodyReq) (err error) {
 	}
 
 	// 创建异步任务
-	key := fmt.Sprintf("%v_%v_%v", client.SessionID, req.FunCodeRsp, serialNumber)
+	key := fmt.Sprintf("%v_%v_%v", t.SessionID, req.FunCodeRsp, serialNumber)
 	// 银行服务相关的回复流水号是错误的,所以需要特殊处理
 	if int(req.FunCodeReq) == global.T2bBankSignReq ||
 		int(req.FunCodeReq) == global.T2bBankCancelSignReq ||
 		int(req.FunCodeReq) == global.T2bBankWithdrawReq ||
 		int(req.FunCodeReq) == global.T2bBankDepositReq {
-		key = fmt.Sprintf("%v_%v", client.SessionID, req.FunCodeRsp)
+		key = fmt.Sprintf("%v_%v", t.SessionID, req.FunCodeRsp)
 	}
-	asyncTask := global.AsyncTask{
+	asyncTask := client.AsyncTask{
 		FuncodeRsp:   req.FunCodeRsp,
 		SerialNumber: serialNumber,
-		Own:          client,
+		Own:          t,
 		IsEncrypted:  *req.IsEncrypted,
 	}
-	client.SetAsyncTask(&asyncTask, key)
+	t.SetAsyncTask(&asyncTask, key)
 
 	// 获取对应主题
 	// topic, exists := global.M2A_FuncodeTopic[int(req.FunCodeReq)]
@@ -70,12 +72,12 @@ func SendMQ(c *gin.Context, req *request.MQBodyReq) (err error) {
 	}
 
 	// 向总线发送业务信息
-	packet := &global.MQPacket{
+	packet := &client.MQPacket{
 		FunCode:   req.FunCodeReq,
 		SessionId: uint32(claims.SessionID),
 		Data:      bytes,
 	}
-	go global.Publish(topic, packet)
+	go rabbitmq.Publish(topic, packet)
 
 	global.M2A_LOG.Info("[C->S]", zap.Any("req", req.FunCodeReq), zap.Any("SessionId", packet.SessionId), zap.Any("data", req.Data))
 
@@ -86,7 +88,7 @@ func SendMQ(c *gin.Context, req *request.MQBodyReq) (err error) {
 		rsp.OkWithData(r, c)
 	case <-time.After(time.Second * 30): // 超时
 		// 删除对应异步任务
-		client.DeleteAsyncTask(key)
+		t.DeleteAsyncTask(key)
 		rsp.FailWithMessage("业务超时", c)
 	}
 	asyncTask.Finish()

+ 4 - 3
service/mq/ntf.go

@@ -2,8 +2,9 @@ package mq
 
 import (
 	"errors"
-	"mtp20access/global"
+	"mtp20access/client"
 	"mtp20access/model/mq/request"
+	"mtp20access/rabbitmq"
 
 	"github.com/gin-gonic/gin"
 )
@@ -21,12 +22,12 @@ func SendNtfMQ(c *gin.Context, req *request.MQNtfReq) (err error) {
 	}
 
 	// 向总线发送业务信息
-	packet := &global.MQPacket{
+	packet := &client.MQPacket{
 		FunCode:   req.FunCode,
 		SessionId: 0,
 		Data:      bytes,
 	}
-	go global.Publish(req.Topic, packet)
+	go rabbitmq.Publish(req.Topic, packet)
 
 	return
 }

+ 78 - 0
service/quote/quote.go

@@ -0,0 +1,78 @@
+package quote
+
+import (
+	"errors"
+	"mtp20access/client"
+	"mtp20access/global"
+	commonRequest "mtp20access/model/common/request"
+	"mtp20access/model/quote/request"
+	"net/http"
+
+	"github.com/gin-gonic/gin"
+	"github.com/gorilla/websocket"
+	"go.uber.org/zap"
+)
+
+var upGrader = websocket.Upgrader{
+	ReadBufferSize:  1000000,
+	WriteBufferSize: 1000000,
+	CheckOrigin: func(r *http.Request) bool {
+		return true
+	},
+}
+
+// QuoteConn 终端连接WebSocket
+func QuoteConn(c *gin.Context) (err error) {
+	// 获取请求账号信息
+	s, exists := c.Get("claims")
+	if !exists {
+		err = errors.New("获取请求账号信息异常")
+		global.M2A_LOG.Error(err.Error())
+		return
+	}
+	claims := s.(*commonRequest.CustomClaims)
+
+	// 获取登录账户信息
+	client, exists := client.Clients[claims.SessionID]
+	if !exists {
+		err = errors.New("获取登录账户信息异常")
+		global.M2A_LOG.Error(err.Error())
+		return
+	}
+
+	// 将http连接升级为websocket连接
+	ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
+	if err != nil {
+		err = errors.New("连接错误")
+		global.M2A_LOG.Error("将http连接升级为websocket连接失败", zap.Any("err", err))
+		return
+	}
+	client.SetWebSocket(ws)
+
+	return
+}
+
+// QuoteSubscribe 设置客户订阅商品信息
+func QuoteSubscribe(c *gin.Context, req []request.QuoteSubscribeReq) (err error) {
+	// 获取请求账号信息
+	s, exists := c.Get("claims")
+	if !exists {
+		err = errors.New("获取请求账号信息异常")
+		global.M2A_LOG.Error(err.Error())
+		return
+	}
+	claims := s.(*commonRequest.CustomClaims)
+
+	// 获取登录账户信息
+	client, exists := client.Clients[claims.SessionID]
+	if !exists {
+		err = errors.New("获取登录账户信息异常")
+		global.M2A_LOG.Error(err.Error())
+		return
+	}
+
+	// 设置客户订阅商品信息
+	client.SetQuoteSubs(req)
+
+	return
+}

+ 35 - 0
test/docs/docs.go

@@ -0,0 +1,35 @@
+// Package docs GENERATED BY SWAG; DO NOT EDIT
+// This file was generated by swaggo/swag
+package docs
+
+import "github.com/swaggo/swag"
+
+const docTemplate = `{
+    "schemes": {{ marshal .Schemes }},
+    "swagger": "2.0",
+    "info": {
+        "description": "{{escape .Description}}",
+        "title": "{{.Title}}",
+        "contact": {},
+        "version": "{{.Version}}"
+    },
+    "host": "{{.Host}}",
+    "basePath": "{{.BasePath}}",
+    "paths": {}
+}`
+
+// SwaggerInfo holds exported Swagger Info so clients can modify it
+var SwaggerInfo = &swag.Spec{
+	Version:          "",
+	Host:             "",
+	BasePath:         "",
+	Schemes:          []string{},
+	Title:            "",
+	Description:      "",
+	InfoInstanceName: "swagger",
+	SwaggerTemplate:  docTemplate,
+}
+
+func init() {
+	swag.Register(SwaggerInfo.InstanceName(), SwaggerInfo)
+}

+ 7 - 0
test/docs/swagger.json

@@ -0,0 +1,7 @@
+{
+    "swagger": "2.0",
+    "info": {
+        "contact": {}
+    },
+    "paths": {}
+}

+ 4 - 0
test/docs/swagger.yaml

@@ -0,0 +1,4 @@
+info:
+  contact: {}
+paths: {}
+swagger: "2.0"

+ 12 - 0
test/main.go

@@ -0,0 +1,12 @@
+package main
+
+import (
+	"encoding/base64"
+	"mtp20access/packet"
+)
+
+func main() {
+	if b, err := packet.Encrypt([]byte("d99999999"), packet.AESKey, true); err == nil {
+		println(base64.StdEncoding.EncodeToString(b))
+	}
+}

+ 8 - 8
utils/directory.go

@@ -8,10 +8,10 @@ import (
 	"go.uber.org/zap"
 )
 
-//@function: PathExists
-//@description: 文件目录是否存在
-//@param: path string
-//@return: bool, error
+// @function:    PathExists
+// @description: 文件目录是否存在
+// @param:       path string
+// @return:      bool, error
 func PathExists(path string) (bool, error) {
 	fi, err := os.Stat(path)
 	if err == nil {
@@ -26,10 +26,10 @@ func PathExists(path string) (bool, error) {
 	return false, err
 }
 
-//@function: CreateDir
-//@description: 批量创建文件夹
-//@param: dirs ...string
-//@return: err error
+// @function:    CreateDir
+// @description: 批量创建文件夹
+// @param:       dirs ...string
+// @return:      err error
 func CreateDir(dirs ...string) (err error) {
 	for _, v := range dirs {
 		exist, err := PathExists(v)