using CommonLang;
using CommonLang.ByteOrder;
using CommonLang.IO;
using CommonLang.Net;
using CommonLang.Protocol;
using CommonNetwork.Net;
using CommonNetwork.Utils;
using SuperSocket.ClientEngine;
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

namespace CommonNetwork_SSocket.SuperSocket
{
    /// <summary>
    /// Client-side network session built on SuperSocket's <c>AsyncTcpSession</c>.
    /// Outgoing messages are framed as a 4-byte little-endian length prefix followed by
    /// the payload written by the configured <c>INetPackageCodec</c>; incoming bytes are
    /// accumulated in a receive buffer and decoded with the same framing.
    /// </summary>
    public class SNetSession : INetSession
    {
        // Guards Open/Close so the lifetime of mTCP is managed by one thread at a time.
        // A private object is used so external code locking on the session cannot interfere.
        private readonly object mGate = new object();

        private string mURL;
        private IPEndPoint mRemoteAddress;
        private AsyncTcpSession mTCP = null;
        protected INetPackageCodec mCodec;
        private INetSessionListener mListener;
        // NOTE(review): HashMap (and Queue / ICollection below) are kept exactly as written
        // here, without type arguments - confirm against the CommonLang / INetSession declarations.
        private HashMap mAttributes = new HashMap();

        // Traffic counters; updated with Interlocked because sends and receives
        // can happen on different threads.
        private long mSendBytes = 0;
        private long mRecvBytes = 0;
        private long mSendPacks = 0;
        private long mRecvPacks = 0;

        public SNetSession()
        {
        }

        /// <summary>Total number of bytes handed to the socket by successful sends.</summary>
        public long TotalSentBytes
        {
            get { return Interlocked.Read(ref mSendBytes); }
        }

        /// <summary>Total number of raw bytes received from the socket.</summary>
        public long TotalRecvBytes
        {
            get { return Interlocked.Read(ref mRecvBytes); }
        }

        /// <summary>Total number of packages successfully queued for sending.</summary>
        public long TotalSentPackages
        {
            get { return Interlocked.Read(ref mSendPacks); }
        }

        /// <summary>Total number of complete packages decoded from the receive stream.</summary>
        public long TotalRecvPackages
        {
            get { return Interlocked.Read(ref mRecvPacks); }
        }

        /// <summary>The URL passed to the most recent <see cref="Open"/> call.</summary>
        public string URL
        {
            get { return mURL; }
        }

        /// <summary>The remote end point resolved from <see cref="URL"/>.</summary>
        public IPEndPoint RemoteAddress
        {
            get { return mRemoteAddress; }
        }

        /// <summary>
        /// Whether the underlying TCP session currently reports itself as connected.
        /// Returns false when no TCP session exists.
        /// </summary>
        public bool IsConnected
        {
            get
            {
                // Snapshot the field: Close() may null it concurrently.
                AsyncTcpSession tcp = mTCP;
                if (tcp != null)
                {
                    return tcp.IsConnected;
                }
                return false;
            }
        }

        /// <summary>
        /// Starts connecting to the given address. The call only initiates the connection;
        /// completion is reported through the listener / OnSessionOpened.
        /// </summary>
        /// <param name="url">Address in the form <c>host:port</c>.</param>
        /// <param name="codec">Codec used to encode and decode message payloads.</param>
        /// <param name="listener">Listener notified of session events.</param>
        /// <returns>
        /// True if a connect was started; false if a TCP session already exists or if
        /// starting the connection failed (the failure is reported via the error callbacks).
        /// </returns>
        public bool Open(string url, INetPackageCodec codec, INetSessionListener listener)
        {
            try
            {
                lock (mGate)
                {
                    if (mTCP != null)
                    {
                        return false;
                    }

                    this.mCodec = codec;
                    this.mListener = listener;
                    this.mURL = url;

                    string[] url_kv = url.Split(':');
                    this.mRemoteAddress = IPUtil.ToEndPoint(url_kv[0], int.Parse(url_kv[1]));

                    // Create the socket session and wire up its events.
                    AsyncTcpSession tcp = new AsyncTcpSession(mRemoteAddress);
                    tcp.Closed += mTCP_Closed;
                    tcp.Connected += mTCP_Connected;
                    tcp.DataReceived += mTCP_DataReceived;
                    tcp.Error += mTCP_Error;
                    tcp.NoDeplay = true;

                    // Publish before Connect() so callbacks raised during connect see the session.
                    this.mTCP = tcp;
                    try
                    {
                        tcp.Connect();
                    }
                    catch (Exception)
                    {
                        // Do not leave a dead session behind, otherwise every later Open()
                        // would return false without trying again.
                        this.mTCP = null;
                        throw;
                    }
                    return true;
                }
            }
            catch (Exception err)
            {
                onException(new NetException(
                    "\n[Open:]" + URL +
                    "\n[InnerException:]" + err.InnerException +
                    "\n[Exception:]" + err.Message +
                    "\n[Source:]" + err.Source +
                    "\n[StackTrace:]" + err.StackTrace));
            }
            return false;
        }

