Explorar o código

重写websocket断线重连机制

li.shaoyi %!s(int64=3) %!d(string=hai) anos
pai
achega
988d8dbe48

+ 11 - 53
src/services/index.ts

@@ -25,12 +25,10 @@ export default new (class LifeCycleCtr {
 
     /** 长链接管理 */
     public Socket: LongLink = {
-        quote: new MTP2WebSocket<Package40>(0),
-        trade: new MTP2WebSocket<Package50>(1),
+        quote: new MTP2WebSocket<Package40>('quote'),
+        trade: new MTP2WebSocket<Package50>('trade'),
     };
 
-    constructor() { }
-
     /** 数据中心初始化 */
     initDataCenter(): void { }
 
@@ -67,55 +65,27 @@ export default new (class LifeCycleCtr {
 
     /** 主动连接行情 */
     connectQuote(): Promise<string> {
-        if (this.Socket['quote'].connState === 0) {
-            return this.connectServer('quote', serviceURL.quoteUrl).then(() => {
-                // 开始发送心跳
-                this.startBeatTime('quote');
-                return 'ok';
-            });
-        } else {
-            return Promise.resolve('已连接quote');
-        }
+        return this.connectServer('quote', serviceURL.quoteUrl);
     }
 
     /** 主动连接交易 */
     connectTrading(): Promise<string> {
-        return this.connectServer('trade', serviceURL.tradeUrl).then(() => {
-            // 开始发送心跳
-            this.startBeatTime('trade');
-            return 'ok';
-        });
-        // if (this.Socket['trade'].connState === 0) {
-        //     return this.connectServer('trade', serviceURL.tradeUrl);
-        // } else {
-        //     return Promise.resolve('已连接trading');
-        // }
+        return this.connectServer('trade', serviceURL.tradeUrl);
     }
 
     /** 连接服务器 */
     connectServer(socket: LontLinkName, ws: string): Promise<string> {
+        const service = this.Socket[socket];
         return new Promise((resolve, reject) => {
-            this.Socket[socket].conn(ws);
-            this.Socket[socket].onConnected = () => {
-                console.info(`${socket},${ws},建立链接成功!`);
-                resolve('ok');
-            };
-            this.Socket[socket].onClosed = () => {
-                console.error(`${socket},${ws},断开链接了!`);
-            };
-            this.Socket[socket].onError = (obj: MTP2WebSocket<Package40> | MTP2WebSocket<Package50>, err: Error) => {
-                console.error(`${socket},${ws},发送错误:${err.message}`);
+            service.onError = (obj: MTP2WebSocket<Package40> | MTP2WebSocket<Package50>, err: Error) => {
                 reject(err);
-            };
+            }
+            service.conn(ws).then(() => {
+                resolve(socket);
+            })
             if (socket === 'quote') this.quoteRelevant(this.Socket[socket]);
             if (socket === 'trade') this.tradingRelevant(this.Socket[socket]);
-        });
-    }
-
-    /** 发送心跳 */
-    startBeatTime(socket: LontLinkName): void {
-        this.Socket[socket].startBeatTime(socket);
-        this.Socket[socket].canReconnect = true;
+        })
     }
 
     /** 向交易服务器发送请求 */
@@ -129,14 +99,10 @@ export default new (class LifeCycleCtr {
     }
     /** 主动关闭行情服务 */
     closeQuote() {
-        //  清空心跳定时器
-        this.Socket['quote'].stopBeatTimer('quote');
         this.Socket['quote'].close();
     }
     /** 主动关闭长链接 */
     closeServer(): void {
-        //  清空心跳定时器
-        this.Socket['trade'].stopBeatTimer('trade');
         this.Socket['trade'].close();
         this.dataCenter.reset();
         // sessionStorage.clear();
@@ -157,11 +123,9 @@ export default new (class LifeCycleCtr {
                 case ReconnectChangeState.BeginReconnect:
                     // 停止 token 校验
                     // stopCheckToken()
-                    console.log(obj.host, '开始尝试重连服务端');
                     break;
                 /** 尝试重连失败,将在下一个周期后再次尝试重连 */
                 case ReconnectChangeState.FailAndWaitPeriod:
-                    console.log(obj.host, '尝试重连失败,将在下一个周期后再次尝试重连');
                     break;
                 /** 重连成功,将进行业务操作 */
                 case ReconnectChangeState.ReconnectSuccessed:
@@ -178,7 +142,6 @@ export default new (class LifeCycleCtr {
                     // checkTokenAction();
                     console.log(obj.host, '重连成功后业务操作成功(包括交易服务的账户登录状态更新或行情服务的行情订阅等');
                     // 行情链路重连成功后,需要通知上层 重新订阅商品
-
                     break;
             }
         };
@@ -223,16 +186,12 @@ export default new (class LifeCycleCtr {
                 case ReconnectChangeState.BeginReconnect:
                     // 停止 token 校验
                     stopCheckToken()
-                    console.log(obj.host, '开始尝试重连服务端');
                     break;
                 /** 尝试重连失败,将在下一个周期后再次尝试重连 */
                 case ReconnectChangeState.FailAndWaitPeriod:
-                    console.log(obj.host, '尝试重连失败,将在下一个周期后再次尝试重连');
                     break;
                 /** 重连成功,将进行业务操作 */
                 case ReconnectChangeState.ReconnectSuccessed:
-                    //重新发起心跳
-                    _this.startBeatTime('trade');
                     // 重新启动定时 token 校验
                     checkTokenAction()
                     console.log(obj.host, '重连成功,将进行业务操作');
@@ -242,7 +201,6 @@ export default new (class LifeCycleCtr {
                     break;
                 /** 重连成功后业务操作成功(包括交易服务的账户登录状态更新或行情服务的行情订阅等 */
                 case ReconnectChangeState.Logined:
-
                     break;
             }
         };

+ 1 - 1
src/services/socket/protobuf/buildReq.ts

@@ -162,7 +162,7 @@ function buildCommonProtoReq50(param: CommonSearchParam): Package50 {
 }
 
 function isErrer(rspPackage: any, funCodeName: string): void {
-    if (APP.Socket.trade.connState !== 2) {
+    if (APP.Socket.trade.connState !== 'Connected') {
         console.error(`交易服务未连接!`);
     }
     // 判断回复FunCode是否正确

+ 24 - 21
src/services/socket/quota/index.ts

@@ -52,27 +52,29 @@ function trySubcribe() {
  */
 function actionSubcribe() {
     const arr = recombinationSubscribeGoods();
-    const req = buildSubscribePeq(arr);
-    isSubscribeSucess = false
-    APP.sendQuoteServer(req, funCode.MainClassNumber_Quota_SubscriptRsp, {
-        onSuccess: (res: any) => {
-            parseSubscribeRsp(res)
-                .then((value) => {
-                    console.log('订阅成功!', value);
-                    isSubscribeSucess = true
-                })
-                .catch((err) => {
-                    isSubscribeSucess = false
-                    console.log('订阅失败:', err);
-                    // 失败原因:行情断
-                    // 重新 走行情链接逻辑
-                });
-        },
-        onFail: (err) => {
-            isSubscribeSucess = false
-            console.log('订阅失败:', err)
-        },
-    } as Callback);
+    if (arr.length) {
+        const req = buildSubscribePeq(arr);
+        isSubscribeSucess = false
+        APP.sendQuoteServer(req, funCode.MainClassNumber_Quota_SubscriptRsp, {
+            onSuccess: (res: any) => {
+                parseSubscribeRsp(res)
+                    .then((value) => {
+                        console.log('订阅成功!', value);
+                        isSubscribeSucess = true
+                    })
+                    .catch((err) => {
+                        isSubscribeSucess = false
+                        console.log('订阅失败:', err);
+                        // 失败原因:行情断
+                        // 重新 走行情链接逻辑
+                    });
+            },
+            onFail: (err) => {
+                isSubscribeSucess = false
+                console.log('订阅失败:', err)
+            },
+        } as Callback);
+    }
 }
 
 /**
@@ -92,6 +94,7 @@ export function addSubscribeQuotation(uuid: string, subscribeInfos: SubscribeInf
 export function removeSubscribeQuotation(uuid: string) {
     if (subscribeGoodsList.has(uuid)) {
         subscribeGoodsList.delete(uuid);
+        console.log('删除订阅', uuid)
         if (subscribeGoodsList.size) {
             trySubcribe();
         } else {

+ 155 - 158
src/utils/websocket/index.ts

@@ -1,5 +1,15 @@
-import timerUtil from '@/utils/timer/timerUtil';
 import { Package40, Package50 } from './package';
+import moment from 'moment'
+
+/**
+ * 连接状态
+ */
+enum ConnectionState {
+    Unconnected, // 未连接
+    Connecting, // 连接中
+    Connected, // 已连接
+}
+
 /** 重连状态 */
 export enum ReconnectChangeState {
     /** 开始重连 */
@@ -47,12 +57,14 @@ class AsyncTask {
 
 /** MTP2.0 长链通信类 */
 export class MTP2WebSocket<T extends Package40 | Package50> {
-    /** 报文类型,0 - 4.0行情报文,1 - 5.0交易报文 */
-    packageType = 0;
     /** 服务端地址 */
-    host = '';
-    /** 当前连接状态,0 - 未连接,1 - 连接中,2 - 已连接 */
-    connState = 0;
+    host;
+    /** 当前连接状态 */
+    connState: keyof typeof ConnectionState = 'Unconnected';
+    /** 重连次数 */
+    reconnectCount = 0;
+    /** 重连间隔时长,默认为5秒 */
+    reconnectInterval = 5 * 1000;
 
     /** 连接成功的回调 */
     onConnected?: (obj: MTP2WebSocket<T>) => void;
@@ -65,6 +77,8 @@ export class MTP2WebSocket<T extends Package40 | Package50> {
     /** 重连状态发生改变时的回调 */
     onReconnectChangeState?: (obj: MTP2WebSocket<T>, state: ReconnectChangeState) => void;
 
+    /** 报文类型,0 - 4.0行情报文,1 - 5.0交易报文 */
+    private packageType;
     /** WebSocket 对象 */
     private socket?: WebSocket;
     /** 当前流水号 */
@@ -78,137 +92,132 @@ export class MTP2WebSocket<T extends Package40 | Package50> {
     /** 当前处理的段包报文的原长度 */
     private cachePackageLength = 0;
 
+    /** 心跳定时器 */
+    private beatTimer = 0;
+    /** 心跳发送时间 */
+    private beatTimerId = '';
+    /** 重连定时器 */
+    private reconnectTimer = 0;
+    /** 超时定时器 */
+    private timeoutTimer = 0;
     /** 心跳间隔时间,默认为10秒 */
     private beatInterval = 10 * 1000;
-    /** 心跳专用定时器 */
-    // private beatTimer = 'beatTimer';
-    /** 最后一次收到心跳的时间 */
-    private lastRecevieBeatTime?: Date;
     /**  心跳回复超时时间,默认为30秒 */
     private beatTimeoutInterval = 30 * 1000;
-
-    /** 当前是否正在进行断网重连 */
-    private isReconnecting = false;
-    /** 当前是否可进行断网重连;业务与网络分离,而断网重连需要在登录账号后(或行情订阅成功后)才可进行 */
-    public canReconnect = false;
-    /** 断网重连失败后重试的TimeOut标识 */
-    private reconnectTimer = 0;
     /** 外部是否要求停止断网重连操作标志 */
     private isBrokenReconnecting = false;
 
+    private readyState?: Promise<MTP2WebSocket<T>>;
+
     /**
      * 构造函数
      * @param packageType 报文类型,0 - 4.0行情报文,1 - 5.0交易报文
      */
-    constructor(packageType: number) {
+    constructor(packageType: beatTimer, host?: string) {
         this.packageType = packageType;
+        this.host = host || '';
     }
 
     /**
      * 连接服务端
      * @param host 服务端地址
      */
-    conn(host: string) {
-        return new Promise((resolve, reject) => {
-            if (this.socket) {
-                this.socket.close();
-                this.socket = undefined;
-            }
-
-            this.connState = 1;
-            this.socket = new WebSocket(host);
-            this.socket.onopen = () => {
+    conn(host?: string) {
+        if (!this.readyState) {
+            clearTimeout(this.reconnectTimer);
+            this.stopBeatTimer();
+            this.host = host || this.host;
+            this.connState = 'Connecting';
+            this.isBrokenReconnecting = false;
+            console.log(this.packageType, this.host, '正在连接');
+
+            this.readyState = new Promise((resolve, reject) => {
+                this.socket = new WebSocket(this.host);
                 // 连接成功
-                this.connState = 2;
-                this.host = host;
-                // this.canReconnect = true;
-
-                resolve('ok');
-                if (this.onConnected) this.onConnected(this);
-            };
-            this.socket.onclose = () => {
+                this.socket.onopen = () => {
+                    console.log(this.packageType, this.host, '连接成功');
+                    this.connState = 'Connected';
+                    this.startBeatTime();
+                    this.onConnected && this.onConnected(this);
+                    resolve(this);
+                }
                 // 连接断开
-                this.connState = 0;
-                if (this.onClosed) this.onClosed(this);
-            };
-            this.socket.onerror = (ev) => {
-                // 发生错误
-                this.connState = 0;
-                const err = new Error('连接发生错误');
-                reject(err);
-                if (this.onError) this.onError(this, err);
-
-                // 判断是否可进行断网重连
-                this.canReconnectThenError();
-            };
-            this.socket.onmessage = (message) => {
+                this.socket.onclose = () => {
+                    console.warn(this.packageType, this.host, '连接已断开');
+                    this.socket = undefined;
+                    this.connState = 'Unconnected';
+                    this.onClosed && this.onClosed(this);
+                }
+                // 连接发生错误
+                this.socket.onerror = () => {
+                    const message = this.host + '连接发生错误';
+                    this.connState = 'Unconnected';
+                    this.onError && this.onError(this, new Error(message));
+                    this.callAllAsyncTaskOnReconnecting();  // 回调当前所有发送信息错误块
+                    this.reconnect();
+                    reject(message);
+                }
                 // 接收数据
-                new Response(message.data).arrayBuffer().then((res) => {
-                    this.disposeReceiveDatas(new Uint8Array(res));
-                });
-            };
-        });
+                this.socket.onmessage = (e) => {
+                    // 接收数据
+                    new Response(e.data).arrayBuffer().then((res) => {
+                        this.disposeReceiveDatas(new Uint8Array(res));
+                    })
+                }
+            })
+        }
+        return this.readyState;
     }
 
     /**
-     * 断开连接
+     * 主动断开连接,断开后不会自动重连
      */
     close() {
-        this.connState = 0;
-        if (this.socket) this.socket.close();
+        clearTimeout(this.reconnectTimer);
+        this.stopBeatTimer();
+        this.readyState = undefined;
+        this.connState = 'Unconnected';
+        this.isBrokenReconnecting = true;
+        this.reconnectCount = 0;
+        this.socket?.close();
     }
 
     /**
      * 启动心跳请求包发送Timer
      * 行情与交易是不同的链路,需要启动不同的定时器发送心跳
      */
-    startBeatTime(timer: beatTimer) {
-        this.stopBeatTimer(timer);
-        this.lastRecevieBeatTime = new Date();
-        this.sendBeat(timer);
-    }
-
-    /**
-     * 停止心跳请求包发送Timer
-     */
-    stopBeatTimer(timer: beatTimer) {
-        timerUtil.clearInterval(timer);
-    }
-
-    /**
-     * 发送心跳
-     */
-    private sendBeat(timer: beatTimer) {
-        timerUtil.setInterval(
-            () => {
-                console.log('心跳回复', this.packageType, ':', this.connState);
-                // 当前没有连接
-                if (this.connState !== 2) {
-                    return;
-                }
-
-                // 发送心跳
-                if (this.packageType === 0) {
+    startBeatTime() {
+        this.beatTimer = window.setTimeout(() => {
+            this.beatTimerId = moment(new Date()).format('HH:mm:ss');
+            console.log(this.packageType, '发送心跳', this.beatTimerId);
+            // 发送心跳
+            switch (this.packageType) {
+                case 'quote': {
                     // 4.0
                     this.send40(new Package40([0x12, undefined], undefined), -1, undefined);
-                } else if (this.packageType === 1) {
+                    break;
+                }
+                case 'trade': {
                     // 5.0
                     this.send50(new Package50([0, undefined]), undefined);
+                    break;
                 }
-                // 如果已经超过或心跳超时时长没有收到心跳回复,则认为网络已经异常,进行断网重连
-                // WARNING: - 注意在正常断网时停止心跳逻辑
-
-                const date = new Date();
-                if (date.getTime() - this.lastRecevieBeatTime!.getTime() > this.beatTimeoutInterval) {
-                    if (this.onError) this.onError(this, new Error('心跳超时'));
+            }
+            // 如果已经超过或心跳超时时长没有收到心跳回复,则认为网络已经异常,进行断网重连
+            // WARNING: - 注意在正常断网时停止心跳逻辑
+            this.timeoutTimer = window.setTimeout(() => {
+                console.warn(this.packageType, '心跳超时', this.beatTimerId);
+                this.reconnect();
+            }, this.beatTimeoutInterval)
+        }, this.beatInterval)
+    }
 
-                    // 判断是否可进行断网重连
-                    this.canReconnectThenError();
-                }
-            },
-            this.beatInterval,
-            timer
-        );
+    /**
+     * 停止心跳请求包发送Timer
+     */
+    stopBeatTimer() {
+        clearTimeout(this.timeoutTimer);
+        clearTimeout(this.beatTimer);
     }
 
     /**
@@ -218,20 +227,22 @@ export class MTP2WebSocket<T extends Package40 | Package50> {
      * @param callback 回调
      */
     send(p: T, rsp?: number, callback?: Callback) {
-        if (this.connState != 2) {
-            if (callback && callback.onFail) callback.onFail(new Error('连接状态错误'));
-            return;
-        }
-        if (p instanceof Package40) {
-            // 4.0报文
-            if (!rsp && callback && callback.onFail) {
-                callback.onFail(new Error('缺少rsp'));
-                return;
-            }
-            this.send40(p, rsp!, callback);
+        if (this.readyState) {
+            this.readyState.then(() => {
+                if (p instanceof Package40) {
+                    // 4.0报文
+                    if (!rsp && callback && callback.onFail) {
+                        callback.onFail(new Error('缺少rsp'));
+                        return;
+                    }
+                    this.send40(p, rsp!, callback);
+                } else {
+                    // 5.0报文
+                    this.send50(p as Package50, rsp, callback);
+                }
+            })
         } else {
-            // 5.0报文
-            this.send50(p as Package50, rsp, callback);
+            if (callback && callback.onFail) callback.onFail(new Error('连接状态错误'));
         }
     }
 
@@ -322,7 +333,7 @@ export class MTP2WebSocket<T extends Package40 | Package50> {
                 this.cache[this.cache.length] = byte;
                 // 取报文长度
                 if (this.cache.length === 5) {
-                    if (this.packageType === 0) {
+                    if (this.packageType === 'quote') {
                         // 4.0报文
                         this.cachePackageLength = new DataView(new Uint8Array(new Uint8Array(this.cache).subarray(1, 5)).buffer).getUint32(0, false);
                         if (this.cachePackageLength > 65535) {
@@ -347,7 +358,7 @@ export class MTP2WebSocket<T extends Package40 | Package50> {
                         this.cachePackageLength = 0;
                         return;
                     }
-                    if (this.packageType === 0) {
+                    if (this.packageType === 'quote') {
                         // 4.0报文
                         this.disposePackage40(new Uint8Array(this.cache));
                     } else {
@@ -378,8 +389,9 @@ export class MTP2WebSocket<T extends Package40 | Package50> {
 
             // 收到心跳
             if (p.mainClassNumber === 0x12) {
-                this.lastRecevieBeatTime = new Date();
-                console.log(`packageType=${this.packageType},${this.host},收到心跳回复!`);
+                console.log(this.packageType, this.host, '收到心跳回复!');
+                this.stopBeatTimer();
+                this.startBeatTime();
             }
         } else {
             // 非推送类报文
@@ -411,8 +423,9 @@ export class MTP2WebSocket<T extends Package40 | Package50> {
 
             // 收到心跳
             if (p.funCode === 0) {
-                console.log(`packageType=${this.packageType},${this.host},收到心跳回复!`);
-                this.lastRecevieBeatTime = new Date();
+                console.log(this.packageType, this.host, '收到心跳回复!');
+                this.stopBeatTimer();
+                this.startBeatTime();
             }
         } else {
             // 非推送类报文
@@ -431,50 +444,34 @@ export class MTP2WebSocket<T extends Package40 | Package50> {
     }
 
     /**
-     * 判断是否可进行断网重连
-     */
-    private canReconnectThenError() {
-        // 判断是否应该启动断网重连机制
-        if (this.isReconnecting || !this.canReconnect || this.isBrokenReconnecting) return;
-
-        // 回调当前所有发送信息错误块
-        this.callAllAsyncTaskOnReconnecting();
-
-        // 开始断网重连机制
-        this.isReconnecting = true;
-        this.reconnect();
-    }
-
-    /**
      * 断网重连方法,在重连尝试失败时会递归调用自己
      */
     private reconnect() {
-        // 判断是否已由外部主动断开
-        if (this.isBrokenReconnecting) {
-            this.isReconnecting = false;
-            return;
-        }
-
-        // 开始尝试重连服务端
-        if (this.onReconnectChangeState) this.onReconnectChangeState(this, ReconnectChangeState.BeginReconnect);
-        this.conn(this.host)
-            .then((res) => {
-                // 连接成功
-                this.isReconnecting = false;
-                if (this.onReconnectChangeState) this.onReconnectChangeState(this, ReconnectChangeState.ReconnectSuccessed);
-            })
-            .catch((err) => {
-                // 连接失败
-                if (this.onReconnectChangeState) this.onReconnectChangeState(this, ReconnectChangeState.FailAndWaitPeriod);
-
-                // 5秒后尝试重连
-                if (this.reconnectTimer) {
-                    clearTimeout(this.reconnectTimer);
-                }
-                this.reconnectTimer = window.setTimeout(() => {
+        this.stopBeatTimer();
+        if (this.isBrokenReconnecting || this.connState === 'Connecting') return;
+
+        this.reconnectCount++;
+        console.log(this.packageType, this.host, `5秒后将进行第${this.reconnectCount}次重连`);
+
+        this.reconnectTimer = window.setTimeout(() => {
+            this.readyState = undefined;
+            this.onReconnectChangeState && this.onReconnectChangeState(this, ReconnectChangeState.BeginReconnect);
+
+            this.conn().then(() => {
+                // 重连成功
+                this.reconnectCount = 0;
+                this.onReconnectChangeState && this.onReconnectChangeState(this, ReconnectChangeState.ReconnectSuccessed);
+            }).catch(() => {
+                this.readyState = undefined;
+                this.onReconnectChangeState && this.onReconnectChangeState(this, ReconnectChangeState.FailAndWaitPeriod);
+
+                // 重连失败处理,如果重连过程中主动断开的了就不再进行重连
+                if (!this.isBrokenReconnecting) {
+                    console.log(this.packageType, this.host, `第${this.reconnectCount}次重连失败`);
                     this.reconnect();
-                }, 5000);
-            });
+                }
+            })
+        }, this.reconnectInterval);
     }
 
     /**