rabbitmq.go 25 KB


  1. package initialize
  2. import (
  3. "encoding/base64"
  4. "fmt"
  5. "mtp20access/client"
  6. "mtp20access/global"
  7. rsp "mtp20access/model/mq/response"
  8. "mtp20access/packet"
  9. "mtp20access/rabbitmq"
  10. "mtp20access/res/pb"
  11. "mtp20access/utils"
  12. // "github.com/golang/protobuf/proto"
  13. "github.com/streadway/amqp"
  14. "go.uber.org/zap"
  15. "google.golang.org/protobuf/encoding/protojson"
  16. "google.golang.org/protobuf/proto"
  17. )
  18. func RabbitMQ() *global.RabbitMQ {
  19. url := global.M2A_CONFIG.Rabbitmq.Url
  20. connection, err := amqp.Dial(url)
  21. if err != nil {
  22. global.M2A_LOG.Error("rabbitmq connect failed, err:", zap.Error(err))
  23. return nil
  24. }
  25. channel, err := connection.Channel()
  26. if err != nil {
  27. global.M2A_LOG.Error("rabbitmq open channel failed, err:", zap.Error(err))
  28. return nil
  29. }
  30. global.M2A_LOG.Info("rabbitmq connect successed.")
  31. return &global.RabbitMQ{
  32. Connection: connection,
  33. Channel: channel,
  34. }
  35. }
  36. // MQProc 消息处理对象
  37. type MQProc struct{}
  38. // process 消息处理接口
  39. func (t *MQProc) Process(topic, queuename string, msg *[]byte) {
  40. // info := fmt.Sprintf("rabbitmq receive message from: topic[%s] queue[%s] contentLen[%d]",
  41. // topic,
  42. // queuename,
  43. // len(string(*msg)))
  44. // global.M2A_LOG.Info(info)
  45. if funcode, sessionId, bytes, serialNumber, err := t.getRspProtobuf(msg); err == nil && bytes != nil {
  46. if sessionId == 0 {
  47. // 通知类
  48. } else {
  49. // 请求回复
  50. // 尝试获取对应异步任务
  51. if client, exists := client.Clients[int(sessionId)]; exists {
  52. key := fmt.Sprintf("%v_%v_%v", sessionId, funcode, serialNumber)
  53. // 银行服务相关的回复流水号是错误的,所以需要特殊处理
  54. if int(funcode) == global.T2bBankSignRsp ||
  55. int(funcode) == global.T2bBankCancelSignRsp ||
  56. int(funcode) == global.T2bBankWithdrawRsp ||
  57. int(funcode) == global.T2bBankDepositRsp {
  58. key = fmt.Sprintf("%v_%v", sessionId, funcode)
  59. }
  60. asyncTask := client.GetAsyncTask(key)
  61. if asyncTask != nil {
  62. rspData := string(*bytes)
  63. // 判断是否要加密
  64. if asyncTask.IsEncrypted {
  65. if b, err := packet.Encrypt(*bytes, packet.AESKey, true); err != nil {
  66. global.M2A_LOG.Error("总线回复数据加密失败", zap.Error(err))
  67. return
  68. } else {
  69. rspData = base64.StdEncoding.EncodeToString(b)
  70. }
  71. }
  72. // 给客户端回调
  73. global.M2A_LOG.Info("[S->C]", zap.Any("rsp", funcode), zap.Any("sessionId", sessionId), zap.Any("data", string(rspData)))
  74. r := rsp.MQBodyRsp{
  75. FunCode: funcode,
  76. IsEncrypted: asyncTask.IsEncrypted,
  77. Data: rspData,
  78. }
  79. asyncTask.Rsp <- r
  80. } else {
  81. global.M2A_LOG.Info("找不到对应KEY的异步任务", zap.Any("key", key), zap.Any("AsyncMap", client.GetAllAsyncTask()))
  82. }
  83. } else {
  84. global.M2A_LOG.Info("找不到对应的client", zap.Any("sessionId", sessionId))
  85. }
  86. }
  87. }
  88. }
  89. // getRspProtobuf 将总线回复的数据反序列化为Protobuf
  90. func (t *MQProc) getRspProtobuf(msg *[]byte) (funcode uint32, sessionId uint32, bytes *[]byte, serialNumber uint32, err error) {
  91. // 分解总线包信息
  92. funcode = utils.BytesToUint32((*msg)[0:4])
  93. sessionId = utils.BytesToUint32((*msg)[4:8])
  94. b := (*msg)[8:]
  95. global.M2A_LOG.Info("收到总线消息", zap.Any("funcode", funcode), zap.Any("sessionId", sessionId), zap.Any("len", len(b)))
  96. switch int(funcode) {
  97. case global.MoneyChangedNtf: // 资金变化通知
  98. var p pb.MoneyChangedNtf
  99. if err = proto.Unmarshal(b, &p); err != nil {
  100. global.M2A_LOG.Error("总线通知数据反序列化失败", zap.Error(err))
  101. return
  102. }
  103. if bs, e := protojson.Marshal(&p); e != nil {
  104. global.M2A_LOG.Error("总线通知数据反序列化失败", zap.Error(err))
  105. return
  106. } else {
  107. bytes = &bs
  108. serialNumber = p.GetHeader().GetRequestID()
  109. }
  110. case global.ModifyPwdRsp: // 修改账户密码应答
  111. var p pb.ModifyPwdRsp
  112. if err = proto.Unmarshal(b, &p); err != nil {
  113. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  114. return
  115. }
  116. if bs, e := protojson.Marshal(&p); e != nil {
  117. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  118. return
  119. } else {
  120. bytes = &bs
  121. serialNumber = p.GetHeader().GetRequestID()
  122. }
  123. case global.UserReceiveInfoRsp: // 新增修改收货地址请求响应
  124. var p pb.UserReceiveInfoRsp
  125. if err = proto.Unmarshal(b, &p); err != nil {
  126. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  127. return
  128. }
  129. if bs, e := protojson.Marshal(&p); e != nil {
  130. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  131. return
  132. } else {
  133. bytes = &bs
  134. serialNumber = p.GetHeader().GetRequestID()
  135. }
  136. case global.DelUserReceiveInfoRsp: // 删除收货地址请求响应
  137. var p pb.DelUserReceiveInfoRsp
  138. if err = proto.Unmarshal(b, &p); err != nil {
  139. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  140. return
  141. }
  142. if bs, e := protojson.Marshal(&p); e != nil {
  143. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  144. return
  145. } else {
  146. bytes = &bs
  147. serialNumber = p.GetHeader().GetRequestID()
  148. }
  149. case global.UserReceiptInfoRsp: // 新增修改用户发票信息请求响应
  150. var p pb.UserReceiptInfoRsp
  151. if err = proto.Unmarshal(b, &p); err != nil {
  152. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  153. return
  154. }
  155. if bs, e := protojson.Marshal(&p); e != nil {
  156. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  157. return
  158. } else {
  159. bytes = &bs
  160. serialNumber = p.GetHeader().GetRequestID()
  161. }
  162. case global.DelUserReceiptInfoRsp: // 删除用户发票信息请求响应
  163. var p pb.DelUserReceiptInfoRsp
  164. if err = proto.Unmarshal(b, &p); err != nil {
  165. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  166. return
  167. }
  168. if bs, e := protojson.Marshal(&p); e != nil {
  169. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  170. return
  171. } else {
  172. bytes = &bs
  173. serialNumber = p.GetHeader().GetRequestID()
  174. }
  175. case global.T2bBankSignRsp: // 签约应答
  176. var p pb.T2BBankSignRsp
  177. if err = proto.Unmarshal(b, &p); err != nil {
  178. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  179. return
  180. }
  181. if bs, e := protojson.Marshal(&p); e != nil {
  182. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  183. return
  184. } else {
  185. bytes = &bs
  186. serialNumber = p.GetHeader().GetRequestID()
  187. }
  188. case global.T2bBankCancelSignRsp: // 解约应答
  189. var p pb.T2BBankCancelSignRsp
  190. if err = proto.Unmarshal(b, &p); err != nil {
  191. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  192. return
  193. }
  194. if bs, e := protojson.Marshal(&p); e != nil {
  195. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  196. return
  197. } else {
  198. bytes = &bs
  199. serialNumber = p.GetHeader().GetRequestID()
  200. }
  201. case global.T2bBankWithdrawRsp: // 出金应答
  202. var p pb.T2BBankWithdrawRsp
  203. if err = proto.Unmarshal(b, &p); err != nil {
  204. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  205. return
  206. }
  207. if bs, e := protojson.Marshal(&p); e != nil {
  208. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  209. return
  210. } else {
  211. bytes = &bs
  212. serialNumber = p.GetHeader().GetRequestID()
  213. }
  214. case global.T2bBankDepositRsp: // 入金应答
  215. var p pb.T2BBankDepositRsp
  216. if err = proto.Unmarshal(b, &p); err != nil {
  217. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  218. return
  219. }
  220. if bs, e := protojson.Marshal(&p); e != nil {
  221. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  222. return
  223. } else {
  224. bytes = &bs
  225. serialNumber = p.GetHeader().GetRequestID()
  226. }
  227. case global.WarehouseApplyRsp: // 仓库申请应答
  228. var p pb.WarehouseApplyRsp
  229. if err = proto.Unmarshal(b, &p); err != nil {
  230. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  231. return
  232. }
  233. if bs, e := protojson.Marshal(&p); e != nil {
  234. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  235. return
  236. } else {
  237. bytes = &bs
  238. serialNumber = p.GetHeader().GetRequestID()
  239. }
  240. case global.PerformanceContractedApplyRsp: // 违约申请应答
  241. var p pb.PerformanceContractedApplyRsp
  242. if err = proto.Unmarshal(b, &p); err != nil {
  243. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  244. return
  245. }
  246. if bs, e := protojson.Marshal(&p); e != nil {
  247. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  248. return
  249. } else {
  250. bytes = &bs
  251. serialNumber = p.GetHeader().GetRequestID()
  252. }
  253. case global.PerformanceDelayApplyRsp: // 延期申请应答
  254. var p pb.PerformanceDelayApplyRsp
  255. if err = proto.Unmarshal(b, &p); err != nil {
  256. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  257. return
  258. }
  259. if bs, e := protojson.Marshal(&p); e != nil {
  260. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  261. return
  262. } else {
  263. bytes = &bs
  264. serialNumber = p.GetHeader().GetRequestID()
  265. }
  266. case global.PerformanceManualConfirmRsp: // 履约手动确认应答
  267. var p pb.PerformanceManualConfirmRsp
  268. if err = proto.Unmarshal(b, &p); err != nil {
  269. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  270. return
  271. }
  272. if bs, e := protojson.Marshal(&p); e != nil {
  273. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  274. return
  275. } else {
  276. bytes = &bs
  277. serialNumber = p.GetHeader().GetRequestID()
  278. }
  279. case global.PerformanceModifyContactRsp: // 履约修改联络信息回应
  280. var p pb.PerformanceModifyContactRsp
  281. if err = proto.Unmarshal(b, &p); err != nil {
  282. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  283. return
  284. }
  285. if bs, e := protojson.Marshal(&p); e != nil {
  286. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  287. return
  288. } else {
  289. bytes = &bs
  290. serialNumber = p.GetHeader().GetRequestID()
  291. }
  292. case global.GoodsFavoriteOperateRsp: // 商品收藏操作接口应答
  293. var p pb.GoodsFavoriteOperateRsp
  294. if err = proto.Unmarshal(b, &p); err != nil {
  295. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  296. return
  297. }
  298. if bs, e := protojson.Marshal(&p); e != nil {
  299. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  300. return
  301. } else {
  302. bytes = &bs
  303. serialNumber = p.GetHeader().GetRequestID()
  304. }
  305. case global.ZSBuyOrderListingRsp: // 钻石买挂牌接口应答
  306. var p pb.ZSBuyOrderListingRsp
  307. if err = proto.Unmarshal(b, &p); err != nil {
  308. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  309. return
  310. }
  311. if bs, e := protojson.Marshal(&p); e != nil {
  312. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  313. return
  314. } else {
  315. bytes = &bs
  316. serialNumber = p.GetHeader().GetRequestID()
  317. }
  318. case global.ZSSellOrderListingRsp: // 钻石卖挂牌接口响应
  319. var p pb.ZSSellOrderListingRsp
  320. if err = proto.Unmarshal(b, &p); err != nil {
  321. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  322. return
  323. }
  324. if bs, e := protojson.Marshal(&p); e != nil {
  325. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  326. return
  327. } else {
  328. bytes = &bs
  329. serialNumber = p.GetHeader().GetRequestID()
  330. }
  331. case global.ZSBuyOrderDestingRsp: // 钻石卖摘牌申请接口响应
  332. var p pb.ZSBuyOrderDestingRsp
  333. if err = proto.Unmarshal(b, &p); err != nil {
  334. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  335. return
  336. }
  337. if bs, e := protojson.Marshal(&p); e != nil {
  338. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  339. return
  340. } else {
  341. bytes = &bs
  342. serialNumber = p.GetHeader().GetRequestID()
  343. }
  344. case global.ZSSellOrderDestingApplyOperateRsp: // 钻石卖摘牌申请操作接口应答
  345. var p pb.ZSSellOrderDestingApplyOperateRsp
  346. if err = proto.Unmarshal(b, &p); err != nil {
  347. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  348. return
  349. }
  350. if bs, e := protojson.Marshal(&p); e != nil {
  351. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  352. return
  353. } else {
  354. bytes = &bs
  355. serialNumber = p.GetHeader().GetRequestID()
  356. }
  357. case global.ZSBuyOrderDestingNegPriceRsp: // 买摘牌询价接口应答
  358. var p pb.ZSBuyOrderDestingNegPriceRsp
  359. if err = proto.Unmarshal(b, &p); err != nil {
  360. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  361. return
  362. }
  363. if bs, e := protojson.Marshal(&p); e != nil {
  364. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  365. return
  366. } else {
  367. bytes = &bs
  368. serialNumber = p.GetHeader().GetRequestID()
  369. }
  370. case global.ZSBuyOrderDestingNegPriceOperateRsp: // 买摘牌询价操作接口应答
  371. var p pb.ZSBuyOrderDestingNegPriceOperateRsp
  372. if err = proto.Unmarshal(b, &p); err != nil {
  373. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  374. return
  375. }
  376. if bs, e := protojson.Marshal(&p); e != nil {
  377. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  378. return
  379. } else {
  380. bytes = &bs
  381. serialNumber = p.GetHeader().GetRequestID()
  382. }
  383. case global.WRListingCancelOrderRsp: // 挂牌撤单应答
  384. var p pb.WRListingCancelOrderRsp
  385. if err = proto.Unmarshal(b, &p); err != nil {
  386. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  387. return
  388. }
  389. if bs, e := protojson.Marshal(&p); e != nil {
  390. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  391. return
  392. } else {
  393. bytes = &bs
  394. serialNumber = p.GetHeader().GetRequestID()
  395. }
  396. case global.HdWROrderRsp:
  397. var p pb.HdWROrderRsp
  398. if err = proto.Unmarshal(b, &p); err != nil {
  399. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  400. return
  401. }
  402. if bs, e := protojson.Marshal(&p); e != nil {
  403. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  404. return
  405. } else {
  406. bytes = &bs
  407. serialNumber = p.GetHeader().GetRequestID()
  408. }
  409. case global.WROutApplyRsp:
  410. var p pb.WROutApplyRsp
  411. if err = proto.Unmarshal(b, &p); err != nil {
  412. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  413. return
  414. }
  415. if bs, e := protojson.Marshal(&p); e != nil {
  416. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  417. return
  418. } else {
  419. bytes = &bs
  420. serialNumber = p.GetHeader().GetRequestID()
  421. }
  422. case global.ReceiptZSOutApplyRsp: // 钻石出库申请接口响应
  423. var p pb.ReceiptZSOutApplyRsp
  424. if err = proto.Unmarshal(b, &p); err != nil {
  425. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  426. return
  427. }
  428. if bs, e := protojson.Marshal(&p); e != nil {
  429. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  430. return
  431. } else {
  432. bytes = &bs
  433. serialNumber = p.GetHeader().GetRequestID()
  434. }
  435. case global.SpotPresaleListingOrderRsp: // 铁合金现货预售挂牌接口应答
  436. var p pb.SpotPresaleListingOrderRsp
  437. if err = proto.Unmarshal(b, &p); err != nil {
  438. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  439. return
  440. }
  441. if bs, e := protojson.Marshal(&p); e != nil {
  442. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  443. return
  444. } else {
  445. bytes = &bs
  446. serialNumber = p.GetHeader().GetRequestID()
  447. }
  448. case global.CJJCMemberOperateRsp: // 出境检测会员操作响应
  449. var p pb.CJJCMemberOperateRsp
  450. if err = proto.Unmarshal(b, &p); err != nil {
  451. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  452. return
  453. }
  454. if bs, e := protojson.Marshal(&p); e != nil {
  455. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  456. return
  457. } else {
  458. bytes = &bs
  459. serialNumber = p.GetHeader().GetRequestID()
  460. }
  461. case global.BSFWMemberOperateRsp: // 保税服务会员操作响应
  462. var p pb.BSFWMemberOperateRsp
  463. if err = proto.Unmarshal(b, &p); err != nil {
  464. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  465. return
  466. }
  467. if bs, e := protojson.Marshal(&p); e != nil {
  468. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  469. return
  470. } else {
  471. bytes = &bs
  472. serialNumber = p.GetHeader().GetRequestID()
  473. }
  474. case global.AccountFundInfoRsp: // 账户资金信息响应
  475. var p pb.AccountFundInfoRsp
  476. if err = proto.Unmarshal(b, &p); err != nil {
  477. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  478. return
  479. }
  480. if bs, e := protojson.Marshal(&p); e != nil {
  481. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  482. return
  483. } else {
  484. bytes = &bs
  485. serialNumber = p.GetHeader().GetRequestID()
  486. }
  487. case global.GZPresaleApplyRsp: // 广钻预售申请响应
  488. var p pb.GZPresaleApplyRsp
  489. if err = proto.Unmarshal(b, &p); err != nil {
  490. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  491. return
  492. }
  493. if bs, e := protojson.Marshal(&p); e != nil {
  494. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  495. return
  496. } else {
  497. bytes = &bs
  498. serialNumber = p.GetHeader().GetRequestID()
  499. }
  500. case global.GZPresaleOrderRsp: // 广钻预售认购下单响应
  501. var p pb.GZPresaleOrderRsp
  502. if err = proto.Unmarshal(b, &p); err != nil {
  503. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  504. return
  505. }
  506. if bs, e := protojson.Marshal(&p); e != nil {
  507. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  508. return
  509. } else {
  510. bytes = &bs
  511. serialNumber = p.GetHeader().GetRequestID()
  512. }
  513. case global.GZCenterPurchaseApplyRsp: // 广钻集采申请响应
  514. var p pb.GZCenterPurchaseApplyRsp
  515. if err = proto.Unmarshal(b, &p); err != nil {
  516. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  517. return
  518. }
  519. if bs, e := protojson.Marshal(&p); e != nil {
  520. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  521. return
  522. } else {
  523. bytes = &bs
  524. serialNumber = p.GetHeader().GetRequestID()
  525. }
  526. case global.GZCenterPurchaseOrderRsp: // 广钻集采认购下单响应
  527. var p pb.GZCenterPurchaseOrderRsp
  528. if err = proto.Unmarshal(b, &p); err != nil {
  529. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  530. return
  531. }
  532. if bs, e := protojson.Marshal(&p); e != nil {
  533. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  534. return
  535. } else {
  536. bytes = &bs
  537. serialNumber = p.GetHeader().GetRequestID()
  538. }
  539. case global.BSWMSReckonPayRsp: // WMS结算单支付接口响应
  540. var p pb.BSWMSReckonPayRsp
  541. if err = proto.Unmarshal(b, &p); err != nil {
  542. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  543. return
  544. }
  545. if bs, e := protojson.Marshal(&p); e != nil {
  546. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  547. return
  548. } else {
  549. bytes = &bs
  550. serialNumber = p.GetHeader().GetRequestID()
  551. }
  552. case global.OrderRsp:
  553. var p pb.OrderRsp
  554. if err = proto.Unmarshal(b, &p); err != nil {
  555. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  556. return
  557. }
  558. if bs, e := protojson.Marshal(&p); e != nil {
  559. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  560. return
  561. } else {
  562. bytes = &bs
  563. serialNumber = p.GetHeader().GetRequestID()
  564. }
  565. case global.CancelOrderRsp:
  566. var p pb.CancelOrderRsp
  567. if err = proto.Unmarshal(b, &p); err != nil {
  568. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  569. return
  570. }
  571. if bs, e := protojson.Marshal(&p); e != nil {
  572. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  573. return
  574. } else {
  575. bytes = &bs
  576. serialNumber = p.GetHeader().GetRequestID()
  577. }
  578. case global.HolderCloseRsp:
  579. var p pb.HolderCloseRsp
  580. if err = proto.Unmarshal(b, &p); err != nil {
  581. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  582. return
  583. }
  584. if bs, e := protojson.Marshal(&p); e != nil {
  585. global.M2A_LOG.Error("总线回复数据反序列化失败", zap.Error(err))
  586. return
  587. } else {
  588. bytes = &bs
  589. serialNumber = p.GetHeader().GetRequestID()
  590. }
  591. }
  592. return
  593. }
  594. // RabbitMQSubscribeTopic 订阅主题
  595. func RabbitMQSubscribeTopic() (err error) {
  596. // 订阅需要的总线响应主题
  597. if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_NTF); err != nil {
  598. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  599. return
  600. }
  601. if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_QKERNEL); err != nil {
  602. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  603. return
  604. }
  605. if err = rabbitmq.SubscribeTopic(global.TOPIC_MANAGE_RSP); err != nil {
  606. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  607. return
  608. }
  609. if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_BANK); err != nil {
  610. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  611. return
  612. }
  613. if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_MONEY); err != nil {
  614. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  615. return
  616. }
  617. if err = rabbitmq.SubscribeTopic(global.TOPIC_PERFORMANCE_RSP); err != nil {
  618. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  619. return
  620. }
  621. if err = rabbitmq.SubscribeTopic(global.TOPIC_WAREHOUSE_RECIEPT_RSP); err != nil {
  622. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  623. return
  624. }
  625. if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_TRADE); err != nil {
  626. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  627. return
  628. }
  629. if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_TRADE_GZ); err != nil {
  630. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  631. return
  632. }
  633. if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_CJBS_TRADE_GZ); err != nil {
  634. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  635. return
  636. }
  637. // 铁合金
  638. if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_PRESALE_THJ); err != nil {
  639. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  640. return
  641. }
  642. if err = rabbitmq.SubscribeTopic(global.TOPIC_MANAGER_THJ_NTF); err != nil {
  643. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  644. return
  645. }
  646. if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_PRESALE_GZ); err != nil {
  647. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  648. return
  649. }
  650. if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_WAREHOUSE_CENTERPURCHASE_GZ); err != nil {
  651. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  652. return
  653. }
  654. if err = rabbitmq.SubscribeTopic(global.TOPIC_RSP_ORDER); err != nil {
  655. global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
  656. return
  657. }
  658. global.M2A_LOG.Info("rabbitmq subscribe topic successed.")
  659. return
  660. }
  661. // StartRabbitMQReceive 开始接收总线消息
  662. func StartRabbitMQReceive() {
  663. t := &MQProc{}
  664. go func() {
  665. for _, subinfo := range rabbitmq.SubInfos {
  666. rabbitmq.Receive(subinfo.Topic, subinfo.QueueName, t)
  667. }
  668. }()
  669. }
  670. // InitFuncodeTopic 初始化功能码主题MAP
  671. func InitFuncodeTopic() {
  672. if global.M2A_FuncodeTopic == nil {
  673. global.M2A_FuncodeTopic = make(map[string][]int)
  674. }
  675. global.M2A_FuncodeTopic[global.TOPIC_REQ_QKERNEL] = []int{
  676. global.ModifyPwdReq,
  677. }
  678. global.M2A_FuncodeTopic[global.TOPIC_MANAGE_REQ] = []int{
  679. global.UserReceiveInfoReq,
  680. global.DelUserReceiveInfoReq,
  681. global.UserReceiveIsDefaultReq,
  682. global.UserReceiptInfoReq,
  683. global.DelUserReceiptInfoReq,
  684. global.WarehouseApplyReq,
  685. global.WROutApplyReq,
  686. }
  687. global.M2A_FuncodeTopic[global.TOPIC_REQ_BANK] = []int{
  688. global.T2bBankSignReq,
  689. global.T2bBankCancelSignReq,
  690. global.T2bBankWithdrawReq,
  691. global.T2bBankDepositReq,
  692. }
  693. global.M2A_FuncodeTopic[global.TOPIC_REQ_MONEY] = []int{
  694. global.AccountFundInfoReq,
  695. }
  696. global.M2A_FuncodeTopic[global.TOPIC_PERFORMANCE_REQ] = []int{
  697. global.PerformanceContractedApplyReq,
  698. global.PerformanceDelayApplyReq,
  699. global.PerformanceManualConfirmReq,
  700. global.PerformanceModifyContactReq,
  701. }
  702. global.M2A_FuncodeTopic[global.TOPIC_WAREHOUSE_RECIEPT] = []int{
  703. global.ReceiptZSOutApplyReq,
  704. }
  705. global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_TRADE] = []int{
  706. global.WRListingCancelOrderReq,
  707. global.HdWROrderReq,
  708. }
  709. global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_TRADE_GZ] = []int{
  710. global.GoodsFavoriteOperateReq,
  711. global.ZSBuyOrderListingReq,
  712. global.ZSSellOrderListingReq,
  713. global.ZSBuyOrderDestingReq,
  714. global.ZSSellOrderDestingApplyReq,
  715. global.ZSSellOrderDestingApplyOperateReq,
  716. global.ZSBuyOrderDestingNegPriceReq,
  717. global.ZSBuyOrderDestingNegPriceOperateReq,
  718. }
  719. global.M2A_FuncodeTopic[global.TOPIC_REQ_CJBS_TRADE_GZ] = []int{
  720. global.CJJCMemberOperateReq,
  721. global.BSFWMemberOperateReq,
  722. }
  723. // 铁合金
  724. global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_PRESALE_THJ] = []int{
  725. global.SpotPresaleListingOrderReq,
  726. }
  727. global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_PRESALE_GZ] = []int{
  728. global.GZPresaleApplyReq,
  729. global.GZPresaleOrderReq,
  730. }
  731. global.M2A_FuncodeTopic[global.TOPIC_REQ_WAREHOUSE_CENTERPURCHASE_GZ] = []int{
  732. global.GZCenterPurchaseApplyReq,
  733. global.GZCenterPurchaseOrderReq,
  734. global.BSWMSReckonPayReq,
  735. }
  736. global.M2A_FuncodeTopic[global.TOPIC_REQ_ORDER] = []int{
  737. global.OrderReq,
  738. global.CancelOrderReq,
  739. global.HolderCloseReq,
  740. }
  741. }