123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- using CommonLang.Log;
- using CommonLang.Net;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using System.IO;
- using SuperSocket.SocketBase.Protocol;
- using SuperSocket.SocketBase;
- using SuperSocket.Facility.Protocol;
- using CommonLang.ByteOrder;
- using SuperSocket.SocketBase.Config;
- using System.Collections.Concurrent;
- using CommonLang;
- namespace Pomelo
- {
- public class FuckFastStream : FastStream
- {
- public static int MAX_REQUEST_LEN = 1024 * 1024 * 10; // 10M
- private static Logger log;
- private FuckAppServer acceptor;
- public FuckFastStream()
- {
- log = LoggerFactory.GetLogger("FuckStream");
- }
- public override void Start(FastStreamConfig fcfg, IZone zone)
- {
- try
- {
- this.acceptor = new FuckAppServer(zone);
- var cfg = new ServerConfig();
- {
- cfg.SyncSend = false;
- cfg.Port = fcfg.port;
- cfg.Mode = SocketMode.Tcp;
- cfg.DisableSessionSnapshot = true;
- cfg.LogCommand = false;
- cfg.LogBasicSessionActivity = false;
- cfg.Name = "FuckAppServer";
- cfg.ClearIdleSession = false;
- cfg.ClearIdleSessionInterval = 10;
- cfg.IdleSessionTimeOut = 1000000;
- cfg.SendingQueueSize = 40960;
- cfg.SendTimeOut = 5000;
- cfg.TextEncoding = "UTF-8";
- cfg.MaxConnectionNumber = 1000;
- cfg.MaxRequestLength = MAX_REQUEST_LEN;
- }
- if (acceptor.Setup(cfg))
- {
- this.acceptor.Start();
- log.Warn("start fuck stream on port:" + fcfg.port);
- }
- else
- {
- throw new Exception("Open Server Failed !");
- }
- }
- catch (Exception err)
- {
- log.Error(err.Message, err);
- throw;
- }
- }
- public override void Stop()
- {
- try
- {
- this.acceptor.Stop();
- }
- catch (Exception err)
- {
- log.Error(err.Message, err);
- throw;
- }
- try
- {
- this.acceptor.Dispose();
- }
- catch (Exception err)
- {
- log.Error(err.Message, err);
- throw;
- }
- }
- public override void Send(IFastSession session, string uid,string instanceId, ArraySegment<byte> data)
- {
- //FuckSession test = session as FuckSession;
- //Console.WriteLine(" - sendMsg - " + test.RemoteEndPoint.ToString());
- acceptor.SendInternal(session as FuckSession, uid, instanceId,data);
- }
- public override IFastSession GetSessionByID(string sessionID)
- {
- return acceptor.GetFuckSessionByID(sessionID);
- }
- //--------------------------------------------------------------------------------------------------------------------------------------------------------
- class FuckAppServer : AppServer<FuckSession, BinaryRequestInfo>
- {
- private Logger log = LoggerFactory.GetLogger("FuckAppServer");
- private ConcurrentDictionary<string, FuckSession> sessions = new ConcurrentDictionary<string, FuckSession>();
- private IZone zone;
- public FuckAppServer(IZone zone) : base(new FuckReceiveFilterFactory())
- {
- this.zone = zone;
- this.NewRequestReceived += AppServerImpl_NewRequestReceived;
- }
- public IFastSession GetFuckSessionByID(string sessionID)
- {
- FuckSession session;
- sessions.TryGetValue(sessionID, out session);
- return session;
- }
- protected override void OnStopped()
- {
- sessions.Clear();
- base.OnStopped();
- }
- protected override void OnNewSessionConnected(FuckSession session)
- {
- base.OnNewSessionConnected(session);
- log.Warn("A Session Connected : " + session);
- }
- protected override void OnSessionClosed(FuckSession session, CloseReason reason)
- {
- base.OnSessionClosed(session, reason);
- log.Warn("A Session Closed : " + session.ConnectorId + " : reason=" + reason);
- }
- private void AppServerImpl_NewRequestReceived(FuckSession session, BinaryRequestInfo bin)
- {
- try
- {
-
- //注意要捕获异常
- if (session.ConnectorId == null && "connetorId".Equals(bin.Key))
- {
- //connetorId为握手协议,表示了session的身份
- string connetorId = Encoding.UTF8.GetString(bin.Body);
- //ice绑定fastSocket
- if (!IceManager.instance().setFastSession(connetorId, session))
- {
- session.Close();
- return;
- }
-
- log.Warn("A Session Binded : " + session + " : connetorId=" + connetorId + ", info:" + session.RemoteEndPoint.ToString());
- session.ConnectorId = connetorId;
- sessions.AddOrUpdate(connetorId, session, (key, oldValue) => session);
- zone.onNetReconnect(connetorId, session);
- /*
- //客户端版本号信息
- using (var stream = new MemoryStream(5))
- {
- Composer.writeU8(stream, (ushort)200);
- Composer.writeU32(stream, (uint)bsVersion);
- session.Send(stream.GetBuffer(), 0, (int)stream.Position);
- }
- */
- }
- else
- {
- //发送数据到场景
- zone.PlayerReceive(bin.Key, bin.Body);
- }
- }
- catch (Exception e)
- {
- log.Error(e.Message, e);
- }
- }
- internal void SendInternal(FuckSession session, string uid, string instanceId, ArraySegment<byte> data)
- {
- try
- {
- byte[] bytesUid = System.Text.UTF8Encoding.UTF8.GetBytes(uid);
- //byte[] bytesInstanceId = System.Text.UTF8Encoding.UTF8.GetBytes(instanceId);
- int contentSize = 5 + bytesUid.Length + data.Count;// + bytesInstanceId.Length;
-
- //int lengthSize = Composer.calLengthSize(contentSize); // pomelo协议(待废弃)
- using (var stream = new MemoryStream(contentSize))
- {
- //composer head
- //Composer.writeLength(stream, contentSize, lengthSize);// pomelo协议(待废弃)
- //协议 head
- Composer.writeU8(stream, (ushort)bytesUid.Length);
- //Composer.writeU16(stream, (ushort)bytesInstanceId.Length);
- Composer.writeU32(stream, (uint)data.Count);
- //uid
- Composer.writeBytes(stream, bytesUid, 0, bytesUid.Length);
- //instanceId
- //Composer.writeBytes(stream, bytesInstanceId, 0, bytesInstanceId.Length);
- //data
- Composer.writeBytes(stream, data.Array, data.Offset, data.Count);
- //
- session.Send(stream.GetBuffer(), 0, (int)stream.Position);
- }
- }
- catch (Exception err)
- {
- log.Error(err.Message, err);
- session.Close(CloseReason.InternalError);
- }
- }
- }
- public class FuckSession : AppSession<FuckSession, BinaryRequestInfo>, IFastSession
- {
- public string ConnectorId { get; internal set; }
- public void doClose()
- {
- this.Close();
- }
- public override void Initialize(IAppServer<FuckSession, BinaryRequestInfo> appServer, ISocketSession socketSession)
- {
- base.Initialize(appServer, socketSession);
- socketSession.Client.NoDelay = true;
- }
- public bool IsConnected()
- {
- return this.Connected;
- }
- public string GetDescribe()
- {
- string des = "";
- try
- {
- des = this.LocalEndPoint.ToString();
- }
- catch(System.Exception e)
- {
- log.Warn("GetDescribe catch: " + e);
- }
- return des;
- }
- }
- class FuckReceiveFilterFactory : IReceiveFilterFactory<BinaryRequestInfo>
- {
- public IReceiveFilter<BinaryRequestInfo> CreateFilter(IAppServer appServer, IAppSession appSession, System.Net.IPEndPoint remoteEndPoint)
- {
- return new FuckReceiveFilter(appSession as FuckSession);
- }
- }
- class FuckReceiveFilter : FixedHeaderReceiveFilter<BinaryRequestInfo>
- {
- private readonly FuckSession appSession;
- public FuckReceiveFilter(FuckSession appSession) : base(6)
- {
- this.appSession = appSession;
- }
- protected override int GetBodyLengthFromHeader(byte[] header, int offset, int length)
- {
- int pos = offset;
- int idLength = LittleEdian.GetS16(header, ref pos);
- int bodyLength = LittleEdian.GetS32(header, ref pos);
- if (bodyLength > MAX_REQUEST_LEN)
- {
- appSession.Close(CloseReason.InternalError);
- throw new Exception(string.Format("FuckReceiveFilter bodyLength:{0} out of limit:{1} ", bodyLength, MAX_REQUEST_LEN));
- }
- return bodyLength + idLength;
- }
- protected override BinaryRequestInfo ResolveRequestInfo(ArraySegment<byte> header, byte[] bodyBuffer, int offset, int length)
- {
- int headerOffset = header.Offset;
- int idLength = LittleEdian.GetS16(header.Array, ref headerOffset);
- int dataLength = LittleEdian.GetS32(header.Array, ref headerOffset);
- if (length != idLength + dataLength)
- {
- appSession.Close(CloseReason.InternalError);
- throw new Exception("Body Length not equal idLength + dataLength");
- }
- string id = Encoding.UTF8.GetString(bodyBuffer, offset, idLength);
- byte[] body = new byte[dataLength];
- Buffer.BlockCopy(bodyBuffer, offset + idLength, body, 0, dataLength);
- return new BinaryRequestInfo(id, body);
- }
- }
- static class Composer
- {
- private const int LEFT_SHIFT_BITS = 1 << 7;
- public static int calLengthSize(int length)
- {
- int res = 0;
- while (length > 0)
- {
- length = length >> 7;
- res++;
- }
- return res;
- }
- public static void writeLength(Stream stream, int data, int size)
- {
- int offset = size - 1, b;
- byte[] bytes = new byte[size];
- for (; offset >= 0; offset--)
- {
- b = data % LEFT_SHIFT_BITS;
- if (offset < size - 1)
- {
- b |= 0x80;
- }
- bytes[offset] = (byte)b;
- data = data >> 7;
- }
- stream.Write(bytes, 0, bytes.Length);
- }
- public static void writeU8(Stream stream, UInt16 value)
- {
- stream.WriteByte((byte)(value));
- }
- public static void writeU16(Stream stream, UInt16 value)
- {
- stream.WriteByte((byte)(value));
- stream.WriteByte((byte)(value >> 8));
- }
- public static void writeU32(Stream stream, UInt32 value)
- {
- stream.WriteByte((byte)(value));
- stream.WriteByte((byte)(value >> 8));
- stream.WriteByte((byte)(value >> 16));
- stream.WriteByte((byte)(value >> 24));
- }
- public static void writeBytes(Stream stream, byte[] value, int offset, int count)
- {
- stream.Write(value, offset, count);
- }
- }
- }
- }
|