Pārlūkot izejas kodu

增加“发送总线通知信息”接口

zhou.xiaoning 3 gadi atpakaļ
vecāks
revīzija
5bcc050340

+ 35 - 0
api/v1/mq/ntf.go

@@ -0,0 +1,35 @@
+package mq
+
+import (
+	"mtp20access/global"
+	"mtp20access/model/common/response"
+	"mtp20access/model/mq/request"
+	"mtp20access/service/mq"
+
+	"github.com/gin-gonic/gin"
+	"go.uber.org/zap"
+)
+
+// SendNtfToMQ 发送总线通知信息
+// @Summary 总线通知
+// @accept  application/json
+// @Produce application/json
+// @Param   data body     request.MQNtfReq              true "入参"
+// @Success 200  {object} response.Response{msg=string} "出参"
+// @Router  /MQ/SendNtfToMQ [post]
+// @Tags    总线业务
+func SendNtfToMQ(c *gin.Context) {
+	req := request.MQNtfReq{}
+	if err := c.ShouldBindJSON(&req); err != nil {
+		response.FailWithMessage("入参不正确", c)
+		return
+	} else {
+		global.M2A_LOG.Info("[C->S]", zap.Any("req", req.FunCode), zap.Any("data", req.Data))
+	}
+
+	if err := mq.SendNtfMQ(c, &req); err != nil {
+		response.FailWithMessage(err.Error(), c)
+	}
+
+	response.OkWithMessage("发送成功", c)
+}

+ 66 - 0
docs/docs.go

@@ -116,6 +116,51 @@ const docTemplate = `{
                     }
                 }
             }
+        },
+        "/MQ/SendNtfToMQ": {
+            "post": {
+                "consumes": [
+                    "application/json"
+                ],
+                "produces": [
+                    "application/json"
+                ],
+                "tags": [
+                    "总线业务"
+                ],
+                "summary": "总线通知",
+                "parameters": [
+                    {
+                        "description": "入参",
+                        "name": "data",
+                        "in": "body",
+                        "required": true,
+                        "schema": {
+                            "$ref": "#/definitions/request.MQNtfReq"
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "description": "出参",
+                        "schema": {
+                            "allOf": [
+                                {
+                                    "$ref": "#/definitions/response.Response"
+                                },
+                                {
+                                    "type": "object",
+                                    "properties": {
+                                        "msg": {
+                                            "type": "string"
+                                        }
+                                    }
+                                }
+                            ]
+                        }
+                    }
+                }
+            }
         }
     },
     "definitions": {
@@ -167,6 +212,27 @@ const docTemplate = `{
                 }
             }
         },
+        "request.MQNtfReq": {
+            "type": "object",
+            "required": [
+                "data",
+                "funCode"
+            ],
+            "properties": {
+                "data": {
+                    "description": "数据",
+                    "type": "string"
+                },
+                "funCode": {
+                    "description": "功能码",
+                    "type": "integer"
+                },
+                "topic": {
+                    "description": "主题",
+                    "type": "string"
+                }
+            }
+        },
         "response.LoginRsp": {
             "type": "object",
             "properties": {

+ 66 - 0
docs/swagger.json

@@ -107,6 +107,51 @@
                     }
                 }
             }
+        },
+        "/MQ/SendNtfToMQ": {
+            "post": {
+                "consumes": [
+                    "application/json"
+                ],
+                "produces": [
+                    "application/json"
+                ],
+                "tags": [
+                    "总线业务"
+                ],
+                "summary": "总线通知",
+                "parameters": [
+                    {
+                        "description": "入参",
+                        "name": "data",
+                        "in": "body",
+                        "required": true,
+                        "schema": {
+                            "$ref": "#/definitions/request.MQNtfReq"
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "description": "出参",
+                        "schema": {
+                            "allOf": [
+                                {
+                                    "$ref": "#/definitions/response.Response"
+                                },
+                                {
+                                    "type": "object",
+                                    "properties": {
+                                        "msg": {
+                                            "type": "string"
+                                        }
+                                    }
+                                }
+                            ]
+                        }
+                    }
+                }
+            }
         }
     },
     "definitions": {
@@ -158,6 +203,27 @@
                 }
             }
         },
