using CommonLang.Log;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pomelo
{
/// <summary>
/// morefun FastStream manager.
/// </summary>
public class FastStream_old : FastStream
{
/*-------------------------------------Composer--------------------------------------------------------*/
/// <summary>
/// Serializes one outgoing message into a single length-prefixed byte array.
/// </summary>
/// <remarks>
/// Bytes are written in this order:
/// a variable-length size prefix (7 bits per byte, most significant group first,
/// high bit set on every byte except the last), the uid length (u16, little-endian),
/// the instanceId length (u16, little-endian), the data length (u32, little-endian),
/// the UTF-8 uid, the UTF-8 instanceId, and finally the payload bytes.
/// </remarks>
class Composer
{
    // Despite its name this is the modulus (128) used to extract one 7-bit group.
    private const int LEFT_SHIFT_BITS = 1 << 7;

    // Size of the fixed protocol head: u16 uid length + u16 instanceId length + u32 data length.
    private const int ProtocolHeadSize = 8;

    private MemoryStream stream;

    /// <summary>
    /// Builds the complete frame for the given ids and payload.
    /// </summary>
    /// <param name="uid">User id; written as UTF-8.</param>
    /// <param name="instanceId">Instance id; written as UTF-8.</param>
    /// <param name="data">Payload bytes. A default (empty) segment is written as a zero-length payload.</param>
    /// <exception cref="ArgumentNullException"><paramref name="uid"/> or <paramref name="instanceId"/> is null.</exception>
    /// <exception cref="ArgumentException">An encoded id is longer than 65535 bytes and cannot be described by its u16 length field.</exception>
    public Composer(string uid, string instanceId, ArraySegment<byte> data)
    {
        if (uid == null)
        {
            throw new ArgumentNullException("uid");
        }
        if (instanceId == null)
        {
            throw new ArgumentNullException("instanceId");
        }

        byte[] bytesUid = Encoding.UTF8.GetBytes(uid);
        byte[] bytesInstanceId = Encoding.UTF8.GetBytes(instanceId);

        // The head stores both id lengths as u16; refuse to silently truncate them,
        // because the receiver would then mis-frame everything that follows.
        if (bytesUid.Length > ushort.MaxValue)
        {
            throw new ArgumentException("uid is longer than 65535 bytes when UTF-8 encoded.", "uid");
        }
        if (bytesInstanceId.Length > ushort.MaxValue)
        {
            throw new ArgumentException("instanceId is longer than 65535 bytes when UTF-8 encoded.", "instanceId");
        }

        int contentSize = ProtocolHeadSize + bytesUid.Length + bytesInstanceId.Length + data.Count;
        int lengthSize = calLengthSize(contentSize);

        // Capacity is exact on purpose: getBytes() hands out the backing buffer as-is.
        this.stream = new MemoryStream(lengthSize + contentSize);

        // composer head (size prefix)
        writeLength(contentSize, lengthSize);

        // protocol head
        writeU16((ushort)bytesUid.Length);
        writeU16((ushort)bytesInstanceId.Length);
        writeU32((uint)data.Count);

        // uid
        writeBytes(bytesUid, 0, bytesUid.Length);

        // instanceId
        writeBytes(bytesInstanceId, 0, bytesInstanceId.Length);

        // data; a default ArraySegment has a null Array, which Stream.Write would reject
        if (data.Count > 0)
        {
            writeBytes(data.Array, data.Offset, data.Count);
        }
    }

    /// <summary>
    /// Returns the composed frame.
    /// </summary>
    /// <returns>
    /// The stream's backing buffer (not a copy). After construction it is exactly
    /// as long as the frame because the stream was created with that capacity.
    /// </returns>
    public byte[] getBytes()
    {
        return stream.GetBuffer();
    }

    /// <summary>
    /// Counts how many 7-bit groups are needed to represent <paramref name="length"/>.
    /// </summary>
    /// <param name="length">Value to measure.</param>
    /// <returns>The number of prefix bytes; 0 when <paramref name="length"/> is 0 or negative.</returns>
    public static int calLengthSize(int length)
    {
        int res = 0;
        while (length > 0)
        {
            length = length >> 7;
            res++;
        }
        return res;
    }

    /// <summary>
    /// Writes <paramref name="data"/> as a <paramref name="size"/>-byte prefix, most significant
    /// 7-bit group first, with the high bit set on every byte except the last one.
    /// </summary>
    /// <param name="data">Value to encode.</param>
    /// <param name="size">Number of bytes to emit, normally <see cref="calLengthSize"/> of <paramref name="data"/>.</param>
    public void writeLength(int data, int size)
    {
        byte[] bytes = new byte[size];
        for (int offset = size - 1; offset >= 0; offset--)
        {
            int b = data % LEFT_SHIFT_BITS;
            if (offset < size - 1)
            {
                // continuation flag: more prefix bytes follow this one
                b |= 0x80;
            }
            bytes[offset] = (byte)b;
            data = data >> 7;
        }
        stream.Write(bytes, 0, bytes.Length);
    }

    /// <summary>Writes a 16-bit unsigned value, low byte first.</summary>
    /// <param name="value">Value to write.</param>
    public void writeU16(UInt16 value)
    {
        stream.WriteByte((byte)(value));
        stream.WriteByte((byte)(value >> 8));
    }

    /// <summary>Writes a 32-bit unsigned value, low byte first.</summary>
    /// <param name="value">Value to write.</param>
    public void writeU32(UInt32 value)
    {
        stream.WriteByte((byte)(value));
        stream.WriteByte((byte)(value >> 8));
        stream.WriteByte((byte)(value >> 16));
        stream.WriteByte((byte)(value >> 24));
    }

    /// <summary>Appends a slice of <paramref name="value"/> to the frame.</summary>
    /// <param name="value">Source array.</param>
    /// <param name="offset">Index of the first byte to copy.</param>
    /// <param name="length">Number of bytes to copy.</param>
    public void writeBytes(byte[] value, int offset, int length)
    {
        stream.Write(value, offset, length);
    }
}
/*-------------------------------------Transporter--------------------------------------------------------*/
/// <summary>
/// Data transport protocol: reads framed messages from a <see cref="TcpClient"/> and
/// writes raw byte buffers to it using the asynchronous Begin/End stream API.
/// </summary>
/// <remarks>
/// An incoming frame is a <see cref="HeadLength"/>-byte head (a 16-bit id length followed by
/// a 32-bit data length, both read little-endian by <see cref="BinaryReader"/>) followed by
/// the UTF-8 id and the data bytes. Each completed frame is handed to the message
/// processor supplied to the constructor.
/// </remarks>
class Transporter
{
    /// <summary>
    /// Transport state.
    /// </summary>
    enum TransportState
    {
        readHead = 1, // on read head
        readBody = 2, // on read body
        closed = 3    // connection closed, will ignore all the message and wait for clean up
    }

    /// <summary>
    /// Read buffer handed to the asynchronous read as its state object.
    /// </summary>
    class StateObject
    {
        public const int BufferSize = 1024;
        internal byte[] buffer = new byte[BufferSize];
    }

    /// <summary>Size in bytes of the frame head (2-byte id length + 4-byte data length).</summary>
    public const int HeadLength = 6;

    private TcpClient socket;

    // Receives every fully assembled request (or null when the frame could not be parsed).
    private Action<FastStreamRequest> messageProcesser;

    // Used for get message
    private StateObject stateObject = new StateObject();
    private TransportState transportState;

    // Handles of the most recently started asynchronous operations.
    private IAsyncResult asyncReceive;
    private IAsyncResult asyncSend;

    // Accumulates head bytes when a head is split across reads.
    private byte[] headBuffer = new byte[HeadLength];

    // Holds the frame currently being assembled (head + body).
    private byte[] buffer;

    // Number of bytes of the current head/frame collected so far.
    private int bufferOffset = 0;

    // Body length of the frame currently being assembled.
    private int pkgLength = 0;

    /// <summary>Invoked when the peer closes the connection or a receive fails.</summary>
    internal Action onDisconnect = null;

    /// <summary>
    /// Logger.
    /// </summary>
    private Logger log = LoggerFactory.GetLogger("Transporter");

    /// <summary>Whether the underlying <see cref="TcpClient"/> reports itself as connected.</summary>
    public bool Connected
    {
        get { return this.socket.Connected; }
    }

    /// <summary>
    /// Wraps <paramref name="socket"/>, enabling NoDelay on it, and starts in the read-head state.
    /// </summary>
    /// <param name="socket">Client whose stream is read and written.</param>
    /// <param name="processer">Callback invoked with each received request.</param>
    public Transporter(TcpClient socket, Action<FastStreamRequest> processer)
    {
        this.socket = socket;
        this.socket.NoDelay = true;
        this.messageProcesser = processer;
        transportState = TransportState.readHead;
    }

    /// <summary>Starts the receive loop.</summary>
    public void start()
    {
        this.receive();
    }

    /// <summary>
    /// Begins an asynchronous write of the whole <paramref name="buffer"/>.
    /// Does nothing once the transporter has been closed.
    /// </summary>
    /// <param name="buffer">Bytes to send.</param>
    public void send(byte[] buffer)
    {
        if (this.transportState != TransportState.closed)
        {
            this.asyncSend = socket.GetStream().BeginWrite(buffer, 0, buffer.Length, new AsyncCallback(sendCallback), socket);
        }
    }

    /// <summary>
    /// Completes an asynchronous write started by <see cref="send"/>.
    /// </summary>
    /// <param name="asyncSend">Result handle of the finished write.</param>
    private void sendCallback(IAsyncResult asyncSend)
    {
        try
        {
            // Every BeginWrite must be paired with EndWrite, even after close().
            socket.GetStream().EndWrite(asyncSend);
        }
        catch (Exception e)
        {
            // This runs on an I/O callback thread; an escaping exception would be unhandled.
            // Failures after close() are expected and stay silent. onDisconnect is not
            // raised here so that it is only ever raised from the receive path.
            if (this.transportState != TransportState.closed)
            {
                log.Error(e.Message, e);
            }
        }
    }

    /// <summary>Begins one asynchronous read into the shared read buffer.</summary>
    public void receive()
    {
        this.asyncReceive = socket.GetStream().BeginRead(stateObject.buffer, 0, stateObject.buffer.Length, new AsyncCallback(endReceive), stateObject);
    }

    /// <summary>
    /// Marks the transporter as closed so further sends and received data are ignored.
    /// The socket itself is not touched here.
    /// </summary>
    internal void close()
    {
        this.transportState = TransportState.closed;
    }

    /// <summary>
    /// Completes an asynchronous read: feeds the received bytes to the parser and starts the
    /// next read, or reports a disconnect when the read returns no bytes or throws.
    /// </summary>
    /// <param name="asyncReceive">Result handle of the finished read.</param>
    private void endReceive(IAsyncResult asyncReceive)
    {
        if (this.transportState == TransportState.closed)
        {
            return;
        }

        StateObject state = (StateObject)asyncReceive.AsyncState;
        try
        {
            int length = this.socket.GetStream().EndRead(asyncReceive);
            if (length > 0)
            {
                processBytes(state.buffer, 0, length);

                // Receive next message
                if (this.transportState != TransportState.closed)
                {
                    receive();
                }
            }
            else
            {
                // A zero-byte read is reported to the disconnect handler.
                notifyDisconnect();
            }
        }
        catch (Exception e)
        {
            log.Error(e.Message, e);
            notifyDisconnect();
        }
    }

    /// <summary>Invokes <see cref="onDisconnect"/> if a handler is attached.</summary>
    private void notifyDisconnect()
    {
        // Read the field once so a concurrent reset to null cannot slip in between check and call.
        Action handler = this.onDisconnect;
        if (handler != null)
        {
            handler();
        }
    }

    /// <summary>
    /// Processes received data according to the current state; ignored when closed.
    /// </summary>
    /// <param name="bytes">Buffer holding the received bytes.</param>
    /// <param name="offset">Index of the first unprocessed byte.</param>
    /// <param name="limit">Index one past the last valid byte.</param>
    internal void processBytes(byte[] bytes, int offset, int limit)
    {
        if (this.transportState == TransportState.readHead)
        {
            readHead(bytes, offset, limit);
        }
        else if (this.transportState == TransportState.readBody)
        {
            readBody(bytes, offset, limit);
        }
    }

    /// <summary>
    /// Gets the body length from the frame head.
    /// </summary>
    /// <param name="header">Buffer containing at least <see cref="HeadLength"/> head bytes.</param>
    /// <returns>Id length plus data length, or 0 if the head could not be read.</returns>
    private int getBodyLengthFromHeader(byte[] header)
    {
        // NOTE(review): both lengths come straight off the wire and are not range-checked here.
        try
        {
            using (MemoryStream headStream = new MemoryStream(header, 0, HeadLength))
            using (BinaryReader headReader = new BinaryReader(headStream))
            {
                int idLength = headReader.ReadInt16();
                int dataLength = headReader.ReadInt32();
                return idLength + dataLength;
            }
        }
        catch (Exception e)
        {
            log.Error(e.Message, e);
        }
        return 0;
    }

    /// <summary>
    /// Reads the frame head, which may arrive split across several reads.
    /// </summary>
    /// <param name="bytes">Buffer holding the received bytes.</param>
    /// <param name="offset">Index of the first unprocessed byte.</param>
    /// <param name="limit">Index one past the last valid byte.</param>
    /// <returns>true when the head was completed; false when more bytes are needed.</returns>
    private bool readHead(byte[] bytes, int offset, int limit)
    {
        int available = limit - offset;
        int missing = HeadLength - bufferOffset;

        if (available < missing)
        {
            // Head still incomplete: stash what we have and wait for the next read.
            Buffer.BlockCopy(bytes, offset, headBuffer, bufferOffset, available);
            bufferOffset += available;
            return false;
        }

        Buffer.BlockCopy(bytes, offset, headBuffer, bufferOffset, missing);

        // Get package length
        pkgLength = getBodyLengthFromHeader(headBuffer);

        // Init message buffer
        buffer = new byte[HeadLength + pkgLength];
        Buffer.BlockCopy(headBuffer, 0, buffer, 0, HeadLength);
        offset += missing;
        bufferOffset = HeadLength;
        this.transportState = TransportState.readBody;

        // Also entered when no bytes remain, so a frame with an empty body is dispatched at once.
        if (offset <= limit)
        {
            processBytes(bytes, offset, limit);
        }
        return true;
    }

    /// <summary>
    /// Converts an assembled frame into a request.
    /// </summary>
    /// <param name="header">The frame head.</param>
    /// <param name="buffer">The whole frame (head followed by body).</param>
    /// <returns>The parsed request, or null when parsing throws.</returns>
    private FastStreamRequest resolveRequestInfo(byte[] header, byte[] buffer)
    {
        try
        {
            int idLength;
            int dataLength;
            using (MemoryStream headStream = new MemoryStream(header, 0, HeadLength))
            using (BinaryReader headReader = new BinaryReader(headStream))
            {
                idLength = headReader.ReadInt16();
                dataLength = headReader.ReadInt32();
            }

            using (MemoryStream bodyStream = new MemoryStream(buffer, HeadLength, buffer.Length - HeadLength))
            using (BinaryReader bodyReader = new BinaryReader(bodyStream))
            {
                string id = Encoding.UTF8.GetString(bodyReader.ReadBytes(idLength));
                return new FastStreamRequest(id, bodyReader.ReadBytes(dataLength));
            }
        }
        catch (Exception e)
        {
            log.Error(e.Message, e);
        }
        return null;
    }

    /// <summary>
    /// Reads the frame body; when it is complete, dispatches the request and continues
    /// with any bytes left in the buffer.
    /// </summary>
    /// <param name="bytes">Buffer holding the received bytes.</param>
    /// <param name="offset">Index of the first unprocessed byte.</param>
    /// <param name="limit">Index one past the last valid byte.</param>
    private void readBody(byte[] bytes, int offset, int limit)
    {
        int remaining = pkgLength + HeadLength - bufferOffset;

        if ((offset + remaining) > limit)
        {
            // Body still incomplete: keep what arrived and stay in the read-body state.
            int chunk = limit - offset;
            Buffer.BlockCopy(bytes, offset, buffer, bufferOffset, chunk);
            bufferOffset += chunk;
            this.transportState = TransportState.readBody;
            return;
        }

        Buffer.BlockCopy(bytes, offset, buffer, bufferOffset, remaining);
        offset += remaining;

        // Call back the message handler
        this.messageProcesser.Invoke(resolveRequestInfo(headBuffer, buffer));

        this.bufferOffset = 0;
        this.pkgLength = 0;

        // The handler may have closed the transporter; do not reopen it.
        if (this.transportState != TransportState.closed)
        {
            this.transportState = TransportState.readHead;
        }
        if (offset < limit)
        {
            processBytes(bytes, offset, limit);
        }
    }

    /// <summary>
    /// Debug helper: writes <paramref name="length"/> bytes starting at <paramref name="offset"/>
    /// to the console as space-separated hexadecimal values.
    /// </summary>
    /// <param name="bytes">Source buffer.</param>
    /// <param name="offset">Index of the first byte to print.</param>
    /// <param name="length">Number of bytes to print.</param>
    private void print(byte[] bytes, int offset, int length)
    {
        // The end index is relative to offset; comparing against length alone drops bytes.
        int end = offset + length;
        for (int i = offset; i < end; i++)
        {
            Console.Write(Convert.ToString(bytes[i], 16) + " ");
        }
        Console.WriteLine();
    }
}
/*-------------------------------------FastStreamRequest--------------------------------------------------------*/
/// <summary>
/// fastStream request structure: a player id together with its raw data bytes.
/// </summary>
class FastStreamRequest
{
    /// <summary>
    /// Creates a request holding the given values as-is.
    /// </summary>
    /// <param name="playerId">Id exposed through <see cref="PlayerId"/>.</param>
    /// <param name="data">Bytes exposed through <see cref="Data"/>; the array is stored, not copied.</param>
    public FastStreamRequest(string playerId, byte[] data)
    {
        PlayerId = playerId;
        Data = data;
    }

    /// <summary>The player id passed to the constructor.</summary>
    public string PlayerId { get; private set; }

    /// <summary>The data array passed to the constructor.</summary>
    public byte[] Data { get; private set; }
}
/*-------------------------------------FastStreamSession--------------------------------------------------------*/
///
/// session
///
class FastStreamSession : IFastSession
{
/// <summary>
/// Connector server id.
/// </summary>
private string connectorId;
/// <summary>
/// Send queue.
/// </summary>
private Queue