using CommonLang; using CommonLang.Concurrent; using CommonLang.Log; using CommonLang.Net; using CommonLang.Protocol; using CommonNetwork.Net; using System; using System.Collections.Generic; using System.Linq; using System.Text; namespace CommonNetwork.Client { /// /// 用于监听请求回馈的客户端程序 /// public class NetClient : INetSessionListener, IDisposable { private HashMap mListenRequests = new HashMap(); private System.Threading.Timer mCheckTimer; public INetPackageCodec Codec {get; private set;} public INetSession Session { get; private set; } public bool IsConnected { get { return Session.IsConnected; } } public NetClient(INetSession session, INetPackageCodec codec) { this.Session = session; this.Codec = codec; this.mCheckTimer = new System.Threading.Timer(check_request_timeout, this, 1000, 1000); } public bool Connect(string url) { return Session.Open(url, Codec, this); } public void Dispose() { mCheckTimer.Dispose(); lock (mListenRequests) { mListenRequests.Clear(); } Session.Close(); Session.Dispose(); } public void Send(IMessage msg) { Session.Send(msg); } public void SendResponse(IMessage rsponse, int requestMessageID) { Session.SendResponse(rsponse, requestMessageID); } // ----------------------------------------------------------------------------------- #region _REQUEST_RESPONSE_ private AtomicInteger MessageIDGen = new AtomicInteger(1); public class Request { internal static Logger log = LoggerFactory.GetLogger("NetClient.Request"); private OnResponseHandler mHandler; private OnRequestTimeoutHandler mTimeout; public NetClient Client { get; private set; } public int TimeOutMS { get; private set; } public long EndTime { get; private set; } public long SendTime { get; private set; } public int MessageID { get; private set; } public IMessage RequestMessage { get; protected set; } public IMessage ResponseMessage { get; protected set; } public Request(NetClient client, int timeOutMS, IMessage request, OnResponseHandler handler = null, OnRequestTimeoutHandler timeout = null) { this.Client = client; this.MessageID = request.MessageID = client.MessageIDGen.GetAndIncrement(); this.RequestMessage = request; this.TimeOutMS = timeOutMS; this.SendTime = CUtils.CurrentTimeMS; this.EndTime = SendTime + timeOutMS; this.mHandler = handler; this.mTimeout = timeout; } virtual internal void onRecivedMessage(IMessage msg) { ResponseMessage = msg; if (mHandler != null) { try { mHandler.Invoke(this); } catch (Exception err) { log.Error(err.Message, err); } } mHandler = null; mTimeout = null; } virtual internal void onTimeout() { if (mTimeout != null) { try { mTimeout.Invoke(this); } catch (Exception err) { log.Error(err.Message, err); } } mHandler = null; mTimeout = null; } } public delegate void OnResponseHandler(Request req); public delegate void OnRequestTimeoutHandler(Request req); public Request SendRequest(IMessage msg, OnResponseHandler handler, OnRequestTimeoutHandler timeout = null, int timeOutMS = 15000) { Request req = new Request(this, timeOutMS, msg, handler, timeout); lock (mListenRequests) { mListenRequests.Add(req.MessageID, req); } Session.Send(msg); return req; } #endregion // ----------------------------------------------------------------------------------- #region _GENERIC_REQUEST_RESPONSE_ public class TRequest : Request where REQ : IMessage where RES : IMessage { private OnTResponseHandler mTHandler; private OnTRequestTimeoutHandler mTTimeout; public TRequest(NetClient client, int timeOutMS, IMessage request, OnTResponseHandler handler = null, OnTRequestTimeoutHandler timeout = null) : base(client, timeOutMS, request) { this.mTHandler = handler; this.mTTimeout = timeout; } public REQ TRequestMessage { get { return (REQ)base.RequestMessage; } } public RES TResponseMessage { get { return (RES)base.ResponseMessage; } } override internal void onRecivedMessage(IMessage msg) { base.ResponseMessage = msg; if (mTHandler != null) { try { mTHandler.Invoke(this); } catch (Exception err) { log.Error(err.Message, err); } } mTHandler = null; mTTimeout = null; } override internal void onTimeout() { if (mTTimeout != null) { try { mTTimeout.Invoke(this); } catch (Exception err) { log.Error(err.Message, err); } } mTHandler = null; mTTimeout = null; } } public delegate void OnTResponseHandler(TRequest req) where REQ : IMessage where RES : IMessage; public delegate void OnTRequestTimeoutHandler(TRequest req) where REQ : IMessage where RES : IMessage; public TRequest SendTRequest(REQ msg, OnTResponseHandler handler, OnTRequestTimeoutHandler timeout = null, int timeOutMS = 15000) where REQ : IMessage where RES : IMessage { TRequest req = new TRequest(this, timeOutMS, msg, handler, timeout); lock (mListenRequests) { mListenRequests.Add(req.MessageID, req); } Session.Send(msg); return req; } #endregion // ----------------------------------------------------------------------------------- private void check_request_timeout(object state) { long curTime = CUtils.CurrentTimeMS; lock (mListenRequests) { List removing = null; foreach (Request req in mListenRequests.Values) { if (req.EndTime < curTime) { if (removing == null) { removing = new List(); } removing.Add(req); } } if (removing != null) { foreach (Request remove in removing) { mListenRequests.RemoveByKey(remove.MessageID); remove.onTimeout(); } } } } // ----------------------------------------------------------------------------------- #region SessionListener void INetSessionListener.sessionOpened(INetSession session) { } void INetSessionListener.sessionClosed(INetSession session) { lock (mListenRequests) { mListenRequests.Clear(); } } void INetSessionListener.messageReceived(INetSession session, object data) { if (data is IMessage) { IMessage msg = data as IMessage; Request request = null; lock (mListenRequests) { request = mListenRequests.RemoveByKey(msg.MessageID) as Request; } if (request != null) { request.onRecivedMessage(msg); } } } void INetSessionListener.messageSent(INetSession session, object data) { } void INetSessionListener.onError(INetSession session, Exception err) { } #endregion // ----------------------------------------------------------------------------------- } }