package service import ( "fmt" "mtp20_assisted/core" "mtp20_assisted/global" "go.uber.org/zap" ) // InitRabbitMQMsgPreccesser 构建总线信息处理器 func InitRabbitMQMsgPreccesser() (err error) { global.M2A_MSGPROCESSER = make(map[string]global.MsgProcesser) // 铁合金服务通知 // t := THJNtf{} // global.M2A_MSGPROCESSER[global.TOPIC_THJ_NTF] = &t t := new(THJNtf) global.M2A_MSGPROCESSER[global.TOPIC_THJ_NTF] = t // 铁合金服务通知(管理端专用) t1 := new(THJNtf) global.M2A_MSGPROCESSER[global.TOPIC_MANAGER_THJ_NTF] = t1 return } // StartRabbitMQSubscribe 开始向总线创建和绑定队列 func StartRabbitMQSubscribe() (err error) { // 订阅需要的总线主题 // if err = subscribeTopic(global.TOPIC_THJ_NTF); err != nil { // global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) // return // } // 订阅需要的总线主题 for key := range global.M2A_MSGPROCESSER { if err = subscribeTopic(key); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } } global.M2A_LOG.Info("rabbitmq subscribe topic success.") return } // StartRabbitMQReceive 开始接收总线消息 func StartRabbitMQReceive() { go func() { for topic, processer := range global.M2A_MSGPROCESSER { core.Receive(topic, processer) } }() global.M2A_LOG.Info("rabbitmq start receiving messages.") } // subscribeTopic 订阅主题 func subscribeTopic(topic string) (err error) { // 创建队列名称 queuename := fmt.Sprintf("mtp20_assisted_%s", topic) // 申明队列 if _, err = global.M2A_RABBITMQ.Channel.QueueDeclare(queuename, true, false, false, true, nil); err != nil { global.M2A_LOG.Error("rabbitmq declear queue failed, err:", zap.Error(err)) return } // 绑定队列 if err = global.M2A_RABBITMQ.Channel.QueueBind(queuename, topic, "entry", false, nil); err != nil { global.M2A_LOG.Error("rabbitmq bind queue failed, err:", zap.Error(err)) return } return }