This commit is contained in:
snltty
2025-03-03 17:44:29 +08:00
parent d12d4a1b99
commit 303581a8b3
34 changed files with 497 additions and 199 deletions

View File

@@ -20,7 +20,8 @@ sidebar_position: 2
:::tip[说明]
1. 如果你有多个服务器,希望将这些服务器作为一个中继节点
2. 在服务端 `configs/server.json` 中(`Relay->Distributed`)下配置
2.主服务器外的其它服务器部署一个服务端
3. 然后 `configs/server.json` 中(`Relay->Distributed`)下修改配置
```json
"Distributed": {

View File

@@ -2,7 +2,7 @@
sidebar_position: 4
---
# 1.1.3、ICS
# 1.1.2、ICS
:::tip[说明]
1. 如果系统没有<a href="https://dotnet.microsoft.com/zh-cn/download/dotnet-framework/thank-you/net462-web-installer" target="_blank">netframework4.6.2</a>,就下载安装一下

View File

@@ -17,7 +17,7 @@ sidebar_position: 2
1. linux已经自动添加NAT转发(在`OpenWrt`,需要在`防火墙 - 区域设置`中将`转发`设置为`接受`)
2. windows暂时找到两种NAT方式
1. NetNat<a href="./1.1.1、NetNat">请参照 1.1.1、NetNat</a>
2. RRAS<a href="./1.1.2、RRAS">请参照1.1.2、RRAS(Routing and Remote Access Service)</a>
2. ICS<a href="./1.1.2、ICS">请参照1.1.2、ICS(Internet Connection Sharing)</a>
3. macos需要你自己在**被访问端**添加NAT转发
```
# 开启ip转发

Binary file not shown.

Before

Width:  |  Height:  |  Size: 50 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 78 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 39 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 67 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 46 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 74 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 83 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 25 KiB

View File

@@ -27,7 +27,7 @@ namespace linker.messenger.flow
public sealed class RelayReportResolverFlow : RelayServerReportResolver
{
private readonly RelayReportFlow relayReportFlow;
public RelayReportResolverFlow(RelayReportFlow relayReportFlow, RelayServerMasterTransfer relayServerTransfer) : base(relayServerTransfer)
public RelayReportResolverFlow(RelayReportFlow relayReportFlow, RelayServerMasterTransfer relayServerTransfer,IRelayServerMasterStore relayServerMasterStore,IMessengerResolver messengerResolver) : base(relayServerTransfer, relayServerMasterStore, messengerResolver)
{
this.relayReportFlow = relayReportFlow;
}

View File

@@ -46,7 +46,8 @@ namespace linker.messenger.relay
Nodes = await transport.RelayTestAsync(new RelayTestInfo
{
MachineId = signInClientStore.Id,
SecretKey = relayClientStore.Server.SecretKey
SecretKey = relayClientStore.Server.SecretKey,
UserId = signInClientStore.Server.UserId
});
var tasks = Nodes.Select(async (c) =>
{

View File

@@ -58,6 +58,7 @@ namespace linker.messenger.relay.client.transport
public string MachineId { get; set; }
public string SecretKey { get; set; }
public IPEndPoint Server { get; set; }
public string UserId { get; set; }
}
/// <summary>

View File

@@ -28,6 +28,7 @@ namespace linker.messenger.relay.client.transport
private readonly IRelayClientStore relayClientStore;
private readonly SignInClientState signInClientState;
private readonly IMessengerStore messengerStore;
public RelayClientTransportSelfHost(IMessengerSender messengerSender, ISerializer serializer, IRelayClientStore relayClientStore, SignInClientState signInClientState, IMessengerStore messengerStore)
{
this.messengerSender = messengerSender;

View File

@@ -81,7 +81,7 @@ namespace linker.messenger.relay.messenger
TransportName = "test",
}, cache, null);
var nodes = relayServerTransfer.GetNodes(string.IsNullOrWhiteSpace(result));
var nodes = relayServerTransfer.GetNodes(string.IsNullOrWhiteSpace(result), info.UserId);
connection.Write(serializer.Serialize(nodes));
}
@@ -107,13 +107,14 @@ namespace linker.messenger.relay.messenger
RelayAskResultInfo result = new RelayAskResultInfo();
string error = await relayValidatorTransfer.Validate(info, cacheFrom, cacheTo);
result.Nodes = relayServerTransfer.GetNodes(string.IsNullOrWhiteSpace(error));
bool validated = string.IsNullOrWhiteSpace(error);
result.Nodes = relayServerTransfer.GetNodes(validated);
List<RelayServerCdkeyInfo> cdkeys = await relayServerCdkeyStore.Get(info.UserId);
if (result.Nodes.Count > 0)
{
result.FlowingId = relayServerTransfer.AddRelay(cacheFrom.MachineId, cacheFrom.MachineName, cacheTo.MachineId, cacheTo.MachineName, cacheFrom.GroupId, cdkeys);
result.FlowingId = relayServerTransfer.AddRelay(cacheFrom.MachineId, cacheFrom.MachineName, cacheTo.MachineId, cacheTo.MachineName, cacheFrom.GroupId, validated, cdkeys);
}
connection.Write(serializer.Serialize(result));
@@ -257,5 +258,42 @@ namespace linker.messenger.relay.messenger
connection.Write(serializer.Serialize(page));
}
/// <summary>
/// 获取缓存
/// </summary>
/// <param name="connection"></param>
/// <returns></returns>
[MessengerId((ushort)RelayMessengerIds.NodeGetCache)]
public async Task NodeGetCache(IConnection connection)
{
}
/// <summary>
/// 获取缓存
/// </summary>
/// <param name="connection"></param>
/// <returns></returns>
[MessengerId((ushort)RelayMessengerIds.NodeReport)]
public async Task NodeReport(IConnection connection)
{
}
/// <summary>
/// 消耗流量报告
/// </summary>
/// <param name="connection"></param>
/// <returns></returns>
[MessengerId((ushort)RelayMessengerIds.TrafficReport)]
public async Task TrafficReport(IConnection connection)
{
RelayTrafficReportInfo info = serializer.Deserialize<RelayTrafficReportInfo>(connection.ReceiveRequestWrap.Payload.Span);
if (relayServerStore.SecretKey != info.SecretKey )
{
connection.Write(serializer.Serialize(new Dictionary<string,ulong>()));
return;
}
}
}
}

