using CommonLang;
using CommonLang.Concurrent;
using CommonLang.Log;
using SimpleJson;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text.RegularExpressions;
using System.Threading;

namespace Pomelo.DotNetClient
{
    /// <summary>
    /// TCP transport for the Pomelo client.
    /// Socket I/O is started with the Begin/End asynchronous pattern, so completion
    /// callbacks (<c>OnConnect</c>, <c>TCPReadCallBack</c>, <c>endSend</c>) run on whatever
    /// thread the runtime uses for them. Those callbacks never touch the listener directly:
    /// received packages and state changes are queued and delivered from <see cref="update"/>.
    /// </summary>
    public class PomeloTCP : PomeloClientAdapter
    {
        protected readonly IPomeloClientAdapterListener listener;

        // Timestamp (ms) refreshed whenever a package is received or a connect attempt starts.
        private long last_heartbeat_r2c = CUtils.CurrentTimeMS;
        // Not referenced anywhere else in this class.
        private long last_heartbeat_chk = CUtils.CurrentTimeMS;
        // Byte counters; updated from I/O callbacks, hence accessed through Interlocked.
        private long total_recv_bytes = 0;
        private long total_sent_bytes = 0;

        private const int _SendTimeOut = 3000;
        private const int _ReceiveTimeOut = 3000;
        private const int _MaxRead = 1024 * 16;
        // Upper bound (ms) spent delivering queued messages in a single update() call.
        private const int _MaxProcessMsPerUpdate = 100;

        // Receive buffer handed to NetworkStream.BeginRead.
        private readonly byte[] _byteBuffer = new byte[_MaxRead];
        // Accumulates received bytes until at least one complete package is available.
        private MemoryStream _memStream = null;
        private BinaryReader _reader = null;
        private NetWorkState mCurStatus = NetWorkState.NONE;
        private Logger log = LoggerFactory.GetLogger("PomeloTCP");

        // Messages produced by I/O callbacks, consumed by ProcessMsg(). Guarded by lock(MsgQueue).
        Queue<Message> MsgQueue = new Queue<Message>();

        /// <summary>Parameters of the most recent connect() request.</summary>
        protected class Connecting
        {
            public string _host;
            public int _port;
            public int _timeout;
            public JsonObject _user;
        }

        protected TcpClient socket;
        protected Connecting _NetInfo;

        /// <summary>The underlying socket of the current client, or null when there is none.</summary>
        public Socket Session
        {
            get
            {
                TcpClient client = socket;
                return (client != null) ? client.Client : null;
            }
        }

        public NetWorkState NetWorkState
        {
            get { return mCurStatus; }
        }

        /// <summary>Total number of bytes returned by completed reads.</summary>
        public long TotalRecvBytes
        {
            get { return Interlocked.Read(ref total_recv_bytes); }
        }

        /// <summary>Total number of bytes whose asynchronous write completed successfully.</summary>
        public long TotalSentBytes
        {
            get { return Interlocked.Read(ref total_sent_bytes); }
        }

        public void DoSendUpdate()
        {
        }

        /// <summary>
        /// True when a client exists, reports itself connected and its stream is readable and writable.
        /// Never throws: a client that is concurrently being closed is reported as not connected.
        /// </summary>
        public bool Connected
        {
            get
            {
                // Work on a snapshot; the field can be nulled by CloseSocket() at any time.
                TcpClient client = socket;
                if (client == null)
                {
                    return false;
                }
                try
                {
                    if (!client.Connected)
                    {
                        return false;
                    }
                    NetworkStream stream = client.GetStream();
                    return stream.CanRead && stream.CanWrite;
                }
                catch (Exception)
                {
                    // GetStream() throws on a disposed or unconnected client; either way we are not connected.
                    return false;
                }
            }
        }

        public PomeloTCP(IPomeloClientAdapterListener listener)
        {
            socket = null;
            this.listener = listener;
            _memStream = new MemoryStream();
            _reader = new BinaryReader(_memStream);
        }

        /// <summary>Milliseconds elapsed since the last package was received (or connect was started).</summary>
        public long GetPing()
        {
            return CUtils.CurrentTimeMS - last_heartbeat_r2c;
        }

        //--------------------------------------------------------------------------------------------------
        #region --Interface--

