index.ts 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. import { v4 } from 'uuid'
  2. import { Package40, Package50 } from '../package';
  3. import { Package, ConnectionState, SendMessage, AsyncTask } from './types';
  4. import moment from 'moment'
  5. /**
  6. * MTP2.0 长链通信类
  7. */
  8. export class MTP2WebSocket<T extends Package40 | Package50>{
  9. /** 报文协议 */
  10. private readonly Package: Package<T>;
  11. /** 当前实例标识 */
  12. private uuid = '';
  13. /** 服务端地址 */
  14. private host;
  15. /** WebSocket 对象 */
  16. private ws?: WebSocket;
  17. /** 心跳定时器 */
  18. private beatTimer = 0;
  19. /** 心跳发送时间 */
  20. private beatTimerId = '';
  21. /** 重连定时器 */
  22. private reconnectTimer = 0;
  23. /** 超时定时器 */
  24. private timeoutTimer = 0;
  25. /** 当前流水号 */
  26. private currentSerial = 1;
  27. /** 信息发送异步建值对 */
  28. private asyncTaskMap = new Map<number, AsyncTask<T>>();
  29. /** 连接准备完成 */
  30. private onReady?: Promise<MTP2WebSocket<T>>;
  31. /** 连接成功的回调 */
  32. onOpen?: (socket: MTP2WebSocket<T>) => void;
  33. /** 连接断开的回调 */
  34. onClose?: () => void;
  35. /** 连接发生错误的回调 */
  36. onError?: (e: Event) => void;
  37. /** 接收到推送类报文的回调 */
  38. onPush?: (data: T) => void;
  39. /** 在重连之前回调 */
  40. onBeforeReconnect?: (count: number) => void;
  41. /** 在重连成功之后回调 */
  42. onReconnect?: () => void;
  43. /** 当前连接状态 */
  44. connState: keyof typeof ConnectionState = 'Unconnected';
  45. /** 重连次数 */
  46. reconnectCount = 0;
  47. /** 心跳间隔时长,默认为30秒 */
  48. beatInterval = 30 * 1000;
  49. /** 消息超时时长,默认为15秒 */
  50. timeoutInterval = 15 * 1000;
  51. constructor(pkg: Package<T>, host = '') {
  52. this.Package = pkg;
  53. this.host = host;
  54. }
  55. /**
  56. * 连接服务器
  57. */
  58. async connect(host?: string, protocols?: string | string[]): Promise<MTP2WebSocket<T>> {
  59. if (!this.onReady) {
  60. clearTimeout(this.reconnectTimer);
  61. this.stopHeartBeat();
  62. this.host = host || this.host;
  63. this.connState = 'Connecting';
  64. console.log(this.host, '正在连接');
  65. this.onReady = new Promise((resolve, reject) => {
  66. const errMsg = this.host + ' 连接发生错误';
  67. const uuid = v4();
  68. this.uuid = uuid;
  69. try {
  70. this.ws = new WebSocket(this.host, protocols);
  71. // 连接成功
  72. this.ws.onopen = () => {
  73. console.log(this.host, '连接成功');
  74. this.connState = 'Connected';
  75. this.startHeartBeat();
  76. this.onOpen && this.onOpen(this);
  77. resolve(this);
  78. }
  79. // 连接断开(CLOSING有可能延迟)
  80. this.ws.onclose = () => {
  81. // 判断是否当前实例
  82. // 如果连接断开后不等待 onclose 响应,由于 onclose 延迟的原因可能会在创建新的 ws 实例后触发,导致刚创建的实例被断开进入重连机制
  83. if (this.uuid === uuid) {
  84. console.warn(this.host, '连接已断开');
  85. this.reset();
  86. this.reconnect(); // 重连失败会不断尝试,直到成功为止
  87. }
  88. }
  89. // 连接发生错误
  90. this.ws.onerror = (e) => {
  91. this.onError && this.onError(e);
  92. reject(errMsg);
  93. }
  94. // 接收数据
  95. this.ws.onmessage = (e) => {
  96. // 接收数据
  97. new Response(e.data).arrayBuffer().then((res) => {
  98. this.disposeReceiveDatas(new Uint8Array(res));
  99. })
  100. }
  101. } catch {
  102. reject(errMsg);
  103. }
  104. })
  105. }
  106. return this.onReady
  107. }
  108. /**
  109. * 主动断开连接,断开后不会自动重连
  110. */
  111. close() {
  112. clearTimeout(this.reconnectTimer);
  113. this.stopHeartBeat();
  114. this.uuid = v4(); // 改变实例标识,取消重连状态
  115. this.reconnectCount = 0;
  116. if (this.ws) {
  117. this.ws.close();
  118. console.warn(this.host, '主动断开');
  119. }
  120. this.reset(); // 不等待 ws.onclose 响应强制重置实例
  121. }
  122. /**
  123. * 重置实例
  124. */
  125. private reset() {
  126. this.ws = undefined;
  127. this.onReady = undefined;
  128. this.connState = 'Unconnected';
  129. this.onClose && this.onClose();
  130. }
  131. /**
  132. * 启动心跳请求包发送Timer
  133. */
  134. private startHeartBeat() {
  135. this.beatTimer = window.setTimeout(() => {
  136. this.beatTimerId = moment(new Date()).format('HH:mm:ss');
  137. //console.log(this.host, '发送心跳', this.beatTimerId);
  138. // 发送心跳
  139. switch (this.Package) {
  140. case Package40: {
  141. // 4.0
  142. this.send({
  143. data: { rspCode: -1, payload: new this.Package(0x12) }
  144. })
  145. break;
  146. }
  147. case Package50: {
  148. // 5.0
  149. this.send({
  150. data: { rspCode: 0, payload: new this.Package(0) }
  151. })
  152. break;
  153. }
  154. }
  155. // 如果已经超过或心跳超时时长没有收到心跳回复,则认为网络已经异常,进行断网重连
  156. // WARNING: - 注意在正常断网时停止心跳逻辑
  157. this.timeoutTimer = window.setTimeout(() => {
  158. console.warn(this.host, '心跳超时', this.beatTimerId);
  159. this.reconnect();
  160. }, this.timeoutInterval)
  161. }, this.beatInterval)
  162. }
  163. /**
  164. * 停止心跳请求包发送Timer
  165. */
  166. private stopHeartBeat() {
  167. clearTimeout(this.timeoutTimer);
  168. clearTimeout(this.beatTimer);
  169. }
  170. /**
  171. * 断网重连方法,在重连尝试失败时会再次重试
  172. */
  173. private reconnect() {
  174. this.stopHeartBeat();
  175. if (this.connState !== 'Connecting') {
  176. this.reconnectCount++;
  177. this.onBeforeReconnect && this.onBeforeReconnect(this.reconnectCount);
  178. // 自动计算每次重试的延时,重试次数越多,延时越大
  179. const delay = this.reconnectCount * 5000
  180. console.log(this.host, `${delay / 1000}秒后将进行第${this.reconnectCount}次重连`);
  181. this.reconnectTimer = window.setTimeout(() => {
  182. this.onReady = undefined;
  183. this.connect().then(() => {
  184. console.log(this.host, '重连成功,可开始进行业务操作');
  185. this.reconnectCount = 0;
  186. this.onReconnect && this.onReconnect();
  187. }).catch(() => {
  188. // 重连失败会进入 ws.onclose 再次发起重连
  189. if (this.reconnectCount) {
  190. console.warn(this.host, `第${this.reconnectCount}次重连失败`);
  191. }
  192. })
  193. }, delay);
  194. }
  195. }
  196. /**
  197. * 发送报文
  198. * @param data 目标报文
  199. * @param fncode 回复号。P为Package40对象时为回复大类号,同时必传;P为Package50对象为回复功能码,可不传。
  200. * @param success 成功回调
  201. * @param fail 失败回调
  202. */
  203. send(msg: SendMessage<T>) {
  204. const { data, success, fail } = msg;
  205. const { payload } = data;
  206. if (this.onReady) {
  207. this.onReady.then(() => {
  208. const mapKey = this.currentSerial;
  209. payload.serialNumber = mapKey; // 设置流水号
  210. // 判断是否需要异步回调
  211. if (success || fail) {
  212. // 保存发送任务,为任务设置一个自动超时定时器
  213. this.asyncTaskMap.set(mapKey, {
  214. sendMessage: msg,
  215. timeoutId: window.setTimeout(() => this.asyncTimeout(mapKey), this.timeoutInterval),
  216. })
  217. }
  218. // 发送信息
  219. this.ws?.send(payload.data());
  220. this.currentSerial++;
  221. })
  222. } else {
  223. fail && fail('服务未连接');
  224. }
  225. }
  226. /**
  227. * 异步任务超时
  228. * @param key key
  229. */
  230. private asyncTimeout(key: number) {
  231. // 获取对应异步任务对象
  232. const asyncTask = this.asyncTaskMap.get(key);
  233. const onFail = asyncTask?.sendMessage.fail;
  234. onFail && onFail('业务超时');
  235. this.asyncTaskMap.delete(key);
  236. }
  237. /**
  238. * 处理接收数据
  239. * @param bytes 接收的数据
  240. */
  241. private disposeReceiveDatas(bytes: Uint8Array) {
  242. const cache: number[] = [];
  243. let cachePackageLength = 0;
  244. for (let i = 0; i < bytes.length; i++) {
  245. const byte = bytes[i];
  246. cache.push(byte);
  247. if (i === 0 && byte !== 0xff) {
  248. console.error('接收到首字节不是0xFF');
  249. return;
  250. } else {
  251. // 取报文长度
  252. if (cache.length === 5) {
  253. switch (this.Package) {
  254. // 4.0报文
  255. case Package40: {
  256. cachePackageLength = new DataView(new Uint8Array(new Uint8Array(cache).subarray(1, 5)).buffer).getUint32(0, false);
  257. if (cachePackageLength > 65535) {
  258. console.error('接收到长度超过65535的行情包');
  259. return;
  260. }
  261. break;
  262. }
  263. // 5.0报文
  264. case Package50: {
  265. const sub = new Uint8Array(cache).subarray(1, 5);
  266. const dataView = new DataView(new Uint8Array(sub).buffer); // 注意这里要new一个新的Uint8Array,直接buffer取到的还是原数组的buffer
  267. cachePackageLength = dataView.getUint32(0, true);
  268. break;
  269. }
  270. }
  271. }
  272. // 判断是否已经到包尾
  273. if (cache.length === cachePackageLength) {
  274. if (byte !== 0x0) {
  275. console.error('接收到尾字节不是0x0的错误数据包');
  276. return;
  277. }
  278. const content = new Uint8Array(cache);
  279. const result = new this.Package(content);
  280. if (result.packageLength === 0) {
  281. console.error('报文装箱失败');
  282. return;
  283. }
  284. this.disposePackageResult(result);
  285. }
  286. }
  287. }
  288. }
  289. /**
  290. * 处理完整的报文
  291. */
  292. private disposePackageResult(p: T) {
  293. const asyncTask = this.asyncTaskMap.get(p.serialNumber);
  294. const sendMessage = asyncTask?.sendMessage;
  295. // 4.0报文
  296. if (p instanceof Package40) {
  297. // 推送类报文, 0x12 - 心跳, 0x41 - 实时行情推送, 0x42 - 控制信号
  298. switch (p.mainClassNumber) {
  299. case 0x12: {
  300. // 接收到心跳回复
  301. //console.log(this.host, '收到心跳回复', this.beatTimerId);
  302. this.stopHeartBeat();
  303. this.startHeartBeat();
  304. break;
  305. }
  306. case 0x41:
  307. case 0x42: {
  308. // 推送类报文
  309. this.onPush && this.onPush(p);
  310. break;
  311. }
  312. default: {
  313. // 非推送类报文
  314. sendMessage?.success && sendMessage.success(p);
  315. }
  316. }
  317. }
  318. // 5.0报文
  319. if (p instanceof Package50) {
  320. if (p.funCode === 0) {
  321. // 接收到心跳回复
  322. //console.log(this.host, '收到心跳回复', this.beatTimerId);
  323. this.stopHeartBeat();
  324. this.startHeartBeat();
  325. } else if (p.serialNumber === 0) {
  326. // 推送类报文
  327. this.onPush && this.onPush(p);
  328. } else if (sendMessage) {
  329. // 非推送类报文
  330. const { data, success } = sendMessage;
  331. // 可能会收到多个流水号相同的数据包,需判断是否正确的回复码
  332. if (data.rspCode === p.funCode) {
  333. success && success(p);
  334. } else {
  335. // 跳过当前收到的数据包,继续等待直到asyncTask超时
  336. //console.log('无效的回复码', p.funCode)
  337. return
  338. }
  339. }
  340. }
  341. // 移除当前发送的任务请求
  342. if (asyncTask) {
  343. clearTimeout(asyncTask.timeoutId);
  344. this.asyncTaskMap.delete(p.serialNumber);
  345. }
  346. }
  347. }