rabbitmq.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package core
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "mtp20_assisted/global"
  7. "github.com/streadway/amqp"
  8. "go.uber.org/zap"
  9. )
  10. // MQPacket 与总线交互的数据体
  11. type MQPacket struct {
  12. FunCode uint32 // 功能码
  13. SessionId uint32 // 数据包的sid
  14. Data *[]byte // 业务数据体
  15. }
  16. // SubscribeInfo 订阅信息结构
  17. type SubscribeInfo struct {
  18. Topic string
  19. QueueName string
  20. }
  21. // SubInfos 订阅信息数组
  22. var SubInfos []SubscribeInfo
  23. // MsgProcesser 消息处理者接口定义
  24. type MsgProcesser interface {
  25. Process(string, string, *[]byte)
  26. }
  27. func RabbitMQ() *global.RabbitMQ {
  28. url := global.M2A_CONFIG.Rabbitmq.Url
  29. connection, err := amqp.Dial(url)
  30. if err != nil {
  31. global.M2A_LOG.Error("rabbitmq connect failed, err:", zap.Error(err))
  32. return nil
  33. }
  34. channel, err := connection.Channel()
  35. if err != nil {
  36. global.M2A_LOG.Error("rabbitmq open channel failed, err:", zap.Error(err))
  37. return nil
  38. }
  39. global.M2A_LOG.Info("rabbitmq connect successed.")
  40. return &global.RabbitMQ{
  41. Connection: connection,
  42. Channel: channel,
  43. }
  44. }
  45. // SubscribeTopic 订阅主题
  46. func SubscribeTopic(topic string) (err error) {
  47. // 创建队列名称
  48. queuename := fmt.Sprintf("mtp20_access_%s", topic)
  49. // 申明队列
  50. if _, err = global.M2A_RABBITMQ.Channel.QueueDeclare(queuename, true, false, false, true, nil); err != nil {
  51. global.M2A_LOG.Error("rabbitmq declear queue failed, err:", zap.Error(err))
  52. return
  53. }
  54. // 绑定队列
  55. if err = global.M2A_RABBITMQ.Channel.QueueBind(queuename, topic, "entry", false, nil); err != nil {
  56. global.M2A_LOG.Error("rabbitmq bind queue failed, err:", zap.Error(err))
  57. return
  58. }
  59. // 添加订阅信息
  60. SubInfos = append(SubInfos, SubscribeInfo{Topic: topic, QueueName: queuename})
  61. return
  62. }
  63. // Publish 发送消息
  64. func Publish(topic string, msg *MQPacket) (err error) {
  65. if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
  66. err = errors.New("rabbitmq is not connected")
  67. global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
  68. return
  69. }
  70. // 组织发送数据
  71. data := []byte{}
  72. funCode := make([]byte, 4)
  73. binary.LittleEndian.PutUint32(funCode, msg.FunCode)
  74. data = append(data, funCode...)
  75. sessionId := make([]byte, 4)
  76. binary.LittleEndian.PutUint32(sessionId, msg.SessionId)
  77. data = append(data, sessionId...)
  78. data = append(data, *msg.Data...)
  79. if err = global.M2A_RABBITMQ.Channel.Publish("entry", topic, false, false, amqp.Publishing{
  80. ContentType: "text/plain",
  81. Body: data,
  82. }); err != nil {
  83. global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
  84. return
  85. }
  86. return
  87. }
  88. // Receive 接收消息
  89. func Receive(topic, queuename string, processer MsgProcesser) (err error) {
  90. if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
  91. err = errors.New("rabbitmq is not connected")
  92. global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
  93. return
  94. }
  95. msgList, err := global.M2A_RABBITMQ.Channel.Consume(queuename, "", false, false, false, false, nil)
  96. if err != nil {
  97. global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
  98. return
  99. }
  100. go func() {
  101. for msg := range msgList {
  102. processer.Process(topic, queuename, &msg.Body)
  103. msg.Ack(false)
  104. }
  105. }()
  106. return
  107. }