        /// <summary>
        /// Starts an asynchronous connection to <paramref name="host"/>:<paramref name="port"/>.
        /// If already connected the listener is simply notified again. If another attempt is in
        /// progress the listener receives an error object (s2c_code 500). Failures to start the
        /// attempt are reported through the listener (s2c_code 501 for a timeout, 500 otherwise).
        /// </summary>
        /// <param name="host">Server host name or address.</param>
        /// <param name="port">Server port.</param>
        /// <param name="timeout">Stored in the connection info; not otherwise used by this class.</param>
        /// <param name="user">User object sent with the handshake; may be null.</param>
        public virtual void connect(string host, int port, int timeout, JsonObject user)
        {
            log.Debug("===========socket connect begin " + Thread.CurrentThread.ManagedThreadId);
            if (Connected)
            {
                // Already connected: replay the success notification.
                if (listener != null)
                {
                    log.Debug("===========socket connect call function ");
                    listener.OnConnected(Session, (_NetInfo != null) ? _NetInfo._user : null);
                }
                return;
            }

            // A connection attempt is already in progress.
            if (mCurStatus == NetWorkState.CONNECTING)
            {
                JsonObject json = new JsonObject();
                DoNetState(NetWorkState.ERROR);
                json["s2c_code"] = 500;
                json["s2c_msg"] = "socket connecting";
                if (listener != null)
                {
                    listener.OnConnected(Session, json);
                }
                return;
            }

            _NetInfo = new Connecting()
            {
                _host = host,
                _port = port,
                _timeout = timeout,
                _user = user,
            };
            DoNetState(NetWorkState.CONNECTING);

            string newIp = null;
            AddressFamily addressFamily;
            GetIPType(host, port.ToString(), out newIp, out addressFamily);
            _NetInfo._host = newIp;
            _NetInfo._port = port;

            // Drop any stale client left over from an earlier attempt before replacing it.
            CloseSocket();

            TcpClient client = null;
            try
            {
                client = new TcpClient(addressFamily)
                {
                    SendTimeout = _SendTimeOut,
                    ReceiveTimeout = _ReceiveTimeOut,
                    NoDelay = true
                };
                client.Client.Blocking = true;
                socket = client;

                log.Debug("===========socket connect call socket.BeginConnect " + _NetInfo._host + ":" + _NetInfo._port);
                _received_heartbeat();
                // The client is passed as state so OnConnect completes the right attempt.
                client.BeginConnect(_NetInfo._host, _NetInfo._port, new AsyncCallback(OnConnect), client);
            }
            catch (Exception err)
            {
                log.Warn("socket connect error: " + err.Message);
                SocketException socketErr = err as SocketException;
                bool timedOut = socketErr != null && socketErr.SocketErrorCode == SocketError.TimedOut;

                JsonObject json = new JsonObject();
                DoNetState(timedOut ? NetWorkState.TIMEOUT : NetWorkState.ERROR);
                json["s2c_code"] = timedOut ? 501 : 500;
                json["s2c_msg"] = err.Message;
                try
                {
                    if (listener != null)
                    {
                        listener.OnConnected((client != null) ? client.Client : null, json);
                    }
                }
                finally
                {
                    // Release the failed client, but do not clobber a client the listener
                    // may have created by calling connect() again from its callback.
                    if (client != null)
                    {
                        if (ReferenceEquals(socket, client))
                        {
                            socket = null;
                        }
                        client.Close();
                    }
                }
            }
        }

        /// <summary>
        /// Completion callback of BeginConnect. On success starts reading and sends the handshake;
        /// on failure closes the socket and queues a DISCONNECTED state change.
        /// </summary>
        private void OnConnect(IAsyncResult ar)
        {
            TcpClient client = ar.AsyncState as TcpClient;
            try
            {
                if (client == null || !ReferenceEquals(client, socket))
                {
                    // The attempt was closed or superseded while it was pending.
                    log.Debug("socket connect result ignored: client was closed or replaced");
                    return;
                }

                client.EndConnect(ar);
                if (Connected)
                {
                    // Reset the receive state for the new connection.
                    _memStream.SetLength(0);
                    _memStream.Position = 0;
                    Array.Clear(_byteBuffer, 0, _byteBuffer.Length);
                    StartReadMessage();
                    _send_handshake(_NetInfo._user);
                    log.Debug("socket connect success ");
                }
                else
                {
                    log.Debug("socket connect faile ");
                    this._set_net_state(NetWorkState.DISCONNECTED);
                }
            }
            catch (Exception e)
            {
                log.Warn("===========OnConnect========== Error" + e.Message);
                CloseSocket();
                _on_error(e);
            }
        }

