package mq import ( "errors" "fmt" "mtp20access/client" "mtp20access/global" commonRequest "mtp20access/model/common/request" rsp "mtp20access/model/common/response" "mtp20access/model/mq/request" "mtp20access/model/mq/response" "mtp20access/rabbitmq" "time" "github.com/gin-gonic/gin" "go.uber.org/zap" ) // SendMQ 向总线发送信息 func SendMQ(c *gin.Context, req *request.MQBodyReq) (err error) { // 获取请求账号信息 s, exists := c.Get("claims") if !exists { err = errors.New("获取请求账号信息异常") global.M2A_LOG.Error(err.Error(), zap.Error(err)) return } claims := s.(*commonRequest.CustomClaims) // 获取登录账户信息 t, exists := client.Clients[claims.SessionID] if !exists { err = errors.New("获取登录账户信息异常") global.M2A_LOG.Error(err.Error(), zap.Error(err)) return } // 将客户端请求的数据转换成总线使用的Protobuf serialNumber := t.GetSerialNumber() bytes, err := req.GetProtoBytes(&serialNumber) if err != nil { return } if bytes == nil { err = errors.New("请求信息序列化失败") return } // 创建异步任务 key := fmt.Sprintf("%v_%v_%v", t.SessionID, req.FunCodeRsp, serialNumber) global.M2A_LOG.Info("创建异步任务", zap.Any("SessionID", t.SessionID), zap.Any("serialNumber", serialNumber)) // 银行服务相关的回复流水号是错误的,所以需要特殊处理 if int(req.FunCodeReq) == global.T2bBankSignReq || int(req.FunCodeReq) == global.T2bBankCancelSignReq || int(req.FunCodeReq) == global.T2bBankWithdrawReq || int(req.FunCodeReq) == global.T2bBankDepositReq { key = fmt.Sprintf("%v_%v", t.SessionID, req.FunCodeRsp) } asyncTask := client.AsyncTask{ FuncodeRsp: req.FunCodeRsp, SerialNumber: serialNumber, Own: t, IsEncrypted: *req.IsEncrypted, } t.SetAsyncTask(&asyncTask, key) // 获取对应主题 // topic, exists := global.M2A_FuncodeTopic[int(req.FunCodeReq)] topic := global.GetTopic(int(req.FunCodeReq)) if topic == "" { global.M2A_LOG.Error(err.Error(), zap.Error(err)) return } // 向总线发送业务信息 packet := &client.MQPacket{ FunCode: req.FunCodeReq, SessionId: uint32(claims.SessionID), Data: bytes, } go rabbitmq.Publish(topic, packet) global.M2A_LOG.Info("[C->S]", zap.Any("req", req.FunCodeReq), zap.Any("SessionId", packet.SessionId), zap.Any("data", req.Data)) // 阻塞线程等待总线回复或超时 // 这里make时没有指定长度,所以是发送者与接收者的同步操作,发送者会阻塞,所以一般会在下面使用超时来解锁。 asyncTask.Rsp = make(chan response.MQBodyRsp) select { case r := <-asyncTask.Rsp: // 总线回复 rsp.OkWithData(r, c) case <-time.After(time.Second * 30): // 超时 // 删除对应异步任务 t.DeleteAsyncTask(key) rsp.FailWithMessage("业务超时", c) } asyncTask.Finish() return }