FastStream_old.cs 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867
  1. using CommonLang.Log;
  2. using System;
  3. using System.Collections.Concurrent;
  4. using System.Collections.Generic;
  5. using System.IO;
  6. using System.Net;
  7. using System.Net.Sockets;
  8. using System.Text;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. namespace Pomelo
  12. {
  13. /// <summary>
  14. /// morefunFastStream管理器
  15. /// </summary>
  16. public class FastStream_old : FastStream
  17. {
  18. /*-------------------------------------Composer--------------------------------------------------------*/
  19. /// <summary>
  20. /// 数据合成bytes
  21. /// </summary>
  22. class Composer
  23. {
  24. private const int LEFT_SHIFT_BITS = 1 << 7;
  25. private MemoryStream stream;
  26. public Composer(string uid,string instanceId, ArraySegment<byte> data)
  27. {
  28. byte[] bytesUid = System.Text.UTF8Encoding.UTF8.GetBytes(uid);
  29. byte[] bytesInstanceId = System.Text.UTF8Encoding.UTF8.GetBytes(instanceId);
  30. int contentSize = 8 + bytesUid.Length + bytesInstanceId.Length + data.Count;
  31. int lengthSize = calLengthSize(contentSize);
  32. this.stream = new MemoryStream(lengthSize + contentSize);
  33. //composer head
  34. writeLength(contentSize, lengthSize);
  35. //协议 head
  36. writeU16((ushort)bytesUid.Length);
  37. writeU16((ushort)bytesInstanceId.Length);
  38. writeU32((uint)data.Count);
  39. //uid
  40. writeBytes(bytesUid, 0, bytesUid.Length);
  41. //instanceId
  42. writeBytes(bytesInstanceId, 0, bytesInstanceId.Length);
  43. //data
  44. writeBytes(data.Array, data.Offset, data.Count);
  45. }
  46. public byte[] getBytes()
  47. {
  48. return stream.GetBuffer();
  49. }
  50. public static int calLengthSize(int length)
  51. {
  52. int res = 0;
  53. while (length > 0)
  54. {
  55. length = length >> 7;
  56. res++;
  57. }
  58. return res;
  59. }
  60. public void writeLength(int data, int size)
  61. {
  62. int offset = size - 1, b;
  63. byte[] bytes = new byte[size];
  64. for (; offset >= 0; offset--)
  65. {
  66. b = data % LEFT_SHIFT_BITS;
  67. if (offset < size - 1)
  68. {
  69. b |= 0x80;
  70. }
  71. bytes[offset] = (byte)b;
  72. data = data >> 7;
  73. }
  74. stream.Write(bytes, 0, bytes.Length);
  75. }
  76. public void writeU16(UInt16 value)
  77. {
  78. stream.WriteByte((byte)(value));
  79. stream.WriteByte((byte)(value >> 8));
  80. }
  81. public void writeU32(UInt32 value)
  82. {
  83. stream.WriteByte((byte)(value));
  84. stream.WriteByte((byte)(value >> 8));
  85. stream.WriteByte((byte)(value >> 16));
  86. stream.WriteByte((byte)(value >> 24));
  87. }
  88. public void writeBytes(byte[] value, int offset, int length)
  89. {
  90. stream.Write(value, offset, length);
  91. }
  92. }
  93. /*-------------------------------------Transporter--------------------------------------------------------*/
  94. /// <summary>
  95. /// 数据传输协议
  96. /// </summary>
  97. class Transporter
  98. {
  99. /// <summary>
  100. /// 传输状态
  101. /// </summary>
  102. enum TransportState
  103. {
  104. readHead = 1, // on read head
  105. readBody = 2, // on read body
  106. closed = 3 // connection closed, will ignore all the message and wait for clean up
  107. }
  108. /*-------------------------------------TransportState--------------------------------------------------------*/
  109. //读取buffer
  110. class StateObject
  111. {
  112. public const int BufferSize = 1024;
  113. internal byte[] buffer = new byte[BufferSize];
  114. }
  115. public const int HeadLength = 6;
  116. private TcpClient socket;
  117. private Action<FastStreamRequest> messageProcesser;
  118. //Used for get message
  119. private StateObject stateObject = new StateObject();
  120. private TransportState transportState;
  121. private IAsyncResult asyncReceive;
  122. private IAsyncResult asyncSend;
  123. // private bool onSending = false;
  124. // private bool onReceiving = false;
  125. private byte[] headBuffer = new byte[HeadLength];
  126. private byte[] buffer;
  127. private int bufferOffset = 0;
  128. private int pkgLength = 0;
  129. internal Action onDisconnect = null;
  130. /// <summary>
  131. /// 日志
  132. /// </summary>
  133. private Logger log = LoggerFactory.GetLogger("Transporter");
  134. public bool Connected
  135. {
  136. get { return this.socket.Connected; }
  137. }
  138. public Transporter(TcpClient socket, Action<FastStreamRequest> processer)
  139. {
  140. this.socket = socket;
  141. this.socket.NoDelay = true;
  142. this.messageProcesser = processer;
  143. transportState = TransportState.readHead;
  144. }
  145. public void start()
  146. {
  147. this.receive();
  148. }
  149. public void send(byte[] buffer)
  150. {
  151. if (this.transportState != TransportState.closed)
  152. {
  153. this.asyncSend = socket.GetStream().BeginWrite(buffer, 0, buffer.Length, new AsyncCallback(sendCallback), socket);
  154. // this.onSending = true;
  155. }
  156. }
  157. private void sendCallback(IAsyncResult asyncSend)
  158. {
  159. if (this.transportState == TransportState.closed)
  160. return;
  161. socket.GetStream().EndWrite(asyncSend);
  162. // Console.WriteLine("socket send end:"+ CUtils.CurrentTimeMS);
  163. // this.onSending = false;
  164. }
  165. public void receive()
  166. {
  167. this.asyncReceive = socket.GetStream().BeginRead(stateObject.buffer, 0, stateObject.buffer.Length, new AsyncCallback(endReceive), stateObject);
  168. // this.onReceiving = true;
  169. }
  170. internal void close()
  171. {
  172. this.transportState = TransportState.closed;
  173. /*try{
  174. if(this.onReceiving) socket.EndReceive (this.asyncReceive);
  175. if(this.onSending) socket.EndSend(this.asyncSend);
  176. }catch (Exception e){
  177. Console.WriteLine(e.Message);
  178. }*/
  179. }
  180. private void endReceive(IAsyncResult asyncReceive)
  181. {
  182. if (this.transportState == TransportState.closed)
  183. return;
  184. StateObject state = (StateObject)asyncReceive.AsyncState;
  185. TcpClient socket = this.socket;
  186. try
  187. {
  188. int length = socket.GetStream().EndRead(asyncReceive);
  189. // this.onReceiving = false;
  190. if (length > 0)
  191. {
  192. processBytes(state.buffer, 0, length);
  193. //Receive next message
  194. if (this.transportState != TransportState.closed)
  195. receive();
  196. }
  197. else
  198. {
  199. if (this.onDisconnect != null)
  200. this.onDisconnect();
  201. }
  202. }
  203. catch (Exception e)
  204. {
  205. log.Error(e.Message, e);
  206. if (this.onDisconnect != null)
  207. this.onDisconnect();
  208. }
  209. }
  210. /// <summary>
  211. /// 处理数据
  212. /// </summary>
  213. /// <param name="bytes"></param>
  214. /// <param name="offset"></param>
  215. /// <param name="limit"></param>
  216. internal void processBytes(byte[] bytes, int offset, int limit)
  217. {
  218. if (this.transportState == TransportState.readHead)
  219. {
  220. readHead(bytes, offset, limit);
  221. }
  222. else if (this.transportState == TransportState.readBody)
  223. {
  224. readBody(bytes, offset, limit);
  225. }
  226. }
  227. /// <summary>
  228. /// 从包头获取包长度
  229. /// </summary>
  230. /// <param name="header"></param>
  231. /// <returns></returns>
  232. private int getBodyLengthFromHeader(byte[] header)
  233. {
  234. try
  235. {
  236. BinaryReader headReader = new BinaryReader(new MemoryStream(header, 0, HeadLength));
  237. int idLength = headReader.ReadInt16();
  238. int dataLength = headReader.ReadInt32();
  239. return idLength + dataLength;
  240. }
  241. catch (Exception e)
  242. {
  243. log.Error(e.Message, e);
  244. }
  245. return 0;
  246. }
  247. /// <summary>
  248. /// 读取包头
  249. /// </summary>
  250. /// <param name="bytes"></param>
  251. /// <param name="offset"></param>
  252. /// <param name="limit"></param>
  253. /// <returns></returns>
  254. private bool readHead(byte[] bytes, int offset, int limit)
  255. {
  256. int length = limit - offset;
  257. int headNum = HeadLength - bufferOffset;
  258. if (length >= headNum)
  259. {
  260. Buffer.BlockCopy(bytes, offset, headBuffer, bufferOffset, headNum);
  261. //Get package length
  262. pkgLength = getBodyLengthFromHeader(headBuffer);
  263. //Init message buffer
  264. buffer = new byte[HeadLength + pkgLength];
  265. Buffer.BlockCopy(headBuffer, 0, buffer, 0, HeadLength);
  266. offset += headNum;
  267. bufferOffset = HeadLength;
  268. this.transportState = TransportState.readBody;
  269. if (offset <= limit) processBytes(bytes, offset, limit);
  270. return true;
  271. }
  272. else
  273. {
  274. Buffer.BlockCopy(bytes, offset, headBuffer, bufferOffset, length);
  275. bufferOffset += length;
  276. return false;
  277. }
  278. }
  279. /// <summary>
  280. /// 从byte转换为request
  281. /// </summary>
  282. /// <param name="header"></param>
  283. /// <param name="buffer"></param>
  284. /// <returns></returns>
  285. private FastStreamRequest resolveRequestInfo(byte[] header, byte[] buffer)
  286. {
  287. try
  288. {
  289. BinaryReader headReader = new BinaryReader(new MemoryStream(header, 0, HeadLength));
  290. int idLength = headReader.ReadInt16();
  291. int dataLength = headReader.ReadInt32();
  292. BinaryReader bodyReader = new BinaryReader(new MemoryStream(buffer, HeadLength, buffer.Length - HeadLength));
  293. string id = Encoding.UTF8.GetString(bodyReader.ReadBytes(idLength));
  294. return new FastStreamRequest(id, bodyReader.ReadBytes(dataLength));
  295. }
  296. catch (Exception e)
  297. {
  298. log.Error(e.Message, e);
  299. }
  300. return null;
  301. }
  302. /// <summary>
  303. /// 读取包体
  304. /// </summary>
  305. /// <param name="bytes"></param>
  306. /// <param name="offset"></param>
  307. /// <param name="limit"></param>
  308. private void readBody(byte[] bytes, int offset, int limit)
  309. {
  310. int length = pkgLength + HeadLength - bufferOffset;
  311. if ((offset + length) <= limit)
  312. {
  313. Buffer.BlockCopy(bytes, offset, buffer, bufferOffset, length);
  314. offset += length;
  315. //回调消息处理函数
  316. this.messageProcesser.Invoke(resolveRequestInfo(headBuffer, buffer));
  317. this.bufferOffset = 0;
  318. this.pkgLength = 0;
  319. if (this.transportState != TransportState.closed)
  320. this.transportState = TransportState.readHead;
  321. if (offset < limit)
  322. processBytes(bytes, offset, limit);
  323. }
  324. else
  325. {
  326. Buffer.BlockCopy(bytes, offset, buffer, bufferOffset, limit - offset);
  327. bufferOffset += limit - offset;
  328. this.transportState = TransportState.readBody;
  329. }
  330. }
  331. private void print(byte[] bytes, int offset, int length)
  332. {
  333. for (int i = offset; i < length; i++)
  334. Console.Write(Convert.ToString(bytes[i], 16) + " ");
  335. Console.WriteLine();
  336. }
  337. }
  338. /*-------------------------------------FastStreamRequest--------------------------------------------------------*/
  339. /// <summary>
  340. /// fastStream 请求结构
  341. /// </summary>
  342. class FastStreamRequest
  343. {
  344. private string playerId;
  345. private byte[] data;
  346. public string PlayerId
  347. {
  348. get { return playerId; }
  349. }
  350. public byte[] Data
  351. {
  352. get { return data; }
  353. }
  354. public FastStreamRequest(string playerId, byte[] data)
  355. {
  356. this.playerId = playerId;
  357. this.data = data;
  358. }
  359. }
  360. /*-------------------------------------FastStreamSession--------------------------------------------------------*/
  361. /// <summary>
  362. /// session
  363. /// </summary>
  364. class FastStreamSession : IFastSession
  365. {
  366. /// <summary>
  367. /// connector 服务器id
  368. /// </summary>
  369. private string connectorId;
  370. /// <summary>
  371. /// 发送队列
  372. /// </summary>
  373. private Queue<object> sendQueue = new Queue<object>();
  374. /// <summary>
  375. /// 数据传输协议
  376. /// </summary>
  377. private Transporter transporter;
  378. /// <summary>
  379. /// 发送线程
  380. /// </summary>
  381. private Thread sendThread;
  382. /// <summary>
  383. /// 新请求接收事件
  384. /// </summary>
  385. public delegate void NewRequestReceivedHandler(FastStreamSession session, FastStreamRequest request);
  386. public event NewRequestReceivedHandler NewRequestReceived;
  387. /// <summary>
  388. /// session关闭事件
  389. /// </summary>
  390. public delegate void SessionClosedHandler();
  391. public event SessionClosedHandler SessionClosed;
  392. /// <summary>
  393. /// 日志
  394. /// </summary>
  395. private Logger log = LoggerFactory.GetLogger("FastStreamSession");
  396. public FastStreamSession(TcpClient client)
  397. {
  398. this.transporter = new Transporter(client, (request) =>
  399. {
  400. NewRequestReceived(this, request);
  401. });
  402. this.transporter.onDisconnect = () =>
  403. {
  404. OnSessionClosed("");
  405. };
  406. }
  407. public void start()
  408. {
  409. OnSessionStarted();
  410. this.transporter.start();
  411. }
  412. public void doClose()
  413. {
  414. }
  415. protected void OnSessionStarted()
  416. {
  417. // Logger.Debug("OnSessionStarted");
  418. this.sendThread = new Thread(new ThreadStart(this.runSend));
  419. this.sendThread.IsBackground = true;
  420. this.sendThread.Start();
  421. }
  422. protected void OnSessionClosed(string reason)
  423. {
  424. //清理
  425. lock (this.sendQueue)
  426. {
  427. this.sendQueue.Clear();
  428. }
  429. if (this.sendThread != null)
  430. {
  431. try
  432. {
  433. this.sendThread.Join(1000);
  434. }
  435. catch (Exception err)
  436. {
  437. log.Error(err.Message, err);
  438. }
  439. this.sendThread = null;
  440. }
  441. SessionClosed();
  442. }
  443. /// <summary>
  444. /// 发送数据,目前是通过消息队列来实现
  445. /// </summary>
  446. /// <param name="data"></param>
  447. /// <param name="offset"></param>
  448. /// <param name="length"></param>
  449. public void Send(byte[] data, int offset, int length)
  450. {
  451. lock (this.sendQueue)
  452. {
  453. this.sendQueue.Enqueue(data);
  454. // 通知写线程开始工作。
  455. Monitor.PulseAll(this.sendQueue);
  456. }
  457. }
  458. /// <summary>
  459. /// 发送线程
  460. /// </summary>
  461. private void runSend()
  462. {
  463. while (transporter.Connected)
  464. {
  465. byte[] data = null;
  466. lock (this.sendQueue)
  467. {
  468. //if (this.sendQueue.Count >= 100)
  469. //{
  470. // log.Error("begin send -- this.sendQueue.Count :" + this.sendQueue.Count);
  471. //}
  472. if (this.sendQueue.Count > 0)
  473. {
  474. data = (byte[])this.sendQueue.Dequeue();
  475. }
  476. else
  477. {
  478. Monitor.Wait(this.sendQueue, 100);
  479. }
  480. }
  481. if (data != null)
  482. {
  483. try
  484. {
  485. transporter.send(data);
  486. }
  487. catch (Exception e)
  488. {
  489. log.Error(e.Message, e);
  490. }
  491. }
  492. }
  493. log.Error("send thread exit!");
  494. }
  495. public bool IsConnected()
  496. {
  497. return false;
  498. }
  499. public string GetDescribe()
  500. {
  501. return "";
  502. }
  503. public string ConnectorId
  504. {
  505. get { return connectorId; }
  506. set { connectorId = value; }
  507. }
  508. }
  509. /*-------------------------------------FastStreamServer--------------------------------------------------------*/
  510. /// <summary>
  511. /// server
  512. /// </summary>
  513. class FastStreamServer
  514. {
  515. public delegate void SessionClosedHandler(FastStreamSession session, string reason);
  516. public event SessionClosedHandler SessionClosed;
  517. public delegate void NewRequestReceivedHandler(FastStreamSession session, FastStreamRequest request);
  518. public event NewRequestReceivedHandler NewRequestReceived;
  519. private TcpListener listener;
  520. private List<FastStreamSession> clients = new List<FastStreamSession>();
  521. // private bool disposed = false;
  522. private bool isRunning;
  523. public void Setup(int port)
  524. {
  525. listener = new TcpListener(IPAddress.Any, port);
  526. //listener.AllowNatTraversal(true);
  527. }
  528. public void Start()
  529. {
  530. if (!isRunning)
  531. {
  532. isRunning = true;
  533. listener.Start();
  534. listener.BeginAcceptTcpClient(
  535. new AsyncCallback(HandleTcpClientAccepted), listener);
  536. }
  537. }
  538. private void HandleTcpClientAccepted(IAsyncResult ar)
  539. {
  540. if (isRunning)
  541. {
  542. TcpListener tcpListener = (TcpListener)ar.AsyncState;
  543. TcpClient tcpClient = tcpListener.EndAcceptTcpClient(ar);
  544. FastStreamSession internalClient = new FastStreamSession(tcpClient);
  545. internalClient.NewRequestReceived += (client, request) =>
  546. {
  547. this.NewRequestReceived(client, request);
  548. };
  549. internalClient.SessionClosed += () =>
  550. {
  551. lock (this.clients)
  552. {
  553. this.clients.Remove(internalClient);
  554. }
  555. this.SessionClosed(internalClient, "");
  556. };
  557. internalClient.start();
  558. lock (this.clients)
  559. {
  560. this.clients.Add(internalClient);
  561. // RaiseClientConnected(tcpClient);
  562. }
  563. tcpListener.BeginAcceptTcpClient(
  564. new AsyncCallback(HandleTcpClientAccepted), ar.AsyncState);
  565. }
  566. }
  567. public void Stop()
  568. {
  569. }
  570. }
  571. /// <summary>
  572. /// 网络服务
  573. /// </summary>
  574. private FastStreamServer server = new FastStreamServer();
  575. /// <summary>
  576. /// 网络连接
  577. /// </summary>
  578. ///
  579. private ConcurrentDictionary<string, FastStreamSession> sessions = new ConcurrentDictionary<string, FastStreamSession>();
  580. /// <summary>
  581. /// 日志
  582. /// </summary>
  583. private Logger log = LoggerFactory.GetLogger("MorefunFastStream");
  584. private IZone zone = null;
  585. /// <summary>
  586. /// 初始化
  587. /// </summary>
  588. /// <param name="config">配置</param>
  589. public override void Start(FastStreamConfig config, IZone zone)
  590. {
  591. log.Info("start on port:" + config.port);
  592. this.zone = zone;
  593. //监听事件
  594. this.server.Setup(config.port);
  595. this.server.NewRequestReceived += appServer_NewRequestReceived;
  596. this.server.SessionClosed += server_SessionClosed;
  597. //启动网络服务
  598. this.server.Start();
  599. }
  600. /// <summary>
  601. /// 网络断开
  602. /// </summary>
  603. /// <param name="session"></param>
  604. /// <param name="value"></param>
  605. private void server_SessionClosed(FastStreamSession session, string reason)
  606. {
  607. log.Error("session closed:" + session.ConnectorId + ",reason:" + reason);
  608. FastStreamSession outValue;
  609. sessions.TryRemove(session.ConnectorId, out outValue);
  610. }
  611. /// <summary>
  612. /// 收到数据包
  613. /// </summary>
  614. /// <param name="session"></param>
  615. /// <param name="requestInfo"></param>
  616. private void appServer_NewRequestReceived(FastStreamSession session, FastStreamRequest requestInfo)
  617. {
  618. try
  619. {
  620. //注意要捕获异常
  621. if (requestInfo.PlayerId.Equals("connetorId"))
  622. {
  623. //connetorId为握手协议,表示了session的身份
  624. string connetorId = Encoding.UTF8.GetString(requestInfo.Data);
  625. log.Info(connetorId + " connected!");
  626. session.ConnectorId = connetorId;
  627. sessions.AddOrUpdate(connetorId, session, (key, oldValue) => session);
  628. //test 回复
  629. //byte[] sendData = new byte[90]
  630. //{
  631. //110, 200, 21, 110, 200, 21, 110, 200, 21,
  632. //110, 200, 21, 110, 200, 21, 110, 200, 21,
  633. //110, 200, 21, 110, 200, 21, 110, 200, 21,
  634. //110, 200, 21, 110, 200, 21, 110, 200, 21,
  635. //110, 200, 21, 110, 200, 21, 110, 200, 21,
  636. //110, 200, 21, 110, 200, 21, 110, 200, 21,
  637. //110, 200, 21, 110, 200, 21, 110, 200, 21,
  638. //110, 200, 21, 110, 200, 21, 110, 200, 21,
  639. //110, 200, 21, 110, 200, 21, 110, 200, 21,
  640. //110, 200, 21, 110, 200, 21, 110, 200, 21};
  641. //for (int i = 0; i < 1000; ++i)
  642. //{
  643. // Send(connetorId, "good"+i, sendData);
  644. //}
  645. }
  646. else
  647. {
  648. //发送数据到场景
  649. //Task.Run(() =>
  650. // {
  651. try
  652. {
  653. zone.PlayerReceive(requestInfo.PlayerId, requestInfo.Data);
  654. }
  655. catch (Exception e)
  656. {
  657. log.Warn(e.Message + e.StackTrace, e);
  658. }
  659. //});
  660. }
  661. }
  662. catch (Exception e)
  663. {
  664. log.Error(e.Message, e);
  665. }
  666. }
  667. /// <summary>
  668. /// 停止服务
  669. /// </summary>
  670. public override void Stop()
  671. {
  672. this.server.Stop();
  673. }
  674. //发送数据到connector服务器
  675. public override void Send(IFastSession connetorId, string uid, string instanceId,ArraySegment<byte> data)
  676. {
  677. FastStreamSession socket = connetorId as FastStreamSession;
  678. Composer composer = new Composer(uid, instanceId,data);
  679. byte[] sendData = composer.getBytes();
  680. //log.Debug("player:" + uid + " socket.Send");
  681. socket.Send(sendData, 0, sendData.Length);
  682. }
  683. public override IFastSession GetSessionByID(string sessionID)
  684. {
  685. if (!sessions.ContainsKey(sessionID))
  686. {
  687. throw new Exception("connetor server " + sessionID + " not find!");
  688. }
  689. FastStreamSession socket;
  690. if (!sessions.TryGetValue(sessionID, out socket))
  691. {
  692. throw new Exception("connetor server " + sessionID + "TryGetValue fail!");
  693. }
  694. return socket;
  695. }
  696. }
  697. }