mq.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package mq
  2. import (
  3. "errors"
  4. "fmt"
  5. "mtp20access/global"
  6. commonRequest "mtp20access/model/common/request"
  7. rsp "mtp20access/model/common/response"
  8. "mtp20access/model/mq/request"
  9. "mtp20access/model/mq/response"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. "go.uber.org/zap"
  13. )
  14. // SendMQ 向总线发送信息
  15. func SendMQ(c *gin.Context, req *request.MQBodyReq) (err error) {
  16. // 获取请求账号信息
  17. s, exists := c.Get("claims")
  18. if !exists {
  19. err = errors.New("获取请求账号信息异常")
  20. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  21. return
  22. }
  23. claims := s.(*commonRequest.CustomClaims)
  24. // 获取登录账户信息
  25. client, exists := global.M2A_Clients[claims.SessionID]
  26. if !exists {
  27. err = errors.New("获取登录账户信息异常")
  28. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  29. return
  30. }
  31. // 将客户端请求的数据转换成总线使用的Protobuf
  32. serialNumber := client.GetSerialNumber()
  33. bytes, err := req.GetProtoBytes(&serialNumber)
  34. if err != nil {
  35. return
  36. }
  37. // 创建异步任务
  38. key := fmt.Sprintf("%v_%v_%v", client.SessionID, req.FunCodeRsp, serialNumber)
  39. asyncTask := global.AsyncTask{
  40. FuncodeRsp: req.FunCodeRsp,
  41. SerialNumber: serialNumber,
  42. Own: client,
  43. IsEncrypted: *req.IsEncrypted,
  44. }
  45. client.SetAsyncTask(&asyncTask, key)
  46. // 获取对应主题
  47. // topic, exists := global.M2A_FuncodeTopic[int(req.FunCodeReq)]
  48. topic := global.GetTopic(int(req.FunCodeReq))
  49. if topic == "" {
  50. global.M2A_LOG.Error(err.Error(), zap.Error(err))
  51. return
  52. }
  53. // 向总线发送业务信息
  54. packet := &global.MQPacket{
  55. FunCode: req.FunCodeReq,
  56. SessionId: uint32(claims.SessionID),
  57. Data: bytes,
  58. }
  59. go global.Publish(topic, packet)
  60. // 阻塞线程等待总线回复或超时
  61. asyncTask.Rsp = make(chan response.MQBodyRsp)
  62. select {
  63. case r := <-asyncTask.Rsp: // 总线回复
  64. rsp.OkWithData(r, c)
  65. case <-time.After(time.Second * 30): // 超时
  66. rsp.FailWithMessage("业务超时", c)
  67. }
  68. asyncTask.Finish()
  69. return
  70. }