import { v4 } from 'uuid' import { Package40, Package50 } from '../package'; import { Package, ConnectionState, SendMessage, AsyncTask } from './types'; import moment from 'moment' /** * MTP2.0 长链通信类 */ export class MTP2WebSocket{ /** 报文协议 */ private readonly Package: Package; /** 当前实例标识 */ private uuid = ''; /** 服务端地址 */ private host; /** WebSocket 对象 */ private ws?: WebSocket; /** 心跳定时器 */ private beatTimer = 0; /** 心跳发送时间 */ private beatTimerId = ''; /** 重连定时器 */ private reconnectTimer = 0; /** 超时定时器 */ private timeoutTimer = 0; /** 当前流水号 */ private currentSerial = 1; /** 信息发送异步建值对 */ private asyncTaskMap = new Map>(); /** 连接准备完成 */ private onReady?: Promise>; /** 连接成功的回调 */ onOpen?: (socket: MTP2WebSocket) => void; /** 连接断开的回调 */ onClose?: () => void; /** 连接发生错误的回调 */ onError?: (e: Event) => void; /** 接收到推送类报文的回调 */ onPush?: (data: T) => void; /** 在重连之前回调 */ onBeforeReconnect?: (count: number) => void; /** 在重连成功之后回调 */ onReconnect?: () => void; /** 当前连接状态 */ connState: keyof typeof ConnectionState = 'Unconnected'; /** 重连次数 */ reconnectCount = 0; /** 心跳间隔时长,默认为30秒 */ beatInterval = 30 * 1000; /** 消息超时时长,默认为15秒 */ timeoutInterval = 15 * 1000; constructor(pkg: Package, host = '') { this.Package = pkg; this.host = host; } /** * 连接服务器 */ async connect(host?: string, protocols?: string | string[]): Promise> { if (!this.onReady) { clearTimeout(this.reconnectTimer); this.stopHeartBeat(); this.host = host || this.host; this.connState = 'Connecting'; console.log(this.host, '正在连接'); this.onReady = new Promise((resolve, reject) => { const errMsg = this.host + ' 连接发生错误'; const uuid = v4(); this.uuid = uuid; try { this.ws = new WebSocket(this.host, protocols); // 连接成功 this.ws.onopen = () => { console.log(this.host, '连接成功'); this.connState = 'Connected'; this.startHeartBeat(); this.onOpen && this.onOpen(this); resolve(this); } // 连接断开(CLOSING有可能延迟) this.ws.onclose = () => { // 判断是否当前实例 // 如果连接断开后不等待 onclose 响应,由于 onclose 延迟的原因可能会在创建新的 ws 实例后触发,导致刚创建的实例被断开进入重连机制 if (this.uuid === uuid) { console.warn(this.host, '连接已断开'); this.reset(); this.reconnect(); // 重连失败会不断尝试,直到成功为止 } } // 连接发生错误 this.ws.onerror = (e) => { this.onError && this.onError(e); reject(errMsg); } // 接收数据 this.ws.onmessage = (e) => { // 接收数据 new Response(e.data).arrayBuffer().then((res) => { this.disposeReceiveDatas(new Uint8Array(res)); }) } } catch { reject(errMsg); } }) } return this.onReady } /** * 主动断开连接,断开后不会自动重连 */ close() { clearTimeout(this.reconnectTimer); this.stopHeartBeat(); this.uuid = v4(); // 改变实例标识,取消重连状态 this.reconnectCount = 0; if (this.ws) { this.ws.close(); console.warn(this.host, '主动断开'); } this.reset(); // 不等待 ws.onclose 响应强制重置实例 } /** * 重置实例 */ private reset() { this.ws = undefined; this.onReady = undefined; this.connState = 'Unconnected'; this.onClose && this.onClose(); } /** * 启动心跳请求包发送Timer */ private startHeartBeat() { this.beatTimer = window.setTimeout(() => { this.beatTimerId = moment(new Date()).format('HH:mm:ss'); //console.log(this.host, '发送心跳', this.beatTimerId); // 发送心跳 switch (this.Package) { case Package40: { // 4.0 this.send({ data: { rspCode: -1, payload: new this.Package(0x12) } }) break; } case Package50: { // 5.0 this.send({ data: { rspCode: 0, payload: new this.Package(0) } }) break; } } // 如果已经超过或心跳超时时长没有收到心跳回复,则认为网络已经异常,进行断网重连 // WARNING: - 注意在正常断网时停止心跳逻辑 this.timeoutTimer = window.setTimeout(() => { console.warn(this.host, '心跳超时', this.beatTimerId); this.reconnect(); }, this.timeoutInterval) }, this.beatInterval) } /** * 停止心跳请求包发送Timer */ private stopHeartBeat() { clearTimeout(this.timeoutTimer); clearTimeout(this.beatTimer); } /** * 断网重连方法,在重连尝试失败时会再次重试 */ private reconnect() { this.stopHeartBeat(); if (this.connState !== 'Connecting') { this.reconnectCount++; this.onBeforeReconnect && this.onBeforeReconnect(this.reconnectCount); // 自动计算每次重试的延时,重试次数越多,延时越大 const delay = this.reconnectCount * 5000 console.log(this.host, `${delay / 1000}秒后将进行第${this.reconnectCount}次重连`); this.reconnectTimer = window.setTimeout(() => { this.onReady = undefined; this.connect().then(() => { console.log(this.host, '重连成功,可开始进行业务操作'); this.reconnectCount = 0; this.onReconnect && this.onReconnect(); }).catch(() => { // 重连失败会进入 ws.onclose 再次发起重连 if (this.reconnectCount) { console.warn(this.host, `第${this.reconnectCount}次重连失败`); } }) }, delay); } } /** * 发送报文 * @param data 目标报文 * @param fncode 回复号。P为Package40对象时为回复大类号,同时必传;P为Package50对象为回复功能码,可不传。 * @param success 成功回调 * @param fail 失败回调 */ send(msg: SendMessage) { const { data, success, fail } = msg; const { payload } = data; if (this.onReady) { this.onReady.then(() => { const mapKey = this.currentSerial; payload.serialNumber = mapKey; // 设置流水号 // 判断是否需要异步回调 if (success || fail) { // 保存发送任务,为任务设置一个自动超时定时器 this.asyncTaskMap.set(mapKey, { sendMessage: msg, timeoutId: window.setTimeout(() => this.asyncTimeout(mapKey), this.timeoutInterval), }) } // 发送信息 this.ws?.send(payload.data()); this.currentSerial++; }) } else { fail && fail('服务未连接'); } } /** * 异步任务超时 * @param key key */ private asyncTimeout(key: number) { // 获取对应异步任务对象 const asyncTask = this.asyncTaskMap.get(key); const onFail = asyncTask?.sendMessage.fail; onFail && onFail('业务超时'); this.asyncTaskMap.delete(key); } /** * 处理接收数据 * @param bytes 接收的数据 */ private disposeReceiveDatas(bytes: Uint8Array) { const cache: number[] = []; let cachePackageLength = 0; for (let i = 0; i < bytes.length; i++) { const byte = bytes[i]; cache.push(byte); if (i === 0 && byte !== 0xff) { console.error('接收到首字节不是0xFF'); return; } else { // 取报文长度 if (cache.length === 5) { switch (this.Package) { // 4.0报文 case Package40: { cachePackageLength = new DataView(new Uint8Array(new Uint8Array(cache).subarray(1, 5)).buffer).getUint32(0, false); if (cachePackageLength > 65535) { console.error('接收到长度超过65535的行情包'); return; } break; } // 5.0报文 case Package50: { const sub = new Uint8Array(cache).subarray(1, 5); const dataView = new DataView(new Uint8Array(sub).buffer); // 注意这里要new一个新的Uint8Array,直接buffer取到的还是原数组的buffer cachePackageLength = dataView.getUint32(0, true); break; } } } // 判断是否已经到包尾 if (cache.length === cachePackageLength) { if (byte !== 0x0) { console.error('接收到尾字节不是0x0的错误数据包'); return; } const content = new Uint8Array(cache); const result = new this.Package(content); if (result.packageLength === 0) { console.error('报文装箱失败'); return; } this.disposePackageResult(result); } } } } /** * 处理完整的报文 */ private disposePackageResult(p: T) { const asyncTask = this.asyncTaskMap.get(p.serialNumber); const sendMessage = asyncTask?.sendMessage; // 4.0报文 if (p instanceof Package40) { // 推送类报文, 0x12 - 心跳, 0x41 - 实时行情推送, 0x42 - 控制信号 switch (p.mainClassNumber) { case 0x12: { // 接收到心跳回复 //console.log(this.host, '收到心跳回复', this.beatTimerId); this.stopHeartBeat(); this.startHeartBeat(); break; } case 0x41: case 0x42: { // 推送类报文 this.onPush && this.onPush(p); break; } default: { // 非推送类报文 sendMessage?.success && sendMessage.success(p); } } } // 5.0报文 if (p instanceof Package50) { if (p.funCode === 0) { // 接收到心跳回复 //console.log(this.host, '收到心跳回复', this.beatTimerId); this.stopHeartBeat(); this.startHeartBeat(); } else if (p.serialNumber === 0) { // 推送类报文 this.onPush && this.onPush(p); } else if (sendMessage) { // 非推送类报文 const { data, success } = sendMessage; // 可能会收到多个流水号相同的数据包,需判断是否正确的回复码 if (data.rspCode === p.funCode) { success && success(p); } else { // 跳过当前收到的数据包,继续等待直到asyncTask超时 //console.log('无效的回复码', p.funCode) return } } } // 移除当前发送的任务请求 if (asyncTask) { clearTimeout(asyncTask.timeoutId); this.asyncTaskMap.delete(p.serialNumber); } } }