        /// <summary>
        /// Closes the connection on request of the caller: switches to CLOSED, closes the socket,
        /// discards queued messages and notifies the listener with reason "CLOSED".
        /// </summary>
        public virtual void disconnect()
        {
            log.Debug("socket disconnect " + Thread.CurrentThread.ManagedThreadId);
            DoNetState(NetWorkState.CLOSED);
            CloseSocket();
            ClearMsgQueue();
            if (listener != null)
            {
                listener.OnDisconnected(null, "CLOSED");
            }
        }

        /// <summary>
        /// Resolves the route of a message whose flag marks the route as compressed.
        /// </summary>
        /// <returns>True and a route when the flag's route bit is set; otherwise false and null.</returns>
        public virtual bool resolve_route(RecvMessage msg, byte flag, out string route)
        {
            if ((flag & RecvMessage.MSG_Route_Mask) == 1)
            {
                // The route id is consumed from the stream but not used for the lookup.
                // NOTE(review): every compressed route maps to this fixed name; confirm this is intended.
                RecvMessage.ReadShort(msg.Stream);
                route = "area.playerPush.battleEventPush";
                return true;
            }
            route = null;
            return false;
        }

        /// <summary>Delivers queued messages and state changes to the listener.</summary>
        public virtual void update(float deltime)
        {
            ProcessMsg();
        }

        /// <summary>Sends a message asynchronously; the message is disposed by this class.</summary>
        public virtual void send(SendMessage send_object)
        {
            try
            {
                startSend(send_object);
            }
            catch (Exception err)
            {
                CloseSocket();
                log.Warn("socket startSend Error:" + err.StackTrace);
                _on_error(err);
            }
        }

        /// <summary>Disconnects and releases the receive streams.</summary>
        public virtual void disposing()
        {
            log.Debug("socket disposing " + Thread.CurrentThread.ManagedThreadId);

            // Close the socket first so no new data is fed into the streams released below.
            disconnect();

            if (_memStream != null)
            {
                _memStream.Close();
                _memStream = null;
            }
            if (_reader != null)
            {
                _reader.Close();
                _reader = null;
            }
        }

        #endregion

        //--------------------------------------------------------------------------------------------------
        #region --Internal--

        /// <summary>Reports an I/O failure by queuing a DISCONNECTED state change.</summary>
        private void _on_error(Exception err)
        {
            log.Warn("_on_error ThreadID:" + Thread.CurrentThread.ManagedThreadId);
            _set_net_state(NetWorkState.DISCONNECTED);
        }

        /// <summary>Queues a state change so that it is applied from update().</summary>
        private void _set_net_state(NetWorkState st)
        {
            LocalNetStatus recv_object = LocalNetStatus.Alloc(st);
            AddMsg(recv_object);
        }

        /// <summary>
        /// Applies a state change and notifies the listener.
        /// </summary>
        /// <returns>True when the state changed to something other than NONE; otherwise false.</returns>
        private bool DoNetState(NetWorkState st)
        {
            if (mCurStatus != st)
            {
                if (NetWorkState.CLOSED == mCurStatus && NetWorkState.DISCONNECTED == st)
                {
                    // DISCONNECTED is the reconnect marker, so it must not replace an explicit CLOSED.
                    return false;
                }
                mCurStatus = st;
                if (st != NetWorkState.NONE)
                {
                    if (NetWorkState.DISCONNECTED == mCurStatus)
                    {
                        CloseSocket();
                        ClearMsgQueue();
                    }
                    if (listener != null)
                    {
                        listener.OnNetWorkStateChanged(st);
                    }
                    return true;
                }
            }
            return false;
        }