+        "request.MQNtfReq": {
+            "type": "object",
+            "required": [
+                "data",
+                "funCode"
+            ],
+            "properties": {
+                "data": {
+                    "description": "数据",
+                    "type": "string"
+                },
+                "funCode": {
+                    "description": "功能码",
+                    "type": "integer"
+                },
+                "topic": {
+                    "description": "主题",
+                    "type": "string"
+                }
+            }
+        },
         "response.LoginRsp": {
             "type": "object",
             "properties": {

+ 41 - 0
docs/swagger.yaml

@@ -34,6 +34,21 @@ definitions:
     - funCodeReq
     - isEncrypted
     type: object
+  request.MQNtfReq:
+    properties:
+      data:
+        description: 数据
+        type: string
+      funCode:
+        description: 功能码
+        type: integer
+      topic:
+        description: 主题
+        type: string
+    required:
+    - data
+    - funCode
+    type: object
   response.LoginRsp:
     properties:
       expiresAt:
@@ -133,6 +148,32 @@ paths:
       summary: 总线业务
       tags:
       - 总线业务
+  /MQ/SendNtfToMQ:
+    post:
+      consumes:
+      - application/json
+      parameters:
+      - description: 入参
+        in: body
+        name: data
+        required: true
+        schema:
+          $ref: '#/definitions/request.MQNtfReq'
+      produces:
+      - application/json
+      responses:
+        "200":
+          description: 出参
+          schema:
+            allOf:
+            - $ref: '#/definitions/response.Response'
+            - properties:
+                msg:
+                  type: string
+              type: object
+      summary: 总线通知
+      tags:
+      - 总线业务
 securityDefinitions:
   ApiKeyAuth:
     in: header

+ 2 - 0
global/funcode.go

@@ -56,6 +56,8 @@ var (
 
 	SpotPresaleListingOrderReq = 1441847 // 铁合金现货预售挂牌接口请求(1441847)
 	SpotPresaleListingOrderRsp = 1441848 // 铁合金现货预售挂牌接口应答(1441848)
+
+	THJPurchaseTradeNtf = 1441865 // 铁合金成交通知
 )
 
 // 通过请求功能码获取对应主题的方法

+ 2 - 0
global/topic.go

@@ -24,4 +24,6 @@ var (
 
 	TOPIC_REQ_WAREHOUSE_PRESALE_THJ = "warehouse_presale_thj_req" // 掌上铁合金现货预售请求
 	TOPIC_RSP_WAREHOUSE_PRESALE_THJ = "warehouse_presale_thj_rsp" // 掌上铁合金现货预售响应
+
+	TOPIC_MANAGER_THJ_NTF = "manager_thj_ntf" // 铁合金服务通知(管理端专用)
 )

+ 4 - 0
initialize/rabbitmq.go

@@ -437,6 +437,10 @@ func RabbitMQSubscribeTopic() (err error) {
 		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
 		return
 	}
+	if err = global.SubscribeTopic(global.TOPIC_MANAGER_THJ_NTF); err != nil {
+		global.M2A_LOG.Error("rabbitmq subscribe topic failed, err:", zap.Error(err))
+		return
+	}
 
 	global.M2A_LOG.Info("rabbitmq subscribe topic successed.")
 

+ 4 - 0
initialize/router.go

@@ -34,6 +34,10 @@ func Routers() *gin.Engine {
 	// 非鉴权组
 	publicGroup := Router.Group("api")
 	{
+		// 开发测试模式
+		if global.M2A_CONFIG.System.Env == "develop" {
+			router.InitMQPublicRouter(publicGroup)
+		}
 		router.InitAccountPublicRouter(publicGroup)
 	}
 

+ 64 - 0
model/mq/request/ntf.go

@@ -0,0 +1,64 @@
+package request
+
+import (
+	"errors"
+	"mtp20access/global"
+	"mtp20access/res/pb"
+	"regexp"
+	"strings"
+
+	"github.com/golang/protobuf/jsonpb"
+	"github.com/golang/protobuf/proto"
+	"go.uber.org/zap"
+)
+
+type MQNtfReq struct {
+	Topic   string `json:"topic"`                      // 主题
+	FunCode uint32 `json:"funCode" binding:"required"` // 功能码
+	Data    string `json:"data" binding:"required"`    // 数据
+}
+
+// GetProtoBytes 将客户端请求的数据转换成总线使用的Protobuf
+func (r *MQNtfReq) GetProtoBytes() (bytes *[]byte, err error) {
+	// JSON -> Protobuf
+	switch int(r.FunCode) {
+	case global.THJPurchaseTradeNtf:
+		m := pb.THJPurchaseTradeNtf{}
+		if err = r.reflect(r.Data, &m); err != nil {
+			return
+		}
+		if m.Header != nil {
+			*m.Header.RequestID = 0
+		} else {
+			err = errors.New("请求信息序列化失败")
+			return
+		}
+		if b, e := proto.Marshal(&m); e != nil {
+			global.M2A_LOG.Error(e.Error(), zap.Error(e))
+			err = errors.New("请求信息序列化失败")
+			return
+		} else {
+			bytes = &b
+		}
+	}
+
+	return
+}
+
+func (r *MQNtfReq) reflect(data string, m proto.Message) (err error) {
+	// 由于JS对Long类型支持不佳,故让小程序端在使用单号时以字符串发过来,服务这边会自动把19位数字的字符串转化为数字
+	reg, _ := regexp.Compile(`"[0-9]{19}"`)
+	all := reg.FindAll([]byte(data), -1)
+	for _, i := range all {
+		r := strings.Replace(string(i), "\"", "", -1)
+		data = strings.Replace(data, string(i), r, -1)
+	}
+
+	if e := jsonpb.UnmarshalString(data, m); e != nil {
+		global.M2A_LOG.Error(e.Error(), zap.Error(e))
+		err = errors.New("业务数据装箱失败")
+		return
+	}
+
+	return
+}

+ 85 - 8
res/pb/mtp2.pb.go

@@ -6545,6 +6545,62 @@ func (x *SpotPresaleListingOrderRsp) GetClientSerialNo() string {
 	return ""
 }
 
+// 铁合金成交通知
+type THJPurchaseTradeNtf struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Header          *MessageHead `protobuf:"bytes,1,opt,name=Header" json:"Header,omitempty"`                    // 消息头
+	WRTradeDetailID *uint64      `protobuf:"varint,2,opt,name=WRTradeDetailID" json:"WRTradeDetailID,omitempty"` // uint64 预售成交明细ID
+}
+
+func (x *THJPurchaseTradeNtf) Reset() {
+	*x = THJPurchaseTradeNtf{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_mtp2_proto_msgTypes[57]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *THJPurchaseTradeNtf) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*THJPurchaseTradeNtf) ProtoMessage() {}
+
+func (x *THJPurchaseTradeNtf) ProtoReflect() protoreflect.Message {
+	mi := &file_mtp2_proto_msgTypes[57]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use THJPurchaseTradeNtf.ProtoReflect.Descriptor instead.
+func (*THJPurchaseTradeNtf) Descriptor() ([]byte, []int) {
+	return file_mtp2_proto_rawDescGZIP(), []int{57}
+}
+
+func (x *THJPurchaseTradeNtf) GetHeader() *MessageHead {
+	if x != nil {
+		return x.Header
+	}
+	return nil
+}
+
+func (x *THJPurchaseTradeNtf) GetWRTradeDetailID() uint64 {
+	if x != nil && x.WRTradeDetailID != nil {
+		return *x.WRTradeDetailID
+	}
+	return 0
+}
+
 var File_mtp2_proto protoreflect.FileDescriptor
 
 var file_mtp2_proto_rawDesc = []byte{
@@ -7750,7 +7806,14 @@ var file_mtp2_proto_rawDesc = []byte{
 	0x04, 0x52, 0x0e, 0x57, 0x52, 0x54, 0x72, 0x61, 0x64, 0x65, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x49,
 	0x44, 0x12, 0x26, 0x0a, 0x0e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x72, 0x69, 0x61,
 	0x6c, 0x4e, 0x6f, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x43, 0x6c, 0x69, 0x65, 0x6e,
-	0x74, 0x53, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x4e, 0x6f,
+	0x74, 0x53, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x4e, 0x6f, 0x22, 0x68, 0x0a, 0x13, 0x54, 0x48, 0x4a,
+	0x50, 0x75, 0x72, 0x63, 0x68, 0x61, 0x73, 0x65, 0x54, 0x72, 0x61, 0x64, 0x65, 0x4e, 0x74, 0x66,
+	0x12, 0x27, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b,
+	0x32, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x65, 0x61,
+	0x64, 0x52, 0x06, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x28, 0x0a, 0x0f, 0x57, 0x52, 0x54,
+	0x72, 0x61, 0x64, 0x65, 0x44, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01,
+	0x28, 0x04, 0x52, 0x0f, 0x57, 0x52, 0x54, 0x72, 0x61, 0x64, 0x65, 0x44, 0x65, 0x74, 0x61, 0x69,
+	0x6c, 0x49, 0x44,
 }
 
 var (
@@ -7765,7 +7828,7 @@ func file_mtp2_proto_rawDescGZIP() []byte {
 	return file_mtp2_proto_rawDescData
 }
 
-var file_mtp2_proto_msgTypes = make([]protoimpl.MessageInfo, 57)
+var file_mtp2_proto_msgTypes = make([]protoimpl.MessageInfo, 58)
 var file_mtp2_proto_goTypes = []interface{}{
 	(*MessageHead)(nil),                         // 0: pb.MessageHead
 	(*ModifyPwdReq)(nil),                        // 1: pb.ModifyPwdReq
@@ -7824,6 +7887,7 @@ var file_mtp2_proto_goTypes = []interface{}{
 	(*DepositConfigInfo)(nil),                   // 54: pb.DepositConfigInfo
 	(*SpotPresaleListingOrderReq)(nil),          // 55: pb.SpotPresaleListingOrderReq
 	(*SpotPresaleListingOrderRsp)(nil),          // 56: pb.SpotPresaleListingOrderRsp
+	(*THJPurchaseTradeNtf)(nil),                 // 57: pb.THJPurchaseTradeNtf
 }
 var file_mtp2_proto_depIdxs = []int32{
 	0,  // 0: pb.ModifyPwdReq.Header:type_name -> pb.MessageHead
@@ -7883,11 +7947,12 @@ var file_mtp2_proto_depIdxs = []int32{
 	0,  // 54: pb.SpotPresaleListingOrderReq.Header:type_name -> pb.MessageHead
 	54, // 55: pb.SpotPresaleListingOrderReq.DepositConfigs:type_name -> pb.DepositConfigInfo
 	0,  // 56: pb.SpotPresaleListingOrderRsp.Header:type_name -> pb.MessageHead
-	57, // [57:57] is the sub-list for method output_type
-	57, // [57:57] is the sub-list for method input_type
-	57, // [57:57] is the sub-list for extension type_name
-	57, // [57:57] is the sub-list for extension extendee
-	0,  // [0:57] is the sub-list for field type_name
+	0,  // 57: pb.THJPurchaseTradeNtf.Header:type_name -> pb.MessageHead
+	58, // [58:58] is the sub-list for method output_type
+	58, // [58:58] is the sub-list for method input_type
+	58, // [58:58] is the sub-list for extension type_name
+	58, // [58:58] is the sub-list for extension extendee
+	0,  // [0:58] is the sub-list for field type_name
 }
 
 func init() { file_mtp2_proto_init() }
@@ -8580,6 +8645,18 @@ func file_mtp2_proto_init() {
 				return nil
 			}
 		}
+		file_mtp2_proto_msgTypes[57].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*THJPurchaseTradeNtf); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
 	}
 	type x struct{}
 	out := protoimpl.TypeBuilder{
@@ -8587,7 +8664,7 @@ func file_mtp2_proto_init() {
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 			RawDescriptor: file_mtp2_proto_rawDesc,
 			NumEnums:      0,
-			NumMessages:   57,
+			NumMessages:   58,
 			NumExtensions: 0,
 			NumServices:   0,
 		},

+ 6 - 0
res/pb/mtp2.proto

@@ -757,4 +757,10 @@ message SpotPresaleListingOrderRsp {
 		optional uint64 PresaleApplyID = 4; // 预售申请ID
 		optional uint64 WRTradeOrderID = 5; // 委托单ID
 		optional string ClientSerialNo = 6; // 客户端流水号
+}
+
+// 铁合金成交通知
+message THJPurchaseTradeNtf {
+	optional MessageHead Header = 1; // 消息头
+		optional uint64 WRTradeDetailID = 2; // uint64 预售成交明细ID
 }

+ 7 - 0
router/mq.go

@@ -6,6 +6,13 @@ import (
 	"github.com/gin-gonic/gin"
 )
 
+func InitMQPublicRouter(r *gin.RouterGroup) {
+	mqR := r.Group("MQ").Use()
+	{
+		mqR.POST("SendNtfToMQ", mq.SendNtfToMQ)
+	}
+}
+
 func InitMQPrivateRouter(r *gin.RouterGroup) {
 	mqR := r.Group("MQ").Use()
 	{

+ 32 - 0
service/mq/ntf.go

@@ -0,0 +1,32 @@
+package mq
+
+import (
+	"errors"
+	"mtp20access/global"
+	"mtp20access/model/mq/request"
+
+	"github.com/gin-gonic/gin"
+)
+
+// SendNtfMQ 向总线发送通知类信息
+func SendNtfMQ(c *gin.Context, req *request.MQNtfReq) (err error) {
+	// 将请求的数据转换成总线使用的Protobuf
+	bytes, err := req.GetProtoBytes()
+	if err != nil {
+		return
+	}
+	if bytes == nil {
+		err = errors.New("请求信息序列化失败")
+		return
+	}
+
+	// 向总线发送业务信息
+	packet := &global.MQPacket{
+		FunCode:   req.FunCode,
+		SessionId: 0,
+		Data:      bytes,
+	}
+	go global.Publish(req.Topic, packet)
+
+	return
+}