package initialize import ( "encoding/base64" "fmt" "mtp20access/global" rsp "mtp20access/model/mq/response" "mtp20access/packet" "mtp20access/res/pb" "mtp20access/utils" "github.com/golang/protobuf/proto" "github.com/streadway/amqp" "go.uber.org/zap" "google.golang.org/protobuf/encoding/protojson" ) func RabbitMQ() *global.RabbitMQ { url := global.M2A_CONFIG.Rabbitmq.Url connection, err := amqp.Dial(url) if err != nil { global.M2A_LOG.Error("rabbitmq connect failed, err:", zap.Error(err)) return nil } channel, err := connection.Channel() if err != nil { global.M2A_LOG.Error("rabbitmq open channel failed, err:", zap.Error(err)) return nil } global.M2A_LOG.Info("rabbitmq connect successed.") return &global.RabbitMQ{ Connection: connection, Channel: channel, } } // MQProc 消息处理对象 type MQProc struct{} // process 消息处理接口 func (t *MQProc) Process(topic, queuename string, msg *[]byte) { info := fmt.Sprintf("rabbitmq receive message from: topic[%s] queue[%s] contentLen[%d]", topic, queuename, len(string(*msg))) global.M2A_LOG.Info(info) if funcode, sessionId, bytes, serialNumber, err := t.getRspProtobuf(msg); err == nil && bytes != nil { // 尝试获取对应异步任务 if client, exists := global.M2A_Clients[int(sessionId)]; exists { key := fmt.Sprintf("%v_%v_%v", sessionId, funcode, serialNumber) asyncTask := client.GetAsyncTask(key) if asyncTask != nil { rspData := string(*bytes) // 判断是否要加密 if asyncTask.IsEncrypted { if b, err := packet.Encrypt(*bytes, packet.AESKey, true); err != nil { global.M2A_LOG.Error("总线回复数据加密失败", zap.Error(err)) } else { rspData = base64.StdEncoding.EncodeToString(b) } } // 给客户端回调 r := rsp.MQBodyRsp{ FunCode: funcode, IsEncrypted: asyncTask.IsEncrypted, Data: rspData, } asyncTask.Rsp <- r // response.OkWithData(r, asyncTask.C) // 清除异步任务 // asyncTask.Close() fmt.Println() } } } } // getRspProtobuf 将总线回复的数据反序列化为Protobuf func (t *MQProc) getRspProtobuf(msg *[]byte) (funcode uint32, sessionId uint32, bytes *[]byte, serialNumber uint32, err error) { // 分解总线包信息 funcode = utils.BytesToUint32((*msg)[0:4]) sessionId = utils.BytesToUint32((*msg)[4:8]) b := (*msg)[8:] switch int(funcode) { case global.ZSSellOrderListingRsp: // 钻石卖挂牌接口响应 var p pb.ZSSellOrderListingRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } } return } // RabbitMQSubscribeTopic 订阅主题 func RabbitMQSubscribeTopic() (err error) { // 订阅需要的总线响应主题 if err = global.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_TRADE_GZ); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } global.M2A_LOG.Info("rabbitmq subscribe topic successed.") return } // StartRabbitMQReceive 开始接收总线消息 func StartRabbitMQReceive() { t := &MQProc{} go func() { for _, subinfo := range global.SubInfos { global.Receive(subinfo.Topic, subinfo.QueueName, t) } }() } // InitFuncodeTopic 初始化功能码主题MAP func InitFuncodeTopic() { if global.M2A_FuncodeTopic == nil { global.M2A_FuncodeTopic = make(map[int]string) } global.M2A_FuncodeTopic[global.ZSSellOrderListingReq] = global.TOPIC_REQ_WAREHOUSE_TRADE_GZ }