PomeloClient.TCP.cs 21 KB


  1. using CommonLang;
  2. using CommonLang.Concurrent;
  3. using CommonLang.Log;
  4. using SimpleJson;
  5. using System;
  6. using System.Collections.Generic;
  7. using System.IO;
  8. using System.Net;
  9. using System.Net.Sockets;
  10. using System.Runtime.InteropServices;
  11. using System.Text.RegularExpressions;
  12. using System.Threading;
  13. namespace Pomelo.DotNetClient
  14. {
  15. public class PomeloTCP : PomeloClientAdapter
  16. {
  /// <summary>Callback sink supplied through the constructor.</summary>
  protected readonly IPomeloClientAdapterListener listener;

  // Timestamp (CUtils.CurrentTimeMS) refreshed by _received_heartbeat().
  private long last_heartbeat_r2c = CUtils.CurrentTimeMS;

  // Timestamp captured at construction time.
  private long last_heartbeat_chk = CUtils.CurrentTimeMS;

  // Backing counters for TotalRecvBytes / TotalSentBytes.
  private long total_recv_bytes;
  private long total_sent_bytes;

  // Values assigned to TcpClient.SendTimeout / ReceiveTimeout in connect().
  private const int _SendTimeOut = 3000;
  private const int _ReceiveTimeOut = 3000;

  // Size of the buffer handed to NetworkStream.BeginRead.
  private const int _MaxRead = 1024 * 16;
  private readonly byte[] _byteBuffer = new byte[_MaxRead];

  // Accumulates received bytes until complete packages can be cut out of it (see pickpackage).
  private MemoryStream _memStream;

  // Reader layered over _memStream.
  private BinaryReader _reader;

  // Last state applied through DoNetState().
  private NetWorkState mCurStatus = NetWorkState.NONE;

  private readonly Logger log = LoggerFactory.GetLogger("PomeloTCP");

  // Messages waiting for ProcessMsg(); also serves as the lock object guarding itself.
  private readonly Queue<Message> MsgQueue = new Queue<Message>();

  /// <summary>Parameters captured by the most recent call to connect().</summary>
  protected class Connecting
  {
      public string _host;
      public int _port;
      public int _timeout;
      public JsonObject _user;
  }

  /// <summary>Current TCP client; null when none has been created or after it was closed.</summary>
  protected TcpClient socket;

  /// <summary>Connection parameters of the latest connect() call.</summary>
  protected Connecting _NetInfo;
  /// <summary>Underlying Socket of the current TcpClient, or null when there is no client.</summary>
  public Socket Session
  {
      get
      {
          if (socket == null)
          {
              return null;
          }
          return socket.Client;
      }
  }

  /// <summary>Last network state applied by this adapter.</summary>
  public NetWorkState NetWorkState
  {
      get
      {
          return mCurStatus;
      }
  }

  /// <summary>Current value of the received-bytes counter.</summary>
  public long TotalRecvBytes
  {
      get
      {
          return total_recv_bytes;
      }
  }

  /// <summary>Current value of the sent-bytes counter.</summary>
  public long TotalSentBytes
  {
      get
      {
          return total_sent_bytes;
      }
  }

  /// <summary>Intentionally empty: this adapter has nothing to do here.</summary>
  public void DoSendUpdate()
  {
  }
  /// <summary>
  /// True when a TcpClient exists, reports itself connected, and its stream
  /// is both readable and writable.
  /// </summary>
  /// <remarks>
  /// The property is evaluated from async socket callbacks as well as from the
  /// caller's thread, while CloseSocket() may null out and close the client at
  /// any time. The field is therefore read once into a local, and a client that
  /// was closed in the meantime is reported as "not connected" instead of
  /// letting the exception escape.
  /// </remarks>
  public bool Connected
  {
      get
      {
          TcpClient client = socket;
          if (client == null)
          {
              return false;
          }
          try
          {
              if (!client.Connected)
              {
                  return false;
              }
              NetworkStream stream = client.GetStream();
              return stream.CanRead && stream.CanWrite;
          }
          catch (InvalidOperationException)
          {
              // GetStream() throws InvalidOperationException when not connected and
              // ObjectDisposedException (a subclass) when the client was closed.
              return false;
          }
      }
  }
  /// <summary>
  /// Creates an adapter with no socket and an empty receive buffer.
  /// </summary>
  /// <param name="listener">Receiver of connection, state and message callbacks.</param>
  public PomeloTCP(IPomeloClientAdapterListener listener)
  {
      this.socket = null;
      this.listener = listener;

      // The reader is layered directly on the accumulation stream.
      MemoryStream buffer = new MemoryStream();
      this._memStream = buffer;
      this._reader = new BinaryReader(buffer);
  }
  /// <summary>
  /// Returns the difference between CUtils.CurrentTimeMS now and the timestamp
  /// last stored by _received_heartbeat().
  /// </summary>
  /// <returns>Elapsed time in the unit of CUtils.CurrentTimeMS (presumably milliseconds).</returns>
  public long GetPing()
  {
      long now = CUtils.CurrentTimeMS;
      long elapsed = now - last_heartbeat_r2c;
      return elapsed;
  }
  72. //--------------------------------------------------------------------------------------------------
  73. #region --Interface--
  /// <summary>
  /// Starts an asynchronous TCP connection to <paramref name="host"/>:<paramref name="port"/>.
  /// </summary>
  /// <remarks>
  /// - If already connected, the listener's OnConnected is simply invoked again.
  /// - If another attempt is still in the CONNECTING state, the state is set to
  ///   ERROR and the listener receives code 500 / "socket connecting".
  /// - Any synchronous failure while starting the attempt is reported to the
  ///   listener (501 for a socket timeout, 500 otherwise) and the half-built
  ///   client is released, so the adapter never stays stuck in CONNECTING.
  /// Completion of a successfully started attempt is handled in OnConnect.
  /// </remarks>
  /// <param name="host">Host name or address; resolved through GetIPType.</param>
  /// <param name="port">Remote TCP port.</param>
  /// <param name="timeout">Stored in the connection info; not enforced by this method.</param>
  /// <param name="user">Payload forwarded with the handshake.</param>
  public virtual void connect(string host, int port, int timeout, JsonObject user)
  {
      log.Debug("===========socket connect begin " + Thread.CurrentThread.ManagedThreadId);
      if (Connected)
      {
          // Already connected: replay the success notification.
          if (listener != null)
          {
              log.Debug("===========socket connect call function ");
              listener.OnConnected(Session, _NetInfo._user);
          }
          return;
      }

      // A connection attempt is already in progress.
      if (mCurStatus == NetWorkState.CONNECTING)
      {
          DoNetState(NetWorkState.ERROR);
          ReportConnectResult(500, "socket connecting");
          return;
      }

      _NetInfo = new Connecting()
      {
          _host = host,
          _port = port,
          _timeout = timeout,
          _user = user,
      };
      DoNetState(NetWorkState.CONNECTING);

      try
      {
          string newIp;
          AddressFamily addressFamily;
          GetIPType(host, port.ToString(), out newIp, out addressFamily);
          _NetInfo._host = newIp;

          // Do not leak a previous, unconnected client when replacing it.
          ReleaseUnconnectedSocket();

          socket = new TcpClient(addressFamily)
          {
              SendTimeout = _SendTimeOut,
              ReceiveTimeout = _ReceiveTimeOut,
              NoDelay = true
          };
          socket.Client.Blocking = true;

          log.Debug("===========socket connect call socket.BeginConnect " + _NetInfo._host + ":" + _NetInfo._port);
          _received_heartbeat();
          socket.BeginConnect(_NetInfo._host, _NetInfo._port, new AsyncCallback(OnConnect), socket);
      }
      catch (SocketException err)
      {
          if (err.SocketErrorCode == SocketError.TimedOut)
          {
              DoNetState(NetWorkState.TIMEOUT);
              ReportConnectResult(501, err.Message);
          }
          else
          {
              DoNetState(NetWorkState.ERROR);
              ReportConnectResult(500, err.Message);
          }
          ReleaseUnconnectedSocket();
      }
      catch (Exception err)
      {
          log.Debug("===========socket connect call socket.BeginConnect CloseSocket");
          log.Warn("socket connect error:" + err.Message);
          // Leave CONNECTING and tell the listener; otherwise every later
          // connect() call would be rejected as "socket connecting".
          DoNetState(NetWorkState.ERROR);
          ReportConnectResult(500, err.Message);
          ReleaseUnconnectedSocket();
      }
  }

  /// <summary>Sends a {s2c_code, s2c_msg} result to the listener's OnConnected, if a listener exists.</summary>
  private void ReportConnectResult(int code, string message)
  {
      if (listener == null)
      {
          return;
      }
      JsonObject json = new JsonObject();
      json["s2c_code"] = code;
      json["s2c_msg"] = message;
      listener.OnConnected(Session, json);
  }

  /// <summary>Closes and forgets the current TcpClient regardless of its connection state.</summary>
  private void ReleaseUnconnectedSocket()
  {
      TcpClient stale = socket;
      socket = null;
      if (stale == null)
      {
          return;
      }
      try
      {
          stale.Close();
      }
      catch (Exception err)
      {
          log.Warn("socket release error:" + err.Message);
      }
  }
  /// <summary>
  /// Completion callback for TcpClient.BeginConnect. On success it resets the
  /// receive buffer, starts the read loop and sends the handshake; on failure
  /// it closes the socket and queues a DISCONNECTED state change.
  /// </summary>
  /// <param name="ar">Async result; its AsyncState is the TcpClient the attempt was started on.</param>
  private void OnConnect(IAsyncResult ar)
  {
      // connect() passes the client as async state. If that client is no longer
      // the current one, this callback belongs to an abandoned attempt and must
      // not be completed against (or tear down) the current connection.
      TcpClient client = ar.AsyncState as TcpClient;
      if (client != null && !ReferenceEquals(client, socket))
      {
          log.Debug("socket connect callback ignored: client was replaced or closed");
          try
          {
              client.Close();
          }
          catch (Exception e)
          {
              log.Warn("===========OnConnect========== stale close Error" + e.Message);
          }
          return;
      }

      try
      {
          TcpClient target = client ?? socket;
          target.EndConnect(ar);
          if (Connected)
          {
              // Reset the accumulation stream and read buffer before the first read.
              _memStream.SetLength(0);
              _memStream.Position = 0;
              Array.Clear(_byteBuffer, 0, _byteBuffer.Length);
              StartReadMessage();
              _send_handshake(_NetInfo._user);
              log.Debug("socket connect success ");
          }
          else
          {
              log.Debug("socket connect faile ");
              this._set_net_state(NetWorkState.DISCONNECTED);
          }
      }
      catch (Exception e)
      {
          log.Warn("===========OnConnect========== Error" + e.Message);
          CloseSocket();
          _on_error(e);
      }
  }
  /// <summary>
  /// Closes the connection on request: switches the state to CLOSED, closes the
  /// socket, drops all queued messages and then notifies the listener with
  /// the reason "CLOSED".
  /// </summary>
  public virtual void disconnect()
  {
      int threadId = Thread.CurrentThread.ManagedThreadId;
      log.Debug("socket disconnect " + threadId);

      DoNetState(NetWorkState.CLOSED);
      CloseSocket();
      ClearMsgQueue();

      if (listener == null)
      {
          return;
      }
      listener.OnDisconnected(null, "CLOSED");
  }
  /// <summary>
  /// Resolves the route of a received message when the route bit of
  /// <paramref name="flag"/> is set.
  /// </summary>
  /// <param name="msg">Message whose stream is positioned at the route id.</param>
  /// <param name="flag">Message flag byte; masked with RecvMessage.MSG_Route_Mask.</param>
  /// <param name="route">Receives the fixed push route on success, otherwise null.</param>
  /// <returns>True when the masked flag equals 1; false otherwise.</returns>
  public virtual bool resolve_route(RecvMessage msg, byte flag, out string route)
  {
      bool routeBitSet = (flag & RecvMessage.MSG_Route_Mask) == 1;
      if (!routeBitSet)
      {
          route = null;
          return false;
      }

      // The route id is read only to advance the stream; its value is not used
      // and every such message maps to the same route.
      RecvMessage.ReadShort(msg.Stream);
      route = "area.playerPush.battleEventPush";
      return true;
  }
  /// <summary>
  /// Per-tick entry point: dispatches messages queued by the socket callbacks.
  /// </summary>
  /// <param name="deltime">Not used by this implementation.</param>
  public virtual void update(float deltime)
  {
      this.ProcessMsg();
  }
  /// <summary>
  /// Hands a message to startSend. If that throws, the socket is closed, the
  /// stack trace is logged and a DISCONNECTED state change is queued.
  /// </summary>
  /// <param name="send_object">Message to transmit.</param>
  public virtual void send(SendMessage send_object)
  {
      try
      {
          this.startSend(send_object);
      }
      catch (Exception sendFailure)
      {
          this.CloseSocket();
          log.Warn("socket startSend Error:"+ sendFailure.StackTrace);
          this._on_error(sendFailure);
      }
  }
  /// <summary>
  /// Shuts the adapter down: disconnects first, then releases the receive
  /// buffer stream and its reader.
  /// </summary>
  /// <remarks>
  /// The connection is closed before the buffers are released because the
  /// read callback writes into _memStream; tearing the buffers down while the
  /// socket is still open would let a late callback hit a null/closed stream.
  /// </remarks>
  public virtual void disposing()
  {
      log.Debug("socket disposing " + Thread.CurrentThread.ManagedThreadId);

      disconnect();

      if (_reader != null)
      {
          _reader.Close();
          _reader = null;
      }
      if (_memStream != null)
      {
          _memStream.Close();
          _memStream = null;
      }
  }
  232. #endregion
  233. //--------------------------------------------------------------------------------------------------
  234. #region --Internal--
  /// <summary>
  /// Common failure path: logs the calling thread id and queues a DISCONNECTED
  /// state change for the next ProcessMsg().
  /// </summary>
  /// <param name="err">The failure; not inspected here.</param>
  private void _on_error(Exception err)
  {
      int threadId = Thread.CurrentThread.ManagedThreadId;
      log.Warn("_on_error ThreadID:" + threadId);
      this._set_net_state(NetWorkState.DISCONNECTED);
  }
  /// <summary>
  /// Queues a state change as a LocalNetStatus message instead of applying it
  /// immediately; it is applied when _received_package handles the message.
  /// </summary>
  /// <param name="st">State to switch to.</param>
  private void _set_net_state(NetWorkState st)
  {
      AddMsg(LocalNetStatus.Alloc(st));
  }
  /// <summary>
  /// Applies a network state change and notifies the listener.
  /// </summary>
  /// <param name="st">Requested state.</param>
  /// <returns>
  /// True when the state changed to something other than NONE and the listener
  /// was notified; false when nothing changed, when a CLOSED to DISCONNECTED
  /// transition was refused, or when the new state is NONE.
  /// </returns>
  private bool DoNetState(NetWorkState st)
  {
      if (mCurStatus == st)
      {
          return false;
      }

      // DISCONNECTED serves as the reconnect signal, so it is refused once the
      // connection has been closed on purpose (original note: it may only be
      // entered from the connected state).
      if (mCurStatus == NetWorkState.CLOSED && st == NetWorkState.DISCONNECTED)
      {
          return false;
      }

      mCurStatus = st;
      if (st == NetWorkState.NONE)
      {
          return false;
      }

      if (st == NetWorkState.DISCONNECTED)
      {
          CloseSocket();
          ClearMsgQueue();
      }
      listener.OnNetWorkStateChanged(st);
      return true;
  }
  /// <summary>
  /// Closes and forgets the current TcpClient, if any.
  /// </summary>
  /// <remarks>
  /// The client is released whether or not it is still connected: a client
  /// whose connect failed, or whose peer already dropped the connection, must
  /// be closed too, otherwise it is never disposed and the socket field keeps
  /// pointing at a dead client. Shutdown is only attempted while connected;
  /// Close always runs. Errors are logged and never propagated.
  /// </remarks>
  private void CloseSocket()
  {
      log.Debug("====CloseSocket=====");

      // Detach first so concurrent Connected checks see "no socket" immediately.
      TcpClient client = socket;
      socket = null;
      if (client == null)
      {
          return;
      }

      try
      {
          if (client.Connected)
          {
              client.Client.Shutdown(SocketShutdown.Both);
          }
      }
      catch (Exception err)
      {
          log.Debug("===== socket === CloseSocket Error");
          log.Warn(err.Message + "\n" + err.StackTrace);
      }
      finally
      {
          try
          {
              client.Close();
          }
          catch (Exception err)
          {
              log.Warn(err.Message + "\n" + err.StackTrace);
          }
          log.Debug("====CloseSocket===== end");
      }
  }
  /// <summary>
  /// Builds the handshake JSON ({"sys": {version, type}, "user": ...}) and
  /// sends it as a PKG_HANDSHAKE package.
  /// </summary>
  /// <param name="user">User payload; an empty object is sent when null.</param>
  private void _send_handshake(JsonObject user)
  {
      const string clientVersion = "0.3.0";
      const string clientType = "unity-socket";

      if (user == null)
      {
          user = new JsonObject();
      }

      // System options describing this client.
      JsonObject sysOptions = new JsonObject();
      sysOptions["version"] = clientVersion;
      sysOptions["type"] = clientType;

      // Handshake envelope.
      JsonObject handshake = new JsonObject();
      handshake["sys"] = sysOptions;
      handshake["user"] = user;

      byte[] payload = Message.UTF8.GetBytes(handshake.ToString());
      startSend(SendMessage.Alloc(PackageType.PKG_HANDSHAKE, payload));
  }
  /// <summary>
  /// Handles the server's handshake reply: validates it, acknowledges it,
  /// switches to CONNECTED and reports the connection to the listener.
  /// </summary>
  /// <remarks>
  /// The reply's "sys" object must exist, but its contents (for example
  /// "dict" or "heartbeat") are not consumed by this adapter.
  /// </remarks>
  /// <param name="msg">Decoded handshake reply.</param>
  /// <exception cref="Exception">
  /// The reply has no "code", no "sys" object, or a code other than 200.
  /// </exception>
  private void _received_handshake(JsonObject msg)
  {
      // TryGetValue keeps a missing key on the "handshake error" path instead of
      // failing inside the indexer.
      object code;
      object sysValue;
      msg.TryGetValue("code", out code);
      msg.TryGetValue("sys", out sysValue);
      JsonObject sys = sysValue as JsonObject;
      if (code == null || sys == null || Convert.ToInt32(code) != 200)
      {
          throw new Exception("Handshake error! Please check your handshake config.");
      }

      var send_object = SendMessage.Alloc(PackageType.PKG_HANDSHAKE_ACK);
      startSend(send_object);

      // Optional user payload; anything that is not a JSON object is treated as absent.
      object userValue;
      JsonObject user = null;
      if (msg.TryGetValue("user", out userValue))
      {
          user = userValue as JsonObject;
      }
      if (user == null)
      {
          user = new JsonObject();
      }

      DoNetState(NetWorkState.CONNECTED);
      _received_heartbeat();
      if (listener != null)
      {
          // Session is null-safe should the socket have been closed meanwhile.
          listener.OnConnected(Session, user);
      }
  }
  /// <summary>
  /// Dispatches one dequeued message: network packages are routed by package
  /// type, LocalNetStatus messages apply a deferred state change.
  /// </summary>
  /// <remarks>
  /// The message is always disposed, even when a handler throws (for example a
  /// failed handshake). A null message is ignored.
  /// </remarks>
  /// <param name="recv">Message taken from the queue.</param>
  private void _received_package(Message recv)
  {
      if (recv == null)
      {
          return;
      }

      try
      {
          RecvMessage recv_object = recv as RecvMessage;
          if (recv_object == null)
          {
              // Deferred network state update queued by _set_net_state.
              LocalNetStatus netStatus = recv as LocalNetStatus;
              if (netStatus != null)
              {
                  DoNetState(netStatus.St);
              }
              return;
          }

          if (recv_object.PkgType == PackageType.PKG_HANDSHAKE)
          {
              // The body follows the fixed-size head inside the message buffer.
              var body = Message.UTF8.GetString(
                  recv_object.Buffer,
                  RecvMessage.FIXED_HEAD_SIZE,
                  recv_object.PkgLength);
              JsonObject data = (JsonObject)SimpleJson.SimpleJson.DeserializeObject(body);
              _received_handshake(data);
          }
          else if (recv_object.PkgType == PackageType.PKG_HEARTBEAT)
          {
              listener.OnReceivedMessage(recv_object);
          }
          else if (recv_object.PkgType == PackageType.PKG_DATA)
          {
              recv_object.DecodeBody(this);
              listener.OnReceivedMessage(recv_object);
          }
          else if (recv_object.PkgType == PackageType.PKG_KICK)
          {
              DoNetState(NetWorkState.KICK);
          }
      }
      finally
      {
          recv.Dispose();
      }
  }
  /// <summary>Sends an empty PKG_HEARTBEAT package.</summary>
  private void _send_heartbeat()
  {
      SendMessage heartbeat = SendMessage.Alloc(PackageType.PKG_HEARTBEAT);
      this.startSend(heartbeat);
  }
  /// <summary>
  /// Records the current time as the moment of the latest activity; GetPing()
  /// measures from this timestamp.
  /// </summary>
  private void _received_heartbeat()
  {
      long now = CUtils.CurrentTimeMS;
      this.last_heartbeat_r2c = now;
  }
  403. #endregion
  404. //--------------------------------------------------------------------------------------------------
  405. #region --收发--
  /// <summary>
  /// Arms the first asynchronous read on the connected stream.
  /// </summary>
  /// <remarks>
  /// Does nothing when not connected. If arming the read fails, the error is
  /// logged and routed through _on_error so that the connection is reported as
  /// lost; without a pending read no data would ever arrive and the failure
  /// would otherwise go unnoticed. The owning TcpClient is passed as the async
  /// state so that a completion can be matched to the connection it was
  /// started on.
  /// </remarks>
  private void StartReadMessage()
  {
      try
      {
          TcpClient client = socket;
          if (client == null || !Connected)
          {
              return;
          }

          NetworkStream stream = client.GetStream();
          if (stream.CanRead)
          {
              Array.Clear(_byteBuffer, 0, _byteBuffer.Length);
              stream.BeginRead(_byteBuffer, 0, _byteBuffer.Length,
                  new AsyncCallback(TCPReadCallBack), client);
          }
          else
          {
              log.Warn("Network IO problem not cantread");
          }
      }
      catch (Exception err)
      {
          log.Warn("StartReadMessage error :" + err.Message);
          _on_error(err);
      }
  }
  /// <summary>
  /// Completion callback of NetworkStream.BeginRead: feeds received bytes to
  /// pickpackage and re-arms the read, or reports the connection as lost.
  /// </summary>
  /// <remarks>
  /// The callback only returns silently when the socket was released locally
  /// (socket is null) or when the read belongs to a client that has since been
  /// replaced. A connection dropped by the peer makes the client report
  /// "not connected" as well, so that condition must not be used to bail out:
  /// the read is completed and any failure is reported through _on_error.
  /// After a deliberate disconnect(), DoNetState refuses the resulting
  /// DISCONNECTED change, so no spurious notification is produced.
  /// </remarks>
  /// <param name="ar">Async result of the read; AsyncState may carry the owning TcpClient.</param>
  private void TCPReadCallBack(IAsyncResult ar)
  {
      TcpClient client = socket;
      TcpClient owner = ar.AsyncState as TcpClient;   // null when the read was armed without state
      if (client == null || (owner != null && !ReferenceEquals(owner, client)))
      {
          return;
      }

      try
      {
          NetworkStream stream = client.GetStream();
          int numberOfBytesRead = stream.EndRead(ar);
          if (numberOfBytesRead > 0)
          {
              Interlocked.Add(ref total_recv_bytes, numberOfBytesRead);
              pickpackage(_byteBuffer, numberOfBytesRead);
              Array.Clear(_byteBuffer, 0, _byteBuffer.Length);
              stream.BeginRead(_byteBuffer, 0, _byteBuffer.Length,
                  new AsyncCallback(TCPReadCallBack), client);
          }
          else
          {
              // Zero bytes: the peer closed the connection.
              log.Debug("===========socket recve numberOfBytesRead: recve = " + numberOfBytesRead.ToString());
              _set_net_state(NetWorkState.DISCONNECTED);
          }
      }
      catch (Exception err)
      {
          log.Warn("=======socket===TCPReadCallBack===Error>" + err.Message);
          _on_error(err);
      }
  }
  /// <summary>Number of bytes between the accumulation stream's position and its end.</summary>
  private long RemainingBytes()
  {
      long total = _memStream.Length;
      long consumed = _memStream.Position;
      return total - consumed;
  }
  /// <summary>
  /// Appends freshly received bytes to the accumulation stream and cuts every
  /// complete package out of it, queueing each one as a RecvMessage. Bytes of
  /// an incomplete trailing package stay in the stream for the next call.
  /// </summary>
  /// <param name="bytes">Buffer holding the received data.</param>
  /// <param name="length">Number of valid bytes at the start of <paramref name="bytes"/>.</param>
  private void pickpackage(byte[] bytes, int length)
  {
      // Append at the end, then rewind to parse from the beginning.
      _memStream.Seek(0, SeekOrigin.End);
      _memStream.Write(bytes, 0, length);
      _memStream.Seek(0, SeekOrigin.Begin);

      for (;;)
      {
          if (RemainingBytes() < RecvMessage.FIXED_HEAD_SIZE)
          {
              break;
          }

          // Head: one type byte followed by a 3-byte big-endian body length.
          _reader.ReadByte();
          int lengthHigh = _reader.ReadByte();
          int lengthMid = _reader.ReadByte();
          int lengthLow = _reader.ReadByte();
          int bodyLength = (lengthHigh << 16) | (lengthMid << 8) | lengthLow;

          // Measure what follows the head, then step back so the head is copied too.
          long available = RemainingBytes();
          _memStream.Position -= RecvMessage.FIXED_HEAD_SIZE;
          if (available < bodyLength)
          {
              // Body not fully received yet.
              break;
          }

          int frameSize = RecvMessage.FIXED_HEAD_SIZE + bodyLength;
          RecvMessage recv_object = RecvMessage.Alloc();
          recv_object.stateSocket = socket;
          var frameStream = recv_object.Stream;
          byte[] frame = _reader.ReadBytes(frameSize);
          frameStream.Write(frame, 0, frameSize);
          recv_object.DecodeHead();

          // Any complete package counts as activity; then queue it.
          _received_heartbeat();
          AddMsg(recv_object);
      }

      // Keep only the unparsed tail in the stream.
      int tailLength = (int)RemainingBytes();
      byte[] tail = _reader.ReadBytes(tailLength);
      _memStream.SetLength(0);
      _memStream.Write(tail, 0, tail.Length);
  }
  494. #endregion
  495. #region <<BeginSend 方式发送>>
  /// <summary>
  /// Starts an asynchronous write of the message's buffer; endSend completes it.
  /// </summary>
  /// <remarks>
  /// Ownership of the message passes to endSend only when the write was
  /// actually started. In every other case (not connected, or BeginWrite
  /// threw) the message is disposed here, so a dropped message is released the
  /// same way as a sent one. A null message is ignored. Errors are logged and
  /// not propagated.
  /// </remarks>
  /// <param name="send_object">Message to transmit.</param>
  private void startSend(SendMessage send_object)
  {
      if (send_object == null)
      {
          return;
      }

      bool writeStarted = false;
      try
      {
          if (Connected)
          {
              socket.GetStream().BeginWrite(send_object.Buffer, 0, send_object.BufferLength, endSend, send_object);
              writeStarted = true;
          }
      }
      catch (Exception err)
      {
          log.Warn("startSend Error>" + err.Message);
      }
      finally
      {
          if (!writeStarted)
          {
              send_object.Dispose();
          }
      }
  }
  /// <summary>
  /// Completion callback of NetworkStream.BeginWrite: finishes the write,
  /// counts the sent bytes and disposes the message.
  /// </summary>
  /// <remarks>
  /// A failed write closes the socket and is reported through _on_error.
  /// Closing the socket alone would not be enough: the read callback stays
  /// silent for a locally closed socket, so the listener would never learn
  /// that the connection is gone.
  /// </remarks>
  /// <param name="asyncSend">Async result whose AsyncState is the SendMessage being written.</param>
  private void endSend(IAsyncResult asyncSend)
  {
      var send_object = asyncSend.AsyncState as SendMessage;
      try
      {
          if (!Connected)
          {
              return;
          }
          socket.GetStream().EndWrite(asyncSend);
          if (send_object != null)
          {
              Interlocked.Add(ref total_sent_bytes, send_object.BufferLength);
          }
      }
      catch (Exception err)
      {
          log.Warn("endSend Error>" + err.Message);
          CloseSocket();
          _on_error(err);
      }
      finally
      {
          if (send_object != null)
          {
              send_object.Dispose();
          }
      }
  }
  531. #endregion
  /// <summary>Appends a message to the queue under the queue lock; null is ignored.</summary>
  /// <param name="msg">Message to enqueue.</param>
  private void AddMsg(Message msg)
  {
      if (msg == null)
      {
          return;
      }

      lock (MsgQueue)
      {
          MsgQueue.Enqueue(msg);
      }
  }
  /// <summary>
  /// Dispatches queued messages in order until the queue is empty or more than
  /// 100 time units of CUtils.CurrentTimeMS have elapsed in this call.
  /// </summary>
  /// <remarks>
  /// The queue lock is held only while a message is taken out, not while it is
  /// handled. Handlers call into the listener and can take arbitrarily long;
  /// keeping the lock during that time would stall the socket callbacks that
  /// enqueue new messages.
  /// </remarks>
  private void ProcessMsg()
  {
      long started = CUtils.CurrentTimeMS;
      while (true)
      {
          Message next;
          lock (MsgQueue)
          {
              if (MsgQueue.Count == 0)
              {
                  break;
              }
              if (CUtils.CurrentTimeMS - started > 100)
              {
                  // Time budget exhausted; the rest waits for the next call.
                  break;
              }
              next = MsgQueue.Dequeue();
          }

          if (next != null)
          {
              _received_package(next);
          }
      }
  }
  /// <summary>Empties the queue under the queue lock, disposing every non-null message.</summary>
  private void ClearMsgQueue()
  {
      lock (MsgQueue)
      {
          while (MsgQueue.Count != 0)
          {
              Message discarded = MsgQueue.Dequeue();
              if (discarded == null)
              {
                  continue;
              }
              discarded.Dispose();
          }
      }
  }
  575. #region ipv4||6
  576. ////////////////////////////////////
  // Native resolver linked into the player binary ("__Internal").
  [DllImport("__Internal")]
  private static extern string getIPv6(string mHost, string mPort);

  /// <summary>
  /// Returns the host tagged with its address family as "address&amp;&amp;family".
  /// On iPhone player builds the native getIPv6 supplies the string; on every
  /// other build the host is returned unchanged with the tag "ipv4".
  /// </summary>
  /// <param name="mHost">Host name or address.</param>
  /// <param name="mPort">Port as text; only used by the native resolver.</param>
  public static string GetIPv6(string mHost, string mPort)
  {
#if UNITY_IPHONE && !UNITY_EDITOR
      return getIPv6(mHost, mPort);
#else
      return string.Concat(mHost, "&&ipv4");
#endif
  }
  /// <summary>
  /// Determines the address family and address to use for a server.
  /// </summary>
  /// <remarks>
  /// Defaults to IPv4 with the address unchanged. Only when GetIPv6 returns
  /// "address&amp;&amp;ipv6" are the address and family replaced. Failures are
  /// logged and leave the defaults in place.
  /// </remarks>
  /// <param name="serverIp">Configured host name or address.</param>
  /// <param name="serverPorts">Port as text, forwarded to GetIPv6.</param>
  /// <param name="newServerIp">Receives the address to connect to.</param>
  /// <param name="mIPType">Receives the address family to create the socket with.</param>
  private void GetIPType(string serverIp, string serverPorts, out string newServerIp, out AddressFamily mIPType)
  {
      newServerIp = serverIp;
      mIPType = AddressFamily.InterNetwork;
      try
      {
          string tagged = GetIPv6(serverIp, serverPorts);
          if (string.IsNullOrEmpty(tagged))
          {
              return;
          }

          // Expected shape: "<address>&&<ipv4|ipv6>".
          string[] parts = Regex.Split(tagged, "&&");
          if (parts == null || parts.Length < 2)
          {
              return;
          }

          if (parts[1] == "ipv6")
          {
              newServerIp = parts[0];
              mIPType = AddressFamily.InterNetworkV6;
          }
      }
      catch (Exception e)
      {
          log.Log("GetIPv6 error:" + e);
      }
  }
  614. #endregion
  615. }
  616. }