index.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. import { Package40, Package50 } from './package';
  2. import { Package, ConnectionState, SendMessage, AsyncTask } from './interface';
  3. import { v4 } from '../uuid/index'
  4. import moment from 'moment'
  5. /**
  6. * MTP2.0 长链通信类
  7. */
  8. export class MTP2WebSocket<T extends Package40 | Package50>{
  9. /** 报文协议 */
  10. private readonly Package: Package<T>;
  11. /** 当前实例标识 */
  12. private uuid = '';
  13. /** 服务端地址 */
  14. private host;
  15. /** WebSocket 对象 */
  16. private ws?: WechatMiniprogram.SocketTask;
  17. /** 心跳定时器 */
  18. private beatTimer = 0;
  19. /** 心跳发送时间 */
  20. private beatTimerId = '';
  21. /** 重连定时器 */
  22. private reconnectTimer = 0;
  23. /** 超时定时器 */
  24. private timeoutTimer = 0;
  25. /** 当前流水号 */
  26. private currentSerial = 1;
  27. /** 信息发送异步建值对 */
  28. private asyncTaskMap = new Map<string, AsyncTask<T>>();
  29. /** 初始默认重连间隔时长 */
  30. private defaultReconnectInterval = 5 * 1000;
  31. /** 连接准备完成 */
  32. private onReady?: Promise<MTP2WebSocket<T>>;
  33. /** 连接成功的回调 */
  34. onOpen?: (socket: MTP2WebSocket<T>) => void;
  35. /** 连接断开的回调 */
  36. onClose?: () => void;
  37. /** 连接发生错误的回调 */
  38. onError?: (e: string) => void;
  39. /** 接收到推送类报文的回调 */
  40. onPush?: (data: T) => void;
  41. /** 重连状态发生改变时的回调 */
  42. onReconnect?: (socket: MTP2WebSocket<T>) => void;
  43. /** 当前连接状态 */
  44. connState: keyof typeof ConnectionState = 'Unconnected';
  45. /** 重连次数 */
  46. reconnectCount = 0;
  47. /** 心跳间隔时长,默认为30秒 */
  48. beatInterval = 30 * 1000;
  49. /** 重连间隔时长,默认为5秒 */
  50. reconnectInterval = this.defaultReconnectInterval;
  51. /** 消息超时时长,默认为15秒 */
  52. timeoutInterval = 15 * 1000;
  53. constructor(protocol: Package<T>, host = '') {
  54. this.Package = protocol;
  55. this.host = host;
  56. }
  57. /**
  58. * 连接服务器
  59. */
  60. connect(host?: string): Promise<MTP2WebSocket<T>> {
  61. if (!this.onReady) {
  62. clearTimeout(this.reconnectTimer);
  63. this.stopHeartBeat();
  64. this.host = host || this.host;
  65. this.connState = 'Connecting';
  66. console.log(this.Package.name, this.host, '正在连接');
  67. this.onReady = new Promise((resolve, reject) => {
  68. const uuid = v4();
  69. this.uuid = uuid;
  70. this.ws = wx.connectSocket({ url: this.host });
  71. try {
  72. // 连接成功
  73. this.ws.onOpen(() => {
  74. console.log(this.Package.name, this.host, '连接成功');
  75. this.connState = 'Connected';
  76. this.startHeartBeat();
  77. this.onOpen && this.onOpen(this);
  78. resolve(this);
  79. })
  80. // 连接断开(CLOSING有可能延迟)
  81. this.ws.onClose(() => {
  82. // 判断是否当前实例
  83. // 如果连接断开后不等待 onclose 响应,由于 onclose 延迟的原因可能会在创建新的 ws 实例后触发,导致刚创建的实例被断开进入重连机制
  84. if (this.uuid === uuid) {
  85. console.warn(this.Package.name, this.host, '连接已断开');
  86. this.reset();
  87. this.reconnect();
  88. }
  89. })
  90. // 连接发生错误
  91. this.ws.onError((e) => {
  92. const message = this.host + '连接发生错误';
  93. this.onError && this.onError(e.errMsg);
  94. reject(message);
  95. })
  96. // 接收数据
  97. this.ws.onMessage((e) => {
  98. if (e.data instanceof ArrayBuffer) {
  99. this.disposeReceiveDatas(new Uint8Array(e.data));
  100. } else {
  101. const data = e.data.split('').map((val, index) => val.charCodeAt(index));
  102. this.disposeReceiveDatas(new Uint8Array(data));
  103. }
  104. })
  105. }
  106. catch {
  107. reject(this.Package.name + ' 连接失败');
  108. }
  109. })
  110. }
  111. return this.onReady;
  112. }
  113. /**
  114. * 主动断开连接,断开后不会自动重连
  115. */
  116. close() {
  117. clearTimeout(this.reconnectTimer);
  118. this.stopHeartBeat();
  119. this.uuid = v4();
  120. this.reconnectCount = 0;
  121. this.reconnectInterval = this.defaultReconnectInterval;
  122. if (this.ws) {
  123. this.ws.close({ reason: '主动断开' });
  124. console.warn(this.Package.name, this.host, '主动断开');
  125. }
  126. this.reset(); // 不等待 ws.onclose 响应强制重置实例
  127. }
  128. /**
  129. * 重置实例
  130. */
  131. private reset() {
  132. this.ws = undefined;
  133. this.onReady = undefined;
  134. this.connState = 'Unconnected';
  135. this.onClose && this.onClose();
  136. }
  137. /**
  138. * 启动心跳请求包发送Timer
  139. */
  140. private startHeartBeat() {
  141. this.beatTimer = setTimeout(() => {
  142. this.beatTimerId = moment(new Date()).format('HH:mm:ss');
  143. console.log(this.Package.name, '发送心跳', this.beatTimerId);
  144. // 发送心跳
  145. switch (this.Package) {
  146. case Package40: {
  147. // 4.0
  148. this.send({
  149. data: { rspCode: -1, payload: new this.Package(0x12) }
  150. })
  151. break;
  152. }
  153. case Package50: {
  154. // 5.0
  155. this.send({
  156. data: { payload: new this.Package(0) }
  157. })
  158. break;
  159. }
  160. }
  161. // 如果已经超过或心跳超时时长没有收到心跳回复,则认为网络已经异常,进行断网重连
  162. // WARNING: - 注意在正常断网时停止心跳逻辑
  163. this.timeoutTimer = setTimeout(() => {
  164. console.warn(this.Package.name, '心跳超时', this.beatTimerId);
  165. this.reconnect();
  166. }, this.timeoutInterval)
  167. }, this.beatInterval)
  168. }
  169. /**
  170. * 停止心跳请求包发送Timer
  171. */
  172. private stopHeartBeat() {
  173. clearTimeout(this.timeoutTimer);
  174. clearTimeout(this.beatTimer);
  175. }
  176. /**
  177. * 断网重连方法,在重连尝试失败时会递归调用自己
  178. */
  179. private reconnect() {
  180. this.stopHeartBeat();
  181. if (this.connState !== 'Connecting') {
  182. this.reconnectCount++;
  183. console.log(this.Package.name, this.host, `${this.reconnectInterval / 1000}秒后将进行第${this.reconnectCount}次重连`);
  184. this.reconnectTimer = setTimeout(() => {
  185. this.onReady = undefined;
  186. this.connect().then(() => {
  187. console.log(this.Package.name, this.host, '重连成功,可开始进行业务操作');
  188. this.reconnectCount = 0;
  189. this.reconnectInterval = this.defaultReconnectInterval;
  190. this.onReconnect && this.onReconnect(this);
  191. }).catch(() => {
  192. if (this.reconnectCount) {
  193. this.reconnectInterval += this.defaultReconnectInterval; // 重连间隔时间每次递增5秒
  194. console.warn(this.Package.name, this.host, `第${this.reconnectCount}次重连失败`);
  195. }
  196. })
  197. }, this.reconnectInterval);
  198. }
  199. }
  200. /**
  201. * 发送报文
  202. * @param data 目标报文
  203. * @param fncode 回复号。P为Package40对象时为回复大类号,同时必传;P为Package50对象为回复功能码,可不传。
  204. * @param success 成功回调
  205. * @param fail 失败回调
  206. */
  207. send(msg: SendMessage<T>) {
  208. const { data, success, fail } = msg;
  209. const { rspCode, payload } = data;
  210. if (this.onReady) {
  211. this.onReady.then(() => {
  212. // 设置流水号
  213. payload.serialNumber = this.currentSerial;
  214. this.currentSerial++;
  215. let key = payload.serialNumber.toString();
  216. // 4.0报文
  217. if (payload instanceof Package40) {
  218. if (!rspCode && fail) {
  219. fail('缺少rspCode参数');
  220. return;
  221. }
  222. key += '_' + rspCode;
  223. }
  224. // 判断是否需要异步回调
  225. if (success || fail) {
  226. // 保存发送任务,为任务设置一个自动超时定时器
  227. this.asyncTaskMap.set(key, {
  228. sendMessage: msg,
  229. timeoutId: setTimeout(() => this.asyncTimeout(key), this.timeoutInterval),
  230. })
  231. }
  232. // 发送信息
  233. this.ws?.send({ data: payload.data().buffer });
  234. })
  235. } else {
  236. fail && fail('服务未连接');
  237. }
  238. }
  239. /**
  240. * 异步任务超时
  241. * @param key key
  242. */
  243. private asyncTimeout(key: string) {
  244. // 获取对应异步任务对象
  245. const asyncTask = this.asyncTaskMap.get(key);
  246. const onFail = asyncTask?.sendMessage.fail;
  247. onFail && onFail('业务超时');
  248. this.asyncTaskMap.delete(key);
  249. }
  250. /**
  251. * 处理接收数据
  252. * @param bytes 接收的数据
  253. */
  254. private disposeReceiveDatas(bytes: Uint8Array) {
  255. const cache: number[] = [];
  256. let cachePackageLength = 0;
  257. for (let i = 0; i < bytes.length; i++) {
  258. const byte = bytes[i];
  259. cache.push(byte);
  260. if (i === 0 && byte !== 0xff) {
  261. console.error('接收到首字节不是0xFF');
  262. return;
  263. } else {
  264. // 取报文长度
  265. if (cache.length === 5) {
  266. switch (this.Package) {
  267. // 4.0报文
  268. case Package40: {
  269. cachePackageLength = new DataView(new Uint8Array(new Uint8Array(cache).subarray(1, 5)).buffer).getUint32(0, false);
  270. if (cachePackageLength > 65535) {
  271. console.error('接收到长度超过65535的行情包');
  272. return;
  273. }
  274. break;
  275. }
  276. // 5.0报文
  277. case Package50: {
  278. const sub = new Uint8Array(cache).subarray(1, 5);
  279. const dataView = new DataView(new Uint8Array(sub).buffer); // 注意这里要new一个新的Uint8Array,直接buffer取到的还是原数组的buffer
  280. cachePackageLength = dataView.getUint32(0, true);
  281. break;
  282. }
  283. }
  284. }
  285. // 判断是否已经到包尾
  286. if (cache.length === cachePackageLength) {
  287. if (byte !== 0x0) {
  288. console.error('接收到尾字节不是0x0的错误数据包');
  289. return;
  290. }
  291. const content = new Uint8Array(cache);
  292. const result = new this.Package(content);
  293. if (result.packageLength === 0) {
  294. console.error('报文装箱失败');
  295. return;
  296. }
  297. this.disposePackageResult(result);
  298. }
  299. }
  300. }
  301. }
  302. /**
  303. * 处理完整的报文
  304. */
  305. private disposePackageResult(p: T) {
  306. let key = p.serialNumber.toString();
  307. if (p instanceof Package40) {
  308. key += '_' + p.mainClassNumber;
  309. }
  310. const asyncTask = this.asyncTaskMap.get(key);
  311. const sendMessage = asyncTask?.sendMessage;
  312. // 移除当前发送的任务请求
  313. if (asyncTask) {
  314. clearTimeout(asyncTask.timeoutId);
  315. this.asyncTaskMap.delete(key);
  316. }
  317. // 4.0报文
  318. if (p instanceof Package40) {
  319. // 推送类报文, 0x12 - 心跳, 0x41 - 实时行情推送, 0x42 - 控制信号
  320. switch (p.mainClassNumber) {
  321. case 0x12: {
  322. // 接收到心跳回复
  323. console.log(this.Package.name, '收到心跳回复', this.beatTimerId);
  324. this.stopHeartBeat();
  325. this.startHeartBeat();
  326. break;
  327. }
  328. case 0x41:
  329. case 0x42: {
  330. // 推送类报文
  331. this.onPush && this.onPush(p);
  332. break;
  333. }
  334. default: {
  335. // 非推送类报文
  336. sendMessage?.success && sendMessage.success(p);
  337. }
  338. }
  339. }
  340. // 5.0报文
  341. if (p instanceof Package50) {
  342. if (p.funCode === 0) {
  343. // 接收到心跳回复
  344. console.log(this.Package.name, '收到心跳回复', this.beatTimerId);
  345. this.stopHeartBeat();
  346. this.startHeartBeat();
  347. } else if (p.serialNumber === 0) {
  348. // 推送类报文
  349. this.onPush && this.onPush(p);
  350. } else if (sendMessage) {
  351. // 非推送类报文
  352. const { data, success, fail } = sendMessage;
  353. // 判断是否需要检验回复功能码
  354. if (data.rspCode && data.rspCode !== p.funCode) {
  355. fail && fail('无效的rspCode:');
  356. } else {
  357. success && success(p);
  358. }
  359. }
  360. }
  361. }
  362. }