index.ts 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. import { Package40, Package50 } from './package';
  2. import timerUtil from '@/utils/timer/timerUtil';
  3. /** 重连状态 */
  4. export enum ReconnectChangeState {
  5. /** 开始重连 */
  6. BeginReconnect,
  7. /** 尝试重连失败,将在下一个周期后再次尝试重连 */
  8. FailAndWaitPeriod,
  9. /** 重连成功,将进行业务操作 */
  10. ReconnectSuccessed,
  11. /** 重连成功后业务操作失败并将再次重试,由业务模块发起 */
  12. LoginFail,
  13. /** 重连成功后业务操作成功(包括交易服务的账户登录状态更新或行情服务的行情订阅等 */
  14. Logined,
  15. }
  16. export type beatTimer = 'quote' | 'trade';
  17. /**
  18. * 回调类
  19. */
  20. export interface Callback {
  21. /** 成功回调 */
  22. onSuccess?<T extends Package40 | Package50>(res: T): void;
  23. /** 失败回调 */
  24. onFail?(err: Error): void;
  25. }
  26. /** 信息发送异步任务类 */
  27. class AsyncTask {
  28. /** 5.0报文直接为流水号;4.0报文为流水号+"_"+大类号(由于服务端行情推送会使用流水号自增) */
  29. key: string;
  30. /** 返回功能码,主要用于过滤账户服务的错传包 */
  31. rspFunCode?: number;
  32. /** 超时标识 */
  33. timeOut: number;
  34. /** 回调 */
  35. callback: Callback;
  36. constructor(key: string, timeOut: number, callback: Callback, rspFunCode?: number) {
  37. this.key = key;
  38. this.rspFunCode = rspFunCode;
  39. this.timeOut = timeOut;
  40. this.callback = callback;
  41. }
  42. }
  43. /** MTP2.0 长链通信类 */
  44. export class MTP2WebSocket<T extends Package40 | Package50> {
  45. /** 报文类型,0 - 4.0行情报文,1 - 5.0交易报文 */
  46. packageType = 0;
  47. /** 服务端地址 */
  48. host = '';
  49. /** 当前连接状态,0 - 未连接,1 - 连接中,2 - 已连接 */
  50. connState = 0;
  51. /** 连接成功的回调 */
  52. onConnected?: (obj: MTP2WebSocket<T>) => void;
  53. /** 连接断开的回调 */
  54. onClosed?: (obj: MTP2WebSocket<T>) => void;
  55. /** 连接发生错误的回调 */
  56. onError?: (obj: MTP2WebSocket<T>, err: Error) => void;
  57. /** 接收到推送类报文的回调 */
  58. onReceivePush?: (obj: MTP2WebSocket<T>, data: T) => void;
  59. /** 重连状态发生改变时的回调 */
  60. onReconnectChangeState?: (obj: MTP2WebSocket<T>, state: ReconnectChangeState) => void;
  61. /** WebSocket 对象 */
  62. private socket?: WebSocket;
  63. /** 当前流水号 */
  64. private currentSerial = 1;
  65. /** 默认超时时长(秒) */
  66. private timeOutInterval = 30 * 1000;
  67. /** 信息发送异步建值对 */
  68. private asyncTaskMap: Map<string, AsyncTask> = new Map();
  69. /** 段包 */
  70. private cache?: number[];
  71. /** 当前处理的段包报文的原长度 */
  72. private cachePackageLength = 0;
  73. /** 心跳间隔时间,默认为10秒 */
  74. private beatInterval = 10 * 1000;
  75. /** 心跳专用定时器 */
  76. // private beatTimer = 'beatTimer';
  77. /** 最后一次收到心跳的时间 */
  78. private lastRecevieBeatTime?: Date;
  79. /** 心跳回复超时时间,默认为30秒 */
  80. private beatTimeoutInterval = 30 * 1000;
  81. /** 当前是否正在进行断网重连 */
  82. private isReconnecting = false;
  83. /** 当前是否可进行断网重连;业务与网络分离,而断网重连需要在登录账号后(或行情订阅成功后)才可进行 */
  84. public canReconnect = false;
  85. /** 断网重连失败后重试的TimeOut标识 */
  86. private reconnectTimer = 0;
  87. /** 外部是否要求停止断网重连操作标志 */
  88. private isBrokenReconnecting = false;
  89. /**
  90. * 构造函数
  91. * @param packageType 报文类型,0 - 4.0行情报文,1 - 5.0交易报文
  92. */
  93. constructor(packageType: number) {
  94. this.packageType = packageType;
  95. }
  96. /**
  97. * 连接服务端
  98. * @param host 服务端地址
  99. */
  100. conn(host: string) {
  101. return new Promise((resolve, reject) => {
  102. if (this.socket) {
  103. this.socket.close();
  104. this.socket = undefined;
  105. }
  106. this.connState = 1;
  107. this.socket = new WebSocket(host);
  108. this.socket.onopen = () => {
  109. // 连接成功
  110. this.connState = 2;
  111. this.host = host;
  112. // this.canReconnect = true;
  113. resolve('ok');
  114. if (this.onConnected) this.onConnected(this);
  115. };
  116. this.socket.onclose = () => {
  117. // 连接断开
  118. this.connState = 0;
  119. if (this.onClosed) this.onClosed(this);
  120. };
  121. this.socket.onerror = (ev) => {
  122. // 发生错误
  123. this.connState = 0;
  124. const err = new Error('连接发生错误');
  125. reject(err);
  126. if (this.onError) this.onError(this, err);
  127. // 判断是否可进行断网重连
  128. this.canReconnectThenError();
  129. };
  130. this.socket.onmessage = (message) => {
  131. // 接收数据
  132. new Response(message.data).arrayBuffer().then((res) => {
  133. this.disposeReceiveDatas(new Uint8Array(res));
  134. });
  135. };
  136. });
  137. }
  138. /**
  139. * 断开连接
  140. */
  141. close() {
  142. this.connState = 0;
  143. if (this.socket) this.socket.close();
  144. }
  145. /**
  146. * 启动心跳请求包发送Timer
  147. * 行情与交易是不同的链路,需要启动不同的定时器发送心跳
  148. */
  149. startBeatTime(timer: beatTimer) {
  150. this.stopBeatTimer(timer);
  151. this.lastRecevieBeatTime = new Date();
  152. this.sendBeat(timer);
  153. }
  154. /**
  155. * 停止心跳请求包发送Timer
  156. */
  157. stopBeatTimer(timer: beatTimer) {
  158. timerUtil.clearInterval(timer);
  159. }
  160. /**
  161. * 发送心跳
  162. */
  163. private sendBeat(timer: beatTimer) {
  164. timerUtil.setInterval(
  165. () => {
  166. console.log('心跳回复', this.packageType, ':', this.connState);
  167. // 当前没有连接
  168. if (this.connState !== 2) {
  169. return;
  170. }
  171. // 发送心跳
  172. if (this.packageType === 0) {
  173. // 4.0
  174. this.send40(new Package40([0x12, undefined], undefined), -1, undefined);
  175. } else if (this.packageType === 1) {
  176. // 5.0
  177. this.send50(new Package50([0, undefined]), undefined);
  178. }
  179. // 如果已经超过或心跳超时时长没有收到心跳回复,则认为网络已经异常,进行断网重连
  180. // WARNING: - 注意在正常断网时停止心跳逻辑
  181. const date = new Date();
  182. if (date.getTime() - this.lastRecevieBeatTime!.getTime() > this.beatTimeoutInterval) {
  183. if (this.onError) this.onError(this, new Error('心跳超时'));
  184. // 判断是否可进行断网重连
  185. this.canReconnectThenError();
  186. }
  187. },
  188. this.beatInterval,
  189. timer
  190. );
  191. }
  192. /**
  193. * 发送报文
  194. * @param p 目标报文
  195. * @param rsp 回复号。P为Package40对象时为回复大类号,同时必传;P为Package50对象为回复功能码,可不传。
  196. * @param callback 回调
  197. */
  198. send(p: T, rsp?: number, callback?: Callback) {
  199. if (this.connState != 2) {
  200. if (callback && callback.onFail) callback.onFail(new Error('连接状态错误'));
  201. return;
  202. }
  203. if (p instanceof Package40) {
  204. // 4.0报文
  205. if (!rsp && callback && callback.onFail) {
  206. callback.onFail(new Error('缺少rsp'));
  207. return;
  208. }
  209. this.send40(p, rsp!, callback);
  210. } else {
  211. // 5.0报文
  212. this.send50(p as Package50, rsp, callback);
  213. }
  214. }
  215. /**
  216. * 发送4.0报文
  217. * @param p 待发送报文
  218. * @param rspMainClassNumber 回复大类号
  219. * @param callback 回调
  220. */
  221. private send40(p: Package40, rspMainClassNumber: number, callback?: Callback) {
  222. // 设置流水号
  223. p.serialNumber = this.currentSerial;
  224. this.currentSerial++;
  225. // 判断是否需要异步回调
  226. if (callback) {
  227. const key = p.serialNumber + '_' + rspMainClassNumber;
  228. const asyncTask = new AsyncTask(
  229. key,
  230. window.setTimeout(() => this.asyncTimeOut(key), this.timeOutInterval),
  231. callback,
  232. rspMainClassNumber
  233. );
  234. this.asyncTaskMap.set(key, asyncTask);
  235. }
  236. // 发送信息
  237. const data = p.data();
  238. this.socket?.send(data);
  239. }
  240. /**
  241. * 发送5.0报文
  242. * @param p 待发送报文
  243. * @param rspFunCode 回复功能码
  244. * @param callback 回调
  245. */
  246. private send50(p: Package50, rspFunCode?: number, callback?: Callback) {
  247. // 设置流水号
  248. p.serialNumber = this.currentSerial;
  249. this.currentSerial++;
  250. // 判断是否需要异步回调
  251. if (callback) {
  252. const key = p.serialNumber.toString();
  253. const asyncTask = new AsyncTask(
  254. key,
  255. window.setTimeout(() => this.asyncTimeOut(key), this.timeOutInterval),
  256. callback,
  257. rspFunCode
  258. );
  259. this.asyncTaskMap.set(key, asyncTask);
  260. }
  261. // 发送信息
  262. this.socket?.send(p.data());
  263. }
  264. /**
  265. * 异步任务超时
  266. * @param key key
  267. */
  268. private asyncTimeOut(key: string) {
  269. // 获取对应异步任务对象
  270. const asyncTask = this.asyncTaskMap.get(key);
  271. if (asyncTask) {
  272. if (asyncTask.callback && asyncTask.callback.onFail) asyncTask.callback.onFail(new Error('业务超时'));
  273. }
  274. this.asyncTaskMap.delete(key);
  275. }
  276. /**
  277. * 处理接收数据
  278. * @param bytes 接收的数据
  279. */
  280. private disposeReceiveDatas(bytes: Uint8Array) {
  281. for (let i = 0; i < bytes.length; i++) {
  282. const byte = bytes[i];
  283. if (!this.cache) {
  284. // 新报文
  285. if (byte !== 0xff) {
  286. // 接收到首字节不是0xFF的错误数据
  287. return;
  288. }
  289. this.cache = [];
  290. this.cache[this.cache.length] = byte;
  291. } else {
  292. // 追加数据
  293. this.cache[this.cache.length] = byte;
  294. // 取报文长度
  295. if (this.cache.length === 5) {
  296. if (this.packageType === 0) {
  297. // 4.0报文
  298. this.cachePackageLength = new DataView(new Uint8Array(new Uint8Array(this.cache).subarray(1, 5)).buffer).getUint32(0, false);
  299. if (this.cachePackageLength > 65535) {
  300. // 接收到长度超过65535的行情包
  301. this.cache = undefined;
  302. this.cachePackageLength = 0;
  303. return;
  304. }
  305. } else {
  306. // 5.0报文
  307. const sub = new Uint8Array(this.cache).subarray(1, 5);
  308. const dataView = new DataView(new Uint8Array(sub).buffer); // 注意这里要new一个新的Uint8Array,直接buffer取到的还是原数组的buffer
  309. this.cachePackageLength = dataView.getUint32(0, true);
  310. }
  311. }
  312. // 判断是否已经到包尾
  313. if (this.cache.length === this.cachePackageLength) {
  314. if (byte !== 0x0) {
  315. // 接收到尾字节不是0x0的错误数据包
  316. this.cache = undefined;
  317. this.cachePackageLength = 0;
  318. return;
  319. }
  320. if (this.packageType === 0) {
  321. // 4.0报文
  322. this.disposePackage40(new Uint8Array(this.cache));
  323. } else {
  324. // 5.0报文
  325. this.disposePackage50(new Uint8Array(this.cache));
  326. }
  327. this.cache = undefined;
  328. this.cachePackageLength = 0;
  329. }
  330. }
  331. }
  332. }
  333. /**
  334. * 处理完整的4.0报文
  335. * @param bytes 4.0报文字段流
  336. */
  337. private disposePackage40(bytes: Uint8Array) {
  338. const p = new Package40(undefined, bytes);
  339. if (p.packageLength === 0) {
  340. // 报文装箱失败
  341. return;
  342. }
  343. if (p.mainClassNumber === 0x12 || p.mainClassNumber === 0x41 || p.mainClassNumber === 0x42) {
  344. // 推送类报文, 0x12 - 心跳, 0x41 - 实时行情推送, 0x42 - 控制信号
  345. if (this.onReceivePush) this.onReceivePush(this, <T>p);
  346. // 收到心跳
  347. if (p.mainClassNumber === 0x12) {
  348. this.lastRecevieBeatTime = new Date();
  349. console.log(`packageType=${this.packageType},${this.host},收到心跳回复!`);
  350. }
  351. } else {
  352. // 非推送类报文
  353. const key = p.serialNumber + '_' + p.mainClassNumber;
  354. const asyncTask = this.asyncTaskMap.get(key);
  355. if (asyncTask) {
  356. clearTimeout(asyncTask.timeOut);
  357. if (asyncTask.callback && asyncTask.callback.onSuccess) asyncTask.callback.onSuccess(p);
  358. this.asyncTaskMap.delete(key);
  359. }
  360. }
  361. }
  362. /**
  363. * 处理完整的5.0报文
  364. * @param bytes 5.0报文字段流
  365. */
  366. private disposePackage50(bytes: Uint8Array) {
  367. const p = new Package50(undefined, bytes);
  368. if (p.packageLength === 0) {
  369. // 报文装箱失败
  370. return;
  371. }
  372. if (p.funCode === 0 || p.serialNumber === 0) {
  373. // 推送类报文
  374. if (this.onReceivePush) this.onReceivePush(this, <T>p);
  375. // 收到心跳
  376. if (p.funCode === 0) {
  377. console.log(`packageType=${this.packageType},${this.host},收到心跳回复!`);
  378. this.lastRecevieBeatTime = new Date();
  379. }
  380. } else {
  381. // 非推送类报文
  382. const key = p.serialNumber.toString();
  383. const asyncTask = this.asyncTaskMap.get(key);
  384. if (asyncTask) {
  385. // 判断是否需要检验回复功能码
  386. if (asyncTask.rspFunCode && asyncTask.rspFunCode !== p.funCode) {
  387. return;
  388. }
  389. clearTimeout(asyncTask.timeOut);
  390. if (asyncTask.callback && asyncTask.callback.onSuccess) asyncTask.callback.onSuccess(p);
  391. this.asyncTaskMap.delete(key);
  392. }
  393. }
  394. }
  395. /**
  396. * 判断是否可进行断网重连
  397. */
  398. private canReconnectThenError() {
  399. // 判断是否应该启动断网重连机制
  400. if (this.isReconnecting || !this.canReconnect || this.isBrokenReconnecting) return;
  401. // 回调当前所有发送信息错误块
  402. this.callAllAsyncTaskOnReconnecting();
  403. // 开始断网重连机制
  404. this.isReconnecting = true;
  405. this.reconnect();
  406. }
  407. /**
  408. * 断网重连方法,在重连尝试失败时会递归调用自己
  409. */
  410. private reconnect() {
  411. // 判断是否已由外部主动断开
  412. if (this.isBrokenReconnecting) {
  413. this.isReconnecting = false;
  414. return;
  415. }
  416. // 开始尝试重连服务端
  417. if (this.onReconnectChangeState) this.onReconnectChangeState!(this, ReconnectChangeState.BeginReconnect);
  418. this.conn(this.host)
  419. .then((res) => {
  420. // 连接成功
  421. this.isReconnecting = false;
  422. if (this.onReconnectChangeState) this.onReconnectChangeState!(this, ReconnectChangeState.ReconnectSuccessed);
  423. })
  424. .catch((err) => {
  425. // 连接失败
  426. if (this.onReconnectChangeState) this.onReconnectChangeState!(this, ReconnectChangeState.FailAndWaitPeriod);
  427. // 5秒后尝试重连
  428. if (this.reconnectTimer) {
  429. clearTimeout(this.reconnectTimer);
  430. }
  431. this.reconnectTimer = window.setTimeout(() => {
  432. this.reconnect();
  433. }, 5000);
  434. });
  435. }
  436. /**
  437. * 在断网重连时调用所有的错误回调
  438. */
  439. private callAllAsyncTaskOnReconnecting() {
  440. if (this.asyncTaskMap && this.asyncTaskMap.size > 0) {
  441. for (const key in this.asyncTaskMap) {
  442. const asyncTask = this.asyncTaskMap.get(key);
  443. if (asyncTask && asyncTask.callback && asyncTask.callback.onFail) {
  444. asyncTask.callback.onFail(new Error('发生断网重连'));
  445. clearTimeout(asyncTask.timeOut);
  446. }
  447. }
  448. this.asyncTaskMap.clear();
  449. }
  450. }
  451. }