From b3194212b49bd25e9ea1607b8c784192ae827ec8 Mon Sep 17 00:00:00 2001 From: snltty <1069410172@qq.com> Date: Mon, 10 Mar 2025 17:55:18 +0800 Subject: [PATCH] cdkey --- readme/Demo/Config/Config.php | 3 +- .../client/RelayApiController.cs | 2 +- .../client/RelayClientTestTransfer.cs | 2 +- .../client/transport/IRelayClientTransport.cs | 2 +- .../transport/RelayClientTransportSelfHost.cs | 16 +- .../messenger/RelayMessenger.cs | 76 ++++++--- .../messenger/RelayMessengerIds.cs | 2 + .../server/IRelayServerCdkeyStore.cs | 15 +- .../server/IRelayServerNodeStore.cs | 12 +- .../server/RelayServerMasterTransfer.cs | 67 ++++++-- .../server/RelayServerNodeTransfer.cs | 161 +++++++----------- .../server/RelayServerResolver.cs | 30 ++-- .../Entry.cs | 2 + .../RelaySerializer.cs | 158 ++++++++++++++++- .../relay/RelayServerCdkeyStore.cs | 10 +- src/linker.web/public/archlinux.svg | 1 + .../src/views/full/devices/tunnel.js | 1 - .../src/views/full/devices/tuntap.js | 2 +- .../src/views/full/server/relayCdkey/Add.vue | 2 +- version.txt | 2 +- 20 files changed, 380 insertions(+), 186 deletions(-) create mode 100644 src/linker.web/public/archlinux.svg diff --git a/readme/Demo/Config/Config.php b/readme/Demo/Config/Config.php index 88021b6e..1ffabd67 100644 --- a/readme/Demo/Config/Config.php +++ b/readme/Demo/Config/Config.php @@ -2,6 +2,5 @@ declare (strict_types=1); return [ - 'STATUS' => '1', - 'KeyId' => 'FAFAB92C-DFDF-1221-DEA2-40A0E915EB10', + 'STATUS' => '0' ]; \ No newline at end of file diff --git a/src/linker.messenger.relay/client/RelayApiController.cs b/src/linker.messenger.relay/client/RelayApiController.cs index a0d0b8f6..1f077688 100644 --- a/src/linker.messenger.relay/client/RelayApiController.cs +++ b/src/linker.messenger.relay/client/RelayApiController.cs @@ -43,7 +43,7 @@ namespace linker.messenger.relay return true; } - public List Subscribe(ApiControllerParamsInfo param) + public List Subscribe(ApiControllerParamsInfo param) { relayTestTransfer.Subscribe(); return relayTestTransfer.Nodes; diff --git a/src/linker.messenger.relay/client/RelayClientTestTransfer.cs b/src/linker.messenger.relay/client/RelayClientTestTransfer.cs index acbadfd1..12bb4fdd 100644 --- a/src/linker.messenger.relay/client/RelayClientTestTransfer.cs +++ b/src/linker.messenger.relay/client/RelayClientTestTransfer.cs @@ -18,7 +18,7 @@ namespace linker.messenger.relay private readonly ISignInClientStore signInClientStore; private readonly IRelayClientStore relayClientStore; - public List Nodes { get; private set; } = new List(); + public List Nodes { get; private set; } = new List(); public RelayClientTestTransfer(RelayClientTransfer relayTransfer, SignInClientState signInClientState, ISignInClientStore signInClientStore, IRelayClientStore relayClientStore) { diff --git a/src/linker.messenger.relay/client/transport/IRelayClientTransport.cs b/src/linker.messenger.relay/client/transport/IRelayClientTransport.cs index 051efece..dcf45f06 100644 --- a/src/linker.messenger.relay/client/transport/IRelayClientTransport.cs +++ b/src/linker.messenger.relay/client/transport/IRelayClientTransport.cs @@ -47,7 +47,7 @@ namespace linker.messenger.relay.client.transport /// /// /// - public Task> RelayTestAsync(RelayTestInfo170 relayTestInfo); + public Task> RelayTestAsync(RelayTestInfo170 relayTestInfo); } /// diff --git a/src/linker.messenger.relay/client/transport/RelayClientTransportSelfHost.cs b/src/linker.messenger.relay/client/transport/RelayClientTransportSelfHost.cs index 3405dde7..6de9b519 100644 --- a/src/linker.messenger.relay/client/transport/RelayClientTransportSelfHost.cs +++ b/src/linker.messenger.relay/client/transport/RelayClientTransportSelfHost.cs @@ -44,7 +44,7 @@ namespace linker.messenger.relay.client.transport try { //问一下能不能中继 - RelayAskResultInfo relayAskResultInfo = await RelayAsk(relayInfo); + RelayAskResultInfo170 relayAskResultInfo = await RelayAsk(relayInfo); relayInfo.FlowingId = relayAskResultInfo.FlowingId; if (relayInfo.FlowingId == 0 || relayAskResultInfo.Nodes.Count == 0) { @@ -119,7 +119,7 @@ namespace linker.messenger.relay.client.transport return null; } - private async Task RelayAsk(RelayInfo170 relayInfo) + private async Task RelayAsk(RelayInfo170 relayInfo) { MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap { @@ -130,15 +130,15 @@ namespace linker.messenger.relay.client.transport }).ConfigureAwait(false); if (resp.Code != MessageResponeCodes.OK) { - return new RelayAskResultInfo(); + return new RelayAskResultInfo170(); } - RelayAskResultInfo result = serializer.Deserialize(resp.Data.Span); + RelayAskResultInfo170 result = serializer.Deserialize(resp.Data.Span); return result; } - private async Task ConnectNodeServer(RelayInfo170 relayInfo, List nodes) + private async Task ConnectNodeServer(RelayInfo170 relayInfo, List nodes) { byte[] buffer = ArrayPool.Shared.Rent(1 * 1024); @@ -306,7 +306,7 @@ namespace linker.messenger.relay.client.transport return null; } - public async Task> RelayTestAsync(RelayTestInfo170 relayTestInfo) + public async Task> RelayTestAsync(RelayTestInfo170 relayTestInfo) { try { @@ -320,13 +320,13 @@ namespace linker.messenger.relay.client.transport if (resp.Code == MessageResponeCodes.OK) { - return serializer.Deserialize>(resp.Data.Span); + return serializer.Deserialize>(resp.Data.Span); } } catch (Exception) { } - return new List(); + return new List(); } } } diff --git a/src/linker.messenger.relay/messenger/RelayMessenger.cs b/src/linker.messenger.relay/messenger/RelayMessenger.cs index 5665e681..a56bc729 100644 --- a/src/linker.messenger.relay/messenger/RelayMessenger.cs +++ b/src/linker.messenger.relay/messenger/RelayMessenger.cs @@ -70,15 +70,22 @@ namespace linker.messenger.relay.messenger public async Task RelayTest(IConnection connection) { RelayTestInfo info = serializer.Deserialize(connection.ReceiveRequestWrap.Payload.Span); - await RelayTest(connection, info); + await RelayTest(connection, info, (validated) => + { + List list = relayServerTransfer.GetNodes(validated).Select(c => (RelayServerNodeReportInfo)c).ToList(); + return serializer.Serialize(list); + }); } [MessengerId((ushort)RelayMessengerIds.RelayTest170)] public async Task RelayTest170(IConnection connection) { RelayTestInfo170 info = serializer.Deserialize(connection.ReceiveRequestWrap.Payload.Span); - await RelayTest(connection, info); + await RelayTest(connection, info, (validated) => + { + return serializer.Serialize(relayServerTransfer.GetNodes(validated)); + }); } - private async Task RelayTest(IConnection connection, RelayTestInfo info) + private async Task RelayTest(IConnection connection, RelayTestInfo info, Func data) { if (signCaching.TryGet(connection.Id, out SignCacheInfo cache) == false) { @@ -94,8 +101,7 @@ namespace linker.messenger.relay.messenger TransportName = "test", }, cache, null); - var nodes = relayServerTransfer.GetNodes(string.IsNullOrWhiteSpace(result)); - connection.Write(serializer.Serialize(nodes)); + connection.Write(data(string.IsNullOrWhiteSpace(result))); } /// @@ -106,18 +112,6 @@ namespace linker.messenger.relay.messenger public async Task RelayAsk(IConnection connection) { RelayInfo info = serializer.Deserialize(connection.ReceiveRequestWrap.Payload.Span); - await RelayAsk(connection, info, new List()); - } - [MessengerId((ushort)RelayMessengerIds.RelayAsk170)] - public async Task RelayAsk170(IConnection connection) - { - RelayInfo170 info = serializer.Deserialize(connection.ReceiveRequestWrap.Payload.Span); - - List cdkeys = (await relayServerCdkeyStore.GetAvailable(info.UserId)).Select(c => new RelayServerCdkeyInfo { Bandwidth = c.Bandwidth, CdkeyId = c.CdkeyId, LastBytes = c.LastBytes }).ToList(); - await RelayAsk(connection, info, cdkeys); - } - public async Task RelayAsk(IConnection connection, RelayInfo info, List cdkeys) - { if (signCaching.TryGet(connection.Id, out SignCacheInfo cacheFrom) == false || signCaching.TryGet(info.RemoteMachineId, out SignCacheInfo cacheTo) == false || cacheFrom.GroupId != cacheTo.GroupId) { connection.Write(serializer.Serialize(new RelayAskResultInfo { })); @@ -132,10 +126,39 @@ namespace linker.messenger.relay.messenger RelayAskResultInfo result = new RelayAskResultInfo(); string error = await relayValidatorTransfer.Validate(info, cacheFrom, cacheTo); bool validated = string.IsNullOrWhiteSpace(error); + result.Nodes = relayServerTransfer.GetNodes(validated).Select(c => (RelayServerNodeReportInfo)c).ToList(); + + if (result.Nodes.Count > 0) + { + result.FlowingId = relayServerTransfer.AddRelay(cacheFrom.MachineId, cacheFrom.MachineName, cacheTo.MachineId, cacheTo.MachineName, cacheFrom.GroupId, validated, new List()); + } + + connection.Write(serializer.Serialize(result)); + } + [MessengerId((ushort)RelayMessengerIds.RelayAsk170)] + public async Task RelayAsk170(IConnection connection) + { + RelayInfo170 info = serializer.Deserialize(connection.ReceiveRequestWrap.Payload.Span); + if (signCaching.TryGet(connection.Id, out SignCacheInfo cacheFrom) == false || signCaching.TryGet(info.RemoteMachineId, out SignCacheInfo cacheTo) == false || cacheFrom.GroupId != cacheTo.GroupId) + { + connection.Write(serializer.Serialize(new RelayAskResultInfo170 { })); + return; + } + + info.RemoteMachineId = cacheTo.MachineId; + info.FromMachineId = cacheFrom.MachineId; + info.RemoteMachineName = cacheTo.MachineName; + info.FromMachineName = cacheFrom.MachineName; + + RelayAskResultInfo170 result = new RelayAskResultInfo170(); + string error = await relayValidatorTransfer.Validate(info, cacheFrom, cacheTo); + bool validated = string.IsNullOrWhiteSpace(error); result.Nodes = relayServerTransfer.GetNodes(validated); if (result.Nodes.Count > 0) { + List cdkeys = (await relayServerCdkeyStore.GetAvailable(info.UserId)).Select(c => new RelayServerCdkeyInfo { Bandwidth = c.Bandwidth, CdkeyId = c.CdkeyId, LastBytes = c.LastBytes }).ToList(); + result.FlowingId = relayServerTransfer.AddRelay(cacheFrom.MachineId, cacheFrom.MachineName, cacheTo.MachineId, cacheTo.MachineName, cacheFrom.GroupId, validated, cdkeys); } @@ -240,7 +263,7 @@ namespace linker.messenger.relay.messenger [MessengerId((ushort)RelayMessengerIds.NodeReport)] public void NodeReport(IConnection connection) { - RelayServerNodeReportInfo info = serializer.Deserialize(connection.ReceiveRequestWrap.Payload.Span); + RelayServerNodeReportInfo170 info = serializer.Deserialize(connection.ReceiveRequestWrap.Payload.Span); if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) { LoggerHelper.Instance.Debug($"relay node report : {info.ToJson()}"); @@ -287,17 +310,26 @@ namespace linker.messenger.relay.messenger /// /// [MessengerId((ushort)RelayMessengerIds.TrafficReport)] - public async Task TrafficReport(IConnection connection) + public void TrafficReport(IConnection connection) { RelayTrafficUpdateInfo info = serializer.Deserialize(connection.ReceiveRequestWrap.Payload.Span); if (relayServerStore.SecretKey != info.SecretKey) { - connection.Write(serializer.Serialize(new Dictionary())); return; } - Dictionary result = await relayServerTransfer.AddTraffic(info); - connection.Write(serializer.Serialize(result)); + relayServerTransfer.AddTraffic(info.Dic); } + /// + /// 下发剩余流量 + /// + /// + [MessengerId((ushort)RelayMessengerIds.SendLastBytes)] + public void SendLastBytes(IConnection connection) + { + Dictionary info = serializer.Deserialize>(connection.ReceiveRequestWrap.Payload.Span); + relayServerNodeTransfer.UpdateLastBytes(info); + } + /// diff --git a/src/linker.messenger.relay/messenger/RelayMessengerIds.cs b/src/linker.messenger.relay/messenger/RelayMessengerIds.cs index 886a4ef9..7cc758ab 100644 --- a/src/linker.messenger.relay/messenger/RelayMessengerIds.cs +++ b/src/linker.messenger.relay/messenger/RelayMessengerIds.cs @@ -35,6 +35,8 @@ RelayAsk170 = 2120, RelayForward170 = 2121, + SendLastBytes = 2122, + Max = 2199 } } diff --git a/src/linker.messenger.relay/server/IRelayServerCdkeyStore.cs b/src/linker.messenger.relay/server/IRelayServerCdkeyStore.cs index f341d481..c495738a 100644 --- a/src/linker.messenger.relay/server/IRelayServerCdkeyStore.cs +++ b/src/linker.messenger.relay/server/IRelayServerCdkeyStore.cs @@ -1,4 +1,7 @@ -namespace linker.messenger.relay.server +using linker.libs; +using System.Net; + +namespace linker.messenger.relay.server { public interface IRelayServerCdkeyStore { @@ -54,6 +57,12 @@ /// public Task Traffic(Dictionary dic); /// + /// 获取剩余流量 + /// + /// + /// + public Task> GetLastBytes(List ids); + /// /// 分页 /// /// @@ -66,7 +75,11 @@ /// /// 加解密密钥 /// +#if DEBUG + public string SecretKey { get; set; } = Helper.GlobalString; +#else public string SecretKey { get; set; } = Guid.NewGuid().ToString().ToUpper(); +#endif } public sealed partial class RelayServerCdkeyPageRequestInfo diff --git a/src/linker.messenger.relay/server/IRelayServerNodeStore.cs b/src/linker.messenger.relay/server/IRelayServerNodeStore.cs index ed0311e3..02c8bf46 100644 --- a/src/linker.messenger.relay/server/IRelayServerNodeStore.cs +++ b/src/linker.messenger.relay/server/IRelayServerNodeStore.cs @@ -103,7 +103,7 @@ namespace linker.messenger.relay.server public string Url { get; set; } = "https://linker-doc.snltty.com"; } - public sealed partial class RelayServerNodeReportInfo + public partial class RelayServerNodeReportInfo { public string Id { get; set; } = string.Empty; public string Name { get; set; } = string.Empty; @@ -124,7 +124,9 @@ namespace linker.messenger.relay.server public IPEndPoint EndPoint { get; set; } public long LastTicks { get; set; } - + } + public sealed partial class RelayServerNodeReportInfo170: RelayServerNodeReportInfo + { public string Url { get; set; } = "https://linker-doc.snltty.com"; [JsonIgnore] @@ -138,4 +140,10 @@ namespace linker.messenger.relay.server public List Nodes { get; set; } = new List(); } + public sealed partial class RelayAskResultInfo170 + { + public ulong FlowingId { get; set; } + + public List Nodes { get; set; } = new List(); + } } diff --git a/src/linker.messenger.relay/server/RelayServerMasterTransfer.cs b/src/linker.messenger.relay/server/RelayServerMasterTransfer.cs index c9ac72eb..77b4093b 100644 --- a/src/linker.messenger.relay/server/RelayServerMasterTransfer.cs +++ b/src/linker.messenger.relay/server/RelayServerMasterTransfer.cs @@ -13,8 +13,9 @@ namespace linker.messenger.relay.server { private ulong relayFlowingId = 0; - private readonly ConcurrentDictionary reports = new ConcurrentDictionary(); + private readonly ConcurrentDictionary reports = new ConcurrentDictionary(); private readonly ConcurrentQueue> trafficQueue = new ConcurrentQueue>(); + private readonly ConcurrentQueue> trafficIdsQueue = new ConcurrentQueue>(); private readonly IRelayServerCaching relayCaching; private readonly ISerializer serializer; @@ -61,7 +62,7 @@ namespace linker.messenger.relay.server /// /// /// - public void SetNodeReport(IConnection connection, RelayServerNodeReportInfo info) + public void SetNodeReport(IConnection connection, RelayServerNodeReportInfo170 info) { try { @@ -93,7 +94,7 @@ namespace linker.messenger.relay.server { if (RelayServerNodeInfo.MASTER_NODE_ID == info.Id) return; - if (reports.TryGetValue(info.Id, out RelayServerNodeReportInfo cache)) + if (reports.TryGetValue(info.Id, out RelayServerNodeReportInfo170 cache)) { await messengerSender.SendOnly(new MessageRequestWrap { @@ -109,7 +110,7 @@ namespace linker.messenger.relay.server /// /// 是否已认证 /// - public List GetNodes(bool validated) + public List GetNodes(bool validated) { var result = reports.Values .Where(c => c.Public || validated) @@ -134,7 +135,7 @@ namespace linker.messenger.relay.server /// public bool NodeValidate(string nodeId) { - return reports.TryGetValue(nodeId, out RelayServerNodeReportInfo relayNodeReportInfo) && relayNodeReportInfo.Public == false; + return reports.TryGetValue(nodeId, out RelayServerNodeReportInfo170 relayNodeReportInfo) && relayNodeReportInfo.Public == false; } /// @@ -142,16 +143,10 @@ namespace linker.messenger.relay.server /// /// /// - public async Task> AddTraffic(RelayTrafficUpdateInfo relayTrafficUpdateInfo) + public void AddTraffic(Dictionary dic) { - if (relayTrafficUpdateInfo.Dic.Count > 0) - trafficQueue.Enqueue(relayTrafficUpdateInfo.Dic); - - if (relayTrafficUpdateInfo.Ids == null || relayTrafficUpdateInfo.Ids.Count == 0) - { - return new Dictionary(); - } - return (await relayServerCdkeyStore.Get(relayTrafficUpdateInfo.Ids)).ToDictionary(c => c.CdkeyId, c => c.LastBytes); + if (dic.Count > 0) + trafficQueue.Enqueue(dic); } private void TrafficTask() { @@ -159,10 +154,50 @@ namespace linker.messenger.relay.server { while (trafficQueue.TryDequeue(out Dictionary dic)) { - await relayServerCdkeyStore.Traffic(dic).ConfigureAwait(false); + try + { + await relayServerCdkeyStore.Traffic(dic).ConfigureAwait(false); + + var ids = dic.Keys.ToList(); + while (ids.Count > 0) + { + var id = ids.Take(100).ToList(); + trafficIdsQueue.Enqueue(id); + ids.RemoveRange(0, id.Count); + } + } + catch (Exception ex) + { + LoggerHelper.Instance.Error(ex); + } } return true; - }, 3000); + }, 500); + TimerHelper.SetIntervalLong(async () => + { + while (trafficIdsQueue.TryDequeue(out List ids)) + { + try + { + Dictionary id2last = await relayServerCdkeyStore.GetLastBytes(ids).ConfigureAwait(false); + if (id2last.Count == 0) continue; + byte[] bytes = serializer.Serialize(id2last); + + await Task.WhenAll(reports.Values.Select(c => messengerSender.SendOnly(new MessageRequestWrap + { + Connection = c.Connection, + MessengerId = (ushort)RelayMessengerIds.SendLastBytes, + Payload = bytes, + })).ToList()); + } + catch (Exception ex) + { + LoggerHelper.Instance.Error(ex); + } + } + return true; + }, 500); + } } } diff --git a/src/linker.messenger.relay/server/RelayServerNodeTransfer.cs b/src/linker.messenger.relay/server/RelayServerNodeTransfer.cs index fa0f5a07..6050a391 100644 --- a/src/linker.messenger.relay/server/RelayServerNodeTransfer.cs +++ b/src/linker.messenger.relay/server/RelayServerNodeTransfer.cs @@ -13,7 +13,7 @@ namespace linker.messenger.relay.server /// public class RelayServerNodeTransfer { - public string Id=>relayServerNodeStore.Node.Id; + public string Id => relayServerNodeStore.Node.Id; private uint connectionNum = 0; private IConnection localConnection; @@ -23,7 +23,6 @@ namespace linker.messenger.relay.server private long lastBytes = 0; private RelaySpeedLimit limitTotal = new RelaySpeedLimit(); private readonly ConcurrentDictionary trafficDict = new ConcurrentDictionary(); - private readonly ConcurrentDictionary cdkeyLastBytes = new ConcurrentDictionary(); private readonly ISerializer serializer; private readonly IRelayServerNodeStore relayServerNodeStore; @@ -169,10 +168,6 @@ namespace linker.messenger.relay.server public void RemoveTrafficCache(RelayTrafficCacheInfo relayCache) { trafficDict.TryRemove(relayCache.Cache.FlowId, out _); - foreach (var item in relayCache.Cache.Cdkey) - { - cdkeyLastBytes.TryRemove(item.CdkeyId, out _); - } } /// /// 消耗流量 @@ -211,23 +206,40 @@ namespace linker.messenger.relay.server RelayServerCdkeyInfo currentCdkey = relayCache.Cache.Cdkey.Where(c => c.LastBytes > 0).OrderByDescending(c => c.Bandwidth).FirstOrDefault(); //有cdkey,且带宽大于节点带宽,就用cdkey的带宽 - if (currentCdkey != null && (currentCdkey.Bandwidth == 0 || currentCdkey.Bandwidth > relayServerNodeStore.Node.MaxBandwidth)) + if (currentCdkey != null && (currentCdkey.Bandwidth == 0 || currentCdkey.Bandwidth >= relayServerNodeStore.Node.MaxBandwidth || relayServerNodeStore.Node.MaxGbTotalLastBytes == 0)) { relayCache.CurrentCdkey = currentCdkey; relayCache.Limit.SetLimit((uint)Math.Ceiling((relayCache.CurrentCdkey.Bandwidth * 1024 * 1024) / 8.0)); return; } - relayCache.CurrentCdkey = null; relayCache.Limit.SetLimit((uint)Math.Ceiling((relayServerNodeStore.Node.MaxBandwidth * 1024 * 1024) / 8.0)); } + /// + /// 更新剩余流量 + /// + /// + public void UpdateLastBytes(Dictionary dic) + { + if (dic.Count == 0) return; + + Dictionary cdkeys = trafficDict.Values.SelectMany(c => c.Cache.Cdkey).ToDictionary(c => c.CdkeyId, c => c); + //更新剩余流量 + foreach (KeyValuePair item in dic) + { + if (cdkeys.TryGetValue(item.Key, out RelayServerCdkeyInfo info)) + { + info.LastBytes = item.Value; + } + } + } private void ResetNodeBytes() { foreach (var cache in trafficDict.Values.Where(c => c.CurrentCdkey == null)) { - long length = cache.Sendt; - Interlocked.Exchange(ref cache.Sendt, 0); + long length = Interlocked.Exchange(ref cache.Sendt, 0); + if (relayServerNodeStore.Node.MaxGbTotalLastBytes >= length) relayServerNodeStore.SetMaxGbTotalLastBytes(relayServerNodeStore.Node.MaxGbTotalLastBytes - length); else relayServerNodeStore.SetMaxGbTotalLastBytes(0); @@ -239,106 +251,56 @@ namespace linker.messenger.relay.server } relayServerNodeStore.Confirm(); } - private void DownloadBytes() + private async Task UploadBytes() { - TimerHelper.Async(async () => - { - List ids = trafficDict.Values.SelectMany(c => c.Cache.Cdkey).Select(c => c.CdkeyId).Distinct().ToList(); - while (ids.Count > 0) - { - //分批更新,可能数量很大 - List id = ids.Take(100).ToList(); - ids.RemoveRange(0, id.Count); + var cdkeys = trafficDict.Values.Where(c => c.CurrentCdkey != null && c.Sendt > 0).ToList(); + Dictionary id2sent = cdkeys.GroupBy(c => c.CurrentCdkey.CdkeyId).ToDictionary(c => c.Key, d => d.Sum(d => { d.SendtCache = d.Sendt; return d.SendtCache; })); + if (id2sent.Count == 0) return; - MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap - { - Connection = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection, - MessengerId = (ushort)RelayMessengerIds.TrafficReport, - Payload = serializer.Serialize(new RelayTrafficUpdateInfo - { - Dic = [], - Ids = id, - SecretKey = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID + bool result = await messengerSender.SendOnly(new MessageRequestWrap + { + Connection = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection, + MessengerId = (ushort)RelayMessengerIds.TrafficReport, + Payload = serializer.Serialize(new RelayTrafficUpdateInfo + { + Dic = id2sent, + SecretKey = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? relayServerMasterStore.Master.SecretKey : relayServerNodeStore.Node.MasterSecretKey - }), - Timeout = 4000 - }); - - if (resp.Code == MessageResponeCodes.OK && resp.Data.Length > 0) - { - Dictionary dic = serializer.Deserialize>(resp.Data.Span); - //更新剩余流量 - foreach (KeyValuePair item in dic) - { - cdkeyLastBytes.AddOrUpdate(item.Key, item.Value, (a, b) => item.Value); - } - //查不到的,归零 - foreach (long item in id.Except(dic.Keys)) - { - cdkeyLastBytes.AddOrUpdate(item, 0, (a, b) => 0); - } - } - } + }), + Timeout = 4000 }); - } - private void UploadBytes() - { - TimerHelper.Async(async () => + + if (result) { - MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap + //成功报告了流量,就重新计数 + foreach (var cache in cdkeys) { - Connection = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection, - MessengerId = (ushort)RelayMessengerIds.TrafficReport, - Payload = serializer.Serialize(new RelayTrafficUpdateInfo - { - Dic = trafficDict.Values.Where(c => c.CurrentCdkey != null && c.Sendt > 0).GroupBy(c => c.CurrentCdkey.CdkeyId).ToDictionary(c => c.Key, d => d.Sum(d => d.Sendt)), - Ids = [], - SecretKey = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID - ? relayServerMasterStore.Master.SecretKey - : relayServerNodeStore.Node.MasterSecretKey - }), - Timeout = 4000 - }); - - if (resp.Code == MessageResponeCodes.OK) - { - try - { - serializer.Deserialize>(resp.Data.Span); - //成功报告了流量,就重新计数 - foreach (var cache in trafficDict.Values.Where(c => c.CurrentCdkey != null)) - { - Interlocked.Exchange(ref cache.Sendt, 0); - //检查一下是不是需要更新剩余流量 - if (cdkeyLastBytes.TryGetValue(cache.CurrentCdkey.CdkeyId, out long value)) - { - cache.CurrentCdkey.LastBytes = value; - } - //当前cdkey流量用完了,就重新找找新的cdkey - if (cache.CurrentCdkey.LastBytes <= 0) - { - SetLimit(cache); - } - } - } - catch (Exception) + Interlocked.Add(ref cache.Sendt, -cache.SendtCache); + Interlocked.Exchange(ref cache.SendtCache, 0); + //当前cdkey流量用完了,就重新找找新的cdkey + if (cache.CurrentCdkey.LastBytes <= 0) { + SetLimit(cache); } } - }); + } } private void TrafficTask() { - TimerHelper.SetIntervalLong(() => + TimerHelper.SetIntervalLong(async () => { - UploadBytes(); - DownloadBytes(); - - ResetNodeBytes(); - + try + { + ResetNodeBytes(); + await UploadBytes(); + } + catch (Exception ex) + { + LoggerHelper.Instance.Error(ex); + } return true; - }, 5000); + }, 3000); } private void ReportTask() @@ -377,7 +339,7 @@ namespace linker.messenger.relay.server IConnection connection = node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection; IPEndPoint endPoint = await NetworkHelper.GetEndPointAsync(node.Host, relayServerNodeStore.ServicePort) ?? new IPEndPoint(IPAddress.Any, relayServerNodeStore.ServicePort); - RelayServerNodeReportInfo relayNodeReportInfo = new RelayServerNodeReportInfo + RelayServerNodeReportInfo170 relayNodeReportInfo = new RelayServerNodeReportInfo170 { Id = node.Id, Name = node.Name, @@ -389,7 +351,8 @@ namespace linker.messenger.relay.server MaxGbTotalLastBytes = node.MaxGbTotalLastBytes, MaxConnection = node.MaxConnection, ConnectionRatio = Math.Round(connectionNum / 2.0), - EndPoint = endPoint + EndPoint = endPoint, + Url = node.Url }; await messengerSender.SendOnly(new MessageRequestWrap @@ -465,10 +428,6 @@ namespace linker.messenger.relay.server /// cdkey id 和 流量 /// public Dictionary Dic { get; set; } - /// - /// 需要知道哪些cdkey的剩余流量 - /// - public List Ids { get; set; } public string SecretKey { get; set; } } diff --git a/src/linker.messenger.relay/server/RelayServerResolver.cs b/src/linker.messenger.relay/server/RelayServerResolver.cs index 1c9c7c35..875578cc 100644 --- a/src/linker.messenger.relay/server/RelayServerResolver.cs +++ b/src/linker.messenger.relay/server/RelayServerResolver.cs @@ -104,9 +104,14 @@ namespace linker.messenger.relay.server } relayWrap.Tcs.SetResult(); - RelaySpeedLimit limit = new RelaySpeedLimit(); - _ = CopyToAsync(relayCache, limit, socket, relayWrap.Socket); - _ = CopyToAsync(relayCache, limit, relayWrap.Socket, socket); + RelayTrafficCacheInfo trafficCacheInfo = new RelayTrafficCacheInfo { Cache = relayCache, Sendt = 0, Limit = new RelaySpeedLimit() }; + relayServerNodeTransfer.AddTrafficCache(trafficCacheInfo); + relayServerNodeTransfer.IncrementConnectionNum(); + _ = Task.WhenAll(CopyToAsync(trafficCacheInfo, socket, relayWrap.Socket), CopyToAsync(trafficCacheInfo, relayWrap.Socket, socket)).ContinueWith((result) => + { + relayServerNodeTransfer.DecrementConnectionNum(); + relayServerNodeTransfer.RemoveTrafficCache(trafficCacheInfo); + }); } break; default: @@ -139,15 +144,11 @@ namespace linker.messenger.relay.server ArrayPool.Shared.Return(buffer); } } - private async Task CopyToAsync(RelayCacheInfo cache, RelaySpeedLimit limit, Socket source, Socket destination) + private async Task CopyToAsync(RelayTrafficCacheInfo trafficCacheInfo, Socket source, Socket destination) { - RelayTrafficCacheInfo trafficCacheInfo = new RelayTrafficCacheInfo { Cache = cache, Sendt = 0, Limit = limit }; byte[] buffer = new byte[8 * 1024]; try { - relayServerNodeTransfer.IncrementConnectionNum(); - relayServerNodeTransfer.AddTrafficCache(trafficCacheInfo); - int bytesRead; while ((bytesRead = await source.ReceiveAsync(buffer.AsMemory(), SocketFlags.None).ConfigureAwait(false)) != 0) { @@ -170,19 +171,19 @@ namespace linker.messenger.relay.server } } //单个速度 - if (limit.NeedLimit()) + if (trafficCacheInfo.Limit.NeedLimit()) { int length = bytesRead; - limit.TryLimit(ref length); + trafficCacheInfo.Limit.TryLimit(ref length); while (length > 0) { await Task.Delay(30).ConfigureAwait(false); - limit.TryLimit(ref length); + trafficCacheInfo.Limit.TryLimit(ref length); } } - AddReceive(cache.FromId, cache.FromName, cache.ToName, cache.GroupId, bytesRead); - AddSendt(cache.FromId, cache.FromName, cache.ToName, cache.GroupId, bytesRead); + AddReceive(trafficCacheInfo.Cache.FromId, trafficCacheInfo.Cache.FromName, trafficCacheInfo.Cache.ToName, trafficCacheInfo.Cache.GroupId, bytesRead); + AddSendt(trafficCacheInfo.Cache.FromId, trafficCacheInfo.Cache.FromName, trafficCacheInfo.Cache.ToName, trafficCacheInfo.Cache.GroupId, bytesRead); await destination.SendAsync(buffer.AsMemory(0, bytesRead), SocketFlags.None).ConfigureAwait(false); } } @@ -191,8 +192,6 @@ namespace linker.messenger.relay.server } finally { - relayServerNodeTransfer.DecrementConnectionNum(); - relayServerNodeTransfer.RemoveTrafficCache(trafficCacheInfo); source.SafeClose(); destination.SafeClose(); } @@ -264,6 +263,7 @@ namespace linker.messenger.relay.server public sealed class RelayTrafficCacheInfo { public long Sendt; + public long SendtCache; public RelaySpeedLimit Limit { get; set; } public RelayCacheInfo Cache { get; set; } public RelayServerCdkeyInfo CurrentCdkey { get; set; } diff --git a/src/linker.messenger.serializer.memorypack/Entry.cs b/src/linker.messenger.serializer.memorypack/Entry.cs index 383575ba..da5e1037 100644 --- a/src/linker.messenger.serializer.memorypack/Entry.cs +++ b/src/linker.messenger.serializer.memorypack/Entry.cs @@ -55,7 +55,9 @@ namespace linker.messenger.serializer.memorypack MemoryPackFormatterProvider.Register(new RelayInfo170Formatter()); MemoryPackFormatterProvider.Register(new RelayServerNodeUpdateInfoFormatter()); MemoryPackFormatterProvider.Register(new RelayServerNodeReportInfoFormatter()); + MemoryPackFormatterProvider.Register(new RelayServerNodeReportInfo170Formatter()); MemoryPackFormatterProvider.Register(new RelayAskResultInfoFormatter()); + MemoryPackFormatterProvider.Register(new RelayAskResultInfo170Formatter()); MemoryPackFormatterProvider.Register(new RelayCacheInfoFormatter()); MemoryPackFormatterProvider.Register(new RelayMessageInfoFormatter()); MemoryPackFormatterProvider.Register(new RelayServerCdkeyInfoFormatter()); diff --git a/src/linker.messenger.serializer.memorypack/RelaySerializer.cs b/src/linker.messenger.serializer.memorypack/RelaySerializer.cs index c6d2e8b5..8b210bae 100644 --- a/src/linker.messenger.serializer.memorypack/RelaySerializer.cs +++ b/src/linker.messenger.serializer.memorypack/RelaySerializer.cs @@ -456,8 +456,6 @@ namespace linker.messenger.serializer.memorypack IPEndPoint EndPoint => info.EndPoint; [MemoryPackInclude] long LastTicks => info.LastTicks; - [MemoryPackInclude] - string Url => info.Url; [MemoryPackConstructor] @@ -467,7 +465,7 @@ namespace linker.messenger.serializer.memorypack double maxGbTotal, long maxGbTotalLastBytes, double connectionRatio, double bandwidthRatio, bool Public, int delay, - IPEndPoint endPoint, long lastTicks, string url) + IPEndPoint endPoint, long lastTicks) { var info = new RelayServerNodeReportInfo { @@ -483,8 +481,7 @@ namespace linker.messenger.serializer.memorypack MaxGbTotal = maxGbTotal, MaxGbTotalLastBytes = maxGbTotalLastBytes, Name = name, - Public = Public, - Url = url + Public = Public }; this.info = info; } @@ -521,7 +518,102 @@ namespace linker.messenger.serializer.memorypack } } + [MemoryPackable] + public readonly partial struct SerializableRelayServerNodeReportInfo170 + { + [MemoryPackIgnore] + public readonly RelayServerNodeReportInfo170 info; + [MemoryPackInclude] + string Id => info.Id; + [MemoryPackInclude] + string Name => info.Name; + [MemoryPackInclude] + int MaxConnection => info.MaxConnection; + [MemoryPackInclude] + double MaxBandwidth => info.MaxBandwidth; + [MemoryPackInclude] + double MaxBandwidthTotal => info.MaxBandwidthTotal; + [MemoryPackInclude] + double MaxGbTotal => info.MaxGbTotal; + [MemoryPackInclude] + long MaxGbTotalLastBytes => info.MaxGbTotalLastBytes; + [MemoryPackInclude] + double ConnectionRatio => info.ConnectionRatio; + [MemoryPackInclude] + double BandwidthRatio => info.BandwidthRatio; + [MemoryPackInclude] + bool Public => info.Public; + [MemoryPackInclude] + int Delay => info.Delay; + [MemoryPackInclude, MemoryPackAllowSerialize] + IPEndPoint EndPoint => info.EndPoint; + [MemoryPackInclude] + long LastTicks => info.LastTicks; + [MemoryPackInclude] + string Url => info.Url; + + + [MemoryPackConstructor] + SerializableRelayServerNodeReportInfo170( + string id, string name, + int maxConnection, double maxBandwidth, double maxBandwidthTotal, + double maxGbTotal, long maxGbTotalLastBytes, + double connectionRatio, double bandwidthRatio, + bool Public, int delay, + IPEndPoint endPoint, long lastTicks, string url) + { + var info = new RelayServerNodeReportInfo170 + { + BandwidthRatio = bandwidthRatio, + ConnectionRatio = connectionRatio, + Delay = delay, + EndPoint = endPoint, + Id = id, + LastTicks = lastTicks, + MaxBandwidth = maxBandwidth, + MaxBandwidthTotal = maxBandwidthTotal, + MaxConnection = maxConnection, + MaxGbTotal = maxGbTotal, + MaxGbTotalLastBytes = maxGbTotalLastBytes, + Name = name, + Public = Public, + Url = url + }; + this.info = info; + } + + public SerializableRelayServerNodeReportInfo170(RelayServerNodeReportInfo170 info) + { + this.info = info; + } + } + public class RelayServerNodeReportInfo170Formatter : MemoryPackFormatter + { + public override void Serialize(ref MemoryPackWriter writer, scoped ref RelayServerNodeReportInfo170 value) + { + if (value == null) + { + writer.WriteNullObjectHeader(); + return; + } + + writer.WritePackable(new SerializableRelayServerNodeReportInfo170(value)); + } + + public override void Deserialize(ref MemoryPackReader reader, scoped ref RelayServerNodeReportInfo170 value) + { + if (reader.PeekIsNull()) + { + reader.Advance(1); // skip null block + value = null; + return; + } + + var wrapped = reader.ReadPackable(); + value = wrapped.info; + } + } [MemoryPackable] @@ -576,6 +668,57 @@ namespace linker.messenger.serializer.memorypack + [MemoryPackable] + public readonly partial struct SerializableRelayAskResultInfo170 + { + [MemoryPackIgnore] + public readonly RelayAskResultInfo170 info; + + [MemoryPackInclude] + ulong FlowingId => info.FlowingId; + [MemoryPackInclude] + List Nodes => info.Nodes; + + [MemoryPackConstructor] + SerializableRelayAskResultInfo170(ulong flowingId, List nodes) + { + var info = new RelayAskResultInfo170 { FlowingId = flowingId, Nodes = nodes }; + this.info = info; + } + + public SerializableRelayAskResultInfo170(RelayAskResultInfo170 info) + { + this.info = info; + } + } + public class RelayAskResultInfo170Formatter : MemoryPackFormatter + { + public override void Serialize(ref MemoryPackWriter writer, scoped ref RelayAskResultInfo170 value) + { + if (value == null) + { + writer.WriteNullObjectHeader(); + return; + } + + writer.WritePackable(new SerializableRelayAskResultInfo170(value)); + } + + public override void Deserialize(ref MemoryPackReader reader, scoped ref RelayAskResultInfo170 value) + { + if (reader.PeekIsNull()) + { + reader.Advance(1); // skip null block + value = null; + return; + } + + var wrapped = reader.ReadPackable(); + value = wrapped.info; + } + } + + [MemoryPackable] public readonly partial struct SerializableRelayCacheInfo @@ -1150,17 +1293,14 @@ namespace linker.messenger.serializer.memorypack [MemoryPackInclude] Dictionary Dic => info.Dic; [MemoryPackInclude] - List Ids => info.Ids; - [MemoryPackInclude] string SecretKey => info.SecretKey; [MemoryPackConstructor] - SerializableRelayTrafficUpdateInfo(Dictionary dic, List ids, string secretKey) + SerializableRelayTrafficUpdateInfo(Dictionary dic, string secretKey) { var info = new RelayTrafficUpdateInfo { Dic = dic, - Ids = ids, SecretKey = secretKey }; this.info = info; diff --git a/src/linker.messenger.store.file/relay/RelayServerCdkeyStore.cs b/src/linker.messenger.store.file/relay/RelayServerCdkeyStore.cs index 12f46f96..dd249668 100644 --- a/src/linker.messenger.store.file/relay/RelayServerCdkeyStore.cs +++ b/src/linker.messenger.store.file/relay/RelayServerCdkeyStore.cs @@ -168,10 +168,14 @@ namespace linker.messenger.store.file.relay } return await Task.FromResult(true); } + public async Task> GetLastBytes(List ids) + { + return await Task.FromResult(liteCollection.Find(c => ids.Contains(c.CdkeyId)).ToDictionary(c => c.CdkeyId, c => c.LastBytes)); + } public async Task> GetAvailable(string userid) { - return await Task.FromResult(liteCollection.Find(x => x.UserId == userid && x.LastBytes > 0 && x.StartTime <= DateTime.Now && x.EndTime < DateTime.Now && x.Deleted == false).ToList()); + return await Task.FromResult(liteCollection.Find(x => x.UserId == userid && x.LastBytes > 0 && x.StartTime <= DateTime.Now && x.EndTime >= DateTime.Now && x.Deleted == false).ToList()); } public async Task> Get(List ids) { @@ -184,11 +188,11 @@ namespace linker.messenger.store.file.relay if (info.Flag.HasFlag(RelayServerCdkeyPageRequestFlag.TimeIn)) { - query = query.Where(x => x.EndTime > DateTime.Now); + query = query.Where(x => x.StartTime <= DateTime.Now && x.EndTime >= DateTime.Now); } if (info.Flag.HasFlag(RelayServerCdkeyPageRequestFlag.TimeOut)) { - query = query.Where(x => x.EndTime < DateTime.Now); + query = query.Where(x =>x.StartTime > DateTime.Now || x.EndTime < DateTime.Now); } if (info.Flag.HasFlag(RelayServerCdkeyPageRequestFlag.BytesIn)) { diff --git a/src/linker.web/public/archlinux.svg b/src/linker.web/public/archlinux.svg new file mode 100644 index 00000000..15f91d14 --- /dev/null +++ b/src/linker.web/public/archlinux.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/src/linker.web/src/views/full/devices/tunnel.js b/src/linker.web/src/views/full/devices/tunnel.js index d5fac7de..569626c2 100644 --- a/src/linker.web/src/views/full/devices/tunnel.js +++ b/src/linker.web/src/views/full/devices/tunnel.js @@ -1,6 +1,5 @@ import { getTunnelInfo, refreshTunnel } from "@/apis/tunnel"; import { injectGlobalData } from "@/provide"; -import { ElMessage } from "element-plus"; import { inject, provide, ref } from "vue"; const tunnelSymbol = Symbol(); diff --git a/src/linker.web/src/views/full/devices/tuntap.js b/src/linker.web/src/views/full/devices/tuntap.js index 3732a34a..1561bf75 100644 --- a/src/linker.web/src/views/full/devices/tuntap.js +++ b/src/linker.web/src/views/full/devices/tuntap.js @@ -16,7 +16,7 @@ export const provideTuntap = () => { provide(tuntapSymbol, tuntap); const systems = { - linux: ['debian', 'ubuntu', 'alpine', 'rocky', 'centos', 'fedora'], + linux: ['debian', 'ubuntu', 'alpine', 'rocky', 'centos', 'fedora', 'archlinux'], openwrt: ['openwrt'], ubuntu: ['ubuntu'], windows: ['windows'], diff --git a/src/linker.web/src/views/full/server/relayCdkey/Add.vue b/src/linker.web/src/views/full/server/relayCdkey/Add.vue index d1cdbb03..2087a710 100644 --- a/src/linker.web/src/views/full/server/relayCdkey/Add.vue +++ b/src/linker.web/src/views/full/server/relayCdkey/Add.vue @@ -3,7 +3,7 @@
- + Mbps diff --git a/version.txt b/version.txt index 1982bff2..ddde4f5e 100644 --- a/version.txt +++ b/version.txt @@ -1,5 +1,5 @@ v1.6.9 -2025-03-09 18:12:09 +2025-03-10 17:55:18 1. 优化linux下路由跟踪问题 2. 优化linux下获取本机IP问题 3. 增加ICS,让win7+、win server2008+支持NAT