IceServerIoSession.cs 18 KB


  1. using System;
  2. using System.IO;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using CommonNetwork_ICE.Util;
  7. using CommonServer.Server;
  8. using CommonLang.Protocol;
  9. using CommonLang.IO;
  10. using CommonServer_ICE.Msg;
  11. using CommonServer.Protocol;
  12. using CommonNetwork_ICE.Common;
  13. using Slice;
  14. using CommonLang.Log;
  15. using CommonLang.Concurrent;
  16. using CommonLang.ByteOrder;
  17. using CommonLang.Property;
  18. using CommonServer_ICE.Server;
  19. namespace CommonServer_ICE.Session
  20. {
  /// <summary>
  /// Server-side network session. Callers use it to send messages to the
  /// client and to store session-scoped attributes. It derives from the
  /// Ice-generated <c>ICombatSessionDisp_</c> base class, so it also receives
  /// the client's TCP calls (<c>SetCallback</c>, <c>ClientToServer</c>,
  /// <c>destroy</c>).
  /// </summary>
  internal class IceServerIoSession : ICombatSessionDisp_, ISession
  {
    private static readonly Logger log = LoggerFactory.GetLogger("IceServerIoSession");

    // Generates session IDs; shared by every session instance.
    private static readonly AtomicInteger idGenerator = new AtomicInteger(1);

    // Session-scoped attribute storage.
    private readonly Dictionary<string, Object> attributes = new Dictionary<string, object>();

    // Listeners notified about session events.
    private readonly List<ISessionListener> sessionListenerList = new List<ISessionListener>();

    // Generates the serial number stamped on each outgoing message.
    private readonly AtomicInteger msgSerialGenerator = new AtomicInteger(1);

    // Owning server; supplies the communication type and the package codec.
    private readonly IceConnectServer server;

    // Guards the check-and-clear of IsConnected in Disconnect.
    private readonly Object sendLock = new Object();

    // Client callback proxy used to push data over TCP; assigned in SetCallback.
    private ServerCallbackPrx callback;

    // Adapter captured in SetCallback; ClientIdentity is removed from it on disconnect.
    private Ice.ObjectAdapter iceTcpAdapter;

    /// <summary>
    /// Creates a session bound to <paramref name="server"/>. A session created
    /// for a TCP server starts out connected; otherwise it starts disconnected.
    /// </summary>
    /// <param name="server">Owning server; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="server"/> is null.</exception>
    public IceServerIoSession(IceConnectServer server)
    {
      if (server == null)
      {
        throw new ArgumentNullException("server");
      }

      this.server = server;
      ID = idGenerator.GetAndIncrement().ToString();

      // Serial of the last reliable packet received from the client.
      RecvLastSerial = -1;
      // Serial of the last reliable packet sent by the server.
      SendedLastSerial = -1;
      // Last reliable-packet serial the client has acknowledged.
      SendedRecvLastSerial = -1;

      DateTime now = DateTime.Now;
      LastReadTime = now;
      LastWriteTime = now;
      LastHingeSendTime = now;

      Codec = new IceServerMessageCodecImpl(server.PackageCodec, this);
      IsConnected = server.getCommType() == Constants.COMM_TYPE_TCP;
    }

    /// <summary>Session identifier, assigned once in the constructor.</summary>
    public string ID { get; private set; }

    /// <summary>Whether the session is currently considered connected.</summary>
    public bool IsConnected { get; set; }

    /// <summary>The listener most recently passed to <see cref="bindSessionListener"/>.</summary>
    public ISessionListener Listener { get; set; }

    /// <summary>Connector destroyed when a UDP session is disconnected.</summary>
    public IceConnector Connector { get; set; }

    /// <summary>Codec converting between <c>IMessage</c> and <c>TransMessage</c>.</summary>
    public IceMessageCodec Codec { get; private set; }

    /// <summary>Serial of the last reliable packet received from the client.</summary>
    public long RecvLastSerial { get; set; }

    /// <summary>Serial of the last reliable packet sent by the server.</summary>
    public long SendedLastSerial { get; set; }

    /// <summary>Last reliable-packet serial acknowledged by the client.</summary>
    public long SendedRecvLastSerial { get; set; }

    /// <summary>Proxy used by <see cref="SendTo"/> to push data to the client.</summary>
    internal SenderPrx Sender { get; set; }

    /// <summary>Time the last message was received.</summary>
    internal DateTime LastReadTime { get; set; }

    /// <summary>Time the last message was sent.</summary>
    internal DateTime LastWriteTime { get; set; }

    /// <summary>
    /// Registers a listener for session events and makes it the current
    /// <see cref="Listener"/>.
    /// </summary>
    /// <param name="sessionListener">Listener to register.</param>
    public void bindSessionListener(ISessionListener sessionListener)
    {
      sessionListenerList.Add(sessionListener);
      Listener = sessionListener;
    }

    /// <summary>Notifies every listener that the session has started.</summary>
    public void OnSessionStarted()
    {
      log.Trace("新的网络会话启动, 编号 = " + this.ID);
      foreach (ISessionListener listener in sessionListenerList)
      {
        listener.OnConnected(this);
      }
    }

    /// <summary>Notifies every listener that the session has been closed.</summary>
    /// <param name="reason">Reason for the close; may be null.</param>
    public void OnSessionClosed(String reason)
    {
      // Use the string directly: calling ToString() on it would throw for a null reason.
      log.Trace("网络会话关闭, 编号 = " + this.ID + ", 关闭原因 = " + reason);
      foreach (ISessionListener listener in sessionListenerList)
      {
        listener.OnDisconnected(this, true, reason);
      }
    }

    /// <summary>Forwards an exception to every listener's <c>OnError</c>.</summary>
    /// <param name="e">The exception to report.</param>
    public void HandleException(Exception e)
    {
      foreach (ISessionListener listener in sessionListenerList)
      {
        listener.OnError(this, e);
      }
    }

    /// <summary>
    /// Reports an illegal request to every listener through <c>OnError</c>.
    /// All listeners receive the same exception instance.
    /// </summary>
    public void HandleUnknownRequest()
    {
      HandleException(new Exception("非法的请求!"));
    }

    /// <summary>
    /// Closes the session: marks it disconnected, unregisters it from the
    /// session manager for its transport and notifies the listeners.
    /// </summary>
    /// <param name="force">Passed through to each listener's <c>OnDisconnected</c>.</param>
    /// <returns>Always true, including when the session was already disconnected.</returns>
    public bool Disconnect(bool force)
    {
      lock (sendLock)
      {
        if (!IsConnected)
        {
          return true;
        }
        IsConnected = false;
      }

      log.Info("会话关闭,编号 = " + ID);

      var commType = server.getCommType();
      if (commType == Constants.COMM_TYPE_UDP)
      {
        SessionManager.removeSession(this);
        Dispose();
      }
      else if (commType == Constants.COMM_TYPE_TCP)
      {
        // The manager is injected after construction; guard so that a missing
        // manager cannot abort the close before the listeners are notified.
        if (TcpSessionManager != null)
        {
          TcpSessionManager.RemoveSession(this);
        }
        if (iceTcpAdapter != null)
        {
          try
          {
            iceTcpAdapter.remove(this.ClientIdentity);
          }
          catch (Exception e)
          {
            log.Error("会话TCP连接关闭发生异常,SessionId = " + ID + " 异常内容:" + e.Message);
          }
        }
      }

      // Notify listeners; one failing listener must not prevent the others from running.
      foreach (ISessionListener listener in sessionListenerList)
      {
        try
        {
          listener.OnDisconnected(this, force, "主动关闭");
        }
        catch (Exception e)
        {
          log.Error("会话关闭发生异常,SessionId = " + ID + " 异常内容:" + e.Message);
        }
      }
      return true;
    }

    /// <summary>Releases the connector, if one is attached.</summary>
    private void Dispose()
    {
      if (Connector != null)
      {
        Connector.Destroy();
        Connector = null;
      }
    }

    /// <summary>Notifies every listener that a message has been sent.</summary>
    /// <param name="message">The message that was sent.</param>
    internal void NotifySentMsg(IMessage message)
    {
      foreach (ISessionListener listener in sessionListenerList)
      {
        listener.OnSentMessage(this, message);
      }
    }

    /// <summary>
    /// Encodes <paramref name="message"/>, stamps it with the next serial
    /// number and sends it over the server's transport.
    /// </summary>
    /// <param name="message">Message to send; null is treated as nothing to send.</param>
    /// <returns>
    /// True when <paramref name="message"/> is null or the transport-specific
    /// send reports success. False when the session is not connected, encoding
    /// throws, the send fails, or the communication type is neither TCP nor UDP.
    /// </returns>
    public bool Send(IMessage message)
    {
      if (message == null)
      {
        return true;
      }
      if (!this.IsConnected)
      {
        return false;
      }

      // Encode the message into its transport form.
      // NOTE(review): the codec leaves data unset when the package codec
      // reports failure without throwing; such a message is still sent.
      TransMessage transMessage;
      try
      {
        Codec.doEncode(message, out transMessage);
      }
      catch (Exception e)
      {
        onError(e);
        return false;
      }
      transMessage.serial = msgSerialGenerator.GetAndIncrement();

      // Dispatch, keeping the result so the duration check below actually runs
      // (returning straight from each branch made it unreachable).
      long startTickCount = Environment.TickCount;
      bool sent = false;
      var commType = server.getCommType();
      if (commType == Constants.COMM_TYPE_TCP)
      {
        sent = SendTcpMsg(message, transMessage);
      }
      else if (commType == Constants.COMM_TYPE_UDP)
      {
        sent = SendUdpMsg(message, transMessage);
      }

      long interval = Environment.TickCount - startTickCount;
      if (interval > Env.MSG_PROC_TIME_OUT)
      {
        log.Info("服务器消息发送超时,用时【" + interval + "】,消息编号【" + message.GetType() + "】,客户端地址IP【" + this.RemoteIp + "】,端口【" + this.ClientSentDataPort + "】");
      }
      return sent;
    }

    /// <summary>
    /// Sends a message through the TCP callback proxy and, on success, updates
    /// <see cref="TotalSentBytes"/> and <see cref="LastWriteTime"/>.
    /// </summary>
    /// <param name="message">Original message; used for error logging.</param>
    /// <param name="transMessage">Encoded message to transmit.</param>
    /// <returns>False when starting the call throws; otherwise true.</returns>
    private bool SendTcpMsg(IMessage message, TransMessage transMessage)
    {
      // NOTE(review): unlike the immediate UDP path, this path does not call
      // NotifySentMsg - confirm whether that is intentional.
      try
      {
        callback.begin_ServerToClient(transMessage);
      }
      catch (Exception e)
      {
        log.Error("服务器发送数据发生异常,SessionId = " + ID + " 消息类型:" + message.GetType() + " 异常内容:" + e.Message);
        onError(e);
        return false;
      }
      TotalSentBytes += transMessage.length;
      LastWriteTime = DateTime.Now;
      return true;
    }

    /// <summary>
    /// Sends a message over UDP. Packets of type <c>Constants.PACKET_HINGE</c>
    /// are handed to <c>SeverSendMsgManager</c>; all others are sent immediately.
    /// </summary>
    /// <param name="message">Original message.</param>
    /// <param name="transMessage">Encoded message to transmit.</param>
    /// <returns>
    /// True once a hinge packet has been handed over; for other packets, the
    /// result of <see cref="SendTo"/>.
    /// </returns>
    private bool SendUdpMsg(IMessage message, TransMessage transMessage)
    {
      if (transMessage.type == Constants.PACKET_HINGE)
      {
        SeverSendMsgManager.AddPacket(this, message, transMessage);
        return true;
      }

      // Non-hinge packets are sent right away.
      bool succeeded = SendTo(transMessage);
      if (succeeded)
      {
        NotifySentMsg(message);
      }
      return succeeded;
    }

    /// <summary>
    /// Pushes an encoded message to the client's receive port through
    /// <see cref="Sender"/> and updates <see cref="LastWriteTime"/> on success.
    /// </summary>
    /// <param name="transMessage">Encoded message to transmit.</param>
    /// <returns>False when the send throws; otherwise true.</returns>
    public bool SendTo(TransMessage transMessage)
    {
      try
      {
        Sender.SendData(this.ClientRecvDataPort, transMessage);
      }
      catch (Exception e)
      {
        log.Error("服务器发送数据发生异常,SessionId = " + ID + " 异常内容:" + e.Message);
        onError(e);
        return false;
      }
      LastWriteTime = DateTime.Now;
      return true;
    }

    /// <summary>Reports a send-side error to every listener.</summary>
    private void onError(Exception e)
    {
      HandleException(e);
    }

    /// <summary>
    /// Sends <paramref name="response"/> after copying the request's
    /// <c>MessageID</c> onto it.
    /// </summary>
    /// <param name="request">Request being answered.</param>
    /// <param name="response">Response to send.</param>
    /// <returns>The result of <see cref="Send"/>.</returns>
    public bool SendResponse(IMessage request, IMessage response)
    {
      response.MessageID = request.MessageID;
      return Send(response);
    }

    /// <summary>
    /// Delivers a received message to every listener and logs when the
    /// handling takes longer than <c>Env.MSG_PROC_TIME_OUT</c>.
    /// </summary>
    /// <param name="message">The received message.</param>
    public void Receive(IMessage message)
    {
      LastReadTime = DateTime.Now;
      long startTickCount = Environment.TickCount;

      // A failing listener is logged and does not stop the remaining listeners.
      foreach (ISessionListener listener in sessionListenerList)
      {
        try
        {
          listener.OnReceivedMessage(this, message);
        }
        catch (Exception e)
        {
          log.Error("会话消息处理发生异常,SessionId = " + ID + " 异常内容:" + e.Message);
        }
      }

      long interval = Environment.TickCount - startTickCount;
      if (interval > Env.MSG_PROC_TIME_OUT)
      {
        log.Info("服务器消息处理超时,用时【" + interval + "】,消息编号【" + message.GetType() + "】,客户端地址IP【" + this.RemoteIp + "】,端口【" + this.ClientSentDataPort + "】");
      }
    }

    /// <summary>Returns the client's IP (<see cref="RemoteIp"/>).</summary>
    public string GetRemoteAddress()
    {
      return RemoteIp;
    }

    /// <summary>Returns the attribute stored under <paramref name="key"/>, or null when absent.</summary>
    public object GetAttribute(string key)
    {
      object value;
      return attributes.TryGetValue(key, out value) ? value : null;
    }

    /// <summary>
    /// Stores <paramref name="value"/> under <paramref name="key"/>, replacing
    /// any existing value.
    /// </summary>
    public void SetAttribute(string key, object value)
    {
      // The indexer overwrites; Add would throw when the key already exists.
      attributes[key] = value;
    }

    /// <summary>Removes the attribute stored under <paramref name="key"/>.</summary>
    /// <returns>The removed value, or null when the key was not present.</returns>
    public object RemoveAttribute(string key)
    {
      object removed;
      if (attributes.TryGetValue(key, out removed))
      {
        attributes.Remove(key);
        return removed;
      }
      return null;
    }

    /// <summary>Returns whether an attribute is stored under <paramref name="key"/>.</summary>
    public bool ContainsAttribute(string key)
    {
      return attributes.ContainsKey(key);
    }

    /// <summary>Returns the keys of all stored attributes.</summary>
    public ICollection<string> GetAttributeKeys()
    {
      return attributes.Keys;
    }

    /// <summary>Running total of sent bytes; incremented by the TCP send path.</summary>
    public long TotalSentBytes { get; set; }

    /// <summary>Running total of received bytes; incremented in <see cref="ClientToServer"/>.</summary>
    public long TotalRecvBytes { get; set; }

    /// <summary>Time the last hinge (key) packet was sent.</summary>
    public DateTime LastHingeSendTime { get; set; }

    /// <summary>Client IP.</summary>
    public String RemoteIp { get; set; }

    /// <summary>Port from which the client sends data to the server.</summary>
    public int ClientSentDataPort { get; set; }

    /// <summary>Port on which the client receives data from the server.</summary>
    public int ClientRecvDataPort { get; set; }

    /// <summary>
    /// Codec that converts between <c>IMessage</c> and the Ice transport type
    /// <c>TransMessage</c> by delegating to the server's package codec.
    /// </summary>
    private class IceServerMessageCodecImpl : IceMessageCodec
    {
      // Initial capacity of the stream used while encoding.
      public static int DEFAULT_BUFFER_SIZE = 1024;

      private readonly IPackageCodec codec;
      private readonly IceServerIoSession session;

      public IceServerMessageCodecImpl(IPackageCodec codec, IceServerIoSession session)
      {
        this.codec = codec;
        this.session = session;
      }

      /// <summary>Decodes the payload of <paramref name="transMessage"/>.</summary>
      /// <param name="transMessage">Transport message whose data is decoded.</param>
      /// <param name="message">The decoded message, or null when decoding fails.</param>
      /// <returns>True when the package codec reports success.</returns>
      public bool doDecode(TransMessage transMessage, out CommonLang.Protocol.IMessage message)
      {
        using (MemoryStream ms = new MemoryStream(transMessage.data))
        {
          InputStream input = new InputStream(ms, codec.Factory);
          if (codec.doDecode(session, input, out message))
          {
            return true;
          }
        }
        message = null;
        return false;
      }

      /// <summary>
      /// Encodes <paramref name="message"/> into a new <c>TransMessage</c>.
      /// When the package codec reports failure, the result is returned with
      /// its data, length and type left unset.
      /// </summary>
      /// <param name="message">Message to encode.</param>
      /// <param name="transMessage">The resulting transport message.</param>
      public void doEncode(CommonLang.Protocol.IMessage message, out TransMessage transMessage)
      {
        transMessage = new TransMessage();
        using (MemoryStream ms = new MemoryStream(DEFAULT_BUFFER_SIZE))
        {
          OutputStream output = new OutputStream(ms, codec.Factory);
          if (codec.doEncode(session, output, message))
          {
            // Copy only the bytes written so far, not the whole backing buffer.
            int length = (int)ms.Position;
            transMessage.data = new byte[length];
            Array.Copy(ms.GetBuffer(), transMessage.data, length);
            transMessage.length = length;
            // NOTE(review): literal packet type; presumably one of the
            // Constants.PACKET_* values - confirm which.
            transMessage.type = 1;
          }
        }
      }
    }

    #region ICombatSessionDisp_ abstract method implementations

    /// <summary>
    /// Stores the client callback proxy and the adapter of the current call.
    /// </summary>
    /// <param name="callback">Proxy used to push data to the client.</param>
    /// <param name="current__">Ice call context.</param>
    public override void SetCallback(ServerCallbackPrx callback, Ice.Current current__)
    {
      this.callback = callback;
      this.iceTcpAdapter = current__.adapter;
    }

    /// <summary>
    /// Receives a message sent by the client over TCP, decodes it and hands
    /// it to <see cref="Receive"/>.
    /// </summary>
    /// <param name="message">The encoded message.</param>
    /// <param name="current__">Ice call context.</param>
    public override void ClientToServer(TransMessage message, Ice.Current current__)
    {
      TotalRecvBytes += message.length;

      // Decode the transport message.
      IMessage iMessage;
      bool decoded;
      try
      {
        decoded = Codec.doDecode(message, out iMessage);
      }
      catch (Exception e)
      {
        log.Error("消息转码异常:" + e.Message);
        HandleException(e);
        return;
      }

      // A failed decode yields no message; drop it instead of dispatching
      // null to the listeners.
      if (!decoded || iMessage == null)
      {
        log.Error("消息解码失败,SessionId = " + ID);
        return;
      }

      try
      {
        Receive(iMessage);
      }
      catch (Exception e)
      {
        log.Error("服务器消息处理异常:" + e.Message);
        HandleException(e);
      }
    }

    /// <summary>Destroys the session by disconnecting it.</summary>
    /// <param name="current__">Ice call context.</param>
    public override void destroy(Ice.Current current__)
    {
      Disconnect(true);
    }

    /// <summary>
    /// TCP session manager from which this session is removed on disconnect.
    /// </summary>
    internal TcpSessionManager TcpSessionManager { set; private get; }

    /// <summary>
    /// The client's Ice identity; removed from the adapter on disconnect.
    /// </summary>
    internal Ice.Identity ClientIdentity { set; private get; }

    #endregion
  }
  565. }