package client import ( "bytes" "encoding/json" "fmt" "mtp20access/global" rsp "mtp20access/model/mq/response" "mtp20access/model/quote/request" "mtp20access/packet" "mtp20access/publish" "sync" "time" "github.com/gorilla/websocket" "github.com/mitchellh/mapstructure" "go.uber.org/zap" ) var Clients map[int]*Client // key:SessionID type Client struct { LoginRedis // FIXME: - 这里是否需要定义多个锁? mtx sync.RWMutex curSerialNumber uint32 // 当前业务流水号 asyncTasks map[string]*AsyncTask // key:SessionId_FuncodeRsp_SerialNumber wsQuoteConn *websocket.Conn // 终端行情WebSocket连接 quoteWriteChan chan []byte // 推送队列 Access -> Client quoteChan chan interface{} // 接收实时行情订阅channel QuoteServer -> Access quoteSubs []request.QuoteGoods // 当前已订阅行情的商品 qmtx sync.RWMutex // wsQuoteCloseChan chan struct{} // 终端WebSocket连接关闭信号 wsTradeConn *websocket.Conn // 终端交易WebSocket连接 tradeWriteChan chan []byte // 推送队列 Access -> Client tradeChan chan interface{} // 接收交易通知channel RabbitMQ -> Access tmtx sync.RWMutex // wsTradeCloseChan chan struct{} // 终端WebSocket连接关闭信号 } // GetSerialNumber 获取可用流水号 func (r *Client) GetSerialNumber() uint32 { r.mtx.Lock() defer func() { r.mtx.Unlock() }() r.curSerialNumber += 1 return r.curSerialNumber } // SetQuoteSubs 设置商品行情订阅信息 func (r *Client) SetQuoteSubs(req request.QuoteSubscribeReq) { r.qmtx.Lock() defer r.qmtx.Unlock() r.quoteSubs = req.QuoteGoodses // 从订阅中心订阅行情通知 if r.quoteChan != nil { global.M2A_Publish.Unsubscribe(publish.Topic_Quote, r.quoteChan) } r.quoteChan = global.M2A_Publish.Subscribe(publish.Topic_Quote) go func() { for { msg, ok := <-r.quoteChan if !ok { return // 管道已关闭,退出循环 } // 向客户端发送行情信息 if p, ok := msg.(*packet.MiQuotePacket); ok { DispatchRealQuote(p, r) } } }() } // SetQuoteSubs 设置交易通知订阅信息 func (r *Client) SetTradeNft() { r.tmtx.Lock() defer r.tmtx.Unlock() // 从订阅中心订阅交易通知 if r.tradeChan != nil { global.M2A_Publish.Unsubscribe(publish.Topic_Trading, r.tradeChan) } r.tradeChan = global.M2A_Publish.Subscribe(publish.Topic_Trading) go func() { for { msg, ok := <-r.tradeChan if !ok { return // 管道已关闭,退出循环 } // 向客户端发送交易通知信息 if m, ok := msg.(map[string][]byte); ok { // 判断是否自己的通知 if p, ok := m[r.SessionID]; ok { DispatchNftTrade(&p, r) } } } }() } // GetQuoteSubs 获取已订阅行情商品信息列表 func (r *Client) GetQuoteSubs() (quoteSubs []request.QuoteGoods) { r.qmtx.Lock() defer r.qmtx.Unlock() return r.quoteSubs } // // WriteWsBuf 向客户端发送实时行情 // func (r *Client) WriteQuoteWsBuf(buf []byte) (err error) { // if r.quoteWriteChan != nil { // r.quoteWriteChan <- buf // } // return // } // func (r *Client) WriteTradeWsBuf(buf []byte) (err error) { // if r.tradeWriteChan != nil { // r.tradeWriteChan <- buf // } else { // global.M2A_LOG.Info("WriteTradeWsBuf::tradeWriteChan == nil", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID)) // } // if r.wsTradeConn != nil { // // r.tradeWriteChan <- buf // } else { // global.M2A_LOG.Info("WriteTradeWsBuf::wsTradeConn == nil", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID)) // global.M2A_LOG.Info("当前所有Client----") // for _, item := range Clients { // global.M2A_LOG.Info("client", // zap.Any("LoginID", item.LoginID), // zap.Any("Group", item.Group), zap.Any("SessionID", item.SessionID), // zap.Any("wsTradeConn", item.wsTradeConn), zap.Any("tradeWriteChan", item.tradeWriteChan), // zap.Any("wsQuoteConn", item.wsQuoteConn), zap.Any("quoteWriteChan", item.quoteWriteChan)) // } // global.M2A_LOG.Info("----------------") // } // return // } // GetAsyncTask 获取目标异步任务 // key:SessionId_FuncodeRsp func (r *Client) GetAsyncTask(key string) (asyncTask *AsyncTask) { r.mtx.RLock() defer func() { r.mtx.RUnlock() }() asyncTask, ok := r.asyncTasks[key] if ok { return asyncTask } else { return nil } } // SetAsyncTask 设置异步任务 func (r *Client) SetAsyncTask(asyncTask *AsyncTask, key string) { r.mtx.Lock() defer func() { r.mtx.Unlock() }() if r.asyncTasks == nil { r.asyncTasks = make(map[string]*AsyncTask, 0) } delete(r.asyncTasks, key) r.asyncTasks[key] = asyncTask } // DeleteAsyncTask 删除异步任务 func (r *Client) DeleteAsyncTask(key string) { r.mtx.Lock() defer func() { r.mtx.Unlock() }() delete(r.asyncTasks, key) } func (r *Client) GetAllAsyncTask() *map[string]*AsyncTask { return &r.asyncTasks } // **************** Quote WebSocket **************** func (r *Client) SetQuoteWebSocket(ws *websocket.Conn) (err error) { r.qmtx.Lock() defer r.qmtx.Unlock() // if r.wsQuoteConn != nil { // r.wsQuoteConn.Close() // } r.wsQuoteConn = ws r.quoteWriteChan = make(chan []byte, 100) // 开始读取客户端发送信息 go r.readClientWsQuoteMessage() // 开始推送客户端信息循环 go r.writeClientWsQuoteMessage() // r.wsQuoteConn.SetCloseHandler(func(code int, text string) error { // close(r.wsCloseChan) // return nil // }) return } // readClientWsQuoteMessage 处理终端发过来的websocket数据 // 注意: 阻塞式, 直到websocket关闭才退出 func (r *Client) readClientWsQuoteMessage() { for { fmt.Println("readClientWsQuoteMessage start") // 40秒心跳超时 r.wsQuoteConn.SetReadDeadline(time.Now().Add(40 * time.Second)) mt, msg, err := r.wsQuoteConn.ReadMessage() if err != nil { fmt.Printf("readClientWsQuoteMessage: %v\n", err) return } switch mt { case websocket.PingMessage: _ = r.wsQuoteConn.WriteMessage(mt, msg) case websocket.CloseMessage: return case websocket.BinaryMessage: if err := r.clientToQuoteAgentMsg(msg); err != nil { return } case websocket.TextMessage: fmt.Println(string(msg)) _ = r.wsQuoteConn.WriteMessage(mt, msg) } // FIXME: - 这里要判断是否有问题 // select { // case <-r.wsCloseChan: // return // } } } // writeClientWsQuoteMessage 由于websocket非线程安全, // 所以由统一协程写入 func (r *Client) writeClientWsQuoteMessage() { // defer r.close() // for { // select { // case buf := <-r.quoteWriteChan: // err := r.wsQuoteConn.WriteMessage(websocket.BinaryMessage, buf) // if err != nil { // return // } // case <-r.wsQuoteCloseChan: // 与终端连接关闭信息 // return // } // } for { buf, ok := <-r.quoteWriteChan if !ok { // 通道已关闭 return } err := r.wsQuoteConn.WriteMessage(websocket.BinaryMessage, buf) if err != nil { return } } } // clientToQuoteAgentMsg 处理客户端发上来的包 func (r *Client) clientToQuoteAgentMsg(msg []byte) error { var p packet.MiQuotePacket err := p.UnPackHead(msg) if err != nil { // logger.Logger().Errorf("[%v c->s] invalid packet: %v", r.ProxyId, err) return err } // 长度15 是心跳包, 目前只允许客户端上行心跳包 if p.Length == 15 { // logger.Logger().Debugf("%v [%v c->s] %s", r.ClientIP, r.ProxyId, p.HeaderInfo()) // 将心跳发回给客户端 err = r.wsQuoteConn.WriteMessage(websocket.BinaryMessage, msg) if err != nil { return err } } return nil } // **************** Trade WebSocket **************** func (r *Client) SetTradeWebSocket(ws *websocket.Conn) (err error) { r.tmtx.Lock() defer r.tmtx.Unlock() // if r.wsTradeConn != nil { // r.wsTradeConn.Close() // } r.wsTradeConn = ws r.tradeWriteChan = make(chan []byte, 100) r.wsTradeConn.SetCloseHandler(func(code int, text string) error { global.M2A_LOG.Info("交易长连发生断开", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID), zap.Any("code", code), zap.Any("text", text)) return nil }) // 开始读取客户端发送信息 go r.readClientWsTradeMessage() // 开始推送客户端信息循环 go r.writeClientWsTradeMessage() // 设置交易通知订阅信息 r.SetTradeNft() global.M2A_LOG.Info("交易长连客户端接入", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID)) return } // readClientWsQuoteMessage 处理终端发过来的websocket数据 // 注意: 阻塞式, 直到websocket关闭才退出 func (r *Client) readClientWsTradeMessage() { for { fmt.Println("readClientWsTradeMessage start") // 40秒心跳超时 r.wsTradeConn.SetReadDeadline(time.Now().Add(40 * time.Second)) mt, msg, err := r.wsTradeConn.ReadMessage() if err != nil { global.M2A_LOG.Info("readClientWsTradeMessage::ReadMessage发生错误", zap.Any("err", err)) return } switch mt { case websocket.PingMessage: if err = r.wsTradeConn.WriteMessage(mt, msg); err != nil { global.M2A_LOG.Info("readClientWsTradeMessage::PingMessage发生错误", zap.Any("err", err)) } case websocket.CloseMessage: return case websocket.BinaryMessage: if err := r.clientToTradeAgentMsg(msg); err != nil { global.M2A_LOG.Info("readClientWsTradeMessage::BinaryMessage发生错误", zap.Any("err", err)) return } case websocket.TextMessage: if err = r.wsTradeConn.WriteMessage(mt, msg); err != nil { global.M2A_LOG.Info("readClientWsTradeMessage::TextMessage发生错误", zap.Any("err", err)) return } } } } // writeClientWsQuoteMessage 由于websocket非线程安全, // 所以由统一协程写入 func (r *Client) writeClientWsTradeMessage() { for { buf, ok := <-r.tradeWriteChan if !ok { // 通道已关闭 global.M2A_LOG.Info("writeClientWsTradeMessage 通道已关闭", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID)) return } err := r.wsTradeConn.WriteMessage(websocket.BinaryMessage, buf) if err != nil { global.M2A_LOG.Info("writeClientWsTradeMessage 向客户端发送信息时发生错误", zap.Any("LoginID", r.LoginID), zap.Any("SessionID", r.SessionID), zap.Any("err", err)) return } } } // clientToTradeAgentMsg 处理客户端发上来的包 func (r *Client) clientToTradeAgentMsg(msg []byte) error { var p packet.MiPacket err := p.UnPackHead(msg) if err != nil { // logger.Logger().Errorf("[%v c->s] invalid packet: %v", r.ProxyId, err) return err } // 长度24 是心跳包, 目前只允许客户端上行心跳包 if p.Length == 24 { // logger.Logger().Debugf("%v [%v c->s] %s", r.ClientIP, r.ProxyId, p.HeaderInfo()) // 将心跳发回给客户端 err = r.wsTradeConn.WriteMessage(websocket.BinaryMessage, msg) if err != nil { return err } } return nil } // **************** RabbitMQ **************** // MQPacket 与总线交互的数据体 type MQPacket struct { FunCode uint32 // 功能码 SessionId uint32 // 数据包的sid Data *[]byte // 业务数据体 } // AsyncTask 异步任务结构体 type AsyncTask struct { PacketRsp chan MQPacket // 总线数据处理通道 FuncodeRsp uint32 // 回复功能码 SerialNumber uint32 // 通信流水号 Own *Client IsEncrypted bool // 是否加密 Rsp chan rsp.MQBodyRsp // 回调 doClose sync.Once // 仅关闭通道一次 } // Finish 完成 func (r *AsyncTask) Finish() { r.doClose.Do( func() { close(r.Rsp) key := fmt.Sprintf("%v_%v_%v", r.Own.SessionID, r.FuncodeRsp, r.SerialNumber) r.Own.DeleteAsyncTask(key) }) } type LoginRedis struct { LoginID string `json:"loginId" redis:"loginId"` // 登陆账号 UserID string `json:"userId" redis:"userId"` // 用户ID SessionID string `json:"sessionId" redis:"sessionId"` // 终端sid Token string `json:"token" redis:"token"` // 令牌 Group string `json:"group" redis:"group"` // 终端分组 Addr string `json:"addr" redis:"addr"` // 客户端地址信息 // FIXME: 由于本服务改用短连,所以每次提交请交请求可能会不一样,后期可判断是否在中间件中进行拦截 OldToken string `json:"-" redis:"-"` // 旧接入Token } // FromMap Map to Struct func (r *LoginRedis) FromMap(val map[string]interface{}) error { return mapstructure.Decode(val, r) } // ToMap Struct to Map func (r *LoginRedis) ToMap() (val map[string]interface{}, err error) { if marshalContent, err := json.Marshal(r); err != nil { return nil, err } else { d := json.NewDecoder(bytes.NewReader(marshalContent)) d.UseNumber() // 设置将float64转为一个number if err := d.Decode(&val); err != nil { fmt.Println(err) } else { for k, v := range val { val[k] = v } } } return }