NetSessionAsync.cs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. using System;
  2. using System.Collections;
  3. using System.Collections.Generic;
  4. using System.Net.Sockets;
  5. using System.IO;
  6. using System.Threading;
  7. using CommonNetwork.Net;
  8. using CommonLang.IO;
  9. using CommonLang.ByteOrder;
  10. using CommonLang.Protocol;
  11. using System.Net;
  12. using CommonLang.Net;
  13. using CommonLang;
  14. using CommonLang.Log;
  15. namespace CommonNetwork.Sockets
  16. {
  17. public class NetSessionAsync : BaseNetSession
  18. {
  /// <summary>Logger used by every method of this session.</summary>
  private Logger log = LoggerFactory.GetLogger("NetSessionAsync");

  /// <summary>Underlying TCP socket; null while no session is open.</summary>
  private Socket mTCP;

  /// <summary>Codec stored by Open and exposed through <see cref="Codec"/>.</summary>
  protected INetPackageCodec mCodec;

  /// <summary>Listener stored by Open; receives the session callbacks.</summary>
  private INetSessionListener mListener;

  /// <summary>Messages queued by Send until startSend drains them.</summary>
  private Queue<object> mSendQueue = new Queue<object>();

  /// <summary>
  /// Creates a session without a socket; nothing is connected until Open is called.
  /// </summary>
  public NetSessionAsync()
  {
  }
  /// <summary>
  /// Reports whether the network connection is currently established.
  /// </summary>
  /// <value>The socket's Connected flag, or false when no socket exists.</value>
  public override bool IsConnected
  {
      get
      {
          // Read the field once so the null test and the flag read use the same socket.
          Socket current = mTCP;
          return current != null && current.Connected;
      }
  }
  /// <summary>Gets the codec held in mCodec (assigned by Open).</summary>
  public override INetPackageCodec Codec
  {
      get
      {
          return mCodec;
      }
  }
  /// <summary>
  /// Gets the remote endpoint of the current socket.
  /// </summary>
  /// <value>
  /// The socket's RemoteEndPoint as an IPEndPoint, or null when no socket exists
  /// (or the endpoint is not an IPEndPoint).
  /// </value>
  public override IPEndPoint RemoteAddress
  {
      get
      {
          // Snapshot the field: Close() may null it on another thread between
          // the null test and the property read.
          Socket tcp = mTCP;
          if (tcp == null)
          {
              return null;
          }
          return tcp.RemoteEndPoint as IPEndPoint;
      }
  }
  /// <summary>Gets the underlying socket; null while no session is open.</summary>
  public Socket Session
  {
      get
      {
          return mTCP;
      }
  }
  /// <summary>
  /// Reports an error to the listener first, then to the mOnError handler when one is set.
  /// </summary>
  /// <param name="err">The error to report.</param>
  private void onException(Exception err)
  {
      mListener.onError(this, err);
      if (mOnError == null)
      {
          return;
      }
      mOnError.Invoke(this, err);
  }
  67. //-------------------------------------------------------------------------------------
  68. #region Open
  /// <summary>
  /// Starts an asynchronous TCP connection to the given address.
  /// Does nothing when a socket already exists.
  /// </summary>
  /// <param name="url">Remote address in "host:port" form.</param>
  /// <param name="codec">Codec stored for this session.</param>
  /// <param name="listener">Listener that receives the session callbacks.</param>
  /// <returns>
  /// true when the connect request was issued; false when a socket already
  /// exists or when starting the connection failed (the failure is reported
  /// through the error callbacks).
  /// </returns>
  public override bool Open(string url, INetPackageCodec codec, INetSessionListener listener)
  {
      bool ret = false;
      try
      {
          lock (this)
          {
              if (mTCP == null)
              {
                  this.mCodec = codec;
                  this.mListener = listener;
                  this.mURL = url;
                  // Parse host and port BEFORE creating the socket, so a malformed
                  // url cannot leave a half-initialised socket behind.
                  string[] url_kv = url.Split(':');
                  string host = url_kv[0];
                  int port = int.Parse(url_kv[1]);
                  lock (mSendQueue) this.mSendQueue.Clear();
                  // Create the socket and start connecting.
                  Socket tcp = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                  try
                  {
                      // Publish the socket before BeginConnect: the completion
                      // callback runs on another thread and reads mTCP.
                      this.mTCP = tcp;
                      tcp.NoDelay = true;
                      tcp.BeginConnect(host, port, endConnect, tcp);
                      ret = true;
                  }
                  catch
                  {
                      // Roll back so the socket is not leaked and a later Open()
                      // is not blocked by a stale non-null mTCP.
                      this.mTCP = null;
                      tcp.Close();
                      throw;
                  }
              }
          }
      }
      catch (Exception err)
      {
          log.Error(err.Message, err);
          onException(new NetException("\n[Open:]" + URL +
              "\n[InnerException:]" + err.InnerException +
              "\n[Exception:]" + err.Message +
              "\n[Source:]" + err.Source +
              "\n[StackTrace:]" + err.StackTrace));
      }
      return ret;
  }
  /// <summary>
  /// Completion callback for the BeginConnect issued by Open. On success it
  /// raises the opened notification and starts receiving; on failure it
  /// reports the error and closes the session.
  /// </summary>
  /// <param name="result">Async result whose state is the connecting socket.</param>
  private void endConnect(IAsyncResult result)
  {
      // Open passes the connecting socket as the async state; fall back to the
      // field if the state is missing.
      Socket socket = (result.AsyncState as Socket) ?? mTCP;
      try
      {
          // Throws when the connection was refused or the socket was closed.
          socket.EndConnect(result);
          if (!object.ReferenceEquals(socket, mTCP))
          {
              // The session was closed (or reopened) while connecting.
              return;
          }
          onOpen();
          startReceiveHead();
      }
      catch (Exception err)
      {
          if (!object.ReferenceEquals(socket, mTCP))
          {
              // Failure belongs to a socket this session no longer owns
              // (typically Close() was called while connecting); nothing to report.
              return;
          }
          // Never let the exception escape: this runs on a callback thread with
          // no caller to catch it.
          log.Error(err.Message, err);
          onException(new NetException("endConnect: " + err.Message));
          this.Close();
      }
  }
  /// <summary>
  /// Notifies the listener, then the mOnSessionOpened handler when one is set,
  /// that the session has opened.
  /// </summary>
  private void onOpen()
  {
      mListener.sessionOpened(this);
      if (mOnSessionOpened == null)
      {
          return;
      }
      mOnSessionOpened.Invoke(this);
  }
  128. #endregion
  129. //-------------------------------------------------------------------------------------
  130. #region Close
  /// <summary>
  /// Closes the socket, discards queued messages and raises the closed notification.
  /// </summary>
  /// <returns>true when a socket existed and was closed; false when there was none.</returns>
  public override bool Close()
  {
      lock (this)
      {
          if (mTCP == null)
          {
              return false;
          }

          try
          {
              mTCP.Close();
          }
          catch (Exception closeError)
          {
              // A failing socket close is only logged; teardown continues.
              log.Error(closeError.Message, closeError);
          }
          finally
          {
              lock (mSendQueue)
              {
                  mSendQueue.Clear();
              }
          }

          mTCP = null;
          onClose();
          return true;
      }
  }
  /// <summary>
  /// Empties the send queue, then notifies the listener and the
  /// mOnSessionClosed handler (when set) that the session has closed.
  /// </summary>
  private void onClose()
  {
      lock (mSendQueue)
      {
          mSendQueue.Clear();
      }

      mListener.sessionClosed(this);
      if (mOnSessionClosed == null)
      {
          return;
      }
      mOnSessionClosed.Invoke(this);
  }
  172. #endregion
  173. //-------------------------------------------------------------------------------------
  174. //-------------------------------------------------------------------------------------
  175. //-------------------------------------------------------------------------------------
  176. //-------------------------------------------------------------------------------------
  177. //-------------------------------------------------------------------------------------------------------------------------------
  178. #region Send
  /// <summary>
  /// Sends a message. The message is queued and an asynchronous send is started,
  /// so this method does not wait for the transmission to finish.
  /// Nothing happens when no socket exists.
  /// </summary>
  /// <param name="data">The message to send.</param>
  public override void Send(Object data)
  {
      lock (this)
      {
          if (mTCP == null)
          {
              return;
          }

          lock (mSendQueue)
          {
              mSendQueue.Enqueue(data);
          }

          // Kick off the asynchronous write for whatever is queued.
          startSend();
      }
  }
  /// <summary>
  /// Stamps the response with the id of the request it answers and sends it.
  /// </summary>
  /// <param name="rsponse">The response message; its MessageID is overwritten.</param>
  /// <param name="requestMessageID">Message id of the request being answered.</param>
  public override void SendResponse(IMessage rsponse, int requestMessageID)
  {
      // Copy the request id onto the response before queueing it.
      rsponse.MessageID = requestMessageID;
      this.Send(rsponse);
  }
  /// <summary>
  /// Notifies the listener, then the mOnMessageSent handler when one is set,
  /// that a message has been sent.
  /// </summary>
  /// <param name="message">The message that was sent.</param>
  private void onSent(Object message)
  {
      mListener.messageSent(this, message);
      if (mOnMessageSent == null)
      {
          return;
      }
      mOnMessageSent.Invoke(this, message);
  }
  /// <summary>
  /// State of one asynchronous send: the messages in the batch and the
  /// buffer holding their encoded bytes.
  /// </summary>
  private class SendObject
  {
      /// <summary>Messages that belong to this batch.</summary>
      public readonly List<object> sending = new List<object>();

      /// <summary>Encoded bytes of the batch; starts with a capacity of 1024.</summary>
      public readonly MemoryStream buffer = new MemoryStream(1024);
  }
  /// <summary>
  /// Pool of SendObject instances; every object handed out is reset first.
  /// </summary>
  private class SendObjectPool
  {
      private ObjectPool<SendObject> s_Pool = new ObjectPool<SendObject>(s_ListPool_OnCreate);

      /// <summary>Factory given to the pool for creating new instances.</summary>
      private static SendObject s_ListPool_OnCreate()
      {
          return new SendObject();
      }

      /// <summary>Takes an object from the pool with an empty buffer and message list.</summary>
      public SendObject Alloc()
      {
          SendObject fresh = s_Pool.Get();
          // Rewind and truncate the buffer, then drop any leftover messages.
          fresh.buffer.Position = 0;
          fresh.buffer.SetLength(0);
          fresh.sending.Clear();
          return fresh;
      }

      /// <summary>Clears the message list and returns the object to the pool.</summary>
      public void Release(SendObject toRelease)
      {
          toRelease.sending.Clear();
          s_Pool.Release(toRelease);
      }
  }

  /// <summary>Pool that supplies the per-send state objects used by startSend.</summary>
  private SendObjectPool mSendPool = new SendObjectPool();
  /// <summary>
  /// Drains the send queue into one pooled batch, encodes every message into
  /// the batch buffer and starts an asynchronous send of that buffer.
  /// Any failure is logged and reported, and the session is closed.
  /// </summary>
  private void startSend()
  {
      var batch = mSendPool.Alloc();
      try
      {
          // Move everything queued so far into the batch.
          lock (mSendQueue)
          {
              if (mSendQueue.Count > 0)
              {
                  batch.sending.AddRange(mSendQueue);
                  mSendQueue.Clear();
              }
          }

          // Messages are only encoded while the socket reports it is connected.
          bool canEncode = batch.sending.Count > 0 && mTCP.Connected;
          if (canEncode)
          {
              foreach (object message in batch.sending)
              {
                  doEncode(batch.buffer, message);
              }
          }

          mTCP.BeginSend(
              batch.buffer.GetBuffer(),
              0,
              (int)batch.buffer.Length,
              SocketFlags.None,
              endSend,
              batch);
      }
      catch (Exception failure)
      {
          mSendPool.Release(batch);
          log.Error(failure.Message, failure);
          onException(new NetException("\n[runWrite:]" + URL +
              "\n[InnerException:]" + failure.InnerException +
              "\n[Exception:]" + failure.Message +
              "\n[Source:]" + failure.Source +
              "\n[StackTrace:]" + failure.StackTrace, failure));
          this.Close();
      }
  }
  /// <summary>
  /// Completion callback for the BeginSend issued by startSend. Updates the
  /// send counters, raises a sent notification for every message of the batch
  /// and returns the batch to the pool.
  /// </summary>
  /// <param name="result">Async result whose state is the SendObject of the batch.</param>
  private void endSend(IAsyncResult result)
  {
      if (!result.IsCompleted)
      {
          log.Info("Continue send !");
          return;
      }

      var sending = result.AsyncState as SendObject;
      try
      {
          // Snapshot the socket: Close() nulls the field, and a send that was
          // still pending at that moment must not be reported as an error.
          Socket tcp = mTCP;
          if (tcp == null)
          {
              return; // the finally block still recycles the batch
          }

          int length = tcp.EndSend(result);
          if (length > 0)
          {
              mSendBytes += length;
              mSendPacks += sending.sending.Count;
              for (int i = 0; i < sending.sending.Count; i++)
              {
                  object send_msg = sending.sending[i];
                  onSent(send_msg);
              }
              sending.sending.Clear();
          }
          else
          {
              // Nothing was written: treat the connection as dead.
              Close();
          }
      }
      catch (Exception err)
      {
          log.Error(err.Message, err);
          // Label the error with the stage that actually failed.
          onException(new NetException("endSend: " + err.Message));
          this.Close();
      }
      finally
      {
          mSendPool.Release(sending);
      }
  }
  313. #endregion
  314. //-------------------------------------------------------------------------------------------------------------------------------
  315. #region Receive
  /// <summary>
  /// Delivers a decoded message to the listener, then to the
  /// mOnMessageReceived handler when one is set. Exceptions thrown by either
  /// are logged and reported through onException instead of propagating.
  /// </summary>
  /// <param name="message">The decoded message.</param>
  private void onReceive(Object message)
  {
      try
      {
          mListener.messageReceived(this, message);
          if (mOnMessageReceived == null)
          {
              return;
          }
          mOnMessageReceived.Invoke(this, message);
      }
      catch (Exception handlerError)
      {
          log.Error(handlerError.Message, handlerError);
          onException(handlerError);
      }
  }
  /// <summary>
  /// State of one message being received: a 4-byte head followed by a body
  /// whose length GetBodyLength reads from the head.
  /// </summary>
  private class ReceiveObject
  {
      /// <summary>Raw head bytes.</summary>
      public readonly byte[] head = new byte[4];

      /// <summary>Number of head bytes received so far.</summary>
      public int head_position = 0;

      /// <summary>Body length taken from the head.</summary>
      public int body_length = 0;

      /// <summary>Number of body bytes received so far.</summary>
      public int body_position = 0;

      /// <summary>Buffer the body is received into; starts with a capacity of 1024.</summary>
      public readonly MemoryStream body_buffer = new MemoryStream(1024);
  }
  /// <summary>
  /// Pool of ReceiveObject instances; every object handed out is reset first.
  /// </summary>
  private class ReceiveObjectPool
  {
      private ObjectPool<ReceiveObject> s_Pool = new ObjectPool<ReceiveObject>(s_ListPool_OnCreate);

      /// <summary>Factory given to the pool for creating new instances.</summary>
      private static ReceiveObject s_ListPool_OnCreate()
      {
          return new ReceiveObject();
      }

      /// <summary>Takes an object from the pool with an empty body buffer and zeroed counters.</summary>
      public ReceiveObject Alloc()
      {
          ReceiveObject fresh = s_Pool.Get();
          // Rewind and truncate the body buffer.
          fresh.body_buffer.Position = 0;
          fresh.body_buffer.SetLength(0);
          // Restart head and body progress.
          fresh.head_position = 0;
          fresh.body_length = 0;
          fresh.body_position = 0;
          return fresh;
      }

      /// <summary>Returns the object to the pool.</summary>
      public void Release(ReceiveObject toRelease)
      {
          s_Pool.Release(toRelease);
      }
  }

  /// <summary>Pool that supplies the per-message receive state objects.</summary>
  private ReceiveObjectPool mReceivePool = new ReceiveObjectPool();
  /// <summary>
  /// Takes a fresh receive state from the pool and starts an asynchronous read
  /// of the message head. A failure is logged and reported, and the session is closed.
  /// </summary>
  private void startReceiveHead()
  {
      var state = mReceivePool.Alloc();
      try
      {
          int offset = state.head_position;
          int remaining = state.head.Length - state.head_position;
          mTCP.BeginReceive(state.head, offset, remaining, SocketFlags.None, endReceiveHead, state);
      }
      catch (Exception failure)
      {
          mReceivePool.Release(state);
          log.Error(failure.Message, failure);
          onException(new NetException("endReceive: " + failure.Message));
          this.Close();
      }
  }
  /// <summary>
  /// Completion callback for a head read. Once all head bytes have arrived it
  /// sizes the body buffer and starts receiving the body; while the head is
  /// still incomplete it keeps reading into the same state object.
  /// Any failure is logged and reported, and the session is closed.
  /// </summary>
  /// <param name="result">Async result whose state is the ReceiveObject in progress.</param>
  private void endReceiveHead(IAsyncResult result)
  {
      var recv_object = result.AsyncState as ReceiveObject;
      try
      {
          Socket tcp = mTCP;
          int length = tcp.EndReceive(result);
          if (length > 0)
          {
              mRecvBytes += length;
              recv_object.head_position += length;
              if (recv_object.head_position == recv_object.head.Length)
              {
                  // Head complete: size the body buffer and move on to the body.
                  recv_object.body_length = GetBodyLength(recv_object.head);
                  recv_object.body_position = 0;
                  if (recv_object.body_buffer.Capacity < recv_object.body_length)
                  {
                      recv_object.body_buffer.Capacity = recv_object.body_length;
                  }
                  recv_object.body_buffer.SetLength(recv_object.body_length);
                  startReceiveBody(recv_object);
              }
              else if (recv_object.head_position > recv_object.head.Length)
              {
                  throw new NetException("Receive head overfollow");
              }
              else
              {
                  // Partial head: continue into the SAME state object at the
                  // current offset. Starting over with a fresh object would
                  // discard the bytes already read and desynchronise the stream.
                  tcp.BeginReceive(
                      recv_object.head,
                      recv_object.head_position,
                      recv_object.head.Length - recv_object.head_position,
                      SocketFlags.None, endReceiveHead, recv_object);
              }
          }
          else
          {
              throw new NetException("Receive 0 bytes!");
          }
      }
      catch (Exception err)
      {
          mReceivePool.Release(recv_object);
          log.Error(err.Message, err);
          onException(new NetException("endReceive: " + err.Message));
          this.Close();
      }
  }
  /// <summary>
  /// Starts an asynchronous read of the body bytes still missing for the given
  /// state. A failure releases the state, is logged and reported, and closes the session.
  /// </summary>
  /// <param name="recv_object">Receive state whose body is being filled.</param>
  private void startReceiveBody(ReceiveObject recv_object)
  {
      try
      {
          byte[] target = recv_object.body_buffer.GetBuffer();
          int offset = recv_object.body_position;
          int remaining = recv_object.body_length - recv_object.body_position;
          mTCP.BeginReceive(target, offset, remaining, SocketFlags.None, endReceiveBody, recv_object);
      }
      catch (Exception failure)
      {
          mReceivePool.Release(recv_object);
          log.Error(failure.Message, failure);
          onException(new NetException("endReceive: " + failure.Message));
          this.Close();
      }
  }
  /// <summary>
  /// Completion callback for a body read. When the body is complete it decodes
  /// and dispatches the message, starts reading the next head and recycles the
  /// state; otherwise it keeps reading the remaining body bytes.
  /// Any failure is logged and reported, and the session is closed.
  /// </summary>
  /// <param name="result">Async result whose state is the ReceiveObject in progress.</param>
  private void endReceiveBody(IAsyncResult result)
  {
      var state = result.AsyncState as ReceiveObject;
      try
      {
          int received = mTCP.EndReceive(result);
          if (received <= 0)
          {
              throw new NetException("Receive 0 bytes!");
          }

          mRecvBytes += received;
          state.body_position += received;

          if (state.body_position > state.body_length)
          {
              throw new NetException("Receive body overfollow");
          }

          if (state.body_position != state.body_length)
          {
              // Body still incomplete: read the rest into the same state.
              startReceiveBody(state);
              return;
          }

          // Body complete: decode from the start of the buffer and dispatch.
          Object decoded = null;
          var body = state.body_buffer;
          body.Position = 0;
          if (doDecode(body, out decoded))
          {
              mRecvPacks++;
              onReceive(decoded);
          }

          // Begin the next message, then hand this state back to the pool.
          state.head_position = 0;
          startReceiveHead();
          mReceivePool.Release(state);
      }
      catch (Exception failure)
      {
          mReceivePool.Release(state);
          log.Error(failure.Message, failure);
          onException(new NetException("endReceive: " + failure.Message));
          this.Close();
      }
  }
  489. #endregion
  490. //-------------------------------------------------------------------------------------------------------------------------------
  /// <summary>
  /// Reads the body length from a message head via LittleEdian.GetS32, starting at offset 0.
  /// </summary>
  /// <param name="buffer">The head bytes.</param>
  /// <returns>The value read from the head.</returns>
  protected virtual int GetBodyLength(byte[] buffer)
  {
      int offset = 0;
      return LittleEdian.GetS32(buffer, ref offset);
  }
  /// <summary>
  /// Writes one framed message to the stream: a 4-byte length prefix followed
  /// by the payload produced by the codec.
  /// </summary>
  /// <param name="output">Seekable stream the frame is written to.</param>
  /// <param name="send_msg">The message to encode.</param>
  /// <returns>
  /// true when the codec encoded the message; false otherwise, in which case
  /// the stream is restored to the length and position it had on entry.
  /// </returns>
  protected virtual bool doEncode(Stream output, object send_msg)
  {
      long old_position = output.Position;
      long old_length = output.Length;
      // Reserve the length prefix; the real value is patched in after encoding.
      LittleEdian.PutS32(output, 0);
      if (Codec.doEncode(output, send_msg))
      {
          int full_length = (int)(output.Position - old_position);
          output.Position = old_position;
          LittleEdian.PutS32(output, full_length - 4);
          output.Position = old_position + full_length;
          return true;
      }
      // Encoding failed: drop the placeholder prefix and any partial payload so
      // they are not sent as a bogus frame along with the rest of the batch.
      output.SetLength(old_length);
      output.Position = old_position;
      return false;
  }
  /// <summary>
  /// Decodes one message from the stream with the codec and checks that the
  /// whole stream was consumed.
  /// </summary>
  /// <param name="input">Stream holding exactly one message body.</param>
  /// <param name="msg">Receives the decoded message.</param>
  /// <returns>true when the codec decoded a message; false otherwise.</returns>
  /// <exception cref="Exception">The codec succeeded but left unread bytes in the stream.</exception>
  protected virtual bool doDecode(Stream input, out object msg)
  {
      if (!Codec.doDecode(input, out msg))
      {
          return false;
      }

      bool consumedAll = input.Position == input.Length;
      if (!consumedAll)
      {
          throw new Exception(string.Format("can not decode full trunk size={0} type={1}", input.Length, msg != null ? msg.GetType().FullName : msg));
      }
      return true;
  }
  523. }
  524. }