using Muchinfo.Communication;
using Muchinfo.TASClient.Service.Utilities;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Muchinfo.TASClient.Service.LinkProxy.TCP
{
    /// <summary>
    /// Wraps a <c>TcpPoint</c> connection and correlates outgoing datagrams with their
    /// responses by session id, either through a callback or through an <c>AsyncResult</c>.
    /// Requests that receive no response within <see cref="AsycnTimeOutSencond"/> seconds
    /// are dropped by a background timeout loop.
    /// </summary>
    // NOTE(review): the generic type arguments in this file (Action<Datagram, IDatagramResolver>,
    // AsyncResult<Datagram>, EventHandler<DatagramEventArgs>, EventHandler<CommunicationStateEventArgs>)
    // were reconstructed from how the values are used below; confirm them against the
    // project's actual declarations.
    public class QuotationTcpLinkProxy
    {
        private TcpPoint _client;
        private readonly object syncRoot = new object();

        // Never assigned in this class; when non-null it is handed to a newly created client.
        private IDatagramResolver _resolver;

        private readonly string _address;
        private readonly int _port;
        private bool _isOnLine = false;

        // Number of one-second ticks a request may stay pending before it times out.
        private const int c_timeOutSencond = 30;

        private Task timeOutTask;

        // Stops the timeout loop. A running Task cannot be disposed, so it is cancelled instead.
        private CancellationTokenSource _timeOutCancellation;

        private readonly int _heartbeatInterval = 6;

        public event EventHandler<DatagramEventArgs> DatagramRecieved;
        public event EventHandler<CommunicationStateEventArgs> StateChanged;

        private uint _sessionid = 0;

        // Guards _sessionid and the three dictionaries below. They are touched from caller
        // threads, the receive handler and the timeout loop, and Dictionary is not safe for
        // concurrent mutation.
        private readonly object _pendingLock = new object();

        // Session id -> callback to invoke when the response arrives.
        private readonly Dictionary<uint, Action<Datagram, IDatagramResolver>> requestDictionary =
            new Dictionary<uint, Action<Datagram, IDatagramResolver>>();

        // Session id -> remaining ticks before the request is considered timed out.
        private readonly Dictionary<uint, int> timeOutSencondDic = new Dictionary<uint, int>();

        // Session id -> async result to complete when the response arrives.
        private readonly Dictionary<uint, AsyncResult<Datagram>> asyncDictionary =
            new Dictionary<uint, AsyncResult<Datagram>>();

        /// <summary>
        /// Creates the TCP proxy and opens the connection; opening blocks the caller.
        /// </summary>
        /// <param name="host">The host.</param>
        /// <param name="port">The port.</param>
        /// <param name="heartbeatInterval">
        /// Heartbeat interval passed to the client (presumably seconds; confirm against TcpPoint).
        /// </param>
        public QuotationTcpLinkProxy(string host, int port, int heartbeatInterval = 6)
        {
            _address = host;
            _port = port;
            _heartbeatInterval = heartbeatInterval;
            Open();
        }

        /// <summary>
        /// Opens the connection, closing any previous connection first.
        /// </summary>
        public void Open()
        {
            this.Close();
            InitializeClient();
        }

        /// <summary>
        /// Closes the current connection, stops the timeout loop and drops every pending
        /// request. Pending async results are signalled with <c>SetTimeOut()</c> so that
        /// nothing keeps waiting on a connection that no longer exists.
        /// </summary>
        public void Close()
        {
            lock (this.syncRoot)
            {
                if (this._client != null)
                {
                    this._client.StateChanged -= this.Client_StateChanged;
                    this._client.DatagramReceived -= this.Client_DatagramRecieved;
                    this.Dispose(); // also resets _client to null
                }

                this.Enabled = false;
                StopTimeOutService();
            }

            FailPendingRequests();
        }

        /// <summary>
        /// Allocates and returns the next session id. Zero is never returned, and ids that
        /// still belong to a pending request are skipped.
        /// </summary>
        public uint SessionId
        {
            get
            {
                lock (_pendingLock)
                {
                    return NextSessionId();
                }
            }
        }

        /// <summary>
        /// Advances the session counter to the next id that is not in use.
        /// The caller must hold <c>_pendingLock</c>.
        /// </summary>
        private uint NextSessionId()
        {
            do
            {
                if (_sessionid == uint.MaxValue)
                {
                    _sessionid = 0; // wrap around; the increment below skips 0
                }

                ++_sessionid;
            }
            while (requestDictionary.ContainsKey(_sessionid) || asyncDictionary.ContainsKey(_sessionid));

            return _sessionid;
        }

        /// <summary>
        /// Timeout loop: once per second decrements every pending request's counter and
        /// drops the requests whose counter has run out. Runs until <paramref name="token"/>
        /// is cancelled.
        /// </summary>
        private void CheckTimeOutService(CancellationToken token)
        {
            while (true)
            {
                Thread.Sleep(1000);
                if (token.IsCancellationRequested)
                {
                    return;
                }

                var expired = new List<AsyncResult<Datagram>>();
                lock (_pendingLock)
                {
                    // Snapshot the keys because entries are removed while iterating.
                    var keys = new List<uint>(timeOutSencondDic.Keys);
                    foreach (var key in keys)
                    {
                        if (timeOutSencondDic[key] > 0)
                        {
                            timeOutSencondDic[key] -= 1;
                            continue;
                        }

                        timeOutSencondDic.Remove(key);
                        if (requestDictionary.Remove(key))
                        {
                            continue; // callback requests are simply forgotten on timeout
                        }

                        AsyncResult<Datagram> waiting;
                        if (asyncDictionary.TryGetValue(key, out waiting))
                        {
                            asyncDictionary.Remove(key);
                            expired.Add(waiting);
                        }
                    }
                }

                // Signal outside the lock so external code never runs while it is held.
                foreach (var result in expired)
                {
                    SignalTimeOut(result);
                }
            }
        }

        /// <summary>
        /// Sends a request whose response is delivered to a callback.
        /// </summary>
        /// <param name="asyncCallAction">Callback invoked with the response datagram and the current resolver.</param>
        /// <param name="param">The datagram to send; its SessionId is overwritten.</param>
        /// <returns>The session id assigned to the request, or 0 when sending failed.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="param"/> is null.</exception>
        public uint AsyncSend(Action<Datagram, IDatagramResolver> asyncCallAction, Datagram param)
        {
            if (param == null)
            {
                throw new ArgumentNullException("param");
            }

            uint id;
            lock (_pendingLock)
            {
                id = NextSessionId();
                requestDictionary[id] = asyncCallAction;
                timeOutSencondDic[id] = c_timeOutSencond;
            }

            param.SessionId = id;
            if (TrySend(param))
            {
                return id;
            }

            // Not sent (no client, or the send threw): undo the registration.
            // TODO: surface the failure reason in the status bar.
            lock (_pendingLock)
            {
                requestDictionary.Remove(id);
                timeOutSencondDic.Remove(id);
            }

            return 0; // send failed
        }

        /// <summary>
        /// Sends a request whose response completes the given async result.
        /// </summary>
        /// <param name="asyncResult">Completed with the response datagram, or signalled via SetTimeOut().</param>
        /// <param name="param">The datagram to send; its SessionId is overwritten.</param>
        /// <returns>The session id assigned to the request, or 0 when sending failed.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="param"/> is null.</exception>
        public uint AsyncSend(AsyncResult<Datagram> asyncResult, Datagram param)
        {
            if (param == null)
            {
                throw new ArgumentNullException("param");
            }

            uint id;
            lock (_pendingLock)
            {
                id = NextSessionId();
                asyncDictionary[id] = asyncResult;
                timeOutSencondDic[id] = c_timeOutSencond;
            }

            param.SessionId = id;
            if (TrySend(param))
            {
                return id;
            }

            // Not sent (no client, or the send threw): undo the registration.
            // TODO: surface the failure reason in the status bar.
            lock (_pendingLock)
            {
                asyncDictionary.Remove(id);
                timeOutSencondDic.Remove(id);
            }

            return 0; // send failed
        }

        /// <summary>
        /// Creates, wires up and opens the client, then starts the timeout loop.
        /// On failure a Closed state change is raised and the proxy is closed again.
        /// </summary>
        private void InitializeClient()
        {
            lock (this.syncRoot)
            {
                try
                {
                    if (this._client == null)
                    {
                        this._client = new TcpPoint(_address, _port);
                        if (this._resolver != null)
                        {
                            this._client.Resolver = this._resolver;
                        }

                        this._client.StateChanged += this.Client_StateChanged;
                        this._client.DatagramReceived += this.Client_DatagramRecieved;
                        this._client.HeartbeatInterval = _heartbeatInterval;
                        this._client.HeartbeatEnabled = true;
                        this._client.Open();
                        this.Enabled = true;

                        _timeOutCancellation = new CancellationTokenSource();
                        CancellationToken token = _timeOutCancellation.Token;

                        // LongRunning: the loop lives as long as the connection, so it
                        // should not occupy a thread-pool worker.
                        timeOutTask = Task.Factory.StartNew(
                            () => CheckTimeOutService(token),
                            token,
                            TaskCreationOptions.LongRunning,
                            TaskScheduler.Default);
                    }
                }
                catch (Exception ex)
                {
                    TraceFailure("InitializeClient", ex);
                    if (this._client != null)
                    {
                        this.Client_StateChanged(
                            this,
                            new CommunicationStateEventArgs(this._client.SessionId, CommunicationState.Closed));
                    }

                    this.Close();
                }
            }
        }

        /// <summary>
        /// True after the client has been opened successfully; false after <see cref="Close"/>.
        /// </summary>
        public bool Enabled { get; private set; }

        /// <summary>
        /// The async wait time: the number of seconds a request may stay pending.
        /// </summary>
        public int AsycnTimeOutSencond
        {
            get { return c_timeOutSencond; }
        }

        /// <summary>
        /// The resolver of the current client, or null when there is no client.
        /// </summary>
        public IDatagramResolver Resolver
        {
            get
            {
                // Read the field once so a concurrent Close() cannot null it between
                // the check and the use.
                var client = this._client;
                if (client != null)
                {
                    return client.Resolver;
                }

                return null;
            }
        }

        /// <summary>
        /// Sends the specified datagram without tracking a response. Failures are traced
        /// and otherwise ignored (best effort).
        /// </summary>
        /// <param name="datagram">The datagram.</param>
        public void Send(Datagram datagram)
        {
            TrySend(datagram);
        }

        /// <summary>
        /// Hands the datagram to the current client.
        /// </summary>
        /// <returns>True when the client accepted it; false when there is no client or the send threw.</returns>
        private bool TrySend(Datagram datagram)
        {
            var client = this._client;
            if (client == null)
            {
                return false;
            }

            try
            {
                client.AsyncSend(datagram);
                return true;
            }
            catch (Exception ex)
            {
                TraceFailure("AsyncSend", ex);
                return false;
            }
        }

        /// <summary>
        /// Records the online state and forwards the client's state change to subscribers.
        /// </summary>
        private void Client_StateChanged(object sender, CommunicationStateEventArgs e)
        {
            // Track the state even when nobody is subscribed to StateChanged.
            _isOnLine = e.State == CommunicationState.Opened;

            var handler = StateChanged;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        /// <summary>
        /// Routes an incoming datagram to the pending request with the same session id
        /// (callback or async result), then raises <see cref="DatagramRecieved"/>.
        /// </summary>
        private void Client_DatagramRecieved(object sender, DatagramEventArgs e)
        {
            var sessionid = e.Datagram.SessionId;

            Action<Datagram, IDatagramResolver> callback = null;
            AsyncResult<Datagram> asyncResult = null;
            bool isCallbackRequest;
            bool isAsyncRequest = false;

            // Claim the pending entry atomically so the timeout loop cannot remove it
            // between the lookup and its use.
            lock (_pendingLock)
            {
                isCallbackRequest = requestDictionary.TryGetValue(sessionid, out callback);
                if (isCallbackRequest)
                {
                    requestDictionary.Remove(sessionid);
                }
                else
                {
                    isAsyncRequest = asyncDictionary.TryGetValue(sessionid, out asyncResult);
                    if (isAsyncRequest)
                    {
                        asyncDictionary.Remove(sessionid);
                    }
                }

                if (isCallbackRequest || isAsyncRequest)
                {
                    timeOutSencondDic.Remove(sessionid);
                }
            }

            if (isCallbackRequest)
            {
                if (callback != null)
                {
                    var datagram = e.Datagram;
                    Task.Factory.StartNew(() => InvokeCallback(callback, datagram));
                }
            }
            else if (isAsyncRequest && asyncResult != null)
            {
                asyncResult.Complete(e.Datagram, DateTime.Now);
            }

            var handler = DatagramRecieved;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        /// <summary>
        /// Closes or disposes the underlying client and forgets it. Pending requests and
        /// the timeout loop are left untouched; use <see cref="Close"/> for a full shutdown.
        /// </summary>
        public void Dispose()
        {
            lock (this.syncRoot)
            {
                if (this._client != null)
                {
                    if (this._client.State != CommunicationState.Closing
                        && this._client.State != CommunicationState.Closed)
                    {
                        this._client.Close();
                    }
                    else
                    {
                        this._client.Dispose();
                    }

                    this._client = null;
                }
            }
        }

        /// <summary>
        /// Runs a response callback. The callback is expected to handle its own errors;
        /// anything that escapes is traced so the task does not fault unobserved.
        /// </summary>
        private void InvokeCallback(Action<Datagram, IDatagramResolver> callback, Datagram datagram)
        {
            try
            {
                callback.Invoke(datagram, this.Resolver);
            }
            catch (Exception ex)
            {
                TraceFailure("response callback", ex);
            }
        }

        /// <summary>
        /// Requests the timeout loop to stop. The loop exits on its next tick.
        /// </summary>
        private void StopTimeOutService()
        {
            if (_timeOutCancellation != null)
            {
                _timeOutCancellation.Cancel();
                _timeOutCancellation = null;
            }

            timeOutTask = null;
        }

        /// <summary>
        /// Clears all pending requests and signals a timeout on every pending async result.
        /// </summary>
        private void FailPendingRequests()
        {
            List<AsyncResult<Datagram>> abandoned;
            lock (_pendingLock)
            {
                abandoned = new List<AsyncResult<Datagram>>(asyncDictionary.Values);
                requestDictionary.Clear();
                asyncDictionary.Clear();
                timeOutSencondDic.Clear();
            }

            foreach (var result in abandoned)
            {
                SignalTimeOut(result);
            }
        }

        /// <summary>
        /// Calls <c>SetTimeOut()</c> on one async result, isolating failures so that one
        /// faulty result cannot prevent the others from being signalled.
        /// </summary>
        private static void SignalTimeOut(AsyncResult<Datagram> result)
        {
            if (result == null)
            {
                return;
            }

            try
            {
                result.SetTimeOut();
            }
            catch (Exception ex)
            {
                TraceFailure("SetTimeOut", ex);
            }
        }

        /// <summary>
        /// Writes a warning to the trace listeners in place of silently swallowing an exception.
        /// </summary>
        private static void TraceFailure(string operation, Exception ex)
        {
            System.Diagnostics.Trace.TraceWarning("QuotationTcpLinkProxy: {0} failed: {1}", operation, ex);
        }
    }
}