mq.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package mq
  2. import (
  3. "errors"
  4. "fmt"
  5. "mtp20access/global"
  6. commonRequest "mtp20access/model/common/request"
  7. rsp "mtp20access/model/common/response"
  8. "mtp20access/model/mq/request"
  9. "mtp20access/model/mq/response"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. "go.uber.org/zap"
  13. )
  14. // SendMQ 向总线发送信息
  15. func SendMQ(c *gin.Context, req *request.MQBodyReq) (err error) {
  16. // 获取请求账号信息
  17. s, exists := c.Get("claims")
  18. if !exists {
  19. err = errors.New("获取请求账号信息异常")
  20. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  21. return
  22. }
  23. claims := s.(*commonRequest.CustomClaims)
  24. // 获取登录账户信息
  25. client, exists := global.M2A_Clients[claims.SessionID]
  26. if !exists {
  27. err = errors.New("获取登录账户信息异常")
  28. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  29. return
  30. }
  31. // 将客户端请求的数据转换成总线使用的Protobuf
  32. serialNumber := client.GetSerialNumber()
  33. bytes, err := req.GetProtoBytes(&serialNumber)
  34. if err != nil {
  35. return
  36. }
  37. if bytes == nil {
  38. err = errors.New("请求信息序列化失败")
  39. return
  40. }
  41. // 创建异步任务
  42. key := fmt.Sprintf("%v_%v_%v", client.SessionID, req.FunCodeRsp, serialNumber)
  43. // 银行服务相关的回复流水号是错误的,所以需要特殊处理
  44. if int(req.FunCodeReq) == global.T2bBankSignReq ||
  45. int(req.FunCodeReq) == global.T2bBankCancelSignReq ||
  46. int(req.FunCodeReq) == global.T2bBankWithdrawReq ||
  47. int(req.FunCodeReq) == global.T2bBankDepositReq {
  48. key = fmt.Sprintf("%v_%v", client.SessionID, req.FunCodeRsp)
  49. }
  50. asyncTask := global.AsyncTask{
  51. FuncodeRsp: req.FunCodeRsp,
  52. SerialNumber: serialNumber,
  53. Own: client,
  54. IsEncrypted: *req.IsEncrypted,
  55. }
  56. client.SetAsyncTask(&asyncTask, key)
  57. // 获取对应主题
  58. // topic, exists := global.M2A_FuncodeTopic[int(req.FunCodeReq)]
  59. topic := global.GetTopic(int(req.FunCodeReq))
  60. if topic == "" {
  61. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  62. return
  63. }
  64. // 向总线发送业务信息
  65. packet := &global.MQPacket{
  66. FunCode: req.FunCodeReq,
  67. SessionId: uint32(claims.SessionID),
  68. Data: bytes,
  69. }
  70. go global.Publish(topic, packet)
  71. // 阻塞线程等待总线回复或超时
  72. asyncTask.Rsp = make(chan response.MQBodyRsp)
  73. select {
  74. case r := <-asyncTask.Rsp: // 总线回复
  75. rsp.OkWithData(r, c)
  76. case <-time.After(time.Second * 30): // 超时
  77. // 删除对应异步任务
  78. client.DeleteAsyncTask(key)
  79. rsp.FailWithMessage("业务超时", c)
  80. }
  81. asyncTask.Finish()
  82. return
  83. }