using CommonLang.Log;
using CommonLang.Net;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.IO;
using SuperSocket.SocketBase.Protocol;
using SuperSocket.SocketBase;
using SuperSocket.Facility.Protocol;
using CommonLang.ByteOrder;
using SuperSocket.SocketBase.Config;
using System.Collections.Concurrent;
using CommonLang;

namespace Pomelo
{
    /// <summary>
    /// <see cref="FastStream"/> implementation backed by a SuperSocket TCP server.
    /// Incoming frames are either the "connetorId" handshake (which binds a session
    /// to a connector id) or payloads that are forwarded to the zone.
    /// </summary>
    public class FuckFastStream : FastStream
    {
        /// <summary>Upper bound for the data length announced in a request header (10 MiB).</summary>
        public static int MAX_REQUEST_LEN = 1024 * 1024 * 10; // 10M

        // Shared by the nested types; (re)assigned every time an instance is constructed.
        private static Logger log;

        private FuckAppServer acceptor;

        public FuckFastStream()
        {
            log = LoggerFactory.GetLogger("FuckStream");
        }

        /// <summary>
        /// Creates the underlying server, configures it to listen on <c>fcfg.port</c> and starts it.
        /// </summary>
        /// <param name="fcfg">Stream configuration; only <c>port</c> is read here.</param>
        /// <param name="zone">Zone that receives every non-handshake request.</param>
        /// <exception cref="Exception">
        /// Thrown when server setup fails; any exception raised while starting is logged and rethrown.
        /// </exception>
        public override void Start(FastStreamConfig fcfg, IZone zone)
        {
            try
            {
                this.acceptor = new FuckAppServer(zone);

                var cfg = new ServerConfig();
                cfg.SyncSend = false;
                cfg.Port = fcfg.port;
                cfg.Mode = SocketMode.Tcp;
                cfg.DisableSessionSnapshot = true;
                cfg.LogCommand = false;
                cfg.LogBasicSessionActivity = false;
                cfg.Name = "FuckAppServer";
                cfg.ClearIdleSession = false;
                cfg.ClearIdleSessionInterval = 10;
                cfg.IdleSessionTimeOut = 1000000;
                cfg.SendingQueueSize = 40960;
                cfg.SendTimeOut = 5000;
                cfg.TextEncoding = "UTF-8";
                cfg.MaxConnectionNumber = 1000;
                cfg.MaxRequestLength = MAX_REQUEST_LEN;

                if (!acceptor.Setup(cfg))
                {
                    throw new Exception("Open Server Failed !");
                }

                this.acceptor.Start();
                log.Warn("start fuck stream on port:" + fcfg.port);
            }
            catch (Exception err)
            {
                log.Error(err.Message, err);
                throw;
            }
        }

        /// <summary>
        /// Stops and then disposes the underlying server. A failure in either step is
        /// logged and rethrown; if stopping throws, disposal is not attempted.
        /// </summary>
        public override void Stop()
        {
            try
            {
                this.acceptor.Stop();
            }
            catch (Exception err)
            {
                log.Error(err.Message, err);
                throw;
            }

            try
            {
                this.acceptor.Dispose();
            }
            catch (Exception err)
            {
                log.Error(err.Message, err);
                throw;
            }
        }

        /// <summary>
        /// Sends <paramref name="data"/> for player <paramref name="uid"/> over the given session.
        /// </summary>
        /// <param name="session">Target session; must be a session created by this stream.</param>
        /// <param name="uid">Player id written into the frame header.</param>
        /// <param name="instanceId">Currently not written to the wire.</param>
        /// <param name="data">Payload bytes.</param>
        public override void Send(IFastSession session, string uid, string instanceId, ArraySegment<byte> data)
        {
            acceptor.SendInternal(session as FuckSession, uid, instanceId, data);
        }

        /// <summary>
        /// Looks up the session that completed the handshake with the given connector id.
        /// </summary>
        /// <returns>The bound session, or <c>null</c> when no session is registered under that id.</returns>
        public override IFastSession GetSessionByID(string sessionID)
        {
            return acceptor.GetFuckSessionByID(sessionID);
        }