        private void mTCP_Error(object sender, global::SuperSocket.ClientEngine.ErrorEventArgs e)
        {
            try
            {
                this.onException(e.Exception);
            }
            catch (Exception)
            {
                // Best effort: a failing error callback must not escape into the socket layer.
            }
        }

        private void mTCP_Connected(object sender, EventArgs e)
        {
            try
            {
                this.onOpen();
            }
            catch (Exception)
            {
                // Best effort: listener failures must not escape into the socket layer.
            }
        }

        private void mTCP_Closed(object sender, EventArgs e)
        {
            try
            {
                this.onClose();
            }
            catch (Exception)
            {
                // Best effort: listener failures must not escape into the socket layer.
            }
        }

        private void mTCP_DataReceived(object sender, DataEventArgs e)
        {
            try
            {
                doDecode(e);
            }
            catch (Exception err)
            {
                try
                {
                    this.onException(err);
                }
                catch (Exception)
                {
                    // Best effort: nothing more can be done if the error callback itself fails.
                }
            }
        }

        /// <summary>
        /// Closes the underlying TCP session, if any.
        /// </summary>
        /// <returns>True if a session existed and was released; false otherwise.</returns>
        public bool Close()
        {
            lock (mGate)
            {
                AsyncTcpSession tcp = mTCP;
                if (tcp == null)
                {
                    return false;
                }
                try
                {
                    tcp.Close();
                }
                catch (Exception)
                {
                    // Best effort: the session is dropped regardless of close failures.
                }
                this.mTCP = null;
                return true;
            }
        }

        public void Dispose()
        {
            Close();
        }

        //-------------------------------------------------------------------------------------

        /// <summary>
        /// Sends a message. This method returns immediately; nothing is sent when no
        /// TCP session exists.
        /// </summary>
        /// <param name="data">The message to encode and send.</param>
        public void Send(Object data)
        {
            // Snapshot the field so a concurrent Close() cannot cause a null dereference
            // between the check and the TrySend call.
            AsyncTcpSession tcp = mTCP;
            if (tcp == null)
            {
                return;
            }

            ArraySegment<byte> send;
            if (doEncode(data, out send))
            {
                if (tcp.TrySend(send))
                {
                    Interlocked.Add(ref mSendBytes, send.Count);
                    Interlocked.Increment(ref mSendPacks);
                    onSent(data);
                }
            }
        }

        /// <summary>
        /// Sends a response, stamping it with the message id of the request it answers.
        /// </summary>
        public void SendResponse(IMessage rsponse, int requestMessageID)
        {
            rsponse.MessageID = requestMessageID;
            Send(rsponse);
        }

        //-------------------------------------------------------------------------------------

        public object GetAttribute(string key)
        {
            return mAttributes[key];
        }

        public void SetAttribute(string key, object value)
        {
            mAttributes[key] = value;
        }

        public void RemoveAttribute(string key)
        {
            mAttributes.Remove(key);
        }

        public bool ContainsAttribute(string key)
        {
            return mAttributes.ContainsKey(key);
        }

        public ICollection GetAttributeKeys()
        {
            return mAttributes.Keys;
        }

        override public string ToString()
        {
            return "Session[" + URL + "](" + GetHashCode() + ")";
        }

        //-------------------------------------------------------------------------------------
        // Event dispatch helpers. Each one notifies the listener first, then the matching
        // event. Delegates are copied to a local before invoking so that an unsubscribe on
        // another thread cannot null the field between the check and the call.

        private void onClose()
        {
            INetSessionListener listener = mListener;
            if (listener != null)
            {
                listener.sessionClosed(this);
            }
            OnSessionClosedHandler handler = mOnSessionClosed;
            if (handler != null)
            {
                handler.Invoke(this);
            }
        }

        private void onOpen()
        {
            INetSessionListener listener = mListener;
            if (listener != null)
            {
                listener.sessionOpened(this);
            }
            OnSessionOpenedHandler handler = mOnSessionOpened;
            if (handler != null)
            {
                handler.Invoke(this);
            }
        }

        private void onReceive(Object message)
        {
            try
            {
                INetSessionListener listener = mListener;
                if (listener != null)
                {
                    listener.messageReceived(this, message);
                }
                OnMessageReceivedHandler handler = mOnMessageReceived;
                if (handler != null)
                {
                    handler.Invoke(this, message);
                }
            }
            catch (Exception err)
            {
                // A failing handler is reported but does not stop delivery of later messages.
                onException(err);
            }
        }

        private void onSent(Object message)
        {
            INetSessionListener listener = mListener;
            if (listener != null)
            {
                listener.messageSent(this, message);
            }
            OnMessageSentHandler handler = mOnMessageSent;
            if (handler != null)
            {
                handler.Invoke(this, message);
            }
        }

        private void onException(Exception err)
        {
            INetSessionListener listener = mListener;
            if (listener != null)
            {
                listener.onError(this, err);
            }
            OnErrorHandler handler = mOnError;
            if (handler != null)
            {
                handler.Invoke(this, err);
            }
        }

