| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- package rabbitmq
- import (
- "encoding/binary"
- "errors"
- "fmt"
- "mtp20access/client"
- "mtp20access/global"
- "github.com/streadway/amqp"
- "go.uber.org/zap"
- )
// SubscribeInfo describes one subscription: a topic together with the name
// of the queue that was declared and bound for it.
type SubscribeInfo struct {
	// Topic is the topic passed to SubscribeTopic; it is used as the
	// binding key when the queue is bound.
	Topic string

	// QueueName is the name of the queue declared for Topic.
	QueueName string
}
- // SubInfos 订阅信息数组
- var SubInfos []SubscribeInfo
// MsgProcesser is implemented by anything that handles messages consumed
// from a queue.
type MsgProcesser interface {
	// Process handles a single message. Receive calls it with the topic and
	// queue name it was started with and a pointer to the message body.
	Process(topic string, queueName string, body *[]byte)
}
- // SubscribeTopic 订阅主题
- func SubscribeTopic(topic string) (err error) {
- // 创建队列名称
- queuename := fmt.Sprintf("mtp20_access_%s", topic)
- // 申明队列
- if _, err = global.M2A_RABBITMQ.Channel.QueueDeclare(queuename, true, false, false, true, nil); err != nil {
- global.M2A_LOG.Error("rabbitmq declear queue failed, err:", zap.Error(err))
- return
- }
- // 绑定队列
- if err = global.M2A_RABBITMQ.Channel.QueueBind(queuename, topic, "entry", false, nil); err != nil {
- global.M2A_LOG.Error("rabbitmq bind queue failed, err:", zap.Error(err))
- return
- }
- // 添加订阅信息
- SubInfos = append(SubInfos, SubscribeInfo{Topic: topic, QueueName: queuename})
- return
- }
- // Publish 发送消息
- func Publish(topic string, msg *client.MQPacket) (err error) {
- if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
- err = errors.New("rabbitmq is not connected")
- global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
- return
- }
- // 组织发送数据
- data := []byte{}
- funCode := make([]byte, 4)
- binary.LittleEndian.PutUint32(funCode, msg.FunCode)
- data = append(data, funCode...)
- sessionId := make([]byte, 4)
- binary.LittleEndian.PutUint32(sessionId, msg.SessionId)
- data = append(data, sessionId...)
- data = append(data, *msg.Data...)
- if err = global.M2A_RABBITMQ.Channel.Publish("entry", topic, false, false, amqp.Publishing{
- ContentType: "text/plain",
- Body: data,
- }); err != nil {
- global.M2A_LOG.Error("rabbitmq publish failed, err:", zap.Error(err))
- return
- }
- return
- }
- // Receive 接收消息
- func Receive(topic, queuename string, processer MsgProcesser) (err error) {
- if global.M2A_RABBITMQ == nil || global.M2A_RABBITMQ.Connection.IsClosed() {
- err = errors.New("rabbitmq is not connected")
- global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
- return
- }
- msgList, err := global.M2A_RABBITMQ.Channel.Consume(queuename, "", false, false, false, false, nil)
- if err != nil {
- global.M2A_LOG.Error("rabbitmq receive failed, err:", zap.Error(err))
- return
- }
- go func() {
- for msg := range msgList {
- processer.Process(topic, queuename, &msg.Body)
- msg.Ack(false)
- }
- }()
- return
- }
|