using System; using System.Collections.Generic; using System.Linq; using System.Text; using Slice; using System.Threading; using CommonNetwork_ICE.Util; using CommonNetwork_ICE.Common; using System.Collections.Concurrent; using System.Collections; using CommonLang.Log; namespace CommonServer_ICE.Msg { /// <summary> /// 服务器端可靠消息包循环检测重新发送处理类 /// </summary> internal class SeverSendMsgQueue { private static Logger log = LoggerFactory.GetLogger("SeverSendMsgQueue"); private Thread thread; // 是否运行线程 private bool runFlag; // 消息包存储哈希表 private Hashtable map; public SeverSendMsgQueue(Hashtable map) { this.map = map; } public void Start() { thread = new Thread(Run); runFlag = true; thread.Start(); } public void End() { runFlag = false; if (thread != null) { thread.Join(); thread.Abort(); } } /// <summary> /// 加入新消息 /// </summary> /// <param name="message"></param> public void AddPacket(MsgSessionPacket packet) { lock (packet.ID) { LinkedList<MsgSessionPacket> packetList = (LinkedList<MsgSessionPacket>)map[packet.ID]; if (packetList == null) { packetList = new LinkedList<MsgSessionPacket>(); packetList.AddFirst(packet); map.Add(packet.ID, packetList); return; } packetList.AddLast(packet); } } /// <summary> /// 消息发送 /// </summary> private void Run() { while (runFlag) { try { if (map.Count == 0) { // 等待下一轮 Thread.Sleep(Env.SEND_WAIT_ACT_THREAD_SLEEP_TIME); continue; } LinkedList<MsgSessionPacket>[] al = new LinkedList<MsgSessionPacket>[map.Count]; map.Values.CopyTo(al, 0); // 循环每一个会话的消息队列,将队列头取出,判断是否收到应答序号,若收到则删除消息,然后发送下一条消息,未收到重新发送 foreach (LinkedList<MsgSessionPacket> packetList in al) { if (packetList.Count == 0) { continue; } MsgSessionPacket packet = packetList.First.Value; lock (packet.ID) { if (packet.Session == null || !packet.Session.IsConnected) { // 如果跟当前消息包的会话已经销毁或者已经断开连接,放弃该会话所有关键消息包 map.Remove(packet.ID); continue; } if (packet.Session.SendedRecvLastSerial == packet.Session.SendedLastSerial) { // 如果当前消息包已经收到应答,删除该消息包 if (packet.Message.serial == packet.Session.SendedRecvLastSerial) { packetList.RemoveFirst(); } // 如果有下一条 if (packetList.Count > 0) { // 发送下一条关键包 packet = packetList.First.Value; bool successed = packet.Session.SendTo(packet.Message); if (successed) { packet.Session.LastHingeSendTime = DateTime.Now; packet.Session.SendedLastSerial = packet.Message.serial; packet.Session.NotifySentMsg(packet.SrcMessage); } } } else { // 超时时间外,重新发送数据 if (packet.Session.LastHingeSendTime.AddSeconds(Env.SEND_WAIT_ACK_TIME_OUT).CompareTo(DateTime.Now) < 0) { log.Trace("没收到关键包应答,服务器重发数据:" + packet.Message.serial); bool successed = packet.Session.SendTo(packet.Message); if (successed) { packet.Session.LastHingeSendTime = DateTime.Now; } } } } } } catch (Exception err) { log.Error(err.Message, err); } } } } }