This commit is contained in:
snltty
2025-03-06 21:15:19 +08:00
parent 303581a8b3
commit c6eeab74b9
86 changed files with 1091 additions and 557 deletions

View File

@@ -5,7 +5,6 @@ using System.Buffers;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
namespace linker.messenger.relay.server
{
@@ -18,11 +17,11 @@ namespace linker.messenger.relay.server
private IConnection localConnection;
private IConnection remoteConnection;
private ulong bytes = 0;
private ulong lastBytes = 0;
RelaySpeedLimit limitTotal = new RelaySpeedLimit();
private long bytes = 0;
private long lastBytes = 0;
private RelaySpeedLimit limitTotal = new RelaySpeedLimit();
private readonly ConcurrentDictionary<ulong, RelayTrafficCacheInfo> trafficDict = new ConcurrentDictionary<ulong, RelayTrafficCacheInfo>();
private readonly ConcurrentDictionary<string, ulong> cdkeyLastBytes = new ConcurrentDictionary<string, ulong>();
private readonly ConcurrentDictionary<long, long> cdkeyLastBytes = new ConcurrentDictionary<long, long>();
private readonly ISerializer serializer;
private readonly IRelayServerNodeStore relayServerNodeStore;
@@ -50,7 +49,7 @@ namespace linker.messenger.relay.server
{
try
{
IConnection connection = nodeid == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection;
IConnection connection = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection;
MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap
{
@@ -133,6 +132,7 @@ namespace linker.messenger.relay.server
public bool NeedLimit(RelayTrafficCacheInfo relayCache)
{
if (relayCache.Cache.Validated) return false;
//if (relayCache.CurrentCdkey != null) return false;
return limitTotal.NeedLimit();
}
/// <summary>
@@ -164,7 +164,7 @@ namespace linker.messenger.relay.server
trafficDict.TryRemove(relayCache.Cache.FlowId, out _);
foreach (var item in relayCache.Cache.Cdkey)
{
cdkeyLastBytes.TryRemove(item.Id, out _);
cdkeyLastBytes.TryRemove(item.CdkeyId, out _);
}
}
/// <summary>
@@ -172,7 +172,7 @@ namespace linker.messenger.relay.server
/// </summary>
/// <param name="length"></param>
/// <returns></returns>
public bool AddBytes(RelayTrafficCacheInfo cache, ulong length)
public bool AddBytes(RelayTrafficCacheInfo cache, long length)
{
Interlocked.Add(ref bytes, length);
@@ -194,20 +194,13 @@ namespace linker.messenger.relay.server
/// <param name="relayCache"></param>
private void SetLimit(RelayTrafficCacheInfo relayCache)
{
//验证过的,无限制
if (relayCache.Cache.Validated)
//无限制
if (relayCache.Cache.Validated || relayServerNodeStore.Node.MaxBandwidth == 0)
{
relayCache.Limit.SetLimit(0);
return;
}
//节点无限制
if (relayServerNodeStore.Node.MaxBandwidth == 0)
{
relayCache.Limit.SetLimit((uint)Math.Ceiling((relayServerNodeStore.Node.MaxBandwidth * 1024 * 1024) / 8.0));
return;
}
RelayServerCdkeyInfo currentCdkey = relayCache.Cache.Cdkey.Where(c => c.LastBytes > 0).OrderByDescending(c => c.Bandwidth).FirstOrDefault();
//有cdkey且带宽大于节点带宽就用cdkey的带宽
if (currentCdkey != null && (currentCdkey.Bandwidth == 0 || currentCdkey.Bandwidth > relayServerNodeStore.Node.MaxBandwidth))
@@ -220,91 +213,129 @@ namespace linker.messenger.relay.server
relayCache.Limit.SetLimit((uint)Math.Ceiling((relayServerNodeStore.Node.MaxBandwidth * 1024 * 1024) / 8.0));
}
private void ResetNodeBytes(ulong length)
private void ResetNodeBytes()
{
if (relayServerNodeStore.Node.MaxGbTotalLastBytes >= length)
relayServerNodeStore.SetMaxGbTotalLastBytes(relayServerNodeStore.Node.MaxGbTotalLastBytes - length);
else relayServerNodeStore.SetMaxGbTotalLastBytes(0);
foreach (var cache in trafficDict.Values.Where(c => c.CurrentCdkey == null))
{
long length = cache.Sendt;
Interlocked.Exchange(ref cache.Sendt, 0);
if (relayServerNodeStore.Node.MaxGbTotalLastBytes >= length)
relayServerNodeStore.SetMaxGbTotalLastBytes(relayServerNodeStore.Node.MaxGbTotalLastBytes - length);
else relayServerNodeStore.SetMaxGbTotalLastBytes(0);
}
if (relayServerNodeStore.Node.MaxGbTotalMonth != DateTime.Now.Month)
{
relayServerNodeStore.SetMaxGbTotalMonth(DateTime.Now.Month);
relayServerNodeStore.SetMaxGbTotalLastBytes((ulong)(relayServerNodeStore.Node.MaxGbTotal * 1024 * 1024 * 1024));
relayServerNodeStore.SetMaxGbTotalLastBytes((long)(relayServerNodeStore.Node.MaxGbTotal * 1024 * 1024 * 1024));
}
relayServerNodeStore.Confirm();
}
private void TrafficTask()
private void DownloadBytes()
{
TimerHelper.SetInterval(async () =>
TimerHelper.Async(async () =>
{
//需要报告Cdkey的流量
_ = messengerSender.SendReply(new MessageRequestWrap
List<long> ids = trafficDict.Values.SelectMany(c => c.Cache.Cdkey).Select(c => c.CdkeyId).Distinct().ToList();
while (ids.Count > 0)
{
//分批更新,可能数量很大
List<long> id = ids.Take(100).ToList();
ids.RemoveRange(0, id.Count);
MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap
{
Connection = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection,
MessengerId = (ushort)RelayMessengerIds.TrafficReport,
Payload = serializer.Serialize(new RelayTrafficUpdateInfo
{
Dic = [],
Ids = id,
SecretKey = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID
? relayServerMasterStore.Master.SecretKey
: relayServerNodeStore.Node.MasterSecretKey
}),
Timeout = 4000
});
if (resp.Code == MessageResponeCodes.OK && resp.Data.Length > 0)
{
Dictionary<long, long> dic = serializer.Deserialize<Dictionary<long, long>>(resp.Data.Span);
//更新剩余流量
foreach (KeyValuePair<long, long> item in dic)
{
cdkeyLastBytes.AddOrUpdate(item.Key, item.Value, (a, b) => item.Value);
}
//查不到的,归零
foreach (long item in id.Except(dic.Keys))
{
cdkeyLastBytes.AddOrUpdate(item, 0, (a, b) => 0);
}
}
}
});
}
private void UploadBytes()
{
TimerHelper.Async(async () =>
{
MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap
{
Connection = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection,
MessengerId = (ushort)RelayMessengerIds.TrafficReport,
Payload = serializer.Serialize(new RelayTrafficReportInfo
Payload = serializer.Serialize(new RelayTrafficUpdateInfo
{
Id2Bytes = trafficDict.Values
.Where(c => c.CurrentCdkey != null && c.Sendt > 0)
.GroupBy(c => c.CurrentCdkey.Id)
.ToDictionary(c => c.Key, d => (ulong)d.Sum(d => (decimal)d.Sendt)),
UpdateIds = trafficDict.Values.SelectMany(c => c.Cache.Cdkey).Select(c => c.Id).Distinct().ToList(),
Dic = trafficDict.Values.Where(c => c.CurrentCdkey != null && c.Sendt > 0).GroupBy(c => c.CurrentCdkey.CdkeyId).ToDictionary(c => c.Key, d => d.Sum(d => d.Sendt)),
Ids = [],
SecretKey = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID
? relayServerMasterStore.Master.SecretKey
: relayServerNodeStore.Node.MasterSecretKey
? relayServerMasterStore.Master.SecretKey
: relayServerNodeStore.Node.MasterSecretKey
}),
Timeout = 4000,
}).ContinueWith((result) =>
{
//更新cdkey的剩余流量
if (result.Result.Code != MessageResponeCodes.OK || result.Result.Data.Length == 0)
{
return;
}
Dictionary<string, ulong> dic = serializer.Deserialize<Dictionary<string, ulong>>(result.Result.Data.Span);
foreach (var item in dic)
{
cdkeyLastBytes.AddOrUpdate(item.Key, item.Value, (a, b) => item.Value);
}
Timeout = 4000
});
foreach (var cache in trafficDict.Values.Where(c => c.CurrentCdkey != null))
if (resp.Code == MessageResponeCodes.OK)
{
ulong length = cache.Sendt;
Interlocked.Add(ref Unsafe.As<ulong, long>(ref cache.Sendt), -(long)length);
if (cdkeyLastBytes.TryGetValue(cache.CurrentCdkey.Id, out ulong value))
try
{
cache.CurrentCdkey.LastBytes = value;
serializer.Deserialize<Dictionary<string, ulong>>(resp.Data.Span);
//成功报告了流量,就重新计数
foreach (var cache in trafficDict.Values.Where(c => c.CurrentCdkey != null))
{
Interlocked.Exchange(ref cache.Sendt, 0);
//检查一下是不是需要更新剩余流量
if (cdkeyLastBytes.TryGetValue(cache.CurrentCdkey.CdkeyId, out long value))
{
cache.CurrentCdkey.LastBytes = value;
}
//当前cdkey流量用完了就重新找找新的cdkey
if (cache.CurrentCdkey.LastBytes <= 0)
{
SetLimit(cache);
}
}
}
if (cache.CurrentCdkey.LastBytes <= 0)
catch (Exception)
{
SetLimit(cache);
}
}
});
}
private void TrafficTask()
{
TimerHelper.SetIntervalLong(() =>
{
UploadBytes();
DownloadBytes();
foreach (var cache in trafficDict.Values.Where(c => c.CurrentCdkey == null))
{
ulong length = cache.Sendt;
Interlocked.Add(ref Unsafe.As<ulong, long>(ref cache.Sendt), -(long)length);
ResetNodeBytes(length);
}
await Task.CompletedTask;
ResetNodeBytes();
return true;
}, () => 5000);
}, 5000);
}
private void ReportTask()
{
TimerHelper.SetInterval(async () =>
TimerHelper.SetIntervalLong(async () =>
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Debug($"relay report : {relayServerNodeStore.Node.ToJson()}");
}
IEnumerable<RelayServerNodeInfo> nodes = new List<RelayServerNodeInfo>
{
//默认报告给自己,作为本服务器的一个默认中继节点
@@ -349,8 +380,7 @@ namespace linker.messenger.relay.server
MaxGbTotalLastBytes = node.MaxGbTotalLastBytes,
MaxConnection = node.MaxConnection,
ConnectionRatio = Math.Round(node.MaxConnection == 0 ? 0 : connectionNum / 2.0 / node.MaxConnection, 2),
EndPoint = endPoint,
UserIds = node.UserIds
EndPoint = endPoint
};
await messengerSender.SendOnly(new MessageRequestWrap
@@ -364,17 +394,17 @@ namespace linker.messenger.relay.server
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Debug($"relay report : {ex}");
LoggerHelper.Instance.Error($"relay report : {ex}");
}
}
}
return true;
}, () => 5000);
}, 5000);
}
private void SignInTask()
{
TimerHelper.SetInterval(async () =>
TimerHelper.SetIntervalLong(async () =>
{
if ((remoteConnection == null || remoteConnection.Connected == false) && string.IsNullOrWhiteSpace(relayServerNodeStore.Node.MasterHost) == false)
{
@@ -385,7 +415,7 @@ namespace linker.messenger.relay.server
localConnection = await SignIn(new IPEndPoint(IPAddress.Loopback, relayServerNodeStore.ServicePort).ToString(), relayServerMasterStore.Master.SecretKey).ConfigureAwait(false);
}
return true;
}, () => 3000);
}, 3000);
}
private async Task<IConnection> SignIn(string host, string secretKey)
{
@@ -403,10 +433,14 @@ namespace linker.messenger.relay.server
Socket socket = new Socket(remote.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
socket.KeepAlive();
await socket.ConnectAsync(remote).WaitAsync(TimeSpan.FromMilliseconds(5000)).ConfigureAwait(false);
return await messengerResolver.BeginReceiveClient(socket, true, (byte)ResolverType.RelayReport, bytes).ConfigureAwait(false);
return await messengerResolver.BeginReceiveClient(socket, true, (byte)ResolverType.RelayReport, bytes.AsMemory(0, secretKeyBytes.Length + 1)).ConfigureAwait(false);
}
catch (Exception)
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error(ex);
}
}
finally
{
@@ -416,16 +450,16 @@ namespace linker.messenger.relay.server
}
}
public sealed partial class RelayTrafficReportInfo
public sealed partial class RelayTrafficUpdateInfo
{
/// <summary>
/// cdkey id 和 流量
/// </summary>
public Dictionary<string, ulong> Id2Bytes { get; set; }
public Dictionary<long, long> Dic { get; set; }
/// <summary>
/// 需要知道哪些cdkey的剩余流量
/// </summary>
public List<string> UpdateIds { get; set; }
public List<long> Ids { get; set; }
public string SecretKey { get; set; }
}