| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- package service
- import (
- "fmt"
- "mtp20_assisted/core"
- "mtp20_assisted/global"
- "go.uber.org/zap"
- )
- // InitRabbitMQMsgPreccesser 构建总线信息处理器
- func InitRabbitMQMsgPreccesser() (err error) {
- global.M2A_MSGPROCESSER = make(map[string]global.MsgProcesser)
- // 铁合金服务通知
- // t := THJNtf{}
- // global.M2A_MSGPROCESSER[global.TOPIC_THJ_NTF] = &t
- t := new(THJNtf)
- global.M2A_MSGPROCESSER[global.TOPIC_THJ_NTF] = t
- // 铁合金服务通知(管理端专用)
- t1 := new(THJNtf)
- global.M2A_MSGPROCESSER[global.TOPIC_MANAGER_THJ_NTF] = t1
- return
- }
- // StartRabbitMQSubscribe 开始向总线创建和绑定队列
- func StartRabbitMQSubscribe() (err error) {
- // 订阅需要的总线主题
- // if err = subscribeTopic(global.TOPIC_THJ_NTF); err != nil {
- // global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- // return
- // }
- // 订阅需要的总线主题
- for key := range global.M2A_MSGPROCESSER {
- if err = subscribeTopic(key); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- }
- global.M2A_LOG.Info("rabbitmq subscribe topic success.")
- return
- }
- // StartRabbitMQReceive 开始接收总线消息
- func StartRabbitMQReceive() {
- go func() {
- for topic, processer := range global.M2A_MSGPROCESSER {
- core.Receive(topic, processer)
- }
- }()
- global.M2A_LOG.Info("rabbitmq start receiving messages.")
- }
- // subscribeTopic 订阅主题
- func subscribeTopic(topic string) (err error) {
- // 创建队列名称
- queuename := fmt.Sprintf("mtp20_assisted_%s", topic)
- // 申明队列
- if _, err = global.M2A_RABBITMQ.Channel.QueueDeclare(queuename, true, false, false, true, nil); err != nil {
- global.M2A_LOG.Error("rabbitmq declear queue failed, err:", zap.Error(err))
- return
- }
- // 绑定队列
- if err = global.M2A_RABBITMQ.Channel.QueueBind(queuename, topic, "entry", false, nil); err != nil {
- global.M2A_LOG.Error("rabbitmq bind queue failed, err:", zap.Error(err))
- return
- }
- return
- }
|