| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425 |
- package busdriver
- import (
- "bytes"
- "encoding/hex"
- "errors"
- "fmt"
- "mtp2_if/config"
- "mtp2_if/packet"
- "time"
- "github.com/streadway/amqp"
- )
- // SubscribeInfo 订阅信息结构
- type SubscribeInfo struct {
- Topic string
- QueueName string
- }
- // MsgProcesser 消息处理者接口定义
- type MsgProcesser interface {
- process(string, string, *string)
- }
- // MqConn 连接指针
- var MqConn *amqp.Connection
- // MqChannel channel指针
- var MqChannel *amqp.Channel
- // SerName 服务名称用与创建队列
- var SerName string = "mtp2_if"
- // SubInfos 订阅信息数组
- var SubInfos []SubscribeInfo
- // TestProc 消息处理对象
- type TestProc struct {
- msgContent string
- }
- // 消息处理接口
- func (t *TestProc) process(topic string, queuename string, msg *string) {
- info := fmt.Sprintf("Receive message from: time[%s] topic[%s] queue[%s] content[%s]",
- time.Now().Format("2006-01-02 15:04:05"),
- topic,
- queuename,
- *msg)
- fmt.Println(info)
- }
- // MqConnect 连接到rabbitMQ
- func MqConnect() error {
- // 尝试解密
- var url []byte
- ciphertext, _ := hex.DecodeString(config.SerCfg.MqCfg.Url)
- if len(ciphertext) > 8 {
- ciphertext = ciphertext[4 : len(ciphertext)-8]
- url, _ = packet.Decrypt(ciphertext, packet.AESKey, true)
- if url == nil {
- url = []byte(config.SerCfg.MqCfg.Url)
- }
- } else {
- url = []byte(config.SerCfg.MqCfg.Url)
- }
- var err error
- if MqChannel == nil || MqConn.IsClosed() {
- MqConn, err = amqp.Dial(string(url))
- if err != nil {
- fmt.Printf("rabbitmq connect failed:%s\n", err)
- return err
- }
- MqChannel, err = MqConn.Channel()
- if err != nil {
- fmt.Printf("rabbitmq channel open failed:%s\n", err)
- return err
- }
- }
- return nil
- }
- // MqClose 断开rabbitMQ连接
- func MqClose() {
- MqChannel.Close()
- MqConn.Close()
- }
- // MqExchange 创建交换
- func MqExchange() error {
- if MqChannel == nil {
- return errors.New("rabbitMQ is not init")
- }
- err := MqChannel.ExchangeDeclarePassive("entry", "topic", false, false, false, true, nil)
- if err != nil {
- // 注册交换机
- err = MqChannel.ExchangeDeclare("entry", "topic", false, false, false, true, nil)
- if err != nil {
- fmt.Printf("declear exchange failed:%s \n", err)
- return err
- }
- }
- return nil
- }
- // MqStart 启动接收任务
- func MqStart() {
- msg := "test message"
- t := &TestProc{
- msgContent: msg,
- }
- go func() {
- // 遍历所有订阅的主题,启动接收协程
- for _, subinfo := range SubInfos {
- Receive(subinfo.Topic, subinfo.QueueName, t)
- }
- }()
- }
- // SubscribeTopic 订阅主题
- func SubscribeTopic(topic string) error {
- // 创建队列名称
- queuename := fmt.Sprintf("%s_%s", SerName, topic)
- // 判断连接
- var err error
- if MqChannel == nil {
- err = MqConnect()
- if err != nil {
- return err
- }
- }
- // 申明队列
- //_, err = MqChannel.QueueDeclarePassive(queuename, true, false, false, true, nil)
- //if err != nil {
- _, err = MqChannel.QueueDeclare(queuename, true, false, false, true, nil)
- if err != nil {
- fmt.Printf("declear queue failed:%s \n", err)
- return err
- }
- //}
- // 绑定队列
- err = MqChannel.QueueBind(queuename, topic, "entry", false, nil)
- if err != nil {
- fmt.Printf("bind queue failed:%s \n", err)
- return err
- }
- // 添加订阅信息
- subinfo := SubscribeInfo{
- Topic: topic,
- QueueName: queuename,
- }
- SubInfos = append(SubInfos, subinfo)
- return nil
- }
- // Publish 发送消息
- func Publish(topic, msg string) error {
- var err error
- if MqChannel == nil || MqConn.IsClosed() {
- err = MqConnect()
- if err != nil {
- return err
- }
- }
- err = MqChannel.Publish("entry", topic, false, false, amqp.Publishing{
- ContentType: "text/plain",
- Body: []byte(msg),
- })
- if err != nil {
- fmt.Printf("publish failed:%s \n", err)
- return err
- }
- return nil
- }
- // Receive 接收消息
- func Receive(topic, queuename string, processer MsgProcesser) error {
- var err error
- if MqChannel == nil || MqConn.IsClosed() {
- err = MqConnect()
- if err != nil {
- return err
- }
- }
- msgList, err := MqChannel.Consume(queuename, "", false, false, false, false, nil)
- if err != nil {
- fmt.Printf("get receive channel failed:%s \n", err)
- return err
- }
- go func() {
- for msg := range msgList {
- msgstr := bytesToString(&(msg.Body))
- processer.process(topic, queuename, msgstr)
- msg.Ack(false)
- }
- }()
- return nil
- }
- func bytesToString(b *[]byte) *string {
- s := bytes.NewBuffer(*b)
- r := s.String()
- return &r
- }
- /*
- // 定义生产者接口
- type Producer interface {
- MsgContent() string
- }
- // 定义消费者接口
- type Receiver interface {
- Consumer([]byte) error
- }
- // 定义队列交换机对象
- type QueueExchange struct {
- QueName string // 队列名称
- RtKey string // 路由key
- ExName string // 交换机名称
- ExType string // 交换机类型
- }
- // 定义RabbitMQ对象
- type RabbitMQ struct {
- connection *amqp.Connection
- channel *amqp.Channel
- queueName string
- routintKey string
- exchangeName string
- exchangeType string
- producerList []Producer
- receiverList []Receiver
- mutex sync.Mutex
- }
- // 打开rabbitmq链接
- func (r *RabbitMQ) mqConnect() {
- url := "amqp://guest:guest@192.168.31.136:5020/test"
- var err error
- mqConn, err = amqp.Dial(url)
- if err != nil {
- fmt.Printf("rabbitmq connect failed:%s\n", err)
- return
- }
- r.connection = mqConn
- mqChannel, err = mqConn.Channel()
- if err != nil {
- fmt.Printf("rabbitmq channel open failed:%s\n", err)
- return
- }
- r.channel = mqChannel
- }
- // 关闭rabbitmq链接
- func (r *RabbitMQ) mqClose() {
- err := r.channel.Close()
- if err != nil {
- fmt.Printf("rabbitmq channel close failed:%s\n", err)
- return
- }
- err = r.connection.Close()
- if err != nil {
- fmt.Printf("rabbitmq disconnect failed:%s\n", err)
- return
- }
- }
- // 创建一个队列交换机对象
- func New(q *QueueExchange) *RabbitMQ {
- return &RabbitMQ{
- queueName: q.QueName,
- routintKey: q.RtKey,
- exchangeName: q.ExName,
- exchangeType: q.ExType,
- }
- }
- // 注册发送指定队列指定路由的生产者
- func (r *RabbitMQ) RegiestrProducer(producer Producer) {
- r.producerList = append(r.producerList, producer)
- }
- // 发送任务
- func (r *RabbitMQ) listenProducer(producer Producer) {
- // 验证链接是否正常,否则重连
- if r.channel == nil {
- r.mqConnect()
- }
- // 检查队列是否存在
- _, err := r.channel.QueueDeclarePassive(r.queueName, true, false, false, true, nil)
- if err != nil {
- // 队列不存在,声明队列
- _, err = r.channel.QueueDeclare(r.queueName, true, false, false, true, nil)
- if err != nil {
- fmt.Printf("declear queue failed:%s \n", err)
- return
- }
- }
- // 队列绑定
- err = r.channel.QueueBind(r.queueName, r.routintKey, r.exchangeName, true, nil)
- if err != nil {
- fmt.Printf("bind queue failed:%s \n", err)
- return
- }
- // 检查交换机是否存在
- err = r.channel.ExchangeDeclarePassive(r.exchangeName, r.exchangeType, true, false, false, true, nil)
- if err != nil {
- // 注册交换机
- err = r.channel.ExchangeDeclare(r.exchangeName, r.exchangeType, true, false, false, true, nil)
- if err != nil {
- fmt.Printf("declear exchange failed:%s \n", err)
- return
- }
- }
- // 发送任务消息
- err = r.channel.Publish(r.exchangeName, r.exchangeType, false, false, amqp.Publishing{
- ContentType: "test/plain",
- Body: []byte(producer.MsgContent()),
- })
- if err != nil {
- fmt.Printf("send message failed:%s \n", err)
- return
- }
- }
- // 注册接收指定队列指定路由的接收者
- func (r *RabbitMQ) RegiesterReceiver(receiver Receiver) {
- r.mutex.Lock()
- r.receiverList = append(r.receiverList, receiver)
- r.mutex.Unlock()
- }
- // 监听接收者接收任务
- func (r *RabbitMQ) listenReceiver(receiver Receiver) {
- // 结束后关闭链接
- defer r.mqClose()
- // 验证链接是否正常
- if r.channel == nil {
- r.mqConnect()
- }
- // 检查队列是否存在
- _, err := r.channel.QueueDeclarePassive(r.queueName, true, false, false, true, nil)
- if err != nil {
- // 队列不存在,声明队列
- _, err = r.channel.QueueDeclare(r.queueName, true, false, false, true, nil)
- if err != nil {
- fmt.Printf("declear queue failed:%s \n", err)
- return
- }
- }
- // 队列绑定
- err = r.channel.QueueBind(r.queueName, r.routintKey, r.exchangeName, true, nil)
- if err != nil {
- fmt.Printf("bind queue failed:%s \n", err)
- return
- }
- // 获取消费通道
- err = r.channel.Qos(1, 0, true)
- msgList, err := r.channel.Consume(r.queueName, "", false, false, false, false, nil)
- if err != nil {
- fmt.Printf("get receive channel failed:%s \n", err)
- return
- }
- for msg := range msgList {
- err = receiver.Consumer(msg.Body)
- if err != nil {
- err = msg.Ack(true)
- if err != nil {
- fmt.Printf("ack message failed:%s \n", err)
- return
- }
- } else {
- err = msg.Ack(false)
- if err != nil {
- fmt.Printf("ack message failed:%s \n", err)
- return
- }
- return
- }
- }
- }
- // 启动RabbitMQk客户端
- func (r *RabbitMQ) Start() {
- // 开启监听生产者发送任务
- for _, producer := range r.producerList {
- go r.listenProducer(producer)
- }
- // 开启监听生产者发送任务
- for _, receiver := range r.receiverList {
- go r.listenReceiver(receiver)
- }
- time.Sleep(1 * time.Second)
- }
- */
|