rabbitmq.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package global
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "github.com/streadway/amqp"
  7. "go.uber.org/zap"
  8. )
  9. // SubscribeInfo 订阅信息结构
  10. type SubscribeInfo struct {
  11. Topic string
  12. QueueName string
  13. }
  14. // SubInfos 订阅信息数组
  15. var SubInfos []SubscribeInfo
  16. // MsgProcesser 消息处理者接口定义
  17. type MsgProcesser interface {
  18. Process(string, string, *[]byte)
  19. }
  20. // SubscribeTopic 订阅主题
  21. func SubscribeTopic(topic string) (err error) {
  22. // 创建队列名称
  23. queuename := fmt.Sprintf("mtp20_access_%s", topic)
  24. // 申明队列
  25. if _, err = M2A_RABBITMQ.Channel.QueueDeclare(queuename, true, false, false, true, nil); err != nil {
  26. M2A_LOG.Error("rabbitmq declear queue failed, err:", zap.Error(err))
  27. return
  28. }
  29. // 绑定队列
  30. if err = M2A_RABBITMQ.Channel.QueueBind(queuename, topic, "entry", false, nil); err != nil {
  31. M2A_LOG.Error("rabbitmq bind queue failed, err:", zap.Error(err))
  32. return
  33. }
  34. // 添加订阅信息
  35. SubInfos = append(SubInfos, SubscribeInfo{Topic: topic, QueueName: queuename})
  36. return
  37. }
  38. // Publish 发送消息
  39. func Publish(topic string, msg *MQPacket) (err error) {
  40. if M2A_RABBITMQ == nil || M2A_RABBITMQ.Connection.IsClosed() {
  41. err = errors.New("rabbitmq is not connected")
  42. M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
  43. return
  44. }
  45. // 组织发送数据
  46. data := []byte{}
  47. funCode := make([]byte, 4)
  48. binary.LittleEndian.PutUint32(funCode, msg.FunCode)
  49. data = append(data, funCode...)
  50. sessionId := make([]byte, 4)
  51. binary.LittleEndian.PutUint32(sessionId, msg.SessionId)
  52. data = append(data, sessionId...)
  53. data = append(data, *msg.Data...)
  54. if err = M2A_RABBITMQ.Channel.Publish("entry", topic, false, false, amqp.Publishing{
  55. ContentType: "text/plain",
  56. Body: data,
  57. }); err != nil {
  58. M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
  59. return
  60. }
  61. return
  62. }
  63. // Receive 接收消息
  64. func Receive(topic, queuename string, processer MsgProcesser) (err error) {
  65. if M2A_RABBITMQ == nil || M2A_RABBITMQ.Connection.IsClosed() {
  66. err = errors.New("rabbitmq is not connected")
  67. M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
  68. return
  69. }
  70. msgList, err := M2A_RABBITMQ.Channel.Consume(queuename, "", false, false, false, false, nil)
  71. if err != nil {
  72. M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
  73. return
  74. }
  75. go func() {
  76. for msg := range msgList {
  77. processer.Process(topic, queuename, &msg.Body)
  78. msg.Ack(false)
  79. }
  80. }()
  81. return
  82. }