123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552 |
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Net.Sockets;
- using System.IO;
- using System.Threading;
- using CommonNetwork.Net;
- using CommonLang.IO;
- using CommonLang.ByteOrder;
- using CommonLang.Protocol;
- using System.Net;
- using CommonLang.Net;
- using CommonLang;
- using CommonLang.Log;
- namespace CommonNetwork.Sockets
- {
- public class NetSessionAsync : BaseNetSession
- {
/// <summary>Logger used for diagnostics emitted by this session.</summary>
private readonly Logger log = LoggerFactory.GetLogger("NetSessionAsync");

/// <summary>Underlying TCP socket; null while the session is closed.</summary>
private Socket mTCP = null;

/// <summary>Codec handed to Open; exposed through the Codec property.</summary>
protected INetPackageCodec mCodec;

/// <summary>Listener handed to Open; receives the session notifications.</summary>
private INetSessionListener mListener;

/// <summary>
/// Messages queued by Send until startSend drains them. The queue instance is
/// also used as its own lock object, so it is readonly to keep the lock target stable.
/// </summary>
private readonly Queue<Object> mSendQueue = new Queue<Object>();

/// <summary>
/// Creates a session with no socket; call Open to start connecting.
/// </summary>
public NetSessionAsync()
{
}
/// <summary>
/// Gets whether the network session is currently connected.
/// </summary>
/// <value>
/// The socket's Connected flag, or false when no socket exists.
/// </value>
public override bool IsConnected
{
    get
    {
        // Read the field once so the null check and the use see the same socket.
        Socket socket = mTCP;
        return socket != null && socket.Connected;
    }
}
/// <summary>
/// Gets the codec that was passed to Open (null before the first Open).
/// </summary>
public override INetPackageCodec Codec
{
    get
    {
        return this.mCodec;
    }
}
/// <summary>
/// Gets the remote endpoint of the underlying socket.
/// </summary>
/// <value>
/// The socket's RemoteEndPoint as an IPEndPoint, or null when no socket exists
/// (or the endpoint is not an IPEndPoint).
/// </value>
public override IPEndPoint RemoteAddress
{
    get
    {
        // Snapshot the field: Close sets mTCP to null, so reading it twice could
        // pass the null check and then dereference null.
        Socket tcp = mTCP;
        if (tcp != null)
        {
            return tcp.RemoteEndPoint as IPEndPoint;
        }
        return null;
    }
}
/// <summary>
/// Gets the raw socket backing this session (null while closed).
/// </summary>
public Socket Session
{
    get
    {
        return this.mTCP;
    }
}
/// <summary>
/// Reports an error to the listener and then to the mOnError delegate, if one is set.
/// </summary>
/// <param name="err">The exception to report.</param>
private void onException(Exception err)
{
    mListener.onError(this, err);

    // Copy the delegate before testing it so it cannot become null between
    // the null check and the invocation.
    var handler = mOnError;
    if (handler != null)
    {
        handler.Invoke(this, err);
    }
}
- //-------------------------------------------------------------------------------------
- #region Open
/// <summary>
/// Starts an asynchronous TCP connect to the given address. Does nothing when a
/// socket already exists.
/// </summary>
/// <param name="url">Target in "host:port" form; split on ':' into host and port.</param>
/// <param name="codec">Codec stored for encoding/decoding message bodies.</param>
/// <param name="listener">Listener stored to receive session notifications.</param>
/// <returns>
/// true when the connect was started; false when a socket already exists or when
/// starting the connect failed (the failure is reported through onException).
/// </returns>
public override bool Open(string url, INetPackageCodec codec, INetSessionListener listener)
{
    bool ret = false;
    try
    {
        lock (this)
        {
            if (mTCP == null)
            {
                this.mCodec = codec;
                this.mListener = listener;
                this.mURL = url;

                // Parse the address before creating the socket so a malformed
                // url fails without allocating anything.
                string[] url_kv = url.Split(':');
                string host = url_kv[0];
                int port = int.Parse(url_kv[1]);

                lock (mSendQueue)
                {
                    this.mSendQueue.Clear();
                }

                // Create the socket and start connecting.
                Socket tcp = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                try
                {
                    tcp.NoDelay = true;
                    this.mTCP = tcp;
                    tcp.BeginConnect(host, port, endConnect, tcp);
                    ret = true;
                }
                catch
                {
                    // Roll back: leaving mTCP assigned would make every later
                    // Open return false and would leak the socket.
                    this.mTCP = null;
                    tcp.Close();
                    throw;
                }
            }
        }
    }
    catch (Exception err)
    {
        log.Error(err.Message, err);
        onException(new NetException("\n[Open:]" + URL +
            "\n[InnerException:]" + err.InnerException +
            "\n[Exception:]" + err.Message +
            "\n[Source:]" + err.Source +
            "\n[StackTrace:]" + err.StackTrace, err));
    }
    return ret;
}
/// <summary>
/// Completion callback for the BeginConnect issued by Open. On success it raises
/// the opened notification and starts receiving; on failure it reports the error
/// and closes the session.
/// </summary>
/// <param name="result">Async result whose AsyncState is the connecting socket.</param>
private void endConnect(IAsyncResult result)
{
    Socket tcp = result.AsyncState as Socket;

    // The session was closed (or reopened with another socket) while the connect
    // was pending; this completion no longer belongs to the current session.
    if (tcp == null || !object.ReferenceEquals(tcp, mTCP))
    {
        return;
    }

    try
    {
        // EndConnect throws when the connect attempt failed; handle it here so
        // it is reported through onException instead of escaping the callback.
        tcp.EndConnect(result);
    }
    catch (Exception err)
    {
        log.Error(err.Message, err);
        onException(new NetException("endConnect: " + err.Message));
        this.Close();
        return;
    }

    onOpen();
    startReceiveHead();
}
/// <summary>
/// Notifies the listener, then the mOnSessionOpened delegate if one is set,
/// that the session has opened.
/// </summary>
private void onOpen()
{
    mListener.sessionOpened(this);

    // Copy the delegate before testing it so it cannot become null between
    // the null check and the invocation.
    var handler = mOnSessionOpened;
    if (handler != null)
    {
        handler.Invoke(this);
    }
}
- #endregion
- //-------------------------------------------------------------------------------------
- #region Close
/// <summary>
/// Closes the socket, discards queued outgoing messages and raises the closed
/// notification.
/// </summary>
/// <returns>true when a socket existed and was closed; false when already closed.</returns>
public override bool Close()
{
    Socket tcp;
    lock (this)
    {
        tcp = mTCP;
        if (tcp == null)
        {
            return false;
        }

        // Detach the socket under the lock so exactly one caller wins the close
        // and Send stops accepting messages immediately.
        this.mTCP = null;
        lock (mSendQueue)
        {
            this.mSendQueue.Clear();
        }
    }

    // Close and notify outside the lock: listener code should not run while
    // this session's monitor is held.
    try
    {
        tcp.Close();
    }
    catch (Exception err)
    {
        log.Error(err.Message, err);
    }

    onClose();
    return true;
}
/// <summary>
/// Clears the send queue, then notifies the listener and the mOnSessionClosed
/// delegate (if set) that the session has closed.
/// </summary>
private void onClose()
{
    lock (mSendQueue)
    {
        mSendQueue.Clear();
    }

    mListener.sessionClosed(this);

    // Copy the delegate before testing it so it cannot become null between
    // the null check and the invocation.
    var handler = mOnSessionClosed;
    if (handler != null)
    {
        handler.Invoke(this);
    }
}
- #endregion
- //-------------------------------------------------------------------------------------
- //-------------------------------------------------------------------------------------
- //-------------------------------------------------------------------------------------
- //-------------------------------------------------------------------------------------
- //-------------------------------------------------------------------------------------------------------------------------------
- #region Send
/// <summary>
/// Queues a message for sending and kicks off the send; the socket write itself
/// is started asynchronously. Ignored when the session has no socket.
/// </summary>
/// <param name="data">The message object to encode and send.</param>
public override void Send(Object data)
{
    lock (this)
    {
        if (mTCP == null)
        {
            return;
        }

        lock (mSendQueue)
        {
            mSendQueue.Enqueue(data);
        }

        // Kick the send path so the queued message gets written.
        startSend();
    }
}
/// <summary>
/// Stamps the response with the id of the request it answers and sends it.
/// </summary>
/// <param name="rsponse">Response message; its MessageID is overwritten.</param>
/// <param name="requestMessageID">Id of the request being answered.</param>
public override void SendResponse(IMessage rsponse, int requestMessageID)
{
    // Tie the response to its request before it enters the send queue.
    rsponse.MessageID = requestMessageID;
    this.Send(rsponse);
}
/// <summary>
/// Notifies the listener, then the mOnMessageSent delegate if one is set,
/// that a message has been sent.
/// </summary>
/// <param name="message">The message that was sent.</param>
private void onSent(Object message)
{
    mListener.messageSent(this, message);

    // Copy the delegate before testing it so it cannot become null between
    // the null check and the invocation.
    var handler = mOnMessageSent;
    if (handler != null)
    {
        handler.Invoke(this, message);
    }
}
/// <summary>
/// State carried through one asynchronous send: the messages in the batch and
/// the buffer holding their encoded bytes.
/// </summary>
private class SendObject
{
    /// <summary>Encoded bytes of the batch; starts with a 1024-byte capacity.</summary>
    public readonly MemoryStream buffer = new MemoryStream(1024);

    /// <summary>Messages included in the batch.</summary>
    public readonly List<object> sending = new List<object>();
}
/// <summary>
/// Pool of SendObject instances; Alloc always hands out an instance with an
/// empty buffer and an empty message list.
/// </summary>
private class SendObjectPool
{
    /// <summary>Factory passed to the pool for creating new instances.</summary>
    private static SendObject s_ListPool_OnCreate()
    {
        return new SendObject();
    }

    private ObjectPool<SendObject> s_Pool = new ObjectPool<SendObject>(s_ListPool_OnCreate);

    /// <summary>Takes an instance from the pool and resets it for a new batch.</summary>
    public SendObject Alloc()
    {
        SendObject item = s_Pool.Get();
        item.buffer.Position = 0;
        item.buffer.SetLength(0);
        item.sending.Clear();
        return item;
    }

    /// <summary>Drops the message references and returns the instance to the pool.</summary>
    public void Release(SendObject toRelease)
    {
        // Clear first so a pooled instance does not keep sent messages alive.
        toRelease.sending.Clear();
        s_Pool.Release(toRelease);
    }
}

/// <summary>Pool supplying the per-send state objects used by startSend/endSend.</summary>
private SendObjectPool mSendPool = new SendObjectPool();
/// <summary>
/// Drains the send queue, encodes every drained message into one pooled buffer
/// and starts a single asynchronous send for the batch. Failures are logged,
/// reported through onException and close the session.
/// </summary>
private void startSend()
{
    var sending = mSendPool.Alloc();
    try
    {
        lock (mSendQueue)
        {
            if (mSendQueue.Count > 0)
            {
                sending.sending.AddRange(mSendQueue);
                mSendQueue.Clear();
            }
        }

        // Snapshot the socket so a concurrent Close cannot null it mid-method.
        Socket tcp = mTCP;
        if (tcp == null)
        {
            // The session is already closed; nothing to send or report.
            mSendPool.Release(sending);
            return;
        }

        if (tcp.Connected)
        {
            for (int i = 0; i < sending.sending.Count; i++)
            {
                object send_msg = sending.sending[i];
                doEncode(sending.buffer, send_msg);
            }

            // Nothing was queued or nothing encoded. A zero-byte send would
            // complete with 0 bytes, which endSend treats as a failure and
            // answers by closing the session, so skip it.
            if (sending.buffer.Length == 0)
            {
                mSendPool.Release(sending);
                return;
            }
        }

        // When the socket is not connected the buffer is empty and BeginSend is
        // still attempted, as before, so the failure surfaces through the catch.
        tcp.BeginSend(sending.buffer.GetBuffer(), 0, (int)sending.buffer.Length, SocketFlags.None, endSend, sending);
    }
    catch (Exception err)
    {
        mSendPool.Release(sending);
        log.Error(err.Message, err);
        onException(new NetException("\n[runWrite:]" + URL +
            "\n[InnerException:]" + err.InnerException +
            "\n[Exception:]" + err.Message +
            "\n[Source:]" + err.Source +
            "\n[StackTrace:]" + err.StackTrace, err));
        this.Close();
    }
}
/// <summary>
/// Completion callback for the BeginSend issued by startSend. Updates the send
/// counters, raises the sent notification for every message in the batch and
/// returns the batch object to the pool.
/// </summary>
/// <param name="result">Async result whose AsyncState is the batch's SendObject.</param>
private void endSend(IAsyncResult result)
{
    var sending = result.AsyncState as SendObject;
    try
    {
        // Snapshot the socket; if the session was closed while the send was
        // pending there is nothing to complete and no error to report.
        Socket tcp = mTCP;
        if (tcp == null)
        {
            return;
        }

        int length = tcp.EndSend(result);
        if (length > 0)
        {
            // NOTE(review): length is not compared with the buffer size, so a
            // short write would not be resent here - confirm whether that can occur.
            mSendBytes += length;
            mSendPacks += sending.sending.Count;
            for (int i = 0; i < sending.sending.Count; i++)
            {
                object send_msg = sending.sending[i];
                onSent(send_msg);
            }
            sending.sending.Clear();
        }
        else
        {
            Close();
        }
    }
    catch (Exception err)
    {
        log.Error(err.Message, err);
        onException(new NetException("endSend: " + err.Message));
        this.Close();
    }
    finally
    {
        // Runs on every path, including the early return above.
        mSendPool.Release(sending);
    }
}
- #endregion
- //-------------------------------------------------------------------------------------------------------------------------------
- #region Receive
/// <summary>
/// Delivers a decoded message to the listener and then to the mOnMessageReceived
/// delegate, if one is set. Exceptions thrown by either are logged and forwarded
/// to onException rather than propagated.
/// </summary>
/// <param name="message">The decoded message.</param>
private void onReceive(Object message)
{
    try
    {
        mListener.messageReceived(this, message);

        // Copy the delegate before testing it so it cannot become null between
        // the null check and the invocation.
        var handler = mOnMessageReceived;
        if (handler != null)
        {
            handler.Invoke(this, message);
        }
    }
    catch (Exception err)
    {
        log.Error(err.Message, err);
        onException(err);
    }
}
/// <summary>
/// State carried through the receipt of one packet: a 4-byte head buffer, the
/// body buffer, and how much of each has been filled so far.
/// </summary>
private class ReceiveObject
{
    /// <summary>Buffer for the 4-byte packet head.</summary>
    public readonly byte[] head = new byte[4];

    /// <summary>Buffer for the packet body; starts with a 1024-byte capacity.</summary>
    public readonly MemoryStream body_buffer = new MemoryStream(1024);

    /// <summary>Number of head bytes received so far.</summary>
    public int head_position = 0;

    /// <summary>Body length obtained from the head.</summary>
    public int body_length = 0;

    /// <summary>Number of body bytes received so far.</summary>
    public int body_position = 0;
}
/// <summary>
/// Pool of ReceiveObject instances; Alloc always hands out an instance with an
/// empty body buffer and all counters reset to zero.
/// </summary>
private class ReceiveObjectPool
{
    /// <summary>Factory passed to the pool for creating new instances.</summary>
    private static ReceiveObject s_ListPool_OnCreate()
    {
        return new ReceiveObject();
    }

    private ObjectPool<ReceiveObject> s_Pool = new ObjectPool<ReceiveObject>(s_ListPool_OnCreate);

    /// <summary>Takes an instance from the pool and resets it for a new packet.</summary>
    public ReceiveObject Alloc()
    {
        ReceiveObject item = s_Pool.Get();

        // Empty the body buffer.
        item.body_buffer.Position = 0;
        item.body_buffer.SetLength(0);

        // Reset the progress counters.
        item.head_position = 0;
        item.body_length = 0;
        item.body_position = 0;
        return item;
    }

    /// <summary>Returns the instance to the pool.</summary>
    public void Release(ReceiveObject toRelease)
    {
        s_Pool.Release(toRelease);
    }
}

/// <summary>Pool supplying the per-packet state objects used by the receive path.</summary>
private ReceiveObjectPool mReceivePool = new ReceiveObjectPool();
/// <summary>
/// Starts an asynchronous read of the next packet head into a freshly allocated
/// receive object. Does nothing when the session has no socket.
/// </summary>
private void startReceiveHead()
{
    // Snapshot the socket. When the session is already closed (for example the
    // listener closed it while handling a message) there is nothing to read, and
    // dereferencing null would only surface as a misleading receive error.
    Socket tcp = mTCP;
    if (tcp == null)
    {
        return;
    }

    var recv_object = mReceivePool.Alloc();
    try
    {
        tcp.BeginReceive(
            recv_object.head,
            recv_object.head_position,
            recv_object.head.Length - recv_object.head_position,
            SocketFlags.None, endReceiveHead, recv_object);
    }
    catch (Exception err)
    {
        mReceivePool.Release(recv_object);
        log.Error(err.Message, err);
        onException(new NetException("endReceive: " + err.Message));
        this.Close();
    }
}
/// <summary>
/// Completion callback for a head read. Once all head bytes have arrived it
/// sizes the body buffer from the head and starts reading the body; until then
/// it keeps reading into the same head buffer. Failures (including a zero-byte
/// read) are logged, reported through onException and close the session.
/// </summary>
/// <param name="result">Async result whose AsyncState is the packet's ReceiveObject.</param>
private void endReceiveHead(IAsyncResult result)
{
    var recv_object = result.AsyncState as ReceiveObject;

    // Snapshot the socket; if the session was closed while the read was pending
    // just recycle the state object instead of reporting an error.
    Socket tcp = mTCP;
    if (tcp == null)
    {
        mReceivePool.Release(recv_object);
        return;
    }

    try
    {
        int length = tcp.EndReceive(result);
        if (length > 0)
        {
            mRecvBytes += length;
            recv_object.head_position += length;
            if (recv_object.head_position == recv_object.head.Length)
            {
                recv_object.body_length = GetBodyLength(recv_object.head);
                recv_object.body_position = 0;
                if (recv_object.body_buffer.Capacity < recv_object.body_length)
                {
                    recv_object.body_buffer.Capacity = recv_object.body_length;
                }
                recv_object.body_buffer.SetLength(recv_object.body_length);
                startReceiveBody(recv_object);
            }
            else if (recv_object.head_position > recv_object.head.Length)
            {
                throw new NetException("Receive head overfollow");
            }
            else
            {
                // Partial head: continue into the SAME object at the current
                // offset. Restarting with a new object would discard the bytes
                // already read and never return this object to the pool.
                tcp.BeginReceive(
                    recv_object.head,
                    recv_object.head_position,
                    recv_object.head.Length - recv_object.head_position,
                    SocketFlags.None, endReceiveHead, recv_object);
            }
        }
        else
        {
            throw new NetException("Receive 0 bytes!");
        }
    }
    catch (Exception err)
    {
        mReceivePool.Release(recv_object);
        log.Error(err.Message, err);
        onException(new NetException("endReceive: " + err.Message));
        this.Close();
    }
}
/// <summary>
/// Starts an asynchronous read of the remaining body bytes into the receive
/// object's body buffer. On failure the object is returned to the pool, the
/// error is logged and reported, and the session is closed.
/// </summary>
/// <param name="recv_object">State of the packet being received.</param>
private void startReceiveBody(ReceiveObject recv_object)
{
    try
    {
        byte[] target = recv_object.body_buffer.GetBuffer();
        int offset = recv_object.body_position;
        int remaining = recv_object.body_length - recv_object.body_position;
        mTCP.BeginReceive(target, offset, remaining, SocketFlags.None, endReceiveBody, recv_object);
    }
    catch (Exception err)
    {
        mReceivePool.Release(recv_object);
        log.Error(err.Message, err);
        onException(new NetException("endReceive: " + err.Message));
        this.Close();
    }
}
/// <summary>
/// Completion callback for a body read. Once the whole body has arrived it
/// decodes and delivers the message and starts reading the next head; until then
/// it keeps reading the remaining body bytes. Failures (including a zero-byte
/// read) are logged, reported through onException and close the session.
/// </summary>
/// <param name="result">Async result whose AsyncState is the packet's ReceiveObject.</param>
private void endReceiveBody(IAsyncResult result)
{
    var recv_object = result.AsyncState as ReceiveObject;

    // Snapshot the socket; if the session was closed while the read was pending
    // just recycle the state object instead of reporting an error.
    Socket tcp = mTCP;
    if (tcp == null)
    {
        mReceivePool.Release(recv_object);
        return;
    }

    try
    {
        int length = tcp.EndReceive(result);
        if (length > 0)
        {
            mRecvBytes += length;
            recv_object.body_position += length;
            if (recv_object.body_position == recv_object.body_length)
            {
                Object msg = null;
                var input = recv_object.body_buffer;
                input.Position = 0;
                if (doDecode(input, out msg))
                {
                    mRecvPacks++;
                    onReceive(msg);
                }
                recv_object.head_position = 0;

                // The message handler may have closed the session; only keep
                // reading when a socket is still attached.
                if (mTCP != null)
                {
                    startReceiveHead();
                }
                mReceivePool.Release(recv_object);
            }
            else if (recv_object.body_position > recv_object.body_length)
            {
                throw new NetException("Receive body overfollow");
            }
            else
            {
                startReceiveBody(recv_object);
            }
        }
        else
        {
            throw new NetException("Receive 0 bytes!");
        }
    }
    catch (Exception err)
    {
        mReceivePool.Release(recv_object);
        log.Error(err.Message, err);
        onException(new NetException("endReceive: " + err.Message));
        this.Close();
    }
}
- #endregion
- //-------------------------------------------------------------------------------------------------------------------------------
/// <summary>
/// Reads the body length from a packet head via LittleEdian.GetS32, starting at
/// offset 0.
/// </summary>
/// <param name="buffer">The head bytes.</param>
/// <returns>The value read from the start of the head.</returns>
protected virtual int GetBodyLength(byte[] buffer)
{
    int offset = 0;
    return LittleEdian.GetS32(buffer, ref offset);
}
/// <summary>
/// Writes one message to the stream as a 4-byte length prefix followed by the
/// codec's encoding of the message. The prefix holds the number of bytes written
/// after it.
/// </summary>
/// <param name="output">Seekable stream to append the packet to.</param>
/// <param name="send_msg">The message to encode.</param>
/// <returns>
/// true when the codec encoded the message; false when it did not, in which case
/// the stream is restored to the position and length it had on entry.
/// </returns>
protected virtual bool doEncode(Stream output, object send_msg)
{
    long old_position = output.Position;
    long old_length = output.Length;

    // Reserve the prefix; the real length is patched in once it is known.
    LittleEdian.PutS32(output, 0);
    if (Codec.doEncode(output, send_msg))
    {
        int full_length = (int)(output.Position - old_position);
        output.Position = old_position;
        LittleEdian.PutS32(output, full_length - 4);
        output.Position = old_position + full_length;
        return true;
    }

    // Encoding failed: drop the placeholder prefix and any partial bytes so
    // they are not sent as part of the batch.
    output.SetLength(old_length);
    output.Position = old_position;
    return false;
}
/// <summary>
/// Decodes one message from the stream with the codec and verifies that the
/// codec consumed the stream to its end.
/// </summary>
/// <param name="input">Stream holding exactly one packet body.</param>
/// <param name="msg">Receives the decoded message.</param>
/// <returns>true when the codec decoded a message; false otherwise.</returns>
/// <exception cref="Exception">
/// The codec reported success but left the stream position short of its length.
/// </exception>
protected virtual bool doDecode(Stream input, out object msg)
{
    if (!Codec.doDecode(input, out msg))
    {
        return false;
    }

    if (input.Position == input.Length)
    {
        return true;
    }

    // Leftover bytes mean the body and the codec disagree about the packet.
    throw new Exception(string.Format("can not decode full trunk size={0} type={1}", input.Length, msg != null ? msg.GetType().FullName : msg));
}
- }
- }
|