#define CONVERT_MSG

using CommonLang;
using CommonLang.Concurrent;
using CommonLang.IO;
using SimpleJson;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

namespace Pomelo.DotNetClient
{
    /// <summary>
    /// Package type code stored in the first byte of the fixed packet header.
    /// </summary>
    public enum PackageType
    {
        PKG_HANDSHAKE = 1,
        PKG_HANDSHAKE_ACK = 2,
        PKG_HEARTBEAT = 3,
        PKG_DATA = 4,
        PKG_KICK = 5
    }

    /// <summary>
    /// Message type code carried in the flag byte of a data packet
    /// (stored shifted left by one bit, see <see cref="Message.MSG_Type_Mask"/>).
    /// </summary>
    public enum MessageType
    {
        MSG_REQUEST = 0,
        MSG_NOTIFY = 1,
        MSG_RESPONSE = 2,
        MSG_PUSH = 3
    }

    //----------------------------------------------------------------------------------------------------

    /// <summary>
    /// Base class for a packet held in a <see cref="MemoryStream"/>: a fixed
    /// <see cref="FIXED_HEAD_SIZE"/>-byte header (one type byte followed by a
    /// 3-byte big-endian body length) and then the body.
    /// Also hosts the static integer encode/decode helpers shared by subclasses.
    /// </summary>
    public abstract class Message : IDisposable
    {
        public static Encoding UTF8 = Encoding.UTF8;

        /// <summary>Size in bytes of the packet header (type byte + 3 length bytes).</summary>
        public const int FIXED_HEAD_SIZE = 4;

        /// <summary>Maximum encoded route length in bytes.</summary>
        public const int MSG_Route_Limit = 255;

        /// <summary>Bit of the flag byte that <c>SendMessage</c> sets for its hard-coded short route id.</summary>
        public const int MSG_Route_Mask = 0x01;

        /// <summary>Mask applied to <c>flag &gt;&gt; 1</c> to extract the <see cref="MessageType"/>.</summary>
        public const int MSG_Type_Mask = 0x07;

        protected readonly MemoryStream buffer = new MemoryStream(1024);

        /// <summary>The underlying stream holding header and body.</summary>
        public MemoryStream Stream
        {
            get { return buffer; }
        }

        /// <summary>
        /// The stream's backing array (<see cref="MemoryStream.GetBuffer"/>); it can be
        /// longer than <see cref="BufferLength"/>.
        /// </summary>
        public byte[] Buffer
        {
            get { return buffer.GetBuffer(); }
        }

        /// <summary>Current read/write position inside the stream.</summary>
        public int BufferPosition
        {
            get { return (int)buffer.Position; }
            set { buffer.Position = value; }
        }

        /// <summary>
        /// Logical length of the stream (header + body). Setting a value below
        /// <see cref="FIXED_HEAD_SIZE"/> throws.
        /// </summary>
        public int BufferLength
        {
            get { return (int)buffer.Length; }
            protected set
            {
                if (value < FIXED_HEAD_SIZE)
                {
                    throw new Exception("缓冲区不能小于固定长度 " + FIXED_HEAD_SIZE);
                }
                buffer.SetLength(value);
            }
        }

        /// <summary>Package type taken from / written to the header.</summary>
        public PackageType PkgType { get; protected set; }

        /// <summary>Body length in bytes (total length minus the fixed header).</summary>
        public int PkgLength { get; protected set; }

        //-----------------------------------------------------------

        protected Message() { }

        public abstract void Dispose();

        public override string ToString()
        {
            return string.Format("{0}:capacity={1}", GetType().Name, buffer.Capacity);
        }

        //-----------------------------------------------------------

        /// <summary>Writes <paramref name="value"/> as two bytes, high byte first.</summary>
        public static void WriteShort(ushort value, Stream bytes)
        {
            bytes.WriteByte((byte)((value >> 8) & 0xff));
            bytes.WriteByte((byte)(value & 0xff));
        }

        /// <summary>
        /// Reads two bytes (high byte first) as an unsigned 16-bit value.
        /// End of stream is not detected here: the -1 returned by
        /// <see cref="Stream.ReadByte"/> is folded into the result.
        /// </summary>
        public static ushort ReadShort(Stream bytes)
        {
            ushort result = 0;
            result += (ushort)(bytes.ReadByte() << 8);
            result += (ushort)(bytes.ReadByte());
            return result;
        }

