| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package mq
- import (
- "errors"
- "fmt"
- "mtp20access/client"
- "mtp20access/global"
- commonRequest "mtp20access/model/common/request"
- rsp "mtp20access/model/common/response"
- "mtp20access/model/mq/request"
- "mtp20access/model/mq/response"
- "mtp20access/rabbitmq"
- "time"
- "github.com/gin-gonic/gin"
- "go.uber.org/zap"
- )
- // SendMQ 向总线发送信息
- func SendMQ(c *gin.Context, req *request.MQBodyReq) (err error) {
- // 获取请求账号信息
- s, exists := c.Get("claims")
- if !exists {
- err = errors.New("获取请求账号信息异常")
- global.M2A_LOG.Error(err.Error(), zap.Error(err))
- return
- }
- claims := s.(*commonRequest.CustomClaims)
- // 获取登录账户信息
- t, exists := client.Clients[claims.SessionID]
- if !exists {
- err = errors.New("获取登录账户信息异常")
- global.M2A_LOG.Error(err.Error(), zap.Any("SessionID", claims.SessionID), zap.Error(err))
- return
- }
- // 将客户端请求的数据转换成总线使用的Protobuf
- serialNumber := t.GetSerialNumber()
- bytes, err := req.GetProtoBytes(&serialNumber)
- if err != nil {
- return
- }
- if bytes == nil {
- err = errors.New("请求信息序列化失败")
- return
- }
- // 创建异步任务
- key := fmt.Sprintf("%v_%v_%v", t.SessionID, req.FunCodeRsp, serialNumber)
- global.M2A_LOG.Info("创建异步任务", zap.Any("SessionID", t.SessionID), zap.Any("serialNumber", serialNumber))
- // 银行服务相关的回复流水号是错误的,所以需要特殊处理
- if int(req.FunCodeReq) == global.T2bBankSignReq ||
- int(req.FunCodeReq) == global.T2bBankCancelSignReq ||
- int(req.FunCodeReq) == global.T2bBankWithdrawReq ||
- int(req.FunCodeReq) == global.T2bBankDepositReq {
- key = fmt.Sprintf("%v_%v", t.SessionID, req.FunCodeRsp)
- }
- asyncTask := client.AsyncTask{
- FuncodeRsp: req.FunCodeRsp,
- SerialNumber: serialNumber,
- Own: t,
- IsEncrypted: *req.IsEncrypted,
- }
- t.SetAsyncTask(&asyncTask, key)
- // 获取对应主题
- // topic, exists := global.M2A_FuncodeTopic[int(req.FunCodeReq)]
- topic := global.GetTopic(int(req.FunCodeReq))
- if topic == "" {
- global.M2A_LOG.Error(err.Error(), zap.Error(err))
- return
- }
- // 向总线发送业务信息
- packet := &client.MQPacket{
- FunCode: req.FunCodeReq,
- SessionId: uint32(claims.SessionID),
- Data: bytes,
- }
- go rabbitmq.Publish(topic, packet)
- global.M2A_LOG.Info("[C->S]", zap.Any("req", req.FunCodeReq), zap.Any("SessionId", packet.SessionId), zap.Any("data", req.Data))
- // 阻塞线程等待总线回复或超时
- // 这里make时没有指定长度,所以是发送者与接收者的同步操作,发送者会阻塞,所以一般会在下面使用超时来解锁。
- asyncTask.Rsp = make(chan response.MQBodyRsp)
- select {
- case r := <-asyncTask.Rsp: // 总线回复
- rsp.OkWithData(r, c)
- case <-time.After(time.Second * 30): // 超时
- // 删除对应异步任务
- t.DeleteAsyncTask(key)
- rsp.FailWithMessage("业务超时", c)
- }
- asyncTask.Finish()
- return
- }
|