| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- //const unsigned char MSG_SUBSCRIPTION_REQ = 0x20;
- /************行情协议格式************************************
- 下标 标识 类型 字节数 内容 说明
- 0 HeadTag byte 1 0xFF 头标记
- 1 Length uint 4 包总长度
- 4 TypeMain byte 1 大类
- 6 TypeSecond []byte 2 小类
- 8 SerialNumber uint 4 通讯报文序号
- 12 Mode byte 1 内容类型,0, 自定义格式,没什么用
- 13 Version byte 1 版本号
- 数据体 业务结构体,自定义格式
- FootTag byte 1 0x00 尾标记
- 数据体前,包头长度=14, 空包长度=15
- 行情订阅数据体格式
- 14 accountid []byte 8 账户id(loginid) bigEnd
- 22 token []byte 64 令牌
- 86 goodscount uint 4 商品数量 bigEnd
- repeated{
- istatus byte 1 订阅状态
- exchangeid byte 1 交易所id
- goodscode []byte 64 商品代码
- }
- *******************************************************/
- package packet
- import (
- "fmt"
- "io"
- "mtp20access/utils"
- "net"
- )
- // MiQuotePacket 行情协议结构体
- type MiQuotePacket struct {
- Length uint32
- BigType byte
- SmallType uint16
- SerialNum uint32
- Mode byte
- Msg []byte
- OriMsg []byte // 原始包数据
- }
- // EnPack 重新打包数据
- func (r *MiQuotePacket) EnPack() []byte {
- r.Length = uint32(len(r.Msg)) + 15 // 空包长度15
- buf := make([]byte, 0) // 缓存
- buf = append(buf, byte(0xFF)) // HeadTag
- buf = append(buf, utils.UintTobytesBigEnd(r.Length)...) // Length
- buf = append(buf, r.BigType) // TypeMain
- buf = append(buf, utils.Uint16TobytesBigEnd(r.SmallType)...) // TypeSecond
- buf = append(buf, utils.UintTobytesBigEnd(r.SerialNum)...) // SerialNumber
- buf = append(buf, byte(0)) // Mode
- buf = append(buf, byte(0)) // version
- buf = append(buf, r.Msg...) // 数据体body
- buf = append(buf, byte(0)) // FootTag
- return buf
- }
- func (r *MiQuotePacket) HeaderLen() uint32 {
- return 14
- }
- // BodyLen 数据体长度, 含FootTag
- func (r *MiQuotePacket) BodyLen() uint32 {
- return r.Length - 14
- }
- // UnPackHead 解包头
- func (r *MiQuotePacket) UnPackHead(buf []byte) error {
- if len(buf) < 14 {
- return fmt.Errorf("行情包头长度错误")
- }
- if buf[0] != 0xFF {
- return fmt.Errorf("行情包头长度错误")
- }
- r.Length = utils.BytesBigEndToUint32(buf[1:5])
- r.BigType = buf[5]
- r.SmallType = utils.BytesBigEndToUint16(buf[6:8])
- r.SerialNum = utils.BytesBigEndToUint32(buf[8:12])
- r.Mode = buf[12]
- return nil
- }
- // SetOriMsg 保存原始数据包
- func (r *MiQuotePacket) SetOriMsg(arg ...[]byte) {
- if r.OriMsg == nil {
- r.OriMsg = make([]byte, 0)
- }
- for i := range arg {
- r.OriMsg = append(r.OriMsg, arg[i]...)
- }
- }
- // ReadMessage 从指定tcp链接读取一个协议包
- // @返回值 []byte 未解包的原始数据包, 如果需获取业务数据内容,
- // 请调用UnPack方法后取成员变量 Data 的内容
- func (r *MiQuotePacket) ReadMessage(conn *net.Conn) ([]byte, error) {
- r.OriMsg = make([]byte, 0)
- if conn == nil {
- return r.OriMsg, fmt.Errorf("当前未连接行情发布服务")
- }
- headerBuf := make([]byte, r.HeaderLen())
- nRead, err := io.ReadFull(*conn, headerBuf)
- if err != nil || nRead != len(headerBuf) {
- return r.OriMsg, fmt.Errorf("读取行情包头错误,可能连接已关闭:%v", err)
- }
- err = r.UnPackHead(headerBuf)
- if err != nil {
- return r.OriMsg, err
- }
- dataBuf := make([]byte, r.BodyLen())
- nRead, err = io.ReadFull(*conn, dataBuf)
- if err != nil || nRead != len(dataBuf) {
- return r.OriMsg, fmt.Errorf("读取行情包数据错误,可能连接已关闭:%v", err)
- }
- r.SetOriMsg(headerBuf, dataBuf)
- return r.OriMsg, nil
- }
- // HeaderInfo 头部信息, 功能号、sid、流水号、长度等
- func (r *MiQuotePacket) HeaderInfo() string {
- return fmt.Sprintf("bigtype[%d] smalltype[%d] serial[%d] iLen:%d",
- r.BigType, r.SmallType, r.SerialNum, r.Length)
- }
|