| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- package core
- import (
- "encoding/binary"
- "errors"
- "fmt"
- "mtp20_assisted/global"
- "github.com/streadway/amqp"
- "go.uber.org/zap"
- )
- // RabbitMQ 连接总线
- func RabbitMQ() *global.RabbitMQ {
- url := global.M2A_CONFIG.Rabbitmq.Url
- connection, err := amqp.Dial(url)
- if err != nil {
- global.M2A_LOG.Error("rabbitmq connect failed, err:", zap.Error(err))
- return nil
- }
- channel, err := connection.Channel()
- if err != nil {
- global.M2A_LOG.Error("rabbitmq open channel failed, err:", zap.Error(err))
- return nil
- }
- global.M2A_LOG.Info("rabbitmq connect successed.")
- return &global.RabbitMQ{
- Connection: connection,
- Channel: channel,
- }
- }
- // Publish 发送消息
- func Publish(topic string, msg *global.MQPacket) (err error) {
- if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
- err = errors.New("rabbitmq is not connected")
- global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
- return
- }
- // 组织发送数据
- data := []byte{}
- funCode := make([]byte, 4)
- binary.LittleEndian.PutUint32(funCode, msg.FunCode)
- data = append(data, funCode...)
- sessionId := make([]byte, 4)
- binary.LittleEndian.PutUint32(sessionId, msg.SessionId)
- data = append(data, sessionId...)
- data = append(data, *msg.Data...)
- if err = global.M2A_RABBITMQ.Channel.Publish("entry", topic, false, false, amqp.Publishing{
- ContentType: "text/plain",
- Body: data,
- }); err != nil {
- global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
- return
- }
- return
- }
- // Receive 接收消息
- func Receive(topic string, processer global.MsgProcesser) (err error) {
- // 队列名称
- queuename := fmt.Sprintf("mtp20_assisted_%s", topic)
- if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
- err = errors.New("rabbitmq is not connected")
- global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
- return
- }
- msgList, err := global.M2A_RABBITMQ.Channel.Consume(queuename, "", false, false, false, false, nil)
- if err != nil {
- global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
- return
- }
- go func() {
- for msg := range msgList {
- processer.Process(&msg.Body)
- msg.Ack(false)
- }
- }()
- return
- }
|