package core import ( "encoding/binary" "errors" "fmt" "mtp20_assisted/global" "github.com/streadway/amqp" "go.uber.org/zap" ) // RabbitMQ 连接总线 func RabbitMQ() *global.RabbitMQ { url := global.M2A_CONFIG.Rabbitmq.Url connection, err := amqp.Dial(url) if err != nil { global.M2A_LOG.Error("rabbitmq connect failed, err:", zap.Error(err)) return nil } channel, err := connection.Channel() if err != nil { global.M2A_LOG.Error("rabbitmq open channel failed, err:", zap.Error(err)) return nil } global.M2A_LOG.Info("rabbitmq connect successed.") return &global.RabbitMQ{ Connection: connection, Channel: channel, } } // Publish 发送消息 func Publish(topic string, msg *global.MQPacket) (err error) { if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() { err = errors.New("rabbitmq is not connected") global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err)) return } // 组织发送数据 data := []byte{} funCode := make([]byte, 4) binary.LittleEndian.PutUint32(funCode, msg.FunCode) data = append(data, funCode...) sessionId := make([]byte, 4) binary.LittleEndian.PutUint32(sessionId, msg.SessionId) data = append(data, sessionId...) data = append(data, *msg.Data...) if err = global.M2A_RABBITMQ.Channel.Publish("entry", topic, false, false, amqp.Publishing{ ContentType: "text/plain", Body: data, }); err != nil { global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err)) return } return } // Receive 接收消息 func Receive(topic string, processer global.MsgProcesser) (err error) { // 队列名称 queuename := fmt.Sprintf("mtp20_assisted_%s", topic) if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() { err = errors.New("rabbitmq is not connected") global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err)) return } msgList, err := global.M2A_RABBITMQ.Channel.Consume(queuename, "", false, false, false, false, nil) if err != nil { global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err)) return } go func() { for msg := range msgList { processer.Process(&msg.Body) // msg.Ack(false) } }() return }