| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495 |
- import { Package40, Package50 } from './package';
- import timerUtil from '@/utils/timer/timerUtil';
- /** 重连状态 */
- export enum ReconnectChangeState {
- /** 开始重连 */
- BeginReconnect,
- /** 尝试重连失败,将在下一个周期后再次尝试重连 */
- FailAndWaitPeriod,
- /** 重连成功,将进行业务操作 */
- ReconnectSuccessed,
- /** 重连成功后业务操作失败并将再次重试,由业务模块发起 */
- LoginFail,
- /** 重连成功后业务操作成功(包括交易服务的账户登录状态更新或行情服务的行情订阅等 */
- Logined,
- }
- export type beatTimer = 'quote' | 'trade';
- /**
- * 回调类
- */
- export interface Callback {
- /** 成功回调 */
- onSuccess?<T extends Package40 | Package50>(res: T): void;
- /** 失败回调 */
- onFail?(err: Error): void;
- }
- /** 信息发送异步任务类 */
- class AsyncTask {
- /** 5.0报文直接为流水号;4.0报文为流水号+"_"+大类号(由于服务端行情推送会使用流水号自增) */
- key: string;
- /** 返回功能码,主要用于过滤账户服务的错传包 */
- rspFunCode?: number;
- /** 超时标识 */
- timeOut: number;
- /** 回调 */
- callback: Callback;
- constructor(key: string, timeOut: number, callback: Callback, rspFunCode?: number) {
- this.key = key;
- this.rspFunCode = rspFunCode;
- this.timeOut = timeOut;
- this.callback = callback;
- }
- }
- /** MTP2.0 长链通信类 */
- export class MTP2WebSocket<T extends Package40 | Package50> {
- /** 报文类型,0 - 4.0行情报文,1 - 5.0交易报文 */
- packageType = 0;
- /** 服务端地址 */
- host = '';
- /** 当前连接状态,0 - 未连接,1 - 连接中,2 - 已连接 */
- connState = 0;
- /** 连接成功的回调 */
- onConnected?: (obj: MTP2WebSocket<T>) => void;
- /** 连接断开的回调 */
- onClosed?: (obj: MTP2WebSocket<T>) => void;
- /** 连接发生错误的回调 */
- onError?: (obj: MTP2WebSocket<T>, err: Error) => void;
- /** 接收到推送类报文的回调 */
- onReceivePush?: (obj: MTP2WebSocket<T>, data: T) => void;
- /** 重连状态发生改变时的回调 */
- onReconnectChangeState?: (obj: MTP2WebSocket<T>, state: ReconnectChangeState) => void;
- /** WebSocket 对象 */
- private socket?: WebSocket;
- /** 当前流水号 */
- private currentSerial = 1;
- /** 默认超时时长(秒) */
- private timeOutInterval = 30 * 1000;
- /** 信息发送异步建值对 */
- private asyncTaskMap: Map<string, AsyncTask> = new Map();
- /** 段包 */
- private cache?: number[];
- /** 当前处理的段包报文的原长度 */
- private cachePackageLength = 0;
- /** 心跳间隔时间,默认为10秒 */
- private beatInterval = 10 * 1000;
- /** 心跳专用定时器 */
- // private beatTimer = 'beatTimer';
- /** 最后一次收到心跳的时间 */
- private lastRecevieBeatTime?: Date;
- /** 心跳回复超时时间,默认为30秒 */
- private beatTimeoutInterval = 30 * 1000;
- /** 当前是否正在进行断网重连 */
- private isReconnecting = false;
- /** 当前是否可进行断网重连;业务与网络分离,而断网重连需要在登录账号后(或行情订阅成功后)才可进行 */
- public canReconnect = false;
- /** 断网重连失败后重试的TimeOut标识 */
- private reconnectTimer = 0;
- /** 外部是否要求停止断网重连操作标志 */
- private isBrokenReconnecting = false;
- /**
- * 构造函数
- * @param packageType 报文类型,0 - 4.0行情报文,1 - 5.0交易报文
- */
- constructor(packageType: number) {
- this.packageType = packageType;
- }
- /**
- * 连接服务端
- * @param host 服务端地址
- */
- conn(host: string) {
- return new Promise((resolve, reject) => {
- if (this.socket) {
- this.socket.close();
- this.socket = undefined;
- }
- this.connState = 1;
- this.socket = new WebSocket(host);
- this.socket.onopen = () => {
- // 连接成功
- this.connState = 2;
- this.host = host;
- // this.canReconnect = true;
- resolve('ok');
- if (this.onConnected) this.onConnected(this);
- };
- this.socket.onclose = () => {
- // 连接断开
- this.connState = 0;
- if (this.onClosed) this.onClosed(this);
- };
- this.socket.onerror = (ev) => {
- // 发生错误
- this.connState = 0;
- const err = new Error('连接发生错误');
- reject(err);
- if (this.onError) this.onError(this, err);
- // 判断是否可进行断网重连
- this.canReconnectThenError();
- };
- this.socket.onmessage = (message) => {
- // 接收数据
- new Response(message.data).arrayBuffer().then((res) => {
- this.disposeReceiveDatas(new Uint8Array(res));
- });
- };
- });
- }
- /**
- * 断开连接
- */
- close() {
- this.connState = 0;
- if (this.socket) this.socket.close();
- }
- /**
- * 启动心跳请求包发送Timer
- * 行情与交易是不同的链路,需要启动不同的定时器发送心跳
- */
- startBeatTime(timer: beatTimer) {
- this.stopBeatTimer(timer);
- this.lastRecevieBeatTime = new Date();
- this.sendBeat(timer);
- }
- /**
- * 停止心跳请求包发送Timer
- */
- stopBeatTimer(timer: beatTimer) {
- timerUtil.clearInterval(timer);
- }
- /**
- * 发送心跳
- */
- private sendBeat(timer: beatTimer) {
- timerUtil.setInterval(
- () => {
- console.log('心跳回复', this.packageType, ':', this.connState);
- // 当前没有连接
- if (this.connState !== 2) {
- return;
- }
- // 发送心跳
- if (this.packageType === 0) {
- // 4.0
- this.send40(new Package40([0x12, undefined], undefined), -1, undefined);
- } else if (this.packageType === 1) {
- // 5.0
- this.send50(new Package50([0, undefined]), undefined);
- }
- // 如果已经超过或心跳超时时长没有收到心跳回复,则认为网络已经异常,进行断网重连
- // WARNING: - 注意在正常断网时停止心跳逻辑
- const date = new Date();
- if (date.getTime() - this.lastRecevieBeatTime!.getTime() > this.beatTimeoutInterval) {
- if (this.onError) this.onError(this, new Error('心跳超时'));
- // 判断是否可进行断网重连
- this.canReconnectThenError();
- }
- },
- this.beatInterval,
- timer
- );
- }
- /**
- * 发送报文
- * @param p 目标报文
- * @param rsp 回复号。P为Package40对象时为回复大类号,同时必传;P为Package50对象为回复功能码,可不传。
- * @param callback 回调
- */
- send(p: T, rsp?: number, callback?: Callback) {
- if (this.connState != 2) {
- if (callback && callback.onFail) callback.onFail(new Error('连接状态错误'));
- return;
- }
- if (p instanceof Package40) {
- // 4.0报文
- if (!rsp && callback && callback.onFail) {
- callback.onFail(new Error('缺少rsp'));
- return;
- }
- this.send40(p, rsp!, callback);
- } else {
- // 5.0报文
- this.send50(p as Package50, rsp, callback);
- }
- }
- /**
- * 发送4.0报文
- * @param p 待发送报文
- * @param rspMainClassNumber 回复大类号
- * @param callback 回调
- */
- private send40(p: Package40, rspMainClassNumber: number, callback?: Callback) {
- // 设置流水号
- p.serialNumber = this.currentSerial;
- this.currentSerial++;
- // 判断是否需要异步回调
- if (callback) {
- const key = p.serialNumber + '_' + rspMainClassNumber;
- const asyncTask = new AsyncTask(
- key,
- window.setTimeout(() => this.asyncTimeOut(key), this.timeOutInterval),
- callback,
- rspMainClassNumber
- );
- this.asyncTaskMap.set(key, asyncTask);
- }
- // 发送信息
- const data = p.data();
- this.socket?.send(data);
- }
- /**
- * 发送5.0报文
- * @param p 待发送报文
- * @param rspFunCode 回复功能码
- * @param callback 回调
- */
- private send50(p: Package50, rspFunCode?: number, callback?: Callback) {
- // 设置流水号
- p.serialNumber = this.currentSerial;
- this.currentSerial++;
- // 判断是否需要异步回调
- if (callback) {
- const key = p.serialNumber.toString();
- const asyncTask = new AsyncTask(
- key,
- window.setTimeout(() => this.asyncTimeOut(key), this.timeOutInterval),
- callback,
- rspFunCode
- );
- this.asyncTaskMap.set(key, asyncTask);
- }
- // 发送信息
- this.socket?.send(p.data());
- }
- /**
- * 异步任务超时
- * @param key key
- */
- private asyncTimeOut(key: string) {
- // 获取对应异步任务对象
- const asyncTask = this.asyncTaskMap.get(key);
- if (asyncTask) {
- if (asyncTask.callback && asyncTask.callback.onFail) asyncTask.callback.onFail(new Error('业务超时'));
- }
- this.asyncTaskMap.delete(key);
- }
- /**
- * 处理接收数据
- * @param bytes 接收的数据
- */
- private disposeReceiveDatas(bytes: Uint8Array) {
- for (let i = 0; i < bytes.length; i++) {
- const byte = bytes[i];
- if (!this.cache) {
- // 新报文
- if (byte !== 0xff) {
- // 接收到首字节不是0xFF的错误数据
- return;
- }
- this.cache = [];
- this.cache[this.cache.length] = byte;
- } else {
- // 追加数据
- this.cache[this.cache.length] = byte;
- // 取报文长度
- if (this.cache.length === 5) {
- if (this.packageType === 0) {
- // 4.0报文
- this.cachePackageLength = new DataView(new Uint8Array(new Uint8Array(this.cache).subarray(1, 5)).buffer).getUint32(0, false);
- if (this.cachePackageLength > 65535) {
- // 接收到长度超过65535的行情包
- this.cache = undefined;
- this.cachePackageLength = 0;
- return;
- }
- } else {
- // 5.0报文
- const sub = new Uint8Array(this.cache).subarray(1, 5);
- const dataView = new DataView(new Uint8Array(sub).buffer); // 注意这里要new一个新的Uint8Array,直接buffer取到的还是原数组的buffer
- this.cachePackageLength = dataView.getUint32(0, true);
- }
- }
- // 判断是否已经到包尾
- if (this.cache.length === this.cachePackageLength) {
- if (byte !== 0x0) {
- // 接收到尾字节不是0x0的错误数据包
- this.cache = undefined;
- this.cachePackageLength = 0;
- return;
- }
- if (this.packageType === 0) {
- // 4.0报文
- this.disposePackage40(new Uint8Array(this.cache));
- } else {
- // 5.0报文
- this.disposePackage50(new Uint8Array(this.cache));
- }
- this.cache = undefined;
- this.cachePackageLength = 0;
- }
- }
- }
- }
- /**
- * 处理完整的4.0报文
- * @param bytes 4.0报文字段流
- */
- private disposePackage40(bytes: Uint8Array) {
- const p = new Package40(undefined, bytes);
- if (p.packageLength === 0) {
- // 报文装箱失败
- return;
- }
- if (p.mainClassNumber === 0x12 || p.mainClassNumber === 0x41 || p.mainClassNumber === 0x42) {
- // 推送类报文, 0x12 - 心跳, 0x41 - 实时行情推送, 0x42 - 控制信号
- if (this.onReceivePush) this.onReceivePush(this, <T>p);
- // 收到心跳
- if (p.mainClassNumber === 0x12) {
- this.lastRecevieBeatTime = new Date();
- console.log(`packageType=${this.packageType},${this.host},收到心跳回复!`);
- }
- } else {
- // 非推送类报文
- const key = p.serialNumber + '_' + p.mainClassNumber;
- const asyncTask = this.asyncTaskMap.get(key);
- if (asyncTask) {
- clearTimeout(asyncTask.timeOut);
- if (asyncTask.callback && asyncTask.callback.onSuccess) asyncTask.callback.onSuccess(p);
- this.asyncTaskMap.delete(key);
- }
- }
- }
- /**
- * 处理完整的5.0报文
- * @param bytes 5.0报文字段流
- */
- private disposePackage50(bytes: Uint8Array) {
- const p = new Package50(undefined, bytes);
- if (p.packageLength === 0) {
- // 报文装箱失败
- return;
- }
- if (p.funCode === 0 || p.serialNumber === 0) {
- // 推送类报文
- if (this.onReceivePush) this.onReceivePush(this, <T>p);
- // 收到心跳
- if (p.funCode === 0) {
- console.log(`packageType=${this.packageType},${this.host},收到心跳回复!`);
- this.lastRecevieBeatTime = new Date();
- }
- } else {
- // 非推送类报文
- const key = p.serialNumber.toString();
- const asyncTask = this.asyncTaskMap.get(key);
- if (asyncTask) {
- // 判断是否需要检验回复功能码
- if (asyncTask.rspFunCode && asyncTask.rspFunCode !== p.funCode) {
- return;
- }
- clearTimeout(asyncTask.timeOut);
- if (asyncTask.callback && asyncTask.callback.onSuccess) asyncTask.callback.onSuccess(p);
- this.asyncTaskMap.delete(key);
- }
- }
- }
- /**
- * 判断是否可进行断网重连
- */
- private canReconnectThenError() {
- // 判断是否应该启动断网重连机制
- if (this.isReconnecting || !this.canReconnect || this.isBrokenReconnecting) return;
- // 回调当前所有发送信息错误块
- this.callAllAsyncTaskOnReconnecting();
- // 开始断网重连机制
- this.isReconnecting = true;
- this.reconnect();
- }
- /**
- * 断网重连方法,在重连尝试失败时会递归调用自己
- */
- private reconnect() {
- // 判断是否已由外部主动断开
- if (this.isBrokenReconnecting) {
- this.isReconnecting = false;
- return;
- }
- // 开始尝试重连服务端
- if (this.onReconnectChangeState) this.onReconnectChangeState!(this, ReconnectChangeState.BeginReconnect);
- this.conn(this.host)
- .then((res) => {
- // 连接成功
- this.isReconnecting = false;
- if (this.onReconnectChangeState) this.onReconnectChangeState!(this, ReconnectChangeState.ReconnectSuccessed);
- })
- .catch((err) => {
- // 连接失败
- if (this.onReconnectChangeState) this.onReconnectChangeState!(this, ReconnectChangeState.FailAndWaitPeriod);
- // 5秒后尝试重连
- if (this.reconnectTimer) {
- clearTimeout(this.reconnectTimer);
- }
- this.reconnectTimer = window.setTimeout(() => {
- this.reconnect();
- }, 5000);
- });
- }
- /**
- * 在断网重连时调用所有的错误回调
- */
- private callAllAsyncTaskOnReconnecting() {
- if (this.asyncTaskMap && this.asyncTaskMap.size > 0) {
- for (const key in this.asyncTaskMap) {
- const asyncTask = this.asyncTaskMap.get(key);
- if (asyncTask && asyncTask.callback && asyncTask.callback.onFail) {
- asyncTask.callback.onFail(new Error('发生断网重连'));
- clearTimeout(asyncTask.timeOut);
- }
- }
- this.asyncTaskMap.clear();
- }
- }
- }
|