        /// <summary>
        /// Detaches and closes the current client, whether or not it ever connected.
        /// Safe to call repeatedly and from any thread; never throws.
        /// </summary>
        private void CloseSocket()
        {
            log.Debug("====CloseSocket=====");
            // Detach atomically so concurrent callers and callbacks never see a half-closed client.
            TcpClient client = Interlocked.Exchange<TcpClient>(ref socket, null);
            if (client == null)
            {
                return;
            }
            try
            {
                if (client.Connected)
                {
                    client.Client.Shutdown(SocketShutdown.Both);
                }
            }
            catch (Exception err)
            {
                log.Debug("===== socket === CloseSocket Error");
                log.Warn(err.Message + "\n" + err.StackTrace);
            }
            try
            {
                client.Close();
                log.Debug("====CloseSocket===== end");
            }
            catch (Exception err)
            {
                log.Warn(err.Message + "\n" + err.StackTrace);
            }
        }

        /// <summary>Sends the handshake package carrying the client "sys" info and the user object.</summary>
        private void _send_handshake(JsonObject user)
        {
            var Version = "0.3.0";
            var Type = "unity-socket";
            if (user == null)
            {
                user = new JsonObject();
            }
            var msg = new JsonObject();

            // Build sys option
            var sys = new JsonObject();
            sys["version"] = Version;
            sys["type"] = Type;

            // Build handshake message
            msg["sys"] = sys;
            msg["user"] = user;

            var body = Message.UTF8.GetBytes(msg.ToString());
            var send_object = SendMessage.Alloc(PackageType.PKG_HANDSHAKE, body);
            startSend(send_object);
        }

        /// <summary>
        /// Handles the server handshake response: validates it, acknowledges it, switches to
        /// CONNECTED and notifies the listener with the "user" object of the response.
        /// </summary>
        /// <exception cref="Exception">The response has no "code"/"sys" or the code is not 200.</exception>
        private void _received_handshake(JsonObject msg)
        {
            var code = msg["code"];
            var sys = msg["sys"] as JsonObject;
            if (code == null || sys == null || Convert.ToInt32(code) != 200)
            {
                throw new Exception("Handshake error! Please check your handshake config.");
            }

            // The "dict" and "heartbeat" entries of sys are not used by this adapter.

            var send_object = SendMessage.Alloc(PackageType.PKG_HANDSHAKE_ACK);
            startSend(send_object);

            object jobj;
            JsonObject user = null;
            if (msg.TryGetValue("user", out jobj))
            {
                user = jobj as JsonObject;
            }
            if (user == null)
            {
                user = new JsonObject();
            }

            DoNetState(NetWorkState.CONNECTED);
            _received_heartbeat();
            if (listener != null)
            {
                listener.OnConnected(Session, user);
            }
        }

        /// <summary>
        /// Dispatches one queued message: a received package by package type, or a queued
        /// local state change. The message is always disposed afterwards.
        /// </summary>
        private void _received_package(Message recv)
        {
            try
            {
                RecvMessage recv_object = recv as RecvMessage;
                if (recv_object != null)
                {
                    if (recv_object.PkgType == PackageType.PKG_HANDSHAKE)
                    {
                        var body = Message.UTF8.GetString(
                            recv_object.Buffer,
                            RecvMessage.FIXED_HEAD_SIZE,
                            recv_object.PkgLength);
                        JsonObject data = (JsonObject)SimpleJson.SimpleJson.DeserializeObject(body);
                        _received_handshake(data);
                    }
                    else if (recv_object.PkgType == PackageType.PKG_HEARTBEAT)
                    {
                        if (listener != null)
                        {
                            listener.OnReceivedMessage(recv_object);
                        }
                    }
                    else if (recv_object.PkgType == PackageType.PKG_DATA)
                    {
                        recv_object.DecodeBody(this);
                        if (listener != null)
                        {
                            listener.OnReceivedMessage(recv_object);
                        }
                    }
                    else if (recv_object.PkgType == PackageType.PKG_KICK)
                    {
                        DoNetState(NetWorkState.KICK);
                    }
                }
                else
                {
                    // Network state update that was queued by an I/O callback.
                    LocalNetStatus netStatus = recv as LocalNetStatus;
                    if (netStatus != null)
                    {
                        DoNetState(netStatus.St);
                    }
                }
            }
            finally
            {
                // Release the message even when a handler throws.
                recv.Dispose();
            }
        }

        private void _send_heartbeat()
        {
            var send_object = SendMessage.Alloc(PackageType.PKG_HEARTBEAT);
            startSend(send_object);
        }

        private void _received_heartbeat()
        {
            last_heartbeat_r2c = CUtils.CurrentTimeMS;
        }

        #endregion

        //--------------------------------------------------------------------------------------------------
        #region --Send/Receive--

