mq.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package mq
  2. import (
  3. "errors"
  4. "fmt"
  5. "mtp20access/client"
  6. "mtp20access/global"
  7. commonRequest "mtp20access/model/common/request"
  8. rsp "mtp20access/model/common/response"
  9. "mtp20access/model/mq/request"
  10. "mtp20access/model/mq/response"
  11. "mtp20access/rabbitmq"
  12. "time"
  13. "github.com/gin-gonic/gin"
  14. "go.uber.org/zap"
  15. )
  16. // SendMQ 向总线发送信息
  17. func SendMQ(c *gin.Context, req *request.MQBodyReq) (err error) {
  18. // 获取请求账号信息
  19. s, exists := c.Get("claims")
  20. if !exists {
  21. err = errors.New("获取请求账号信息异常")
  22. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  23. return
  24. }
  25. claims := s.(*commonRequest.CustomClaims)
  26. // 获取登录账户信息
  27. t, exists := client.Clients[claims.SessionID]
  28. if !exists {
  29. err = errors.New("获取登录账户信息异常")
  30. global.M2A_LOG.Error(err.Error(), zap.Any("SessionID", claims.SessionID), zap.Error(err))
  31. return
  32. }
  33. // 将客户端请求的数据转换成总线使用的Protobuf
  34. serialNumber := t.GetSerialNumber()
  35. bytes, err := req.GetProtoBytes(&serialNumber)
  36. if err != nil {
  37. return
  38. }
  39. if bytes == nil {
  40. err = errors.New("请求信息序列化失败")
  41. return
  42. }
  43. // 创建异步任务
  44. key := fmt.Sprintf("%v_%v_%v", t.SessionID, req.FunCodeRsp, serialNumber)
  45. global.M2A_LOG.Info("创建异步任务", zap.Any("SessionID", t.SessionID), zap.Any("serialNumber", serialNumber))
  46. // 银行服务相关的回复流水号是错误的,所以需要特殊处理
  47. if int(req.FunCodeReq) == global.T2bBankSignReq ||
  48. int(req.FunCodeReq) == global.T2bBankCancelSignReq ||
  49. int(req.FunCodeReq) == global.T2bBankWithdrawReq ||
  50. int(req.FunCodeReq) == global.T2bBankDepositReq {
  51. key = fmt.Sprintf("%v_%v", t.SessionID, req.FunCodeRsp)
  52. }
  53. asyncTask := client.AsyncTask{
  54. FuncodeRsp: req.FunCodeRsp,
  55. SerialNumber: serialNumber,
  56. Own: t,
  57. IsEncrypted: *req.IsEncrypted,
  58. }
  59. t.SetAsyncTask(&asyncTask, key)
  60. // 获取对应主题
  61. // topic, exists := global.M2A_FuncodeTopic[int(req.FunCodeReq)]
  62. topic := global.GetTopic(int(req.FunCodeReq))
  63. if topic == "" {
  64. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  65. return
  66. }
  67. // 向总线发送业务信息
  68. packet := &client.MQPacket{
  69. FunCode: req.FunCodeReq,
  70. SessionId: uint32(claims.SessionID),
  71. Data: bytes,
  72. }
  73. go rabbitmq.Publish(topic, packet)
  74. global.M2A_LOG.Info("[C->S]", zap.Any("req", req.FunCodeReq), zap.Any("SessionId", packet.SessionId), zap.Any("data", req.Data))
  75. // 阻塞线程等待总线回复或超时
  76. // 这里make时没有指定长度,所以是发送者与接收者的同步操作,发送者会阻塞,所以一般会在下面使用超时来解锁。
  77. asyncTask.Rsp = make(chan response.MQBodyRsp)
  78. select {
  79. case r := <-asyncTask.Rsp: // 总线回复
  80. rsp.OkWithData(r, c)
  81. case <-time.After(time.Second * 30): // 超时
  82. // 删除对应异步任务
  83. t.DeleteAsyncTask(key)
  84. rsp.FailWithMessage("业务超时", c)
  85. }
  86. asyncTask.Finish()
  87. return
  88. }