mq.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package mq
  2. import (
  3. "errors"
  4. "fmt"
  5. "mtp20access/global"
  6. commonRequest "mtp20access/model/common/request"
  7. rsp "mtp20access/model/common/response"
  8. "mtp20access/model/mq/request"
  9. "mtp20access/model/mq/response"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. "go.uber.org/zap"
  13. )
  14. // SendMQ 向总线发送信息
  15. func SendMQ(c *gin.Context, req *request.MQBodyReq) (err error) {
  16. // 获取请求账号信息
  17. s, exists := c.Get("claims")
  18. if !exists {
  19. err = errors.New("获取请求账号信息异常")
  20. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  21. return
  22. }
  23. claims := s.(*commonRequest.CustomClaims)
  24. // 获取登录账户信息
  25. client, exists := global.M2A_Clients[claims.SessionID]
  26. if !exists {
  27. err = errors.New("获取登录账户信息异常")
  28. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  29. return
  30. }
  31. // 将客户端请求的数据转换成总线使用的Protobuf
  32. serialNumber := client.GetSerialNumber()
  33. bytes, err := req.GetProtoBytes(&serialNumber)
  34. if err != nil {
  35. return
  36. }
  37. if bytes == nil {
  38. err = errors.New("请求信息序列化失败")
  39. return
  40. }
  41. // 创建异步任务
  42. key := fmt.Sprintf("%v_%v_%v", client.SessionID, req.FunCodeRsp, serialNumber)
  43. asyncTask := global.AsyncTask{
  44. FuncodeRsp: req.FunCodeRsp,
  45. SerialNumber: serialNumber,
  46. Own: client,
  47. IsEncrypted: *req.IsEncrypted,
  48. }
  49. client.SetAsyncTask(&asyncTask, key)
  50. // 获取对应主题
  51. // topic, exists := global.M2A_FuncodeTopic[int(req.FunCodeReq)]
  52. topic := global.GetTopic(int(req.FunCodeReq))
  53. if topic == "" {
  54. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  55. return
  56. }
  57. // 向总线发送业务信息
  58. packet := &global.MQPacket{
  59. FunCode: req.FunCodeReq,
  60. SessionId: uint32(claims.SessionID),
  61. Data: bytes,
  62. }
  63. go global.Publish(topic, packet)
  64. // 阻塞线程等待总线回复或超时
  65. asyncTask.Rsp = make(chan response.MQBodyRsp)
  66. select {
  67. case r := <-asyncTask.Rsp: // 总线回复
  68. rsp.OkWithData(r, c)
  69. case <-time.After(time.Second * 30): // 超时
  70. rsp.FailWithMessage("业务超时", c)
  71. }
  72. asyncTask.Finish()
  73. return
  74. }