using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Slice;
using System.Threading;
using CommonNetwork_ICE.Util;
using CommonNetwork_ICE.Common;
using System.Collections.Concurrent;
using System.Collections;
using CommonLang.Log;
namespace CommonServer_ICE.Msg
{
///
/// 服务器端可靠消息包循环检测重新发送处理类
///
internal class SeverSendMsgQueue
{
private static Logger log = LoggerFactory.GetLogger("SeverSendMsgQueue");
private Thread thread;
// 是否运行线程
private bool runFlag;
// 消息包存储哈希表
private Hashtable map;
public SeverSendMsgQueue(Hashtable map)
{
this.map = map;
}
public void Start()
{
thread = new Thread(Run);
runFlag = true;
thread.Start();
}
public void End()
{
runFlag = false;
if (thread != null)
{
thread.Join();
thread.Abort();
}
}
///
/// 加入新消息
///
///
public void AddPacket(MsgSessionPacket packet)
{
lock (packet.ID)
{
LinkedList packetList = (LinkedList)map[packet.ID];
if (packetList == null)
{
packetList = new LinkedList();
packetList.AddFirst(packet);
map.Add(packet.ID, packetList);
return;
}
packetList.AddLast(packet);
}
}
///
/// 消息发送
///
private void Run()
{
while (runFlag)
{
try
{
if (map.Count == 0)
{
// 等待下一轮
Thread.Sleep(Env.SEND_WAIT_ACT_THREAD_SLEEP_TIME);
continue;
}
LinkedList[] al = new LinkedList[map.Count];
map.Values.CopyTo(al, 0);
// 循环每一个会话的消息队列,将队列头取出,判断是否收到应答序号,若收到则删除消息,然后发送下一条消息,未收到重新发送
foreach (LinkedList packetList in al)
{
if (packetList.Count == 0)
{
continue;
}
MsgSessionPacket packet = packetList.First.Value;
lock (packet.ID)
{
if (packet.Session == null || !packet.Session.IsConnected)
{
// 如果跟当前消息包的会话已经销毁或者已经断开连接,放弃该会话所有关键消息包
map.Remove(packet.ID);
continue;
}
if (packet.Session.SendedRecvLastSerial == packet.Session.SendedLastSerial)
{
// 如果当前消息包已经收到应答,删除该消息包
if (packet.Message.serial == packet.Session.SendedRecvLastSerial)
{
packetList.RemoveFirst();
}
// 如果有下一条
if (packetList.Count > 0)
{
// 发送下一条关键包
packet = packetList.First.Value;
bool successed = packet.Session.SendTo(packet.Message);
if (successed)
{
packet.Session.LastHingeSendTime = DateTime.Now;
packet.Session.SendedLastSerial = packet.Message.serial;
packet.Session.NotifySentMsg(packet.SrcMessage);
}
}
}
else
{
// 超时时间外,重新发送数据
if (packet.Session.LastHingeSendTime.AddSeconds(Env.SEND_WAIT_ACK_TIME_OUT).CompareTo(DateTime.Now) < 0)
{
log.Trace("没收到关键包应答,服务器重发数据:" + packet.Message.serial);
bool successed = packet.Session.SendTo(packet.Message);
if (successed)
{
packet.Session.LastHingeSendTime = DateTime.Now;
}
}
}
}
}
}
catch (Exception err)
{
log.Error(err.Message, err);
}
}
}
}
}