        /// <summary>
        /// Decodes a base-128 varint (7 payload bits per byte, least-significant
        /// group first, high bit = "more bytes follow") and returns its low 32 bits.
        /// </summary>
        /// <param name="bytes">Stream positioned at the first varint byte.</param>
        /// <returns>The decoded value truncated to 32 bits.</returns>
        /// <exception cref="EndOfStreamException">
        /// The stream ended before a byte without the continuation bit was read.
        /// </exception>
        public static uint DecodeUInt32(Stream bytes)
        {
            ulong value = 0;
            // At most 11 bytes are consumed, matching the historical loop bound.
            for (int shift = 0; shift <= 70; shift += 7)
            {
                int raw = bytes.ReadByte();
                if (raw < 0)
                {
                    // Previously the -1 went through Convert.ToUInt64 and surfaced as an OverflowException.
                    throw new EndOfStreamException("Unexpected end of stream while decoding a varint.");
                }

                ulong b = (ulong)raw;
                // C# masks a 64-bit shift count to its low 6 bits, so shifting by 70
                // would really shift by 6 and corrupt the low bits; skip such groups.
                if (shift < 64)
                {
                    value |= (b & 0x7F) << shift;
                }

                if ((b & 0x80) == 0)
                {
                    break;
                }
            }
            return (uint)value;
        }

        /// <summary>
        /// Encodes <paramref name="n"/> as a base-128 varint (least-significant
        /// 7-bit group first, high bit set on every byte except the last).
        /// </summary>
        public static void EncodeUInt32(uint n, Stream stream)
        {
            do
            {
                uint tmp = n % 128;
                uint next = n >> 7;
                if (next != 0)
                {
                    tmp = tmp + 128;
                }
                stream.WriteByte(Convert.ToByte(tmp));
                n = next;
            } while (n != 0);
        }

        //-----------------------------------------------------------
    }

    //----------------------------------------------------------------------------------------------------

    /// <summary>
    /// Outgoing packet. Instances come from a static pool through the
    /// <c>Alloc</c> overloads and go back to it in <see cref="Dispose"/>.
    /// </summary>
    public class SendMessage : Message
    {
        private SendMessage() { }

        private static ObjectPool<SendMessage> s_SendPool = new ObjectPool<SendMessage>(
            () => { return new SendMessage(); });

        private static AtomicLong max_pkg_size = new AtomicLong(0);

        public static long PoolSize { get { return s_SendPool.countAll; } }
        public static long PoolActive { get { return s_SendPool.countActive; } }
        public static long PoolInactive { get { return s_SendPool.countInactive; } }

        /// <summary>Largest total packet length (header included) recorded by <c>EncodeHead</c>.</summary>
        public static long MaxPackageSize { get { return max_pkg_size.Value; } }

