package global import ( "encoding/binary" "errors" "fmt" "github.com/streadway/amqp" "go.uber.org/zap" ) // SubscribeInfo 订阅信息结构 type SubscribeInfo struct { Topic string QueueName string } // SubInfos 订阅信息数组 var SubInfos []SubscribeInfo // MsgProcesser 消息处理者接口定义 type MsgProcesser interface { Process(string, string, *[]byte) } // SubscribeTopic 订阅主题 func SubscribeTopic(topic string) (err error) { // 创建队列名称 queuename := fmt.Sprintf("mtp20_access_%s", topic) // 申明队列 if _, err = M2A_RABBITMQ.Channel.QueueDeclare(queuename, true, false, false, true, nil); err != nil { M2A_LOG.Error("rabbitmq declear queue failed, err:", zap.Error(err)) return } // 绑定队列 if err = M2A_RABBITMQ.Channel.QueueBind(queuename, topic, "entry", false, nil); err != nil { M2A_LOG.Error("rabbitmq bind queue failed, err:", zap.Error(err)) return } // 添加订阅信息 SubInfos = append(SubInfos, SubscribeInfo{Topic: topic, QueueName: queuename}) return } // Publish 发送消息 func Publish(topic string, msg *MQPacket) (err error) { if M2A_RABBITMQ == nil || M2A_RABBITMQ.Connection.IsClosed() { err = errors.New("rabbitmq is not connected") M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err)) return } // 组织发送数据 data := []byte{} funCode := make([]byte, 4) binary.LittleEndian.PutUint32(funCode, msg.FunCode) data = append(data, funCode...) sessionId := make([]byte, 4) binary.LittleEndian.PutUint32(sessionId, msg.SessionId) data = append(data, sessionId...) data = append(data, *msg.Data...) if err = M2A_RABBITMQ.Channel.Publish("entry", topic, false, false, amqp.Publishing{ ContentType: "text/plain", Body: data, }); err != nil { M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err)) return } return } // Receive 接收消息 func Receive(topic, queuename string, processer MsgProcesser) (err error) { if M2A_RABBITMQ == nil || M2A_RABBITMQ.Connection.IsClosed() { err = errors.New("rabbitmq is not connected") M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err)) return } msgList, err := M2A_RABBITMQ.Channel.Consume(queuename, "", false, false, false, false, nil) if err != nil { M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err)) return } go func() { for msg := range msgList { processer.Process(topic, queuename, &msg.Body) msg.Ack(false) } }() return }