package global import ( "bytes" "encoding/json" "fmt" rsp "mtp20access/model/mq/response" "sync" "github.com/mitchellh/mapstructure" ) type LoginRedis struct { LoginID string `json:"loginId" redis:"loginId"` // 登陆账号 UserID string `json:"userId" redis:"userId"` // 用户ID SessionID string `json:"sessionId" redis:"sessionId"` // 终端sid Token string `json:"token" redis:"token"` // 令牌 Group string `json:"group" redis:"group"` // 终端分组 Addr string `json:"addr" redis:"addr"` // 客户端地址信息 // FIXME: 由于本服务改用短连,所以每次提交请交请求可能会不一样,后期可判断是否在中间件中进行拦截 } // FromMap Map to Struct func (r *LoginRedis) FromMap(val map[string]interface{}) error { return mapstructure.Decode(val, r) } // ToMap Struct to Map func (r *LoginRedis) ToMap() (val map[string]interface{}, err error) { if marshalContent, err := json.Marshal(r); err != nil { return nil, err } else { d := json.NewDecoder(bytes.NewReader(marshalContent)) d.UseNumber() // 设置将float64转为一个number if err := d.Decode(&val); err != nil { fmt.Println(err) } else { for k, v := range val { val[k] = v } } } return } type Client struct { LoginRedis mtx sync.RWMutex curSerialNumber uint32 // 当前业务流水号 asyncTasks map[string]*AsyncTask // key:SessionId_FuncodeRsp_SerialNumber } // GetSerialNumber 获取可用流水号 func (r *Client) GetSerialNumber() uint32 { r.mtx.Lock() defer func() { r.mtx.Unlock() }() r.curSerialNumber += 1 return r.curSerialNumber } // GetAsyncTask 获取目标异步任务 // key:SessionId_FuncodeRsp func (r *Client) GetAsyncTask(key string) (asyncTask *AsyncTask) { r.mtx.RLock() defer func() { r.mtx.RUnlock() }() asyncTask, ok := r.asyncTasks[key] if ok { return asyncTask } else { return nil } } // SetAsyncTask 设置异步任务 func (r *Client) SetAsyncTask(asyncTask *AsyncTask, key string) { r.mtx.Lock() defer func() { r.mtx.Unlock() }() if r.asyncTasks == nil { r.asyncTasks = make(map[string]*AsyncTask, 0) } delete(r.asyncTasks, key) r.asyncTasks[key] = asyncTask } // DeleteAsyncTask 删除异步任务 func (r *Client) DeleteAsyncTask(key string) { r.mtx.Lock() defer func() { r.mtx.Unlock() }() delete(r.asyncTasks, key) } func (r *Client) GetAllAsyncTask() *map[string]*AsyncTask { return &r.asyncTasks } // MQPacket 与总线交互的数据体 type MQPacket struct { FunCode uint32 // 功能码 SessionId uint32 // 数据包的sid Data *[]byte // 业务数据体 } // AsyncTask 异步任务结构体 type AsyncTask struct { PacketRsp chan MQPacket // 总线数据处理通道 FuncodeRsp uint32 // 回复功能码 SerialNumber uint32 // 通信流水号 Own *Client IsEncrypted bool // 是否加密 Rsp chan rsp.MQBodyRsp // 回调 doClose sync.Once // 仅关闭通道一次 } // Finish 完成 func (r *AsyncTask) Finish() { r.doClose.Do( func() { close(r.Rsp) key := fmt.Sprintf("%v_%v_%v", r.Own.SessionID, r.FuncodeRsp, r.SerialNumber) r.Own.DeleteAsyncTask(key) }) }