|
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Net.Sockets;
- using System.IO;
- using System.Threading;
- using CommonNetwork.Net;
- using CommonLang.IO;
- using CommonLang.ByteOrder;
- using CommonLang.Protocol;
- using System.Net;
- using CommonLang.Net;
- using CommonLang;
- using CommonLang.Log;
- namespace CommonNetwork.Sockets
- {
- public class NetSession : BaseNetSession
- {
- private Logger log = LoggerFactory.GetLogger("NetSession");
- private TcpClient mTCP = null;
- protected INetPackageCodec mCodec;
- private INetSessionListener mListener;
- private Queue<Object> mSendQueue = new Queue<Object>();
- private List<object> mSendingArray = new List<object>(11);
- private Thread mWriteThread;
- private Thread mReadThread;
- public NetSession()
- {
- }
- /// <summary>
- /// 判断当前网络是否已经连接
- /// </summary>
- /// <returns></returns>
- public override bool IsConnected
- {
- get
- {
- TcpClient tcp = mTCP;
- if (tcp != null)
- {
- return tcp.Connected;
- }
- return false;
- }
- }
- public override INetPackageCodec Codec
- {
- get { return mCodec; }
- }
- public override IPEndPoint RemoteAddress
- {
- get
- {
- if (mTCP != null) return mTCP.Client.RemoteEndPoint as IPEndPoint;
- return null;
- }
- }
- public TcpClient Session
- {
- get { return mTCP; }
- }
- public override bool Open(string url, INetPackageCodec codec, INetSessionListener listener)
- {
- bool ret = false;
- try
- {
- lock (this)
- {
- if (mTCP == null)
- {
- this.mCodec = codec;
- this.mListener = listener;
- this.mURL = url;
- string[] url_kv = url.Split(':');
- //this.mRemoteAddress = IPUtil.ToEndPoint(url_kv[0], int.Parse(url_kv[1]));
- //new IPEndPoint(IPAddress.Parse(url_kv[0]), int.Parse(url_kv[1]));
- //this.mLocalAddress = IPUtil.CreateLocalConnectorEndPoint();//new IPEndPoint(IPAddress.Parse("127.0.0.1"), 0);
- //this.mRecvQueue.Clear();
- lock (mSendQueue) this.mSendQueue.Clear();
- this.mReadThread = new Thread(new ThreadStart(this.runRead));
- this.mReadThread.IsBackground = true;
- this.mWriteThread = new Thread(new ThreadStart(this.runWrite));
- this.mWriteThread.IsBackground = true;
- // 建立SOCKET链接
- this.mTCP = new TcpClient();
- //this.mTCP.ReceiveTimeout = 5000;
- //this.mTCP.SendTimeout = 5000;
- this.send_buff = new MemoryStream(8192);
- this.recv_buff = new MemoryStream(8192);
- this.mTCP.NoDelay = true;
- this.mTCP.Connect(url_kv[0], int.Parse(url_kv[1]));
- //创建读写线程对象
- this.mReadThread.Start();
- this.mWriteThread.Start();
- ret = true;
- }
- }
- }
- catch (Exception err)
- {
- log.Error(err.Message, err);
- onException(new NetException("/n[Open:]" + URL +
- "/n[InnerException:]" + err.InnerException +
- "/n[Exception:]" + err.Message +
- "/n[Source:]" + err.Source +
- "/n[StackTrace:]" + err.StackTrace));
- }
- return ret;
- }
- public override bool Close()
- {
- bool ret = false;
- lock (this)
- {
- if (mTCP != null)
- {
- try
- {
- this.mTCP.Close();
- }
- catch (Exception err)
- {
- log.Error(err.Message, err);
- }
- this.mTCP = null;
- ret = true;
- }
- }
- if (ret)
- {
- lock (mSendQueue)
- {
- this.mSendQueue.Clear();
- }
- if (mReadThread != null)
- {
- try
- {
- this.mReadThread.Join(1000);
- }
- catch (Exception err)
- {
- log.Error(err.Message, err);
- }
- this.mReadThread = null;
- }
- lock (mSendQueue)
- {
- this.mSendQueue.Clear();
- }
- if (mWriteThread != null)
- {
- try
- {
- this.mWriteThread.Join(1000);
- }
- catch (Exception err)
- {
- log.Error(err.Message, err);
- }
- this.mWriteThread = null;
- }
- }
- return ret;
- }
- //-------------------------------------------------------------------------------------
- /// <summary>
- /// 发送一个消息,该方法将立即返回。
- /// </summary>
- /// <param name="data"></param>
- public override void Send(Object data)
- {
- if (mTCP != null)
- {
- lock (mSendQueue)
- {
- mSendQueue.Enqueue(data);
- // 通知写线程开始工作。
- Monitor.PulseAll(mSendQueue);
- }
- }
- }
- public override void SendResponse(IMessage rsponse, int requestMessageID)
- {
- rsponse.MessageID = requestMessageID;
- Send(rsponse);
- }
- //-------------------------------------------------------------------------------------
- override public string ToString()
- {
- return "Session[" + URL + "](" + GetHashCode() + ")";
- }
- //-------------------------------------------------------------------------------------
- private void onClose()
- {
- lock (mSendQueue)
- {
- mSendQueue.Clear();
- try
- {
- if (send_buff != null) send_buff.Dispose();
- if (recv_buff != null) recv_buff.Dispose();
- }
- catch (Exception err)
- {
- log.Error(err.Message, err);
- }
- send_buff = null;
- recv_buff = null;
- }
- mListener.sessionClosed(this);
- if (mOnSessionClosed != null)
- {
- mOnSessionClosed.Invoke(this);
- }
- }
- private void onOpen()
- {
- mListener.sessionOpened(this);
- if (mOnSessionOpened != null)
- {
- mOnSessionOpened.Invoke(this);
- }
- }
- private void onReceive(Object message)
- {
- try
- {
- mListener.messageReceived(this, message);
- if (mOnMessageReceived != null)
- {
- mOnMessageReceived.Invoke(this, message);
- }
- }
- catch (Exception err)
- {
- log.Error(err.Message, err);
- onException(err);
- }
- }
- private void onSent(Object message)
- {
- mListener.messageSent(this, message);
- if (mOnMessageSent != null)
- {
- mOnMessageSent.Invoke(this, message);
- }
- }
- private void onException(Exception err)
- {
- mListener.onError(this, err);
- if (mOnError != null)
- {
- mOnError.Invoke(this, err);
- }
- }
- //-------------------------------------------------------------------------------------
- private void innerClose()
- {
- lock (this)
- {
- if (mTCP != null)
- {
- try
- {
- this.mTCP.Close();
- }
- catch (Exception err)
- {
- log.Error(err.Message, err);
- }
- }
- }
- }
- private void runWrite()
- {
- TcpClient _socket = mTCP;
- try
- {
- onOpen();
- Stream output = _socket.GetStream();
- while (_socket.Connected)
- {
- lock (mSendQueue)
- {
- mSendingArray.Clear();
- if (mSendQueue.Count > 0)
- {
- mSendingArray.AddRange(mSendQueue);
- mSendQueue.Clear();
- }
- else
- {
- // 如果没有待传输消息,则等待输入
- Monitor.Wait(mSendQueue, 100);
- }
- }
- if (mSendingArray.Count > 0 && _socket.Connected)
- {
- for (int i = 0; i < mSendingArray.Count; i++)
- {
- object send_msg = mSendingArray[i];
- try
- {
- int sendBytes;
- if (doEncode(output, send_msg, out sendBytes))
- {
- output.Flush();
- mSendPacks++;
- onSent(send_msg);
- }
- mSendBytes += sendBytes;
- }
- catch (Exception err)
- {
- try
- {
- if (_socket.Connected)
- {
- log.Error(err.Message, err);
- innerClose();
- onException(new NetException(err.Message, err));
- }
- }
- catch (Exception err2)
- {
- log.Error(err.Message, err2);
- }
- break;
- }
- }
- }
- }
- }
- catch (Exception err)
- {
- log.Error(err.Message, err);
- onException(new NetException("/n[runWrite:]" + URL +
- "/n[InnerException:]" + err.InnerException +
- "/n[Exception:]" + err.Message +
- "/n[Source:]" + err.Source +
- "/n[StackTrace:]" + err.StackTrace, err));
- }
- finally
- {
- onClose();
- }
- }
- private void runRead()
- {
- TcpClient _socket = mTCP;
- try
- {
- Stream input = _socket.GetStream();
- while (_socket.Connected)
- {
- try
- {
- Object msg = null;
- int readBytes;
- if (doDecode(input, out msg, out readBytes))
- {
- mRecvPacks++;
- onReceive(msg);
- }
- mRecvBytes += readBytes;
- }
- catch (Exception err)
- {
- try
- {
- if (_socket.Connected)
- {
- log.Error(err.Message, err);
- innerClose();
- onException(new NetException(err.Message, err));
- }
- }
- catch (Exception err2)
- {
- log.Error(err.Message, err2);
- }
- break;
- }
- }
- }
- catch (Exception err)
- {
- log.Error(err.Message, err);
- onException(new NetException("runRead: " + err.Message));
- }
- }
- protected static readonly byte[] ZERO_BUFF = new byte[0];
- protected MemoryStream send_buff;
- protected MemoryStream recv_buff;
- protected virtual bool doEncode(Stream output, object send_msg, out int sendBytes)
- {
- sendBytes = 0;
- send_buff.Position = 0;
- if (Codec.doEncode(send_buff, send_msg))
- {
- byte[] h_body = send_buff.GetBuffer();
- int length = (int)send_buff.Position;
- LittleEdian.PutS32(output, length);
- output.Write(h_body, 0, length);
- sendBytes = length + 4;
- return true;
- }
- return false;
- }
- protected virtual bool doDecode(Stream input, out object msg, out int readBytes)
- {
- int length = LittleEdian.GetS32(input);
- if (length == -1)
- {
- msg = null;
- readBytes = 0;
- return false;
- }
- else if (length <= 0)
- {
- throw new Exception("received negative trunk size=" + length);
- }
- if (recv_buff.Capacity < length)
- {
- recv_buff.Capacity = length;
- }
- recv_buff.Position = 0;
- recv_buff.SetLength(length);
- byte[] buffer = recv_buff.GetBuffer();
- IOUtil.ReadToEnd(input, buffer, 0, length);
- readBytes = length + 4;
- recv_buff.Position = 0;
- if (Codec.doDecode(recv_buff, out msg))
- {
- if (recv_buff.Position != length)
- {
- throw new Exception(string.Format("can not decode full trunk size={0} type={1}", length, msg != null ? msg.GetType().FullName : msg));
- }
- return true;
- }
- return false;
- }
- }
- }
|