Message.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. #define CONVERT_MSG
  2. using CommonLang;
  3. using CommonLang.Concurrent;
  4. using CommonLang.IO;
  5. using SimpleJson;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.IO;
  9. using System.Text;
  10. namespace Pomelo.DotNetClient
  11. {
  /// <summary>
  /// Package type stored in the first byte of the fixed package head
  /// (written by SendMessage.EncodeHead, read back by RecvMessage.DecodeHead).
  /// </summary>
  public enum PackageType
  {
      /// <summary>Handshake package (numeric value 1).</summary>
      PKG_HANDSHAKE = 0x01,

      /// <summary>Handshake acknowledgement package (numeric value 2).</summary>
      PKG_HANDSHAKE_ACK = 0x02,

      /// <summary>Heartbeat package (numeric value 3).</summary>
      PKG_HEARTBEAT = 0x03,

      /// <summary>Data package; used by every route-based SendMessage.Alloc overload (numeric value 4).</summary>
      PKG_DATA = 0x04,

      /// <summary>Kick package (numeric value 5).</summary>
      PKG_KICK = 0x05,
  }
  /// <summary>
  /// Message type carried in the flag byte of a data package. The sender stores it
  /// shifted left by one bit; the receiver recovers it with a right shift by one
  /// followed by Message.MSG_Type_Mask.
  /// </summary>
  public enum MessageType
  {
      /// <summary>Request; chosen by SendMessage when the message id is greater than zero.</summary>
      MSG_REQUEST = 0x00,

      /// <summary>Notify; chosen by SendMessage when the message id is zero.</summary>
      MSG_NOTIFY = 0x01,

      /// <summary>Response; RecvMessage decodes a varint message id after the flag byte.</summary>
      MSG_RESPONSE = 0x02,

      /// <summary>Push; RecvMessage resolves a route after the flag byte.</summary>
      MSG_PUSH = 0x03,
  }
  27. //----------------------------------------------------------------------------------------------------
  /// <summary>
  /// Base class for messages whose bytes live in an in-memory buffer.
  /// The first <see cref="FIXED_HEAD_SIZE"/> bytes are reserved for the package head, so
  /// <see cref="BufferLength"/> can never be set below that size. The class also hosts the
  /// static big-endian 16-bit and base-128 varint helpers shared by the subclasses.
  /// </summary>
  public abstract class Message : IDisposable
  {
      /// <summary>Encoding used for route strings.</summary>
      public static Encoding UTF8 = Encoding.UTF8;

      /// <summary>Size in bytes of the fixed package head (1 type byte + 3 length bytes).</summary>
      public const int FIXED_HEAD_SIZE = 4;

      /// <summary>Maximum number of UTF-8 bytes allowed for a route.</summary>
      public const int MSG_Route_Limit = 255;

      /// <summary>Bit of the message flag byte that SendMessage sets for its hard-coded 16-bit route.</summary>
      public const int MSG_Route_Mask = 0x01;

      /// <summary>Mask applied to the flag byte (after shifting right by one) to extract the message type.</summary>
      public const int MSG_Type_Mask = 0x07;

      /// <summary>Backing storage for head and body; starts with a capacity of 1024 bytes.</summary>
      protected readonly MemoryStream buffer = new MemoryStream(1024);

      /// <summary>The backing stream itself (not a copy).</summary>
      public MemoryStream Stream
      {
          get { return buffer; }
      }

      /// <summary>
      /// The raw array behind the backing stream. It may be longer than
      /// <see cref="BufferLength"/>; only the first <see cref="BufferLength"/> bytes are message data.
      /// </summary>
      public byte[] Buffer
      {
          get { return buffer.GetBuffer(); }
      }

      /// <summary>Current read/write position inside the backing stream.</summary>
      public int BufferPosition
      {
          get { return (int)buffer.Position; }
          set { buffer.Position = value; }
      }

      /// <summary>Number of valid bytes in the backing stream (head included).</summary>
      /// <exception cref="Exception">The value being set is smaller than <see cref="FIXED_HEAD_SIZE"/>.</exception>
      public int BufferLength
      {
          get { return (int)buffer.Length; }
          protected set
          {
              if (value < FIXED_HEAD_SIZE)
              {
                  throw new Exception("缓冲区不能小于固定长度 " + FIXED_HEAD_SIZE);
              }
              buffer.SetLength(value);
          }
      }

      /// <summary>Package type stored in the first head byte.</summary>
      public PackageType PkgType { get; protected set; }

      /// <summary>Body length in bytes, i.e. total length minus <see cref="FIXED_HEAD_SIZE"/>.</summary>
      public int PkgLength { get; protected set; }

      //-----------------------------------------------------------

      protected Message() { }

      /// <summary>Releases the message; the meaning is defined by each subclass.</summary>
      public abstract void Dispose();

      /// <summary>Returns the concrete type name together with the buffer capacity.</summary>
      public override string ToString()
      {
          return string.Format("{0}:capacity={1}", GetType().Name, buffer.Capacity);
      }

      //-----------------------------------------------------------

      /// <summary>Writes <paramref name="value"/> to <paramref name="bytes"/> as two bytes, high byte first.</summary>
      /// <param name="value">Value to write.</param>
      /// <param name="bytes">Destination stream.</param>
      public static void WriteShort(ushort value, Stream bytes)
      {
          bytes.WriteByte((byte)((value >> 8) & 0xff));
          bytes.WriteByte((byte)(value & 0xff));
      }

      /// <summary>Reads two bytes (high byte first) from <paramref name="bytes"/> as an unsigned 16-bit value.</summary>
      /// <param name="bytes">Source stream.</param>
      /// <returns>The decoded value.</returns>
      /// <exception cref="EndOfStreamException">Fewer than two bytes were available.</exception>
      public static ushort ReadShort(Stream bytes)
      {
          int high = bytes.ReadByte();
          int low = bytes.ReadByte();
          // Stream.ReadByte signals end-of-stream with -1; folding that sentinel into the
          // result would silently produce a garbage value.
          if (high < 0 || low < 0)
          {
              throw new EndOfStreamException("Unexpected end of stream while reading a 16-bit value.");
          }
          return (ushort)((high << 8) | low);
      }

      /// <summary>
      /// Decodes a base-128 varint (7 payload bits per byte, least-significant group first,
      /// high bit set on every byte except the last) into a 32-bit unsigned value.
      /// </summary>
      /// <remarks>
      /// At most 11 bytes are consumed. Payload bits that do not fit into 32 bits are discarded.
      /// </remarks>
      /// <param name="bytes">Source stream.</param>
      /// <returns>The decoded value.</returns>
      /// <exception cref="EndOfStreamException">The stream ended before the terminating byte.</exception>
      public static uint DecodeUInt32(Stream bytes)
      {
          uint value = 0;
          for (int shift = 0; shift <= 70; shift += 7)
          {
              int b = bytes.ReadByte();
              if (b < 0)
              {
                  throw new EndOfStreamException("Unexpected end of stream while reading a varint.");
              }
              // C# masks shift counts, so shifting by 64 or more would wrap around and
              // corrupt the low bits; groups beyond bit 31 are simply ignored instead.
              if (shift < 32)
              {
                  value |= (uint)(b & 0x7F) << shift;
              }
              if ((b & 0x80) == 0)
              {
                  break;
              }
          }
          return value;
      }

      /// <summary>
      /// Encodes <paramref name="n"/> as a base-128 varint (the format read by
      /// <see cref="DecodeUInt32"/>) and writes it to <paramref name="stream"/>.
      /// </summary>
      /// <param name="n">Value to encode; zero is written as a single 0x00 byte.</param>
      /// <param name="stream">Destination stream.</param>
      public static void EncodeUInt32(uint n, Stream stream)
      {
          do
          {
              uint group = n & 0x7F;
              n >>= 7;
              if (n != 0)
              {
                  // More groups follow: set the continuation bit.
                  group |= 0x80;
              }
              stream.WriteByte((byte)group);
          } while (n != 0);
      }

      //-----------------------------------------------------------
  }
  112. //----------------------------------------------------------------------------------------------------
  /// <summary>
  /// Outgoing package. Instances come from a static object pool: obtain one through an
  /// <c>Alloc</c> overload and hand it back with <see cref="Dispose"/>.
  /// Every <c>Alloc</c> overload leaves the buffer holding the complete package
  /// (4-byte head followed by the body) with the position at the end of the data.
  /// </summary>
  public class SendMessage : Message
  {
      // Largest body length that fits into the 3-byte length field of the package head.
      private const int MAX_PKG_LENGTH = 0xFFFFFF;

      private SendMessage() { }

      private static ObjectPool<SendMessage> s_SendPool = new ObjectPool<SendMessage>(
          () =>
          {
              return new SendMessage();
          });

      // Largest total buffer length (head + body) encoded so far.
      private static AtomicLong max_pkg_size = new AtomicLong(0);

      /// <summary>Value of the pool's <c>countAll</c> counter.</summary>
      public static long PoolSize { get { return s_SendPool.countAll; } }

      /// <summary>Value of the pool's <c>countActive</c> counter.</summary>
      public static long PoolActive { get { return s_SendPool.countActive; } }

      /// <summary>Value of the pool's <c>countInactive</c> counter.</summary>
      public static long PoolInactive { get { return s_SendPool.countInactive; } }

      /// <summary>Largest total package size (head included) encoded so far.</summary>
      public static long MaxPackageSize { get { return max_pkg_size.Value; } }

      /// <summary>Builds a data package whose body is produced by <paramref name="serializer"/>.</summary>
      /// <param name="route">Route of the message.</param>
      /// <param name="id">Message id; a value greater than zero marks the message as a request, zero as a notify.</param>
      /// <param name="serializer">Serializer that writes <paramref name="msg"/> into the package buffer.</param>
      /// <param name="msg">Payload object.</param>
      /// <returns>A pooled message; the caller must dispose it.</returns>
      public static SendMessage Alloc(string route, uint id, ISerializer serializer, object msg)
      {
          SendMessage ret = s_SendPool.Get();
          try
          {
              ret.Reset(PackageType.PKG_DATA);
              ret.EncodeRouter(route, id);
              serializer.Serialize(ret.buffer, msg);
              ret.EncodeHead();
              return ret;
          }
          catch
          {
              // Return the instance to the pool before propagating the failure.
              ret.Dispose();
              throw;
          }
      }

      /// <summary>Builds a data package whose body is produced by <paramref name="encoder"/>.</summary>
      /// <param name="route">Route of the message.</param>
      /// <param name="id">Message id; a value greater than zero marks the message as a request, zero as a notify.</param>
      /// <param name="msg">Payload object; when null no body is written.</param>
      /// <param name="encoder">Encoder callback; when null no body is written.</param>
      /// <returns>A pooled message; the caller must dispose it.</returns>
      public static SendMessage Alloc(string route, uint id, object msg, MessageEncoder encoder)
      {
          SendMessage ret = s_SendPool.Get();
          try
          {
              ret.Reset(PackageType.PKG_DATA);
              ret.EncodeRouter(route, id);
              if (msg != null && encoder != null)
              {
                  encoder.Invoke(route, msg, ret.buffer);
              }
              ret.EncodeHead();
              return ret;
          }
          catch
          {
              ret.Dispose();
              throw;
          }
      }

      /// <summary>Builds a data package whose body is the given raw bytes.</summary>
      /// <param name="route">Route of the message.</param>
      /// <param name="id">Message id; a value greater than zero marks the message as a request, zero as a notify.</param>
      /// <param name="data">Body bytes; when null no body is written.</param>
      /// <returns>A pooled message; the caller must dispose it.</returns>
      public static SendMessage Alloc(string route, uint id, byte[] data)
      {
          SendMessage ret = s_SendPool.Get();
          try
          {
              ret.Reset(PackageType.PKG_DATA);
              ret.EncodeRouter(route, id);
              if (data != null)
              {
                  ret.buffer.Write(data, 0, data.Length);
              }
              ret.EncodeHead();
              return ret;
          }
          catch
          {
              ret.Dispose();
              throw;
          }
      }

      /// <summary>Builds a package of an arbitrary type with no route section.</summary>
      /// <param name="type">Package type written into the head.</param>
      /// <param name="body">Optional body bytes; when null the package consists of the head only.</param>
      /// <returns>A pooled message; the caller must dispose it.</returns>
      public static SendMessage Alloc(PackageType type, byte[] body = null)
      {
          SendMessage ret = s_SendPool.Get();
          try
          {
              ret.Reset(type);
              if (body != null)
              {
                  ret.buffer.Write(body, 0, body.Length);
              }
              ret.EncodeHead();
              return ret;
          }
          catch
          {
              ret.Dispose();
              throw;
          }
      }

      /// <summary>
      /// Builds a data package whose body is two zero bytes followed by the full contents
      /// of <paramref name="stream"/>.
      /// </summary>
      /// <param name="route">Route of the message.</param>
      /// <param name="id">Message id; a value greater than zero marks the message as a request, zero as a notify.</param>
      /// <param name="stream">Source of the body; when null no body (and no zero prefix) is written.</param>
      /// <returns>A pooled message; the caller must dispose it.</returns>
      public static SendMessage Alloc(string route, uint id, MemoryStream stream)
      {
          var ret = s_SendPool.Get();
          try
          {
              ret.Reset(PackageType.PKG_DATA);
              ret.EncodeRouter(route, id);
              if (stream != null)
              {
                  ret.buffer.WriteByte(0);
                  ret.buffer.WriteByte(0);
                  // WriteTo copies the whole logical content regardless of the source position.
                  // Unlike GetBuffer() it also works for streams whose buffer is not publicly
                  // visible and for streams created over an array segment with a non-zero offset.
                  stream.WriteTo(ret.buffer);
              }
              ret.EncodeHead();
              return ret;
          }
          catch
          {
              ret.Dispose();
              throw;
          }
      }

      /// <summary>Rewinds the buffer and returns this instance to the pool.</summary>
      public override void Dispose()
      {
          buffer.Position = 0;
          s_SendPool.Release(this);
      }

      // Truncates the buffer to an empty head, positions the stream right after it and
      // records the package type. Shared prologue of every Alloc overload.
      private void Reset(PackageType type)
      {
          BufferLength = FIXED_HEAD_SIZE;
          BufferPosition = FIXED_HEAD_SIZE;
          PkgType = type;
      }

      // Writes the message flag byte, the optional message id and the route information
      // at the current buffer position.
      private void EncodeRouter(string route, uint id)
      {
          byte flag = 0;
          if (id > 0)
          {
              flag |= ((byte)MessageType.MSG_REQUEST) << 1;
          }
          else
          {
              flag |= ((byte)MessageType.MSG_NOTIFY) << 1;
          }

          // This one route is always written as the route-mask flag followed by the 16-bit
          // value 1, and no message id is written for it even when id > 0.
          // NOTE(review): presumably a fixed compressed route code agreed with the server - confirm.
          if ("area.playerHandler.battleEventNotify" == route)
          {
              flag |= MSG_Route_Mask;
              buffer.WriteByte(flag);
              WriteShort(1, buffer);
              return;
          }

          // The length check runs before anything is written so a rejected route leaves
          // the buffer untouched.
          var router_bytes = UTF8BytesCache.Instance.GetBytes(route);
          if (router_bytes.Length > MSG_Route_Limit)
          {
              throw new Exception("Route is too long!" + route);
          }

          buffer.WriteByte(flag);
          if (id > 0)
          {
              EncodeUInt32(id, buffer);
          }
#if CONVERT_MSG
          // Routes are sent as a 16-bit message id looked up in CommonMsg.
          // NOTE(review): when the lookup returns 0 no route information is written at all - confirm this is intended.
          int msgid = CommonMsg.GetInstance.GetMsgID(route);
          if (msgid != 0)
          {
              WriteShort((ushort)msgid, buffer);
          }
#else
          // Write route length
          buffer.WriteByte((byte)router_bytes.Length);
          // Write route
          buffer.Write(router_bytes, 0, router_bytes.Length);
#endif
      }

      // Fills in the 4-byte head (type byte + 24-bit big-endian body length) at the start
      // of the buffer, updates the max-size statistic and restores the stream position.
      private void EncodeHead()
      {
          var oldp = buffer.Position;
          try
          {
              int bodyLength = this.BufferLength - FIXED_HEAD_SIZE;
              if (bodyLength > MAX_PKG_LENGTH)
              {
                  // Only three bytes are available for the length; writing a truncated value
                  // would produce a package the receiver cannot frame correctly.
                  throw new InvalidOperationException("Package body is too large: " + bodyLength);
              }
              PkgLength = bodyLength;

              buffer.Position = 0;
              buffer.WriteByte(Convert.ToByte(PkgType));
              buffer.WriteByte((byte)((PkgLength >> 16) & 0xFF));
              buffer.WriteByte((byte)((PkgLength >> 8) & 0xFF));
              buffer.WriteByte((byte)(PkgLength & 0xFF));

              lock (max_pkg_size)
              {
                  if (max_pkg_size.Value < BufferLength)
                  {
                      max_pkg_size.Value = BufferLength;
                  }
              }
          }
          finally
          {
              buffer.Position = oldp;
          }
      }
  }
  314. //----------------------------------------------------------------------------------------------------
  /// <summary>
  /// Incoming package. Instances come from a static object pool: obtain one with
  /// <see cref="Alloc"/>, fill the buffer, call <see cref="DecodeHead"/> and
  /// <see cref="DecodeBody"/>, read the payload with a <c>ReadBody</c> overload and
  /// finally hand the instance back with <see cref="Dispose"/>.
  /// </summary>
  public class RecvMessage : Message
  {
      private RecvMessage() { }

      private static ObjectPool<RecvMessage> s_RecvPool = new ObjectPool<RecvMessage>(() => new RecvMessage());

      // Largest total buffer length (head + body) decoded so far.
      private static AtomicLong max_pkg_size = new AtomicLong(0);

      // Shared scratch array for route bytes; it also serves as the lock object in route decoding.
      private static byte[] router_bytes = new byte[MSG_Route_Limit];

      /// <summary>Value of the pool's <c>countAll</c> counter.</summary>
      public static long PoolSize { get { return s_RecvPool.countAll; } }

      /// <summary>Value of the pool's <c>countActive</c> counter.</summary>
      public static long PoolActive { get { return s_RecvPool.countActive; } }

      /// <summary>Value of the pool's <c>countInactive</c> counter.</summary>
      public static long PoolInactive { get { return s_RecvPool.countInactive; } }

      /// <summary>Largest total package size (head included) decoded so far.</summary>
      public static long MaxPackageSize { get { return max_pkg_size.Value; } }

      /// <summary>Socket reference attached by the caller; cleared on <see cref="Dispose"/>.</summary>
      public System.Net.Sockets.TcpClient stateSocket;

      /// <summary>
      /// Takes an instance from the pool with the buffer sized to the fixed head, the
      /// position at zero and the message fields reset.
      /// </summary>
      /// <returns>A pooled message; the caller must dispose it.</returns>
      public static RecvMessage Alloc()
      {
          RecvMessage message = s_RecvPool.Get();
          message.BufferLength = FIXED_HEAD_SIZE;
          message.BufferPosition = 0;
          message.MsgType = MessageType.MSG_PUSH;
          message.MsgID = 0;
          message.Route = null;
          return message;
      }

      /// <summary>Drops the socket reference, shrinks the buffer back to the head and returns this instance to the pool.</summary>
      public override void Dispose()
      {
          stateSocket = null;
          BufferLength = FIXED_HEAD_SIZE;
          BufferPosition = 0;
          s_RecvPool.Release(this);
      }

      /// <summary>Message type extracted from the flag byte by <see cref="DecodeBody"/>.</summary>
      public MessageType MsgType { get; private set; }

      /// <summary>Message id; decoded only for response messages.</summary>
      public uint MsgID { get; private set; }

      /// <summary>Route of the message; resolved only for push messages.</summary>
      public string Route { get; internal set; }

      CommonLang.Log.Logger log = CommonLang.Log.LoggerFactory.GetLogger("RecvMessage");

      /// <summary>
      /// Interprets the first four buffer bytes as the package head (type byte followed by a
      /// 24-bit big-endian body length) and resizes the buffer to hold head plus body.
      /// </summary>
      public void DecodeHead()
      {
          byte[] head = buffer.GetBuffer();
          PkgType = (PackageType)head[0];
          PkgLength = (head[1] << 16) | (head[2] << 8) | head[3];
          BufferLength = FIXED_HEAD_SIZE + PkgLength;
      }

      /// <summary>
      /// Decodes the message header that follows the package head: the flag byte, then the
      /// message id (response) or the route (push). The stream is left positioned at the payload.
      /// </summary>
      /// <param name="adapter">Adapter that gets the first chance to resolve the route of a push message.</param>
      public void DecodeBody(PomeloClientAdapter adapter)
      {
          buffer.Position = FIXED_HEAD_SIZE;

          // The flag byte carries the message type above its lowest bit.
          byte flag = (byte)buffer.ReadByte();
          MsgType = (MessageType)((flag >> 1) & MSG_Type_Mask);

          switch (MsgType)
          {
              case MessageType.MSG_RESPONSE:
                  MsgID = DecodeUInt32(buffer);
                  break;

              case MessageType.MSG_PUSH:
                  DecodePushRoute(adapter, flag);
                  break;
          }

          // Track the largest package seen so far.
          lock (max_pkg_size)
          {
              int totalSize = BufferLength;
              if (totalSize > max_pkg_size.Value)
              {
                  max_pkg_size.Value = totalSize;
              }
          }
      }

      // Resolves the route of a push message: the adapter is asked first; if it declines,
      // the route is read from the buffer.
      private void DecodePushRoute(PomeloClientAdapter adapter, byte flag)
      {
          string resolved;
          if (adapter.resolve_route(this, flag, out resolved))
          {
              Route = resolved;
              return;
          }
#if CONVERT_MSG
          // The route is transmitted as a 16-bit big-endian message id and mapped back to its string form.
          lock (router_bytes)
          {
              int high = (byte)buffer.ReadByte();
              int low = (byte)buffer.ReadByte();
              int code = (high << 8) | low;
              Route = CommonMsg.GetInstance.GetMsgStr(code);
          }
#else
          // The route is transmitted as a length byte followed by that many UTF-8 bytes.
          int routeLength = (byte)buffer.ReadByte();
          lock (router_bytes)
          {
              buffer.Read(router_bytes, 0, routeLength);
              Route = UTF8.GetString(router_bytes, 0, routeLength);
          }
#endif
      }

      /// <summary>Decodes the payload with <paramref name="decoder"/>; the stream position is restored afterwards.</summary>
      /// <param name="decoder">Callback invoked with <see cref="Route"/> and the backing stream.</param>
      /// <returns>Whatever the decoder returns.</returns>
      public object ReadBody(MessageDecoder decoder)
      {
          int savedPosition = BufferPosition;
          try
          {
              return decoder(Route, Stream);
          }
          finally
          {
              BufferPosition = savedPosition;
          }
      }

      /// <summary>Reads the bytes from the current position to the end of the buffer; the stream position is restored afterwards.</summary>
      /// <returns>The result of <c>IOUtil.ReadExpect</c> for the remaining byte count.</returns>
      public byte[] ReadBody()
      {
          int savedPosition = BufferPosition;
          try
          {
              int remaining = BufferLength - BufferPosition;
              return IOUtil.ReadExpect(buffer, remaining);
          }
          finally
          {
              BufferPosition = savedPosition;
          }
      }

      /// <summary>Deserializes the payload as <paramref name="type"/>; the stream position is restored afterwards.</summary>
      /// <param name="serializer">Serializer that reads from the backing stream.</param>
      /// <param name="type">Target type handed to the serializer.</param>
      /// <returns>Whatever the serializer returns.</returns>
      public object ReadBody(ISerializer serializer, Type type)
      {
          int savedPosition = BufferPosition;
          try
          {
              return serializer.Deserialize(Stream, null, type);
          }
          finally
          {
              BufferPosition = savedPosition;
          }
      }
  }
  446. //----------------------------------------------------------------------------------------------------
  /// <summary>
  /// Message that carries nothing but a <c>NetWorkState</c> value. It is not pooled:
  /// <see cref="Alloc"/> creates a fresh instance each time and <see cref="Dispose"/> does nothing.
  /// </summary>
  public class LocalNetStatus : Message
  {
      // State captured at construction time.
      private NetWorkState state;

      private LocalNetStatus(NetWorkState st)
      {
          this.state = st;
      }

      /// <summary>Creates a new message holding <paramref name="st"/>.</summary>
      /// <param name="st">State to carry.</param>
      /// <returns>A newly constructed instance.</returns>
      public static LocalNetStatus Alloc(NetWorkState st)
      {
          LocalNetStatus created = new LocalNetStatus(st);
          return created;
      }

      /// <summary>No-op; there is nothing to release.</summary>
      public override void Dispose()
      {
      }

      /// <summary>The state passed to <see cref="Alloc"/>.</summary>
      public NetWorkState St
      {
          get { return this.state; }
      }
  }
  460. }