View File

@@ -20,6 +20,11 @@
DelCdkey = 2110,
AccessCdkey = 2111,
NodeGetCache = 2112,
NodeReport = 2113,
TrafficReport = 2114,
Max = 2199
}
}

View File

@@ -79,6 +79,8 @@ namespace linker.messenger.relay.server
#else
public string MasterSecretKey { get; set; } = string.Empty;
#endif
public List<string> UserIds { get; set; } = new List<string>();
}
public sealed partial class RelayServerNodeReportInfo
@@ -102,6 +104,8 @@ namespace linker.messenger.relay.server
public IPEndPoint EndPoint { get; set; }
public long LastTicks { get; set; }
public List<string> UserIds { get; set; } = new List<string>();
}

View File

@@ -12,7 +12,6 @@ namespace linker.messenger.relay.server
{
private ulong relayFlowingId = 0;
private readonly ICrypto crypto;
private readonly ConcurrentDictionary<string, RelayServerNodeReportInfo> reports = new ConcurrentDictionary<string, RelayServerNodeReportInfo>();
@@ -22,11 +21,10 @@ namespace linker.messenger.relay.server
{
this.relayCaching = relayCaching;
this.serializer = serializer;
crypto = CryptoFactory.CreateSymmetric(relayServerMasterStore.Master.SecretKey);
}
public ulong AddRelay(string fromid, string fromName, string toid, string toName, string groupid, List<RelayServerCdkeyInfo> cdkeys)
public ulong AddRelay(string fromid, string fromName, string toid, string toName, string groupid, bool validated, List<RelayServerCdkeyInfo> cdkeys)
{
ulong flowingId = Interlocked.Increment(ref relayFlowingId);
@@ -38,6 +36,7 @@ namespace linker.messenger.relay.server
ToId = toid,
ToName = toName,
GroupId = groupid,
Validated = validated,
Cdkey = cdkeys
};
bool added = relayCaching.TryAdd($"{fromid}->{toid}->{flowingId}", cache, 15000);
@@ -46,39 +45,29 @@ namespace linker.messenger.relay.server
return flowingId;
}
public Memory<byte> TryGetRelayCache(string key)
public bool TryGetRelayCache(string key, out RelayCacheInfo value)
{
if (relayCaching.TryGetValue(key, out RelayCacheInfo value))
{
byte[] bytes = crypto.Encode(serializer.Serialize(value));
return bytes;
}
return Helper.EmptyArray;
return relayCaching.TryGetValue(key, out value);
}
/// <summary>
/// 设置节点
/// </summary>
/// <param name="ep"></param>
/// <param name="data"></param>
public void SetNodeReport(IPEndPoint ep, Memory<byte> data)
public void SetNodeReport(IConnection connection, RelayServerNodeReportInfo info)
{
try
{
if (crypto == null) return;
data = crypto.Decode(data.ToArray());
RelayServerNodeReportInfo relayNodeReportInfo = serializer.Deserialize<RelayServerNodeReportInfo>(data.Span);
if (relayNodeReportInfo.Id == RelayServerNodeInfo.MASTER_NODE_ID)
if (info.Id == RelayServerNodeInfo.MASTER_NODE_ID)
{
relayNodeReportInfo.EndPoint = new IPEndPoint(IPAddress.Any, 0);
info.EndPoint = new IPEndPoint(IPAddress.Any, 0);
}
else if (relayNodeReportInfo.EndPoint.Address.Equals(IPAddress.Any))
else if (info.EndPoint.Address.Equals(IPAddress.Any))
{
relayNodeReportInfo.EndPoint.Address = ep.Address;
info.EndPoint.Address = connection.Address.Address;
}
relayNodeReportInfo.LastTicks = Environment.TickCount64;
reports.AddOrUpdate(relayNodeReportInfo.Id, relayNodeReportInfo, (a, b) => relayNodeReportInfo);
info.LastTicks = Environment.TickCount64;
reports.AddOrUpdate(info.Id, info, (a, b) => info);
}
catch (Exception ex)
{
@@ -93,10 +82,10 @@ namespace linker.messenger.relay.server
/// </summary>
/// <param name="validated">是否已认证</param>
/// <returns></returns>
public List<RelayServerNodeReportInfo> GetNodes(bool validated)
public List<RelayServerNodeReportInfo> GetNodes(bool validated, string userid)
{
var result = reports.Values
.Where(c => c.Public || (c.Public == false && validated))
.Where(c => c.Public || (c.Public == false && validated) || c.UserIds.Contains(userid))
.Where(c => Environment.TickCount64 - c.LastTicks < 15000)
.Where(c => c.ConnectionRatio < 100 && (c.MaxGbTotal == 0 || (c.MaxGbTotal > 0 && c.MaxGbTotalLastBytes > 0)))
.OrderByDescending(c => c.LastTicks);

View File

@@ -1,8 +1,11 @@
using linker.libs;
using linker.libs.extends;
using linker.messenger.relay.messenger;
using System.Buffers;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
namespace linker.messenger.relay.server
{
@@ -12,62 +15,59 @@ namespace linker.messenger.relay.server
public class RelayServerNodeTransfer
{
private uint connectionNum = 0;
private readonly ICrypto cryptoNode;
private readonly ICrypto cryptoMaster;
private IConnection localConnection;
private IConnection remoteConnection;
private ulong bytes = 0;
private ulong lastBytes = 0;
RelaySpeedLimit limitTotal = new RelaySpeedLimit();
private readonly ConcurrentDictionary<ulong, RelayTrafficCacheInfo> trafficDict = new ConcurrentDictionary<ulong, RelayTrafficCacheInfo>();
private readonly ConcurrentDictionary<string, ulong> cdkeyLastBytes = new ConcurrentDictionary<string, ulong>();
private readonly ISerializer serializer;
private readonly IRelayServerNodeStore relayServerNodeStore;
private readonly IRelayServerMasterStore relayServerMasterStore;
public RelayServerNodeTransfer(ISerializer serializer, IRelayServerNodeStore relayServerNodeStore, IRelayServerMasterStore relayServerMasterStore)
private readonly IMessengerResolver messengerResolver;
private readonly IMessengerSender messengerSender;
public RelayServerNodeTransfer(ISerializer serializer, IRelayServerNodeStore relayServerNodeStore, IRelayServerMasterStore relayServerMasterStore, IMessengerResolver messengerResolver, IMessengerSender messengerSender)
{
this.serializer = serializer;
this.relayServerNodeStore = relayServerNodeStore;
this.relayServerMasterStore = relayServerMasterStore;
this.messengerResolver = messengerResolver;
this.messengerSender = messengerSender;
limitTotal.SetLimit((uint)Math.Ceiling((relayServerNodeStore.Node.MaxBandwidthTotal * 1024 * 1024) / 8.0));
cryptoNode = CryptoFactory.CreateSymmetric(relayServerNodeStore.Node.MasterSecretKey);
cryptoMaster = CryptoFactory.CreateSymmetric(relayServerMasterStore.Master.SecretKey);
TrafficTask();
ReportTask();
SignInTask();
}
public async ValueTask<RelayCacheInfo> TryGetRelayCache(string key, string nodeid)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(2 * 1024);
try
{
IPEndPoint server = nodeid == RelayServerNodeInfo.MASTER_NODE_ID
? new IPEndPoint(IPAddress.Loopback, relayServerNodeStore.ServicePort)
: await NetworkHelper.GetEndPointAsync(relayServerNodeStore.Node.MasterHost, 1802);
ICrypto crypto = nodeid == RelayServerNodeInfo.MASTER_NODE_ID ? cryptoMaster : cryptoNode;
IConnection connection = nodeid == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection;
Socket socket = new Socket(server.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
long start = Environment.TickCount64;
await socket.ConnectAsync(server).ConfigureAwait(false);
long time = Environment.TickCount64 - start;
await socket.SendAsync(new byte[] { (byte)ResolverType.RelayReport });
await socket.SendAsync(key.ToBytes());
int length = await socket.ReceiveAsync(buffer.AsMemory(), SocketFlags.None).AsTask().WaitAsync(TimeSpan.FromMilliseconds(Math.Max(time * 2, 5000))).ConfigureAwait(false);
socket.SafeClose();
RelayCacheInfo result = serializer.Deserialize<RelayCacheInfo>(crypto.Decode(buffer.AsMemory(0, length).ToArray()).Span);
return result;
MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap
{
Connection = connection,
MessengerId = (ushort)RelayMessengerIds.NodeGetCache,
Payload = serializer.Serialize(key)
});
if (resp.Code == MessageResponeCodes.OK && resp.Data.Length > 0)
{
return serializer.Deserialize<RelayCacheInfo>(resp.Data.Span);
}
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
LoggerHelper.Instance.Error($"{ex}");
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
return null;
}
@@ -75,9 +75,40 @@ namespace linker.messenger.relay.server
/// 无效请求
/// </summary>
/// <returns></returns>
public bool Validate()
public bool Validate(RelayCacheInfo relayCache)
{
return ValidateConnection() && ValidateBytes();
//已认证的没有流量限制
if (relayCache.Validated) return true;
//流量卡有的,就能继续用
if (relayCache.Cdkey.Any(c => c.LastBytes > 0)) return true;
return ValidateConnection(relayCache) && ValidateBytes(relayCache);
}
/// <summary>
/// 连接数是否够
/// </summary>
/// <returns></returns>
private bool ValidateConnection(RelayCacheInfo relayCache)
{
bool res = relayServerNodeStore.Node.MaxConnection == 0 || relayServerNodeStore.Node.MaxConnection * 2 > connectionNum;
if (res == false && LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
LoggerHelper.Instance.Debug($"relay ValidateConnection false,{connectionNum}/{relayServerNodeStore.Node.MaxConnection * 2}");
return res;
}
/// <summary>
/// 流量是否够
/// </summary>
/// <returns></returns>
private bool ValidateBytes(RelayCacheInfo relayCache)
{
bool res = relayServerNodeStore.Node.MaxGbTotal == 0
|| (relayServerNodeStore.Node.MaxGbTotal > 0 && relayServerNodeStore.Node.MaxGbTotalLastBytes > 0);
if (res == false && LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
LoggerHelper.Instance.Debug($"relay ValidateBytes false,{relayServerNodeStore.Node.MaxGbTotalLastBytes}bytes/{relayServerNodeStore.Node.MaxGbTotal}gb");
return res;
}
/// <summary>
@@ -94,66 +125,14 @@ namespace linker.messenger.relay.server
{
Interlocked.Decrement(ref connectionNum);
}
/// <summary>
/// 连接数是否够
/// </summary>
/// <returns></returns>
public bool ValidateConnection()
{
bool res = relayServerNodeStore.Node.MaxConnection == 0 || relayServerNodeStore.Node.MaxConnection * 2 > connectionNum;
if (res == false && LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
LoggerHelper.Instance.Debug($"relay ValidateConnection false,{connectionNum}/{relayServerNodeStore.Node.MaxConnection * 2}");
return res;
}
/// <summary>
/// 流量是否够
/// </summary>
/// <returns></returns>
public bool ValidateBytes()
{
bool res = relayServerNodeStore.Node.MaxGbTotal == 0
|| (relayServerNodeStore.Node.MaxGbTotal > 0 && relayServerNodeStore.Node.MaxGbTotalLastBytes > 0);
if (res == false && LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
LoggerHelper.Instance.Debug($"relay ValidateBytes false,{relayServerNodeStore.Node.MaxGbTotalLastBytes}bytes/{relayServerNodeStore.Node.MaxGbTotal}gb");
return res;
}
/// <summary>
/// 添加流量
/// </summary>
/// <param name="length"></param>
/// <returns></returns>
public bool AddBytes(ulong length)
{
bytes += length;
if (relayServerNodeStore.Node.MaxGbTotal == 0)
{
return true;
}
if (relayServerNodeStore.Node.MaxGbTotalLastBytes >= length)
relayServerNodeStore.SetMaxGbTotalLastBytes(relayServerNodeStore.Node.MaxGbTotalLastBytes - length);
else relayServerNodeStore.SetMaxGbTotalLastBytes(0);
return relayServerNodeStore.Node.MaxGbTotalLastBytes > 0;
}
/// <summary>
/// 获取单个限速
/// </summary>
/// <returns></returns>
public uint GetBandwidthLimit()
{
return (uint)Math.Ceiling((relayServerNodeStore.Node.MaxBandwidth * 1024 * 1024) / 8.0);
}
/// <summary>
/// 是否需要总限速
/// </summary>
/// <returns></returns>
public bool NeedLimit()
public bool NeedLimit(RelayTrafficCacheInfo relayCache)
{
if (relayCache.Cache.Validated) return false;
return limitTotal.NeedLimit();
}
/// <summary>
@@ -167,8 +146,86 @@ namespace linker.messenger.relay.server
}
private void ResetBytes()
/// <summary>
/// 开始计算流量
/// </summary>
/// <param name="relayCache"></param>
public void AddTrafficCache(RelayTrafficCacheInfo relayCache)
{
SetLimit(relayCache);
trafficDict.TryAdd(relayCache.Cache.FlowId, relayCache);
}
/// <summary>
/// 取消计算流量
/// </summary>
/// <param name="relayCache"></param>
public void RemoveTrafficCache(RelayTrafficCacheInfo relayCache)
{
trafficDict.TryRemove(relayCache.Cache.FlowId, out _);
foreach (var item in relayCache.Cache.Cdkey)
{
cdkeyLastBytes.TryRemove(item.Id, out _);
}
}
/// <summary>
/// 消耗流量
/// </summary>
/// <param name="length"></param>
/// <returns></returns>
public bool AddBytes(RelayTrafficCacheInfo cache, ulong length)
{
Interlocked.Add(ref bytes, length);
//验证过的,不消耗流量
if (cache.Cache.Validated) return true;
//节点无流量限制的,不消耗流量
if (relayServerNodeStore.Node.MaxGbTotal == 0) return true;
Interlocked.Add(ref cache.Sendt, length);
if (cache.CurrentCdkey != null) return cache.CurrentCdkey.LastBytes > 0;
return relayServerNodeStore.Node.MaxGbTotalLastBytes > 0;
}
/// <summary>
/// 设置限速
/// </summary>
/// <param name="relayCache"></param>
private void SetLimit(RelayTrafficCacheInfo relayCache)
{
//验证过的,无限制
if (relayCache.Cache.Validated)
{
relayCache.Limit.SetLimit(0);
return;
}
//节点无限制
if (relayServerNodeStore.Node.MaxBandwidth == 0)
{
relayCache.Limit.SetLimit((uint)Math.Ceiling((relayServerNodeStore.Node.MaxBandwidth * 1024 * 1024) / 8.0));
return;
}
RelayServerCdkeyInfo currentCdkey = relayCache.Cache.Cdkey.Where(c => c.LastBytes > 0).OrderByDescending(c => c.Bandwidth).FirstOrDefault();
//有cdkey且带宽大于节点带宽就用cdkey的带宽
if (currentCdkey != null && (currentCdkey.Bandwidth == 0 || currentCdkey.Bandwidth > relayServerNodeStore.Node.MaxBandwidth))
{
relayCache.CurrentCdkey = currentCdkey;
relayCache.Limit.SetLimit((uint)Math.Ceiling((relayCache.CurrentCdkey.Bandwidth * 1024 * 1024) / 8.0));
return;
}
relayCache.Limit.SetLimit((uint)Math.Ceiling((relayServerNodeStore.Node.MaxBandwidth * 1024 * 1024) / 8.0));
}
private void ResetNodeBytes(ulong length)
{
if (relayServerNodeStore.Node.MaxGbTotalLastBytes >= length)
relayServerNodeStore.SetMaxGbTotalLastBytes(relayServerNodeStore.Node.MaxGbTotalLastBytes - length);
else relayServerNodeStore.SetMaxGbTotalLastBytes(0);
if (relayServerNodeStore.Node.MaxGbTotalMonth != DateTime.Now.Month)
{
relayServerNodeStore.SetMaxGbTotalMonth(DateTime.Now.Month);
@@ -176,16 +233,78 @@ namespace linker.messenger.relay.server
}
relayServerNodeStore.Confirm();
}
private void TrafficTask()
{
TimerHelper.SetInterval(async () =>
{
//需要报告Cdkey的流量
_ = messengerSender.SendReply(new MessageRequestWrap
{
Connection = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection,
MessengerId = (ushort)RelayMessengerIds.TrafficReport,
Payload = serializer.Serialize(new RelayTrafficReportInfo
{
Id2Bytes = trafficDict.Values
.Where(c => c.CurrentCdkey != null && c.Sendt > 0)
.GroupBy(c => c.CurrentCdkey.Id)
.ToDictionary(c => c.Key, d => (ulong)d.Sum(d => (decimal)d.Sendt)),
UpdateIds = trafficDict.Values.SelectMany(c => c.Cache.Cdkey).Select(c => c.Id).Distinct().ToList(),
SecretKey = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID
? relayServerMasterStore.Master.SecretKey
: relayServerNodeStore.Node.MasterSecretKey
}),
Timeout = 4000,
}).ContinueWith((result) =>
{
//更新cdkey的剩余流量
if (result.Result.Code != MessageResponeCodes.OK || result.Result.Data.Length == 0)
{
return;
}
Dictionary<string, ulong> dic = serializer.Deserialize<Dictionary<string, ulong>>(result.Result.Data.Span);
foreach (var item in dic)
{
cdkeyLastBytes.AddOrUpdate(item.Key, item.Value, (a, b) => item.Value);
}
});
foreach (var cache in trafficDict.Values.Where(c => c.CurrentCdkey != null))
{
ulong length = cache.Sendt;
Interlocked.Add(ref Unsafe.As<ulong, long>(ref cache.Sendt), -(long)length);
if (cdkeyLastBytes.TryGetValue(cache.CurrentCdkey.Id, out ulong value))
{
cache.CurrentCdkey.LastBytes = value;
}
if (cache.CurrentCdkey.LastBytes <= 0)
{
SetLimit(cache);
}
}
foreach (var cache in trafficDict.Values.Where(c => c.CurrentCdkey == null))
{
ulong length = cache.Sendt;
Interlocked.Add(ref Unsafe.As<ulong, long>(ref cache.Sendt), -(long)length);
ResetNodeBytes(length);
}
await Task.CompletedTask;
return true;
}, () => 5000);
}
private void ReportTask()
{
TimerHelper.SetInterval(async () =>
{
if(LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Debug($"relay report : {relayServerNodeStore.Node.ToJson()}");
}
ResetBytes();
IEnumerable<RelayServerNodeInfo> nodes = new List<RelayServerNodeInfo>
{
//默认报告给自己,作为本服务器的一个默认中继节点
@@ -215,7 +334,7 @@ namespace linker.messenger.relay.server
{
try
{
ICrypto crypto = node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? cryptoMaster : cryptoNode;
IConnection connection = node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection;
IPEndPoint endPoint = await NetworkHelper.GetEndPointAsync(node.Host, relayServerNodeStore.ServicePort) ?? new IPEndPoint(IPAddress.Any, relayServerNodeStore.ServicePort);
RelayServerNodeReportInfo relayNodeReportInfo = new RelayServerNodeReportInfo
@@ -231,19 +350,15 @@ namespace linker.messenger.relay.server
MaxConnection = node.MaxConnection,
ConnectionRatio = Math.Round(node.MaxConnection == 0 ? 0 : connectionNum / 2.0 / node.MaxConnection, 2),
EndPoint = endPoint,
UserIds = node.UserIds
};
IPEndPoint ep = await NetworkHelper.GetEndPointAsync(node.MasterHost, relayServerNodeStore.ServicePort);
byte[] content = crypto.Encode(serializer.Serialize(relayNodeReportInfo));
byte[] data = new byte[content.Length + 1];
data[0] = (byte)ResolverType.RelayReport;
content.AsMemory().CopyTo(data.AsMemory(1));
using UdpClient udpClient = new UdpClient(AddressFamily.InterNetwork);
udpClient.Client.WindowsUdpBug();
await udpClient.SendAsync(data, ep);
await messengerSender.SendOnly(new MessageRequestWrap
{
Connection = connection,
MessengerId = (ushort)RelayMessengerIds.NodeReport,
Payload = serializer.Serialize(relayNodeReportInfo)
});
}
catch (Exception ex)
{
@@ -256,5 +371,62 @@ namespace linker.messenger.relay.server
return true;
}, () => 5000);
}
private void SignInTask()
{
TimerHelper.SetInterval(async () =>
{
if ((remoteConnection == null || remoteConnection.Connected == false) && string.IsNullOrWhiteSpace(relayServerNodeStore.Node.MasterHost) == false)
{
remoteConnection = await SignIn(relayServerNodeStore.Node.MasterHost, relayServerNodeStore.Node.MasterSecretKey).ConfigureAwait(false);
}
if (localConnection == null || localConnection.Connected == false)
{
localConnection = await SignIn(new IPEndPoint(IPAddress.Loopback, relayServerNodeStore.ServicePort).ToString(), relayServerMasterStore.Master.SecretKey).ConfigureAwait(false);
}
return true;
}, () => 3000);
}
private async Task<IConnection> SignIn(string host, string secretKey)
{
byte[] bytes = ArrayPool<byte>.Shared.Rent(1024);
try
{
byte[] secretKeyBytes = secretKey.Md5().ToBytes();
bytes[0] = (byte)secretKeyBytes.Length;
secretKeyBytes.AsSpan().CopyTo(bytes.AsSpan(1));
IPEndPoint remote = await NetworkHelper.GetEndPointAsync(host, 1802);
Socket socket = new Socket(remote.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
socket.KeepAlive();
await socket.ConnectAsync(remote).WaitAsync(TimeSpan.FromMilliseconds(5000)).ConfigureAwait(false);
return await messengerResolver.BeginReceiveClient(socket, true, (byte)ResolverType.RelayReport, bytes).ConfigureAwait(false);
}
catch (Exception)
{
}
finally
{
ArrayPool<byte>.Shared.Return(bytes);
}
return null;
}
}
public sealed partial class RelayTrafficReportInfo
{
/// <summary>
/// cdkey id 和 流量
/// </summary>
public Dictionary<string, ulong> Id2Bytes { get; set; }
/// <summary>
/// 需要知道哪些cdkey的剩余流量
/// </summary>
public List<string> UpdateIds { get; set; }
public string SecretKey { get; set; }
}
}

View File

@@ -1,4 +1,5 @@
using linker.libs.extends;
using linker.libs;
using linker.libs.extends;
using System.Buffers;
using System.Net;
using System.Net.Sockets;
@@ -8,14 +9,19 @@ namespace linker.messenger.relay.server
/// <summary>
/// 中继节点报告处理器
/// </summary>
public class RelayServerReportResolver: IResolver
public class RelayServerReportResolver : IResolver
{
public byte Type => (byte)ResolverType.RelayReport;
private readonly RelayServerMasterTransfer relayServerTransfer;
public RelayServerReportResolver(RelayServerMasterTransfer relayServerTransfer)
private readonly IRelayServerMasterStore relayServerMasterStore;
private readonly IMessengerResolver messengerResolver;
public RelayServerReportResolver(RelayServerMasterTransfer relayServerTransfer, IRelayServerMasterStore relayServerMasterStore, IMessengerResolver messengerResolver)
{
this.relayServerTransfer = relayServerTransfer;
this.relayServerMasterStore = relayServerMasterStore;
this.messengerResolver = messengerResolver;
}
public virtual void AddReceive(ulong bytes)
@@ -31,15 +37,14 @@ namespace linker.messenger.relay.server
try
{
AddReceive((ulong)memory.Length);
int length = await socket.ReceiveAsync(buffer.AsMemory(), SocketFlags.None).ConfigureAwait(false);
int length = await socket.ReceiveAsync(buffer.AsMemory(0, 1), SocketFlags.None).ConfigureAwait(false);
AddReceive((ulong)length);
await socket.ReceiveAsync(buffer.AsMemory(0, length), SocketFlags.None).ConfigureAwait(false);
string key = buffer.AsMemory(0,length).GetString();
Memory<byte> bytes = relayServerTransfer.TryGetRelayCache(key);
if (bytes.Length > 0)
string key = buffer.AsMemory(0, length).GetString();
if (relayServerMasterStore.Master.SecretKey.Md5() == key)
{
AddSendt((ulong)bytes.Length);
await socket.SendAsync(bytes);
await messengerResolver.BeginReceiveServer(socket, Helper.EmptyArray);
}
}
catch (Exception)
@@ -49,13 +54,10 @@ namespace linker.messenger.relay.server
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
public async Task Resolve(Socket socket, IPEndPoint ep, Memory<byte> memory)
{
AddReceive((ulong)memory.Length);
relayServerTransfer.SetNodeReport(ep, memory);
await Task.CompletedTask;
}
}

View File

@@ -52,17 +52,6 @@ namespace linker.messenger.relay.server
int length = await socket.ReceiveAsync(buffer.AsMemory(), SocketFlags.None).ConfigureAwait(false);
RelayMessageInfo relayMessage = serializer.Deserialize<RelayMessageInfo>(buffer.AsMemory(0, length).Span);
if (relayMessage.Type == RelayMessengerType.Ask && relayMessage.NodeId != RelayServerNodeInfo.MASTER_NODE_ID)
{
if (relayServerNodeTransfer.Validate() == false)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
LoggerHelper.Instance.Error($"relay {relayMessage.Type} Validate false,flowid:{relayMessage.FlowId}");
await socket.SendAsync(new byte[] { 1 });
socket.SafeClose();
return;
}
}
//ask 是发起端来的那key就是 发起端->目标端, answer的目标和来源会交换所以转换一下
string key = relayMessage.Type == RelayMessengerType.Ask ? $"{relayMessage.FromId}->{relayMessage.ToId}->{relayMessage.FlowId}" : $"{relayMessage.ToId}->{relayMessage.FromId}->{relayMessage.FlowId}";
//获取缓存
@@ -71,9 +60,20 @@ namespace linker.messenger.relay.server
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
LoggerHelper.Instance.Error($"relay {relayMessage.Type} get cache fail,flowid:{relayMessage.FlowId}");
await socket.SendAsync(new byte[] { 1 });
socket.SafeClose();
return;
}
if (relayMessage.Type == RelayMessengerType.Ask && relayServerNodeTransfer.Validate(relayCache) == false)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
LoggerHelper.Instance.Error($"relay {relayMessage.Type} Validate false,flowid:{relayMessage.FlowId}");
await socket.SendAsync(new byte[] { 1 });
socket.SafeClose();
return;
}
//流量统计
AddReceive(relayCache.FromId, relayCache.FromName, relayCache.ToName, relayCache.GroupId, (ulong)length);
try
@@ -84,7 +84,6 @@ namespace linker.messenger.relay.server
{
//添加本地缓存
RelayWrapInfo relayWrap = new RelayWrapInfo { Socket = socket, Tcs = new TaskCompletionSource() };
relayWrap.Limit.SetLimit(relayServerNodeTransfer.GetBandwidthLimit());
relayDic.TryAdd(relayCache.FlowId, relayWrap);
@@ -105,8 +104,9 @@ namespace linker.messenger.relay.server
}
relayWrap.Tcs.SetResult();
_ = CopyToAsync(relayCache, relayWrap.Limit, socket, relayWrap.Socket, relayMessage.NodeId != RelayServerNodeInfo.MASTER_NODE_ID);
_ = CopyToAsync(relayCache, relayWrap.Limit, relayWrap.Socket, socket, relayMessage.NodeId != RelayServerNodeInfo.MASTER_NODE_ID);
RelaySpeedLimit limit = new RelaySpeedLimit();
_ = CopyToAsync(relayCache, limit, socket, relayWrap.Socket);
_ = CopyToAsync(relayCache, limit, relayWrap.Socket, socket);
}
break;
default:
@@ -139,48 +139,48 @@ namespace linker.messenger.relay.server
ArrayPool<byte>.Shared.Return(buffer);
}
}
private async Task CopyToAsync(RelayCacheInfo cache, RelaySpeedLimit limit, Socket source, Socket destination, bool needLimit)
private async Task CopyToAsync(RelayCacheInfo cache, RelaySpeedLimit limit, Socket source, Socket destination)
{
RelayTrafficCacheInfo trafficCacheInfo = new RelayTrafficCacheInfo { Cache = cache, Sendt = 0, Limit = limit };
byte[] buffer = new byte[8 * 1024];
try
{
relayServerNodeTransfer.IncrementConnectionNum();
relayServerNodeTransfer.AddTrafficCache(trafficCacheInfo);
int bytesRead;
while ((bytesRead = await source.ReceiveAsync(buffer.AsMemory(), SocketFlags.None).ConfigureAwait(false)) != 0)
{
if (needLimit)
//流量限制
if (relayServerNodeTransfer.AddBytes(trafficCacheInfo, (ulong)bytesRead) == false)
{
//流量限制
if (relayServerNodeTransfer.AddBytes((ulong)bytesRead) == false)
{
source.SafeClose();
break;
}
source.SafeClose();
break;
}
//总速度
if (relayServerNodeTransfer.NeedLimit())
//总速度
if (relayServerNodeTransfer.NeedLimit(trafficCacheInfo))
{
int length = bytesRead;
relayServerNodeTransfer.TryLimit(ref length);
while (length > 0)
{
int length = bytesRead;
await Task.Delay(30).ConfigureAwait(false);
relayServerNodeTransfer.TryLimit(ref length);
while (length > 0)
{
await Task.Delay(30).ConfigureAwait(false);
relayServerNodeTransfer.TryLimit(ref length);
}
}
//单个速度
if (limit.NeedLimit())
{
int length = bytesRead;
limit.TryLimit(ref length);
while (length > 0)
{
await Task.Delay(30).ConfigureAwait(false);
limit.TryLimit(ref length);
}
}
}
//单个速度
if (limit.NeedLimit())
{
int length = bytesRead;
limit.TryLimit(ref length);
while (length > 0)
{
await Task.Delay(30).ConfigureAwait(false);
limit.TryLimit(ref length);
}
}
AddReceive(cache.FromId, cache.FromName, cache.ToName, cache.GroupId, (ulong)bytesRead);
AddSendt(cache.FromId, cache.FromName, cache.ToName, cache.GroupId, (ulong)bytesRead);
await destination.SendAsync(buffer.AsMemory(0, bytesRead), SocketFlags.None).ConfigureAwait(false);
@@ -192,6 +192,7 @@ namespace linker.messenger.relay.server
finally
{
relayServerNodeTransfer.DecrementConnectionNum();
relayServerNodeTransfer.RemoveTrafficCache(trafficCacheInfo);
source.SafeClose();
destination.SafeClose();
}
@@ -256,15 +257,22 @@ namespace linker.messenger.relay.server
public string ToId { get; set; }
public string ToName { get; set; }
public string GroupId { get; set; }
public bool Validated { get; set; }
public List<RelayServerCdkeyInfo> Cdkey { get; set; }
}
public sealed class RelayTrafficCacheInfo
{
public ulong Sendt;
public RelaySpeedLimit Limit { get; set; }
public RelayCacheInfo Cache { get; set; }
public RelayServerCdkeyInfo CurrentCdkey { get; set; }
}
public sealed class RelayWrapInfo
{
public TaskCompletionSource Tcs { get; set; }
public Socket Socket { get; set; }
public RelaySpeedLimit Limit { get; set; } = new RelaySpeedLimit();
}
public sealed partial class RelayMessageInfo

View File

@@ -60,6 +60,7 @@ namespace linker.messenger.serializer.memorypack
MemoryPackFormatterProvider.Register(new RelayServerCdkeyPageResultInfoFormatter());
MemoryPackFormatterProvider.Register(new RelayServerCdkeyAddInfoFormatter());
MemoryPackFormatterProvider.Register(new RelayServerCdkeyDelInfoFormatter());
MemoryPackFormatterProvider.Register(new RelayTrafficReportInfoFormatter());
MemoryPackFormatterProvider.Register(new AccessUpdateInfoFormatter());

View File

@@ -18,11 +18,13 @@ namespace linker.messenger.serializer.memorypack
[MemoryPackInclude, MemoryPackAllowSerialize]
IPEndPoint Server => info.Server;
[MemoryPackInclude]
string UserId => info.UserId;
[MemoryPackConstructor]
SerializableRelayTestInfo(string machineId, string secretKey, IPEndPoint server)
SerializableRelayTestInfo(string machineId, string secretKey, IPEndPoint server, string userid)
{
var info = new RelayTestInfo { MachineId = machineId, SecretKey = secretKey, Server = server };
var info = new RelayTestInfo { MachineId = machineId, SecretKey = secretKey, Server = server, UserId = userid };
this.info = info;
}
@@ -183,6 +185,10 @@ namespace linker.messenger.serializer.memorypack
[MemoryPackInclude]
long LastTicks => info.LastTicks;
[MemoryPackInclude]
List<string> UserIds => info.UserIds;
[MemoryPackConstructor]
SerializableRelayServerNodeReportInfo(
string id, string name,
@@ -190,7 +196,7 @@ namespace linker.messenger.serializer.memorypack
double maxGbTotal, ulong maxGbTotalLastBytes,
double connectionRatio, double bandwidthRatio,
bool Public, int delay,
IPEndPoint endPoint, long lastTicks)
IPEndPoint endPoint, long lastTicks, List<string> userIds)
{
var info = new RelayServerNodeReportInfo
{
@@ -207,6 +213,7 @@ namespace linker.messenger.serializer.memorypack
MaxGbTotalLastBytes = maxGbTotalLastBytes,
Name = name,
Public = Public,
UserIds = userIds
};
this.info = info;
}
@@ -318,11 +325,15 @@ namespace linker.messenger.serializer.memorypack
[MemoryPackInclude]
string GroupId => info.GroupId;
[MemoryPackInclude]
bool Validated => info.Validated;
[MemoryPackInclude, MemoryPackAllowSerialize]
List<RelayServerCdkeyInfo> Cdkey => info.Cdkey;
[MemoryPackConstructor]
SerializableRelayCacheInfo(ulong flowId, string fromId, string fromName, string toId, string toName, string groupId, List<RelayServerCdkeyInfo> cdkey)
SerializableRelayCacheInfo(ulong flowId, string fromId, string fromName, string toId, string toName, string groupId, bool validated, List<RelayServerCdkeyInfo> cdkey)
{
var info = new RelayCacheInfo
{
@@ -332,7 +343,8 @@ namespace linker.messenger.serializer.memorypack
GroupId = groupId,
ToId = toId,
ToName = toName,
Cdkey = cdkey
Cdkey = cdkey,
Validated = validated
};
this.info = info;
}
@@ -770,4 +782,64 @@ namespace linker.messenger.serializer.memorypack
}
}
[MemoryPackable]
public readonly partial struct SerializableRelayTrafficReportInfo
{
[MemoryPackIgnore]
public readonly RelayTrafficReportInfo info;
[MemoryPackInclude]
Dictionary<string, ulong> Id2Bytes => info.Id2Bytes;
[MemoryPackInclude]
List<string> UpdateIds => info.UpdateIds;
[MemoryPackInclude]
string SecretKey => info.SecretKey;
[MemoryPackConstructor]
SerializableRelayTrafficReportInfo(Dictionary<string, ulong> id2Bytes, List<string> updateIds, string secretKey)
{
var info = new RelayTrafficReportInfo
{
Id2Bytes = id2Bytes,
UpdateIds = updateIds,
SecretKey = secretKey
};
this.info = info;
}
public SerializableRelayTrafficReportInfo(RelayTrafficReportInfo info)
{
this.info = info;
}
}
public class RelayTrafficReportInfoFormatter : MemoryPackFormatter<RelayTrafficReportInfo>
{
public override void Serialize<TBufferWriter>(ref MemoryPackWriter<TBufferWriter> writer, scoped ref RelayTrafficReportInfo value)
{
if (value == null)
{
writer.WriteNullObjectHeader();
return;
}
writer.WritePackable(new SerializableRelayTrafficReportInfo(value));
}
public override void Deserialize(ref MemoryPackReader reader, scoped ref RelayTrafficReportInfo value)
{
if (reader.PeekIsNull())
{
reader.Advance(1); // skip null block
value = null;
return;
}
var wrapped = reader.ReadPackable<SerializableRelayTrafficReportInfo>();
value = wrapped.info;
}
}
}

View File

@@ -26,7 +26,7 @@ namespace linker.messenger.sforward.client
this.signInClientStore = signInClientStore;
this.sForwardClientStore = sForwardClientStore;
signInClientState.NetworkFirstEnabledHandle += () => Start();
signInClientState.OnSignInSuccess += (i) => Start();
this.serializer = serializer;
}

