| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- package core
- import (
- "encoding/binary"
- "errors"
- "fmt"
- "mtp20_assisted/global"
- "github.com/streadway/amqp"
- "go.uber.org/zap"
- )
// MQPacket is the data unit exchanged with the message bus.
//
// FunCode is the function code, SessionId is the packet's session id (sid),
// and Data points at the business payload.
type MQPacket struct {
	FunCode, SessionId uint32
	Data               *[]byte
}
// SubscribeInfo records one subscription: the topic that was subscribed to
// and the name of the queue bound for it.
type SubscribeInfo struct {
	Topic, QueueName string
}

// SubInfos holds the subscription records; SubscribeTopic appends one entry
// per successful subscription.
var SubInfos []SubscribeInfo
// MsgProcesser is the interface a message handler must implement.
//
// Process is invoked by Receive once per delivery, with the topic and queue
// name that were passed to Receive and a pointer to the message body.
type MsgProcesser interface {
	Process(topic, queueName string, body *[]byte)
}
- func RabbitMQ() *global.RabbitMQ {
- url := global.M2A_CONFIG.Rabbitmq.Url
- connection, err := amqp.Dial(url)
- if err != nil {
- global.M2A_LOG.Error("rabbitmq connect failed, err:", zap.Error(err))
- return nil
- }
- channel, err := connection.Channel()
- if err != nil {
- global.M2A_LOG.Error("rabbitmq open channel failed, err:", zap.Error(err))
- return nil
- }
- global.M2A_LOG.Info("rabbitmq connect successed.")
- return &global.RabbitMQ{
- Connection: connection,
- Channel: channel,
- }
- }
- // SubscribeTopic 订阅主题
- func SubscribeTopic(topic string) (err error) {
- // 创建队列名称
- queuename := fmt.Sprintf("mtp20_access_%s", topic)
- // 申明队列
- if _, err = global.M2A_RABBITMQ.Channel.QueueDeclare(queuename, true, false, false, true, nil); err != nil {
- global.M2A_LOG.Error("rabbitmq declear queue failed, err:", zap.Error(err))
- return
- }
- // 绑定队列
- if err = global.M2A_RABBITMQ.Channel.QueueBind(queuename, topic, "entry", false, nil); err != nil {
- global.M2A_LOG.Error("rabbitmq bind queue failed, err:", zap.Error(err))
- return
- }
- // 添加订阅信息
- SubInfos = append(SubInfos, SubscribeInfo{Topic: topic, QueueName: queuename})
- return
- }
- // Publish 发送消息
- func Publish(topic string, msg *MQPacket) (err error) {
- if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
- err = errors.New("rabbitmq is not connected")
- global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
- return
- }
- // 组织发送数据
- data := []byte{}
- funCode := make([]byte, 4)
- binary.LittleEndian.PutUint32(funCode, msg.FunCode)
- data = append(data, funCode...)
- sessionId := make([]byte, 4)
- binary.LittleEndian.PutUint32(sessionId, msg.SessionId)
- data = append(data, sessionId...)
- data = append(data, *msg.Data...)
- if err = global.M2A_RABBITMQ.Channel.Publish("entry", topic, false, false, amqp.Publishing{
- ContentType: "text/plain",
- Body: data,
- }); err != nil {
- global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
- return
- }
- return
- }
- // Receive 接收消息
- func Receive(topic, queuename string, processer MsgProcesser) (err error) {
- if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
- err = errors.New("rabbitmq is not connected")
- global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
- return
- }
- msgList, err := global.M2A_RABBITMQ.Channel.Consume(queuename, "", false, false, false, false, nil)
- if err != nil {
- global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
- return
- }
- go func() {
- for msg := range msgList {
- processer.Process(topic, queuename, &msg.Body)
- msg.Ack(false)
- }
- }()
- return
- }
|