diff --git a/src/linker.doc.web/docs/3、打洞和中继/3.2、中继.md b/src/linker.doc.web/docs/3、打洞和中继/3.2、中继.md index f5ae0f70..f2bd1401 100644 --- a/src/linker.doc.web/docs/3、打洞和中继/3.2、中继.md +++ b/src/linker.doc.web/docs/3、打洞和中继/3.2、中继.md @@ -20,7 +20,8 @@ sidebar_position: 2 :::tip[说明] 1. 如果你有多个服务器,希望将这些服务器作为一个中继节点 -2. 在服务端 `configs/server.json` 中(`Relay->Distributed`)下配置 +2. 在主服务器外的其它服务器部署一个服务端 +3. 然后 `configs/server.json` 中(`Relay->Distributed`)下修改配置 ```json "Distributed": { diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/1.1.3、ICS.md b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/1.1.2、ICS.md similarity index 95% rename from src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/1.1.3、ICS.md rename to src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/1.1.2、ICS.md index 68109886..59b084a1 100644 --- a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/1.1.3、ICS.md +++ b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/1.1.2、ICS.md @@ -2,7 +2,7 @@ sidebar_position: 4 --- -# 1.1.3、ICS +# 1.1.2、ICS :::tip[说明] 1. 如果系统没有netframework4.6.2,就下载安装一下 diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/1.1、点对网.md b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/1.1、点对网.md index c51bc304..38675977 100644 --- a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/1.1、点对网.md +++ b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/1.1、点对网.md @@ -17,7 +17,7 @@ sidebar_position: 2 1. linux,已经自动添加NAT转发(在`OpenWrt`,需要在`防火墙 - 区域设置`中将`转发`设置为`接受`) 2. windows,暂时找到两种NAT方式 1. NetNat,请参照 1.1.1、NetNat - 2. RRAS,请参照1.1.2、RRAS(Routing and Remote Access Service) + 2. ICS,请参照1.1.2、ICS(Internet Connection Sharing) 3. macos,需要你自己在**被访问端**添加NAT转发 ``` # 开启ip转发 diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras1.png b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras1.png deleted file mode 100644 index 165b519c..00000000 Binary files a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras1.png and /dev/null differ diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras10.png b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras10.png deleted file mode 100644 index e9266778..00000000 Binary files a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras10.png and /dev/null differ diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras11.png b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras11.png deleted file mode 100644 index 411379ea..00000000 Binary files a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras11.png and /dev/null differ diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras12.png b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras12.png deleted file mode 100644 index 97c63aec..00000000 Binary files a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras12.png and /dev/null differ diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras13.png b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras13.png deleted file mode 100644 index 234fa8f2..00000000 Binary files a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras13.png and /dev/null differ diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras2.png b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras2.png deleted file mode 100644 index a049f872..00000000 Binary files a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras2.png and /dev/null differ diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras3.png b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras3.png deleted file mode 100644 index a8465bcb..00000000 Binary files a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras3.png and /dev/null differ diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras4.png b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras4.png deleted file mode 100644 index c8e851bc..00000000 Binary files a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras4.png and /dev/null differ diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras5.png b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras5.png deleted file mode 100644 index 66ec0e91..00000000 Binary files a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras5.png and /dev/null differ diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras6.png b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras6.png deleted file mode 100644 index ea90ef5c..00000000 Binary files a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras6.png and /dev/null differ diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras7.png b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras7.png deleted file mode 100644 index b37fffeb..00000000 Binary files a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras7.png and /dev/null differ diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras8.png b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras8.png deleted file mode 100644 index b38c5f61..00000000 Binary files a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras8.png and /dev/null differ diff --git a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras9.png b/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras9.png deleted file mode 100644 index 54bd0e35..00000000 Binary files a/src/linker.doc.web/docs/4、通信功能/4.1、虚拟网卡/img/rras9.png and /dev/null differ diff --git a/src/linker.messenger.flow/RelayFlow.cs b/src/linker.messenger.flow/RelayFlow.cs index aba5b760..09d3102b 100644 --- a/src/linker.messenger.flow/RelayFlow.cs +++ b/src/linker.messenger.flow/RelayFlow.cs @@ -27,7 +27,7 @@ namespace linker.messenger.flow public sealed class RelayReportResolverFlow : RelayServerReportResolver { private readonly RelayReportFlow relayReportFlow; - public RelayReportResolverFlow(RelayReportFlow relayReportFlow, RelayServerMasterTransfer relayServerTransfer) : base(relayServerTransfer) + public RelayReportResolverFlow(RelayReportFlow relayReportFlow, RelayServerMasterTransfer relayServerTransfer,IRelayServerMasterStore relayServerMasterStore,IMessengerResolver messengerResolver) : base(relayServerTransfer, relayServerMasterStore, messengerResolver) { this.relayReportFlow = relayReportFlow; } diff --git a/src/linker.messenger.relay/client/RelayClientTestTransfer.cs b/src/linker.messenger.relay/client/RelayClientTestTransfer.cs index 1148a7c1..3c1f2971 100644 --- a/src/linker.messenger.relay/client/RelayClientTestTransfer.cs +++ b/src/linker.messenger.relay/client/RelayClientTestTransfer.cs @@ -46,7 +46,8 @@ namespace linker.messenger.relay Nodes = await transport.RelayTestAsync(new RelayTestInfo { MachineId = signInClientStore.Id, - SecretKey = relayClientStore.Server.SecretKey + SecretKey = relayClientStore.Server.SecretKey, + UserId = signInClientStore.Server.UserId }); var tasks = Nodes.Select(async (c) => { diff --git a/src/linker.messenger.relay/client/transport/IRelayClientTransport.cs b/src/linker.messenger.relay/client/transport/IRelayClientTransport.cs index 3d71cdde..8e0872a4 100644 --- a/src/linker.messenger.relay/client/transport/IRelayClientTransport.cs +++ b/src/linker.messenger.relay/client/transport/IRelayClientTransport.cs @@ -58,6 +58,7 @@ namespace linker.messenger.relay.client.transport public string MachineId { get; set; } public string SecretKey { get; set; } public IPEndPoint Server { get; set; } + public string UserId { get; set; } } /// diff --git a/src/linker.messenger.relay/client/transport/RelayClientTransportSelfHost.cs b/src/linker.messenger.relay/client/transport/RelayClientTransportSelfHost.cs index 1a1d6e16..c64a81c4 100644 --- a/src/linker.messenger.relay/client/transport/RelayClientTransportSelfHost.cs +++ b/src/linker.messenger.relay/client/transport/RelayClientTransportSelfHost.cs @@ -28,6 +28,7 @@ namespace linker.messenger.relay.client.transport private readonly IRelayClientStore relayClientStore; private readonly SignInClientState signInClientState; private readonly IMessengerStore messengerStore; + public RelayClientTransportSelfHost(IMessengerSender messengerSender, ISerializer serializer, IRelayClientStore relayClientStore, SignInClientState signInClientState, IMessengerStore messengerStore) { this.messengerSender = messengerSender; diff --git a/src/linker.messenger.relay/messenger/RelayMessenger.cs b/src/linker.messenger.relay/messenger/RelayMessenger.cs index 167f5707..95963189 100644 --- a/src/linker.messenger.relay/messenger/RelayMessenger.cs +++ b/src/linker.messenger.relay/messenger/RelayMessenger.cs @@ -81,7 +81,7 @@ namespace linker.messenger.relay.messenger TransportName = "test", }, cache, null); - var nodes = relayServerTransfer.GetNodes(string.IsNullOrWhiteSpace(result)); + var nodes = relayServerTransfer.GetNodes(string.IsNullOrWhiteSpace(result), info.UserId); connection.Write(serializer.Serialize(nodes)); } @@ -107,13 +107,14 @@ namespace linker.messenger.relay.messenger RelayAskResultInfo result = new RelayAskResultInfo(); string error = await relayValidatorTransfer.Validate(info, cacheFrom, cacheTo); - result.Nodes = relayServerTransfer.GetNodes(string.IsNullOrWhiteSpace(error)); + bool validated = string.IsNullOrWhiteSpace(error); + result.Nodes = relayServerTransfer.GetNodes(validated); List cdkeys = await relayServerCdkeyStore.Get(info.UserId); if (result.Nodes.Count > 0) { - result.FlowingId = relayServerTransfer.AddRelay(cacheFrom.MachineId, cacheFrom.MachineName, cacheTo.MachineId, cacheTo.MachineName, cacheFrom.GroupId, cdkeys); + result.FlowingId = relayServerTransfer.AddRelay(cacheFrom.MachineId, cacheFrom.MachineName, cacheTo.MachineId, cacheTo.MachineName, cacheFrom.GroupId, validated, cdkeys); } connection.Write(serializer.Serialize(result)); @@ -257,5 +258,42 @@ namespace linker.messenger.relay.messenger connection.Write(serializer.Serialize(page)); } + + + /// + /// 获取缓存 + /// + /// + /// + [MessengerId((ushort)RelayMessengerIds.NodeGetCache)] + public async Task NodeGetCache(IConnection connection) + { + + } + /// + /// 获取缓存 + /// + /// + /// + [MessengerId((ushort)RelayMessengerIds.NodeReport)] + public async Task NodeReport(IConnection connection) + { + + } + /// + /// 消耗流量报告 + /// + /// + /// + [MessengerId((ushort)RelayMessengerIds.TrafficReport)] + public async Task TrafficReport(IConnection connection) + { + RelayTrafficReportInfo info = serializer.Deserialize(connection.ReceiveRequestWrap.Payload.Span); + if (relayServerStore.SecretKey != info.SecretKey ) + { + connection.Write(serializer.Serialize(new Dictionary())); + return; + } + } } } diff --git a/src/linker.messenger.relay/messenger/RelayMessengerIds.cs b/src/linker.messenger.relay/messenger/RelayMessengerIds.cs index 0fe78b01..1ff00617 100644 --- a/src/linker.messenger.relay/messenger/RelayMessengerIds.cs +++ b/src/linker.messenger.relay/messenger/RelayMessengerIds.cs @@ -20,6 +20,11 @@ DelCdkey = 2110, AccessCdkey = 2111, + + NodeGetCache = 2112, + NodeReport = 2113, + TrafficReport = 2114, + Max = 2199 } } diff --git a/src/linker.messenger.relay/server/IRelayServerNodeStore.cs b/src/linker.messenger.relay/server/IRelayServerNodeStore.cs index 6b6fe5cd..7890e39e 100644 --- a/src/linker.messenger.relay/server/IRelayServerNodeStore.cs +++ b/src/linker.messenger.relay/server/IRelayServerNodeStore.cs @@ -79,6 +79,8 @@ namespace linker.messenger.relay.server #else public string MasterSecretKey { get; set; } = string.Empty; #endif + + public List UserIds { get; set; } = new List(); } public sealed partial class RelayServerNodeReportInfo @@ -102,6 +104,8 @@ namespace linker.messenger.relay.server public IPEndPoint EndPoint { get; set; } public long LastTicks { get; set; } + + public List UserIds { get; set; } = new List(); } diff --git a/src/linker.messenger.relay/server/RelayServerMasterTransfer.cs b/src/linker.messenger.relay/server/RelayServerMasterTransfer.cs index cc763c02..c1494754 100644 --- a/src/linker.messenger.relay/server/RelayServerMasterTransfer.cs +++ b/src/linker.messenger.relay/server/RelayServerMasterTransfer.cs @@ -12,7 +12,6 @@ namespace linker.messenger.relay.server { private ulong relayFlowingId = 0; - private readonly ICrypto crypto; private readonly ConcurrentDictionary reports = new ConcurrentDictionary(); @@ -22,11 +21,10 @@ namespace linker.messenger.relay.server { this.relayCaching = relayCaching; this.serializer = serializer; - crypto = CryptoFactory.CreateSymmetric(relayServerMasterStore.Master.SecretKey); } - public ulong AddRelay(string fromid, string fromName, string toid, string toName, string groupid, List cdkeys) + public ulong AddRelay(string fromid, string fromName, string toid, string toName, string groupid, bool validated, List cdkeys) { ulong flowingId = Interlocked.Increment(ref relayFlowingId); @@ -38,6 +36,7 @@ namespace linker.messenger.relay.server ToId = toid, ToName = toName, GroupId = groupid, + Validated = validated, Cdkey = cdkeys }; bool added = relayCaching.TryAdd($"{fromid}->{toid}->{flowingId}", cache, 15000); @@ -46,39 +45,29 @@ namespace linker.messenger.relay.server return flowingId; } - public Memory TryGetRelayCache(string key) + public bool TryGetRelayCache(string key, out RelayCacheInfo value) { - if (relayCaching.TryGetValue(key, out RelayCacheInfo value)) - { - byte[] bytes = crypto.Encode(serializer.Serialize(value)); - return bytes; - } - return Helper.EmptyArray; + return relayCaching.TryGetValue(key, out value); } - /// /// 设置节点 /// /// /// - public void SetNodeReport(IPEndPoint ep, Memory data) + public void SetNodeReport(IConnection connection, RelayServerNodeReportInfo info) { try { - if (crypto == null) return; - data = crypto.Decode(data.ToArray()); - RelayServerNodeReportInfo relayNodeReportInfo = serializer.Deserialize(data.Span); - - if (relayNodeReportInfo.Id == RelayServerNodeInfo.MASTER_NODE_ID) + if (info.Id == RelayServerNodeInfo.MASTER_NODE_ID) { - relayNodeReportInfo.EndPoint = new IPEndPoint(IPAddress.Any, 0); + info.EndPoint = new IPEndPoint(IPAddress.Any, 0); } - else if (relayNodeReportInfo.EndPoint.Address.Equals(IPAddress.Any)) + else if (info.EndPoint.Address.Equals(IPAddress.Any)) { - relayNodeReportInfo.EndPoint.Address = ep.Address; + info.EndPoint.Address = connection.Address.Address; } - relayNodeReportInfo.LastTicks = Environment.TickCount64; - reports.AddOrUpdate(relayNodeReportInfo.Id, relayNodeReportInfo, (a, b) => relayNodeReportInfo); + info.LastTicks = Environment.TickCount64; + reports.AddOrUpdate(info.Id, info, (a, b) => info); } catch (Exception ex) { @@ -93,10 +82,10 @@ namespace linker.messenger.relay.server /// /// 是否已认证 /// - public List GetNodes(bool validated) + public List GetNodes(bool validated, string userid) { var result = reports.Values - .Where(c => c.Public || (c.Public == false && validated)) + .Where(c => c.Public || (c.Public == false && validated) || c.UserIds.Contains(userid)) .Where(c => Environment.TickCount64 - c.LastTicks < 15000) .Where(c => c.ConnectionRatio < 100 && (c.MaxGbTotal == 0 || (c.MaxGbTotal > 0 && c.MaxGbTotalLastBytes > 0))) .OrderByDescending(c => c.LastTicks); diff --git a/src/linker.messenger.relay/server/RelayServerNodeTransfer.cs b/src/linker.messenger.relay/server/RelayServerNodeTransfer.cs index 19f5855a..3c448489 100644 --- a/src/linker.messenger.relay/server/RelayServerNodeTransfer.cs +++ b/src/linker.messenger.relay/server/RelayServerNodeTransfer.cs @@ -1,8 +1,11 @@ using linker.libs; using linker.libs.extends; +using linker.messenger.relay.messenger; using System.Buffers; +using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; +using System.Runtime.CompilerServices; namespace linker.messenger.relay.server { @@ -12,62 +15,59 @@ namespace linker.messenger.relay.server public class RelayServerNodeTransfer { private uint connectionNum = 0; - - private readonly ICrypto cryptoNode; - private readonly ICrypto cryptoMaster; + private IConnection localConnection; + private IConnection remoteConnection; private ulong bytes = 0; private ulong lastBytes = 0; RelaySpeedLimit limitTotal = new RelaySpeedLimit(); + private readonly ConcurrentDictionary trafficDict = new ConcurrentDictionary(); + private readonly ConcurrentDictionary cdkeyLastBytes = new ConcurrentDictionary(); private readonly ISerializer serializer; private readonly IRelayServerNodeStore relayServerNodeStore; private readonly IRelayServerMasterStore relayServerMasterStore; - public RelayServerNodeTransfer(ISerializer serializer, IRelayServerNodeStore relayServerNodeStore, IRelayServerMasterStore relayServerMasterStore) + private readonly IMessengerResolver messengerResolver; + private readonly IMessengerSender messengerSender; + + public RelayServerNodeTransfer(ISerializer serializer, IRelayServerNodeStore relayServerNodeStore, IRelayServerMasterStore relayServerMasterStore, IMessengerResolver messengerResolver, IMessengerSender messengerSender) { this.serializer = serializer; this.relayServerNodeStore = relayServerNodeStore; this.relayServerMasterStore = relayServerMasterStore; + this.messengerResolver = messengerResolver; + this.messengerSender = messengerSender; limitTotal.SetLimit((uint)Math.Ceiling((relayServerNodeStore.Node.MaxBandwidthTotal * 1024 * 1024) / 8.0)); - cryptoNode = CryptoFactory.CreateSymmetric(relayServerNodeStore.Node.MasterSecretKey); - cryptoMaster = CryptoFactory.CreateSymmetric(relayServerMasterStore.Master.SecretKey); + TrafficTask(); ReportTask(); + SignInTask(); + } public async ValueTask TryGetRelayCache(string key, string nodeid) { - byte[] buffer = ArrayPool.Shared.Rent(2 * 1024); try { - IPEndPoint server = nodeid == RelayServerNodeInfo.MASTER_NODE_ID - ? new IPEndPoint(IPAddress.Loopback, relayServerNodeStore.ServicePort) - : await NetworkHelper.GetEndPointAsync(relayServerNodeStore.Node.MasterHost, 1802); - ICrypto crypto = nodeid == RelayServerNodeInfo.MASTER_NODE_ID ? cryptoMaster : cryptoNode; + IConnection connection = nodeid == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection; - Socket socket = new Socket(server.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - long start = Environment.TickCount64; - await socket.ConnectAsync(server).ConfigureAwait(false); - long time = Environment.TickCount64 - start; - - await socket.SendAsync(new byte[] { (byte)ResolverType.RelayReport }); - await socket.SendAsync(key.ToBytes()); - int length = await socket.ReceiveAsync(buffer.AsMemory(), SocketFlags.None).AsTask().WaitAsync(TimeSpan.FromMilliseconds(Math.Max(time * 2, 5000))).ConfigureAwait(false); - socket.SafeClose(); - - RelayCacheInfo result = serializer.Deserialize(crypto.Decode(buffer.AsMemory(0, length).ToArray()).Span); - return result; + MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap + { + Connection = connection, + MessengerId = (ushort)RelayMessengerIds.NodeGetCache, + Payload = serializer.Serialize(key) + }); + if (resp.Code == MessageResponeCodes.OK && resp.Data.Length > 0) + { + return serializer.Deserialize(resp.Data.Span); + } } catch (Exception ex) { if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) LoggerHelper.Instance.Error($"{ex}"); } - finally - { - ArrayPool.Shared.Return(buffer); - } return null; } @@ -75,9 +75,40 @@ namespace linker.messenger.relay.server /// 无效请求 /// /// - public bool Validate() + public bool Validate(RelayCacheInfo relayCache) { - return ValidateConnection() && ValidateBytes(); + //已认证的没有流量限制 + if (relayCache.Validated) return true; + //流量卡有的,就能继续用 + if (relayCache.Cdkey.Any(c => c.LastBytes > 0)) return true; + + return ValidateConnection(relayCache) && ValidateBytes(relayCache); + } + /// + /// 连接数是否够 + /// + /// + private bool ValidateConnection(RelayCacheInfo relayCache) + { + bool res = relayServerNodeStore.Node.MaxConnection == 0 || relayServerNodeStore.Node.MaxConnection * 2 > connectionNum; + if (res == false && LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) + LoggerHelper.Instance.Debug($"relay ValidateConnection false,{connectionNum}/{relayServerNodeStore.Node.MaxConnection * 2}"); + + return res; + } + /// + /// 流量是否够 + /// + /// + private bool ValidateBytes(RelayCacheInfo relayCache) + { + bool res = relayServerNodeStore.Node.MaxGbTotal == 0 + || (relayServerNodeStore.Node.MaxGbTotal > 0 && relayServerNodeStore.Node.MaxGbTotalLastBytes > 0); + + if (res == false && LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) + LoggerHelper.Instance.Debug($"relay ValidateBytes false,{relayServerNodeStore.Node.MaxGbTotalLastBytes}bytes/{relayServerNodeStore.Node.MaxGbTotal}gb"); + + return res; } /// @@ -94,66 +125,14 @@ namespace linker.messenger.relay.server { Interlocked.Decrement(ref connectionNum); } - /// - /// 连接数是否够 - /// - /// - public bool ValidateConnection() - { - bool res = relayServerNodeStore.Node.MaxConnection == 0 || relayServerNodeStore.Node.MaxConnection * 2 > connectionNum; - if (res == false && LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) - LoggerHelper.Instance.Debug($"relay ValidateConnection false,{connectionNum}/{relayServerNodeStore.Node.MaxConnection * 2}"); - return res; - } - - /// - /// 流量是否够 - /// - /// - public bool ValidateBytes() - { - bool res = relayServerNodeStore.Node.MaxGbTotal == 0 - || (relayServerNodeStore.Node.MaxGbTotal > 0 && relayServerNodeStore.Node.MaxGbTotalLastBytes > 0); - - if (res == false && LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) - LoggerHelper.Instance.Debug($"relay ValidateBytes false,{relayServerNodeStore.Node.MaxGbTotalLastBytes}bytes/{relayServerNodeStore.Node.MaxGbTotal}gb"); - - return res; - } - /// - /// 添加流量 - /// - /// - /// - public bool AddBytes(ulong length) - { - bytes += length; - if (relayServerNodeStore.Node.MaxGbTotal == 0) - { - return true; - } - - if (relayServerNodeStore.Node.MaxGbTotalLastBytes >= length) - relayServerNodeStore.SetMaxGbTotalLastBytes(relayServerNodeStore.Node.MaxGbTotalLastBytes - length); - else relayServerNodeStore.SetMaxGbTotalLastBytes(0); - return relayServerNodeStore.Node.MaxGbTotalLastBytes > 0; - } - - /// - /// 获取单个限速 - /// - /// - public uint GetBandwidthLimit() - { - return (uint)Math.Ceiling((relayServerNodeStore.Node.MaxBandwidth * 1024 * 1024) / 8.0); - } /// /// 是否需要总限速 /// /// - public bool NeedLimit() + public bool NeedLimit(RelayTrafficCacheInfo relayCache) { + if (relayCache.Cache.Validated) return false; return limitTotal.NeedLimit(); } /// @@ -167,8 +146,86 @@ namespace linker.messenger.relay.server } - private void ResetBytes() + /// + /// 开始计算流量 + /// + /// + public void AddTrafficCache(RelayTrafficCacheInfo relayCache) { + SetLimit(relayCache); + trafficDict.TryAdd(relayCache.Cache.FlowId, relayCache); + } + /// + /// 取消计算流量 + /// + /// + public void RemoveTrafficCache(RelayTrafficCacheInfo relayCache) + { + trafficDict.TryRemove(relayCache.Cache.FlowId, out _); + foreach (var item in relayCache.Cache.Cdkey) + { + cdkeyLastBytes.TryRemove(item.Id, out _); + } + } + /// + /// 消耗流量 + /// + /// + /// + public bool AddBytes(RelayTrafficCacheInfo cache, ulong length) + { + Interlocked.Add(ref bytes, length); + + //验证过的,不消耗流量 + if (cache.Cache.Validated) return true; + //节点无流量限制的,不消耗流量 + if (relayServerNodeStore.Node.MaxGbTotal == 0) return true; + + Interlocked.Add(ref cache.Sendt, length); + + if (cache.CurrentCdkey != null) return cache.CurrentCdkey.LastBytes > 0; + + return relayServerNodeStore.Node.MaxGbTotalLastBytes > 0; + } + + /// + /// 设置限速 + /// + /// + private void SetLimit(RelayTrafficCacheInfo relayCache) + { + //验证过的,无限制 + if (relayCache.Cache.Validated) + { + relayCache.Limit.SetLimit(0); + return; + } + + //节点无限制 + if (relayServerNodeStore.Node.MaxBandwidth == 0) + { + relayCache.Limit.SetLimit((uint)Math.Ceiling((relayServerNodeStore.Node.MaxBandwidth * 1024 * 1024) / 8.0)); + return; + } + + RelayServerCdkeyInfo currentCdkey = relayCache.Cache.Cdkey.Where(c => c.LastBytes > 0).OrderByDescending(c => c.Bandwidth).FirstOrDefault(); + //有cdkey,且带宽大于节点带宽,就用cdkey的带宽 + if (currentCdkey != null && (currentCdkey.Bandwidth == 0 || currentCdkey.Bandwidth > relayServerNodeStore.Node.MaxBandwidth)) + { + relayCache.CurrentCdkey = currentCdkey; + relayCache.Limit.SetLimit((uint)Math.Ceiling((relayCache.CurrentCdkey.Bandwidth * 1024 * 1024) / 8.0)); + return; + } + + relayCache.Limit.SetLimit((uint)Math.Ceiling((relayServerNodeStore.Node.MaxBandwidth * 1024 * 1024) / 8.0)); + } + + private void ResetNodeBytes(ulong length) + { + if (relayServerNodeStore.Node.MaxGbTotalLastBytes >= length) + relayServerNodeStore.SetMaxGbTotalLastBytes(relayServerNodeStore.Node.MaxGbTotalLastBytes - length); + else relayServerNodeStore.SetMaxGbTotalLastBytes(0); + if (relayServerNodeStore.Node.MaxGbTotalMonth != DateTime.Now.Month) { relayServerNodeStore.SetMaxGbTotalMonth(DateTime.Now.Month); @@ -176,16 +233,78 @@ namespace linker.messenger.relay.server } relayServerNodeStore.Confirm(); } + private void TrafficTask() + { + TimerHelper.SetInterval(async () => + { + //需要报告Cdkey的流量 + _ = messengerSender.SendReply(new MessageRequestWrap + { + Connection = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection, + MessengerId = (ushort)RelayMessengerIds.TrafficReport, + Payload = serializer.Serialize(new RelayTrafficReportInfo + { + Id2Bytes = trafficDict.Values + .Where(c => c.CurrentCdkey != null && c.Sendt > 0) + .GroupBy(c => c.CurrentCdkey.Id) + .ToDictionary(c => c.Key, d => (ulong)d.Sum(d => (decimal)d.Sendt)), + UpdateIds = trafficDict.Values.SelectMany(c => c.Cache.Cdkey).Select(c => c.Id).Distinct().ToList(), + SecretKey = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID + ? relayServerMasterStore.Master.SecretKey + : relayServerNodeStore.Node.MasterSecretKey + }), + Timeout = 4000, + + }).ContinueWith((result) => + { + //更新cdkey的剩余流量 + if (result.Result.Code != MessageResponeCodes.OK || result.Result.Data.Length == 0) + { + return; + } + Dictionary dic = serializer.Deserialize>(result.Result.Data.Span); + foreach (var item in dic) + { + cdkeyLastBytes.AddOrUpdate(item.Key, item.Value, (a, b) => item.Value); + } + }); + + foreach (var cache in trafficDict.Values.Where(c => c.CurrentCdkey != null)) + { + ulong length = cache.Sendt; + Interlocked.Add(ref Unsafe.As(ref cache.Sendt), -(long)length); + + if (cdkeyLastBytes.TryGetValue(cache.CurrentCdkey.Id, out ulong value)) + { + cache.CurrentCdkey.LastBytes = value; + } + if (cache.CurrentCdkey.LastBytes <= 0) + { + SetLimit(cache); + } + } + + foreach (var cache in trafficDict.Values.Where(c => c.CurrentCdkey == null)) + { + ulong length = cache.Sendt; + Interlocked.Add(ref Unsafe.As(ref cache.Sendt), -(long)length); + ResetNodeBytes(length); + } + await Task.CompletedTask; + + return true; + }, () => 5000); + } private void ReportTask() { TimerHelper.SetInterval(async () => { - if(LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) + if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) { LoggerHelper.Instance.Debug($"relay report : {relayServerNodeStore.Node.ToJson()}"); } - ResetBytes(); + IEnumerable nodes = new List { //默认报告给自己,作为本服务器的一个默认中继节点 @@ -215,7 +334,7 @@ namespace linker.messenger.relay.server { try { - ICrypto crypto = node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? cryptoMaster : cryptoNode; + IConnection connection = node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection; IPEndPoint endPoint = await NetworkHelper.GetEndPointAsync(node.Host, relayServerNodeStore.ServicePort) ?? new IPEndPoint(IPAddress.Any, relayServerNodeStore.ServicePort); RelayServerNodeReportInfo relayNodeReportInfo = new RelayServerNodeReportInfo @@ -231,19 +350,15 @@ namespace linker.messenger.relay.server MaxConnection = node.MaxConnection, ConnectionRatio = Math.Round(node.MaxConnection == 0 ? 0 : connectionNum / 2.0 / node.MaxConnection, 2), EndPoint = endPoint, + UserIds = node.UserIds }; - IPEndPoint ep = await NetworkHelper.GetEndPointAsync(node.MasterHost, relayServerNodeStore.ServicePort); - - byte[] content = crypto.Encode(serializer.Serialize(relayNodeReportInfo)); - byte[] data = new byte[content.Length + 1]; - data[0] = (byte)ResolverType.RelayReport; - content.AsMemory().CopyTo(data.AsMemory(1)); - - using UdpClient udpClient = new UdpClient(AddressFamily.InterNetwork); - udpClient.Client.WindowsUdpBug(); - - await udpClient.SendAsync(data, ep); + await messengerSender.SendOnly(new MessageRequestWrap + { + Connection = connection, + MessengerId = (ushort)RelayMessengerIds.NodeReport, + Payload = serializer.Serialize(relayNodeReportInfo) + }); } catch (Exception ex) { @@ -256,5 +371,62 @@ namespace linker.messenger.relay.server return true; }, () => 5000); } + + private void SignInTask() + { + TimerHelper.SetInterval(async () => + { + if ((remoteConnection == null || remoteConnection.Connected == false) && string.IsNullOrWhiteSpace(relayServerNodeStore.Node.MasterHost) == false) + { + remoteConnection = await SignIn(relayServerNodeStore.Node.MasterHost, relayServerNodeStore.Node.MasterSecretKey).ConfigureAwait(false); + } + if (localConnection == null || localConnection.Connected == false) + { + localConnection = await SignIn(new IPEndPoint(IPAddress.Loopback, relayServerNodeStore.ServicePort).ToString(), relayServerMasterStore.Master.SecretKey).ConfigureAwait(false); + } + return true; + }, () => 3000); + } + private async Task SignIn(string host, string secretKey) + { + byte[] bytes = ArrayPool.Shared.Rent(1024); + try + { + byte[] secretKeyBytes = secretKey.Md5().ToBytes(); + + bytes[0] = (byte)secretKeyBytes.Length; + secretKeyBytes.AsSpan().CopyTo(bytes.AsSpan(1)); + + + IPEndPoint remote = await NetworkHelper.GetEndPointAsync(host, 1802); + + Socket socket = new Socket(remote.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + socket.KeepAlive(); + await socket.ConnectAsync(remote).WaitAsync(TimeSpan.FromMilliseconds(5000)).ConfigureAwait(false); + return await messengerResolver.BeginReceiveClient(socket, true, (byte)ResolverType.RelayReport, bytes).ConfigureAwait(false); + } + catch (Exception) + { + } + finally + { + ArrayPool.Shared.Return(bytes); + } + return null; + } } + + public sealed partial class RelayTrafficReportInfo + { + /// + /// cdkey id 和 流量 + /// + public Dictionary Id2Bytes { get; set; } + /// + /// 需要知道哪些cdkey的剩余流量 + /// + public List UpdateIds { get; set; } + public string SecretKey { get; set; } + } + } diff --git a/src/linker.messenger.relay/server/RelayServerReportResolver.cs b/src/linker.messenger.relay/server/RelayServerReportResolver.cs index 7a46745d..7b8c7a87 100644 --- a/src/linker.messenger.relay/server/RelayServerReportResolver.cs +++ b/src/linker.messenger.relay/server/RelayServerReportResolver.cs @@ -1,4 +1,5 @@ -using linker.libs.extends; +using linker.libs; +using linker.libs.extends; using System.Buffers; using System.Net; using System.Net.Sockets; @@ -8,14 +9,19 @@ namespace linker.messenger.relay.server /// /// 中继节点报告处理器 /// - public class RelayServerReportResolver: IResolver + public class RelayServerReportResolver : IResolver { public byte Type => (byte)ResolverType.RelayReport; private readonly RelayServerMasterTransfer relayServerTransfer; - public RelayServerReportResolver(RelayServerMasterTransfer relayServerTransfer) + private readonly IRelayServerMasterStore relayServerMasterStore; + private readonly IMessengerResolver messengerResolver; + + public RelayServerReportResolver(RelayServerMasterTransfer relayServerTransfer, IRelayServerMasterStore relayServerMasterStore, IMessengerResolver messengerResolver) { this.relayServerTransfer = relayServerTransfer; + this.relayServerMasterStore = relayServerMasterStore; + this.messengerResolver = messengerResolver; } public virtual void AddReceive(ulong bytes) @@ -31,15 +37,14 @@ namespace linker.messenger.relay.server try { AddReceive((ulong)memory.Length); - int length = await socket.ReceiveAsync(buffer.AsMemory(), SocketFlags.None).ConfigureAwait(false); + int length = await socket.ReceiveAsync(buffer.AsMemory(0, 1), SocketFlags.None).ConfigureAwait(false); AddReceive((ulong)length); + await socket.ReceiveAsync(buffer.AsMemory(0, length), SocketFlags.None).ConfigureAwait(false); - string key = buffer.AsMemory(0,length).GetString(); - Memory bytes = relayServerTransfer.TryGetRelayCache(key); - if (bytes.Length > 0) + string key = buffer.AsMemory(0, length).GetString(); + if (relayServerMasterStore.Master.SecretKey.Md5() == key) { - AddSendt((ulong)bytes.Length); - await socket.SendAsync(bytes); + await messengerResolver.BeginReceiveServer(socket, Helper.EmptyArray); } } catch (Exception) @@ -49,13 +54,10 @@ namespace linker.messenger.relay.server { ArrayPool.Shared.Return(buffer); } - } public async Task Resolve(Socket socket, IPEndPoint ep, Memory memory) { - AddReceive((ulong)memory.Length); - relayServerTransfer.SetNodeReport(ep, memory); await Task.CompletedTask; } } diff --git a/src/linker.messenger.relay/server/RelayServerResolver.cs b/src/linker.messenger.relay/server/RelayServerResolver.cs index 513ea6a6..872551de 100644 --- a/src/linker.messenger.relay/server/RelayServerResolver.cs +++ b/src/linker.messenger.relay/server/RelayServerResolver.cs @@ -52,17 +52,6 @@ namespace linker.messenger.relay.server int length = await socket.ReceiveAsync(buffer.AsMemory(), SocketFlags.None).ConfigureAwait(false); RelayMessageInfo relayMessage = serializer.Deserialize(buffer.AsMemory(0, length).Span); - if (relayMessage.Type == RelayMessengerType.Ask && relayMessage.NodeId != RelayServerNodeInfo.MASTER_NODE_ID) - { - if (relayServerNodeTransfer.Validate() == false) - { - if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) - LoggerHelper.Instance.Error($"relay {relayMessage.Type} Validate false,flowid:{relayMessage.FlowId}"); - await socket.SendAsync(new byte[] { 1 }); - socket.SafeClose(); - return; - } - } //ask 是发起端来的,那key就是 发起端->目标端, answer的,目标和来源会交换,所以转换一下 string key = relayMessage.Type == RelayMessengerType.Ask ? $"{relayMessage.FromId}->{relayMessage.ToId}->{relayMessage.FlowId}" : $"{relayMessage.ToId}->{relayMessage.FromId}->{relayMessage.FlowId}"; //获取缓存 @@ -71,9 +60,20 @@ namespace linker.messenger.relay.server { if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) LoggerHelper.Instance.Error($"relay {relayMessage.Type} get cache fail,flowid:{relayMessage.FlowId}"); + await socket.SendAsync(new byte[] { 1 }); socket.SafeClose(); return; } + + if (relayMessage.Type == RelayMessengerType.Ask && relayServerNodeTransfer.Validate(relayCache) == false) + { + if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) + LoggerHelper.Instance.Error($"relay {relayMessage.Type} Validate false,flowid:{relayMessage.FlowId}"); + await socket.SendAsync(new byte[] { 1 }); + socket.SafeClose(); + return; + } + //流量统计 AddReceive(relayCache.FromId, relayCache.FromName, relayCache.ToName, relayCache.GroupId, (ulong)length); try @@ -84,7 +84,6 @@ namespace linker.messenger.relay.server { //添加本地缓存 RelayWrapInfo relayWrap = new RelayWrapInfo { Socket = socket, Tcs = new TaskCompletionSource() }; - relayWrap.Limit.SetLimit(relayServerNodeTransfer.GetBandwidthLimit()); relayDic.TryAdd(relayCache.FlowId, relayWrap); @@ -105,8 +104,9 @@ namespace linker.messenger.relay.server } relayWrap.Tcs.SetResult(); - _ = CopyToAsync(relayCache, relayWrap.Limit, socket, relayWrap.Socket, relayMessage.NodeId != RelayServerNodeInfo.MASTER_NODE_ID); - _ = CopyToAsync(relayCache, relayWrap.Limit, relayWrap.Socket, socket, relayMessage.NodeId != RelayServerNodeInfo.MASTER_NODE_ID); + RelaySpeedLimit limit = new RelaySpeedLimit(); + _ = CopyToAsync(relayCache, limit, socket, relayWrap.Socket); + _ = CopyToAsync(relayCache, limit, relayWrap.Socket, socket); } break; default: @@ -139,48 +139,48 @@ namespace linker.messenger.relay.server ArrayPool.Shared.Return(buffer); } } - private async Task CopyToAsync(RelayCacheInfo cache, RelaySpeedLimit limit, Socket source, Socket destination, bool needLimit) + private async Task CopyToAsync(RelayCacheInfo cache, RelaySpeedLimit limit, Socket source, Socket destination) { + RelayTrafficCacheInfo trafficCacheInfo = new RelayTrafficCacheInfo { Cache = cache, Sendt = 0, Limit = limit }; byte[] buffer = new byte[8 * 1024]; try { relayServerNodeTransfer.IncrementConnectionNum(); + relayServerNodeTransfer.AddTrafficCache(trafficCacheInfo); int bytesRead; while ((bytesRead = await source.ReceiveAsync(buffer.AsMemory(), SocketFlags.None).ConfigureAwait(false)) != 0) { - if (needLimit) + //流量限制 + if (relayServerNodeTransfer.AddBytes(trafficCacheInfo, (ulong)bytesRead) == false) { - //流量限制 - if (relayServerNodeTransfer.AddBytes((ulong)bytesRead) == false) - { - source.SafeClose(); - break; - } + source.SafeClose(); + break; + } - //总速度 - if (relayServerNodeTransfer.NeedLimit()) + //总速度 + if (relayServerNodeTransfer.NeedLimit(trafficCacheInfo)) + { + int length = bytesRead; + relayServerNodeTransfer.TryLimit(ref length); + while (length > 0) { - int length = bytesRead; + await Task.Delay(30).ConfigureAwait(false); relayServerNodeTransfer.TryLimit(ref length); - while (length > 0) - { - await Task.Delay(30).ConfigureAwait(false); - relayServerNodeTransfer.TryLimit(ref length); - } - } - //单个速度 - if (limit.NeedLimit()) - { - int length = bytesRead; - limit.TryLimit(ref length); - while (length > 0) - { - await Task.Delay(30).ConfigureAwait(false); - limit.TryLimit(ref length); - } } } + //单个速度 + if (limit.NeedLimit()) + { + int length = bytesRead; + limit.TryLimit(ref length); + while (length > 0) + { + await Task.Delay(30).ConfigureAwait(false); + limit.TryLimit(ref length); + } + } + AddReceive(cache.FromId, cache.FromName, cache.ToName, cache.GroupId, (ulong)bytesRead); AddSendt(cache.FromId, cache.FromName, cache.ToName, cache.GroupId, (ulong)bytesRead); await destination.SendAsync(buffer.AsMemory(0, bytesRead), SocketFlags.None).ConfigureAwait(false); @@ -192,6 +192,7 @@ namespace linker.messenger.relay.server finally { relayServerNodeTransfer.DecrementConnectionNum(); + relayServerNodeTransfer.RemoveTrafficCache(trafficCacheInfo); source.SafeClose(); destination.SafeClose(); } @@ -256,15 +257,22 @@ namespace linker.messenger.relay.server public string ToId { get; set; } public string ToName { get; set; } public string GroupId { get; set; } + public bool Validated { get; set; } public List Cdkey { get; set; } } + public sealed class RelayTrafficCacheInfo + { + public ulong Sendt; + public RelaySpeedLimit Limit { get; set; } + public RelayCacheInfo Cache { get; set; } + public RelayServerCdkeyInfo CurrentCdkey { get; set; } + } public sealed class RelayWrapInfo { public TaskCompletionSource Tcs { get; set; } public Socket Socket { get; set; } - public RelaySpeedLimit Limit { get; set; } = new RelaySpeedLimit(); } public sealed partial class RelayMessageInfo diff --git a/src/linker.messenger.serializer.memorypack/Entry.cs b/src/linker.messenger.serializer.memorypack/Entry.cs index a1df4b26..cfeed455 100644 --- a/src/linker.messenger.serializer.memorypack/Entry.cs +++ b/src/linker.messenger.serializer.memorypack/Entry.cs @@ -60,6 +60,7 @@ namespace linker.messenger.serializer.memorypack MemoryPackFormatterProvider.Register(new RelayServerCdkeyPageResultInfoFormatter()); MemoryPackFormatterProvider.Register(new RelayServerCdkeyAddInfoFormatter()); MemoryPackFormatterProvider.Register(new RelayServerCdkeyDelInfoFormatter()); + MemoryPackFormatterProvider.Register(new RelayTrafficReportInfoFormatter()); MemoryPackFormatterProvider.Register(new AccessUpdateInfoFormatter()); diff --git a/src/linker.messenger.serializer.memorypack/RelaySerializer.cs b/src/linker.messenger.serializer.memorypack/RelaySerializer.cs index 045162fd..86968b1e 100644 --- a/src/linker.messenger.serializer.memorypack/RelaySerializer.cs +++ b/src/linker.messenger.serializer.memorypack/RelaySerializer.cs @@ -18,11 +18,13 @@ namespace linker.messenger.serializer.memorypack [MemoryPackInclude, MemoryPackAllowSerialize] IPEndPoint Server => info.Server; + [MemoryPackInclude] + string UserId => info.UserId; [MemoryPackConstructor] - SerializableRelayTestInfo(string machineId, string secretKey, IPEndPoint server) + SerializableRelayTestInfo(string machineId, string secretKey, IPEndPoint server, string userid) { - var info = new RelayTestInfo { MachineId = machineId, SecretKey = secretKey, Server = server }; + var info = new RelayTestInfo { MachineId = machineId, SecretKey = secretKey, Server = server, UserId = userid }; this.info = info; } @@ -183,6 +185,10 @@ namespace linker.messenger.serializer.memorypack [MemoryPackInclude] long LastTicks => info.LastTicks; + [MemoryPackInclude] + List UserIds => info.UserIds; + + [MemoryPackConstructor] SerializableRelayServerNodeReportInfo( string id, string name, @@ -190,7 +196,7 @@ namespace linker.messenger.serializer.memorypack double maxGbTotal, ulong maxGbTotalLastBytes, double connectionRatio, double bandwidthRatio, bool Public, int delay, - IPEndPoint endPoint, long lastTicks) + IPEndPoint endPoint, long lastTicks, List userIds) { var info = new RelayServerNodeReportInfo { @@ -207,6 +213,7 @@ namespace linker.messenger.serializer.memorypack MaxGbTotalLastBytes = maxGbTotalLastBytes, Name = name, Public = Public, + UserIds = userIds }; this.info = info; } @@ -318,11 +325,15 @@ namespace linker.messenger.serializer.memorypack [MemoryPackInclude] string GroupId => info.GroupId; + [MemoryPackInclude] + bool Validated => info.Validated; + + [MemoryPackInclude, MemoryPackAllowSerialize] List Cdkey => info.Cdkey; [MemoryPackConstructor] - SerializableRelayCacheInfo(ulong flowId, string fromId, string fromName, string toId, string toName, string groupId, List cdkey) + SerializableRelayCacheInfo(ulong flowId, string fromId, string fromName, string toId, string toName, string groupId, bool validated, List cdkey) { var info = new RelayCacheInfo { @@ -332,7 +343,8 @@ namespace linker.messenger.serializer.memorypack GroupId = groupId, ToId = toId, ToName = toName, - Cdkey = cdkey + Cdkey = cdkey, + Validated = validated }; this.info = info; } @@ -770,4 +782,64 @@ namespace linker.messenger.serializer.memorypack } } + + + [MemoryPackable] + public readonly partial struct SerializableRelayTrafficReportInfo + { + [MemoryPackIgnore] + public readonly RelayTrafficReportInfo info; + + [MemoryPackInclude] + Dictionary Id2Bytes => info.Id2Bytes; + [MemoryPackInclude] + List UpdateIds => info.UpdateIds; + [MemoryPackInclude] + string SecretKey => info.SecretKey; + + [MemoryPackConstructor] + SerializableRelayTrafficReportInfo(Dictionary id2Bytes, List updateIds, string secretKey) + { + var info = new RelayTrafficReportInfo + { + Id2Bytes = id2Bytes, + UpdateIds = updateIds, + SecretKey = secretKey + }; + this.info = info; + } + + public SerializableRelayTrafficReportInfo(RelayTrafficReportInfo info) + { + this.info = info; + } + } + public class RelayTrafficReportInfoFormatter : MemoryPackFormatter + { + public override void Serialize(ref MemoryPackWriter writer, scoped ref RelayTrafficReportInfo value) + { + if (value == null) + { + writer.WriteNullObjectHeader(); + return; + } + + writer.WritePackable(new SerializableRelayTrafficReportInfo(value)); + } + + public override void Deserialize(ref MemoryPackReader reader, scoped ref RelayTrafficReportInfo value) + { + if (reader.PeekIsNull()) + { + reader.Advance(1); // skip null block + value = null; + return; + } + + var wrapped = reader.ReadPackable(); + value = wrapped.info; + } + } + + } diff --git a/src/linker.messenger.sforward/client/SForwardClientTransfer.cs b/src/linker.messenger.sforward/client/SForwardClientTransfer.cs index 0b8da8dc..e1ac275d 100644 --- a/src/linker.messenger.sforward/client/SForwardClientTransfer.cs +++ b/src/linker.messenger.sforward/client/SForwardClientTransfer.cs @@ -26,7 +26,7 @@ namespace linker.messenger.sforward.client this.signInClientStore = signInClientStore; this.sForwardClientStore = sForwardClientStore; - signInClientState.NetworkFirstEnabledHandle += () => Start(); + signInClientState.OnSignInSuccess += (i) => Start(); this.serializer = serializer; } diff --git a/src/linker.messenger.signin/SignInClientTransfer.cs b/src/linker.messenger.signin/SignInClientTransfer.cs index 1fb64f60..b3d39d2e 100644 --- a/src/linker.messenger.signin/SignInClientTransfer.cs +++ b/src/linker.messenger.signin/SignInClientTransfer.cs @@ -125,7 +125,7 @@ namespace linker.messenger.signin Socket socket = new Socket(remote.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp); socket.KeepAlive(); await socket.ConnectAsync(remote).WaitAsync(TimeSpan.FromMilliseconds(5000)).ConfigureAwait(false); - clientSignInState.Connection = await messengerResolver.BeginReceiveClient(socket, true, (byte)ResolverType.Messenger).ConfigureAwait(false); + clientSignInState.Connection = await messengerResolver.BeginReceiveClient(socket, true, (byte)ResolverType.Messenger, Helper.EmptyArray).ConfigureAwait(false); return true; } diff --git a/src/linker.messenger/MessengerResolver.cs b/src/linker.messenger/MessengerResolver.cs index ffff9d20..a4a82e24 100644 --- a/src/linker.messenger/MessengerResolver.cs +++ b/src/linker.messenger/MessengerResolver.cs @@ -15,7 +15,7 @@ namespace linker.messenger public interface IMessengerResolver { public Task BeginReceiveClient(Socket socket); - public Task BeginReceiveClient(Socket socket, bool sendFlag, byte flag); + public Task BeginReceiveClient(Socket socket, bool sendFlag, byte flag, byte[] data); public void AddMessenger(List list); public Task BeginReceiveServer(Socket socket, Memory memory); public Task BeginReceiveServer(Socket socket, IPEndPoint ep, Memory memory); @@ -104,7 +104,7 @@ namespace linker.messenger /// public async Task BeginReceiveClient(Socket socket) { - return await BeginReceiveClient(socket, false, 0); + return await BeginReceiveClient(socket, false, 0, Helper.EmptyArray); } /// /// 以客户端模式接收数据 @@ -113,7 +113,7 @@ namespace linker.messenger /// /// /// - public async Task BeginReceiveClient(Socket socket, bool sendFlag, byte flag) + public async Task BeginReceiveClient(Socket socket, bool sendFlag, byte flag, byte[] data) { try { @@ -126,6 +126,11 @@ namespace linker.messenger { await socket.SendAsync(new byte[] { flag }).ConfigureAwait(false); } + if (data.Length > 0) + { + await socket.SendAsync(data).ConfigureAwait(false); + } + NetworkStream networkStream = new NetworkStream(socket, false); SslStream sslStream = new SslStream(networkStream, true, new RemoteCertificateValidationCallback(ValidateServerCertificate), null); await sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions @@ -167,7 +172,7 @@ namespace linker.messenger Type voidType = typeof(void); Type midType = typeof(MessengerIdAttribute); - LoggerHelper.Instance.Info($"add messenger {string.Join(",",list.Select(c=>c.GetType().Name))}"); + LoggerHelper.Instance.Info($"add messenger {string.Join(",", list.Select(c => c.GetType().Name))}"); foreach (IMessenger messenger in list.Distinct()) { diff --git a/src/linker.messenger/ResolverTransfer.cs b/src/linker.messenger/ResolverTransfer.cs index 6924daf7..96347ce0 100644 --- a/src/linker.messenger/ResolverTransfer.cs +++ b/src/linker.messenger/ResolverTransfer.cs @@ -36,7 +36,7 @@ namespace linker.messenger /// public async Task BeginReceive(Socket socket) { - byte[] buffer = ArrayPool.Shared.Rent(1024); + byte[] buffer = ArrayPool.Shared.Rent(32); try { if (socket == null || socket.RemoteEndPoint == null) @@ -44,8 +44,6 @@ namespace linker.messenger return; } - //socket.KeepAlive(); - int length = await socket.ReceiveAsync(buffer.AsMemory(0, 1), SocketFlags.None).ConfigureAwait(false); byte type = buffer[0]; diff --git a/version.txt b/version.txt index 3621b243..502e96dc 100644 --- a/version.txt +++ b/version.txt @@ -1,5 +1,5 @@ v1.6.9 -2025-03-02 17:55:03 +2025-03-03 17:44:29 1. 优化linux下路由跟踪问题 2. 优化linux下获取本机IP问题 3. 增加ICS,让win7+、win server2008+支持NAT