import { checkTokenAction, stopCheckToken } from '@/services/bus/token'; import { NeedClearSourceDataType, NoClearSourceDataType } from '@/services/dataCenter/interface'; import { funCode } from '@/services/funcode/index'; import { serviceURL } from '@/services/request/index'; import timerUtil from '@/utils/timer/timerUtil'; import { Callback, MTP2WebSocket, ReconnectChangeState } from '@/utils/websocket/index'; import { Package40, Package50 } from '@/utils/websocket/package'; import eventBus from '../utils/eventBus'; import DataCenter from './dataCenter/index'; import { noticeParseRsp } from './socket/protobuf/buildReq'; /** 行情和交易长链 */ interface LongLink { /** 行情长链 */ quote: MTP2WebSocket; /** 交易长链 */ trade: MTP2WebSocket; } type LontLinkName = 'quote' | 'trade'; /** 全局数据连接生命周期控制类 */ export default new (class LifeCycleCtr { /** 数据中心 */ private dataCenter = new DataCenter(); /** 长链接管理 */ public Socket: LongLink = { quote: new MTP2WebSocket(0), trade: new MTP2WebSocket(1), }; constructor() { } /** 数据中心初始化 */ initDataCenter(): void { } /** * 从数据中心获取普通数据 * @param key needClearSourceDataType | noClearSourceDataType */ get(key: keyof (NeedClearSourceDataType & NoClearSourceDataType)) { return this.dataCenter.getOneOf(key).value; } /** * 从数据中心获取数据响应式数据 * @param key needClearSourceDataType | noClearSourceDataType */ getRef(key: keyof (NeedClearSourceDataType & NoClearSourceDataType)) { return this.dataCenter.getOneOf(key); } /** * 从数据中心获取所有数据 * @param key needClearSourceDataType | noClearSourceDataType */ reset() { this.dataCenter.reset(); } /** * 数据中心存入数据 * @param key needClearSourceDataType | noClearSourceDataType * @param value any */ set(key: keyof (NeedClearSourceDataType & NoClearSourceDataType), value: any) { return this.dataCenter.setOneOf(key, value); } /** 主动连接行情 */ connectQuote(): Promise { if (this.Socket['quote'].connState === 0) { return this.connectServer('quote', serviceURL.quoteUrl).then(() => { // 开始发送心跳 this.startBeatTime('quote'); return 'ok'; }); } else { return Promise.resolve('已连接quote'); } } /** 主动连接交易 */ connectTrading(): Promise { return this.connectServer('trade', serviceURL.tradeUrl).then(() => { // 开始发送心跳 this.startBeatTime('trade'); return 'ok'; }); // if (this.Socket['trade'].connState === 0) { // return this.connectServer('trade', serviceURL.tradeUrl); // } else { // return Promise.resolve('已连接trading'); // } } /** 连接服务器 */ connectServer(socket: LontLinkName, ws: string): Promise { return new Promise((resolve, reject) => { this.Socket[socket].conn(ws); this.Socket[socket].onConnected = () => { console.info(`${socket},${ws},建立链接成功!`); resolve('ok'); }; this.Socket[socket].onClosed = () => { // 清空心跳定时器 this.Socket[socket].stopBeatTimer(socket); console.error(`${socket},${ws},断开链接了!`); }; this.Socket[socket].onError = (obj: MTP2WebSocket | MTP2WebSocket, err: Error) => { console.error(`${socket},${ws},发送错误:${err.message}`); reject(err); }; if (socket === 'quote') this.quoteRelevant(this.Socket[socket]); if (socket === 'trade') this.tradingRelevant(this.Socket[socket]); }); } /** 发送心跳 */ startBeatTime(socket: LontLinkName): void { this.Socket[socket].startBeatTime(socket); this.Socket[socket].canReconnect = true; } /** 向交易服务器发送请求 */ sendTradingServer(p: Package50, rsp?: number, callback?: Callback): void { this.Socket['trade'].send(p, rsp, callback); } /** 向行情服务器发送请求 */ sendQuoteServer(p: Package40, rsp?: number, callback?: Callback): void { this.Socket['quote'].send(p, rsp, callback); } /** 主动关闭行情服务 */ closeQuote() { this.Socket['quote'].close(); } /** 主动关闭长链接 */ closeServer(): void { this.Socket['trade'].close(); this.dataCenter.reset(); // sessionStorage.clear(); // localStorageUtil.removeItem('loginData'); timerUtil.clearAll(); } /** 处理行情相关信息 */ quoteRelevant(socket: MTP2WebSocket) { // 推送信息 socket.onReceivePush = (obj, msg: Package40) => { eventBus.$emit('quoteReceiveNtf', msg.content); }; // 重连成功回调 socket.onReconnectChangeState = (obj: MTP2WebSocket, state) => { switch (state) { /** 开始重连 */ case ReconnectChangeState.BeginReconnect: // 停止 token 校验 // stopCheckToken() console.log(obj.host, '开始尝试重连服务端'); break; /** 尝试重连失败,将在下一个周期后再次尝试重连 */ case ReconnectChangeState.FailAndWaitPeriod: console.log(obj.host, '尝试重连失败,将在下一个周期后再次尝试重连'); break; /** 重连成功,将进行业务操作 */ case ReconnectChangeState.ReconnectSuccessed: console.log(obj.host, '重连成功,可开始进行业务操作'); // 通知上层 重新订阅商品 eventBus.$emit('quoteReconnectSucess', true); break; /** 重连成功后业务操作失败并将再次重试,由业务模块发起 */ case ReconnectChangeState.LoginFail: break; /** 重连成功后业务操作成功(包括交易服务的账户登录状态更新或行情服务的行情订阅等 */ case ReconnectChangeState.Logined: // 重连成功继续校验token // checkTokenAction(); console.log(obj.host, '重连成功后业务操作成功(包括交易服务的账户登录状态更新或行情服务的行情订阅等'); // 行情链路重连成功后,需要通知上层 重新订阅商品 break; } }; } /** 处理交易相关信息 */ tradingRelevant(socket: MTP2WebSocket) { // eslint-disable-next-line @typescript-eslint/no-this-alias const _this = this // 推送信息 socket.onReceivePush = (_self, msg) => { switch (msg.funCode) { case funCode.CustOfflineNtf: //接收到账户离线通知 eventBus.$emit('custOfflineNtf', noticeParseRsp(msg, 'CustOfflineNtf')); break; case funCode.LogoutRsp: // 接收到用户登出应答 eventBus.$emit('userLogout', noticeParseRsp(msg, 'LogoutRsp')); break; case funCode.PosChangedNtf: //接收到头寸变化通知! eventBus.$emit('posChangedNtf', noticeParseRsp(msg, 'PosChangedNtf')); break; case funCode.MoneyChangedNtf: // 接收到资金变化通知 eventBus.$emit('moneyChangedNtf', noticeParseRsp(msg, 'MoneyChangedNtf')); break; case funCode.OrderCanceledNtf: // 接收到委托单撤单通知 eventBus.$emit('orderCanceledNtf', noticeParseRsp(msg, 'OrderCanceledNtf')); break; default: msg.funCode !== 0 && console.warn({ msg: '接收到通知:' + msg.funCode }); break; } }; // 重连成功回调 socket.onReconnectChangeState = (obj: MTP2WebSocket, state) => { switch (state) { /** 开始重连 */ case ReconnectChangeState.BeginReconnect: // 停止 token 校验 stopCheckToken() console.log(obj.host, '开始尝试重连服务端'); break; /** 尝试重连失败,将在下一个周期后再次尝试重连 */ case ReconnectChangeState.FailAndWaitPeriod: console.log(obj.host, '尝试重连失败,将在下一个周期后再次尝试重连'); break; /** 重连成功,将进行业务操作 */ case ReconnectChangeState.ReconnectSuccessed: //重新发起心跳 _this.startBeatTime('trade'); // 重新启动定时 token 校验 checkTokenAction() console.log(obj.host, '重连成功,将进行业务操作'); break; /** 重连成功后业务操作失败并将再次重试,由业务模块发起 */ case ReconnectChangeState.LoginFail: break; /** 重连成功后业务操作成功(包括交易服务的账户登录状态更新或行情服务的行情订阅等 */ case ReconnectChangeState.Logined: break; } }; } })();