using CommonLang;
using CommonLang.Concurrent;
using CommonLang.IO;
using CommonLang.Log;
using SimpleJson;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace Pomelo.DotNetClient
{
    /// <summary>
    /// Client-side facade over a <see cref="PomeloClientAdapter"/>: sends requests and
    /// notifications, matches responses to pending requests by message id, dispatches
    /// server pushes to registered handlers and raises lifecycle events.
    /// </summary>
    /// <remarks>
    /// This class takes no locks of its own.
    ///
    /// NOTE(review): the generic type arguments in this file were reconstructed from the
    /// call sites visible here (e.g. <c>response_map.Add(id, new RequestHandler(...))</c>).
    /// Declarations marked NOTE(review) below could not be fully determined from usage and
    /// must be confirmed against the declaring types before merging.
    /// </remarks>
    public class PomeloClient : IDisposable
    {
        //---------------------------------------------------------------------------------------------------------------------

        protected readonly CommonLang.Log.Logger log;

        private readonly PomeloClientAdapter adapter;
        private readonly Listener listener;

        // Pending requests keyed by message id.
        private HashMap<uint, RequestHandler> response_map = new HashMap<uint, RequestHandler>();
        // Any number of push handlers per route.
        private HashMap<string, List<PushHandler>> push_handler = new HashMap<string, List<PushHandler>>();
        // At most one replaceable push handler per route.
        private HashMap<string, PushHandler> push_handler_once = new HashMap<string, PushHandler>();

        // Ids start at 1; id 0 is used by the notify methods and is never registered for a response.
        private readonly AtomicUInt req_id_gen = new AtomicUInt(1);

        // Interval between timeout sweeps; Tag carries the timeout value passed to connect().
        // NOTE(review): type argument inferred from `request_timer.Tag = timeout` (an int) - confirm.
        private readonly SystemTimeInterval<int> request_timer = new SystemTimeInterval<int>(5 * 1000);

        // NOTE(review): SyncMessageQueue2 may carry a type argument that is not recoverable
        // from this file (it is fed System.Action instances via QueueTask) - confirm.
        private readonly SyncMessageQueue2 tasks;

        private readonly ISerializer serializer;
        private bool disposed = false;

        //---------------------------------------------------------------------------------------------------------------------

        /// <summary>True once <see cref="Dispose"/> has been entered.</summary>
        public bool IsDisposed { get { return disposed; } }

        /// <summary>True when the adapter currently has a session and that session reports connected.</summary>
        public bool IsConnected
        {
            get
            {
                // Read the session once so it cannot become null between the check and the use.
                Socket session = this.Session;
                return session != null ? session.Connected : false;
            }
        }

        /// <summary>Network state as reported by the adapter.</summary>
        public NetWorkState NetWorkState { get { return adapter.NetWorkState; } }

        /// <summary>Value of the adapter's <c>GetPing()</c>.</summary>
        public long RecvTime { get { return adapter.GetPing(); } }

        /// <summary>Total received byte count as reported by the adapter.</summary>
        public long TotalRecvBytes { get { return adapter.TotalRecvBytes; } }

        /// <summary>Total sent byte count as reported by the adapter.</summary>
        public long TotalSentBytes { get { return adapter.TotalSentBytes; } }

        //---------------------------------------------------------------------------------------------------------------------

        /// <summary>The adapter's current socket session; may be null.</summary>
        public Socket Session { get { return adapter.Session; } }

        /// <summary>
        /// Creates a client that uses <paramref name="serializer"/> for message bodies that are
        /// sent or received without an explicit encoder/decoder.
        /// </summary>
        public PomeloClient(ISerializer serializer)
        {
            this.serializer = serializer;
            this.log = LoggerFactory.GetLogger(GetType().Name);
            this.listener = new Listener(this);
            this.adapter = PomeloClientFactory.Instance.CreateAdapter(listener);
            this.tasks = new SyncMessageQueue2(onError);
        }

        /// <summary>Enqueues <paramref name="action"/> on the task queue that <see cref="update"/> processes.</summary>
        public virtual void QueueTask(Action action)
        {
            tasks.Enqueue(action);
        }

        /// <summary>Discards every queued task.</summary>
        public virtual void ClearQueue()
        {
            tasks.Clear();
        }

        //---------------------------------------------------------------------------------------------------------------------

        /// <summary>Forwards adapter callbacks to the owning <see cref="PomeloClient"/>.</summary>
        class Listener : IPomeloClientAdapterListener
        {
            public readonly PomeloClient client;
            public Action<JsonObject> connectCallback;

            public Listener(PomeloClient c)
            {
                this.client = c;
            }

            public void OnConnected(Socket session, JsonObject user)
            {
                // Snapshot: connect() may replace the callback while this runs.
                Action<JsonObject> callback = connectCallback;
                if (callback != null)
                {
                    callback(user);
                }
            }

            public void OnError(Exception err)
            {
                if (client != null)
                {
                    client.onError(err);
                }
            }

            public void OnNetWorkStateChanged(NetWorkState st)
            {
                if (client == null)
                {
                    return;
                }
                Action<NetWorkState> handler = client.event_NetWorkStateChangedEvent;
                if (handler != null)
                {
                    handler(st);
                }
            }

            public void OnDisconnected(Socket session, string reason)
            {
                client.clear_response(true);
                Action<string> handler = client.event_OnDisconnected;
                if (handler != null)
                {
                    handler.Invoke(reason);
                }
            }

            public void OnReceivedMessage(RecvMessage msg)
            {
                client.process_message(msg);
            }

            public void ClearQueue()
            {
                client.ClearQueue();
            }
        }

        /// <summary>
        /// Connects to the server.
        /// </summary>
        /// <param name="host">Server host, passed to the adapter.</param>
        /// <param name="port">Server port, passed to the adapter.</param>
        /// <param name="timeout">Passed to the adapter and also used as the request timeout value.</param>
        /// <param name="user">Payload passed to the adapter; may be null.</param>
        /// <param name="connectCallback">Invoked from the adapter's connected notification; may be null.</param>
        public void connect(string host, int port, int timeout, JsonObject user, Action<JsonObject> connectCallback)
        {
            this.listener.connectCallback = connectCallback;
            this.request_timer.Tag = timeout;
            this.adapter.connect(host, port, timeout, user);
        }

        /// <summary>
        /// Connects to the server without a user payload.
        /// </summary>
        /// <param name="host">Server host, passed to the adapter.</param>
        /// <param name="port">Server port, passed to the adapter.</param>
        /// <param name="timeout">Passed to the adapter and also used as the request timeout value.</param>
        /// <param name="connectCallback">Invoked from the adapter's connected notification; may be null.</param>
        public void connect(string host, int port, int timeout, Action<JsonObject> connectCallback)
        {
            this.connect(host, port, timeout, null, connectCallback);
        }

        /// <summary>
        /// Actively disconnects. Pending requests are dropped without invoking their callbacks.
        /// </summary>
        public void disconnect()
        {
            log.Debug("====socket ==PomeloClient== disconnect ThreadID:" + Thread.CurrentThread.ManagedThreadId);
            adapter.disconnect();
            this.clear_response(false);
        }

        /// <summary>Releases the adapter and clears cached responses, pending requests, push handlers and queued tasks.</summary>
        protected void disposing()
        {
            adapter.disposing();
            this.ClearLastResponse();
            this.clear_response(false);
            this.clear_push();
            ClearQueue();
        }

        //---------------------------------------------------------------------------------------------------------------------

        /// <summary>
        /// Heartbeat; the client normally calls this once per frame. Runs queued tasks,
        /// updates the adapter and sweeps timed-out requests.
        /// </summary>
        public virtual void update(float deltime)
        {
            tasks.ProcessMessages();
            adapter.update(deltime);
            check_request_timeout();
        }

        /// <summary>Forwards to the adapter's <c>DoSendUpdate()</c>.</summary>
        public void DoSendUpdate()
        {
            adapter.DoSendUpdate();
        }

        //---------------------------------------------------------------------------------------------------------------------

        /// <summary>
        /// Sends a request with a raw binary body.
        /// </summary>
        /// <param name="route">Route of the request.</param>
        /// <param name="data">Raw body.</param>
        /// <param name="action">Response callback; when null no response handler is registered.</param>
        public void request_binary(string route, byte[] data, Action<PomeloException, byte[]> action)
        {
            uint id = req_id_gen.GetAndIncrement();
            this.listen_response_binary(id, route, action);
            try
            {
                this.send(route, id, data);
            }
            catch (Exception e)
            {
                onError(e);
            }
        }

        /// <summary>
        /// Sends a request whose body is written with the client's serializer.
        /// </summary>
        /// <param name="route">Route of the request.</param>
        /// <param name="msg">Request body.</param>
        /// <param name="action">Response callback; when null no response handler is registered.</param>
        public void request(string route, object msg, Action<PomeloException, object> action)
        {
            uint id = req_id_gen.GetAndIncrement();
            this.listen_response(id, route, action, null);
            try
            {
                this.send(route, id, msg);
            }
            catch (Exception e)
            {
                onError(e);
            }
        }

        /// <summary>
        /// Sends a request using an explicit encoder for the body and decoder for the response.
        /// </summary>
        public void request(string route, object msg, Action<PomeloException, object> action, MessageEncoder encoder, MessageDecoder decoder)
        {
            uint id = req_id_gen.GetAndIncrement();
            this.listen_response(id, route, action, decoder);
            try
            {
                this.send(route, id, msg, encoder);
            }
            catch (Exception e)
            {
                onError(e);
            }
        }

        /// <summary>
        /// Sends a request using an explicit encoder/decoder and raises the request start/end
        /// events with <paramref name="option"/>.
        /// </summary>
        public void request(string route, object msg, Action<PomeloException, object> action, MessageEncoder encoder, MessageDecoder decoder, object option)
        {
            this.onRequestStart(route, option);
            this.request(route, msg, (err, response) =>
            {
                this.onRequestEnd(route, err, response, option);
                if (action != null)
                {
                    action(err, response);
                }
            }, encoder, decoder);
        }

        /// <summary>
        /// Sends a request whose route is looked up from the request object's type, and
        /// validates the response before handing it to <paramref name="cb"/>.
        /// </summary>
        /// <param name="req">Request object; its runtime type selects the route.</param>
        /// <param name="validate">
        /// Returns true to accept the response; otherwise its out code/message become a
        /// <see cref="PomeloException"/> delivered to <paramref name="cb"/>.
        /// </param>
        /// <param name="cb">Receives either an error or the accepted response.</param>
        /// <param name="option">Opaque value forwarded to the request start/end events.</param>
        public void request(object req, ResponseValidater validate, Action<PomeloException, object> cb, object option = null)
        {
            // NOTE(review): `validate` is assumed to be the non-generic delegate taking an
            // object response, as implied by the lambdas passed by the other overloads - confirm.
            var route = EventTypes.GetRequestKey(req.GetType());
            this.onRequestStart(route, option);
            this.request(route, req, (err, response) =>
            {
                if (err != null)
                {
                    this.finish_request(route, option, cb, err, null);
                    return;
                }
                if (response == null)
                {
                    // Without this branch a null response would end the request silently:
                    // no end event and no callback.
                    PomeloException empty = new PomeloException(501, "Empty response : " + route);
                    empty.Route = route;
                    this.finish_request(route, option, cb, empty, null);
                    return;
                }
                int s2c_code;
                string s2c_msg;
                if (validate(response, out s2c_code, out s2c_msg))
                {
                    this.finish_request(route, option, cb, null, response);
                }
                else
                {
                    PomeloException exp = new PomeloException(s2c_code, s2c_msg);
                    exp.Route = route;
                    this.finish_request(route, option, cb, exp, null);
                }
            });
        }

        /// <summary>Raises the request-end event and then invokes the caller's callback, if any.</summary>
        private void finish_request(string route, object option, Action<PomeloException, object> cb, PomeloException err, object response)
        {
            this.onRequestEnd(route, err, response, option);
            if (cb != null)
            {
                cb(err, response);
            }
        }

        /// <summary>
        /// Sends a request whose route is looked up from the request object's type; every
        /// response is accepted.
        /// </summary>
        public void request(object req, Action<PomeloException, object> cb, object option = null)
        {
            this.request(req, (object rsp, out int s2c_code, out string s2c_msg) =>
            {
                s2c_code = 200;
                s2c_msg = null;
                return true;
            }, cb, option);
        }

        /// <summary>
        /// Typed variant of the validated request. The response is converted with
        /// <c>as RSP</c>, so a response of another type reaches the validator and the
        /// callback as null.
        /// </summary>
        public void request<RSP>(object req, ResponseValidater<RSP> validate, Action<PomeloException, RSP> cb, object option = null) where RSP : class
        {
            // NOTE(review): ResponseValidater<RSP> is inferred from `validate(rsp as RSP, ...)` - confirm.
            this.request(req, (object rsp, out int s2c_code, out string s2c_msg) =>
            {
                return validate(rsp as RSP, out s2c_code, out s2c_msg);
            }, (err, rsp) =>
            {
                if (cb != null)
                {
                    cb(err, rsp as RSP);
                }
            }, option);
        }

        /// <summary>
        /// Typed variant of the unvalidated request. The response is converted with
        /// <c>as RSP</c>, so a response of another type reaches the callback as null.
        /// </summary>
        public void request<RSP>(object req, Action<PomeloException, RSP> cb, object option = null) where RSP : class
        {
            this.request(req, (object rsp, out int s2c_code, out string s2c_msg) =>
            {
                s2c_code = 200;
                s2c_msg = null;
                return true;
            }, (err, rsp) =>
            {
                if (cb != null)
                {
                    cb(err, rsp as RSP);
                }
            }, option);
        }

        //-----------------------------------------------------------------------------------------------------------------

        /// <summary>Sends a notification (message id 0, no response expected) using an explicit encoder.</summary>
        public void notify(string route, object msg, MessageEncoder encoder)
        {
            this.send(route, 0, msg, encoder);
        }

        /// <summary>Sends a notification (message id 0) with a raw binary body.</summary>
        public void notify_binary(string route, byte[] data)
        {
            this.send(route, 0, data);
        }

        /// <summary>Sends a notification (message id 0) whose route is looked up from the message's type.</summary>
        public void notify(object msg)
        {
            var route = EventTypes.GetNotifyKey(msg.GetType());
            this.send(route, 0, msg);
        }

        //-----------------------------------------------------------------------------------------------------------------

        /// <summary>
        /// Registers a push callback for the route mapped to <typeparamref name="T"/>.
        /// </summary>
        /// <returns>The registered handler, or null when no route is mapped to the type.</returns>
        public PushHandler listen<T>(Action<T> action) where T : class
        {
            var route = EventTypes.GetPushKey(typeof(T));
            if (route == null)
            {
                return null;
            }
            return listen_push(route, (push) => { action(push as T); }, null);
        }

        /// <summary>Registers a push callback for <paramref name="route"/>.</summary>
        public PushHandler listen(string route, Action<object> action)
        {
            return listen_push(route, action, null);
        }

        /// <summary>Registers a push callback for <paramref name="route"/> with an explicit decoder.</summary>
        public PushHandler listen(string route, Action<object> action, MessageDecoder decoder)
        {
            return listen_push(route, action, decoder);
        }

        /// <summary>Registers a push callback that receives the raw body bytes.</summary>
        public PushHandler listen_binary(string route, Action<byte[]> action)
        {
            return listen_push_binary(route, action);
        }

        /// <summary>
        /// Registers the single replaceable push callback for the route mapped to
        /// <typeparamref name="T"/>, replacing any previous one. Does nothing when no route
        /// is mapped to the type.
        /// </summary>
        public void listen_once<T>(Action<T> action) where T : class
        {
            var route = EventTypes.GetPushKey(typeof(T));
            if (route != null)
            {
                listen_push_once(route, (push) => { action(push as T); }, null);
            }
        }

        /// <summary>
        /// Registers the single replaceable push callback for <paramref name="route"/>,
        /// replacing any previous one; a null <paramref name="action"/> removes it.
        /// </summary>
        public void listen_once(string route, Action<object> action)
        {
            listen_push_once(route, action, null);
        }

        /// <summary>
        /// Same as <see cref="listen_once(string, Action{object})"/>, with an explicit decoder.
        /// </summary>
        public void listen_once(string route, Action<object> action, MessageDecoder decoder)
        {
            listen_push_once(route, action, decoder);
        }

        /// <summary>
        /// Registers the single replaceable raw-bytes push callback for <paramref name="route"/>,
        /// replacing any previous one; a null <paramref name="action"/> removes it.
        /// </summary>
        public void listen_once_binary(string route, Action<byte[]> action)
        {
            listen_push_once_binary(route, action);
        }

        //-----------------------------------------------------------------------------------------------------------------
        #region ProcessMessages

        /// <summary>Serializes <paramref name="msg"/> with the client's serializer and sends it; failures go to the error event.</summary>
        protected void send(string route, uint id, object msg)
        {
            try
            {
                SendMessage send_object = SendMessage.Alloc(route, id, this.serializer, msg);
                adapter.send(send_object);
            }
            catch (Exception err)
            {
                onError(err);
            }
        }

        /// <summary>Sends an already built message; failures go to the error event.</summary>
        public void send(SendMessage msg)
        {
            try
            {
                adapter.send(msg);
            }
            catch (Exception err)
            {
                onError(err);
            }
        }

        /// <summary>Sends a message whose body is taken from <paramref name="stream"/>; failures go to the error event.</summary>
        public void send(string route, uint id, MemoryStream stream)
        {
            try
            {
                var send_object = SendMessage.Alloc(route, id, stream);
                adapter.send(send_object);
            }
            catch (Exception err)
            {
                onError(err);
            }
        }

        /// <summary>Encodes <paramref name="msg"/> with <paramref name="encoder"/> and sends it; failures go to the error event.</summary>
        protected void send(string route, uint id, object msg, MessageEncoder encoder)
        {
            try
            {
                var send_object = SendMessage.Alloc(route, id, msg, encoder);
                adapter.send(send_object);
            }
            catch (Exception err)
            {
                onError(err);
            }
        }

        /// <summary>Sends a message with a raw binary body; failures go to the error event.</summary>
        protected void send(string route, uint id, byte[] data)
        {
            try
            {
                var send_object = SendMessage.Alloc(route, id, data);
                adapter.send(send_object);
            }
            catch (Exception err)
            {
                onError(err);
            }
        }

        /// <summary>
        /// Entry point for every received message. The immediate handler may consume the
        /// message; otherwise heartbeats, responses and pushes are dispatched.
        /// </summary>
        protected virtual void process_message(RecvMessage msg)
        {
            // Runs outside the try block, so an exception thrown by an immediate handler
            // propagates to the caller instead of being routed to the error event.
            ProcessMessageImmediately immediate = event_NetWorkHandlePushImmediately;
            if (immediate != null && immediate(msg.Route, msg.Stream))
            {
                return;
            }
            try
            {
                if (msg.PkgType == PackageType.PKG_HEARTBEAT)
                {
                    Action heartbeat = event_OnHeartBeat;
                    if (heartbeat != null)
                    {
                        heartbeat.Invoke();
                    }
                    return;
                }
                if (msg.MsgType == MessageType.MSG_RESPONSE)
                {
                    process_response(msg);
                }
                else if (msg.MsgType == MessageType.MSG_PUSH)
                {
                    process_push(msg);
                }
            }
            catch (Exception err)
            {
                log.Error("====== PomeloClient.process_message() exception:" + err.Message + "\n" + err.StackTrace);
                onError(err);
            }
        }

        /// <summary>
        /// Each time the sweep timer fires, removes every pending request that reports a
        /// timeout and invokes it with a 408 <see cref="PomeloException"/>.
        /// </summary>
        private void check_request_timeout()
        {
            if (!request_timer.Update())
            {
                return;
            }
            if (response_map.Count <= 0)
            {
                return;
            }
            using (var removing = ListObjectPool<RequestHandler>.AllocAutoRelease())
            using (var list = ListObjectPool<KeyValuePair<uint, RequestHandler>>.AllocAutoRelease())
            {
                long cur_time = CUtils.CurrentTimeMS;
                // Iterate over a copy because entries are removed from the map inside the loop.
                list.AddRange(response_map);
                foreach (var req in list)
                {
                    if (req.Value.CheckTimeout(request_timer.Tag, cur_time))
                    {
                        response_map.Remove(req.Key);
                        removing.Add(req.Value);
                    }
                }
                foreach (var r in removing)
                {
                    PomeloException exp = new PomeloException(408, "Request Timeout : " + r.Route);
                    exp.Route = r.Route;
                    exp.Timeout = true;
                    log.Error("socket ====== check_request_timeout exp.Route " + exp.Route);
                    try
                    {
                        r.Invoke(exp);
                    }
                    catch (Exception e)
                    {
                        // The handlers are already out of the map; a throwing callback must
                        // not keep the remaining ones from being notified.
                        onError(e);
                    }
                }
            }
        }

        /// <summary>
        /// Drops every pending request without invoking its callback.
        /// </summary>
        /// <param name="async">Currently unused.</param>
        private void clear_response(bool async)
        {
            response_map.Clear();
        }

        /// <summary>Registers a response handler for <paramref name="id"/>; ignored for id 0 or a null callback.</summary>
        private void listen_response(uint id, string route, Action<PomeloException, object> cb, MessageDecoder dc)
        {
            if (id > 0 && cb != null)
            {
                this.response_map.Add(id, new RequestHandler(this, route, cb, null, dc));
            }
        }

        /// <summary>Registers a raw-bytes response handler for <paramref name="id"/>; ignored for id 0 or a null callback.</summary>
        private void listen_response_binary(uint id, string route, Action<PomeloException, byte[]> cb)
        {
            if (id > 0 && cb != null)
            {
                this.response_map.Add(id, new RequestHandler(this, route, null, cb, null));
            }
        }

        /// <summary>
        /// Matches a response to its pending request, decodes the body according to the
        /// handler (raw bytes, explicit decoder, or serializer plus registered response type)
        /// and invokes the handler.
        /// </summary>
        protected virtual void process_response(RecvMessage msg)
        {
            RequestHandler cb;
            if (!response_map.TryGetValue(msg.MsgID, out cb))
            {
                log.WarnFormat("Ignore response message : {0}", msg.MsgID);
                return;
            }
            response_map.Remove(msg.MsgID);
            try
            {
                // The route is taken from the pending request.
                msg.Route = cb.Route;
                if (cb.IsBinary)
                {
                    var response = msg.ReadBody();
                    cb.InvokeBin(response);
                }
                else if (cb.Decoder != null)
                {
                    var response = msg.ReadBody(cb.Decoder);
                    if (response == null)
                    {
                        throw new Exception("Decode response error : " + msg.Route);
                    }
                    if (response is PomeloException)
                    {
                        cb.Invoke(response as PomeloException);
                    }
                    else
                    {
                        cb.Invoke(response);
                    }
                }
                else
                {
                    Type responseType = EventTypes.GetResponseType(msg.Route);
                    if (responseType == null)
                    {
                        throw new Exception("No response type : " + msg.Route);
                    }
                    object response = msg.ReadBody(this.serializer, responseType);
                    if (response == null)
                    {
                        throw new Exception("Deserialize response error : " + responseType);
                    }
                    this.addLastResponse(response);
                    cb.Invoke(response);
                }
            }
            catch (Exception e)
            {
                // This also covers exceptions thrown by the callback itself, in which case the
                // callback is invoked a second time, now with the 501 error.
                var exp = new PomeloException(501, e.Message, e);
                exp.Route = msg.Route;
                cb.Invoke(exp);
                onError(e);
            }
        }

        /// <summary>Appends <paramref name="handler"/> to the handler list of its route, creating the list on first use.</summary>
        private PushHandler register_push(string route, PushHandler handler)
        {
            var handlers = push_handler.Get(route);
            if (handlers == null)
            {
                handlers = new List<PushHandler>();
                push_handler.Put(route, handlers);
            }
            handlers.Add(handler);
            return handler;
        }

        /// <summary>Adds an object push handler for <paramref name="route"/>.</summary>
        private PushHandler listen_push(string route, Action<object> cb, MessageDecoder dc)
        {
            return register_push(route, new PushHandler(this, route, cb, dc));
        }

        /// <summary>Sets or, when <paramref name="cb"/> is null, removes the replaceable object push handler of <paramref name="route"/>.</summary>
        private void listen_push_once(string route, Action<object> cb, MessageDecoder dc)
        {
            if (cb == null)
            {
                push_handler_once.Remove(route);
            }
            else
            {
                push_handler_once.Put(route, new PushHandler(this, route, cb, dc));
            }
        }

        /// <summary>Adds a raw-bytes push handler for <paramref name="route"/>.</summary>
        private PushHandler listen_push_binary(string route, Action<byte[]> cb)
        {
            return register_push(route, new PushHandler(this, route, cb));
        }

        /// <summary>Sets or, when <paramref name="cb"/> is null, removes the replaceable raw-bytes push handler of <paramref name="route"/>.</summary>
        private void listen_push_once_binary(string route, Action<byte[]> cb)
        {
            if (cb == null)
            {
                push_handler_once.Remove(route);
            }
            else
            {
                push_handler_once.Put(route, new PushHandler(this, route, cb));
            }
        }

        /// <summary>Removes <paramref name="handler"/> from the handler list of its route, if that list exists.</summary>
        internal void remove_push(PushHandler handler)
        {
            var act = push_handler.Get(handler.Route);
            if (act != null)
            {
                act.Remove(handler);
            }
        }

        /// <summary>
        /// Dispatches a push to every handler registered for its route (list handlers first,
        /// then the replaceable one). When no handler is registered the unhandled-push event
        /// is raised instead.
        /// </summary>
        protected virtual void process_push(RecvMessage msg)
        {
            try
            {
                // Dispatch from a copy so handlers may register/unregister while being invoked.
                using (var all = ListObjectPool<PushHandler>.AllocAutoRelease())
                {
                    var list = push_handler.Get(msg.Route);
                    if (list != null)
                    {
                        all.AddRange(list);
                    }
                    var once = push_handler_once.Get(msg.Route);
                    if (once != null)
                    {
                        all.Add(once);
                    }

                    if (all.Count > 0)
                    {
                        // Raw bytes and the serializer-decoded object are read at most once
                        // and shared by all handlers of the same kind.
                        object push = null;
                        byte[] push_bin = null;
                        var push_type = EventTypes.GetPushType(msg.Route);
                        for (int i = 0; i < all.Count; i++)
                        {
                            var handler = all[i];
                            if (handler.IsBinary)
                            {
                                if (push_bin == null)
                                {
                                    push_bin = msg.ReadBody();
                                }
                                handler.InvokeBin(push_bin);
                            }
                            else if (handler.decoder != null)
                            {
                                var decode = msg.ReadBody(handler.decoder);
                                if (decode != null)
                                {
                                    handler.Invoke(decode);
                                }
                            }
                            else
                            {
                                if (push == null && push_type != null)
                                {
                                    push = msg.ReadBody(this.serializer, push_type);
                                }
                                if (push != null)
                                {
                                    this.addLastResponse(push);
                                    handler.Invoke(push);
                                }
                            }
                        }
                    }
                    else
                    {
                        var unhandled = event_NetWorkHandlePush;
                        if (unhandled != null)
                        {
                            unhandled.Invoke(msg.Route, msg.Stream);
                        }
                    }
                }
            }
            catch (Exception e)
            {
                log.Error("Decode Error: route = " + msg.Route + "\n" + e.Message);
                onError(e);
            }
        }

        /// <summary>Removes every registered push handler.</summary>
        private void clear_push()
        {
            push_handler.Clear();
            push_handler_once.Clear();
        }

        #endregion
        //-----------------------------------------------------------------------------------------------------------------
        #region LastResponse

        private bool enable_last_response = true;
        // Most recent serializer-decoded response/push per runtime type.
        private readonly HashMap<Type, object> last_response = new HashMap<Type, object>();

        /// <summary>
        /// Whether decoded responses and pushes are cached by type. Setting this to false
        /// also clears the cache.
        /// </summary>
        public bool IsSaveResponse
        {
            get { return enable_last_response; }
            set
            {
                enable_last_response = value;
                if (value == false)
                {
                    last_response.Clear();
                }
            }
        }

        /// <summary>Caches <paramref name="push"/> under its runtime type when caching is enabled; null is ignored.</summary>
        internal void addLastResponse(object push)
        {
            if (push != null && enable_last_response)
            {
                last_response.Put(push.GetType(), push);
            }
        }

        /// <summary>Returns the cached message stored under <typeparamref name="T"/>, as looked up by the cache.</summary>
        public T GetLastResponse<T>() where T : class
        {
            return last_response.Get(typeof(T)) as T;
        }

        /// <summary>Returns the cached message stored under <paramref name="type"/>, as looked up by the cache.</summary>
        public object GetLastResponse(Type type)
        {
            return last_response.Get(type);
        }

        /// <summary>Empties the response cache.</summary>
        public void ClearLastResponse()
        {
            last_response.Clear();
        }

        #endregion
        //-----------------------------------------------------------------------------------------------------------------

        /// <summary>Raises the error event, if anyone is subscribed.</summary>
        internal void onError(Exception err)
        {
            Action<Exception> handler = event_OnError;
            if (handler != null)
            {
                handler.Invoke(err);
            }
        }

        //-----------------------------------------------------------------------------------------------------------------

        /// <summary>Raises the request-start event, if anyone is subscribed.</summary>
        public void onRequestStart(string route, object option)
        {
            Action<string, object> handler = event_RequestStartEvent;
            if (handler != null)
            {
                handler(route, option);
            }
        }

        /// <summary>Raises the request-end event, if anyone is subscribed.</summary>
        public void onRequestEnd(string route, PomeloException excep, object response, object option)
        {
            Action<string, PomeloException, object, object> handler = event_RequestEndEvent;
            if (handler != null)
            {
                handler(route, excep, response, option);
            }
        }

        //-----------------------------------------------------------------------------------------------------------------

        /// <summary>
        /// Releases the adapter, clears all client state and drops every event subscription.
        /// Calling it again, including re-entrantly from a callback raised during disposal,
        /// has no effect.
        /// </summary>
        public void Dispose()
        {
            if (this.disposed)
            {
                return;
            }
            // Set the flag first so a re-entrant Dispose() cannot run the cleanup twice.
            this.disposed = true;
            try
            {
                this.disposing();
            }
            finally
            {
                this.disposing_events();
            }
        }

        //-----------------------------------------------------------------------------------------------------------------

        /// <summary>Network state changed.</summary>
        public event Action<NetWorkState> NetWorkStateChangedEvent
        {
            add { event_NetWorkStateChangedEvent += value; }
            remove { event_NetWorkStateChangedEvent -= value; }
        }

        /// <summary>
        /// Called back immediately on the network thread when a message is received;
        /// returning true consumes the message.
        /// </summary>
        public event ProcessMessageImmediately NetWorkHandlePushImmediately
        {
            add { event_NetWorkHandlePushImmediately += value; }
            remove { event_NetWorkHandlePushImmediately -= value; }
        }

        /// <summary>Error callback.</summary>
        public event Action<Exception> OnError
        {
            add { event_OnError += value; }
            remove { event_OnError -= value; }
        }

        /// <summary>Called back on the main thread for a received push that no handler is registered for.</summary>
        // NOTE(review): the second type argument is assumed to match the type of
        // RecvMessage.Stream, which is not visible in this file - confirm.
        public event Action<string, MemoryStream> NetWorkHandlePush
        {
            add { event_NetWorkHandlePush += value; }
            remove { event_NetWorkHandlePush -= value; }
        }

        /// <summary>Request started.</summary>
        public event Action<string, object> RequestStartEvent
        {
            add { event_RequestStartEvent += value; }
            remove { event_RequestStartEvent -= value; }
        }

        /// <summary>Request returned.</summary>
        public event Action<string, PomeloException, object, object> RequestEndEvent
        {
            add { event_RequestEndEvent += value; }
            remove { event_RequestEndEvent -= value; }
        }

        /// <summary>Disconnected.</summary>
        public event Action<string> OnDisconnected
        {
            add { event_OnDisconnected += value; }
            remove { event_OnDisconnected -= value; }
        }

        /// <summary>Used for Gate heartbeat packets.</summary>
        public event Action OnHeartBeat
        {
            add { event_OnHeartBeat += value; }
            remove { event_OnHeartBeat -= value; }
        }

        private Action<Exception> event_OnError;
        private Action<string, object> event_RequestStartEvent;
        private Action<string, PomeloException, object, object> event_RequestEndEvent;
        private Action<NetWorkState> event_NetWorkStateChangedEvent;
        private Action<string> event_OnDisconnected;
        private Action<string, MemoryStream> event_NetWorkHandlePush;
        private ProcessMessageImmediately event_NetWorkHandlePushImmediately;
        private Action event_OnHeartBeat;

        /// <summary>Drops every event subscription.</summary>
        protected virtual void disposing_events()
        {
            event_OnError = null;
            event_RequestStartEvent = null;
            event_RequestEndEvent = null;
            event_NetWorkStateChangedEvent = null;
            event_OnDisconnected = null;
            event_NetWorkHandlePush = null;
            event_NetWorkHandlePushImmediately = null;
            event_OnHeartBeat = null;
        }

        //-----------------------------------------------------------------------------------------------------------------
    }
}