mq.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package mq
  2. import (
  3. "errors"
  4. "fmt"
  5. "mtp20access/global"
  6. commonRequest "mtp20access/model/common/request"
  7. rsp "mtp20access/model/common/response"
  8. "mtp20access/model/mq/request"
  9. "mtp20access/model/mq/response"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. "go.uber.org/zap"
  13. )
  14. // SendMQ 向总线发送信息
  15. func SendMQ(c *gin.Context, req *request.MQBodyReq) (err error) {
  16. // 获取请求账号信息
  17. s, exists := c.Get("claims")
  18. if !exists {
  19. err = errors.New("获取请求账号信息异常")
  20. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  21. return
  22. }
  23. claims := s.(*commonRequest.CustomClaims)
  24. // 获取登录账户信息
  25. client, exists := global.M2A_Clients[claims.SessionID]
  26. if !exists {
  27. err = errors.New("获取登录账户信息异常")
  28. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  29. return
  30. }
  31. // 将客户端请求的数据转换成总线使用的Protobuf
  32. serialNumber := client.GetSerialNumber()
  33. bytes, err := req.GetProtoBytes(&serialNumber)
  34. if err != nil {
  35. return
  36. }
  37. if bytes == nil {
  38. err = errors.New("请求信息序列化失败")
  39. return
  40. }
  41. // 创建异步任务
  42. key := fmt.Sprintf("%v_%v_%v", client.SessionID, req.FunCodeRsp, serialNumber)
  43. // 银行服务相关的回复流水号是错误的,所以需要特殊处理
  44. if int(req.FunCodeReq) == global.T2bBankSignReq ||
  45. int(req.FunCodeReq) == global.T2bBankCancelSignReq ||
  46. int(req.FunCodeReq) == global.T2bBankWithdrawReq ||
  47. int(req.FunCodeReq) == global.T2bBankDepositReq {
  48. key = fmt.Sprintf("%v_%v", client.SessionID, req.FunCodeRsp)
  49. }
  50. asyncTask := global.AsyncTask{
  51. FuncodeRsp: req.FunCodeRsp,
  52. SerialNumber: serialNumber,
  53. Own: client,
  54. IsEncrypted: *req.IsEncrypted,
  55. }
  56. client.SetAsyncTask(&asyncTask, key)
  57. // 获取对应主题
  58. // topic, exists := global.M2A_FuncodeTopic[int(req.FunCodeReq)]
  59. topic := global.GetTopic(int(req.FunCodeReq))
  60. if topic == "" {
  61. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  62. return
  63. }
  64. // 向总线发送业务信息
  65. packet := &global.MQPacket{
  66. FunCode: req.FunCodeReq,
  67. SessionId: uint32(claims.SessionID),
  68. Data: bytes,
  69. }
  70. go global.Publish(topic, packet)
  71. global.M2A_LOG.Info("[C->S]", zap.Any("req", req.FunCodeReq), zap.Any("SessionId", packet.SessionId), zap.Any("data", req.Data))
  72. // 阻塞线程等待总线回复或超时
  73. asyncTask.Rsp = make(chan response.MQBodyRsp)
  74. select {
  75. case r := <-asyncTask.Rsp: // 总线回复
  76. rsp.OkWithData(r, c)
  77. case <-time.After(time.Second * 30): // 超时
  78. // 删除对应异步任务
  79. client.DeleteAsyncTask(key)
  80. rsp.FailWithMessage("业务超时", c)
  81. }
  82. asyncTask.Finish()
  83. return
  84. }