package busdriver import ( "bytes" "encoding/hex" "errors" "fmt" "mtp2_if/config" "mtp2_if/packet" "time" "github.com/streadway/amqp" ) // SubscribeInfo 订阅信息结构 type SubscribeInfo struct { Topic string QueueName string } // MsgProcesser 消息处理者接口定义 type MsgProcesser interface { process(string, string, *string) } // MqConn 连接指针 var MqConn *amqp.Connection // MqChannel channel指针 var MqChannel *amqp.Channel // SerName 服务名称用与创建队列 var SerName string = "mtp2_if" // SubInfos 订阅信息数组 var SubInfos []SubscribeInfo // TestProc 消息处理对象 type TestProc struct { msgContent string } // 消息处理接口 func (t *TestProc) process(topic string, queuename string, msg *string) { info := fmt.Sprintf("Receive message from: time[%s] topic[%s] queue[%s] content[%s]", time.Now().Format("2006-01-02 15:04:05"), topic, queuename, *msg) fmt.Println(info) } // MqConnect 连接到rabbitMQ func MqConnect() error { // 尝试解密 var url []byte ciphertext, _ := hex.DecodeString(config.SerCfg.MqCfg.Url) if len(ciphertext) > 8 { ciphertext = ciphertext[4 : len(ciphertext)-8] url, _ = packet.Decrypt(ciphertext, packet.AESKey, true) if url == nil { url = []byte(config.SerCfg.MqCfg.Url) } } else { url = []byte(config.SerCfg.MqCfg.Url) } var err error if MqChannel == nil || MqConn.IsClosed() { MqConn, err = amqp.Dial(string(url)) if err != nil { fmt.Printf("rabbitmq connect failed:%s\n", err) return err } MqChannel, err = MqConn.Channel() if err != nil { fmt.Printf("rabbitmq channel open failed:%s\n", err) return err } } return nil } // MqClose 断开rabbitMQ连接 func MqClose() { MqChannel.Close() MqConn.Close() } // MqExchange 创建交换 func MqExchange() error { if MqChannel == nil { return errors.New("rabbitMQ is not init") } err := MqChannel.ExchangeDeclarePassive("entry", "topic", false, false, false, true, nil) if err != nil { // 注册交换机 err = MqChannel.ExchangeDeclare("entry", "topic", false, false, false, true, nil) if err != nil { fmt.Printf("declear exchange failed:%s \n", err) return err } } return nil } // MqStart 启动接收任务 func MqStart() { msg := "test message" t := &TestProc{ msgContent: msg, } go func() { // 遍历所有订阅的主题,启动接收协程 for _, subinfo := range SubInfos { Receive(subinfo.Topic, subinfo.QueueName, t) } }() } // SubscribeTopic 订阅主题 func SubscribeTopic(topic string) error { // 创建队列名称 queuename := fmt.Sprintf("%s_%s", SerName, topic) // 判断连接 var err error if MqChannel == nil { err = MqConnect() if err != nil { return err } } // 申明队列 //_, err = MqChannel.QueueDeclarePassive(queuename, true, false, false, true, nil) //if err != nil { _, err = MqChannel.QueueDeclare(queuename, true, false, false, true, nil) if err != nil { fmt.Printf("declear queue failed:%s \n", err) return err } //} // 绑定队列 err = MqChannel.QueueBind(queuename, topic, "entry", false, nil) if err != nil { fmt.Printf("bind queue failed:%s \n", err) return err } // 添加订阅信息 subinfo := SubscribeInfo{ Topic: topic, QueueName: queuename, } SubInfos = append(SubInfos, subinfo) return nil } // Publish 发送消息 func Publish(topic, msg string) error { var err error if MqChannel == nil || MqConn.IsClosed() { err = MqConnect() if err != nil { return err } } err = MqChannel.Publish("entry", topic, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }) if err != nil { fmt.Printf("publish failed:%s \n", err) return err } return nil } // Receive 接收消息 func Receive(topic, queuename string, processer MsgProcesser) error { var err error if MqChannel == nil || MqConn.IsClosed() { err = MqConnect() if err != nil { return err } } msgList, err := MqChannel.Consume(queuename, "", false, false, false, false, nil) if err != nil { fmt.Printf("get receive channel failed:%s \n", err) return err } go func() { for msg := range msgList { msgstr := bytesToString(&(msg.Body)) processer.process(topic, queuename, msgstr) msg.Ack(false) } }() return nil } func bytesToString(b *[]byte) *string { s := bytes.NewBuffer(*b) r := s.String() return &r } /* // 定义生产者接口 type Producer interface { MsgContent() string } // 定义消费者接口 type Receiver interface { Consumer([]byte) error } // 定义队列交换机对象 type QueueExchange struct { QueName string // 队列名称 RtKey string // 路由key ExName string // 交换机名称 ExType string // 交换机类型 } // 定义RabbitMQ对象 type RabbitMQ struct { connection *amqp.Connection channel *amqp.Channel queueName string routintKey string exchangeName string exchangeType string producerList []Producer receiverList []Receiver mutex sync.Mutex } // 打开rabbitmq链接 func (r *RabbitMQ) mqConnect() { url := "amqp://guest:guest@192.168.31.136:5020/test" var err error mqConn, err = amqp.Dial(url) if err != nil { fmt.Printf("rabbitmq connect failed:%s\n", err) return } r.connection = mqConn mqChannel, err = mqConn.Channel() if err != nil { fmt.Printf("rabbitmq channel open failed:%s\n", err) return } r.channel = mqChannel } // 关闭rabbitmq链接 func (r *RabbitMQ) mqClose() { err := r.channel.Close() if err != nil { fmt.Printf("rabbitmq channel close failed:%s\n", err) return } err = r.connection.Close() if err != nil { fmt.Printf("rabbitmq disconnect failed:%s\n", err) return } } // 创建一个队列交换机对象 func New(q *QueueExchange) *RabbitMQ { return &RabbitMQ{ queueName: q.QueName, routintKey: q.RtKey, exchangeName: q.ExName, exchangeType: q.ExType, } } // 注册发送指定队列指定路由的生产者 func (r *RabbitMQ) RegiestrProducer(producer Producer) { r.producerList = append(r.producerList, producer) } // 发送任务 func (r *RabbitMQ) listenProducer(producer Producer) { // 验证链接是否正常,否则重连 if r.channel == nil { r.mqConnect() } // 检查队列是否存在 _, err := r.channel.QueueDeclarePassive(r.queueName, true, false, false, true, nil) if err != nil { // 队列不存在,声明队列 _, err = r.channel.QueueDeclare(r.queueName, true, false, false, true, nil) if err != nil { fmt.Printf("declear queue failed:%s \n", err) return } } // 队列绑定 err = r.channel.QueueBind(r.queueName, r.routintKey, r.exchangeName, true, nil) if err != nil { fmt.Printf("bind queue failed:%s \n", err) return } // 检查交换机是否存在 err = r.channel.ExchangeDeclarePassive(r.exchangeName, r.exchangeType, true, false, false, true, nil) if err != nil { // 注册交换机 err = r.channel.ExchangeDeclare(r.exchangeName, r.exchangeType, true, false, false, true, nil) if err != nil { fmt.Printf("declear exchange failed:%s \n", err) return } } // 发送任务消息 err = r.channel.Publish(r.exchangeName, r.exchangeType, false, false, amqp.Publishing{ ContentType: "test/plain", Body: []byte(producer.MsgContent()), }) if err != nil { fmt.Printf("send message failed:%s \n", err) return } } // 注册接收指定队列指定路由的接收者 func (r *RabbitMQ) RegiesterReceiver(receiver Receiver) { r.mutex.Lock() r.receiverList = append(r.receiverList, receiver) r.mutex.Unlock() } // 监听接收者接收任务 func (r *RabbitMQ) listenReceiver(receiver Receiver) { // 结束后关闭链接 defer r.mqClose() // 验证链接是否正常 if r.channel == nil { r.mqConnect() } // 检查队列是否存在 _, err := r.channel.QueueDeclarePassive(r.queueName, true, false, false, true, nil) if err != nil { // 队列不存在,声明队列 _, err = r.channel.QueueDeclare(r.queueName, true, false, false, true, nil) if err != nil { fmt.Printf("declear queue failed:%s \n", err) return } } // 队列绑定 err = r.channel.QueueBind(r.queueName, r.routintKey, r.exchangeName, true, nil) if err != nil { fmt.Printf("bind queue failed:%s \n", err) return } // 获取消费通道 err = r.channel.Qos(1, 0, true) msgList, err := r.channel.Consume(r.queueName, "", false, false, false, false, nil) if err != nil { fmt.Printf("get receive channel failed:%s \n", err) return } for msg := range msgList { err = receiver.Consumer(msg.Body) if err != nil { err = msg.Ack(true) if err != nil { fmt.Printf("ack message failed:%s \n", err) return } } else { err = msg.Ack(false) if err != nil { fmt.Printf("ack message failed:%s \n", err) return } return } } } // 启动RabbitMQk客户端 func (r *RabbitMQ) Start() { // 开启监听生产者发送任务 for _, producer := range r.producerList { go r.listenProducer(producer) } // 开启监听生产者发送任务 for _, receiver := range r.receiverList { go r.listenReceiver(receiver) } time.Sleep(1 * time.Second) } */