rabbitmq.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package core
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "mtp20_assisted/global"
  7. "github.com/streadway/amqp"
  8. "go.uber.org/zap"
  9. )
  10. // RabbitMQ 连接总线
  11. func RabbitMQ() *global.RabbitMQ {
  12. url := global.M2A_CONFIG.Rabbitmq.Url
  13. connection, err := amqp.Dial(url)
  14. if err != nil {
  15. global.M2A_LOG.Error("rabbitmq connect failed, err:", zap.Error(err))
  16. return nil
  17. }
  18. channel, err := connection.Channel()
  19. if err != nil {
  20. global.M2A_LOG.Error("rabbitmq open channel failed, err:", zap.Error(err))
  21. return nil
  22. }
  23. global.M2A_LOG.Info("rabbitmq connect successed.")
  24. return &global.RabbitMQ{
  25. Connection: connection,
  26. Channel: channel,
  27. }
  28. }
  29. // Publish 发送消息
  30. func Publish(topic string, msg *global.MQPacket) (err error) {
  31. if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
  32. err = errors.New("rabbitmq is not connected")
  33. global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
  34. return
  35. }
  36. // 组织发送数据
  37. data := []byte{}
  38. funCode := make([]byte, 4)
  39. binary.LittleEndian.PutUint32(funCode, msg.FunCode)
  40. data = append(data, funCode...)
  41. sessionId := make([]byte, 4)
  42. binary.LittleEndian.PutUint32(sessionId, msg.SessionId)
  43. data = append(data, sessionId...)
  44. data = append(data, *msg.Data...)
  45. if err = global.M2A_RABBITMQ.Channel.Publish("entry", topic, false, false, amqp.Publishing{
  46. ContentType: "text/plain",
  47. Body: data,
  48. }); err != nil {
  49. global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
  50. return
  51. }
  52. return
  53. }
  54. // Receive 接收消息
  55. func Receive(topic string, processer global.MsgProcesser) (err error) {
  56. // 队列名称
  57. queuename := fmt.Sprintf("mtp20_access_%s", topic)
  58. if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
  59. err = errors.New("rabbitmq is not connected")
  60. global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
  61. return
  62. }
  63. msgList, err := global.M2A_RABBITMQ.Channel.Consume(queuename, "", false, false, false, false, nil)
  64. if err != nil {
  65. global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
  66. return
  67. }
  68. go func() {
  69. for msg := range msgList {
  70. processer.Process(&msg.Body)
  71. msg.Ack(false)
  72. }
  73. }()
  74. return
  75. }