import { Package40, Package50 } from './package'; import timerUtil from '@/utils/timer/timerUtil'; /** 重连状态 */ export enum ReconnectChangeState { /** 开始重连 */ BeginReconnect, /** 尝试重连失败,将在下一个周期后再次尝试重连 */ FailAndWaitPeriod, /** 重连成功,将进行业务操作 */ ReconnectSuccessed, /** 重连成功后业务操作失败并将再次重试,由业务模块发起 */ LoginFail, /** 重连成功后业务操作成功(包括交易服务的账户登录状态更新或行情服务的行情订阅等 */ Logined, } export type beatTimer = 'quote' | 'trade'; /** * 回调类 */ export interface Callback { /** 成功回调 */ onSuccess?(res: T): void; /** 失败回调 */ onFail?(err: Error): void; } /** 信息发送异步任务类 */ class AsyncTask { /** 5.0报文直接为流水号;4.0报文为流水号+"_"+大类号(由于服务端行情推送会使用流水号自增) */ key: string; /** 返回功能码,主要用于过滤账户服务的错传包 */ rspFunCode?: number; /** 超时标识 */ timeOut: number; /** 回调 */ callback: Callback; constructor(key: string, timeOut: number, callback: Callback, rspFunCode?: number) { this.key = key; this.rspFunCode = rspFunCode; this.timeOut = timeOut; this.callback = callback; } } /** MTP2.0 长链通信类 */ export class MTP2WebSocket { /** 报文类型,0 - 4.0行情报文,1 - 5.0交易报文 */ packageType = 0; /** 服务端地址 */ host = ''; /** 当前连接状态,0 - 未连接,1 - 连接中,2 - 已连接 */ connState = 0; /** 连接成功的回调 */ onConnected?: (obj: MTP2WebSocket) => void; /** 连接断开的回调 */ onClosed?: (obj: MTP2WebSocket) => void; /** 连接发生错误的回调 */ onError?: (obj: MTP2WebSocket, err: Error) => void; /** 接收到推送类报文的回调 */ onReceivePush?: (obj: MTP2WebSocket, data: T) => void; /** 重连状态发生改变时的回调 */ onReconnectChangeState?: (obj: MTP2WebSocket, state: ReconnectChangeState) => void; /** WebSocket 对象 */ private socket?: WebSocket; /** 当前流水号 */ private currentSerial = 1; /** 默认超时时长(秒) */ private timeOutInterval = 30 * 1000; /** 信息发送异步建值对 */ private asyncTaskMap: Map = new Map(); /** 段包 */ private cache?: number[]; /** 当前处理的段包报文的原长度 */ private cachePackageLength = 0; /** 心跳间隔时间,默认为10秒 */ private beatInterval = 10 * 1000; /** 心跳专用定时器 */ // private beatTimer = 'beatTimer'; /** 最后一次收到心跳的时间 */ private lastRecevieBeatTime?: Date; /** 心跳回复超时时间,默认为30秒 */ private beatTimeoutInterval = 30 * 1000; /** 当前是否正在进行断网重连 */ private isReconnecting = false; /** 当前是否可进行断网重连;业务与网络分离,而断网重连需要在登录账号后(或行情订阅成功后)才可进行 */ public canReconnect = false; /** 断网重连失败后重试的TimeOut标识 */ private reconnectTimer = 0; /** 外部是否要求停止断网重连操作标志 */ private isBrokenReconnecting = false; /** * 构造函数 * @param packageType 报文类型,0 - 4.0行情报文,1 - 5.0交易报文 */ constructor(packageType: number) { this.packageType = packageType; } /** * 连接服务端 * @param host 服务端地址 */ conn(host: string) { return new Promise((resolve, reject) => { if (this.socket) { this.socket.close(); this.socket = undefined; } this.connState = 1; this.socket = new WebSocket(host); this.socket.onopen = () => { // 连接成功 this.connState = 2; this.host = host; // this.canReconnect = true; resolve('ok'); if (this.onConnected) this.onConnected(this); }; this.socket.onclose = () => { // 连接断开 this.connState = 0; if (this.onClosed) this.onClosed(this); }; this.socket.onerror = (ev) => { // 发生错误 this.connState = 0; const err = new Error('连接发生错误'); reject(err); if (this.onError) this.onError(this, err); // 判断是否可进行断网重连 this.canReconnectThenError(); }; this.socket.onmessage = (message) => { // 接收数据 new Response(message.data).arrayBuffer().then((res) => { this.disposeReceiveDatas(new Uint8Array(res)); }); }; }); } /** * 断开连接 */ close() { this.connState = 0; if (this.socket) this.socket.close(); } /** * 启动心跳请求包发送Timer * 行情与交易是不同的链路,需要启动不同的定时器发送心跳 */ startBeatTime(timer: beatTimer) { this.stopBeatTimer(timer); this.lastRecevieBeatTime = new Date(); this.sendBeat(timer); } /** * 停止心跳请求包发送Timer */ stopBeatTimer(timer: beatTimer) { timerUtil.clearInterval(timer); } /** * 发送心跳 */ private sendBeat(timer: beatTimer) { timerUtil.setInterval( () => { console.log('心跳回复', this.packageType, ':', this.connState); // 当前没有连接 if (this.connState !== 2) { return; } // 发送心跳 if (this.packageType === 0) { // 4.0 this.send40(new Package40([0x12, undefined], undefined), -1, undefined); } else if (this.packageType === 1) { // 5.0 this.send50(new Package50([0, undefined]), undefined); } // 如果已经超过或心跳超时时长没有收到心跳回复,则认为网络已经异常,进行断网重连 // WARNING: - 注意在正常断网时停止心跳逻辑 const date = new Date(); if (date.getTime() - this.lastRecevieBeatTime!.getTime() > this.beatTimeoutInterval) { if (this.onError) this.onError(this, new Error('心跳超时')); // 判断是否可进行断网重连 this.canReconnectThenError(); } }, this.beatInterval, timer ); } /** * 发送报文 * @param p 目标报文 * @param rsp 回复号。P为Package40对象时为回复大类号,同时必传;P为Package50对象为回复功能码,可不传。 * @param callback 回调 */ send(p: T, rsp?: number, callback?: Callback) { if (this.connState != 2) { if (callback && callback.onFail) callback.onFail(new Error('连接状态错误')); return; } if (p instanceof Package40) { // 4.0报文 if (!rsp && callback && callback.onFail) { callback.onFail(new Error('缺少rsp')); return; } this.send40(p, rsp!, callback); } else { // 5.0报文 this.send50(p as Package50, rsp, callback); } } /** * 发送4.0报文 * @param p 待发送报文 * @param rspMainClassNumber 回复大类号 * @param callback 回调 */ private send40(p: Package40, rspMainClassNumber: number, callback?: Callback) { // 设置流水号 p.serialNumber = this.currentSerial; this.currentSerial++; // 判断是否需要异步回调 if (callback) { const key = p.serialNumber + '_' + rspMainClassNumber; const asyncTask = new AsyncTask( key, window.setTimeout(() => this.asyncTimeOut(key), this.timeOutInterval), callback, rspMainClassNumber ); this.asyncTaskMap.set(key, asyncTask); } // 发送信息 const data = p.data(); this.socket?.send(data); } /** * 发送5.0报文 * @param p 待发送报文 * @param rspFunCode 回复功能码 * @param callback 回调 */ private send50(p: Package50, rspFunCode?: number, callback?: Callback) { // 设置流水号 p.serialNumber = this.currentSerial; this.currentSerial++; // 判断是否需要异步回调 if (callback) { const key = p.serialNumber.toString(); const asyncTask = new AsyncTask( key, window.setTimeout(() => this.asyncTimeOut(key), this.timeOutInterval), callback, rspFunCode ); this.asyncTaskMap.set(key, asyncTask); } // 发送信息 this.socket?.send(p.data()); } /** * 异步任务超时 * @param key key */ private asyncTimeOut(key: string) { // 获取对应异步任务对象 const asyncTask = this.asyncTaskMap.get(key); if (asyncTask) { if (asyncTask.callback && asyncTask.callback.onFail) asyncTask.callback.onFail(new Error('业务超时')); } this.asyncTaskMap.delete(key); } /** * 处理接收数据 * @param bytes 接收的数据 */ private disposeReceiveDatas(bytes: Uint8Array) { for (let i = 0; i < bytes.length; i++) { const byte = bytes[i]; if (!this.cache) { // 新报文 if (byte !== 0xff) { // 接收到首字节不是0xFF的错误数据 return; } this.cache = []; this.cache[this.cache.length] = byte; } else { // 追加数据 this.cache[this.cache.length] = byte; // 取报文长度 if (this.cache.length === 5) { if (this.packageType === 0) { // 4.0报文 this.cachePackageLength = new DataView(new Uint8Array(new Uint8Array(this.cache).subarray(1, 5)).buffer).getUint32(0, false); if (this.cachePackageLength > 65535) { // 接收到长度超过65535的行情包 this.cache = undefined; this.cachePackageLength = 0; return; } } else { // 5.0报文 const sub = new Uint8Array(this.cache).subarray(1, 5); const dataView = new DataView(new Uint8Array(sub).buffer); // 注意这里要new一个新的Uint8Array,直接buffer取到的还是原数组的buffer this.cachePackageLength = dataView.getUint32(0, true); } } // 判断是否已经到包尾 if (this.cache.length === this.cachePackageLength) { if (byte !== 0x0) { // 接收到尾字节不是0x0的错误数据包 this.cache = undefined; this.cachePackageLength = 0; return; } if (this.packageType === 0) { // 4.0报文 this.disposePackage40(new Uint8Array(this.cache)); } else { // 5.0报文 this.disposePackage50(new Uint8Array(this.cache)); } this.cache = undefined; this.cachePackageLength = 0; } } } } /** * 处理完整的4.0报文 * @param bytes 4.0报文字段流 */ private disposePackage40(bytes: Uint8Array) { const p = new Package40(undefined, bytes); if (p.packageLength === 0) { // 报文装箱失败 return; } if (p.mainClassNumber === 0x12 || p.mainClassNumber === 0x41 || p.mainClassNumber === 0x42) { // 推送类报文, 0x12 - 心跳, 0x41 - 实时行情推送, 0x42 - 控制信号 if (this.onReceivePush) this.onReceivePush(this, p); // 收到心跳 if (p.mainClassNumber === 0x12) { this.lastRecevieBeatTime = new Date(); console.log(`packageType=${this.packageType},${this.host},收到心跳回复!`); } } else { // 非推送类报文 const key = p.serialNumber + '_' + p.mainClassNumber; const asyncTask = this.asyncTaskMap.get(key); if (asyncTask) { clearTimeout(asyncTask.timeOut); if (asyncTask.callback && asyncTask.callback.onSuccess) asyncTask.callback.onSuccess(p); this.asyncTaskMap.delete(key); } } } /** * 处理完整的5.0报文 * @param bytes 5.0报文字段流 */ private disposePackage50(bytes: Uint8Array) { const p = new Package50(undefined, bytes); if (p.packageLength === 0) { // 报文装箱失败 return; } if (p.funCode === 0 || p.serialNumber === 0) { // 推送类报文 if (this.onReceivePush) this.onReceivePush(this, p); // 收到心跳 if (p.funCode === 0) { console.log(`packageType=${this.packageType},${this.host},收到心跳回复!`); this.lastRecevieBeatTime = new Date(); } } else { // 非推送类报文 const key = p.serialNumber.toString(); const asyncTask = this.asyncTaskMap.get(key); if (asyncTask) { // 判断是否需要检验回复功能码 if (asyncTask.rspFunCode && asyncTask.rspFunCode !== p.funCode) { return; } clearTimeout(asyncTask.timeOut); if (asyncTask.callback && asyncTask.callback.onSuccess) asyncTask.callback.onSuccess(p); this.asyncTaskMap.delete(key); } } } /** * 判断是否可进行断网重连 */ private canReconnectThenError() { // 判断是否应该启动断网重连机制 if (this.isReconnecting || !this.canReconnect || this.isBrokenReconnecting) return; // 回调当前所有发送信息错误块 this.callAllAsyncTaskOnReconnecting(); // 开始断网重连机制 this.isReconnecting = true; this.reconnect(); } /** * 断网重连方法,在重连尝试失败时会递归调用自己 */ private reconnect() { // 判断是否已由外部主动断开 if (this.isBrokenReconnecting) { this.isReconnecting = false; return; } // 开始尝试重连服务端 if (this.onReconnectChangeState) this.onReconnectChangeState!(this, ReconnectChangeState.BeginReconnect); this.conn(this.host) .then((res) => { // 连接成功 this.isReconnecting = false; if (this.onReconnectChangeState) this.onReconnectChangeState!(this, ReconnectChangeState.ReconnectSuccessed); }) .catch((err) => { // 连接失败 if (this.onReconnectChangeState) this.onReconnectChangeState!(this, ReconnectChangeState.FailAndWaitPeriod); // 5秒后尝试重连 if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); } this.reconnectTimer = window.setTimeout(() => { this.reconnect(); }, 5000); }); } /** * 在断网重连时调用所有的错误回调 */ private callAllAsyncTaskOnReconnecting() { if (this.asyncTaskMap && this.asyncTaskMap.size > 0) { for (const key in this.asyncTaskMap) { const asyncTask = this.asyncTaskMap.get(key); if (asyncTask && asyncTask.callback && asyncTask.callback.onFail) { asyncTask.callback.onFail(new Error('发生断网重连')); clearTimeout(asyncTask.timeOut); } } this.asyncTaskMap.clear(); } } }