using System; using System.IO; using System.Collections.Generic; using System.Linq; using System.Text; using CommonNetwork_ICE.Util; using CommonServer.Server; using CommonLang.Protocol; using CommonLang.IO; using CommonServer_ICE.Msg; using CommonServer.Protocol; using CommonNetwork_ICE.Common; using Slice; using CommonLang.Log; using CommonLang.Concurrent; using CommonLang.ByteOrder; using CommonLang.Property; using CommonServer_ICE.Server; namespace CommonServer_ICE.Session { /// /// 网络会话对象实现,客户对象通过回话对象可以发送消息,存储会话级变量 /// internal class IceServerIoSession : ICombatSessionDisp_, ISession { private static Logger log = LoggerFactory.GetLogger("IceServerIoSession"); // 会话级变量存储器 private Dictionary attributes = new Dictionary(); // 会话监听列表 private List sessionListenerList = null; // 会话ID编号器 private static AtomicInteger idGenerator = new AtomicInteger(1); // 会话消息序列号编号器 private AtomicInteger msgSerialGenerator = new AtomicInteger(1); // 服务器对象 private IceConnectServer server; // 发送时的锁定对象 private Object sendLock = new Object(); // 服务器数据发送接口对象 private ServerCallbackPrx callback; private Ice.ObjectAdapter iceTcpAdapter; public IceServerIoSession(IceConnectServer server) { sessionListenerList = new List(); // 记录客户端最后一个发送的可靠消息包的序号 RecvLastSerial = -1; // 记录服务器最后发送的可靠消息后包的序号 SendedLastSerial = -1; // 记录服务器发送可靠消息后客户端返回的最后一个收到的可靠消息包的应答序号 SendedRecvLastSerial = -1; ID = idGenerator.GetAndIncrement().ToString(); LastReadTime = DateTime.Now; LastWriteTime = DateTime.Now; LastHingeSendTime = DateTime.Now; this.server = server; this.Codec = new IceServerMessageCodecImpl(server.PackageCodec, this); if (server.getCommType() == Constants.COMM_TYPE_TCP) { IsConnected = true; } } public string ID { private set; get; } public bool IsConnected { set; get; } public ISessionListener Listener { set; get; } public IceConnector Connector { set; get; } // Ice编码器 public IceMessageCodec Codec { get; private set; } public long RecvLastSerial { set; get; } public long SendedLastSerial { set; get; } public long SendedRecvLastSerial { set; get; } internal SenderPrx Sender { set; get; } /// /// 最后的一次收到消息的时间 /// internal DateTime LastReadTime { set; get; } /// /// 最近的一次发送消息的时间 /// internal DateTime LastWriteTime { set; get; } public void bindSessionListener(ISessionListener sessionListener) { sessionListenerList.Add(sessionListener); Listener = sessionListener; } public void OnSessionStarted() { log.Trace("新的网络会话启动, 编号 = " + this.ID); foreach (ISessionListener sessionListener in sessionListenerList) { sessionListener.OnConnected(this); } } public void OnSessionClosed(String reason) { log.Trace("网络会话关闭, 编号 = " + this.ID + ", 关闭原因 = " + reason.ToString()); foreach (ISessionListener sessionListener in sessionListenerList) { sessionListener.OnDisconnected(this, true, reason.ToString()); } } public void HandleException(Exception e) { foreach (ISessionListener sessionListener in sessionListenerList) { sessionListener.OnError(this, e); } } public void HandleUnknownRequest() { foreach (ISessionListener sessionListener in sessionListenerList) { sessionListener.OnError(this, new Exception("非法的请求!")); } } /// /// 关闭通讯通道 /// /// 是否强制 /// 是否正常关闭 public bool Disconnect(bool force) { lock (sendLock) { if (!IsConnected) { return true; } this.IsConnected = false; } log.Info("会话关闭,编号 = " + ID); if (server.getCommType() == Constants.COMM_TYPE_UDP) { SessionManager.removeSession(this); Dispose(); } else if (server.getCommType() == Constants.COMM_TYPE_TCP) { TcpSessionManager.RemoveSession(this); if (iceTcpAdapter != null) { try { iceTcpAdapter.remove(this.ClientIdentity); } catch (Exception e) { log.Error("会话TCP连接关闭发生异常,SessionId = " + ID + " 异常内容:" + e.Message); } } } // 通知会话关闭 foreach (ISessionListener sessionListener in sessionListenerList) { try { sessionListener.OnDisconnected(this, force, "主动关闭"); } catch (Exception e) { log.Error("会话关闭发生异常,SessionId = " + ID + " 异常内容:" + e.Message); } } return true; } /// /// 销毁占用的资源 /// private void Dispose() { if (Connector != null) { Connector.Destroy(); Connector = null; } } /// /// 通知消息已经发送,触发OnSent事件 /// /// internal void NotifySentMsg(IMessage message) { foreach (ISessionListener sessionListener in sessionListenerList) { sessionListener.OnSentMessage(this, message); } } /// /// 发送数据接口 /// /// /// public bool Send(IMessage message) { if (message == null) { return true; } if (!this.IsConnected) { return false; } // 消息转码 TransMessage transMessage; try { Codec.doEncode(message, out transMessage); } catch (Exception e) { onError(e); return false; } transMessage.serial = msgSerialGenerator.GetAndIncrement(); // 发送 long startTickCount = Environment.TickCount; if (server.getCommType() == Constants.COMM_TYPE_TCP) { return SendTcpMsg(message, transMessage); } else if (server.getCommType() == Constants.COMM_TYPE_UDP) { return SendUdpMsg(message, transMessage); } long endTickCount = Environment.TickCount; long interval = endTickCount - startTickCount; if (interval > Env.MSG_PROC_TIME_OUT) { log.Info("服务器消息发送超时,用时【" + interval + "】,消息编号【" + message.GetType() + "】,客户端地址IP【" + this.RemoteIp + "】,端口【" + this.ClientSentDataPort + "】"); } return false; } /// /// 发送TCP消息 /// /// /// /// private bool SendTcpMsg(IMessage message, TransMessage transMessage) { try { callback.begin_ServerToClient(transMessage); } catch (Exception e) { log.Error("服务器发送数据发生异常,SessionId = " + ID + " 消息类型:" + message.GetType() + " 异常内容:" + e.Message); onError(e); return false; } TotalSentBytes += transMessage.length; LastWriteTime = DateTime.Now; return true; } /// /// 发送UDP消息 /// /// /// private bool SendUdpMsg(IMessage message, TransMessage transMessage) { if (transMessage.type == Constants.PACKET_HINGE) { SeverSendMsgManager.AddPacket(this, message, transMessage); } else { // 非关键包立即发送 bool successed = SendTo(transMessage); if (successed) { NotifySentMsg(message); } return successed; } return true; } public bool SendTo(TransMessage transMessage) { try { Sender.SendData(this.ClientRecvDataPort, transMessage); } catch (Exception e) { log.Error("服务器发送数据发生异常,SessionId = " + ID + " 异常内容:" + e.Message); onError(e); return false; } LastWriteTime = DateTime.Now; return true; } private void onError(Exception e) { foreach (ISessionListener sessionListener in sessionListenerList) { sessionListener.OnError(this, e); } } public bool SendResponse(IMessage request, IMessage response) { response.MessageID = request.MessageID; return Send(response); } /// /// 接收接口 /// /// public void Receive(IMessage message) { LastReadTime = DateTime.Now; long startTickCount = Environment.TickCount; // 通知监听器,有数据接收到 foreach (ISessionListener sessionListener in sessionListenerList) { try { sessionListener.OnReceivedMessage(this, message); } catch (Exception e) { log.Error("会话消息处理发生异常,SessionId = " + ID + " 异常内容:" + e.Message); } } long endTickCount = Environment.TickCount; long interval = endTickCount - startTickCount; if (interval > Env.MSG_PROC_TIME_OUT) { log.Info("服务器消息处理超时,用时【" + interval + "】,消息编号【" + message.GetType() + "】,客户端地址IP【" + this.RemoteIp + "】,端口【" + this.ClientSentDataPort + "】"); } } public string GetRemoteAddress() { return RemoteIp; } public object GetAttribute(string key) { if (attributes.ContainsKey(key)) { return attributes[key]; } return null; } public void SetAttribute(string key, object value) { attributes.Add(key, value); } public object RemoveAttribute(string key) { if (attributes.ContainsKey(key)) { return attributes.Remove(key); } return true; } public bool ContainsAttribute(string key) { return attributes.ContainsKey(key); } public ICollection GetAttributeKeys() { return attributes.Keys; } public long TotalSentBytes { set; get; } public long TotalRecvBytes { set; get; } /// /// 最后一次发送关键消息包的时间 /// public DateTime LastHingeSendTime { set; get; } /// /// 客户端IP /// public String RemoteIp { set; get; } /// /// 客户端向服务器发送数据的端口 /// public int ClientSentDataPort { set; get; } /// /// 客户端接收服务器数据的端口 /// public int ClientRecvDataPort { set; get; } /// /// Ice传输解码器 /// private class IceServerMessageCodecImpl : IceMessageCodec { public static int DEFAULT_BUFFER_SIZE = 1024; private IPackageCodec codec = null; private IceServerIoSession session; public IceServerMessageCodecImpl(IPackageCodec codec, IceServerIoSession session) { this.codec = codec; this.session = session; } public bool doDecode(TransMessage transMessage, out CommonLang.Protocol.IMessage message) { int length = transMessage.length; using (MemoryStream ms = new MemoryStream(transMessage.data)) { InputStream input_stream = new InputStream(ms, codec.Factory); if (codec.doDecode(session, input_stream, out message)) { input_stream = null; return true; } input_stream = null; } message = null; return false; } public void doEncode(CommonLang.Protocol.IMessage message, out TransMessage transMessage) { transMessage = new TransMessage(); IMessage nm = (IMessage)message; using (MemoryStream ms = new MemoryStream(DEFAULT_BUFFER_SIZE)) { OutputStream output_stream = new OutputStream(ms, codec.Factory); if (codec.doEncode(session, output_stream, message)) { int length = (int)ms.Position; transMessage.data = new byte[length]; Array.Copy(ms.GetBuffer(), transMessage.data, length); transMessage.length = length; transMessage.type = 1; } output_stream = null; } } } #region ICombatSessionDisp_ 抽象方法实现 /// /// 设定服务器端的回调 /// /// /// public override void SetCallback(ServerCallbackPrx callback, Ice.Current current__) { this.callback = callback; this.iceTcpAdapter = current__.adapter; } /// /// 接收客户端用TCP方式传输过来的消息 /// /// /// public override void ClientToServer(TransMessage message, Ice.Current current__) { TotalRecvBytes += message.length; // 消息转码 IMessage iMessage; try { Codec.doDecode(message, out iMessage); } catch (Exception e) { log.Error("消息转码异常:" + e.Message); HandleException(e); return; } try { Receive(iMessage); } catch (Exception e) { log.Error("服务器消息处理异常:" + e.Message); HandleException(e); return; } } /// /// 删除会话 /// /// Ice上下文 public override void destroy(Ice.Current current__) { Disconnect(true); } /// /// 客户端接收服务器数据的端口 /// internal TcpSessionManager TcpSessionManager { set; private get; } /// /// 客户端的ICE标识 /// internal Ice.Identity ClientIdentity { set; private get; } #endregion } }