zhou.xiaoning 2 năm trước cách đây
mục cha
commit
75e6407d86

+ 0 - 41
api/v1/quote/quote.go

@@ -1,41 +0,0 @@
-package quote
-
-import (
-	"mtp20access/model/common/response"
-	"mtp20access/model/quote/request"
-	"mtp20access/service/quote"
-
-	"github.com/gin-gonic/gin"
-)
-
-func Quote(c *gin.Context) {
-	if err := quote.QuoteConn(c); err != nil {
-		response.FailWithMessage(err.Error(), c)
-		// return
-	}
-
-	// response.OkWithMessage("连接成功", c)
-}
-
-// QuoteSubscribe 订阅商品实时行情请求
-// @Summary  订阅商品实时行情请求
-// @Security ApiKeyAuth
-// @accept   application/json
-// @Produce  application/json
-// @Param    data body     request.QuoteSubscribeReq     true "入参1"
-// @Success  200  {object} response.Response{msg=string} "出参"
-// @Router   /Quote/QuoteSubscribe [post]
-// @Tags     实时行情
-func QuoteSubscribe(c *gin.Context) {
-	var req request.QuoteSubscribeReq
-	if err := c.ShouldBindJSON(&req); err != nil {
-		response.FailWithMessage("入参不正确", c)
-		return
-	}
-
-	if err := quote.QuoteSubscribe(c, req); err != nil {
-		response.FailWithMessage(err.Error(), c)
-		return
-	}
-	response.OkWithMessage("订阅成功", c)
-}

+ 0 - 191
client/client.go

@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"mtp20access/global"
 	rsp "mtp20access/model/mq/response"
-	"mtp20access/model/quote/request"
 	"mtp20access/packet"
 	"mtp20access/publish"
 	"sync"