        /// <summary>
        /// Builds a data packet whose body is <paramref name="msg"/> written by
        /// <paramref name="serializer"/>. The instance is returned to the pool if
        /// encoding throws.
        /// </summary>
        public static SendMessage Alloc(string route, uint id, ISerializer serializer, object msg)
        {
            SendMessage ret = s_SendPool.Get();
            try
            {
                ret.BufferLength = FIXED_HEAD_SIZE;
                ret.BufferPosition = FIXED_HEAD_SIZE;
                ret.PkgType = PackageType.PKG_DATA;

                ret.EncodeRouter(route, id);
                serializer.Serialize(ret.buffer, msg);
                ret.EncodeHead();
                return ret;
            }
            catch
            {
                ret.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Builds a data packet whose body is produced by <paramref name="encoder"/>.
        /// The body is left empty when <paramref name="msg"/> or
        /// <paramref name="encoder"/> is null.
        /// </summary>
        public static SendMessage Alloc(string route, uint id, object msg, MessageEncoder encoder)
        {
            SendMessage ret = s_SendPool.Get();
            try
            {
                ret.BufferLength = FIXED_HEAD_SIZE;
                ret.BufferPosition = FIXED_HEAD_SIZE;
                ret.PkgType = PackageType.PKG_DATA;

                ret.EncodeRouter(route, id);
                if (msg != null && encoder != null)
                {
                    encoder.Invoke(route, msg, ret.buffer);
                }
                ret.EncodeHead();
                return ret;
            }
            catch
            {
                ret.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Builds a data packet whose body is the raw <paramref name="data"/>
        /// bytes (empty body when null).
        /// </summary>
        public static SendMessage Alloc(string route, uint id, byte[] data)
        {
            SendMessage ret = s_SendPool.Get();
            try
            {
                ret.BufferLength = FIXED_HEAD_SIZE;
                ret.BufferPosition = FIXED_HEAD_SIZE;
                ret.PkgType = PackageType.PKG_DATA;

                ret.EncodeRouter(route, id);
                if (data != null)
                {
                    ret.buffer.Write(data, 0, data.Length);
                }
                ret.EncodeHead();
                return ret;
            }
            catch
            {
                ret.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Builds a packet of the given <paramref name="type"/> with an optional
        /// raw <paramref name="body"/>; no route/flag section is written.
        /// </summary>
        public static SendMessage Alloc(PackageType type, byte[] body = null)
        {
            SendMessage ret = s_SendPool.Get();
            try
            {
                ret.BufferLength = FIXED_HEAD_SIZE;
                ret.BufferPosition = FIXED_HEAD_SIZE;
                ret.PkgType = type;

                if (body != null)
                {
                    ret.buffer.Write(body, 0, body.Length);
                }
                ret.EncodeHead();
                return ret;
            }
            catch
            {
                ret.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Builds a data packet whose body is two zero bytes followed by the first
        /// <c>stream.Length</c> bytes of <paramref name="stream"/>'s backing buffer
        /// (empty body when <paramref name="stream"/> is null).
        /// </summary>
        public static SendMessage Alloc(string route, uint id, MemoryStream stream)
        {
            var ret = s_SendPool.Get();
            try
            {
                ret.BufferLength = FIXED_HEAD_SIZE;
                ret.BufferPosition = FIXED_HEAD_SIZE;
                ret.PkgType = PackageType.PKG_DATA;

                ret.EncodeRouter(route, id);
                if (stream != null)
                {
                    ret.buffer.WriteByte(0);
                    ret.buffer.WriteByte(0);
                    ret.buffer.Write(stream.GetBuffer(), 0, (int)stream.Length);
                }
                ret.EncodeHead();
                return ret;
            }
            catch
            {
                ret.Dispose();
                throw;
            }
        }

        /// <summary>Rewinds the stream and returns this instance to the send pool.</summary>
        public override void Dispose()
        {
            buffer.Position = 0;
            s_SendPool.Release(this);
        }

        /// <summary>
        /// Writes the flag byte and routing section at the current position.
        /// The flag carries MSG_REQUEST when <paramref name="id"/> is non-zero and
        /// MSG_NOTIFY otherwise; a non-zero id is written as a varint after the flag.
        /// </summary>
        /// <exception cref="Exception">The UTF-8 encoded route exceeds <see cref="Message.MSG_Route_Limit"/> bytes.</exception>
        private void EncodeRouter(string route, uint id)
        {
            byte flag = 0;
            if (id > 0)
            {
                flag |= ((byte)MessageType.MSG_REQUEST) << 1;
            }
            else
            {
                flag |= ((byte)MessageType.MSG_NOTIFY) << 1;
            }

            // Hard-coded fast path: this route is sent as flag (route bit set) plus the
            // short value 1. Note that the id is not written on this path.
            if ("area.playerHandler.battleEventNotify" == route)
            {
                flag |= MSG_Route_Mask;
                buffer.WriteByte(flag);
                WriteShort(1, buffer);
                return;
            }

            var router_bytes = UTF8BytesCache.Instance.GetBytes(route);
            if (router_bytes.Length > MSG_Route_Limit)
            {
                throw new Exception("Route is too long!" + route);
            }

            buffer.WriteByte(flag);
            if (id > 0)
            {
                EncodeUInt32(id, buffer);
            }

#if CONVERT_MSG
            // Routes are sent as a 16-bit message id; nothing is written when the lookup yields 0.
            int msgid = CommonMsg.GetInstance.GetMsgID(route);
            if (msgid != 0)
            {
                WriteShort((ushort)msgid, buffer);
            }
#else
            // NOTE(review): the length-byte write below was reconstructed from a collapsed
            // source line and may originally have been commented out - confirm against
            // RecvMessage.DecodeBody, which reads a length byte in this configuration.
            //Write route length
            buffer.WriteByte((byte)router_bytes.Length);
            //Write route
            buffer.Write(router_bytes, 0, router_bytes.Length);
#endif
        }

        /// <summary>
        /// Fills in the 4-byte header (type byte + 3-byte big-endian body length)
        /// at offset 0, restores the stream position, and updates
        /// <see cref="MaxPackageSize"/>.
        /// </summary>
        private void EncodeHead()
        {
            var oldp = buffer.Position;
            try
            {
                buffer.Position = 0;
                PkgLength = (int)(this.BufferLength - FIXED_HEAD_SIZE);
                buffer.WriteByte(Convert.ToByte(PkgType));
                buffer.WriteByte(Convert.ToByte((PkgLength >> 16) & 0xFF));
                buffer.WriteByte(Convert.ToByte((PkgLength >> 8) & 0xFF));
                buffer.WriteByte(Convert.ToByte((PkgLength) & 0xFF));

                lock (max_pkg_size)
                {
                    if (max_pkg_size.Value < BufferLength)
                    {
                        max_pkg_size.Value = BufferLength;
                    }
                }
            }
            finally
            {
                buffer.Position = oldp;
            }
        }
    }

    //----------------------------------------------------------------------------------------------------

    /// <summary>
    /// Incoming packet. Instances come from a static pool through
    /// <see cref="Alloc"/> and go back to it in <see cref="Dispose"/>.
    /// </summary>
    public class RecvMessage : Message
    {
        private RecvMessage() { }

        private static ObjectPool<RecvMessage> s_RecvPool = new ObjectPool<RecvMessage>(
            () => { return new RecvMessage(); });

        private static AtomicLong max_pkg_size = new AtomicLong(0);

        // Scratch buffer for route bytes, shared by all instances and guarded by lock(router_bytes).
        private static byte[] router_bytes = new byte[MSG_Route_Limit];

        public static long PoolSize { get { return s_RecvPool.countAll; } }
        public static long PoolActive { get { return s_RecvPool.countActive; } }
        public static long PoolInactive { get { return s_RecvPool.countInactive; } }

        /// <summary>Largest total packet length (header included) recorded by <see cref="DecodeBody"/>.</summary>
        public static long MaxPackageSize { get { return max_pkg_size.Value; } }

        public System.Net.Sockets.TcpClient stateSocket;

        /// <summary>
        /// Takes an instance from the pool and resets it: header-sized length,
        /// position 0, no id, no route, type MSG_PUSH.
        /// </summary>
        public static RecvMessage Alloc()
        {
            RecvMessage ret = s_RecvPool.Get();
            ret.BufferLength = FIXED_HEAD_SIZE;
            ret.BufferPosition = 0;
            ret.MsgID = 0;
            ret.Route = null;
            ret.MsgType = MessageType.MSG_PUSH;
            return ret;
        }

        /// <summary>
        /// Drops the socket reference, shrinks the stream back to header size,
        /// rewinds it and returns this instance to the receive pool.
        /// </summary>
        public override void Dispose()
        {
            this.stateSocket = null;
            this.BufferLength = FIXED_HEAD_SIZE;
            this.BufferPosition = 0;
            s_RecvPool.Release(this);
        }

        public MessageType MsgType { get; private set; }

        public uint MsgID { get; private set; }

        public string Route { get; internal set; }

        CommonLang.Log.Logger log = CommonLang.Log.LoggerFactory.GetLogger("RecvMessage");

        /// <summary>
        /// Parses the 4-byte header already present at the start of the buffer:
        /// sets <see cref="Message.PkgType"/> and <see cref="Message.PkgLength"/>
        /// and grows the stream so the body fits.
        /// </summary>
        public void DecodeHead()
        {
            var headBuffer = buffer.GetBuffer();
            this.PkgType = (PackageType)headBuffer[0];
            // 3-byte big-endian body length.
            this.PkgLength = (headBuffer[1] << 16) + (headBuffer[2] << 8) + headBuffer[3];
            this.BufferLength = PkgLength + FIXED_HEAD_SIZE;
        }

        /// <summary>
        /// Parses the flag byte and routing section that follow the header.
        /// For MSG_RESPONSE the varint message id is read; for MSG_PUSH the route is
        /// resolved through <paramref name="adapter"/> or, failing that, read from
        /// the stream. The stream is left positioned after the parsed section.
        /// </summary>
        public void DecodeBody(PomeloClientAdapter adapter)
        {
            buffer.Position = FIXED_HEAD_SIZE;

            //Get flag
            byte flag = (byte)buffer.ReadByte();
            //Get type from flag
            this.MsgType = (MessageType)((flag >> 1) & MSG_Type_Mask);

            if (MsgType == MessageType.MSG_RESPONSE)
            {
                this.MsgID = DecodeUInt32(buffer);
            }
            else if (MsgType == MessageType.MSG_PUSH)
            {
                //Get route
                string r;
                if (adapter.resolve_route(this, flag, out r))
                {
                    this.Route = r;
                }
                else
                {
#if CONVERT_MSG
                    lock (router_bytes)
                    {
                        // Route arrives as a big-endian 16-bit message id.
                        byte h = (byte)buffer.ReadByte();
                        byte l = (byte)buffer.ReadByte();
                        int msgid = ((h & 0xff) << 8) | (l & 0xff);
                        this.Route = CommonMsg.GetInstance.GetMsgStr(msgid);
                    }
#else
                    byte length = (byte)buffer.ReadByte();
                    lock (router_bytes)
                    {
                        buffer.Read(router_bytes, 0, length);
                        this.Route = UTF8.GetString(router_bytes, 0, length);
                    }
#endif
                }
            }

            lock (max_pkg_size)
            {
                if (max_pkg_size.Value < BufferLength)
                {
                    max_pkg_size.Value = BufferLength;
                }
            }
        }

        /// <summary>
        /// Runs <paramref name="decoder"/> over the stream from the current
        /// position and restores the position afterwards, even if it throws.
        /// </summary>
        public object ReadBody(MessageDecoder decoder)
        {
            int cur_pos = BufferPosition;
            try
            {
                var decode = decoder(Route, Stream);
                return decode;
            }
            finally
            {
                this.BufferPosition = cur_pos;
            }
        }

        /// <summary>
        /// Reads <c>BufferLength - BufferPosition</c> bytes from the current
        /// position via <c>IOUtil.ReadExpect</c> and restores the position afterwards.
        /// </summary>
        public byte[] ReadBody()
        {
            int cur_pos = BufferPosition;
            try
            {
                return IOUtil.ReadExpect(buffer, BufferLength - BufferPosition);
            }
            finally
            {
                this.BufferPosition = cur_pos;
            }
        }

        /// <summary>
        /// Deserializes an object of <paramref name="type"/> from the current
        /// position using <paramref name="serializer"/> and restores the position afterwards.
        /// </summary>
        public object ReadBody(ISerializer serializer, Type type)
        {
            int cur_pos = BufferPosition;
            try
            {
                return serializer.Deserialize(Stream, null, type);
            }
            finally
            {
                this.BufferPosition = cur_pos;
            }
        }
    }

    //----------------------------------------------------------------------------------------------------

    /// <summary>
    /// Non-pooled message that only carries a <c>NetWorkState</c> value;
    /// <see cref="Dispose"/> does nothing.
    /// </summary>
    public class LocalNetStatus : Message
    {
        private NetWorkState mStatus;

        private LocalNetStatus(NetWorkState st)
        {
            mStatus = st;
        }

        /// <summary>Creates a new instance wrapping <paramref name="st"/>.</summary>
        public static LocalNetStatus Alloc(NetWorkState st)
        {
            return new LocalNetStatus(st);
        }

        public override void Dispose() { }

        /// <summary>The wrapped network state.</summary>
        public NetWorkState St
        {
            get { return mStatus; }
        }
    }
}