优化Protocol

This commit is contained in:
若汝棋茗
2021-06-11 01:25:33 +08:00
parent 7294717cde
commit e1687b6023
15 changed files with 466 additions and 102 deletions

View File

@@ -834,7 +834,7 @@ namespace RRQMSocket.FileTransfer
}
}
private ByteBlock SendWait(int agreement, int waitTime, ByteBlock byteBlock = null)
private ByteBlock SendWait(short agreement, int waitTime, ByteBlock byteBlock = null)
{
//1001:请求下载
//1002:请求下载分包

View File

@@ -223,8 +223,8 @@ namespace RRQMSocket.FileTransfer
{
byte[] buffer = byteBlock.Buffer;
int r = (int)byteBlock.Length;
int agreement = BitConverter.ToInt32(buffer, 0);
int returnAgreement;
short agreement = BitConverter.ToInt16(buffer, 0);
short returnAgreement;
ByteBlock returnByteBlock = this.BytePool.GetByteBlock(this.BufferLength);
switch (agreement)
{

View File

@@ -17,6 +17,7 @@ using RRQMCore.Serialization;
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace RRQMSocket.RPC.RRQMRPC
@@ -111,7 +112,8 @@ namespace RRQMSocket.RPC.RRQMRPC
/// <exception cref="RRQMTimeoutException"></exception>
public RPCProxyInfo GetProxyInfo()
{
agreementHelper.SocketSend(100, (string)this.ClientConfig.GetValue(TcpRPCClientConfig.ProxyTokenProperty));
byte[] data = Encoding.UTF8.GetBytes((string)this.ClientConfig.GetValue(TcpRPCClientConfig.ProxyTokenProperty));
agreementHelper.SocketSend(100, data);
this.singleWaitData.Wait(1000 * 10);
if (this.proxyFile == null)

View File

@@ -37,34 +37,12 @@ namespace RRQMSocket
#region
/// <summary>
/// 发送简单协议信息
/// 发送简单协议
/// </summary>
/// <param name="agreement"></param>
/// <param name="text"></param>
public void SocketSend(int agreement, string text)
public void SocketSend(short agreement)
{
byte[] data = Encoding.UTF8.GetBytes(text == null ? string.Empty : text);
this.SocketSend(agreement, data);
}
/// <summary>
/// 发送简单协议状态
/// </summary>
/// <param name="agreement"></param>
public void SocketSend(int agreement)
{
this.SocketSend(agreement, string.Empty);
}
/// <summary>
/// 发送int数字
/// </summary>
/// <param name="agreement"></param>
/// <param name="number"></param>
public void SocketSend(int agreement, int number)
{
byte[] data = BitConverter.GetBytes(number);
this.SocketSend(agreement, data);
this.SocketSend(agreement, new byte[0], 0, 0);
}
/// <summary>
@@ -72,7 +50,7 @@ namespace RRQMSocket
/// </summary>
/// <param name="agreement"></param>
/// <param name="dataBuffer"></param>
public void SocketSend(int agreement, byte[] dataBuffer)
public void SocketSend(short agreement, byte[] dataBuffer)
{
this.SocketSend(agreement, dataBuffer, 0, dataBuffer.Length);
}
@@ -82,59 +60,59 @@ namespace RRQMSocket
/// </summary>
/// <param name="agreement"></param>
/// <param name="dataByteBlock"></param>
public void SocketSend(int agreement, ByteBlock dataByteBlock)
public void SocketSend(short agreement, ByteBlock dataByteBlock)
{
this.SocketSend(agreement, dataByteBlock.Buffer, 0, (int)dataByteBlock.Length);
}
/// <summary>
/// 发送字节
/// </summary>
/// <param name="agreement"></param>
/// <param name="status"></param>
/// <param name="dataBuffer"></param>
public void SocketSend(int agreement, byte status, byte[] dataBuffer)
{
ByteBlock byteBlock = this.bytePool.GetByteBlock(dataBuffer.Length + 1);
///// <summary>
///// 发送字节
///// </summary>
///// <param name="agreement"></param>
///// <param name="status"></param>
///// <param name="dataBuffer"></param>
//public void SocketSend(short agreement, byte status, byte[] dataBuffer)
//{
// ByteBlock byteBlock = this.bytePool.GetByteBlock(dataBuffer.Length + 1);
byteBlock.Write(status);
byteBlock.Write(dataBuffer);
try
{
this.SocketSend(agreement, byteBlock.Buffer, 0, (int)byteBlock.Length);
}
catch (Exception ex)
{
throw ex;
}
finally
{
byteBlock.Dispose();
}
}
// byteBlock.Write(status);
// byteBlock.Write(dataBuffer);
// try
// {
// this.SocketSend(agreement, byteBlock.Buffer, 0, (int)byteBlock.Length);
// }
// catch (Exception ex)
// {
// throw ex;
// }
// finally
// {
// byteBlock.Dispose();
// }
//}
/// <summary>
/// 发送流
/// </summary>
/// <param name="dataByteBlock"></param>
public void SocketSend(ByteBlock dataByteBlock)
{
int dataLen = (int)dataByteBlock.Position + 4;
ByteBlock byteBlock = this.bytePool.GetByteBlock(dataLen);
byte[] lenBytes = BitConverter.GetBytes(dataLen);
///// <summary>
///// 发送流
///// </summary>
///// <param name="dataByteBlock"></param>
//public void SocketSend(ByteBlock dataByteBlock)
//{
// int dataLen = (int)dataByteBlock.Position + 4;
// ByteBlock byteBlock = this.bytePool.GetByteBlock(dataLen);
// byte[] lenBytes = BitConverter.GetBytes(dataLen);
byteBlock.Write(lenBytes);
// byteBlock.Write(lenBytes);
byteBlock.Write(dataByteBlock.Buffer, 0, (int)dataByteBlock.Position);
try
{
this.mainSocket.Send(byteBlock.Buffer, 0, (int)byteBlock.Position, SocketFlags.None);
}
finally
{
byteBlock.Dispose();
}
}
// byteBlock.Write(dataByteBlock.Buffer, 0, (int)dataByteBlock.Position);
// try
// {
// this.mainSocket.Send(byteBlock.Buffer, 0, (int)byteBlock.Position, SocketFlags.None);
// }
// finally
// {
// byteBlock.Dispose();
// }
//}
/// <summary>
/// 发送字节
@@ -143,17 +121,19 @@ namespace RRQMSocket
/// <param name="dataBuffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public void SocketSend(int agreement, byte[] dataBuffer, int offset, int length)
public void SocketSend(short agreement, byte[] dataBuffer, int offset, int length)
{
int dataLen = length - offset + 8;
int dataLen = length - offset + 6;
ByteBlock byteBlock = this.bytePool.GetByteBlock(dataLen);
byte[] lenBytes = BitConverter.GetBytes(dataLen);
byte[] agreementBytes = BitConverter.GetBytes(agreement);
byteBlock.Write(lenBytes);
byteBlock.Write(agreementBytes);
byteBlock.Write(dataBuffer, offset, length);
if (length > 0)
{
byteBlock.Write(dataBuffer, offset, length);
}
try
{
this.mainSocket.Send(byteBlock.Buffer, 0, (int)byteBlock.Position, SocketFlags.None);

View File

@@ -2,8 +2,11 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RRQMCore.ByteManager;
using RRQMCore.Exceptions;
using RRQMCore.Log;
namespace RRQMSocket
{
@@ -12,6 +15,26 @@ namespace RRQMSocket
/// </summary>
public abstract class ProtocolClient : TokenClient
{
/// <summary>
/// 构造函数
/// </summary>
public ProtocolClient()
{
waitHandle = new AutoResetEvent(false);
}
private EventWaitHandle waitHandle;
private RRQMAgreementHelper agreementHelper;
/// <summary>
/// 连接到服务器时
/// </summary>
/// <param name="e"></param>
protected override void OnConnectedService(MesEventArgs e)
{
base.OnConnectedService(e);
this.agreementHelper = new RRQMAgreementHelper(this);
}
/// <summary>
/// 载入配置,协议客户端数据处理适配器不可更改。
/// </summary>
@@ -22,6 +45,93 @@ namespace RRQMSocket
this.DataHandlingAdapter = new FixedHeaderDataHandlingAdapter();
}
/// <summary>
/// 重新设置ID,并且同步到服务器
/// </summary>
/// <param name="id"></param>
public override void ResetID(string id)
{
this.agreementHelper.SocketSend(0,Encoding.UTF8.GetBytes(id));
if (this.waitHandle.WaitOne(5000))
{
return;
}
throw new RRQMException("同步ID超时");
}
/// <summary>
/// 发送字节流
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
/// <exception cref="RRQMNotConnectedException"></exception>
/// <exception cref="RRQMOverlengthException"></exception>
/// <exception cref="RRQMException"></exception>
public sealed override void Send(byte[] buffer, int offset, int length)
{
this.agreementHelper.SocketSend(-1, buffer, offset, length);
}
/// <summary>
/// 发送字节流(仍然为同步发送)
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public sealed override void SendAsync(byte[] buffer, int offset, int length)
{
this.agreementHelper.SocketSend(-1, buffer, offset, length);
}
/// <summary>
/// 发送字节
/// </summary>
/// <param name="agreement"></param>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public void Send(short agreement, byte[] buffer, int offset, int length)
{
if (agreement > 0)
{
this.agreementHelper.SocketSend(agreement, buffer, offset, length);
}
else
{
throw new RRQMException("小等于0的协议为系统使用协议");
}
}
/// <summary>
/// 发送字节
/// </summary>
/// <param name="agreement"></param>
/// <param name="dataBuffer"></param>
public void SocketSend(short agreement, byte[] dataBuffer)
{
this.Send(agreement, dataBuffer, 0, dataBuffer.Length);
}
/// <summary>
/// 发送协议流
/// </summary>
/// <param name="agreement"></param>
/// <param name="dataByteBlock"></param>
public void Send(short agreement, ByteBlock dataByteBlock)
{
this.Send(agreement, dataByteBlock.Buffer, 0, (int)dataByteBlock.Length);
}
/// <summary>
/// 发送协议状态
/// </summary>
/// <param name="agreement"></param>
public void Send(short agreement)
{
this.Send(agreement,new byte[0],0,0);
}
/// <summary>
/// 密封方法
/// </summary>
@@ -29,11 +139,21 @@ namespace RRQMSocket
/// <param name="obj"></param>
protected sealed override void HandleReceivedData(ByteBlock byteBlock, object obj)
{
int agreement = BitConverter.ToInt32(byteBlock.Buffer, 0);
short agreement = BitConverter.ToInt16(byteBlock.Buffer, 0);
switch (agreement)
{
case 0:
{
try
{
string id = Encoding.UTF8.GetString(byteBlock.Buffer, 2, (int)byteBlock.Length - 2);
base.ResetID(id);
this.waitHandle.Set();
}
catch (Exception ex)
{
this.Logger.Debug(LogType.Error, this, "重置ID错误", ex);
}
break;
}
default:
@@ -48,11 +168,20 @@ namespace RRQMSocket
/// <summary>
/// 收到协议数据,由于性能考虑,
/// byteBlock数据源并未剔除协议数据
/// 所以真实数据起点为4
/// 长度为Length-4
/// 所以真实数据起点为2
/// 长度为Length-2
/// </summary>
/// <param name="agreement"></param>
/// <param name="byteBlock"></param>
protected abstract void HandleProtocolData(int agreement,ByteBlock byteBlock);
protected abstract void HandleProtocolData(short agreement,ByteBlock byteBlock);
/// <summary>
/// 释放资源
/// </summary>
public override void Dispose()
{
base.Dispose();
this.waitHandle.Dispose();
}
}
}

View File

@@ -0,0 +1,30 @@
using RRQMCore.ByteManager;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace RRQMSocket
{
/// <summary>
/// 协议客户端
/// </summary>
public class SimpleProtocolClient : ProtocolClient
{
/// <summary>
/// 接收到数据
/// </summary>
public event Action<short, ByteBlock> Received;
/// <summary>
/// 处理协议数据
/// </summary>
/// <param name="agreement"></param>
/// <param name="byteBlock"></param>
protected sealed override void HandleProtocolData(short agreement, ByteBlock byteBlock)
{
this.Received?.Invoke(agreement, byteBlock);
}
}
}

View File

@@ -30,16 +30,6 @@ namespace RRQMSocket
/// <param name="byteBlock"></param>
/// <param name="obj"></param>
protected sealed override void HandleReceivedData(ByteBlock byteBlock, object obj)
{
OnReceived(byteBlock, obj);
}
/// <summary>
/// 接收到数据
/// </summary>
/// <param name="byteBlock"></param>
/// <param name="obj"></param>
protected virtual void OnReceived(ByteBlock byteBlock, object obj)
{
this.Received?.Invoke(byteBlock, obj);
}

View File

@@ -54,7 +54,7 @@ namespace RRQMSocket
}
/// <summary>
/// 重新设置ID
/// 重新设置ID,但是不会同步到服务器
/// </summary>
/// <param name="id"></param>
public virtual void ResetID(string id)

View File

@@ -1,4 +1,5 @@
using System;
using RRQMCore.Exceptions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -11,5 +12,22 @@ namespace RRQMSocket
/// </summary>
public abstract class ProtocolService<TClient> : TokenService<TClient>where TClient:ProtocolSocketClient,new()
{
/// <summary>
/// 重置ID
/// </summary>
/// <param name="oldID"></param>
/// <param name="newID"></param>
public override void ResetID(string oldID, string newID)
{
base.ResetID(oldID, newID);
if (this.TryGetSocketClient(newID, out TClient client))
{
client.agreementHelper.SocketSend(0, Encoding.UTF8.GetBytes(newID));
}
else
{
throw new RRQMException("新ID不可用请清理客户端重新修改ID");
}
}
}
}

View File

@@ -0,0 +1,46 @@
using RRQMCore.ByteManager;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace RRQMSocket
{
/// <summary>
/// 简单协议服务器
/// </summary>
public class SimpleProtocolService : ProtocolService<SimpleProtocolSocketClient>
{
/// <summary>
/// 处理数据
/// </summary>
public event Action<SimpleProtocolSocketClient, short, ByteBlock> Received;
/// <summary>
/// 成功连接后创建(或从对象池中获得)辅助类,
/// 用户可以在该方法中再进行自定义设置,
/// 但是如果该对象是从对象池获得的话,为避免重复设定某些值,
/// 例如事件等请先判断CreatOption.NewCreat值再做处理。
/// </summary>
public event Action<SimpleProtocolSocketClient, CreateOption> CreateSocketCliect;
/// <summary>
/// 接收辅助类
/// </summary>
/// <param name="tcpSocketClient"></param>
/// <param name="createOption"></param>
protected override void OnCreateSocketCliect(SimpleProtocolSocketClient tcpSocketClient, CreateOption createOption)
{
this.CreateSocketCliect?.Invoke(tcpSocketClient, createOption);
if (createOption.NewCreate)
{
tcpSocketClient.OnReceived = this.OnReceive;
}
}
private void OnReceive(SimpleProtocolSocketClient socketClient, short agreement, ByteBlock byteBlock)
{
this.Received?.Invoke(socketClient, agreement, byteBlock);
}
}
}

View File

@@ -59,6 +59,7 @@ namespace RRQMSocket
{
get { return clearInterval; }
}
/// <summary>
/// 最大可连接数
/// </summary>
@@ -66,6 +67,7 @@ namespace RRQMSocket
{
get { return maxCount; }
}
/// <summary>
/// 获取服务器配置
/// </summary>
@@ -87,7 +89,6 @@ namespace RRQMSocket
get { return socketClients; }
}
private string name;
/// <summary>
/// 服务器名称
@@ -400,7 +401,7 @@ namespace RRQMSocket
this.OnCreateSocketCliect(client, creatOption);
client.id = creatOption.ID;
if (this.socketClients.TryAdd(client))
if (!this.socketClients.TryAdd(client))
{
throw new RRQMException("ID重复");
}

View File

@@ -110,7 +110,7 @@ namespace RRQMSocket
this.OnCreateSocketCliect(client, creatOption);
client.id = creatOption.ID;
if (this.SocketClients.TryAdd(client))
if (!this.SocketClients.TryAdd(client))
{
throw new RRQMException("ID重复");
}

View File

@@ -4,6 +4,8 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RRQMCore.ByteManager;
using RRQMCore.Exceptions;
using RRQMCore.Log;
namespace RRQMSocket
{
@@ -12,23 +14,142 @@ namespace RRQMSocket
/// </summary>
public abstract class ProtocolSocketClient : SocketClient
{
internal RRQMAgreementHelper agreementHelper;
/// <summary>
/// 新建
/// 接收之前
/// </summary>
public sealed override void Create()
protected override void OnBeforeReceive()
{
base.OnBeforeReceive();
this.agreementHelper = new RRQMAgreementHelper(this);
this.DataHandlingAdapter = new FixedHeaderDataHandlingAdapter();
}
/// <summary>
/// 收到消息
/// 发送字节流
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
/// <exception cref="RRQMNotConnectedException"></exception>
/// <exception cref="RRQMOverlengthException"></exception>
/// <exception cref="RRQMException"></exception>
public sealed override void Send(byte[] buffer, int offset, int length)
{
this.agreementHelper.SocketSend(-1, buffer, offset, length);
}
/// <summary>
/// 发送字节流(仍然为同步发送)
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public sealed override void SendAsync(byte[] buffer, int offset, int length)
{
this.agreementHelper.SocketSend(-1, buffer, offset, length);
}
/// <summary>
/// 发送字节
/// </summary>
/// <param name="agreement"></param>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="length"></param>
public void Send(short agreement, byte[] buffer, int offset, int length)
{
if (agreement > 0)
{
this.agreementHelper.SocketSend(agreement, buffer, offset, length);
}
else
{
throw new RRQMException("小等于0的协议为系统使用协议");
}
}
/// <summary>
/// 发送字节
/// </summary>
/// <param name="agreement"></param>
/// <param name="dataBuffer"></param>
public void SocketSend(short agreement, byte[] dataBuffer)
{
this.Send(agreement, dataBuffer, 0, dataBuffer.Length);
}
/// <summary>
/// 发送协议流
/// </summary>
/// <param name="agreement"></param>
/// <param name="dataByteBlock"></param>
public void Send(short agreement, ByteBlock dataByteBlock)
{
this.Send(agreement, dataByteBlock.Buffer, 0, (int)dataByteBlock.Length);
}
/// <summary>
/// 发送协议状态
/// </summary>
/// <param name="agreement"></param>
public void Send(short agreement)
{
this.Send(agreement, new byte[0], 0, 0);
}
/// <summary>
/// 重新设置ID
/// </summary>
/// <param name="id"></param>
protected override void ResetID(string id)
{
base.ResetID(id);
}
/// <summary>
/// 密封方法
/// </summary>
/// <param name="byteBlock"></param>
/// <param name="obj"></param>
protected sealed override void HandleReceivedData(ByteBlock byteBlock, object obj)
{
int agreement = BitConverter.ToInt32(byteBlock.Buffer, 0);
short agreement = BitConverter.ToInt16(byteBlock.Buffer, 0);
switch (agreement)
{
case 0:
{
try
{
string id = Encoding.UTF8.GetString(byteBlock.Buffer, 2, (int)byteBlock.Length - 2);
base.ResetID(id);
this.agreementHelper.SocketSend(0, Encoding.UTF8.GetBytes(this.id));
}
catch (Exception ex)
{
this.Logger.Debug(LogType.Error, this, "重置ID错误", ex);
}
break;
}
default:
{
HandleProtocolData(agreement, byteBlock);
break;
}
}
}
/// <summary>
/// 收到协议数据,由于性能考虑,
/// byteBlock数据源并未剔除协议数据
/// 所以真实数据起点为2
/// 长度为Length-2。
/// </summary>
/// <param name="agreement"></param>
/// <param name="byteBlock"></param>
protected abstract void HandleProtocolData(short agreement, ByteBlock byteBlock);
}
}

View File

@@ -0,0 +1,30 @@
using RRQMCore.ByteManager;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace RRQMSocket
{
/// <summary>
/// SimpleProtocolSocketClient
/// </summary>
public sealed class SimpleProtocolSocketClient : ProtocolSocketClient
{
/// <summary>
/// 收到消息
/// </summary>
internal Action<SimpleProtocolSocketClient,short, ByteBlock> OnReceived;
/// <summary>
/// 处理协议数据
/// </summary>
/// <param name="agreement"></param>
/// <param name="byteBlock"></param>
protected override void HandleProtocolData(short agreement, ByteBlock byteBlock)
{
this.OnReceived.Invoke(this,agreement,byteBlock);
}
}
}

View File

@@ -118,6 +118,22 @@ namespace RRQMSocket
this.dataHandlingAdapter.Received(clientBuffer.byteBlock);
}
/// <summary>
/// 重新设置ID
/// </summary>
/// <param name="id"></param>
protected virtual void ResetID(string id)
{
this.Service.ResetID(this.id,id);
}
/// <summary>
/// 在接收之前
/// </summary>
protected virtual void OnBeforeReceive()
{
}
/// <summary>
/// 重新获取,父类方法不可覆盖
@@ -224,6 +240,7 @@ namespace RRQMSocket
{
try
{
this.OnBeforeReceive();
SocketAsyncEventArgs eventArgs = new SocketAsyncEventArgs();
eventArgs.Completed += this.EventArgs_Completed;
ByteBlock byteBlock = this.BytePool.GetByteBlock(this.BufferLength);