client.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package client
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "mtp20access/global"
  7. rsp "mtp20access/model/mq/response"
  8. "mtp20access/packet"
  9. "mtp20access/publish"
  10. "sync"
  11. "time"
  12. "github.com/gorilla/websocket"
  13. "github.com/mitchellh/mapstructure"
  14. "go.uber.org/zap"
  15. )
  // Clients is the package-level registry of clients; per the original note the
  // key is the SessionID.
  // NOTE(review): the key type is int while LoginRedis.SessionID is a string —
  // confirm how callers derive the key.
  // The map is only declared here; no initialisation is visible in this file, so
  // it must be created with make before any entry is stored.
  var Clients map[int]*Client
  // Client is the per-session state of one logged-in terminal: its login
  // record, its outstanding async tasks and its trade WebSocket plumbing.
  type Client struct {
  	LoginRedis
  	// FIXME: should several locks be defined here instead of a single one?
  	mtx sync.RWMutex
  	// current business serial number
  	curSerialNumber uint32
  	// outstanding async tasks; key: SessionId_FuncodeRsp_SerialNumber
  	asyncTasks map[string]*AsyncTask
  	// trade WebSocket connection to the terminal
  	wsTradeConn *websocket.Conn
  	// push queue, Access -> Client
  	tradeWriteChan chan []byte
  	// channel receiving trade notifications, RabbitMQ -> Access
  	tradeChan chan interface{}
  	// wsTradeCloseChan chan struct{} // close signal for the terminal WebSocket connection
  }
  28. // GetSerialNumber 获取可用流水号
  29. func (r *Client) GetSerialNumber() uint32 {
  30. r.mtx.Lock()
  31. defer func() {
  32. r.mtx.Unlock()
  33. }()
  34. r.curSerialNumber += 1
  35. return r.curSerialNumber
  36. }
  37. // SetQuoteSubs 设置交易通知订阅信息
  38. func (r *Client) SetTradeNft() {
  39. // global.M2A_LOG.Info("SetTradeNft 11111111111111111111111111111")
  40. // r.tmtx.Lock()
  41. // defer r.tmtx.Unlock()
  42. // 从订阅中心订阅交易通知
  43. if r.tradeChan != nil {
  44. global.M2A_Publish.Unsubscribe(publish.Topic_Trading, r.tradeChan)
  45. }
  46. r.tradeChan = global.M2A_Publish.Subscribe(publish.Topic_Trading)
  47. global.M2A_LOG.Info("SetTradeNft 添加交易通知订阅", zap.Any("r.tradeChan", r.tradeChan))
  48. go func() {
  49. for {
  50. msg, ok := <-r.tradeChan
  51. if !ok {
  52. return // 管道已关闭,退出循环
  53. }
  54. // 向客户端发送交易通知信息
  55. if m, ok := msg.(map[string][]byte); ok {
  56. // 判断是否自己的通知
  57. if p, ok := m[r.SessionID]; ok {
  58. // 发送给客户端
  59. if r.tradeWriteChan != nil {
  60. r.tradeWriteChan <- p
  61. }
  62. }
  63. }
  64. }
  65. }()
  66. }
  67. // GetAsyncTask 获取目标异步任务
  68. // key:SessionId_FuncodeRsp
  69. func (r *Client) GetAsyncTask(key string) (asyncTask *AsyncTask) {
  70. r.mtx.RLock()
  71. defer func() {
  72. r.mtx.RUnlock()
  73. }()
  74. asyncTask, ok := r.asyncTasks[key]
  75. if ok {
  76. return asyncTask
  77. } else {
  78. return nil
  79. }
  80. }
  81. // SetAsyncTask 设置异步任务
  82. func (r *Client) SetAsyncTask(asyncTask *AsyncTask, key string) {
  83. r.mtx.Lock()
  84. defer func() {
  85. r.mtx.Unlock()
  86. }()
  87. if r.asyncTasks == nil {
  88. r.asyncTasks = make(map[string]*AsyncTask, 0)
  89. }
  90. delete(r.asyncTasks, key)
  91. r.asyncTasks[key] = asyncTask
  92. }
  93. // DeleteAsyncTask 删除异步任务
  94. func (r *Client) DeleteAsyncTask(key string) {
  95. r.mtx.Lock()
  96. defer func() {
  97. r.mtx.Unlock()
  98. }()
  99. delete(r.asyncTasks, key)
  100. }
  101. func (r *Client) GetAllAsyncTask() *map[string]*AsyncTask {
  102. return &r.asyncTasks
  103. }
  104. // **************** Trade WebSocket ****************
  105. func (r *Client) SetTradeWebSocket(ws *websocket.Conn) (err error) {
  106. // global.M2A_LOG.Info("交易长连 SetTradeWebSocket 33333333333333333333")
  107. r.mtx.Lock()
  108. defer r.mtx.Unlock()
  109. // if r.wsTradeConn != nil {
  110. // r.wsTradeConn.Close()
  111. // }
  112. r.wsTradeConn = ws
  113. r.tradeWriteChan = make(chan []byte, 100)
  114. r.wsTradeConn.SetCloseHandler(func(code int, text string) error {
  115. global.M2A_LOG.Info("交易长连发生断开", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID), zap.Any("code", code), zap.Any("text", text))
  116. return nil
  117. })
  118. // 开始读取客户端发送信息
  119. go r.readClientWsTradeMessage()
  120. // 开始推送客户端信息循环
  121. go r.writeClientWsTradeMessage()
  122. // 设置交易通知订阅信息
  123. r.SetTradeNft()
  124. global.M2A_LOG.Info("交易长连客户端接入", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID))
  125. return
  126. }
  127. // readClientWsQuoteMessage 处理终端发过来的websocket数据
  128. // 注意: 阻塞式, 直到websocket关闭才退出
  129. func (r *Client) readClientWsTradeMessage() {
  130. for {
  131. fmt.Println("readClientWsTradeMessage start")
  132. // 40秒心跳超时
  133. r.wsTradeConn.SetReadDeadline(time.Now().Add(40 * time.Second))
  134. mt, msg, err := r.wsTradeConn.ReadMessage()
  135. if err != nil {
  136. global.M2A_LOG.Info("readClientWsTradeMessage::ReadMessage发生错误", zap.Any("err", err))
  137. return
  138. }
  139. switch mt {
  140. case websocket.PingMessage:
  141. if err = r.wsTradeConn.WriteMessage(mt, msg); err != nil {
  142. global.M2A_LOG.Info("readClientWsTradeMessage::PingMessage发生错误", zap.Any("err", err))
  143. }
  144. case websocket.CloseMessage:
  145. return
  146. case websocket.BinaryMessage:
  147. if err := r.clientToTradeAgentMsg(msg); err != nil {
  148. global.M2A_LOG.Info("readClientWsTradeMessage::BinaryMessage发生错误", zap.Any("err", err))
  149. return
  150. }
  151. case websocket.TextMessage:
  152. if err = r.wsTradeConn.WriteMessage(mt, msg); err != nil {
  153. global.M2A_LOG.Info("readClientWsTradeMessage::TextMessage发生错误", zap.Any("err", err))
  154. return
  155. }
  156. }
  157. }
  158. }
  159. // writeClientWsQuoteMessage 由于websocket非线程安全,
  160. // 所以由统一协程写入
  161. func (r *Client) writeClientWsTradeMessage() {
  162. for {
  163. buf, ok := <-r.tradeWriteChan
  164. if !ok {
  165. // 通道已关闭
  166. global.M2A_LOG.Info("writeClientWsTradeMessage 通道已关闭", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID))
  167. return
  168. }
  169. err := r.wsTradeConn.WriteMessage(websocket.BinaryMessage, buf)
  170. if err != nil {
  171. global.M2A_LOG.Info("writeClientWsTradeMessage 向客户端发送信息时发生错误", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID), zap.Any("err", err))
  172. return
  173. }
  174. }
  175. }
  176. // clientToTradeAgentMsg 处理客户端发上来的包
  177. func (r *Client) clientToTradeAgentMsg(msg []byte) error {
  178. var p packet.MiPacket
  179. err := p.UnPackHead(msg)
  180. if err != nil {
  181. // logger.Logger().Errorf("[%v c->s] invalid packet: %v", r.ProxyId, err)
  182. return err
  183. }
  184. // 长度24 是心跳包, 目前只允许客户端上行心跳包
  185. if p.Length == 24 {
  186. // logger.Logger().Debugf("%v [%v c->s] %s", r.ClientIP, r.ProxyId, p.HeaderInfo())
  187. // 将心跳发回给客户端
  188. err = r.wsTradeConn.WriteMessage(websocket.BinaryMessage, msg)
  189. if err != nil {
  190. return err
  191. }
  192. }
  193. return nil
  194. }
  // **************** RabbitMQ ****************

  // MQPacket is the data unit exchanged with the message bus.
  type MQPacket struct {
  	// function code
  	FunCode uint32
  	// sid of the packet
  	SessionId uint32
  	// business payload
  	Data *[]byte
  }
  // AsyncTask describes one asynchronous task.
  type AsyncTask struct {
  	// channel on which bus data is processed
  	PacketRsp chan MQPacket
  	// reply function code
  	FuncodeRsp uint32
  	// communication serial number
  	SerialNumber uint32
  	// the client this task is registered with (Finish removes the task from it)
  	Own *Client
  	// whether the payload is encrypted
  	IsEncrypted bool
  	// callback channel; closed by Finish
  	Rsp chan rsp.MQBodyRsp
  	// ensures the channel is closed only once
  	doClose sync.Once
  }
  212. // Finish 完成
  213. func (r *AsyncTask) Finish() {
  214. r.doClose.Do(
  215. func() {
  216. close(r.Rsp)
  217. key := fmt.Sprintf("%v_%v_%v", r.Own.SessionID, r.FuncodeRsp, r.SerialNumber)
  218. // 银行服务相关的回复流水号是错误的,所以需要特殊处理
  219. if int(r.FuncodeRsp) == global.T2bBankSignRsp ||
  220. int(r.FuncodeRsp) == global.T2bSMSVerificationCodeRsp ||
  221. int(r.FuncodeRsp) == global.T2bBankCancelSignRsp ||
  222. int(r.FuncodeRsp) == global.T2bBankWithdrawRsp ||
  223. int(r.FuncodeRsp) == global.T2bBankDepositRsp {
  224. key = fmt.Sprintf("%v_%v", r.Own.SessionID, r.FuncodeRsp)
  225. }
  226. r.Own.DeleteAsyncTask(key)
  227. })
  228. }
  // LoginRedis is the login record of a terminal; its fields carry json and
  // redis tags, and OldToken is excluded from both.
  type LoginRedis struct {
  	// login account
  	LoginID string `json:"loginId" redis:"loginId"`
  	// user ID
  	UserID string `json:"userId" redis:"userId"`
  	// terminal sid
  	SessionID string `json:"sessionId" redis:"sessionId"`
  	// token
  	Token string `json:"token" redis:"token"`
  	// terminal group
  	Group string `json:"group" redis:"group"`
  	// client address information
  	// FIXME: this service switched to short-lived connections, so the address
  	// may differ on every submitted request; later it could be decided
  	// whether to intercept this in a middleware.
  	Addr string `json:"addr" redis:"addr"`
  	// old access token
  	OldToken string `json:"-" redis:"-"`
  }
  238. // FromMap Map to Struct
  239. func (r *LoginRedis) FromMap(val map[string]interface{}) error {
  240. return mapstructure.Decode(val, r)
  241. }
  242. // ToMap Struct to Map
  243. func (r *LoginRedis) ToMap() (val map[string]interface{}, err error) {
  244. if marshalContent, err := json.Marshal(r); err != nil {
  245. return nil, err
  246. } else {
  247. d := json.NewDecoder(bytes.NewReader(marshalContent))
  248. d.UseNumber() // 设置将float64转为一个number
  249. if err := d.Decode(&val); err != nil {
  250. fmt.Println(err)
  251. } else {
  252. for k, v := range val {
  253. val[k] = v
  254. }
  255. }
  256. }
  257. return
  258. }