| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100 |
- package initialize
- import (
- "encoding/base64"
- "fmt"
- "mtp20access/client"
- "mtp20access/global"
- rsp "mtp20access/model/mq/response"
- "mtp20access/packet"
- "mtp20access/publish"
- "mtp20access/rabbitmq"
- "mtp20access/res/pb"
- accountSrv "mtp20access/service/account"
- "mtp20access/utils"
- "strconv"
- // "github.com/golang/protobuf/proto"
- "github.com/streadway/amqp"
- "go.uber.org/zap"
- "google.golang.org/protobuf/encoding/protojson"
- "google.golang.org/protobuf/proto"
- )
- func RabbitMQ() *global.RabbitMQ {
- url := global.M2A_CONFIG.Rabbitmq.Url
- connection, err := amqp.Dial(url)
- if err != nil {
- global.M2A_LOG.Error("rabbitmq connect failed, err:", zap.Error(err))
- return nil
- }
- channel, err := connection.Channel()
- if err != nil {
- global.M2A_LOG.Error("rabbitmq open channel failed, err:", zap.Error(err))
- return nil
- }
- global.M2A_LOG.Info("rabbitmq connect successed.")
- return &global.RabbitMQ{
- Connection: connection,
- Channel: channel,
- }
- }
- // MQProc 消息处理对象
- type MQProc struct{}
- // process 消息处理接口
- func (t *MQProc) Process(topic, queuename string, msg *[]byte) {
- // info := fmt.Sprintf("rabbitmq receive message from: topic[%s] queue[%s] contentLen[%d]",
- // topic,
- // queuename,
- // len(string(*msg)))
- // global.M2A_LOG.Info(info)
- if funcode, sessionId, bytes, serialNumber, err := t.getRspProtobuf(msg); err == nil && bytes != nil {
- if sessionId == 0 || funcode == uint32(global.LogoutRsp) || funcode == uint32(global.LoginRsp) {
- // 通知类 或 特殊处理
- t.onNtf(funcode, sessionId, bytes)
- } else {
- // 请求回复W
- // 尝试获取对应异步任务
- if client, exists := client.Clients[int(sessionId)]; exists {
- key := fmt.Sprintf("%v_%v_%v", sessionId, funcode, serialNumber)
- // 银行服务相关的回复流水号是错误的,所以需要特殊处理
- if int(funcode) == global.T2bBankSignRsp ||
- int(funcode) == global.T2bBankCancelSignRsp ||
- int(funcode) == global.T2bBankWithdrawRsp ||
- int(funcode) == global.T2bBankDepositRsp {
- key = fmt.Sprintf("%v_%v", sessionId, funcode)
- }
- asyncTask := client.GetAsyncTask(key)
- if asyncTask != nil {
- rspData := string(*bytes)
- // 判断是否要加密
- if asyncTask.IsEncrypted {
- if b, err := packet.Encrypt(*bytes, packet.AESKey, true); err != nil {
- global.M2A_LOG.Error("总线回复数据加密失败", zap.Error(err))
- return
- } else {
- rspData = base64.StdEncoding.EncodeToString(b)
- }
- }
- // 给客户端回调
- global.M2A_LOG.Info("[S->C]", zap.Any("rsp", funcode), zap.Any("sessionId", sessionId), zap.Any("data", string(rspData)))
- r := rsp.MQBodyRsp{
- FunCode: funcode,
- IsEncrypted: asyncTask.IsEncrypted,
- Data: rspData,
- }
- asyncTask.Rsp <- r
- } else {
- global.M2A_LOG.Info("找不到对应KEY的异步任务", zap.Any("key", key), zap.Any("AsyncMap", client.GetAllAsyncTask()))
- }
- } else {
- global.M2A_LOG.Info("找不到对应的client", zap.Any("sessionId", sessionId))
- }
- }
- }
- }
- func (t *MQProc) onNtf(funcode uint32, sessionId uint32, bytes *[]byte) {
- var clients []*client.Client
- var err error
- var sendBytes *[]byte
- switch int(funcode) {
- case global.LoginRsp: // 用户登录应答 - 主要记录旧Token
- var p pb.LoginRsp
- if err = proto.Unmarshal(*bytes, &p); err != nil {
- global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err))
- return
- }
- // if bs, e := protojson.Marshal(&p); e != nil {
- // global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err))
- // return
- // } else {
- // sendBytes = &bs
- // }
- // 获取目标客户
- for i := range client.Clients {
- c := client.Clients[i]
- if strconv.Itoa(int(p.GetUserID())) == c.UserID &&
- strconv.Itoa(int(sessionId)) == c.SessionID {
- // 主要记录旧Token
- c.OldToken = p.GetToken()
- }
- }
- case global.LogoutRsp: // 用户登出应答 - 主要用于接收管理端踢上线
- var p pb.LogoutRsp
- if err = proto.Unmarshal(*bytes, &p); err != nil {
- global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线数据序列化JSON失败", zap.Error(err))
- return
- } else {
- sendBytes = &bs
- }
- // 获取目标客户
- clients = make([]*client.Client, 0)
- for i := range client.Clients {
- c := client.Clients[i]
- if strconv.Itoa(int(p.GetHeader().GetUserID())) == c.UserID {
- clients = append(clients, c)
- }
- }
- case global.MoneyChangedNtf: // 资金变化通知
- var p pb.MoneyChangedNtf
- if err = proto.Unmarshal(*bytes, &p); err != nil {
- global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线数据序列化JSON失败", zap.Error(err))
- return
- } else {
- sendBytes = &bs
- }
- // 获取目标客户
- clients, err = accountSrv.GetClientsByAccountID(*p.AccountID)
- if err != nil {
- return
- }
- case global.PosChangedNtf: // 头寸变化通知
- var p pb.PosChangedNtf
- if err = proto.Unmarshal(*bytes, &p); err != nil {
- global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线数据序列化JSON失败", zap.Error(err))
- return
- } else {
- sendBytes = &bs
- }
- global.M2A_LOG.Info("接收 PosChangedNtf", zap.Any("ntf", funcode), zap.Any("AccountID", *p.AccountID))
- global.M2A_LOG.Info("当前的 Clients", zap.Any("ntf", funcode), zap.Any("client.Clients", client.Clients))
- // 获取目标客户
- clients, err = accountSrv.GetClientsByAccountID(*p.AccountID)
- global.M2A_LOG.Info("找到对应的clients", zap.Any("ntf", funcode), zap.Any("clients", clients))
- if err != nil {
- global.M2A_LOG.Info(fmt.Sprintf("接收头寸变化通知时获取不到对应Client,AccountID:%v", *p.AccountID))
- return
- }
- case global.OrderDealedNtf: // 委托单成交通知
- var p pb.OrderDealedNtf
- if err = proto.Unmarshal(*bytes, &p); err != nil {
- global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线数据序列化JSON失败", zap.Error(err))
- return
- } else {
- sendBytes = &bs
- }
- // 获取目标客户
- clients, err = accountSrv.GetClientsByAccountID(*p.AccountID)
- if err != nil {
- return
- }
- case global.MarketStatusChangeNtf: // 市场状态变更通知
- var p pb.MarketStatusChangeNtf
- if err = proto.Unmarshal(*bytes, &p); err != nil {
- global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线数据序列化JSON失败", zap.Error(err))
- return
- } else {
- sendBytes = &bs
- }
- // 获取目标客户
- clients = make([]*client.Client, 0)
- for i := range client.Clients {
- c := client.Clients[i]
- clients = append(clients, c)
- }
- case global.ListingOrderChangeNtf: // 挂牌委托变更广播通知
- var p pb.ListingOrderChangeNtf
- if err = proto.Unmarshal(*bytes, &p); err != nil {
- global.M2A_LOG.Error("总线数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线数据序列化JSON失败", zap.Error(err))
- return
- } else {
- sendBytes = &bs
- }
- // 获取目标客户
- clients = make([]*client.Client, 0)
- for i := range client.Clients {
- c := client.Clients[i]
- clients = append(clients, c)
- }
- }
- if err == nil && len(clients) > 0 {
- // 组装待发送给客户端的5.0报文包
- b, err := packet.BuildPacket(funcode, 0, 0, *sendBytes, true)
- if err != nil {
- global.M2A_LOG.Error("组装5.0报文失败", zap.Error(err))
- return
- }
- // 发送信息
- for _, item := range clients {
- // c := clients[i]
- sessionId, _ := strconv.Atoi(item.SessionID)
- c := client.Clients[sessionId]
- // c.WriteTradeWsBuf(b)
- // 给客户端通知
- global.M2A_LOG.Info("[S->C]给客户端通知", zap.Any("ntf", funcode), zap.Any("clients", c.LoginID), zap.Any("SessionID", c.SessionID), zap.Any("len", len(b)))
- // 分发给订阅者
- m := make(map[string][]byte)
- m[item.SessionID] = b
- global.M2A_Publish.Publish(publish.Topic_Trading, m)
- }
- }
- global.M2A_LOG.Info("----------退出通知逻辑", zap.Any("funcode", funcode), zap.Any("sessionId", sessionId))
- }
- // getRspProtobuf 将总线回复的数据反序列化为Protobuf
- func (t *MQProc) getRspProtobuf(msg *[]byte) (funcode uint32, sessionId uint32, bytes *[]byte, serialNumber uint32, err error) {
- // 分解总线包信息
- funcode = utils.BytesToUint32((*msg)[0:4])
- sessionId = utils.BytesToUint32((*msg)[4:8])
- b := (*msg)[8:]
- global.M2A_LOG.Info("收到总线消息", zap.Any("funcode", funcode), zap.Any("sessionId", sessionId), zap.Any("len", len(b)))
- switch int(funcode) {
- case global.PosChangedNtf,
- global.MoneyChangedNtf,
- global.OrderDealedNtf,
- global.MarketStatusChangeNtf,
- global.LoginRsp,
- global.LogoutRsp,
- global.ListingOrderChangeNtf: // 资金变化通知等
- global.M2A_LOG.Info("进入通知逻辑----------", zap.Any("funcode", funcode), zap.Any("sessionId", sessionId))
- bytes = &b
- case global.ModifyPwdRsp: // 修改账户密码应答
- var p pb.ModifyPwdRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.UserReceiveInfoRsp: // 新增修改收货地址请求响应
- var p pb.UserReceiveInfoRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.DelUserReceiveInfoRsp: // 删除收货地址请求响应
- var p pb.DelUserReceiveInfoRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.UserReceiveIsDefaultRsp:
- var p pb.UserReceiveIsDefaultRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.UserReceiptInfoRsp: // 新增修改用户发票信息请求响应
- var p pb.UserReceiptInfoRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.DelUserReceiptInfoRsp: // 删除用户发票信息请求响应
- var p pb.DelUserReceiptInfoRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.T2bBankSignRsp: // 签约应答
- var p pb.T2BBankSignRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.T2bBankCancelSignRsp: // 解约应答
- var p pb.T2BBankCancelSignRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.T2bBankWithdrawRsp: // 出金应答
- var p pb.T2BBankWithdrawRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.T2bBankDepositRsp: // 入金应答
- var p pb.T2BBankDepositRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.WarehouseApplyRsp: // 仓库申请应答
- var p pb.WarehouseApplyRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.PerformanceContractedApplyRsp: // 违约申请应答
- var p pb.PerformanceContractedApplyRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.PerformanceDelayApplyRsp: // 延期申请应答
- var p pb.PerformanceDelayApplyRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.PerformanceManualConfirmRsp: // 履约手动确认应答
- var p pb.PerformanceManualConfirmRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.PerformanceModifyContactRsp: // 履约修改联络信息回应
- var p pb.PerformanceModifyContactRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.GoodsFavoriteOperateRsp: // 商品收藏操作接口应答
- var p pb.GoodsFavoriteOperateRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.ZSBuyOrderListingRsp: // 钻石买挂牌接口应答
- var p pb.ZSBuyOrderListingRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.ZSSellOrderListingRsp: // 钻石卖挂牌接口响应
- var p pb.ZSSellOrderListingRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.ZSBuyOrderDestingRsp: // 钻石卖摘牌申请接口响应
- var p pb.ZSBuyOrderDestingRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.ZSSellOrderDestingApplyOperateRsp: // 钻石卖摘牌申请操作接口应答
- var p pb.ZSSellOrderDestingApplyOperateRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.ZSBuyOrderDestingNegPriceRsp: // 买摘牌询价接口应答
- var p pb.ZSBuyOrderDestingNegPriceRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.ZSBuyOrderDestingNegPriceOperateRsp: // 买摘牌询价操作接口应答
- var p pb.ZSBuyOrderDestingNegPriceOperateRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.WRListingCancelOrderRsp: // 挂牌撤单应答
- var p pb.WRListingCancelOrderRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.HdWROrderRsp:
- var p pb.HdWROrderRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.WROutApplyRsp:
- var p pb.WROutApplyRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.ReceiptZSOutApplyRsp: // 钻石出库申请接口响应
- var p pb.ReceiptZSOutApplyRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.SpotPresaleListingOrderRsp: // 铁合金现货预售挂牌接口应答
- var p pb.SpotPresaleListingOrderRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.CJJCMemberOperateRsp: // 出境检测会员操作响应
- var p pb.CJJCMemberOperateRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.BSFWMemberOperateRsp: // 保税服务会员操作响应
- var p pb.BSFWMemberOperateRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.AccountFundInfoRsp: // 账户资金信息响应
- var p pb.AccountFundInfoRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.GZPresaleApplyRsp: // 广钻预售申请响应
- var p pb.GZPresaleApplyRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.GZPresaleOrderRsp: // 广钻预售认购下单响应
- var p pb.GZPresaleOrderRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.GZCenterPurchaseApplyRsp: // 广钻集采申请响应
- var p pb.GZCenterPurchaseApplyRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.GZCenterPurchaseOrderRsp: // 广钻集采认购下单响应
- var p pb.GZCenterPurchaseOrderRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.BSWMSReckonPayRsp: // WMS结算单支付接口响应
- var p pb.BSWMSReckonPayRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.OrderRsp:
- var p pb.OrderRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.CancelOrderRsp:
- var p pb.CancelOrderRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.HolderCloseRsp:
- var p pb.HolderCloseRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.DeliveryOrderRsp:
- var p pb.DeliveryOrderRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.MakeUpDepositRsp:
- var p pb.MakeUpDepositRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.HoldAppendDepositRsp:
- var p pb.HoldAppendDepositRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.OfflineDeliveryRsp:
- var p pb.OfflineDeliveryRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.HdWRDealOrderRsp:
- var p pb.HdWRDealOrderRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.SupplementDepositRsp:
- var p pb.SupplementDepositRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.RefundedDepositRsp:
- var p pb.RefundedDepositRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.MarketOrderDeliveryApplyRsp:
- var p pb.MarketOrderDeliveryApplyRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- case global.DeliveryClientOperatorRsp:
- var p pb.DeliveryClientOperatorRsp
- if err = proto.Unmarshal(b, &p); err != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- }
- if bs, e := protojson.Marshal(&p); e != nil {
- global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
- return
- } else {
- bytes = &bs
- serialNumber = p.GetHeader().GetRequestID()
- }
- }
- return
- }
- // RabbitMQSubscribeTopic 订阅主题
- func RabbitMQSubscribeTopic() (err error) {
- // 订阅需要的总线响应主题
- if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_USER); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_NTF); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_CLIENT_NTF); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_QKERNEL); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_MANAGE_RSP); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_BANK); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_MONEY); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_PERFORMANCE_RSP); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_WAREHOUSE_RECIEPT_RSP); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_TRADE); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_TRADE_GZ); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_CJBS_TRADE_GZ); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- // 铁合金
- if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_PRESALE_THJ); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_MANAGER_THJ_NTF); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_PRESALE_GZ); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_CENTERPURCHASE_GZ); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_ORDER); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_DELIVERY_TRADE); err != nil {
- global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
- return
- }
- global.M2A_LOG.Info("rabbitmq subscribe topic successed.")
- return
- }
- // StartRabbitMQReceive 开始接收总线消息
- func StartRabbitMQReceive() {
- t := &MQProc{}
- go func() {
- for _, subinfo := range rabbitmq.SubInfos {
- rabbitmq.Receive(subinfo.Topic, subinfo.QueueName, t)
- }
- }()
- }
- // InitFuncodeTopic 初始化功能码主题MAP
- func InitFuncodeTopic() {
- if global.M2A_FuncodeTopic == nil {
- global.M2A_FuncodeTopic = make(map[string][]int)
- }
- global.M2A_FuncodeTopic[global.TOPIC_REQ_QKERNEL] = []int{
- global.ModifyPwdReq,
- }
- global.M2A_FuncodeTopic[global.TOPIC_MANAGE_REQ] = []int{
- global.UserReceiveInfoReq,
- global.DelUserReceiveInfoReq,
- global.UserReceiveIsDefaultReq,
- global.UserReceiptInfoReq,
- global.DelUserReceiptInfoReq,
- global.WarehouseApplyReq,
- global.WROutApplyReq,
- }
- global.M2A_FuncodeTopic[global.TOPIC_REQ_BANK] = []int{
- global.T2bBankSignReq,
- global.T2bBankCancelSignReq,
- global.T2bBankWithdrawReq,
- global.T2bBankDepositReq,
- }
- global.M2A_FuncodeTopic[global.TOPIC_REQ_MONEY] = []int{
- global.AccountFundInfoReq,
- }
- global.M2A_FuncodeTopic[global.TOPIC_PERFORMANCE_REQ] = []int{
- global.PerformanceContractedApplyReq,
- global.PerformanceDelayApplyReq,
- global.PerformanceManualConfirmReq,
- global.PerformanceModifyContactReq,
- }
- global.M2A_FuncodeTopic[global.TOPIC_WAREHOUSE_RECIEPT] = []int{
- global.ReceiptZSOutApplyReq,
- }
- global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_TRADE] = []int{
- global.WRListingCancelOrderReq,
- global.HdWROrderReq,
- global.HdWRDealOrderReq,
- }
- global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_TRADE_GZ] = []int{
- global.GoodsFavoriteOperateReq,
- global.ZSBuyOrderListingReq,
- global.ZSSellOrderListingReq,
- global.ZSBuyOrderDestingReq,
- global.ZSSellOrderDestingApplyReq,
- global.ZSSellOrderDestingApplyOperateReq,
- global.ZSBuyOrderDestingNegPriceReq,
- global.ZSBuyOrderDestingNegPriceOperateReq,
- }
- global.M2A_FuncodeTopic[global.TOPIC_REQ_CJBS_TRADE_GZ] = []int{
- global.CJJCMemberOperateReq,
- global.BSFWMemberOperateReq,
- global.BSWMSReckonPayReq,
- }
- // 铁合金
- global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_PRESALE_THJ] = []int{
- global.SpotPresaleListingOrderReq,
- }
- global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_PRESALE_GZ] = []int{
- global.GZPresaleApplyReq,
- global.GZPresaleOrderReq,
- }
- global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_CENTERPURCHASE_GZ] = []int{
- global.GZCenterPurchaseApplyReq,
- global.GZCenterPurchaseOrderReq,
- }
- global.M2A_FuncodeTopic[global.TOPIC_REQ_ORDER] = []int{
- global.OrderReq,
- global.CancelOrderReq,
- global.HolderCloseReq,
- global.MakeUpDepositReq,
- global.HoldAppendDepositReq,
- global.OfflineDeliveryReq,
- global.SupplementDepositReq,
- global.RefundedDepositReq,
- global.MarketOrderDeliveryApplyReq,
- global.DeliveryClientOperatorReq,
- }
- global.M2A_FuncodeTopic[global.TOPIC_REQ_DELIVERY_TRADE] = []int{
- global.DeliveryOrderReq,
- }
- global.M2A_FuncodeTopic[global.TOPIC_REQ_USER] = []int{
- global.LoginReq,
- }
- }
|