View File

@@ -125,7 +125,7 @@ namespace linker.messenger.signin
Socket socket = new Socket(remote.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
socket.KeepAlive();
await socket.ConnectAsync(remote).WaitAsync(TimeSpan.FromMilliseconds(5000)).ConfigureAwait(false);
clientSignInState.Connection = await messengerResolver.BeginReceiveClient(socket, true, (byte)ResolverType.Messenger).ConfigureAwait(false);
clientSignInState.Connection = await messengerResolver.BeginReceiveClient(socket, true, (byte)ResolverType.Messenger, Helper.EmptyArray).ConfigureAwait(false);
return true;
}

View File

@@ -15,7 +15,7 @@ namespace linker.messenger
public interface IMessengerResolver
{
public Task<IConnection> BeginReceiveClient(Socket socket);
public Task<IConnection> BeginReceiveClient(Socket socket, bool sendFlag, byte flag);
public Task<IConnection> BeginReceiveClient(Socket socket, bool sendFlag, byte flag, byte[] data);
public void AddMessenger(List<IMessenger> list);
public Task BeginReceiveServer(Socket socket, Memory<byte> memory);
public Task BeginReceiveServer(Socket socket, IPEndPoint ep, Memory<byte> memory);
@@ -104,7 +104,7 @@ namespace linker.messenger
/// <returns></returns>
public async Task<IConnection> BeginReceiveClient(Socket socket)
{
return await BeginReceiveClient(socket, false, 0);
return await BeginReceiveClient(socket, false, 0, Helper.EmptyArray);
}
/// <summary>
/// 以客户端模式接收数据
@@ -113,7 +113,7 @@ namespace linker.messenger
/// <param name="sendFlag"></param>
/// <param name="flag"></param>
/// <returns></returns>
public async Task<IConnection> BeginReceiveClient(Socket socket, bool sendFlag, byte flag)
public async Task<IConnection> BeginReceiveClient(Socket socket, bool sendFlag, byte flag, byte[] data)
{
try
{
@@ -126,6 +126,11 @@ namespace linker.messenger
{
await socket.SendAsync(new byte[] { flag }).ConfigureAwait(false);
}
if (data.Length > 0)
{
await socket.SendAsync(data).ConfigureAwait(false);
}
NetworkStream networkStream = new NetworkStream(socket, false);
SslStream sslStream = new SslStream(networkStream, true, new RemoteCertificateValidationCallback(ValidateServerCertificate), null);
await sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions
@@ -167,7 +172,7 @@ namespace linker.messenger
Type voidType = typeof(void);
Type midType = typeof(MessengerIdAttribute);
LoggerHelper.Instance.Info($"add messenger {string.Join(",",list.Select(c=>c.GetType().Name))}");
LoggerHelper.Instance.Info($"add messenger {string.Join(",", list.Select(c => c.GetType().Name))}");
foreach (IMessenger messenger in list.Distinct())
{

View File

@@ -36,7 +36,7 @@ namespace linker.messenger
/// <returns></returns>
public async Task BeginReceive(Socket socket)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
byte[] buffer = ArrayPool<byte>.Shared.Rent(32);
try
{
if (socket == null || socket.RemoteEndPoint == null)
@@ -44,8 +44,6 @@ namespace linker.messenger
return;
}
//socket.KeepAlive();
int length = await socket.ReceiveAsync(buffer.AsMemory(0, 1), SocketFlags.None).ConfigureAwait(false);
byte type = buffer[0];

View File

@@ -1,5 +1,5 @@
v1.6.9
2025-03-02 17:55:03
2025-03-03 17:44:29
1. 优化linux下路由跟踪问题
2. 优化linux下获取本机IP问题
3. 增加ICS让win7+、win server2008+支持NAT