PomeloClient.TCP.cs 21 KB


  1. using CommonLang;
  2. using CommonLang.Concurrent;
  3. using CommonLang.Log;
  4. using SimpleJson;
  5. using System;
  6. using System.Collections.Generic;
  7. using System.IO;
  8. using System.Net;
  9. using System.Net.Sockets;
  10. using System.Runtime.InteropServices;
  11. using System.Text.RegularExpressions;
  12. using System.Threading;
  13. namespace Pomelo.DotNetClient
  14. {
  15. public class PomeloTCP : PomeloClientAdapter
  16. {
  17. protected readonly IPomeloClientAdapterListener listener;
  18. private long last_heartbeat_r2c = CUtils.CurrentTimeMS;
  19. private long last_heartbeat_chk = CUtils.CurrentTimeMS;
  20. private long total_recv_bytes = 0;
  21. private long total_sent_bytes = 0;
  22. private const int _SendTimeOut = 3000;
  23. private const int _ReceiveTimeOut = 3000;
  24. private const int _MaxRead = 1024 * 16;
  25. private readonly byte[] _byteBuffer = new byte[_MaxRead];
  26. private MemoryStream _memStream = null;
  27. private BinaryReader _reader = null;
  28. private NetWorkState mCurStatus = NetWorkState.NONE;
  29. private Logger log = LoggerFactory.GetLogger("PomeloTCP");
  30. Queue<Message> MsgQueue = new Queue<Message>();
  31. protected class Connecting
  32. {
  33. public string _host;
  34. public int _port;
  35. public int _timeout;
  36. public JsonObject _user;
  37. }
  38. protected TcpClient socket;
  39. protected Connecting _NetInfo;
  40. public Socket Session
  41. {
  42. get { return (socket != null) ? socket.Client : null; }
  43. }
  44. public NetWorkState NetWorkState
  45. {
  46. get { return mCurStatus; }
  47. }
  48. public long TotalRecvBytes { get { return total_recv_bytes; } }
  49. public long TotalSentBytes { get { return total_sent_bytes; } }
  50. public void DoSendUpdate() { }
  51. public bool Connected
  52. {
  53. get
  54. {
  55. return socket != null &&
  56. socket.Connected &&
  57. socket.GetStream().CanRead &&
  58. socket.GetStream().CanWrite;
  59. }
  60. }
  61. public PomeloTCP(IPomeloClientAdapterListener listener)
  62. {
  63. socket = null;
  64. this.listener = listener;
  65. _memStream = new MemoryStream();
  66. _reader = new BinaryReader(_memStream);
  67. }
  68. public long GetPing()
  69. {
  70. return CUtils.CurrentTimeMS - last_heartbeat_r2c;
  71. }
  72. //--------------------------------------------------------------------------------------------------
  73. #region --Interface--
  74. public virtual void connect(string host, int port, int timeout, JsonObject user)
  75. {
  76. log.Debug("===========socket connect begin " + Thread.CurrentThread.ManagedThreadId);
  77. if (Connected)
  78. {
  79. //send msg
  80. if (listener != null)
  81. {
  82. log.Debug("===========socket connect call function ");
  83. listener.OnConnected(socket.Client, _NetInfo._user);
  84. }
  85. return;
  86. }
  87. //已经有一个连接在进行中
  88. if(mCurStatus == NetWorkState.CONNECTING)
  89. {
  90. JsonObject json = new JsonObject();
  91. DoNetState(NetWorkState.ERROR);
  92. json["s2c_code"] = 500;
  93. json["s2c_msg"] = "socket connecting";
  94. listener.OnConnected(socket.Client, json);
  95. return;
  96. }
  97. _NetInfo = new Connecting()
  98. {
  99. _host = host,
  100. _port = port,
  101. _timeout = timeout,
  102. _user = user,
  103. };
  104. DoNetState(NetWorkState.CONNECTING);
  105. string newIp = null;
  106. AddressFamily addressFamily;
  107. GetIPType(host, port.ToString(), out newIp, out addressFamily);
  108. _NetInfo._host = newIp;
  109. _NetInfo._port = port;
  110. this.socket = new TcpClient(addressFamily)
  111. {
  112. SendTimeout = _SendTimeOut,
  113. ReceiveTimeout = _ReceiveTimeOut,
  114. NoDelay = true
  115. };
  116. socket.Client.Blocking = true;
  117. try
  118. {
  119. log.Debug("===========socket connect call socket.BeginConnect " + _NetInfo._host + ":" + _NetInfo._port);
  120. _received_heartbeat();
  121. conResult = socket.BeginConnect(_NetInfo._host, _NetInfo._port, new AsyncCallback(OnConnect), socket);
  122. }
  123. catch (Exception err)
  124. {
  125. if (err is SocketException)
  126. {
  127. JsonObject json = new JsonObject();
  128. DoNetState(NetWorkState.ERROR);
  129. json["s2c_code"] = 500;
  130. json["s2c_msg"] = err.Message;
  131. listener.OnConnected(socket.Client, json);
  132. }
  133. else
  134. {
  135. log.Debug("===========socket connect call socket.BeginConnect CloseSocket");
  136. CloseSocket();
  137. }
  138. }
  139. }
  140. private void OnConnect(IAsyncResult ar)
  141. {
  142. //reset stream
  143. try
  144. {
  145. socket.EndConnect(ar);
  146. if (Connected)
  147. {
  148. _memStream.SetLength(0);
  149. _memStream.Position = 0;
  150. Array.Clear(_byteBuffer, 0, _byteBuffer.Length);
  151. StartReadMessage();
  152. _send_handshake(_NetInfo._user);
  153. log.Debug("socket connect success ");
  154. }
  155. else
  156. {
  157. log.Debug("socket connect faile ");
  158. this._set_net_state(NetWorkState.DISCONNECTED);
  159. }
  160. }
  161. catch (Exception e)
  162. {
  163. log.Warn("===========OnConnect========== Error" + e.Message);
  164. CloseSocket();
  165. _on_error(e);
  166. }
  167. }
  168. public virtual void disconnect()
  169. {
  170. log.Debug("socket disconnect " + Thread.CurrentThread.ManagedThreadId);
  171. DoNetState(NetWorkState.CLOSED);
  172. CloseSocket();
  173. ClearMsgQueue();
  174. if(listener != null)
  175. {
  176. listener.OnDisconnected(null, "CLOSED");
  177. }
  178. }
  179. public virtual bool resolve_route(RecvMessage msg, byte flag, out string route)
  180. {
  181. if ((flag & RecvMessage.MSG_Route_Mask) == 1)
  182. {
  183. ushort routeId = RecvMessage.ReadShort(msg.Stream);
  184. route = "area.playerPush.battleEventPush";
  185. return true;
  186. }
  187. route = null;
  188. return false;
  189. }
  190. IAsyncResult conResult;
  191. private float conTime;
  192. public virtual void update(float deltime)
  193. {
  194. if (mCurStatus == NetWorkState.CONNECTING)
  195. {
  196. conTime += deltime * 1000;
  197. if (conTime > _NetInfo._timeout && conResult.IsCompleted == false)
  198. {
  199. conTime = 0;
  200. socket.Client.Close();
  201. socket.Close();
  202. socket = null;
  203. }
  204. }
  205. else
  206. {
  207. conTime = 0;
  208. }
  209. ProcessMsg();
  210. }
  211. public virtual void send(SendMessage send_object)
  212. {
  213. try
  214. {
  215. startSend(send_object);
  216. }
  217. catch (Exception err) {
  218. CloseSocket();
  219. log.Warn("socket startSend Error:"+ err.StackTrace);
  220. _on_error(err);
  221. }
  222. }
  223. public virtual void disposing()
  224. {
  225. log.Debug("socket disposing " + Thread.CurrentThread.ManagedThreadId);
  226. if (_memStream != null)
  227. {
  228. _memStream.Close();
  229. _memStream = null;
  230. }
  231. if (_reader != null)
  232. {
  233. _reader.Close();
  234. _reader = null;
  235. }
  236. disconnect();
  237. }
  238. #endregion
  239. //--------------------------------------------------------------------------------------------------
  240. #region --Internal--
  241. private void _on_error(Exception err)
  242. {
  243. log.Warn("_on_error ThreadID:" + Thread.CurrentThread.ManagedThreadId);
  244. _set_net_state(NetWorkState.DISCONNECTED);
  245. }
  246. private void _set_net_state(NetWorkState st)
  247. {
  248. LocalNetStatus recv_object = LocalNetStatus.Alloc(st);
  249. AddMsg(recv_object);
  250. }
  251. private bool DoNetState(NetWorkState st)
  252. {
  253. if (mCurStatus != st)
  254. {
  255. if (NetWorkState.CLOSED == mCurStatus && NetWorkState.DISCONNECTED == st)
  256. {
  257. //DISCONNECTED 为重连标识,所以只能在connected状态下切换
  258. return false;
  259. }
  260. mCurStatus = st;
  261. if (st != NetWorkState.NONE)
  262. {
  263. if(NetWorkState.DISCONNECTED == mCurStatus)
  264. {
  265. CloseSocket();
  266. ClearMsgQueue();
  267. }
  268. listener.OnNetWorkStateChanged(st);
  269. return true;
  270. }
  271. }
  272. return false;
  273. }
  274. private void CloseSocket()
  275. {
  276. try
  277. {
  278. log.Debug("====CloseSocket=====");
  279. if (Connected)
  280. {
  281. socket.Client.Shutdown(SocketShutdown.Both);
  282. socket.Client.Close();
  283. //socket.GetStream().Close();
  284. socket.Close();
  285. socket = null;
  286. log.Debug("====CloseSocket===== end");
  287. }
  288. }
  289. catch (Exception err)
  290. {
  291. log.Debug("===== socket === CloseSocket Error");
  292. log.Warn(err.Message + "\n" + err.StackTrace);
  293. if(socket != null)
  294. {
  295. socket.Close();
  296. socket = null;
  297. }
  298. }
  299. }
  300. private void _send_handshake(JsonObject user)
  301. {
  302. //Debug.Log("begin handshake!");
  303. var Version = "0.3.0";
  304. var Type = "unity-socket";
  305. if (user == null) user = new JsonObject();
  306. var msg = new JsonObject();
  307. //Build sys option
  308. var sys = new JsonObject();
  309. sys["version"] = Version;
  310. sys["type"] = Type;
  311. //Build handshake message
  312. msg["sys"] = sys;
  313. msg["user"] = user;
  314. var body = Message.UTF8.GetBytes(msg.ToString());
  315. var send_object = SendMessage.Alloc(PackageType.PKG_HANDSHAKE, body);
  316. startSend(send_object);
  317. }
  318. private void _received_handshake(JsonObject msg)
  319. {
  320. var code = msg["code"];
  321. var sys = msg["sys"] as JsonObject;
  322. if (code == null || sys == null || Convert.ToInt32(code) != 200)
  323. {
  324. throw new Exception("Handshake error! Please check your handshake config.");
  325. }
  326. object jobj;
  327. JsonObject dict;
  328. if (sys.TryGetValue("dict", out jobj))
  329. {
  330. dict = (JsonObject)jobj;
  331. }
  332. else
  333. {
  334. dict = new JsonObject();
  335. }
  336. //Init heartbeat service
  337. int interval = 0;
  338. if (sys.TryGetValue("heartbeat", out jobj))
  339. {
  340. interval = Convert.ToInt32(jobj);
  341. //_init_heartbeat(interval);
  342. }
  343. var send_object = SendMessage.Alloc(PackageType.PKG_HANDSHAKE_ACK);
  344. startSend(send_object);
  345. JsonObject user;
  346. if (msg.TryGetValue("user", out jobj))
  347. {
  348. user = (JsonObject)jobj;
  349. }
  350. else
  351. {
  352. user = new JsonObject();
  353. }
  354. DoNetState(NetWorkState.CONNECTED);
  355. _received_heartbeat();
  356. if (listener != null)
  357. {
  358. listener.OnConnected(socket.Client, user);
  359. }
  360. }
  361. private void _received_package(Message recv)
  362. {
  363. if(recv is RecvMessage)
  364. {
  365. RecvMessage recv_object = recv as RecvMessage;
  366. if (recv_object.PkgType == PackageType.PKG_HANDSHAKE)
  367. {
  368. var body = Message.UTF8.GetString(
  369. recv_object.Buffer,
  370. RecvMessage.FIXED_HEAD_SIZE,
  371. recv_object.PkgLength);
  372. JsonObject data = (JsonObject)SimpleJson.SimpleJson.DeserializeObject(body);
  373. _received_handshake(data);
  374. }
  375. else if (recv_object.PkgType == PackageType.PKG_HEARTBEAT)
  376. {
  377. listener.OnReceivedMessage(recv_object);
  378. }
  379. else if (recv_object.PkgType == PackageType.PKG_DATA)
  380. {
  381. recv_object.DecodeBody(this);
  382. listener.OnReceivedMessage(recv_object);
  383. }
  384. else if (recv_object.PkgType == PackageType.PKG_KICK)
  385. {
  386. DoNetState(NetWorkState.KICK);
  387. }
  388. }
  389. else
  390. {
  391. //异步处理 网络状态更新
  392. LocalNetStatus netStatus = recv as LocalNetStatus;
  393. if(netStatus != null)
  394. {
  395. DoNetState(netStatus.St);
  396. }
  397. }
  398. recv.Dispose();
  399. }
  400. private void _send_heartbeat()
  401. {
  402. var send_object = SendMessage.Alloc(PackageType.PKG_HEARTBEAT);
  403. startSend(send_object);
  404. }
  405. private void _received_heartbeat()
  406. {
  407. last_heartbeat_r2c = CUtils.CurrentTimeMS;
  408. }
  409. #endregion
  410. //--------------------------------------------------------------------------------------------------
  411. #region --收发--
  412. private void StartReadMessage()
  413. {
  414. try
  415. {
  416. if (!Connected)
  417. {
  418. return;
  419. }
  420. if (socket.GetStream().CanRead)
  421. {
  422. Array.Clear(_byteBuffer, 0, _byteBuffer.Length);
  423. //log.Debug("WorkStream.StartReadMessage ThreadID:" + Thread.CurrentThread.ManagedThreadId);
  424. socket.GetStream().BeginRead(_byteBuffer, 0, _byteBuffer.Length,
  425. new AsyncCallback(TCPReadCallBack), null);
  426. }
  427. else
  428. {
  429. log.Warn("Network IO problem not cantread");
  430. }
  431. }
  432. catch (Exception err)
  433. {
  434. log.Warn("StartReadMessage error :" + err.Message);
  435. }
  436. }
  437. private void TCPReadCallBack(IAsyncResult ar)
  438. {
  439. //主动断开时
  440. if (!Connected)
  441. {
  442. return;
  443. }
  444. int numberOfBytesRead = 0;
  445. try
  446. {
  447. numberOfBytesRead = socket.GetStream().EndRead(ar);
  448. if (numberOfBytesRead > 0)
  449. {
  450. pickpackage(_byteBuffer, numberOfBytesRead);
  451. Array.Clear(_byteBuffer, 0, _byteBuffer.Length);
  452. //log.Debug("socket WorkStream.StartReadMessage callback ThreadID:" + Thread.CurrentThread.ManagedThreadId);
  453. socket.GetStream().BeginRead(_byteBuffer, 0, _byteBuffer.Length,new AsyncCallback(TCPReadCallBack), null);
  454. }
  455. else
  456. {
  457. //被动断开时 回到登录界面
  458. log.Debug("===========socket recve numberOfBytesRead: recve = " + numberOfBytesRead.ToString());
  459. _set_net_state(NetWorkState.DISCONNECTED);
  460. }
  461. }
  462. catch (Exception err)
  463. {
  464. log.Warn("=======socket===TCPReadCallBack===Error>" + err.Message);
  465. _on_error(err);
  466. }
  467. }
  468. private long RemainingBytes()
  469. {
  470. return _memStream.Length - _memStream.Position;
  471. }
  472. private void pickpackage(byte[] bytes, int length)
  473. {
  474. _memStream.Seek(0, SeekOrigin.End);
  475. _memStream.Write(bytes, 0, length);
  476. _memStream.Seek(0, SeekOrigin.Begin);
  477. while (RemainingBytes() >= RecvMessage.FIXED_HEAD_SIZE)
  478. {
  479. PackageType PkgType = (PackageType)_reader.ReadByte();
  480. int PkgLength = (_reader.ReadByte() << 16) + (_reader.ReadByte() << 8) + _reader.ReadByte();
  481. long remainingLength = RemainingBytes();
  482. _memStream.Position -= RecvMessage.FIXED_HEAD_SIZE;
  483. if (remainingLength < PkgLength)
  484. {
  485. break;
  486. }
  487. int msgSize = RecvMessage.FIXED_HEAD_SIZE + PkgLength;
  488. RecvMessage recv_object = RecvMessage.Alloc();
  489. recv_object.stateSocket = socket;
  490. recv_object.Stream.Write(_reader.ReadBytes(msgSize), 0, msgSize);
  491. recv_object.DecodeHead();
  492. //Enqueue
  493. _received_heartbeat();
  494. AddMsg(recv_object);
  495. }
  496. var leftover = _reader.ReadBytes((int)RemainingBytes());
  497. _memStream.SetLength(0); //Clear
  498. _memStream.Write(leftover, 0, leftover.Length);
  499. }
  500. #endregion
  501. #region <<BeginSend 方式发送>>
  502. private void startSend(SendMessage send_object)
  503. {
  504. try
  505. {
  506. if (Connected)
  507. {
  508. socket.GetStream().BeginWrite(send_object.Buffer, 0, send_object.BufferLength, endSend, send_object);
  509. }
  510. }
  511. catch (Exception err)
  512. {
  513. send_object.Dispose();
  514. log.Warn("startSend Error>" + err.Message);
  515. }
  516. }
  517. private void endSend(IAsyncResult asyncSend)
  518. {
  519. var send_object = asyncSend.AsyncState as SendMessage;
  520. try
  521. {
  522. if (!Connected)
  523. {
  524. return;
  525. }
  526. socket.GetStream().EndWrite(asyncSend);
  527. }
  528. catch (Exception err)
  529. {
  530. CloseSocket();
  531. }
  532. finally
  533. {
  534. send_object.Dispose();
  535. }
  536. }
  537. #endregion
  538. private void AddMsg(Message msg)
  539. {
  540. if (msg != null)
  541. {
  542. lock (MsgQueue)
  543. {
  544. MsgQueue.Enqueue(msg);
  545. }
  546. }
  547. }
  548. private void ProcessMsg()
  549. {
  550. lock (MsgQueue)
  551. {
  552. long last = CUtils.CurrentTimeMS;
  553. while (MsgQueue.Count > 0)
  554. {
  555. if(CUtils.CurrentTimeMS - last > 100)
  556. {
  557. break;
  558. }
  559. var recv_object = MsgQueue.Dequeue();
  560. if (recv_object != null)
  561. {
  562. _received_package(recv_object);
  563. }
  564. }
  565. }
  566. }
  567. private void ClearMsgQueue()
  568. {
  569. lock(MsgQueue)
  570. {
  571. while (MsgQueue.Count > 0)
  572. {
  573. var recv_object = MsgQueue.Dequeue();
  574. if (recv_object != null)
  575. {
  576. recv_object.Dispose();
  577. }
  578. }
  579. }
  580. }
  581. #region ipv4||6
  582. ////////////////////////////////////
  583. [DllImport("__Internal")]
  584. private static extern string getIPv6(string mHost, string mPort);
  585. public static string GetIPv6(string mHost, string mPort)
  586. {
  587. #if UNITY_IPHONE && !UNITY_EDITOR
  588. string mIPv6 = getIPv6(mHost, mPort);
  589. return mIPv6;
  590. #else
  591. return mHost + "&&ipv4";
  592. #endif
  593. }
  594. private void GetIPType(string serverIp, string serverPorts, out string newServerIp, out AddressFamily mIPType)
  595. {
  596. mIPType = AddressFamily.InterNetwork;
  597. newServerIp = serverIp;
  598. try
  599. {
  600. var mIPv6 = GetIPv6(serverIp, serverPorts);
  601. if (!string.IsNullOrEmpty(mIPv6))
  602. {
  603. var m_StrTemp = Regex.Split(mIPv6, "&&");
  604. if (m_StrTemp != null && m_StrTemp.Length >= 2)
  605. {
  606. var IPType = m_StrTemp[1];
  607. if (IPType == "ipv6")
  608. {
  609. newServerIp = m_StrTemp[0];
  610. mIPType = AddressFamily.InterNetworkV6;
  611. }
  612. }
  613. }
  614. }
  615. catch (Exception e)
  616. {
  617. log.Log("GetIPv6 error:" + e);
  618. }
  619. }
  620. #endregion
  621. }
  622. }