rabbitmq.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package rabbitmq
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "mtp20access/client"
  7. "mtp20access/global"
  8. "github.com/streadway/amqp"
  9. "go.uber.org/zap"
  10. )
  11. // SubscribeInfo 订阅信息结构
  12. type SubscribeInfo struct {
  13. Topic string
  14. QueueName string
  15. }
  16. // SubInfos 订阅信息数组
  17. var SubInfos []SubscribeInfo
  18. // MsgProcesser 消息处理者接口定义
  19. type MsgProcesser interface {
  20. Process(string, string, *[]byte)
  21. }
  22. // SubscribeTopic 订阅主题
  23. func SubscribeTopic(topic string) (err error) {
  24. // 创建队列名称
  25. queuename := fmt.Sprintf("mtp20_access_%s", topic)
  26. // 申明队列
  27. if _, err = global.M2A_RABBITMQ.Channel.QueueDeclare(queuename, true, false, false, true, nil); err != nil {
  28. global.M2A_LOG.Error("rabbitmq declear queue failed, err:", zap.Error(err))
  29. return
  30. }
  31. // 绑定队列
  32. if err = global.M2A_RABBITMQ.Channel.QueueBind(queuename, topic, "entry", false, nil); err != nil {
  33. global.M2A_LOG.Error("rabbitmq bind queue failed, err:", zap.Error(err))
  34. return
  35. }
  36. // 添加订阅信息
  37. SubInfos = append(SubInfos, SubscribeInfo{Topic: topic, QueueName: queuename})
  38. return
  39. }
  40. // Publish 发送消息
  41. func Publish(topic string, msg *client.MQPacket) (err error) {
  42. if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
  43. err = errors.New("rabbitmq is not connected")
  44. global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
  45. return
  46. }
  47. // 组织发送数据
  48. data := []byte{}
  49. funCode := make([]byte, 4)
  50. binary.LittleEndian.PutUint32(funCode, msg.FunCode)
  51. data = append(data, funCode...)
  52. sessionId := make([]byte, 4)
  53. binary.LittleEndian.PutUint32(sessionId, msg.SessionId)
  54. data = append(data, sessionId...)
  55. data = append(data, *msg.Data...)
  56. if err = global.M2A_RABBITMQ.Channel.Publish("entry", topic, false, false, amqp.Publishing{
  57. ContentType: "text/plain",
  58. Body: data,
  59. }); err != nil {
  60. global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
  61. return
  62. }
  63. return
  64. }
  65. // Receive 接收消息
  66. func Receive(topic, queuename string, processer MsgProcesser) (err error) {
  67. if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
  68. err = errors.New("rabbitmq is not connected")
  69. global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
  70. return
  71. }
  72. msgList, err := global.M2A_RABBITMQ.Channel.Consume(queuename, "", false, false, false, false, nil)
  73. if err != nil {
  74. global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
  75. return
  76. }
  77. go func() {
  78. for msg := range msgList {
  79. processer.Process(topic, queuename, &msg.Body)
  80. msg.Ack(false)
  81. }
  82. }()
  83. return
  84. }