|
@@ -21,16 +21,22 @@ var Clients map[int]*Client // key:SessionID
|
|
|
type Client struct {
|
|
type Client struct {
|
|
|
LoginRedis
|
|
LoginRedis
|
|
|
|
|
|
|
|
|
|
+ // FIXME: - 这里是否需要定义多个锁?
|
|
|
mtx sync.RWMutex
|
|
mtx sync.RWMutex
|
|
|
curSerialNumber uint32 // 当前业务流水号
|
|
curSerialNumber uint32 // 当前业务流水号
|
|
|
asyncTasks map[string]*AsyncTask // key:SessionId_FuncodeRsp_SerialNumber
|
|
asyncTasks map[string]*AsyncTask // key:SessionId_FuncodeRsp_SerialNumber
|
|
|
|
|
|
|
|
- wsConn *websocket.Conn // 终端WebSocket连接
|
|
|
|
|
- quoteSubs []request.QuoteGoods // 当前已订阅行情的商品
|
|
|
|
|
- ch chan interface{} // 接收实时行情订阅channel
|
|
|
|
|
- unSubscribe chan struct{} // 取消订阅信号
|
|
|
|
|
- writeChan chan []byte // 推送队列 QuoteServer -> Client
|
|
|
|
|
- wsCloseChan chan struct{} // 终端WebSocket连接关闭信号
|
|
|
|
|
|
|
+ wsQuoteConn *websocket.Conn // 终端行情WebSocket连接
|
|
|
|
|
+ quoteWriteChan chan []byte // 推送队列 Access -> Client
|
|
|
|
|
+ quoteChan chan interface{} // 接收实时行情订阅channel QuoteServer -> Access
|
|
|
|
|
+ quoteSubs []request.QuoteGoods // 当前已订阅行情的商品
|
|
|
|
|
+ unSubscribe chan struct{} // 取消订阅信号
|
|
|
|
|
+ wsQuoteCloseChan chan struct{} // 终端WebSocket连接关闭信号
|
|
|
|
|
+
|
|
|
|
|
+ wsTradeConn *websocket.Conn // 终端交易WebSocket连接
|
|
|
|
|
+ tradeWriteChan chan []byte // 推送队列 Access -> Client
|
|
|
|
|
+ tradeChan chan interface{} // 接收交易通知channel RabbitMQ -> Access
|
|
|
|
|
+ wsTradeCloseChan chan struct{} // 终端WebSocket连接关闭信号
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// GetSerialNumber 获取可用流水号
|
|
// GetSerialNumber 获取可用流水号
|
|
@@ -51,16 +57,16 @@ func (r *Client) SetQuoteSubs(req request.QuoteSubscribeReq) {
|
|
|
|
|
|
|
|
r.quoteSubs = req.QuoteGoodses
|
|
r.quoteSubs = req.QuoteGoodses
|
|
|
|
|
|
|
|
- if r.ch != nil {
|
|
|
|
|
|
|
+ if r.quoteChan != nil {
|
|
|
// r.unSubscribe <- struct{}{}
|
|
// r.unSubscribe <- struct{}{}
|
|
|
- global.M2A_Publish.Unsubscribe(publish.Topic_Quote, r.ch)
|
|
|
|
|
|
|
+ global.M2A_Publish.Unsubscribe(publish.Topic_Quote, r.quoteChan)
|
|
|
}
|
|
}
|
|
|
- r.ch = global.M2A_Publish.Subscribe(publish.Topic_Quote)
|
|
|
|
|
|
|
+ r.quoteChan = global.M2A_Publish.Subscribe(publish.Topic_Quote)
|
|
|
|
|
|
|
|
go func() {
|
|
go func() {
|
|
|
for {
|
|
for {
|
|
|
select {
|
|
select {
|
|
|
- case msg, ok := <-r.ch:
|
|
|
|
|
|
|
+ case msg, ok := <-r.quoteChan:
|
|
|
if !ok {
|
|
if !ok {
|
|
|
return // 管道已关闭,退出循环
|
|
return // 管道已关闭,退出循环
|
|
|
}
|
|
}
|
|
@@ -86,7 +92,7 @@ func (r *Client) GetQuoteSubs() (quoteSubs []request.QuoteGoods) {
|
|
|
|
|
|
|
|
// WriteWsBuf 向客户端发送实时行情
|
|
// WriteWsBuf 向客户端发送实时行情
|
|
|
func (r *Client) WriteWsBuf(buf []byte) (err error) {
|
|
func (r *Client) WriteWsBuf(buf []byte) (err error) {
|
|
|
- r.writeChan <- buf
|
|
|
|
|
|
|
+ r.quoteWriteChan <- buf
|
|
|
|
|
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
@@ -134,22 +140,24 @@ func (r *Client) GetAllAsyncTask() *map[string]*AsyncTask {
|
|
|
return &r.asyncTasks
|
|
return &r.asyncTasks
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (r *Client) SetWebSocket(ws *websocket.Conn) (err error) {
|
|
|
|
|
|
|
+// **************** Quote WebSocket ****************
|
|
|
|
|
+
|
|
|
|
|
+func (r *Client) SetQuoteWebSocket(ws *websocket.Conn) (err error) {
|
|
|
r.mtx.Lock()
|
|
r.mtx.Lock()
|
|
|
defer r.mtx.Unlock()
|
|
defer r.mtx.Unlock()
|
|
|
|
|
|
|
|
- if r.wsConn != nil {
|
|
|
|
|
- r.wsConn.Close()
|
|
|
|
|
|
|
+ if r.wsQuoteConn != nil {
|
|
|
|
|
+ r.wsQuoteConn.Close()
|
|
|
}
|
|
}
|
|
|
- r.wsConn = ws
|
|
|
|
|
- r.writeChan = make(chan []byte, 100)
|
|
|
|
|
|
|
+ r.wsQuoteConn = ws
|
|
|
|
|
+ r.quoteWriteChan = make(chan []byte, 100)
|
|
|
|
|
|
|
|
// 开始读取客户端发送信息
|
|
// 开始读取客户端发送信息
|
|
|
- go r.readClientWsMessage()
|
|
|
|
|
|
|
+ go r.readClientWsQuoteMessage()
|
|
|
// 开始推送客户端信息循环
|
|
// 开始推送客户端信息循环
|
|
|
- go r.writeClientWsMessage()
|
|
|
|
|
|
|
+ go r.writeClientWsQuoteMessage()
|
|
|
|
|
|
|
|
- // r.wsConn.SetCloseHandler(func(code int, text string) error {
|
|
|
|
|
|
|
+ // r.wsQuoteConn.SetCloseHandler(func(code int, text string) error {
|
|
|
// close(r.wsCloseChan)
|
|
// close(r.wsCloseChan)
|
|
|
// return nil
|
|
// return nil
|
|
|
// })
|
|
// })
|
|
@@ -157,14 +165,14 @@ func (r *Client) SetWebSocket(ws *websocket.Conn) (err error) {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// readClientWsMessage 处理终端发过来的websocket数据
|
|
|
|
|
|
|
+// readClientWsQuoteMessage 处理终端发过来的websocket数据
|
|
|
// 注意: 阻塞式, 直到websocket关闭才退出
|
|
// 注意: 阻塞式, 直到websocket关闭才退出
|
|
|
-func (r *Client) readClientWsMessage() {
|
|
|
|
|
|
|
+func (r *Client) readClientWsQuoteMessage() {
|
|
|
for {
|
|
for {
|
|
|
// 40秒心跳超时
|
|
// 40秒心跳超时
|
|
|
- r.wsConn.SetReadDeadline(time.Now().Add(40 * time.Second))
|
|
|
|
|
|
|
+ r.wsQuoteConn.SetReadDeadline(time.Now().Add(40 * time.Second))
|
|
|
|
|
|
|
|
- mt, msg, err := r.wsConn.ReadMessage()
|
|
|
|
|
|
|
+ mt, msg, err := r.wsQuoteConn.ReadMessage()
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
fmt.Println(err)
|
|
fmt.Println(err)
|
|
|
break
|
|
break
|
|
@@ -172,7 +180,7 @@ func (r *Client) readClientWsMessage() {
|
|
|
|
|
|
|
|
switch mt {
|
|
switch mt {
|
|
|
case websocket.PingMessage:
|
|
case websocket.PingMessage:
|
|
|
- _ = r.wsConn.WriteMessage(mt, msg)
|
|
|
|
|
|
|
+ _ = r.wsQuoteConn.WriteMessage(mt, msg)
|
|
|
case websocket.CloseMessage:
|
|
case websocket.CloseMessage:
|
|
|
return
|
|
return
|
|
|
case websocket.BinaryMessage:
|
|
case websocket.BinaryMessage:
|
|
@@ -181,7 +189,7 @@ func (r *Client) readClientWsMessage() {
|
|
|
}
|
|
}
|
|
|
case websocket.TextMessage:
|
|
case websocket.TextMessage:
|
|
|
fmt.Println(string(msg))
|
|
fmt.Println(string(msg))
|
|
|
- _ = r.wsConn.WriteMessage(mt, msg)
|
|
|
|
|
|
|
+ _ = r.wsQuoteConn.WriteMessage(mt, msg)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// FIXME: - 这里要判断是否有问题
|
|
// FIXME: - 这里要判断是否有问题
|
|
@@ -192,18 +200,18 @@ func (r *Client) readClientWsMessage() {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// writeClientWsMessage 由于websocket非线程安全,
|
|
|
|
|
|
|
+// writeClientWsQuoteMessage 由于websocket非线程安全,
|
|
|
// 所以由统一协程写入
|
|
// 所以由统一协程写入
|
|
|
-func (r *Client) writeClientWsMessage() {
|
|
|
|
|
|
|
+func (r *Client) writeClientWsQuoteMessage() {
|
|
|
// defer r.close()
|
|
// defer r.close()
|
|
|
for {
|
|
for {
|
|
|
select {
|
|
select {
|
|
|
- case buf := <-r.writeChan:
|
|
|
|
|
- err := r.wsConn.WriteMessage(websocket.BinaryMessage, buf)
|
|
|
|
|
|
|
+ case buf := <-r.quoteWriteChan:
|
|
|
|
|
+ err := r.wsQuoteConn.WriteMessage(websocket.BinaryMessage, buf)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
- case <-r.wsCloseChan: // 与终端连接关闭信息
|
|
|
|
|
|
|
+ case <-r.wsQuoteCloseChan: // 与终端连接关闭信息
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -221,7 +229,7 @@ func (r *Client) clientToQuoteAgentMsg(msg []byte) error {
|
|
|
if p.Length == 15 {
|
|
if p.Length == 15 {
|
|
|
// logger.Logger().Debugf("%v [%v c->s] %s", r.ClientIP, r.ProxyId, p.HeaderInfo())
|
|
// logger.Logger().Debugf("%v [%v c->s] %s", r.ClientIP, r.ProxyId, p.HeaderInfo())
|
|
|
// 将心跳发回给客户端
|
|
// 将心跳发回给客户端
|
|
|
- err = r.wsConn.WriteMessage(websocket.BinaryMessage, msg)
|
|
|
|
|
|
|
+ err = r.wsQuoteConn.WriteMessage(websocket.BinaryMessage, msg)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
@@ -230,6 +238,74 @@ func (r *Client) clientToQuoteAgentMsg(msg []byte) error {
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// **************** Trade WebSocket ****************
|
|
|
|
|
+
|
|
|
|
|
+func (r *Client) SetTradeWebSocket(ws *websocket.Conn) (err error) {
|
|
|
|
|
+ r.mtx.Lock()
|
|
|
|
|
+ defer r.mtx.Unlock()
|
|
|
|
|
+
|
|
|
|
|
+ if r.wsTradeConn != nil {
|
|
|
|
|
+ r.wsTradeConn.Close()
|
|
|
|
|
+ }
|
|
|
|
|
+ r.wsTradeConn = ws
|
|
|
|
|
+ r.tradeWriteChan = make(chan []byte, 100)
|
|
|
|
|
+
|
|
|
|
|
+ // 开始读取客户端发送信息
|
|
|
|
|
+ go r.readClientWsTradeMessage()
|
|
|
|
|
+ // 开始推送客户端信息循环
|
|
|
|
|
+ go r.writeClientWsTradeMessage()
|
|
|
|
|
+
|
|
|
|
|
+ return
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// readClientWsQuoteMessage 处理终端发过来的websocket数据
|
|
|
|
|
+// 注意: 阻塞式, 直到websocket关闭才退出
|
|
|
|
|
+func (r *Client) readClientWsTradeMessage() {
|
|
|
|
|
+ for {
|
|
|
|
|
+ // 40秒心跳超时
|
|
|
|
|
+ r.wsTradeConn.SetReadDeadline(time.Now().Add(40 * time.Second))
|
|
|
|
|
+
|
|
|
|
|
+ mt, msg, err := r.wsTradeConn.ReadMessage()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ fmt.Println(err)
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ switch mt {
|
|
|
|
|
+ case websocket.PingMessage:
|
|
|
|
|
+ _ = r.wsTradeConn.WriteMessage(mt, msg)
|
|
|
|
|
+ case websocket.CloseMessage:
|
|
|
|
|
+ return
|
|
|
|
|
+ case websocket.BinaryMessage:
|
|
|
|
|
+ if err := r.clientToQuoteAgentMsg(msg); err != nil {
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ case websocket.TextMessage:
|
|
|
|
|
+ fmt.Println(string(msg))
|
|
|
|
|
+ _ = r.wsTradeConn.WriteMessage(mt, msg)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// writeClientWsQuoteMessage 由于websocket非线程安全,
|
|
|
|
|
+// 所以由统一协程写入
|
|
|
|
|
+func (r *Client) writeClientWsTradeMessage() {
|
|
|
|
|
+ // defer r.close()
|
|
|
|
|
+ for {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case buf := <-r.tradeWriteChan:
|
|
|
|
|
+ err := r.wsTradeConn.WriteMessage(websocket.BinaryMessage, buf)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ case <-r.wsTradeCloseChan: // 与终端连接关闭信息
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// **************** RabbitMQ ****************
|
|
|
|
|
+
|
|
|
// MQPacket 与总线交互的数据体
|
|
// MQPacket 与总线交互的数据体
|
|
|
type MQPacket struct {
|
|
type MQPacket struct {
|
|
|
FunCode uint32 // 功能码
|
|
FunCode uint32 // 功能码
|