@@ -27,12 +26,6 @@ type Client struct {
 	curSerialNumber uint32                // 当前业务流水号
 	asyncTasks      map[string]*AsyncTask // key:SessionId_FuncodeRsp_SerialNumber
 
-	wsQuoteConn    *websocket.Conn      // 终端行情WebSocket连接
-	quoteWriteChan chan []byte          // 推送队列 Access -> Client
-	quoteChan      chan interface{}     // 接收实时行情订阅channel QuoteServer -> Access
-	quoteSubs      []request.QuoteGoods // 当前已订阅行情的商品
-	// wsQuoteCloseChan chan struct{}        // 终端WebSocket连接关闭信号
-
 	wsTradeConn    *websocket.Conn  // 终端交易WebSocket连接
 	tradeWriteChan chan []byte      // 推送队列 Access -> Client
 	tradeChan      chan interface{} // 接收交易通知channel RabbitMQ -> Access
@@ -50,33 +43,6 @@ func (r *Client) GetSerialNumber() uint32 {
 	return r.curSerialNumber
 }
 
-// SetQuoteSubs 设置商品行情订阅信息
-func (r *Client) SetQuoteSubs(req request.QuoteSubscribeReq) {
-	// r.qmtx.Lock()
-	// defer r.qmtx.Unlock()
-
-	r.quoteSubs = req.QuoteGoodses
-
-	// 从订阅中心订阅行情通知
-	if r.quoteChan != nil {
-		global.M2A_Publish.Unsubscribe(publish.Topic_Quote, r.quoteChan)
-	}
-	r.quoteChan = global.M2A_Publish.Subscribe(publish.Topic_Quote)
-
-	go func() {
-		for {
-			msg, ok := <-r.quoteChan
-			if !ok {
-				return // 管道已关闭,退出循环
-			}
-			// 向客户端发送行情信息
-			if p, ok := msg.(*packet.MiQuotePacket); ok {
-				DispatchRealQuote(p, r)
-			}
-		}
-	}()
-}
-
 // SetQuoteSubs 设置交易通知订阅信息
 func (r *Client) SetTradeNft() {
 	// global.M2A_LOG.Info("SetTradeNft 11111111111111111111111111111")
@@ -110,48 +76,6 @@ func (r *Client) SetTradeNft() {
 	}()
 }
 
-// GetQuoteSubs 获取已订阅行情商品信息列表
-func (r *Client) GetQuoteSubs() (quoteSubs []request.QuoteGoods) {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-
-	return r.quoteSubs
-}
-
-// // WriteWsBuf 向客户端发送实时行情
-// func (r *Client) WriteQuoteWsBuf(buf []byte) (err error) {
-// 	if r.quoteWriteChan != nil {
-// 		r.quoteWriteChan <- buf
-// 	}
-
-// 	return
-// }
-
-// func (r *Client) WriteTradeWsBuf(buf []byte) (err error) {
-// 	if r.tradeWriteChan != nil {
-// 		r.tradeWriteChan <- buf
-// 	} else {
-// 		global.M2A_LOG.Info("WriteTradeWsBuf::tradeWriteChan == nil", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID))
-// 	}
-
-// 	if r.wsTradeConn != nil {
-// 		// r.tradeWriteChan <- buf
-// 	} else {
-// 		global.M2A_LOG.Info("WriteTradeWsBuf::wsTradeConn == nil", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID))
-// 		global.M2A_LOG.Info("当前所有Client----")
-// 		for _, item := range Clients {
-// 			global.M2A_LOG.Info("client",
-// 				zap.Any("LoginID", item.LoginID),
-// 				zap.Any("Group", item.Group), zap.Any("SessionID", item.SessionID),
-// 				zap.Any("wsTradeConn", item.wsTradeConn), zap.Any("tradeWriteChan", item.tradeWriteChan),
-// 				zap.Any("wsQuoteConn", item.wsQuoteConn), zap.Any("quoteWriteChan", item.quoteWriteChan))
-// 		}
-// 		global.M2A_LOG.Info("----------------")
-// 	}
-
-// 	return
-// }
-
 // GetAsyncTask 获取目标异步任务
 // key:SessionId_FuncodeRsp
 func (r *Client) GetAsyncTask(key string) (asyncTask *AsyncTask) {
@@ -195,121 +119,6 @@ func (r *Client) GetAllAsyncTask() *map[string]*AsyncTask {
 	return &r.asyncTasks
 }
 
-// **************** Quote WebSocket ****************
-
-func (r *Client) SetQuoteWebSocket(ws *websocket.Conn) (err error) {
-	// global.M2A_LOG.Info("行情长连 SetQuoteWebSocket 22222222222222")
-
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-
-	// if r.wsQuoteConn != nil {
-	// 	r.wsQuoteConn.Close()
-	// }
-	r.wsQuoteConn = ws
-	r.quoteWriteChan = make(chan []byte, 100)
-
-	// 开始读取客户端发送信息
-	go r.readClientWsQuoteMessage()
-	// 开始推送客户端信息循环
-	go r.writeClientWsQuoteMessage()
-
-	// r.wsQuoteConn.SetCloseHandler(func(code int, text string) error {
-	// 	close(r.wsCloseChan)
-	// 	return nil
-	// })
-
-	global.M2A_LOG.Info("行情长连客户端接入", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID))
-
-	return
-}
-
-// readClientWsQuoteMessage 处理终端发过来的websocket数据
-// 注意: 阻塞式, 直到websocket关闭才退出
-func (r *Client) readClientWsQuoteMessage() {
-	for {
-		fmt.Println("readClientWsQuoteMessage start")
-
-		// 40秒心跳超时
-		r.wsQuoteConn.SetReadDeadline(time.Now().Add(40 * time.Second))
-
-		mt, msg, err := r.wsQuoteConn.ReadMessage()
-		if err != nil {
-			fmt.Printf("readClientWsQuoteMessage: %v\n", err)
-			return
-		}
-
-		switch mt {
-		case websocket.PingMessage:
-			_ = r.wsQuoteConn.WriteMessage(mt, msg)
-		case websocket.CloseMessage:
-			return
-		case websocket.BinaryMessage:
-			if err := r.clientToQuoteAgentMsg(msg); err != nil {
-				return
-			}
-		case websocket.TextMessage:
-			fmt.Println(string(msg))
-			_ = r.wsQuoteConn.WriteMessage(mt, msg)
-		}
-
-		// FIXME: - 这里要判断是否有问题
-		// select {
-		// case <-r.wsCloseChan:
-		// 	return
-		// }
-	}
-}
-
-// writeClientWsQuoteMessage 由于websocket非线程安全,
-// 所以由统一协程写入
-func (r *Client) writeClientWsQuoteMessage() {
-	// defer r.close()
-	// for {
-	// 	select {
-	// 	case buf := <-r.quoteWriteChan:
-	// 		err := r.wsQuoteConn.WriteMessage(websocket.BinaryMessage, buf)
-	// 		if err != nil {
-	// 			return
-	// 		}
-	// 	case <-r.wsQuoteCloseChan: // 与终端连接关闭信息
-	// 		return
-	// 	}
-	// }
-	for {
-		buf, ok := <-r.quoteWriteChan
-		if !ok {
-			// 通道已关闭
-			return
-		}
-		err := r.wsQuoteConn.WriteMessage(websocket.BinaryMessage, buf)
-		if err != nil {
-			return
-		}
-	}
-}
-
-// clientToQuoteAgentMsg 处理客户端发上来的包
-func (r *Client) clientToQuoteAgentMsg(msg []byte) error {
-	var p packet.MiQuotePacket
-	err := p.UnPackHead(msg)
-	if err != nil {
-		// logger.Logger().Errorf("[%v c->s] invalid packet: %v", r.ProxyId, err)
-		return err
-	}
-	// 长度15 是心跳包, 目前只允许客户端上行心跳包
-	if p.Length == 15 {
-		// logger.Logger().Debugf("%v [%v c->s] %s", r.ClientIP, r.ProxyId, p.HeaderInfo())
-		// 将心跳发回给客户端
-		err = r.wsQuoteConn.WriteMessage(websocket.BinaryMessage, msg)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
 // **************** Trade WebSocket ****************
 
 func (r *Client) SetTradeWebSocket(ws *websocket.Conn) (err error) {

+ 0 - 90
client/msgRealQuote.go

@@ -1,90 +0,0 @@
-/**
-* @Author  : zou.yingbin
-* @Create  : 2022/3/26 14:52
-* @Modify  : 2022/3/26 14:52
-* @note    :
- */
-
-package client
-
-import (
-	"fmt"
-	"mtp20access/packet"
-	"strconv"
-)
-
-// DispatchRealQuote 分发行情
-func DispatchRealQuote(p *packet.MiQuotePacket, clinet *Client) {
-	// TODO: 目前只实现了订阅发送模式, 未支持全部发送模式
-
-	pending := make([]byte, 0)
-	// 解析接收到的商品
-	ware := parseWareInfo(p)
-	// 获取已订阅行情商品列表
-	quoteSubs := clinet.GetQuoteSubs()
-	for _, sub := range quoteSubs {
-		for i := range ware {
-			w := &ware[i]
-			if w.exchId == strconv.Itoa(sub.ExchangeId) && w.goodsCode == sub.GoodsCode {
-				pending = append(pending, w.buf...)
-			}
-		}
-	}
-	if len(pending) > 0 {
-		// 按商品重新打包
-		quote := packet.MiQuotePacket{
-			BigType:   p.BigType,
-			SmallType: p.SmallType,
-			SerialNum: p.SerialNum,
-			Mode:      p.Mode,
-		}
-		quote.Msg = pending
-		sendBuf := quote.EnPack()
-
-		// 发送给客户端
-		if clinet.quoteWriteChan != nil {
-			clinet.quoteWriteChan <- sendBuf
-		}
-	}
-}
-
-// parseWareInfo 从报文中解析出所有报价商品
-func parseWareInfo(p *packet.MiQuotePacket) []wareInfo {
-	ware := make([]wareInfo, 0)
-
-	// 0x42(66)是控制信息,要过滤掉
-	if p.OriMsg[5] == 0x42 {
-		return ware
-	}
-
-	data := p.OriMsg[14:]
-	nPos1, nPos2 := -1, -1
-	for i := 0; i < len(data); i++ {
-		// 报价包开始
-		if data[i] == 0x10 {
-			nPos1 = i
-		}
-		// 报价包结束
-		if data[i] == 0x11 {
-			nPos2 = i + 1
-		}
-		if nPos1 >= 0 && nPos2 > 0 {
-			// 处理闪退问题
-			if nPos1 > nPos2 {
-				fmt.Printf("接收到错误的行情记录(nPos1>nPos2): %v \n", p.OriMsg)
-
-				// 重置nPos1, nPos2 继续查找下一个报价包
-				nPos1, nPos2 = -1, -1
-			}
-
-			v := wareInfo{buf: data[nPos1:nPos2]}
-			v.parseField()
-			//v.printInfo()
-			//v.debugPrintAllField()
-			ware = append(ware, v)
-			// 重置nPos1, nPos2 继续查找下一个报价包
-			nPos1, nPos2 = -1, -1
-		}
-	}
-	return ware
-}

+ 0 - 85
client/wareInfo.go

@@ -1,85 +0,0 @@
-/**
-* @Author  : zou.yingbin
-* @Create  : 2022/3/23 11:05
-* @Modify  : 2022/3/23 11:05
-* @note    :
- */
-
-package client
-
-type wareInfo struct {
-	buf       []byte // 内容[报价开始(0x10):报价结束(0x11)]
-	exchId    string // 交易所id
-	goodsCode string // 商品代码
-	ask       string // 卖价
-	bid       string // 买价
-	last      string // 最新价
-	tm        string // 时间
-}
-
-func (r *wareInfo) getField(tag byte) string {
-	for i, v := range r.buf {
-		if v == tag {
-			if i > 0 && r.buf[i-1] == 0x01 {
-				for j, d := range r.buf[i:] {
-					if d == 0x01 || d == 0x11 {
-						buf := r.buf[i+2 : i+j]
-						return string(buf)
-					}
-				}
-			}
-		}
-	}
-	return ""
-}
-
-func (r *wareInfo) parseField() {
-	r.goodsCode = r.getField(0x21)
-	r.ask = r.getField(0x4c)
-	r.bid = r.getField(0x42)
-	r.last = r.getField(0x24)
-	r.tm = r.getField(0x41)
-	r.exchId = r.getField(0x56)
-	if r.exchId == "200" {
-		r.goodsCode = "200"
-	}
-}
-
-// func (r *wareInfo) printInfo() {
-// 	if r.ask == "" {
-// 		return
-// 	}
-// 	log.Println(r.exchId, r.goodsCode, r.ask, r.bid, r.last, r.tm)
-// }
-
-// func (r *wareInfo) debugPrintAllField() {
-// 	if len(r.buf) <= 2 {
-// 		return
-// 	}
-// 	if r.buf[0] != 0x10 || r.buf[len(r.buf)-1] != 0x11 {
-// 		return
-// 	}
-// 	type Field struct {
-// 		Tag   byte
-// 		Value string
-// 	}
-// 	var sField []Field
-// 	tmp := string(r.buf[2 : len(r.buf)-1])
-// 	//0x01 字段分割
-// 	strS := strings.Split(tmp, string([]byte{0x01}))
-// 	for _, v := range strS {
-// 		//0x02 值分割
-// 		split := strings.Split(v, string([]byte{0x02}))
-// 		if len(split) != 2 {
-// 			break
-// 		}
-// 		d := Field{
-// 			Tag:   ([]byte(split[0]))[0],
-// 			Value: split[1],
-// 		}
-// 		sField = append(sField, d)
-// 	}
-// 	for _, v := range sField {
-// 		log.Printf("0x%0x %s\n", v.Tag, v.Value)
-// 	}
-// }

+ 1 - 3
initialize/rabbitmq.go

@@ -55,7 +55,7 @@ func (t *MQProc) Process(topic, queuename string, msg *[]byte) {
 	if funcode, sessionId, bytes, serialNumber, err := t.getRspProtobuf(msg); err == nil && bytes != nil {
 		if sessionId == 0 || funcode == uint32(global.LogoutRsp) || funcode == uint32(global.LoginRsp) {
 			// 通知类 或 特殊处理
-			t.onNtf(funcode, sessionId, bytes)
+			// t.onNtf(funcode, sessionId, bytes)
 		} else {
 			// 请求回复
 			// 尝试获取对应异步任务
@@ -273,8 +273,6 @@ func (t *MQProc) onNtf(funcode uint32, sessionId uint32, bytes *[]byte) {
 			global.M2A_Publish.Publish(publish.Topic_Trading, m)
 		}
 	}
-
-	// global.M2A_LOG.Info("----------退出通知逻辑", zap.Any("funcode", funcode), zap.Any("sessionId", sessionId))
 }
 
 // getRspProtobuf 将总线回复的数据反序列化为Protobuf

+ 0 - 2
initialize/router.go

@@ -39,7 +39,6 @@ func Routers() *gin.Engine {
 			router.InitMQPublicRouter(publicGroup)
 		}
 		router.InitAccountPublicRouter(publicGroup)
-		router.InitQuotePublicRouter(publicGroup)
 		router.InitTradePublicRouter(publicGroup)
 	}
 
@@ -49,7 +48,6 @@ func Routers() *gin.Engine {
 	{
 		router.InitMQPrivateRouter(privateGroup)
 		router.InitAccountPrivateRouter(privateGroup)
-		router.InitQuotePrivateRouter(privateGroup)
 	}
 
 	return Router

+ 0 - 10
model/quote/request/quote.go

@@ -1,10 +0,0 @@
-package request
-
-type QuoteGoods struct {
-	ExchangeId int    `json:"exchangeId"` // 交易所代码,一般写死250
-	GoodsCode  string `json:"goodsCode"`  // 商品代码,全大写
-}
-
-type QuoteSubscribeReq struct {
-	QuoteGoodses []QuoteGoods // 待订阅商品信息
-}

+ 0 - 21
router/quote.go

@@ -1,21 +0,0 @@
-package router
-
-import (
-	"mtp20access/api/v1/quote"
-
-	"github.com/gin-gonic/gin"
-)
-
-func InitQuotePublicRouter(r *gin.RouterGroup) {
-	quoteR := r.Group("Quote").Use()
-	{
-		quoteR.GET("WS", quote.Quote)
-	}
-}
-
-func InitQuotePrivateRouter(r *gin.RouterGroup) {
-	quoteR := r.Group("Quote").Use()
-	{
-		quoteR.POST("QuoteSubscribe", quote.QuoteSubscribe)
-	}
-}

+ 0 - 126
service/quote/quote.go

@@ -1,126 +0,0 @@
-package quote
-
-import (
-	"errors"
-	"mtp20access/client"
-	"mtp20access/global"
-	commonRequest "mtp20access/model/common/request"
-	"mtp20access/model/common/response"
-	"mtp20access/model/quote/request"
-	"mtp20access/utils"
-	"net/http"
-
-	"github.com/gin-gonic/gin"
-	"github.com/gorilla/websocket"
-	"go.uber.org/zap"
-)
-
-// QuoteConn 终端连接WebSocket
-func QuoteConn(c *gin.Context) (err error) {
-	// 获取请求账号信息
-	// s, exists := c.Get("claims")
-	// if !exists {
-	// 	err = errors.New("获取请求账号信息异常")
-	// 	global.M2A_LOG.Error(err.Error())
-	// 	return
-	// }
-	// claims := s.(*commonRequest.CustomClaims)
-
-	// 我们这里jwt鉴权取头部信息 x-token 登录时回返回token信息 这里前端需要把token存储到cookie或者本地localStorage中 不过需要跟后端协商过期时间 可以约定刷新令牌或者重新登录
-	token := c.Request.Header.Get("Sec-WebSocket-Protocol")
-	if token == "" {
-		response.FailWithDetailed(gin.H{"reload": true}, "未登录或非法访问", c)
-		c.Abort()
-		return
-	}
-
-	j := utils.NewJWT()
-	// parseToken 解析token包含的信息
-	claims, err := j.ParseToken(token)
-	if err != nil {
-		if err == utils.ErrTokenExpired {
-			response.FailWithCodeAndDetail(gin.H{"reload": true}, response.ERROR_TOKEN_EXPIRED, "授权已过期", c)
-			// c.Abort()
-			return
-		}
-		response.FailWithDetailed(gin.H{"reload": true}, err.Error(), c)
-		// c.Abort()
-		return
-	}
-	// 从Redis获取登录信息
-	loginMap, err := j.GetRedisLogin(claims.LoginID, claims.Group)
-	if err != nil {
-		response.FailWithCodeAndDetail(gin.H{"reload": true}, response.ERROR_TOKEN_OTHER_LOGIN, "您的帐户异地登陆或令牌失效", c)
-		// c.Abort()
-		return
-	}
-	// 判断Token是否有效
-	if redisToken, isHas := loginMap["token"]; isHas {
-		if redisToken != token {
-			response.FailWithCodeAndDetail(gin.H{"reload": true}, response.ERROR_TOKEN_OTHER_LOGIN, "您的帐户异地登陆或令牌失效", c)
-			// c.Abort()
-			return
-		}
-	} else {
-		response.FailWithDetailed(gin.H{"reload": true}, "未登录或非法访问", c)
-		// c.Abort()
-		return
-	}
-
-	// 获取登录账户信息
-	client, exists := client.Clients[claims.SessionID]
-	if !exists {
-		err = errors.New("获取登录账户信息异常")
-		global.M2A_LOG.Error(err.Error())
-		return
-	}
-
-	// 需要注意的是,如果前端通过websocket连接时指定了Sec-WebSocket-Protocol,后端接收到连接后,必须原封不动的将Sec-WebSocket-Protocol头信息返回给前端,否则连接会抛出异常。
-	// c.Header("Sec-WebSocket-Protocol", token)
-	// c.String(200, "ok")
-	// 将http连接升级为websocket连接
-	upGrader := websocket.Upgrader{
-		ReadBufferSize:  1000000,
-		WriteBufferSize: 1000000,
-		CheckOrigin: func(r *http.Request) bool {
-			return true
-		},
-		//将获取的参数放进这个数组
-		Subprotocols: []string{token},
-	}
-	ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
-	if err != nil {
-		err = errors.New("连接错误")
-		global.M2A_LOG.Error("将http连接升级为websocket连接失败", zap.Any("err", err))
-		return
-	}
-
-	client.SetQuoteWebSocket(ws)
-
-	return
-}
-
-// QuoteSubscribe 设置客户订阅商品信息
-func QuoteSubscribe(c *gin.Context, req request.QuoteSubscribeReq) (err error) {
-	// 获取请求账号信息
-	s, exists := c.Get("claims")
-	if !exists {
-		err = errors.New("获取请求账号信息异常")
-		global.M2A_LOG.Error(err.Error())
-		return
-	}
-	claims := s.(*commonRequest.CustomClaims)
-
-	// 获取登录账户信息
-	client, exists := client.Clients[claims.SessionID]
-	if !exists {
-		err = errors.New("获取登录账户信息异常")
-		global.M2A_LOG.Error(err.Error())
-		return
-	}
-
-	// 设置客户订阅商品信息
-	client.SetQuoteSubs(req)
-
-	return
-}