        //--------------------------------------------------------------------------------------------------------------------------------------------------------

        /// <summary>
        /// SuperSocket server that tracks handshaken sessions by connector id and
        /// forwards all other requests to the zone.
        /// </summary>
        class FuckAppServer : AppServer<FuckSession, BinaryRequestInfo>
        {
            // Outbound header: 1 byte uid length + 4 bytes data length.
            private const int SendHeaderSize = 5;

            private Logger log = LoggerFactory.GetLogger("FuckAppServer");

            // connector id -> session that handshook with that id.
            private ConcurrentDictionary<string, FuckSession> sessions = new ConcurrentDictionary<string, FuckSession>();

            private IZone zone;

            public FuckAppServer(IZone zone)
                : base(new FuckReceiveFilterFactory())
            {
                this.zone = zone;
                this.NewRequestReceived += AppServerImpl_NewRequestReceived;
            }

            /// <summary>Returns the session bound to <paramref name="sessionID"/>, or <c>null</c>.</summary>
            public IFastSession GetFuckSessionByID(string sessionID)
            {
                FuckSession session;
                sessions.TryGetValue(sessionID, out session);
                return session;
            }

            protected override void OnStopped()
            {
                sessions.Clear();
                base.OnStopped();
            }

            protected override void OnNewSessionConnected(FuckSession session)
            {
                base.OnNewSessionConnected(session);
                log.Warn("A Session Connected : " + session);
            }

            protected override void OnSessionClosed(FuckSession session, CloseReason reason)
            {
                base.OnSessionClosed(session, reason);
                // NOTE(review): the closed session stays in `sessions` until the same connector id
                // handshakes again or the server stops - confirm callers check IsConnected().
                log.Warn("A Session Closed : " + session.ConnectorId + " : reason=" + reason);
            }

            /// <summary>
            /// Handles one decoded request. Exceptions must not escape this handler,
            /// so everything is caught and logged.
            /// </summary>
            private void AppServerImpl_NewRequestReceived(FuckSession session, BinaryRequestInfo bin)
            {
                try
                {
                    bool isHandshake = session.ConnectorId == null && "connetorId".Equals(bin.Key);
                    if (!isHandshake)
                    {
                        // Forward the payload to the scene/zone.
                        zone.PlayerReceive(bin.Key, bin.Body);
                        return;
                    }

                    // "connetorId" is the handshake message; its body identifies the session.
                    string connetorId = Encoding.UTF8.GetString(bin.Body);

                    // Bind this socket session on the ICE side; refuse the connection if that fails.
                    if (!IceManager.instance().setFastSession(connetorId, session))
                    {
                        session.Close();
                        return;
                    }

                    log.Warn("A Session Binded : " + session + " : connetorId=" + connetorId + ", info:" + session.RemoteEndPoint.ToString());
                    session.ConnectorId = connetorId;
                    sessions.AddOrUpdate(connetorId, session, (key, oldValue) => session);
                }
                catch (Exception e)
                {
                    log.Error(e.Message, e);
                }
            }