        /// <summary>Starts the first asynchronous read on the current client; failures are only logged.</summary>
        private void StartReadMessage()
        {
            try
            {
                TcpClient client = socket;
                if (client == null || !Connected)
                {
                    return;
                }
                if (client.GetStream().CanRead)
                {
                    BeginReadNext(client);
                }
                else
                {
                    log.Warn("Network IO problem not cantread");
                }
            }
            catch (Exception err)
            {
                log.Warn("StartReadMessage error :" + err.Message);
            }
        }

        /// <summary>Issues one asynchronous read into the receive buffer; may throw.</summary>
        private void BeginReadNext(TcpClient client)
        {
            // The client travels as state so the callback completes the read on the same client.
            client.GetStream().BeginRead(_byteBuffer, 0, _byteBuffer.Length, new AsyncCallback(TCPReadCallBack), client);
        }

        /// <summary>
        /// Completion callback of BeginRead. Feeds the received bytes to the package parser and
        /// re-arms the read; a zero-length read or any failure queues a DISCONNECTED state change.
        /// </summary>
        private void TCPReadCallBack(IAsyncResult ar)
        {
            try
            {
                TcpClient client = ar.AsyncState as TcpClient;
                if (client == null || !ReferenceEquals(client, socket))
                {
                    // The client was closed or replaced by this class; whoever did that owns the state change.
                    return;
                }

                int numberOfBytesRead = client.GetStream().EndRead(ar);
                if (numberOfBytesRead > 0)
                {
                    Interlocked.Add(ref total_recv_bytes, numberOfBytesRead);
                    pickpackage(_byteBuffer, numberOfBytesRead);
                    BeginReadNext(client);
                }
                else
                {
                    // Remote side closed the connection.
                    log.Debug("===========socket recve numberOfBytesRead: recve = " + numberOfBytesRead.ToString());
                    _set_net_state(NetWorkState.DISCONNECTED);
                }
            }
            catch (Exception err)
            {
                log.Warn("=======socket===TCPReadCallBack===Error>" + err.Message);
                _on_error(err);
            }
        }

        private long RemainingBytes()
        {
            return _memStream.Length - _memStream.Position;
        }

        /// <summary>
        /// Appends <paramref name="length"/> received bytes to the accumulation stream and queues
        /// every complete package found in it. A package is a header of one type byte followed by a
        /// 3-byte big-endian body length, then the body. Incomplete trailing data is kept for the next call.
        /// </summary>
        private void pickpackage(byte[] bytes, int length)
        {
            _memStream.Seek(0, SeekOrigin.End);
            _memStream.Write(bytes, 0, length);
            _memStream.Seek(0, SeekOrigin.Begin);

            while (RemainingBytes() >= RecvMessage.FIXED_HEAD_SIZE)
            {
                long headStart = _memStream.Position;

                // Peek at the header: the type byte is skipped, the next three bytes are the body length.
                _reader.ReadByte();
                int PkgLength = (_reader.ReadByte() << 16) + (_reader.ReadByte() << 8) + _reader.ReadByte();
                long remainingLength = RemainingBytes();

                // Rewind so the header is part of what gets copied (or kept as leftover).
                _memStream.Position = headStart;
                if (remainingLength < PkgLength)
                {
                    break;
                }

                int msgSize = RecvMessage.FIXED_HEAD_SIZE + PkgLength;
                RecvMessage recv_object = RecvMessage.Alloc();
                recv_object.stateSocket = socket;
                recv_object.Stream.Write(_reader.ReadBytes(msgSize), 0, msgSize);
                recv_object.DecodeHead();

                // Enqueue
                _received_heartbeat();
                AddMsg(recv_object);
            }

            // Keep only the bytes of the incomplete package, moved to the start of the stream.
            var leftover = _reader.ReadBytes((int)RemainingBytes());
            _memStream.SetLength(0);
            _memStream.Write(leftover, 0, leftover.Length);
        }

        #endregion

        #region --Send--

