common.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package service
  2. import (
  3. "fmt"
  4. "mtp20_assisted/core"
  5. "mtp20_assisted/global"
  6. "go.uber.org/zap"
  7. )
  8. // InitRabbitMQMsgPreccesser 构建总线信息处理器
  9. func InitRabbitMQMsgPreccesser() (err error) {
  10. global.M2A_MSGPROCESSER = make(map[string]global.MsgProcesser)
  11. // 铁合金服务通知
  12. // t := THJNtf{}
  13. // global.M2A_MSGPROCESSER[global.TOPIC_THJ_NTF] = &t
  14. t := new(THJNtf)
  15. global.M2A_MSGPROCESSER[global.TOPIC_THJ_NTF] = t
  16. // 铁合金服务通知(管理端专用)
  17. t1 := new(THJNtf)
  18. global.M2A_MSGPROCESSER[global.TOPIC_MANAGER_THJ_NTF] = t1
  19. return
  20. }
  21. // StartRabbitMQSubscribe 开始向总线创建和绑定队列
  22. func StartRabbitMQSubscribe() (err error) {
  23. // 订阅需要的总线主题
  24. // if err = subscribeTopic(global.TOPIC_THJ_NTF); err != nil {
  25. // global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  26. // return
  27. // }
  28. // 订阅需要的总线主题
  29. for key := range global.M2A_MSGPROCESSER {
  30. if err = subscribeTopic(key); err != nil {
  31. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  32. return
  33. }
  34. }
  35. global.M2A_LOG.Info("rabbitmq subscribe topic success.")
  36. return
  37. }
  38. // StartRabbitMQReceive 开始接收总线消息
  39. func StartRabbitMQReceive() {
  40. go func() {
  41. for topic, processer := range global.M2A_MSGPROCESSER {
  42. core.Receive(topic, processer)
  43. }
  44. }()
  45. global.M2A_LOG.Info("rabbitmq start receiving messages.")
  46. }
  47. // subscribeTopic 订阅主题
  48. func subscribeTopic(topic string) (err error) {
  49. // 创建队列名称
  50. queuename := fmt.Sprintf("mtp20_assisted_%s", topic)
  51. // 申明队列
  52. if _, err = global.M2A_RABBITMQ.Channel.QueueDeclare(queuename, true, false, false, true, nil); err != nil {
  53. global.M2A_LOG.Error("rabbitmq declear queue failed, err:", zap.Error(err))
  54. return
  55. }
  56. // 绑定队列
  57. if err = global.M2A_RABBITMQ.Channel.QueueBind(queuename, topic, "entry", false, nil); err != nil {
  58. global.M2A_LOG.Error("rabbitmq bind queue failed, err:", zap.Error(err))
  59. return
  60. }
  61. return
  62. }