            /// <summary>
            /// Frames and sends one message: [uid length: 1 byte][data length: 4 bytes, low byte first]
            /// [uid UTF-8 bytes][data bytes]. <paramref name="instanceId"/> is not part of the frame.
            /// </summary>
            /// <remarks>
            /// A null session or a uid longer than 255 UTF-8 bytes is logged and the message is dropped.
            /// Any other failure is logged and the session is closed with
            /// <see cref="CloseReason.InternalError"/>.
            /// </remarks>
            internal void SendInternal(FuckSession session, string uid, string instanceId, ArraySegment<byte> data)
            {
                if (session == null)
                {
                    // Happens when the caller passes null or a session of another type;
                    // without this guard the catch block below would dereference null.
                    log.Error("SendInternal: session is null, message dropped, uid=" + uid);
                    return;
                }

                try
                {
                    byte[] bytesUid = Encoding.UTF8.GetBytes(uid);
                    if (bytesUid.Length > byte.MaxValue)
                    {
                        // The length field is a single byte; a longer uid would corrupt the frame.
                        log.Error("SendInternal: uid too long (" + bytesUid.Length + " bytes), message dropped, uid=" + uid);
                        return;
                    }

                    int contentSize = SendHeaderSize + bytesUid.Length + data.Count;
                    using (var stream = new MemoryStream(contentSize))
                    {
                        // header
                        Composer.writeU8(stream, (ushort)bytesUid.Length);
                        Composer.writeU32(stream, (uint)data.Count);
                        // uid
                        Composer.writeBytes(stream, bytesUid, 0, bytesUid.Length);
                        // data
                        Composer.writeBytes(stream, data.Array, data.Offset, data.Count);

                        session.Send(stream.GetBuffer(), 0, (int)stream.Position);
                    }
                }
                catch (Exception err)
                {
                    log.Error(err.Message, err);
                    session.Close(CloseReason.InternalError);
                }
            }
        }

        /// <summary>
        /// Socket session exposed to the rest of the application as <see cref="IFastSession"/>.
        /// </summary>
        class FuckSession : AppSession<FuckSession, BinaryRequestInfo>, IFastSession
        {
            /// <summary>Connector id received in the handshake; <c>null</c> until the handshake succeeds.</summary>
            public string ConnectorId { get; internal set; }

            public void doClose()
            {
                this.Close();
            }

            public override void Initialize(IAppServer<FuckSession, BinaryRequestInfo> appServer, ISocketSession socketSession)
            {
                base.Initialize(appServer, socketSession);
                socketSession.Client.NoDelay = true;
            }

            public bool IsConnected()
            {
                return this.Connected;
            }

            /// <summary>
            /// Returns the local end point as text, or an empty string when it cannot be read.
            /// </summary>
            public string GetDescribe()
            {
                string des = "";
                try
                {
                    des = this.LocalEndPoint.ToString();
                }
                catch (System.Exception e)
                {
                    log.Warn("GetDescribe catch: " + e);
                }
                return des;
            }
        }

        /// <summary>Creates one <see cref="FuckReceiveFilter"/> per session.</summary>
        class FuckReceiveFilterFactory : IReceiveFilterFactory<BinaryRequestInfo>
        {
            public IReceiveFilter<BinaryRequestInfo> CreateFilter(IAppServer appServer, IAppSession appSession, System.Net.IPEndPoint remoteEndPoint)
            {
                return new FuckReceiveFilter(appSession as FuckSession);
            }
        }

        /// <summary>
        /// Decodes inbound frames: a 6-byte header (S16 id length, S32 data length, both read
        /// through <c>LittleEdian</c>) followed by the UTF-8 id and then the data bytes.
        /// </summary>
        class FuckReceiveFilter : FixedHeaderReceiveFilter<BinaryRequestInfo>
        {
            private const int HeaderSize = 6;

            private readonly FuckSession appSession;

            public FuckReceiveFilter(FuckSession appSession)
                : base(HeaderSize)
            {
                this.appSession = appSession;
            }

            /// <summary>
            /// Returns the number of bytes that follow the header (id length + data length).
            /// </summary>
            /// <exception cref="Exception">
            /// Thrown, after closing the session, when a length is negative or the data length
            /// exceeds <see cref="MAX_REQUEST_LEN"/>.
            /// </exception>
            protected override int GetBodyLengthFromHeader(byte[] header, int offset, int length)
            {
                int pos = offset;
                int idLength = LittleEdian.GetS16(header, ref pos);
                int bodyLength = LittleEdian.GetS32(header, ref pos);

                // Both fields are signed and come from the peer: never trust them to be non-negative.
                if (idLength < 0 || bodyLength < 0)
                {
                    throw Reject(string.Format("FuckReceiveFilter negative length in header, idLength:{0} bodyLength:{1} ", idLength, bodyLength));
                }

                if (bodyLength > MAX_REQUEST_LEN)
                {
                    throw Reject(string.Format("FuckReceiveFilter bodyLength:{0} out of limit:{1} ", bodyLength, MAX_REQUEST_LEN));
                }

                return bodyLength + idLength;
            }