        /// <summary>
        /// Starts an asynchronous write of the message. The message is disposed by this class in
        /// every case: after the write completes, when the write cannot be started, or when there
        /// is no connection.
        /// </summary>
        private void startSend(SendMessage send_object)
        {
            if (send_object == null)
            {
                return;
            }
            try
            {
                TcpClient client = socket;
                if (client != null && Connected)
                {
                    client.GetStream().BeginWrite(send_object.Buffer, 0, send_object.BufferLength, endSend, send_object);
                    return;
                }
            }
            catch (Exception err)
            {
                send_object.Dispose();
                log.Warn("startSend Error>" + err.Message);
                return;
            }

            // Not connected: nothing will complete this message, so release it here.
            send_object.Dispose();
        }

        /// <summary>Completion callback of BeginWrite; always disposes the message.</summary>
        private void endSend(IAsyncResult asyncSend)
        {
            var send_object = asyncSend.AsyncState as SendMessage;
            try
            {
                TcpClient client = socket;
                if (client == null || !Connected)
                {
                    return;
                }
                client.GetStream().EndWrite(asyncSend);
                if (send_object != null)
                {
                    Interlocked.Add(ref total_sent_bytes, send_object.BufferLength);
                }
            }
            catch (Exception err)
            {
                log.Warn("endSend Error>" + err.Message);
                CloseSocket();
                // Closing the socket silences the pending read, so report the failure here.
                _on_error(err);
            }
            finally
            {
                if (send_object != null)
                {
                    send_object.Dispose();
                }
            }
        }

        #endregion

        /// <summary>Queues a message for delivery from update(); null is ignored.</summary>
        private void AddMsg(Message msg)
        {
            if (msg != null)
            {
                lock (MsgQueue)
                {
                    MsgQueue.Enqueue(msg);
                }
            }
        }

        /// <summary>
        /// Delivers queued messages until the queue is empty or the per-update time budget is spent.
        /// The queue lock is held only while dequeuing, not while handlers run.
        /// </summary>
        private void ProcessMsg()
        {
            long last = CUtils.CurrentTimeMS;
            while (true)
            {
                if (CUtils.CurrentTimeMS - last > _MaxProcessMsPerUpdate)
                {
                    break;
                }

                Message recv_object;
                lock (MsgQueue)
                {
                    if (MsgQueue.Count == 0)
                    {
                        break;
                    }
                    recv_object = MsgQueue.Dequeue();
                }

                if (recv_object != null)
                {
                    _received_package(recv_object);
                }
            }
        }

        /// <summary>Disposes and discards every queued message.</summary>
        private void ClearMsgQueue()
        {
            lock (MsgQueue)
            {
                while (MsgQueue.Count > 0)
                {
                    var recv_object = MsgQueue.Dequeue();
                    if (recv_object != null)
                    {
                        recv_object.Dispose();
                    }
                }
            }
        }

        #region ipv4||6 ////////////////////////////////////

        [DllImport("__Internal")]
        private static extern string getIPv6(string mHost, string mPort);

        /// <summary>
        /// Returns the address to use followed by "&amp;&amp;" and an address-family tag.
        /// On iOS device builds the native <c>getIPv6</c> is asked; everywhere else the host is
        /// returned unchanged and tagged "ipv4".
        /// </summary>
        public static string GetIPv6(string mHost, string mPort)
        {
#if UNITY_IPHONE && !UNITY_EDITOR
            string mIPv6 = getIPv6(mHost, mPort);
            return mIPv6;
#else
            return mHost + "&&ipv4";
#endif
        }

        /// <summary>
        /// Chooses the address and address family for a connection. Defaults to the given host over
        /// IPv4; switches to the resolved address over IPv6 only when GetIPv6 tags the result "ipv6".
        /// Errors are logged and leave the defaults in place.
        /// </summary>
        private void GetIPType(string serverIp, string serverPorts, out string newServerIp, out AddressFamily mIPType)
        {
            mIPType = AddressFamily.InterNetwork;
            newServerIp = serverIp;
            try
            {
                var mIPv6 = GetIPv6(serverIp, serverPorts);
                if (!string.IsNullOrEmpty(mIPv6))
                {
                    var m_StrTemp = Regex.Split(mIPv6, "&&");
                    if (m_StrTemp != null && m_StrTemp.Length >= 2)
                    {
                        var IPType = m_StrTemp[1];
                        if (IPType == "ipv6")
                        {
                            newServerIp = m_StrTemp[0];
                            mIPType = AddressFamily.InterNetworkV6;
                        }
                    }
                }
            }
            catch (Exception e)
            {
                log.Log("GetIPv6 error:" + e);
            }
        }

        #endregion
    }
}