        //-------------------------------------------------------------------------------------

        private OnSessionOpenedHandler mOnSessionOpened;
        private OnSessionClosedHandler mOnSessionClosed;
        private OnMessageReceivedHandler mOnMessageReceived;
        private OnMessageSentHandler mOnMessageSent;
        private OnErrorHandler mOnError;

        public event OnSessionOpenedHandler OnSessionOpened
        {
            add { mOnSessionOpened += value; }
            remove { mOnSessionOpened -= value; }
        }

        public event OnSessionClosedHandler OnSessionClosed
        {
            add { mOnSessionClosed += value; }
            remove { mOnSessionClosed -= value; }
        }

        public event OnMessageReceivedHandler OnMessageReceived
        {
            add { mOnMessageReceived += value; }
            remove { mOnMessageReceived -= value; }
        }

        public event OnMessageSentHandler OnMessageSent
        {
            add { mOnMessageSent += value; }
            remove { mOnMessageSent -= value; }
        }

        public event OnErrorHandler OnError
        {
            add { mOnError += value; }
            remove { mOnError -= value; }
        }

        //-------------------------------------------------------------------------------------

        // Scratch stream reused for every encode; access is serialized by locking on it.
        private readonly MemoryStream send_buff = new MemoryStream(8192);
        // Accumulates received bytes until whole packages can be decoded.
        private readonly ReceiveBuffer recv_buff = new ReceiveBuffer(8192);
        // Decoded messages waiting to be dispatched to the listener.
        private readonly Queue decoding_queue = new Queue();

        /// <summary>
        /// Encodes a message into a framed packet: a 4-byte little-endian payload length
        /// followed by the payload produced by the codec.
        /// </summary>
        /// <param name="send_msg">The message to encode.</param>
        /// <param name="send">Receives the packet bytes when encoding succeeds.</param>
        /// <returns>True when the message was encoded.</returns>
        /// <exception cref="Exception">The codec reported that it could not encode the message.</exception>
        protected virtual bool doEncode(object send_msg, out ArraySegment<byte> send)
        {
            lock (send_buff)
            {
                send_buff.Position = 0;
                // Reserve four bytes for the length prefix; patched once the payload size is known.
                LittleEdian.PutS32(send_buff, 0);
                if (mCodec.doEncode(send_buff, send_msg))
                {
                    int total = (int)send_buff.Position;
                    send_buff.Position = 0;
                    LittleEdian.PutS32(send_buff, total - 4);

                    // Copy the packet out of the shared scratch buffer. Send() returns
                    // immediately, so the transport may still hold this segment when the next
                    // message is encoded; handing out the scratch array itself would let that
                    // encode overwrite bytes that have not been written to the socket yet.
                    byte[] packet = new byte[total];
                    Array.Copy(send_buff.GetBuffer(), 0, packet, 0, total);
                    send = new ArraySegment<byte>(packet, 0, total);
                    return true;
                }
            }
            throw new Exception("Can not encode message : " + send_msg);
        }

        /// <summary>
        /// Appends newly received bytes to the receive buffer, decodes every complete
        /// package now available, and dispatches the decoded messages.
        /// </summary>
        private void doDecode(DataEventArgs input)
        {
            Interlocked.Add(ref mRecvBytes, input.Length);

            lock (recv_buff)
            {
                recv_buff.Put(input.Data, input.Offset, input.Length);
                recv_buff.Begin();
                object message;
                while (doDecodeOnce(recv_buff, out message))
                {
                    // NOTE(review): Over() presumably commits the bytes consumed by the
                    // package just decoded - confirm against ReceiveBuffer.
                    recv_buff.Over();
                    Interlocked.Increment(ref mRecvPacks);
                    if (message != null)
                    {
                        lock (decoding_queue)
                        {
                            decoding_queue.Enqueue(message);
                        }
                    }
                }
            }

            // Dispatch outside the receive-buffer lock so handlers do not block buffering.
            lock (decoding_queue)
            {
                while (decoding_queue.Count > 0)
                {
                    onReceive(decoding_queue.Dequeue());
                }
            }
        }

        /// <summary>
        /// Tries to decode one package from <paramref name="buff"/>.
        /// </summary>
        /// <param name="buff">The buffer to read the length prefix and payload from.</param>
        /// <param name="message">The decoded message, or null when nothing was decoded.</param>
        /// <returns>
        /// True when a full package was available and passed to the codec; false when more
        /// data is needed.
        /// </returns>
        protected virtual bool doDecodeOnce(ReceiveBuffer buff, out object message)
        {
            if (buff.Remaining >= 4)
            {
                // Read from the buffer that was passed in, not from the recv_buff field,
                // so that the length and the payload always come from the same source.
                int length = LittleEdian.GetS32(buff.Stream);
                if (buff.Remaining >= length)
                {
                    mCodec.doDecode(buff.Stream, out message);
                    return true;
                }
            }
            message = null;
            return false;
        }
    }
}