QuoteTcpLinkProxy.cs 20 KB


  1. using Muchinfo.MTPClient.Data.Enums;
  2. using Muchinfo.MTPClient.Data.Model;
  3. using Muchinfo.MTPClient.Infrastructure.Cache;
  4. using Muchinfo.MTPClient.Infrastructure.EntityHelpers;
  5. using Muchinfo.MTPClient.Infrastructure.Helpers;
  6. using Muchinfo.MTPClient.Infrastructure.Utilities;
  7. using Muchinfo.MTPClient.NetworkCore;
  8. using System;
  9. using System.Collections.Generic;
  10. using System.Linq;
  11. using System.Net;
  12. using System.Threading;
  13. using System.Timers;
  14. using Timer = System.Timers.Timer;
  15. namespace Muchinfo.MTPClient.Infrastructure.LinkProxy.TCP
  16. {
  17. /// <summary>
  18. /// 业务代理连接
  19. /// </summary>
  20. public class QuoteTcpLinkProxy : IDisposable
  21. {
  22. public string _host; ////主机地址
  23. public int _port; ////通信端口号
  24. private TCPManager _tcpManager;
  25. private Timer _timer;
  26. private bool _isGetGoodsInfo; ////是否从行情服务获取商品信息
  27. private readonly object _lock = new object();
  28. private bool _subscribeFlag = true;
  29. private bool _queryDayQuote = true;
  30. private const int c_AccountId = 2;
  31. private const string c_checkToken = "2_TOKEN_NEKOT_"; ///2_TOKEN_NEKOT_
  32. public TCPConnectState TcpConnectState
  33. {
  34. get { return (_tcpManager == null) ? TCPConnectState.Closed : _tcpManager.tcpConnectState; }
  35. }
  36. /// <summary>
  37. /// 创建业务连接
  38. /// </summary>
  39. /// <param name="isGetGoods">是否获取商品信息</param>
  40. public QuoteTcpLinkProxy(string host, int port, bool isGetGoods = false)
  41. {
  42. _isGetGoodsInfo = isGetGoods;
  43. _host = host;
  44. _port = port;
  45. _tcpManager = new TCPManager(DatagramType.Datagram40, new QuoteTcpCallback(this));
  46. LogInfoHelper.WriteInfo("开始连接行情服务..");
  47. _tcpManager.Connect(_host, _port, ConnectSuccess, ConnectFail);
  48. var interval = GetSyncDayQuoteInterval();
  49. _timer = new Timer(interval);
  50. _timer.Elapsed += Timer_Elapsed;
  51. }
  52. #region Private Methods
  53. /// <summary>
  54. /// Handles the Elapsed event of the Timer control.
  55. /// </summary>
  56. /// <param name="sender">The source of the event.</param>
  57. /// <param name="e">The <see cref="ElapsedEventArgs"/> instance containing the event data.</param>
  58. private void Timer_Elapsed(object sender, ElapsedEventArgs e)
  59. {
  60. QueryDayQuote();
  61. }
  62. /// <summary>
  63. /// 获取同步盘面间隔,默认10分钟
  64. /// </summary>
  65. /// <returns>System.Int32.</returns>
  66. private int GetSyncDayQuoteInterval()
  67. {
  68. int interval = 10;
  69. try
  70. {
  71. var value = ApplicationParameter.DayQuoteInterval;
  72. if (value <= 0)
  73. {
  74. interval = 10;
  75. }
  76. }
  77. catch
  78. {
  79. interval = 10;
  80. }
  81. ////单位:分钟
  82. return interval * 1000 * 60;
  83. }
  84. /// <summary>
  85. /// 连接失败
  86. /// </summary>
  87. /// <param name="errorCode">错误码</param>
  88. /// <param name="desc">连接描述</param>
  89. private void ConnectFail(int errorCode, string desc)
  90. {
  91. //停止同步盘面定时器,Subscribe()成功后会启动
  92. StopSyncTimer();
  93. _tcpManager.ReconnectWithDisconnFirst(); //行情连接不上进需重连
  94. //todo:通知连接失败
  95. LogInfoHelper.WriteInfo("行情服务连接失败![" + errorCode + "] " + desc);
  96. MessengerHelper.DefaultSend(false, MessengerTokens.QuoteCheckTokenSuccess);
  97. }
  98. /// <summary>
  99. /// 连接成功
  100. /// </summary>
  101. private void ConnectSuccess()
  102. {
  103. if (!_isGetGoodsInfo)
  104. {
  105. LogInfoHelper.WriteInfo("行情服务连接成功!, 开始订阅商品.");
  106. _subscribeFlag = true;
  107. //连接成功订阅商品
  108. Subscribe();
  109. }
  110. else
  111. {
  112. LogInfoHelper.WriteInfo(" 行情服务令牌校验.");
  113. ////取时间
  114. CheckedToken();
  115. }
  116. }
  117. #endregion
  118. #region Public Methods
  119. /// <summary>
  120. /// 发送消息,
  121. /// </summary>
  122. /// <param name="package">消息内容</param>
  123. /// <param name="successAction">成功回复</param>
  124. /// <param name="failAction">错误回复</param>
  125. public void SendPackage(TCPPackage40 package, Action<TCPPackage40> successAction, Action<int, string> failAction)
  126. {
  127. if (_tcpManager == null || TcpConnectState != TCPConnectState.Connected) return;
  128. _tcpManager.SendMessage(package, successAction, failAction);
  129. }
  130. /// <summary>
  131. /// 通知通信组件开始发送心跳的方法
  132. /// </summary>
  133. public void StartSendBeat()
  134. {
  135. if (_tcpManager == null) return;
  136. _tcpManager.StartSendBeats();
  137. }
  138. /// <summary>
  139. /// 组包并订阅
  140. /// </summary>
  141. public void Subscribe()
  142. {
  143. Subscribe(GetAvailableList(CacheManager.CacheGoodsBaseInfos));
  144. }
  145. /// <summary>
  146. /// 断线(开市)清盘
  147. /// </summary>
  148. public void ClearQuote()
  149. {
  150. ///先清盘
  151. foreach (var goods in CacheManager.CacheGoodsBaseInfos)
  152. {
  153. if (_isGetGoodsInfo && goods.ContainsGoodsSrc == (int)GoodsFromScr.Brown) ////行情商品清盘
  154. {
  155. goods.ClearQuote();
  156. }
  157. else if (!_isGetGoodsInfo && (goods.ContainsGoodsSrc & (int)GoodsFromScr.Trade) > 0) ////交易商品清盘
  158. {
  159. goods.ClearQuote();
  160. }
  161. }
  162. }
  163. /// <summary>
  164. /// 组包并订阅
  165. /// </summary>
  166. /// <param name="list">The list.</param>
  167. public void Subscribe(List<QuoteGoods> list)
  168. {
  169. if (_subscribeFlag)
  170. {
  171. _subscribeFlag = false;
  172. try
  173. {
  174. if (null == list || !list.Any())
  175. {
  176. list = new List<QuoteGoods>()
  177. {
  178. new QuoteGoods("250", "99999999", "99999999", QuoteCategoryType.PreciousMetal)
  179. };
  180. LogInfoHelper.WriteInfo("调用Subscribe() 订阅QuoteGoods列表为空,发送不存在商品进行取消订阅!");
  181. }
  182. TCPPackage40 gram = new TCPPackage40
  183. {
  184. Tag = (byte) Datagram40TagPrimaryFunction.Trade,
  185. Tag2 = (byte) Datagram40TagPrimaryFunction.Reserve //随便填值
  186. };
  187. var accountId = _isGetGoodsInfo ? c_AccountId : UserManager.CurrentTradeAccount.LoginID;
  188. var token = _isGetGoodsInfo ? c_checkToken : UserManager.CurrentTradeAccount.Token;
  189. byte[] bytes = list.GetSubscribeBytesV2(accountId, token);
  190. //LogInfoHelper.WriteInfo("Login:" + BitConverter.ToString(bytes));
  191. gram.SetData(bytes);
  192. SendPackage(gram, LogInCallback, ExceptionCallback);
  193. }
  194. catch (Exception e)
  195. {
  196. LogInfoHelper.WriteInfo("调用Subscribe() catch:" + e.Message);
  197. }
  198. finally
  199. {
  200. _subscribeFlag = true;
  201. }
  202. }
  203. }
  204. /// <summary>
  205. /// 行情链路令牌校检
  206. /// </summary>
  207. public void CheckedToken()
  208. {
  209. if (!_isGetGoodsInfo) ////交易链路不校检
  210. return;
  211. ////行情链路校检Token取服务器时间
  212. try
  213. {
  214. TCPPackage40 gram = new TCPPackage40
  215. {
  216. Tag = (byte)Datagram40TagPrimaryFunction.QuoteChekedToken,
  217. Tag2 = (byte)Datagram40TagPrimaryFunction.Reserve //随便填值
  218. };
  219. byte[] bytes = QuoteDatagramHelper.QuoteCheckToken( c_AccountId, c_checkToken);
  220. gram.SetData(bytes);
  221. SendPackage(gram, CheckedSuccess, CheckedTokenError);
  222. }
  223. catch (Exception e)
  224. {
  225. _subscribeFlag = true;
  226. LogInfoHelper.WriteInfo("调用CheckedToken() catch:" + e.Message);
  227. }
  228. }
  229. public void CheckedSuccess(TCPPackage40 package)
  230. {
  231. try
  232. {
  233. var bytes = package.GetData();
  234. var count = GetReturnCode(bytes);
  235. //>=0: Goods个数; <0: 错误码(-1:报文解析失败,-2:Token校验失败,-3:订阅商品不合法)
  236. LogInfoHelper.WriteInfo("行情链路令牌校检, 返回码:" + count);
  237. if (count >= 0)
  238. {
  239. byte[] block = new byte[4];
  240. Array.Copy(bytes, 4, block, 0, 4);
  241. var servrTime = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(block, 0));
  242. ApplicationParameter.SetSystemTime(servrTime);
  243. MessengerHelper.DefaultSend(true, MessengerTokens.QuoteCheckTokenSuccess);
  244. }
  245. else
  246. {
  247. MessengerHelper.DefaultSend(false, MessengerTokens.QuoteCheckTokenSuccess);
  248. }
  249. }
  250. catch (Exception ex)
  251. {
  252. LogInfoHelper.WriteInfo("查询QueryDayQuote解析失败" + ex.ToString());
  253. }
  254. }
  255. /// <summary>
  256. /// Checked
  257. /// </summary>
  258. /// <param name="errorId"></param>
  259. /// <param name="message"></param>
  260. private void CheckedTokenError(int errorId, string message)
  261. {
  262. MessengerHelper.DefaultSend(false, MessengerTokens.QuoteCheckTokenSuccess);
  263. LogInfoHelper.WriteInfo("调用CheckedToken() " + errorId + ":" + message);
  264. }
  265. /// <summary>
  266. /// Queries the day quote.
  267. /// </summary>
  268. public void QueryDayQuote()
  269. {
  270. QueryDayQuote(GetAvailableList(CacheManager.CacheGoodsBaseInfos));
  271. }
  272. /// <summary>
  273. /// Gets the available list.
  274. /// </summary>
  275. /// <param name="list">The list.</param>
  276. /// <returns>IEnumerable{QuoteGoods}.</returns>
  277. private List<QuoteGoods> GetAvailableList(List<QuoteGoods> list)
  278. {
  279. //if (list == null || !list.Any()) return null;
  280. //////交易商品
  281. //if (!_isGetGoodsInfo)
  282. //{
  283. // list = list.Where((item) => (item.ContainsGoodsSrc & (int)GoodsFromScr.trade) > 0).ToList();
  284. //}
  285. //else
  286. //{
  287. // ////行情商品
  288. // list = list.Where((item) => (item.ContainsGoodsSrc == (int)GoodsFromScr.Brown)).ToList();
  289. //}
  290. //////商城、发售商品不需要订阅和查询盘面
  291. ////var result = list.Where(z => z.GoodsParameters != null && z.TradeMode != eTradeMode.TRADEMODE_SALE && z.TradeMode != eTradeMode.TRADEMODE_MALL);
  292. //var result = list.Where(z => z.GoodsParameters != null);
  293. //if (result == null || !result.Any()) return null;
  294. //return result.ToList();
  295. //订阅持仓商品+当前分类标签显示的商品+当前交易面板的商品 -- dyp
  296. var lst = new List<int>();
  297. if(UserManager.CurrentGoodsId >0) lst.Add(UserManager.CurrentGoodsId);
  298. var pList = UserManager.GetHoldSummaries(UserManager.CurrentTradeAccount.FundsAccountId);
  299. if (pList != null && pList.Any()) lst.AddRange(pList.Select(z => (int)z.GoodsId).ToList());
  300. if (UserManager.CurrentGoodsIds != null && UserManager.CurrentGoodsIds.Any()) lst.AddRange(UserManager.CurrentGoodsIds);
  301. var goodses = CacheManager.CacheGoodsBaseInfos.Where(z => lst.Contains((int)z.GoodsId)) ?? new List<QuoteGoods>();
  302. return goodses.ToList();
  303. }
  304. /// <summary>
  305. /// 盘面查询申请
  306. /// </summary>
  307. /// <param name="list">The list.</param>
  308. public void QueryDayQuote(List<QuoteGoods> list)
  309. {
  310. lock (_lock)
  311. {
  312. if (_queryDayQuote)
  313. {
  314. if (list == null || !list.Any()) return;
  315. try
  316. {
  317. _queryDayQuote = false;
  318. TCPPackage40 gram = new TCPPackage40
  319. {
  320. Tag = (byte)Datagram40TagPrimaryFunction.Query,
  321. Tag2 = (byte)Datagram40TagPrimaryFunction.Reserve, //随便填值
  322. };
  323. var accountId = _isGetGoodsInfo ? c_AccountId : UserManager.CurrentTradeAccount.LoginID;
  324. var token = _isGetGoodsInfo ? c_checkToken : UserManager.CurrentTradeAccount.Token;
  325. byte[] bytes = list.ToList()
  326. .GetSubscribeBytesV2(accountId, token);
  327. gram.SetData(bytes);
  328. SendPackage(gram, QueryDayQuoteCallback, QueryDayQuoteExceptionCallback);
  329. LogInfoHelper.WriteInfo("行情取盘面 数量:" + list.Count);
  330. }
  331. catch
  332. {
  333. _queryDayQuote = true;
  334. }
  335. }
  336. }
  337. }
  338. /// <summary>
  339. /// 执行与释放或重置非托管资源相关的应用程序定义的任务。
  340. /// </summary>
  341. public void Dispose()
  342. {
  343. if (_timer != null)
  344. {
  345. _timer.Stop();
  346. _timer = null;
  347. }
  348. if (_tcpManager != null)
  349. {
  350. _tcpManager.Disconnect();
  351. _tcpManager = null;
  352. }
  353. }
  354. #endregion
  355. #region SendCallbacks
  356. /// <summary>
  357. /// Logs the in callback.
  358. /// </summary>
  359. /// <param name="package">The package.</param>
  360. private void LogInCallback(TCPPackage40 package)
  361. {
  362. try
  363. {
  364. var bytes = package.GetData();
  365. var count = GetReturnCode(bytes);
  366. //>=0: Goods个数; <0: 错误码(-1:报文解析失败,-2:Token校验失败,-3:订阅商品不合法)
  367. LogInfoHelper.WriteInfo("行情订阅成功回调, 返回码:" + count);
  368. if (count == -2)
  369. {
  370. //Token校验失败, 不处理,等待交易链路回应
  371. //MessengerHelper.DefaultSend(false, MessengerTokens.UserOfflineNoticeToken);
  372. var random = new Random();
  373. var second = random.Next(5000, 10000);
  374. Thread.Sleep(second);
  375. }
  376. else if (count >= 0)
  377. {
  378. //订阅成功,发送盘面查询请求
  379. QueryDayQuote();
  380. if (_timer == null) return;
  381. _timer.Stop();
  382. _timer.Start();
  383. }
  384. }
  385. catch
  386. {
  387. }
  388. finally
  389. {
  390. _subscribeFlag = true;
  391. StartSendBeat();
  392. LogInfoHelper.WriteInfo("行情订阅成功回调,启动发送心跳包!");
  393. }
  394. }
  395. /// <summary>
  396. /// Exceptions the callback.
  397. /// </summary>
  398. /// <param name="errorId">The error identifier.</param>
  399. /// <param name="message">The message.</param>
  400. private void ExceptionCallback(int errorId, string message)
  401. {
  402. _subscribeFlag = true;
  403. LogInfoHelper.WriteInfo("行情订阅出错:" + errorId + " " + message);
  404. }
  405. /// <summary>
  406. /// Queries the day quote callback.
  407. /// </summary>
  408. /// <param name="package">The package.</param>
  409. private void QueryDayQuoteCallback(TCPPackage40 package)
  410. {
  411. try
  412. {
  413. var bytes = package.GetData();
  414. var count = GetReturnCode(bytes);
  415. //// >=0: 返回记录数目; <0: 错误码(-1:报文解析失败,-2:Token校验失败,-3:订阅商品不合法)
  416. LogInfoHelper.WriteInfo("查询QueryDayQuote成功,返回:" + count);
  417. if (count == -2)
  418. {
  419. ////Token校验失败, 返回登录界面
  420. LogInfoHelper.WriteInfo("查询QueryDayQuote 失败返回 -2");
  421. //MessengerHelper.DefaultSend(RestartType.QuoteCheckedFail, MessengerTokens.UserOfflineNoticeToken);
  422. }
  423. else if (count >= 0)
  424. {
  425. ////查询成功,更新数据
  426. if (bytes.Length < 4) return;
  427. ////前四位为count,数据体从4位开始
  428. var length = bytes.Length - 4;
  429. var goodsData = bytes.Skip(4).Take(length).ToArray();
  430. var quoteTiks = goodsData.ToQuoteTiks(true);
  431. quoteTiks.UpdateQuoteGoodses();
  432. }
  433. }
  434. catch (Exception ex)
  435. {
  436. LogInfoHelper.WriteInfo("查询QueryDayQuote解析失败" + ex.ToString());
  437. }
  438. finally
  439. {
  440. _queryDayQuote = true;
  441. }
  442. }
  443. /// <summary>
  444. /// Exceptions the callback.
  445. /// </summary>
  446. /// <param name="errorId">The error identifier.</param>
  447. /// <param name="message">The message.</param>
  448. private void QueryDayQuoteExceptionCallback(int errorId, string message)
  449. {
  450. _queryDayQuote = true;
  451. LogInfoHelper.WriteInfo("盘面查询请求失败:" + message + "[" + errorId + "]");
  452. }
  453. /// <summary>
  454. /// Gets the return code.
  455. /// </summary>
  456. /// <param name="bytes">The bytes.</param>
  457. /// <returns>System.Int32.</returns>
  458. private int GetReturnCode(byte[] bytes)
  459. {
  460. /* 查询返回结构体
  461. * 查询通用回应(包括订阅、盘面查询等)
  462. typedef struct tagQUERY_COMMON_REQ {
  463. int32_t iCount; // >=0: 返回记录数目; <0: 错误码(-1:报文解析失败,-2:Token校验失败,-3:订阅商品不合法)
  464. void* pRecords; // 根据请求类别分别指向不同的结果集(GOODS/QUERY_HISTROY_RSP/...)
  465. } QUERY_COMMON_REQ;
  466. */
  467. //// >=0: 返回记录数目; <0: 错误码(-1:报文解析失败,-2:Token校验失败,-3:订阅商品不合法)
  468. if (bytes == null || bytes.Length < 4) return -1;
  469. var returnCodeBytes = bytes.Take(4).ToArray();
  470. return IPAddress.NetworkToHostOrder(BitConverter.ToInt32(returnCodeBytes, 0));
  471. }
  472. /// <summary>
  473. /// 停止同步盘面定时器,Subscribe()成功后会启动
  474. /// </summary>
  475. public void StopSyncTimer()
  476. {
  477. if (_timer != null) _timer.Stop();
  478. }
  479. /// <summary>
  480. /// 设置断线重连标志
  481. /// </summary>
  482. public void SetReconnectFlag(bool flag)
  483. {
  484. if (null != _tcpManager) _tcpManager.SetReconnectFlag(flag);
  485. }
  486. #endregion
  487. }
  488. }