common.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package service
  2. import (
  3. "fmt"
  4. "mtp20_assisted/core"
  5. "mtp20_assisted/global"
  6. "go.uber.org/zap"
  7. )
  8. // InitRabbitMQMsgPreccesser 构建总线信息处理器
  9. func InitRabbitMQMsgPreccesser() (err error) {
  10. global.M2A_MSGPROCESSER = make(map[string]global.MsgProcesser)
  11. // 铁合金服务通知
  12. // t := THJNtf{}
  13. // global.M2A_MSGPROCESSER[global.TOPIC_THJ_NTF] = &t
  14. t := new(THJNtf)
  15. global.M2A_MSGPROCESSER[global.TOPIC_THJ_NTF] = t
  16. return
  17. }
  18. // StartRabbitMQSubscribe 开始向总线创建和绑定队列
  19. func StartRabbitMQSubscribe() (err error) {
  20. // 订阅需要的总线主题
  21. if err = subscribeTopic(global.TOPIC_THJ_NTF); err != nil {
  22. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  23. return
  24. }
  25. global.M2A_LOG.Info("rabbitmq subscribe topic success.")
  26. return
  27. }
  28. // StartRabbitMQReceive 开始接收总线消息
  29. func StartRabbitMQReceive() {
  30. go func() {
  31. for topic, processer := range global.M2A_MSGPROCESSER {
  32. core.Receive(topic, processer)
  33. }
  34. }()
  35. global.M2A_LOG.Info("rabbitmq start receiving messages.")
  36. }
  37. // subscribeTopic 订阅主题
  38. func subscribeTopic(topic string) (err error) {
  39. // 创建队列名称
  40. queuename := fmt.Sprintf("mtp20_assisted_%s", topic)
  41. // 申明队列
  42. if _, err = global.M2A_RABBITMQ.Channel.QueueDeclare(queuename, true, false, false, true, nil); err != nil {
  43. global.M2A_LOG.Error("rabbitmq declear queue failed, err:", zap.Error(err))
  44. return
  45. }
  46. // 绑定队列
  47. if err = global.M2A_RABBITMQ.Channel.QueueBind(queuename, topic, "entry", false, nil); err != nil {
  48. global.M2A_LOG.Error("rabbitmq bind queue failed, err:", zap.Error(err))
  49. return
  50. }
  51. return
  52. }