            /// <summary>
            /// Splits the received body into the request key (UTF-8 id) and a copy of the data bytes.
            /// </summary>
            /// <exception cref="Exception">
            /// Thrown, after closing the session, when the header lengths are negative or do not
            /// add up to <paramref name="length"/>.
            /// </exception>
            protected override BinaryRequestInfo ResolveRequestInfo(ArraySegment<byte> header, byte[] bodyBuffer, int offset, int length)
            {
                int headerOffset = header.Offset;
                int idLength = LittleEdian.GetS16(header.Array, ref headerOffset);
                int dataLength = LittleEdian.GetS32(header.Array, ref headerOffset);

                if (idLength < 0 || dataLength < 0)
                {
                    throw Reject(string.Format("FuckReceiveFilter negative length in header, idLength:{0} bodyLength:{1} ", idLength, dataLength));
                }

                if (length != idLength + dataLength)
                {
                    throw Reject("Body Length not equal idLength + dataLength");
                }

                string id = Encoding.UTF8.GetString(bodyBuffer, offset, idLength);
                byte[] body = new byte[dataLength];
                Buffer.BlockCopy(bodyBuffer, offset + idLength, body, 0, dataLength);
                return new BinaryRequestInfo(id, body);
            }

            // Closes the offending session (when there is one) and builds the exception to throw.
            private Exception Reject(string message)
            {
                if (appSession != null)
                {
                    appSession.Close(CloseReason.InternalError);
                }
                return new Exception(message);
            }
        }

        /// <summary>Small helpers that write integers and byte ranges to a stream.</summary>
        static class Composer
        {
            // Modulus that extracts the low 7 bits of a value (128).
            private const int LEFT_SHIFT_BITS = 1 << 7;

            /// <summary>Number of 7-bit groups needed for <paramref name="length"/>; 0 for values &lt;= 0.</summary>
            public static int calLengthSize(int length)
            {
                int res = 0;
                while (length > 0)
                {
                    length = length >> 7;
                    res++;
                }
                return res;
            }

            /// <summary>
            /// Writes <paramref name="data"/> as <paramref name="size"/> bytes of 7 bits each.
            /// The buffer is filled from its last byte backwards, and every byte except the
            /// last one gets its high bit (0x80) set.
            /// </summary>
            public static void writeLength(Stream stream, int data, int size)
            {
                int offset = size - 1, b;
                byte[] bytes = new byte[size];
                for (; offset >= 0; offset--)
                {
                    b = data % LEFT_SHIFT_BITS;
                    if (offset < size - 1)
                    {
                        b |= 0x80;
                    }
                    bytes[offset] = (byte)b;
                    data = data >> 7;
                }
                stream.Write(bytes, 0, bytes.Length);
            }

            /// <summary>Writes the low 8 bits of <paramref name="value"/> as one byte.</summary>
            public static void writeU8(Stream stream, UInt16 value)
            {
                stream.WriteByte((byte)(value));
            }

            /// <summary>Writes <paramref name="value"/> as two bytes, low byte first.</summary>
            public static void writeU16(Stream stream, UInt16 value)
            {
                stream.WriteByte((byte)(value));
                stream.WriteByte((byte)(value >> 8));
            }

            /// <summary>Writes <paramref name="value"/> as four bytes, low byte first.</summary>
            public static void writeU32(Stream stream, UInt32 value)
            {
                stream.WriteByte((byte)(value));
                stream.WriteByte((byte)(value >> 8));
                stream.WriteByte((byte)(value >> 16));
                stream.WriteByte((byte)(value >> 24));
            }

            /// <summary>Writes <paramref name="count"/> bytes of <paramref name="value"/> starting at <paramref name="offset"/>.</summary>
            public static void writeBytes(Stream stream, byte[] value, int offset, int count)
            {
                stream.Write(value, offset, count);
            }
        }
    }
}