package initialize import ( "encoding/base64" "fmt" "mtp20access/client" "mtp20access/global" rsp "mtp20access/model/mq/response" "mtp20access/packet" "mtp20access/rabbitmq" "mtp20access/res/pb" accountSrv "mtp20access/service/account" "mtp20access/utils" "strconv" // "github.com/golang/protobuf/proto" "github.com/streadway/amqp" "go.uber.org/zap" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) func RabbitMQ() *global.RabbitMQ { url := global.M2A_CONFIG.Rabbitmq.Url connection, err := amqp.Dial(url) if err != nil { global.M2A_LOG.Error("rabbitmq connect failed, err:", zap.Error(err)) return nil } channel, err := connection.Channel() if err != nil { global.M2A_LOG.Error("rabbitmq open channel failed, err:", zap.Error(err)) return nil } global.M2A_LOG.Info("rabbitmq connect successed.") return &global.RabbitMQ{ Connection: connection, Channel: channel, } } // MQProc 消息处理对象 type MQProc struct{} // process 消息处理接口 func (t *MQProc) Process(topic, queuename string, msg *[]byte) { // info := fmt.Sprintf("rabbitmq receive message from: topic[%s] queue[%s] contentLen[%d]", // topic, // queuename, // len(string(*msg))) // global.M2A_LOG.Info(info) if funcode, sessionId, bytes, serialNumber, err := t.getRspProtobuf(msg); err == nil && bytes != nil { if sessionId == 0 || funcode == uint32(global.LogoutRsp) || funcode == uint32(global.LoginRsp) { // 通知类 或 特殊处理 t.onNtf(funcode, sessionId, bytes) } else { // 请求回复W // 尝试获取对应异步任务 if client, exists := client.Clients[int(sessionId)]; exists { key := fmt.Sprintf("%v_%v_%v", sessionId, funcode, serialNumber) // 银行服务相关的回复流水号是错误的,所以需要特殊处理 if int(funcode) == global.T2bBankSignRsp || int(funcode) == global.T2bBankCancelSignRsp || int(funcode) == global.T2bBankWithdrawRsp || int(funcode) == global.T2bBankDepositRsp { key = fmt.Sprintf("%v_%v", sessionId, funcode) } asyncTask := client.GetAsyncTask(key) if asyncTask != nil { rspData := string(*bytes) // 判断是否要加密 if asyncTask.IsEncrypted { if b, err := packet.Encrypt(*bytes, packet.AESKey, true); err != nil { global.M2A_LOG.Error("总线回复数据加密失败", zap.Error(err)) return } else { rspData = base64.StdEncoding.EncodeToString(b) } } // 给客户端回调 global.M2A_LOG.Info("[S->C]", zap.Any("rsp", funcode), zap.Any("sessionId", sessionId), zap.Any("data", string(rspData))) r := rsp.MQBodyRsp{ FunCode: funcode, IsEncrypted: asyncTask.IsEncrypted, Data: rspData, } asyncTask.Rsp <- r } else { global.M2A_LOG.Info("找不到对应KEY的异步任务", zap.Any("key", key), zap.Any("AsyncMap", client.GetAllAsyncTask())) } } else { global.M2A_LOG.Info("找不到对应的client", zap.Any("sessionId", sessionId)) } } } } func (t *MQProc) onNtf(funcode uint32, sessionId uint32, bytes *[]byte) { var clients []*client.Client var err error var sendBytes *[]byte switch int(funcode) { case global.LoginRsp: // 用户登录应答 - 主要记录旧Token var p pb.LoginRsp if err = proto.Unmarshal(*bytes, &p); err != nil { global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err)) return } // if bs, e := protojson.Marshal(&p); e != nil { // global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err)) // return // } else { // sendBytes = &bs // } // 获取目标客户 for i := range client.Clients { c := client.Clients[i] if strconv.Itoa(int(p.GetUserID())) == c.UserID && strconv.Itoa(int(sessionId)) == c.SessionID { // 主要记录旧Token c.OldToken = p.GetToken() } } case global.LogoutRsp: // 用户登出应答 - 主要用于接收管理端踢上线 var p pb.LogoutRsp if err = proto.Unmarshal(*bytes, &p); err != nil { global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线数据序列化JSON失败", zap.Error(err)) return } else { sendBytes = &bs } // 获取目标客户 clients = make([]*client.Client, 0) for i := range client.Clients { c := client.Clients[i] if strconv.Itoa(int(p.GetHeader().GetUserID())) == c.UserID { clients = append(clients, c) } } case global.MoneyChangedNtf: // 资金变化通知 var p pb.MoneyChangedNtf if err = proto.Unmarshal(*bytes, &p); err != nil { global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线数据序列化JSON失败", zap.Error(err)) return } else { sendBytes = &bs } // 获取目标客户 clients, err = accountSrv.GetClientsByAccountID(*p.AccountID) if err != nil { return } case global.PosChangedNtf: // 头寸变化通知 var p pb.PosChangedNtf if err = proto.Unmarshal(*bytes, &p); err != nil { global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线数据序列化JSON失败", zap.Error(err)) return } else { sendBytes = &bs } // 获取目标客户 clients, err = accountSrv.GetClientsByAccountID(*p.AccountID) if err != nil { return } case global.OrderDealedNtf: // 委托单成交通知 var p pb.OrderDealedNtf if err = proto.Unmarshal(*bytes, &p); err != nil { global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线数据序列化JSON失败", zap.Error(err)) return } else { sendBytes = &bs } // 获取目标客户 clients, err = accountSrv.GetClientsByAccountID(*p.AccountID) if err != nil { return } case global.MarketStatusChangeNtf: // 市场状态变更通知 var p pb.MarketStatusChangeNtf if err = proto.Unmarshal(*bytes, &p); err != nil { global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线数据序列化JSON失败", zap.Error(err)) return } else { sendBytes = &bs } // 获取目标客户 clients = make([]*client.Client, 0) for i := range client.Clients { c := client.Clients[i] clients = append(clients, c) } case global.ListingOrderChangeNtf: // 挂牌委托变更广播通知 var p pb.ListingOrderChangeNtf if err = proto.Unmarshal(*bytes, &p); err != nil { global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线数据序列化JSON失败", zap.Error(err)) return } else { sendBytes = &bs } // 获取目标客户 clients = make([]*client.Client, 0) for i := range client.Clients { c := client.Clients[i] clients = append(clients, c) } } if err == nil && len(clients) > 0 { // 组装待发送给客户端的5.0报文包 b, err := packet.BuildPacket(funcode, 0, 0, *sendBytes, true) if err != nil { global.M2A_LOG.Error("组装5.0报文失败", zap.Error(err)) return } // 发送信息 for i := range clients { c := clients[i] c.WriteTradeWsBuf(b) // 给客户端通知 global.M2A_LOG.Info("[S->C]", zap.Any("ntf", funcode), zap.Any("clients", c.LoginID), zap.Any("len", len(b))) } } } // getRspProtobuf 将总线回复的数据反序列化为Protobuf func (t *MQProc) getRspProtobuf(msg *[]byte) (funcode uint32, sessionId uint32, bytes *[]byte, serialNumber uint32, err error) { // 分解总线包信息 funcode = utils.BytesToUint32((*msg)[0:4]) sessionId = utils.BytesToUint32((*msg)[4:8]) b := (*msg)[8:] global.M2A_LOG.Info("收到总线消息", zap.Any("funcode", funcode), zap.Any("sessionId", sessionId), zap.Any("len", len(b))) switch int(funcode) { case global.PosChangedNtf, global.MoneyChangedNtf, global.OrderDealedNtf, global.MarketStatusChangeNtf, global.LoginRsp, global.LogoutRsp, global.ListingOrderChangeNtf: // 资金变化通知等 bytes = &b case global.ModifyPwdRsp: // 修改账户密码应答 var p pb.ModifyPwdRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.UserReceiveInfoRsp: // 新增修改收货地址请求响应 var p pb.UserReceiveInfoRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.DelUserReceiveInfoRsp: // 删除收货地址请求响应 var p pb.DelUserReceiveInfoRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.UserReceiveIsDefaultRsp: var p pb.UserReceiveIsDefaultRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.UserReceiptInfoRsp: // 新增修改用户发票信息请求响应 var p pb.UserReceiptInfoRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.DelUserReceiptInfoRsp: // 删除用户发票信息请求响应 var p pb.DelUserReceiptInfoRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.T2bBankSignRsp: // 签约应答 var p pb.T2BBankSignRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.T2bBankCancelSignRsp: // 解约应答 var p pb.T2BBankCancelSignRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.T2bBankWithdrawRsp: // 出金应答 var p pb.T2BBankWithdrawRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.T2bBankDepositRsp: // 入金应答 var p pb.T2BBankDepositRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.WarehouseApplyRsp: // 仓库申请应答 var p pb.WarehouseApplyRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.PerformanceContractedApplyRsp: // 违约申请应答 var p pb.PerformanceContractedApplyRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.PerformanceDelayApplyRsp: // 延期申请应答 var p pb.PerformanceDelayApplyRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.PerformanceManualConfirmRsp: // 履约手动确认应答 var p pb.PerformanceManualConfirmRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.PerformanceModifyContactRsp: // 履约修改联络信息回应 var p pb.PerformanceModifyContactRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.GoodsFavoriteOperateRsp: // 商品收藏操作接口应答 var p pb.GoodsFavoriteOperateRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.ZSBuyOrderListingRsp: // 钻石买挂牌接口应答 var p pb.ZSBuyOrderListingRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.ZSSellOrderListingRsp: // 钻石卖挂牌接口响应 var p pb.ZSSellOrderListingRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.ZSBuyOrderDestingRsp: // 钻石卖摘牌申请接口响应 var p pb.ZSBuyOrderDestingRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.ZSSellOrderDestingApplyOperateRsp: // 钻石卖摘牌申请操作接口应答 var p pb.ZSSellOrderDestingApplyOperateRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.ZSBuyOrderDestingNegPriceRsp: // 买摘牌询价接口应答 var p pb.ZSBuyOrderDestingNegPriceRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.ZSBuyOrderDestingNegPriceOperateRsp: // 买摘牌询价操作接口应答 var p pb.ZSBuyOrderDestingNegPriceOperateRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.WRListingCancelOrderRsp: // 挂牌撤单应答 var p pb.WRListingCancelOrderRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.HdWROrderRsp: var p pb.HdWROrderRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.WROutApplyRsp: var p pb.WROutApplyRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.ReceiptZSOutApplyRsp: // 钻石出库申请接口响应 var p pb.ReceiptZSOutApplyRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.SpotPresaleListingOrderRsp: // 铁合金现货预售挂牌接口应答 var p pb.SpotPresaleListingOrderRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.CJJCMemberOperateRsp: // 出境检测会员操作响应 var p pb.CJJCMemberOperateRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.BSFWMemberOperateRsp: // 保税服务会员操作响应 var p pb.BSFWMemberOperateRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.AccountFundInfoRsp: // 账户资金信息响应 var p pb.AccountFundInfoRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.GZPresaleApplyRsp: // 广钻预售申请响应 var p pb.GZPresaleApplyRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.GZPresaleOrderRsp: // 广钻预售认购下单响应 var p pb.GZPresaleOrderRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.GZCenterPurchaseApplyRsp: // 广钻集采申请响应 var p pb.GZCenterPurchaseApplyRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.GZCenterPurchaseOrderRsp: // 广钻集采认购下单响应 var p pb.GZCenterPurchaseOrderRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.BSWMSReckonPayRsp: // WMS结算单支付接口响应 var p pb.BSWMSReckonPayRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.OrderRsp: var p pb.OrderRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.CancelOrderRsp: var p pb.CancelOrderRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.HolderCloseRsp: var p pb.HolderCloseRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.DeliveryOrderRsp: var p pb.DeliveryOrderRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.MakeUpDepositRsp: var p pb.MakeUpDepositRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.HoldAppendDepositRsp: var p pb.HoldAppendDepositRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.OfflineDeliveryRsp: var p pb.OfflineDeliveryRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.HdWRDealOrderRsp: var p pb.HdWRDealOrderRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.SupplementDepositRsp: var p pb.SupplementDepositRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.RefundedDepositRsp: var p pb.RefundedDepositRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.MarketOrderDeliveryApplyRsp: var p pb.MarketOrderDeliveryApplyRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } case global.DeliveryClientOperatorRsp: var p pb.DeliveryClientOperatorRsp if err = proto.Unmarshal(b, &p); err != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } if bs, e := protojson.Marshal(&p); e != nil { global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err)) return } else { bytes = &bs serialNumber = p.GetHeader().GetRequestID() } } return } // RabbitMQSubscribeTopic 订阅主题 func RabbitMQSubscribeTopic() (err error) { // 订阅需要的总线响应主题 if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_USER); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_NTF); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_CLIENT_NTF); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_QKERNEL); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_MANAGE_RSP); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_BANK); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_MONEY); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_PERFORMANCE_RSP); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_WAREHOUSE_RECIEPT_RSP); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_TRADE); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_TRADE_GZ); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_CJBS_TRADE_GZ); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } // 铁合金 if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_PRESALE_THJ); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_MANAGER_THJ_NTF); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_PRESALE_GZ); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_CENTERPURCHASE_GZ); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_ORDER); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_DELIVERY_TRADE); err != nil { global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err)) return } global.M2A_LOG.Info("rabbitmq subscribe topic successed.") return } // StartRabbitMQReceive 开始接收总线消息 func StartRabbitMQReceive() { t := &MQProc{} go func() { for _, subinfo := range rabbitmq.SubInfos { rabbitmq.Receive(subinfo.Topic, subinfo.QueueName, t) } }() } // InitFuncodeTopic 初始化功能码主题MAP func InitFuncodeTopic() { if global.M2A_FuncodeTopic == nil { global.M2A_FuncodeTopic = make(map[string][]int) } global.M2A_FuncodeTopic[global.TOPIC_REQ_QKERNEL] = []int{ global.ModifyPwdReq, } global.M2A_FuncodeTopic[global.TOPIC_MANAGE_REQ] = []int{ global.UserReceiveInfoReq, global.DelUserReceiveInfoReq, global.UserReceiveIsDefaultReq, global.UserReceiptInfoReq, global.DelUserReceiptInfoReq, global.WarehouseApplyReq, global.WROutApplyReq, } global.M2A_FuncodeTopic[global.TOPIC_REQ_BANK] = []int{ global.T2bBankSignReq, global.T2bBankCancelSignReq, global.T2bBankWithdrawReq, global.T2bBankDepositReq, } global.M2A_FuncodeTopic[global.TOPIC_REQ_MONEY] = []int{ global.AccountFundInfoReq, } global.M2A_FuncodeTopic[global.TOPIC_PERFORMANCE_REQ] = []int{ global.PerformanceContractedApplyReq, global.PerformanceDelayApplyReq, global.PerformanceManualConfirmReq, global.PerformanceModifyContactReq, } global.M2A_FuncodeTopic[global.TOPIC_WAREHOUSE_RECIEPT] = []int{ global.ReceiptZSOutApplyReq, } global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_TRADE] = []int{ global.WRListingCancelOrderReq, global.HdWROrderReq, global.HdWRDealOrderReq, } global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_TRADE_GZ] = []int{ global.GoodsFavoriteOperateReq, global.ZSBuyOrderListingReq, global.ZSSellOrderListingReq, global.ZSBuyOrderDestingReq, global.ZSSellOrderDestingApplyReq, global.ZSSellOrderDestingApplyOperateReq, global.ZSBuyOrderDestingNegPriceReq, global.ZSBuyOrderDestingNegPriceOperateReq, } global.M2A_FuncodeTopic[global.TOPIC_REQ_CJBS_TRADE_GZ] = []int{ global.CJJCMemberOperateReq, global.BSFWMemberOperateReq, global.BSWMSReckonPayReq, } // 铁合金 global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_PRESALE_THJ] = []int{ global.SpotPresaleListingOrderReq, } global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_PRESALE_GZ] = []int{ global.GZPresaleApplyReq, global.GZPresaleOrderReq, } global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_CENTERPURCHASE_GZ] = []int{ global.GZCenterPurchaseApplyReq, global.GZCenterPurchaseOrderReq, } global.M2A_FuncodeTopic[global.TOPIC_REQ_ORDER] = []int{ global.OrderReq, global.CancelOrderReq, global.HolderCloseReq, global.MakeUpDepositReq, global.HoldAppendDepositReq, global.OfflineDeliveryReq, global.SupplementDepositReq, global.RefundedDepositReq, global.MarketOrderDeliveryApplyReq, global.DeliveryClientOperatorReq, } global.M2A_FuncodeTopic[global.TOPIC_REQ_DELIVERY_TRADE] = []int{ global.DeliveryOrderReq, } global.M2A_FuncodeTopic[global.TOPIC_REQ_USER] = []int{ global.LoginReq, } }