|
- using CommonLang;
- using CommonLang.Concurrent;
- using CommonLang.Log;
- using SimpleJson;
- using System;
- using System.Collections.Generic;
- using System.IO;
- using System.Net;
- using System.Net.Sockets;
- using System.Runtime.InteropServices;
- using System.Text.RegularExpressions;
- using System.Threading;
- namespace Pomelo.DotNetClient
- {
- public class PomeloTCP : PomeloClientAdapter
- {
- protected readonly IPomeloClientAdapterListener listener;
- private long last_heartbeat_r2c = CUtils.CurrentTimeMS;
- private long last_heartbeat_chk = CUtils.CurrentTimeMS;
- private long total_recv_bytes = 0;
- private long total_sent_bytes = 0;
- private const int _SendTimeOut = 3000;
- private const int _ReceiveTimeOut = 3000;
- private const int _MaxRead = 1024 * 16;
- private readonly byte[] _byteBuffer = new byte[_MaxRead];
- private MemoryStream _memStream = null;
- private BinaryReader _reader = null;
- private NetWorkState mCurStatus = NetWorkState.NONE;
- private Logger log = LoggerFactory.GetLogger("PomeloTCP");
- Queue<Message> MsgQueue = new Queue<Message>();
- protected class Connecting
- {
- public string _host;
- public int _port;
- public int _timeout;
- public JsonObject _user;
- }
- protected TcpClient socket;
- protected Connecting _NetInfo;
- public Socket Session
- {
- get { return (socket != null) ? socket.Client : null; }
- }
- public NetWorkState NetWorkState
- {
- get { return mCurStatus; }
- }
- public long TotalRecvBytes { get { return total_recv_bytes; } }
- public long TotalSentBytes { get { return total_sent_bytes; } }
- public void DoSendUpdate() { }
- public bool Connected
- {
- get
- {
- return socket != null &&
- socket.Connected &&
- socket.GetStream().CanRead &&
- socket.GetStream().CanWrite;
- }
- }
- public PomeloTCP(IPomeloClientAdapterListener listener)
- {
- socket = null;
- this.listener = listener;
- _memStream = new MemoryStream();
- _reader = new BinaryReader(_memStream);
- }
- public long GetPing()
- {
- return CUtils.CurrentTimeMS - last_heartbeat_r2c;
- }
- //--------------------------------------------------------------------------------------------------
- #region --Interface--
- public virtual void connect(string host, int port, int timeout, JsonObject user)
- {
- log.Debug("===========socket connect begin " + Thread.CurrentThread.ManagedThreadId);
- if (Connected)
- {
- //send msg
- if (listener != null)
- {
- log.Debug("===========socket connect call function ");
- listener.OnConnected(socket.Client, _NetInfo._user);
- }
- return;
- }
- //已经有一个连接在进行中
- if(mCurStatus == NetWorkState.CONNECTING)
- {
- JsonObject json = new JsonObject();
- DoNetState(NetWorkState.ERROR);
- json["s2c_code"] = 500;
- json["s2c_msg"] = "socket connecting";
- listener.OnConnected(socket.Client, json);
- return;
- }
- _NetInfo = new Connecting()
- {
- _host = host,
- _port = port,
- _timeout = timeout,
- _user = user,
- };
- DoNetState(NetWorkState.CONNECTING);
- string newIp = null;
- AddressFamily addressFamily;
- GetIPType(host, port.ToString(), out newIp, out addressFamily);
- _NetInfo._host = newIp;
- _NetInfo._port = port;
- this.socket = new TcpClient(addressFamily)
- {
- SendTimeout = _SendTimeOut,
- ReceiveTimeout = _ReceiveTimeOut,
- NoDelay = true
- };
- socket.Client.Blocking = true;
- //connect
- try
- {
- log.Debug("===========socket connect call socket.BeginConnect " + _NetInfo._host + ":" + _NetInfo._port);
- _received_heartbeat();
- socket.BeginConnect(_NetInfo._host, _NetInfo._port, new AsyncCallback(OnConnect), socket);
- }
- catch (Exception err)
- {
- if (err is SocketException)
- {
- JsonObject json = new JsonObject();
- switch ((err as SocketException).SocketErrorCode)
- {
- case SocketError.TimedOut:
- DoNetState(NetWorkState.TIMEOUT);
- json["s2c_code"] = 501;
- json["s2c_msg"] = err.Message;
- break;
- default:
- DoNetState(NetWorkState.ERROR);
- json["s2c_code"] = 500;
- json["s2c_msg"] = err.Message;
- break;
- }
- listener.OnConnected(socket.Client, json);
- }
- else
- {
- log.Debug("===========socket connect call socket.BeginConnect CloseSocket");
- CloseSocket();
- }
- }
- }
- private void OnConnect(IAsyncResult ar)
- {
- //reset stream
- try
- {
- socket.EndConnect(ar);
- if (Connected)
- {
- _memStream.SetLength(0);
- _memStream.Position = 0;
- Array.Clear(_byteBuffer, 0, _byteBuffer.Length);
- StartReadMessage();
- _send_handshake(_NetInfo._user);
- log.Debug("socket connect success ");
- }
- else
- {
- log.Debug("socket connect faile ");
- this._set_net_state(NetWorkState.DISCONNECTED);
- }
- }
- catch (Exception e)
- {
- log.Warn("===========OnConnect========== Error" + e.Message);
- CloseSocket();
- _on_error(e);
- }
- }
- public virtual void disconnect()
- {
- log.Debug("socket disconnect " + Thread.CurrentThread.ManagedThreadId);
- DoNetState(NetWorkState.CLOSED);
- CloseSocket();
- ClearMsgQueue();
- if(listener != null)
- {
- listener.OnDisconnected(null, "CLOSED");
- }
-
- }
- public virtual bool resolve_route(RecvMessage msg, byte flag, out string route)
- {
- if ((flag & RecvMessage.MSG_Route_Mask) == 1)
- {
- ushort routeId = RecvMessage.ReadShort(msg.Stream);
- route = "area.playerPush.battleEventPush";
- return true;
- }
- route = null;
- return false;
- }
- public virtual void update(float deltime)
- {
- ProcessMsg();
- }
- public virtual void send(SendMessage send_object)
- {
- try
- {
- startSend(send_object);
- }
- catch (Exception err) {
- CloseSocket();
- log.Warn("socket startSend Error:"+ err.StackTrace);
- _on_error(err);
- }
- }
- public virtual void disposing()
- {
- log.Debug("socket disposing " + Thread.CurrentThread.ManagedThreadId);
- if (_memStream != null)
- {
- _memStream.Close();
- _memStream = null;
- }
- if (_reader != null)
- {
- _reader.Close();
- _reader = null;
- }
- disconnect();
- }
- #endregion
- //--------------------------------------------------------------------------------------------------
- #region --Internal--
- private void _on_error(Exception err)
- {
- log.Warn("_on_error ThreadID:" + Thread.CurrentThread.ManagedThreadId);
- _set_net_state(NetWorkState.DISCONNECTED);
- }
- private void _set_net_state(NetWorkState st)
- {
- LocalNetStatus recv_object = LocalNetStatus.Alloc(st);
- AddMsg(recv_object);
- }
- private bool DoNetState(NetWorkState st)
- {
- if (mCurStatus != st)
- {
- if (NetWorkState.CLOSED == mCurStatus && NetWorkState.DISCONNECTED == st)
- {
- //DISCONNECTED 为重连标识,所以只能在connected状态下切换
- return false;
- }
- mCurStatus = st;
- if (st != NetWorkState.NONE)
- {
- if(NetWorkState.DISCONNECTED == mCurStatus)
- {
- CloseSocket();
- ClearMsgQueue();
- }
- listener.OnNetWorkStateChanged(st);
- return true;
- }
- }
- return false;
- }
- private void CloseSocket()
- {
- try
- {
- log.Debug("====CloseSocket=====");
- if (Connected)
- {
- socket.Client.Shutdown(SocketShutdown.Both);
- socket.Client.Close();
- //socket.GetStream().Close();
-
- socket.Close();
- socket = null;
- log.Debug("====CloseSocket===== end");
- }
- }
- catch (Exception err)
- {
- log.Debug("===== socket === CloseSocket Error");
- log.Warn(err.Message + "\n" + err.StackTrace);
- if(socket != null)
- {
- socket.Close();
- socket = null;
- }
- }
- }
- private void _send_handshake(JsonObject user)
- {
- //Debug.Log("begin handshake!");
- var Version = "0.3.0";
- var Type = "unity-socket";
- if (user == null) user = new JsonObject();
- var msg = new JsonObject();
- //Build sys option
- var sys = new JsonObject();
- sys["version"] = Version;
- sys["type"] = Type;
- //Build handshake message
- msg["sys"] = sys;
- msg["user"] = user;
- var body = Message.UTF8.GetBytes(msg.ToString());
- var send_object = SendMessage.Alloc(PackageType.PKG_HANDSHAKE, body);
- startSend(send_object);
- }
- private void _received_handshake(JsonObject msg)
- {
- var code = msg["code"];
- var sys = msg["sys"] as JsonObject;
- if (code == null || sys == null || Convert.ToInt32(code) != 200)
- {
- throw new Exception("Handshake error! Please check your handshake config.");
- }
- object jobj;
- JsonObject dict;
- if (sys.TryGetValue("dict", out jobj))
- {
- dict = (JsonObject)jobj;
- }
- else
- {
- dict = new JsonObject();
- }
- //Init heartbeat service
- int interval = 0;
- if (sys.TryGetValue("heartbeat", out jobj))
- {
- interval = Convert.ToInt32(jobj);
- //_init_heartbeat(interval);
- }
- var send_object = SendMessage.Alloc(PackageType.PKG_HANDSHAKE_ACK);
- startSend(send_object);
- JsonObject user;
- if (msg.TryGetValue("user", out jobj))
- {
- user = (JsonObject)jobj;
- }
- else
- {
- user = new JsonObject();
- }
- DoNetState(NetWorkState.CONNECTED);
- _received_heartbeat();
- if (listener != null)
- {
- listener.OnConnected(socket.Client, user);
- }
-
- }
- private void _received_package(Message recv)
- {
- if(recv is RecvMessage)
- {
- RecvMessage recv_object = recv as RecvMessage;
- if (recv_object.PkgType == PackageType.PKG_HANDSHAKE)
- {
- var body = Message.UTF8.GetString(
- recv_object.Buffer,
- RecvMessage.FIXED_HEAD_SIZE,
- recv_object.PkgLength);
- JsonObject data = (JsonObject)SimpleJson.SimpleJson.DeserializeObject(body);
- _received_handshake(data);
- }
- else if (recv_object.PkgType == PackageType.PKG_HEARTBEAT)
- {
- listener.OnReceivedMessage(recv_object);
- }
- else if (recv_object.PkgType == PackageType.PKG_DATA)
- {
- recv_object.DecodeBody(this);
- listener.OnReceivedMessage(recv_object);
- }
- else if (recv_object.PkgType == PackageType.PKG_KICK)
- {
- DoNetState(NetWorkState.KICK);
- }
- }
- else
- {
- //异步处理 网络状态更新
- LocalNetStatus netStatus = recv as LocalNetStatus;
- if(netStatus != null)
- {
- DoNetState(netStatus.St);
- }
- }
- recv.Dispose();
- }
- private void _send_heartbeat()
- {
- var send_object = SendMessage.Alloc(PackageType.PKG_HEARTBEAT);
- startSend(send_object);
- }
- private void _received_heartbeat()
- {
- last_heartbeat_r2c = CUtils.CurrentTimeMS;
- }
- #endregion
- //--------------------------------------------------------------------------------------------------
- #region --收发--
- private void StartReadMessage()
- {
- try
- {
- if (!Connected)
- {
- return;
- }
- if (socket.GetStream().CanRead)
- {
- Array.Clear(_byteBuffer, 0, _byteBuffer.Length);
- //log.Debug("WorkStream.StartReadMessage ThreadID:" + Thread.CurrentThread.ManagedThreadId);
- socket.GetStream().BeginRead(_byteBuffer, 0, _byteBuffer.Length,
- new AsyncCallback(TCPReadCallBack), null);
- }
- else
- {
- log.Warn("Network IO problem not cantread");
- }
- }
- catch (Exception err)
- {
- log.Warn("StartReadMessage error :" + err.Message);
- }
- }
- private void TCPReadCallBack(IAsyncResult ar)
- {
- //主动断开时
- if (!Connected)
- {
- return;
- }
- int numberOfBytesRead = 0;
- try
- {
-
- numberOfBytesRead = socket.GetStream().EndRead(ar);
- if (numberOfBytesRead > 0)
- {
- pickpackage(_byteBuffer, numberOfBytesRead);
- Array.Clear(_byteBuffer, 0, _byteBuffer.Length);
- //log.Debug("socket WorkStream.StartReadMessage callback ThreadID:" + Thread.CurrentThread.ManagedThreadId);
- socket.GetStream().BeginRead(_byteBuffer, 0, _byteBuffer.Length,new AsyncCallback(TCPReadCallBack), null);
- }
- else
- {
- //被动断开时 回到登录界面
- log.Debug("===========socket recve numberOfBytesRead: recve = " + numberOfBytesRead.ToString());
- _set_net_state(NetWorkState.DISCONNECTED);
- }
- }
- catch (Exception err)
- {
- log.Warn("=======socket===TCPReadCallBack===Error>" + err.Message);
- _on_error(err);
- }
- }
- private long RemainingBytes()
- {
- return _memStream.Length - _memStream.Position;
- }
- private void pickpackage(byte[] bytes, int length)
- {
- _memStream.Seek(0, SeekOrigin.End);
- _memStream.Write(bytes, 0, length);
- _memStream.Seek(0, SeekOrigin.Begin);
- while (RemainingBytes() >= RecvMessage.FIXED_HEAD_SIZE)
- {
- PackageType PkgType = (PackageType)_reader.ReadByte();
- int PkgLength = (_reader.ReadByte() << 16) + (_reader.ReadByte() << 8) + _reader.ReadByte();
- long remainingLength = RemainingBytes();
- _memStream.Position -= RecvMessage.FIXED_HEAD_SIZE;
- if (remainingLength < PkgLength)
- {
- break;
- }
- int msgSize = RecvMessage.FIXED_HEAD_SIZE + PkgLength;
- RecvMessage recv_object = RecvMessage.Alloc();
- recv_object.stateSocket = socket;
- recv_object.Stream.Write(_reader.ReadBytes(msgSize), 0, msgSize);
- recv_object.DecodeHead();
- //Enqueue
- _received_heartbeat();
- AddMsg(recv_object);
- }
- var leftover = _reader.ReadBytes((int)RemainingBytes());
- _memStream.SetLength(0); //Clear
- _memStream.Write(leftover, 0, leftover.Length);
- }
- #endregion
- #region <<BeginSend 方式发送>>
- private void startSend(SendMessage send_object)
- {
- try
- {
- if (Connected)
- {
- socket.GetStream().BeginWrite(send_object.Buffer, 0, send_object.BufferLength, endSend, send_object);
- }
- }
- catch (Exception err)
- {
- send_object.Dispose();
- log.Warn("startSend Error>" + err.Message);
- }
- }
- private void endSend(IAsyncResult asyncSend)
- {
- var send_object = asyncSend.AsyncState as SendMessage;
- try
- {
- if (!Connected)
- {
- return;
- }
- socket.GetStream().EndWrite(asyncSend);
- }
- catch (Exception err)
- {
- CloseSocket();
- }
- finally
- {
- send_object.Dispose();
- }
- }
- #endregion
- private void AddMsg(Message msg)
- {
- if (msg != null)
- {
- lock (MsgQueue)
- {
- MsgQueue.Enqueue(msg);
- }
- }
- }
- private void ProcessMsg()
- {
- lock (MsgQueue)
- {
- long last = CUtils.CurrentTimeMS;
- while (MsgQueue.Count > 0)
- {
- if(CUtils.CurrentTimeMS - last > 100)
- {
- break;
- }
- var recv_object = MsgQueue.Dequeue();
- if (recv_object != null)
- {
- _received_package(recv_object);
- }
- }
- }
- }
- private void ClearMsgQueue()
- {
- lock(MsgQueue)
- {
- while (MsgQueue.Count > 0)
- {
- var recv_object = MsgQueue.Dequeue();
- if (recv_object != null)
- {
- recv_object.Dispose();
- }
- }
- }
- }
- #region ipv4||6
- ////////////////////////////////////
- [DllImport("__Internal")]
- private static extern string getIPv6(string mHost, string mPort);
- public static string GetIPv6(string mHost, string mPort)
- {
- #if UNITY_IPHONE && !UNITY_EDITOR
- string mIPv6 = getIPv6(mHost, mPort);
- return mIPv6;
- #else
- return mHost + "&&ipv4";
- #endif
- }
- private void GetIPType(string serverIp, string serverPorts, out string newServerIp, out AddressFamily mIPType)
- {
- mIPType = AddressFamily.InterNetwork;
- newServerIp = serverIp;
- try
- {
- var mIPv6 = GetIPv6(serverIp, serverPorts);
- if (!string.IsNullOrEmpty(mIPv6))
- {
- var m_StrTemp = Regex.Split(mIPv6, "&&");
- if (m_StrTemp != null && m_StrTemp.Length >= 2)
- {
- var IPType = m_StrTemp[1];
- if (IPType == "ipv6")
- {
- newServerIp = m_StrTemp[0];
- mIPType = AddressFamily.InterNetworkV6;
- }
- }
- }
- }
- catch (Exception e)
- {
- log.Log("GetIPv6 error:" + e);
- }
- }
- #endregion
- }
- }
|