| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378 |
- import { v4 } from 'uuid'
- import { Package40, Package50 } from '../package';
- import { Package, ConnectionState, SendMessage, AsyncTask } from './types';
- import moment from 'moment'
- /**
- * MTP2.0 长链通信类
- */
- export class MTP2WebSocket<T extends Package40 | Package50>{
- /** 报文协议 */
- private readonly Package: Package<T>;
- /** 当前实例标识 */
- private uuid = '';
- /** 服务端地址 */
- private host;
- /** WebSocket 对象 */
- private ws?: WebSocket;
- /** 心跳定时器 */
- private beatTimer = 0;
- /** 心跳发送时间 */
- private beatTimerId = '';
- /** 重连定时器 */
- private reconnectTimer = 0;
- /** 超时定时器 */
- private timeoutTimer = 0;
- /** 当前流水号 */
- private currentSerial = 1;
- /** 信息发送异步建值对 */
- private asyncTaskMap = new Map<number, AsyncTask<T>>();
- /** 连接准备完成 */
- private onReady?: Promise<MTP2WebSocket<T>>;
- /** 连接成功的回调 */
- onOpen?: (socket: MTP2WebSocket<T>) => void;
- /** 连接断开的回调 */
- onClose?: () => void;
- /** 连接发生错误的回调 */
- onError?: (e: Event) => void;
- /** 接收到推送类报文的回调 */
- onPush?: (data: T) => void;
- /** 在重连之前回调 */
- onBeforeReconnect?: (count: number) => void;
- /** 在重连成功之后回调 */
- onReconnect?: () => void;
- /** 当前连接状态 */
- connState: keyof typeof ConnectionState = 'Unconnected';
- /** 重连次数 */
- reconnectCount = 0;
- /** 心跳间隔时长,默认为30秒 */
- beatInterval = 30 * 1000;
- /** 消息超时时长,默认为15秒 */
- timeoutInterval = 15 * 1000;
- constructor(pkg: Package<T>, host = '') {
- this.Package = pkg;
- this.host = host;
- }
- /**
- * 连接服务器
- */
- async connect(host?: string, protocols?: string | string[]): Promise<MTP2WebSocket<T>> {
- if (!this.onReady) {
- clearTimeout(this.reconnectTimer);
- this.stopHeartBeat();
- this.host = host || this.host;
- this.connState = 'Connecting';
- console.log(this.host, '正在连接');
- this.onReady = new Promise((resolve, reject) => {
- const errMsg = this.host + ' 连接发生错误';
- const uuid = v4();
- this.uuid = uuid;
- try {
- this.ws = new WebSocket(this.host, protocols);
- // 连接成功
- this.ws.onopen = () => {
- console.log(this.host, '连接成功');
- this.connState = 'Connected';
- this.startHeartBeat();
- this.onOpen && this.onOpen(this);
- resolve(this);
- }
- // 连接断开(CLOSING有可能延迟)
- this.ws.onclose = () => {
- // 判断是否当前实例
- // 如果连接断开后不等待 onclose 响应,由于 onclose 延迟的原因可能会在创建新的 ws 实例后触发,导致刚创建的实例被断开进入重连机制
- if (this.uuid === uuid) {
- console.warn(this.host, '连接已断开');
- this.reset();
- this.reconnect(); // 重连失败会不断尝试,直到成功为止
- }
- }
- // 连接发生错误
- this.ws.onerror = (e) => {
- this.onError && this.onError(e);
- reject(errMsg);
- }
- // 接收数据
- this.ws.onmessage = (e) => {
- // 接收数据
- new Response(e.data).arrayBuffer().then((res) => {
- this.disposeReceiveDatas(new Uint8Array(res));
- })
- }
- } catch {
- reject(errMsg);
- }
- })
- }
- return this.onReady
- }
- /**
- * 主动断开连接,断开后不会自动重连
- */
- close() {
- clearTimeout(this.reconnectTimer);
- this.stopHeartBeat();
- this.uuid = v4(); // 改变实例标识,取消重连状态
- this.reconnectCount = 0;
- if (this.ws) {
- this.ws.close();
- console.warn(this.host, '主动断开');
- }
- this.reset(); // 不等待 ws.onclose 响应强制重置实例
- }
- /**
- * 重置实例
- */
- private reset() {
- this.ws = undefined;
- this.onReady = undefined;
- this.connState = 'Unconnected';
- this.onClose && this.onClose();
- }
- /**
- * 启动心跳请求包发送Timer
- */
- private startHeartBeat() {
- this.beatTimer = window.setTimeout(() => {
- this.beatTimerId = moment(new Date()).format('HH:mm:ss');
- //console.log(this.host, '发送心跳', this.beatTimerId);
- // 发送心跳
- switch (this.Package) {
- case Package40: {
- // 4.0
- this.send({
- data: { rspCode: -1, payload: new this.Package(0x12) }
- })
- break;
- }
- case Package50: {
- // 5.0
- this.send({
- data: { rspCode: 0, payload: new this.Package(0) }
- })
- break;
- }
- }
- // 如果已经超过或心跳超时时长没有收到心跳回复,则认为网络已经异常,进行断网重连
- // WARNING: - 注意在正常断网时停止心跳逻辑
- this.timeoutTimer = window.setTimeout(() => {
- console.warn(this.host, '心跳超时', this.beatTimerId);
- this.reconnect();
- }, this.timeoutInterval)
- }, this.beatInterval)
- }
- /**
- * 停止心跳请求包发送Timer
- */
- private stopHeartBeat() {
- clearTimeout(this.timeoutTimer);
- clearTimeout(this.beatTimer);
- }
- /**
- * 断网重连方法,在重连尝试失败时会再次重试
- */
- private reconnect() {
- this.stopHeartBeat();
- if (this.connState !== 'Connecting') {
- this.reconnectCount++;
- this.onBeforeReconnect && this.onBeforeReconnect(this.reconnectCount);
- // 自动计算每次重试的延时,重试次数越多,延时越大
- const delay = this.reconnectCount * 5000
- console.log(this.host, `${delay / 1000}秒后将进行第${this.reconnectCount}次重连`);
- this.reconnectTimer = window.setTimeout(() => {
- this.onReady = undefined;
- this.connect().then(() => {
- console.log(this.host, '重连成功,可开始进行业务操作');
- this.reconnectCount = 0;
- this.onReconnect && this.onReconnect();
- }).catch(() => {
- // 重连失败会进入 ws.onclose 再次发起重连
- if (this.reconnectCount) {
- console.warn(this.host, `第${this.reconnectCount}次重连失败`);
- }
- })
- }, delay);
- }
- }
- /**
- * 发送报文
- * @param data 目标报文
- * @param fncode 回复号。P为Package40对象时为回复大类号,同时必传;P为Package50对象为回复功能码,可不传。
- * @param success 成功回调
- * @param fail 失败回调
- */
- send(msg: SendMessage<T>) {
- const { data, success, fail } = msg;
- const { payload } = data;
- if (this.onReady) {
- this.onReady.then(() => {
- const mapKey = this.currentSerial;
- payload.serialNumber = mapKey; // 设置流水号
- // 判断是否需要异步回调
- if (success || fail) {
- // 保存发送任务,为任务设置一个自动超时定时器
- this.asyncTaskMap.set(mapKey, {
- sendMessage: msg,
- timeoutId: window.setTimeout(() => this.asyncTimeout(mapKey), this.timeoutInterval),
- })
- }
- // 发送信息
- this.ws?.send(payload.data());
- this.currentSerial++;
- })
- } else {
- fail && fail('服务未连接');
- }
- }
- /**
- * 异步任务超时
- * @param key key
- */
- private asyncTimeout(key: number) {
- // 获取对应异步任务对象
- const asyncTask = this.asyncTaskMap.get(key);
- const onFail = asyncTask?.sendMessage.fail;
- onFail && onFail('业务超时');
- this.asyncTaskMap.delete(key);
- }
- /**
- * 处理接收数据
- * @param bytes 接收的数据
- */
- private disposeReceiveDatas(bytes: Uint8Array) {
- const cache: number[] = [];
- let cachePackageLength = 0;
- for (let i = 0; i < bytes.length; i++) {
- const byte = bytes[i];
- cache.push(byte);
- if (i === 0 && byte !== 0xff) {
- console.error('接收到首字节不是0xFF');
- return;
- } else {
- // 取报文长度
- if (cache.length === 5) {
- switch (this.Package) {
- // 4.0报文
- case Package40: {
- cachePackageLength = new DataView(new Uint8Array(new Uint8Array(cache).subarray(1, 5)).buffer).getUint32(0, false);
- if (cachePackageLength > 65535) {
- console.error('接收到长度超过65535的行情包');
- return;
- }
- break;
- }
- // 5.0报文
- case Package50: {
- const sub = new Uint8Array(cache).subarray(1, 5);
- const dataView = new DataView(new Uint8Array(sub).buffer); // 注意这里要new一个新的Uint8Array,直接buffer取到的还是原数组的buffer
- cachePackageLength = dataView.getUint32(0, true);
- break;
- }
- }
- }
- // 判断是否已经到包尾
- if (cache.length === cachePackageLength) {
- if (byte !== 0x0) {
- console.error('接收到尾字节不是0x0的错误数据包');
- return;
- }
- const content = new Uint8Array(cache);
- const result = new this.Package(content);
- if (result.packageLength === 0) {
- console.error('报文装箱失败');
- return;
- }
- this.disposePackageResult(result);
- }
- }
- }
- }
- /**
- * 处理完整的报文
- */
- private disposePackageResult(p: T) {
- const asyncTask = this.asyncTaskMap.get(p.serialNumber);
- const sendMessage = asyncTask?.sendMessage;
- // 4.0报文
- if (p instanceof Package40) {
- // 推送类报文, 0x12 - 心跳, 0x41 - 实时行情推送, 0x42 - 控制信号
- switch (p.mainClassNumber) {
- case 0x12: {
- // 接收到心跳回复
- //console.log(this.host, '收到心跳回复', this.beatTimerId);
- this.stopHeartBeat();
- this.startHeartBeat();
- break;
- }
- case 0x41:
- case 0x42: {
- // 推送类报文
- this.onPush && this.onPush(p);
- break;
- }
- default: {
- // 非推送类报文
- sendMessage?.success && sendMessage.success(p);
- }
- }
- }
- // 5.0报文
- if (p instanceof Package50) {
- if (p.funCode === 0) {
- // 接收到心跳回复
- //console.log(this.host, '收到心跳回复', this.beatTimerId);
- this.stopHeartBeat();
- this.startHeartBeat();
- } else if (p.serialNumber === 0) {
- // 推送类报文
- this.onPush && this.onPush(p);
- } else if (sendMessage) {
- // 非推送类报文
- const { data, success } = sendMessage;
- // 可能会收到多个流水号相同的数据包,需判断是否正确的回复码
- if (data.rspCode === p.funCode) {
- success && success(p);
- } else {
- // 跳过当前收到的数据包,继续等待直到asyncTask超时
- //console.log('无效的回复码', p.funCode)
- return
- }
- }
- }
- // 移除当前发送的任务请求
- if (asyncTask) {
- clearTimeout(asyncTask.timeoutId);
- this.asyncTaskMap.delete(p.serialNumber);
- }
- }
- }
|