发布:3.1.5

This commit is contained in:
若汝棋茗
2025-05-24 22:20:50 +08:00
parent ec285f3e9f
commit efeca7def3
21 changed files with 117 additions and 67 deletions

View File

@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<BaseVersion>3.1.3</BaseVersion>
<BaseVersion>3.1.5</BaseVersion>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)'=='Release'">
<TouchSocketVersion>$(BaseVersion)</TouchSocketVersion>

View File

@@ -116,12 +116,7 @@ public static class CollectionsExtension
/// <typeparam name="TValue">字典中值的类型。</typeparam>
public static void AddOrUpdate<TKey, TValue>(this Dictionary<TKey, TValue> dictionary, TKey key, TValue value)
{
// 尝试向字典中添加键值对。如果键不存在,则成功添加;如果键已存在,则不会添加新的键值对。
if (!dictionary.TryAdd(key, value))
{
// 如果尝试添加失败,则说明键已存在,此时更新键对应的值。
dictionary[key] = value;
}
dictionary[key] = value;
}
@@ -135,12 +130,8 @@ public static class CollectionsExtension
/// <typeparam name="TValue">值的类型。</typeparam>
public static void AddOrUpdate<TKey, TValue>(this ConcurrentDictionary<TKey, TValue> dictionary, TKey key, TValue value)
{
// 尝试向字典中添加键值对。如果键已存在,则不会添加。
if (!dictionary.TryAdd(key, value))
{
// 如果键已存在,更新其值。
dictionary[key] = value;
}
// 直接使用内置方法,语义更明确
dictionary.AddOrUpdate(key, value, (_, _) => value);
}

View File

@@ -65,11 +65,6 @@ public abstract class DmtpActor : DisposableObject, IDmtpActor
/// </summary>
public Func<DmtpActor, PackageRouterEventArgs, Task> Routing { get; set; }
///// <summary>
///// 发送数据接口
///// </summary>
//public Action<DmtpActor, ArraySegment<byte>[]> OutputSend { get; set; }
/// <summary>
/// 异步发送数据接口
/// </summary>
@@ -89,7 +84,7 @@ public abstract class DmtpActor : DisposableObject, IDmtpActor
public string Id { get; set; }
/// <inheritdoc/>
public bool Online { get; protected set; }
public virtual bool Online => this.m_online;
/// <inheritdoc/>
public bool IsReliable { get; }
@@ -112,8 +107,9 @@ public abstract class DmtpActor : DisposableObject, IDmtpActor
private readonly ConcurrentDictionary<int, InternalChannel> m_userChannels = new ConcurrentDictionary<int, InternalChannel>();
private readonly AsyncResetEvent m_handshakeFinished = new AsyncResetEvent(false, false);
private CancellationTokenSource m_cancellationTokenSource;
private bool m_online;
private readonly Lock m_syncRoot = new Lock();
private Dictionary<Type, IActor> m_actors = new Dictionary<Type, IActor>();
private readonly Dictionary<Type, IActor> m_actors = new Dictionary<Type, IActor>();
#endregion
/// <summary>
@@ -146,7 +142,7 @@ public abstract class DmtpActor : DisposableObject, IDmtpActor
/// <exception cref="TimeoutException"></exception>
public virtual async Task HandshakeAsync(string verifyToken, string id, int millisecondsTimeout, Metadata metadata, CancellationToken token)
{
if (this.Online)
if (this.m_online)
{
return;
}
@@ -181,7 +177,7 @@ public abstract class DmtpActor : DisposableObject, IDmtpActor
if (verifyResult.Status == 1)
{
this.Id = verifyResult.Id;
this.Online = true;
this.m_online = true;
_ = EasyTask.SafeRun(this.PrivateOnHandshaked, new DmtpVerifyEventArgs()
{
Id = verifyResult.Id,
@@ -225,20 +221,13 @@ public abstract class DmtpActor : DisposableObject, IDmtpActor
{
lock (this.m_syncRoot)
{
if (!this.Online)
if (!this.m_online)
{
return;
}
this.Online = false;
this.m_online = false;
this.WaitHandlePool.CancelAll();
this.m_cancellationTokenSource?.Cancel();
foreach (var item in this.m_actors)
{
item.Value.SafeDispose();
}
this.m_actors.Clear();
}
if (manual || this.Closing == null)
@@ -433,7 +422,7 @@ public abstract class DmtpActor : DisposableObject, IDmtpActor
waitVerify.Metadata = args.Metadata;
waitVerify.Message = args.Message ?? TouchSocketCoreResource.OperationSuccessful;
await this.SendJsonObjectAsync(P2_Handshake_Response, waitVerify).ConfigureAwait(EasyTask.ContinueOnCapturedContext);
this.Online = true;
this.m_online = true;
args.Message ??= TouchSocketCoreResource.OperationSuccessful;
this.m_cancellationTokenSource = new CancellationTokenSource();
_ = EasyTask.SafeRun(this.PrivateOnHandshaked, args);
@@ -840,6 +829,10 @@ public abstract class DmtpActor : DisposableObject, IDmtpActor
private async Task<bool> PrivatePingAsync(string targetId, int millisecondsTimeout)
{
if (!this.Online)
{
return false;
}
var waitPing = new WaitPing
{
TargetId = targetId,
@@ -878,17 +871,35 @@ public abstract class DmtpActor : DisposableObject, IDmtpActor
}
#region Actor
/// <inheritdoc/>
public bool TryAddActor<TActor>(TActor actor) where TActor : class, IActor
{
ThrowHelper.ThrowArgumentNullExceptionIf(actor, nameof(actor));
var type = typeof(TActor);
lock (this.m_syncRoot)
{
if (this.m_actors.ContainsKey(type))
{
return false;
}
this.m_actors.Add(type, actor);
return true;
}
}
/// <inheritdoc/>
public void AddActor<TActor>(TActor actor) where TActor : class, IActor
{
ThrowHelper.ThrowArgumentNullExceptionIf(actor, nameof(actor));
var type = typeof(TActor);
if (this.m_actors.ContainsKey(type))
lock (this.m_syncRoot)
{
ThrowHelper.ThrowException(TouchSocketDmtpResource.ActorAlreadyExists.Format(type));
this.m_actors.AddOrUpdate(type, actor);
}
this.m_actors.Add(type, actor);
}
/// <inheritdoc/>
@@ -913,11 +924,11 @@ public abstract class DmtpActor : DisposableObject, IDmtpActor
{
lock (this.m_syncRoot)
{
if (!this.Online)
if (!this.m_online)
{
return;
}
this.Online = false;
this.m_online = false;
this.Closing = null;
this.Routing = null;
@@ -927,6 +938,12 @@ public abstract class DmtpActor : DisposableObject, IDmtpActor
this.IdChanged = null;
//this.OutputSend = null;
foreach (var item in this.m_actors)
{
item.Value.SafeDispose();
}
this.m_actors.Clear();
this.WaitHandlePool.CancelAll();
this.WaitHandlePool.SafeDispose();
}
@@ -956,7 +973,7 @@ public abstract class DmtpActor : DisposableObject, IDmtpActor
/// <returns></returns>
public async Task<Result> SendCloseAsync(string msg)
{
if (!this.Online)
if (!this.m_online)
{
return Result.FromFail(TouchSocketResource.ClientNotConnected);
}

View File

@@ -122,6 +122,16 @@ public interface IDmtpActor : IDisposableObject, IOnlineClient, IClosableClient,
/// <param name="actor">要添加的 Actor 实例。</param>
void AddActor<TActor>(TActor actor) where TActor : class, IActor;
/// <summary>
/// 尝试添加一个实现了 <see cref="IActor"/> 接口的 Actor 实例。
/// </summary>
/// <typeparam name="TActor">Actor 的具体类型,必须实现 <see cref="IActor"/> 接口。</typeparam>
/// <param name="actor">要添加的 Actor 实例。</param>
/// <returns>如果添加成功则返回 <see langword="true"/>,否则返回 <see langword="false"/>。</returns>
bool TryAddActor<TActor>(TActor actor) where TActor : class, IActor;
/// <summary>
/// 获取指定类型的 Actor 实例。
/// </summary>

View File

@@ -23,7 +23,7 @@ internal sealed class UdpDmtpClient : DmtpActor, IUdpDmtpClient
private readonly EndPoint m_endPoint;
private readonly UdpDmtp m_udpSession;
private IPluginManager m_pluginManager;
public override bool Online => true;
/// <summary>
/// UdpDmtp终端客户端
/// </summary>
@@ -62,8 +62,6 @@ internal sealed class UdpDmtpClient : DmtpActor, IUdpDmtpClient
return false;
}
this.Online = true;
args = new DmtpVerifyEventArgs()
{
Id = this.Id

View File

@@ -24,12 +24,14 @@ public static class DmtpActorExtension
#region Ping
/// <inheritdoc cref="IDmtpActor.PingAsync(int)"/>
[AsyncToSyncWarning]
public static bool Ping(this IDmtpActorObject client, int millisecondsTimeout = 5000)
{
return client.DmtpActor.PingAsync(millisecondsTimeout).GetFalseAwaitResult();
}
/// <inheritdoc cref="IDmtpActor.PingAsync(string, int)"/>
[AsyncToSyncWarning]
public static bool Ping(this IDmtpActorObject client, string targetId, int millisecondsTimeout = 5000)
{
return client.DmtpActor.PingAsync(targetId, millisecondsTimeout).GetFalseAwaitResult();

View File

@@ -64,7 +64,7 @@ public sealed class DmtpFileTransferFeature : PluginBase, IDmtpHandshakingPlugin
MaxSmallFileLength = this.MaxSmallFileLength
};
dmtpFileTransferActor.SetProtocolFlags(this.StartProtocol);
client.DmtpActor.AddActor<DmtpFileTransferActor>(dmtpFileTransferActor);
client.DmtpActor.TryAddActor<DmtpFileTransferActor>(dmtpFileTransferActor);
await e.InvokeNext().ConfigureAwait(EasyTask.ContinueOnCapturedContext);
}

View File

@@ -81,6 +81,7 @@ public static class DmtpFileTransferActorExtension
/// <param name="targetId">目标文件的标识。</param>
/// <param name="fileOperator">文件操作对象,用于处理文件传输过程中的具体操作。</param>
/// <returns>返回文件拉取操作的结果。</returns>
[AsyncToSyncWarning]
public static Result PullFile(this IDmtpFileTransferActor actor, string targetId, FileOperator fileOperator)
{
return PullFileAsync(actor, targetId, fileOperator).GetFalseAwaitResult();
@@ -227,6 +228,7 @@ public static class DmtpFileTransferActorExtension
/// <param name="targetId">目标标识符,标识文件推送的目的地</param>
/// <param name="fileOperator">文件操作对象,包含待推送的文件信息和操作方法</param>
/// <returns>返回文件推送操作的结果</returns>
[AsyncToSyncWarning]
public static Result PushFile(this IDmtpFileTransferActor actor, string targetId, FileOperator fileOperator)
{
return PushFileAsync(actor, targetId, fileOperator).GetFalseAwaitResult();
@@ -348,9 +350,10 @@ public static class DmtpFileTransferActorExtension
/// <param name="actor">提供文件传输功能的actor。</param>
/// <param name="fileOperator">用于处理文件的文件操作器。</param>
/// <returns>返回一个<see cref="Result"/>类型的值,包含操作的结果信息。</returns>
[AsyncToSyncWarning]
public static Result PullFile(this IDmtpFileTransferActor actor, FileOperator fileOperator)
{
return PullFile(actor, default, fileOperator);
return PullFileAsync(actor, default, fileOperator).GetFalseAwaitResult();
}
@@ -372,10 +375,11 @@ public static class DmtpFileTransferActorExtension
/// <param name="actor">实现文件传输操作的演员。</param>
/// <param name="fileOperator">文件操作对象,用于指定文件操作。</param>
/// <returns>返回文件推送操作的结果。</returns>
[AsyncToSyncWarning]
public static Result PushFile(this IDmtpFileTransferActor actor, FileOperator fileOperator)
{
// 调用重载的PushFile方法使用默认值进行文件推送操作
return PushFile(actor, default, fileOperator);
return PushFileAsync(actor, default, fileOperator).GetFalseAwaitResult();
}

View File

@@ -59,7 +59,7 @@ public class RedisFeature : PluginBase, IDmtpHandshakingPlugin, IDmtpReceivedPlu
};
dmtpRedisActor.SetProtocolFlags(this.StartProtocol);
client.DmtpActor.AddActor<DmtpRedisActor>(dmtpRedisActor);
client.DmtpActor.TryAddActor<DmtpRedisActor>(dmtpRedisActor);
await e.InvokeNext().ConfigureAwait(EasyTask.ContinueOnCapturedContext);
}

View File

@@ -255,7 +255,7 @@ public class DmtpRpcActor :DisposableObject, IDmtpRpcActor
}
}
private async Task InvokeThisAsync(object o)
private async Task InvokeThisAsync(IDmtpRpcCallContext o)
{
var callContext = (DmtpRpcCallContext)o;
var rpcRequestPackage = callContext.DmtpRpcPackage;
@@ -291,9 +291,9 @@ public class DmtpRpcActor :DisposableObject, IDmtpRpcActor
}
else
{
if (rpcRequestPackage.Feedback == FeedbackType.WaitInvoke && rpcMethod.HasCallContext)
if (rpcRequestPackage.Feedback == FeedbackType.WaitInvoke)
{
this.m_callContextDic.TryAdd(rpcRequestPackage.Sign, callContext);
this.m_callContextDic.AddOrUpdate(rpcRequestPackage.Sign, callContext);
}
}
@@ -353,8 +353,9 @@ public class DmtpRpcActor :DisposableObject, IDmtpRpcActor
byteBlock.Dispose();
}
}
catch
catch(Exception ex)
{
this.DmtpActor.Logger?.Exception(this, ex);
}
finally
{
@@ -383,7 +384,7 @@ public class DmtpRpcActor :DisposableObject, IDmtpRpcActor
{
if (disposing)
{
Dispatcher.SafeDispose();
this.Dispatcher.SafeDispose();
}
base.Dispose(disposing);
}

View File

@@ -19,7 +19,7 @@ namespace TouchSocket.Dmtp.Rpc;
/// <summary>
/// TargetDmtpRpcActor
/// </summary>
internal class TargetDmtpRpcActor : IRpcClient
internal class TargetDmtpRpcActor : IDmtpRpcActor
{
private readonly IDmtpRpcActor m_rpcActor;
private readonly string m_targetId;
@@ -35,8 +35,29 @@ internal class TargetDmtpRpcActor : IRpcClient
this.m_rpcActor = rpcActor; // 初始化RPC行为接口
}
public IRpcDispatcher<IDmtpActor, IDmtpRpcCallContext> Dispatcher => this.m_rpcActor.Dispatcher;
public IDmtpActor DmtpActor => this.m_rpcActor.DmtpActor;
public bool DisposedValue => this.m_rpcActor.DisposedValue;
public void Dispose()
{
this.m_rpcActor.Dispose();
}
public Task<bool> InputReceivedData(DmtpMessage message)
{
return this.m_rpcActor.InputReceivedData(message);
}
public Task<object> InvokeAsync(string invokeKey, Type returnType, IInvokeOption invokeOption, params object[] parameters)
{
return this.m_rpcActor.InvokeAsync(invokeKey, returnType, invokeOption, parameters);
return this.m_rpcActor.InvokeAsync(this.m_targetId, invokeKey, returnType, invokeOption, parameters);
}
public Task<object> InvokeAsync(string targetId, string invokeKey, Type returnType, IInvokeOption invokeOption, params object[] parameters)
{
return this.m_rpcActor.InvokeAsync(targetId, invokeKey, returnType, invokeOption, parameters);
}
}

View File

@@ -37,12 +37,12 @@ public static class DmtpRpcActorExtension
#endregion DependencyProperty
/// <summary>
/// 新创建一个直接向目标地址请求的<see cref="IRpcClient"/>客户端。
/// 新创建一个直接向目标地址请求的<see cref="IDmtpRpcActor"/>客户端。
/// </summary>
/// <param name="client">要为其创建目标DMTP RPC演员的客户端。</param>
/// <param name="targetId">目标地址的标识符。</param>
/// <returns>返回一个新的<see cref="IRpcClient"/>实例,该实例能够直接向指定目标地址发起请求。</returns>
public static IRpcClient CreateTargetDmtpRpcActor(this IDmtpActorObject client, string targetId)
/// <returns>返回一个新的<see cref="IDmtpRpcActor"/>实例,该实例能够直接向指定目标地址发起请求。</returns>
public static IDmtpRpcActor CreateTargetDmtpRpcActor(this IDmtpActorObject client, string targetId)
{
// 使用指定的目标ID和当前客户端的DMTP RPC演员创建一个新的目标DMTP RPC演员。
return new TargetDmtpRpcActor(targetId, client.GetDmtpRpcActor());

View File

@@ -230,7 +230,7 @@ public class DmtpRpcFeature : PluginBase, IDmtpFeature, IDmtpHandshakingPlugin,
dmtpRpcActor.GetInvokeMethod = this.GetInvokeMethod;
dmtpRpcActor.SetProtocolFlags(this.StartProtocol);
client.DmtpActor.AddActor<DmtpRpcActor>(dmtpRpcActor);
client.DmtpActor.TryAddActor<DmtpRpcActor>(dmtpRpcActor);
await e.InvokeNext().ConfigureAwait(EasyTask.ContinueOnCapturedContext);
}

View File

@@ -264,7 +264,7 @@ public static class WebSocketExtension
/// <summary>
/// WebSocketMessageCombinatorProperty
/// </summary>
public static DependencyProperty<WebSocketMessageCombinator> WebSocketMessageCombinatorProperty =
public readonly static DependencyProperty<WebSocketMessageCombinator> WebSocketMessageCombinatorProperty =
new DependencyProperty<WebSocketMessageCombinator>("WebSocketMessageCombinator", (obj) =>
{
var combinator = new WebSocketMessageCombinator();

View File

@@ -119,6 +119,7 @@ public abstract class RpcAttribute : Attribute
codeString.AppendLine($"/// <exception cref=\"{item.Key.FullName}\">{item.Value}</exception>");
}
codeString.AppendLine("[AsyncToSyncWarning]");
codeString.Append("public static ");
codeString.Append(this.GetReturn(rpcMethod, false));
codeString.Append(' ');
@@ -244,7 +245,7 @@ public abstract class RpcAttribute : Attribute
{
codeString.AppendLine($"/// <exception cref=\"{item.Key.FullName}\">{item.Value}</exception>");
}
codeString.AppendLine("[AsyncToSyncWarning]");
codeString.Append("public ");
codeString.Append(this.GetReturn(rpcMethod, false));
codeString.Append(' ');
@@ -337,6 +338,7 @@ public abstract class RpcAttribute : Attribute
codeString.AppendLine($"/// <exception cref=\"{item.Key.FullName}\">{item.Value}</exception>");
}
codeString.AppendLine("[AsyncToSyncWarning]");
codeString.Append(this.GetReturn(rpcMethod, false));
codeString.Append(' ');
codeString.Append(this.GetMethodName(rpcMethod, false));

View File

@@ -29,7 +29,7 @@ public class ConcurrencyRpcDispatcher<TRpcActor, TCallContext> :DisposableObject
public bool Reenterable => true;
/// <inheritdoc/>
public Task Dispatcher(TRpcActor actor, TCallContext callContext, Func<object, Task> func)
public Task Dispatcher(TRpcActor actor, TCallContext callContext, Func<TCallContext, Task> func)
{
// 获取当前调用上下文中定义的RPC方法信息
var rpcMethod = callContext.RpcMethod;

View File

@@ -33,7 +33,7 @@ public interface IRpcDispatcher<TRpcActor, TCallContext>:IDisposableObject
/// <param name="callContext">调用的上下文信息,包含调用相关的元数据。</param>
/// <param name="func">一个函数委托,表示实际执行的异步操作。</param>
/// <returns>一个任务,表示异步操作的完成。</returns>
Task Dispatcher(TRpcActor actor, TCallContext callContext, Func<object, Task> func);
Task Dispatcher(TRpcActor actor, TCallContext callContext, Func<TCallContext, Task> func);
/// <summary>
/// 获取一个值,指示是否可重新进入。

View File

@@ -30,7 +30,7 @@ public class ImmediateRpcDispatcher<TRpcActor, TCallContext> :DisposableObject,
public bool Reenterable => false;
/// <inheritdoc/>
public Task Dispatcher(TRpcActor actor, TCallContext callContext, Func<object, Task> func)
public Task Dispatcher(TRpcActor actor, TCallContext callContext, Func<TCallContext, Task> func)
{
// 直接调用传入的函数并传递调用上下文,不进行任何处理或延迟执行。
return func(callContext);

View File

@@ -44,7 +44,7 @@ public class QueueRpcDispatcher<TRpcActor, TCallContext> : DisposableObject, IRp
private readonly AsyncQueue<InvokeContext> m_queue = new();
/// <inheritdoc/>
public Task Dispatcher(TRpcActor actor, TCallContext callContext, Func<object, Task> func)
public Task Dispatcher(TRpcActor actor, TCallContext callContext, Func<TCallContext, Task> func)
{
this.m_queue.Enqueue(new InvokeContext(callContext, func));
return EasyTask.CompletedTask;
@@ -84,13 +84,13 @@ public class QueueRpcDispatcher<TRpcActor, TCallContext> : DisposableObject, IRp
private readonly struct InvokeContext
{
public InvokeContext(TCallContext iDmtpRpcCallContext, Func<object, Task> func)
public InvokeContext(TCallContext iDmtpRpcCallContext, Func<TCallContext, Task> func)
{
this.IDmtpRpcCallContext = iDmtpRpcCallContext;
this.Func = func;
}
public Func<object, Task> Func { get; }
public Func<TCallContext, Task> Func { get; }
public TCallContext IDmtpRpcCallContext { get; }
}
}

View File

@@ -24,15 +24,17 @@ public static class RpcClientExtension
#region RpcClient
/// <inheritdoc cref="IRpcClient.InvokeAsync(string, Type, IInvokeOption, object[])"/>
[AsyncToSyncWarning]
public static object Invoke(this IRpcClient client, string invokeKey, Type returnType, IInvokeOption invokeOption, params object[] parameters)
{
return client.InvokeAsync(invokeKey, returnType, invokeOption, parameters).GetFalseAwaitResult();
}
/// <inheritdoc cref="IRpcClient.InvokeAsync(string, Type, IInvokeOption, object[])"/>
[AsyncToSyncWarning]
public static T InvokeT<T>(this IRpcClient client, string invokeKey, IInvokeOption invokeOption, params object[] parameters)
{
return (T)(client.Invoke(invokeKey, typeof(T), invokeOption, parameters));
return (T)(client.InvokeAsync(invokeKey, typeof(T), invokeOption, parameters).GetFalseAwaitResult());
}
/// <inheritdoc cref="IRpcClient.InvokeAsync(string, Type, IInvokeOption, object[])"/>
@@ -46,15 +48,17 @@ public static class RpcClientExtension
#region ITargetRpcClient
/// <inheritdoc cref="IRpcClient.InvokeAsync(string, Type, IInvokeOption, object[])"/>
[AsyncToSyncWarning]
public static object Invoke(this ITargetRpcClient client, string targetId, string invokeKey, Type returnType, IInvokeOption invokeOption, params object[] parameters)
{
return client.InvokeAsync(targetId, invokeKey, returnType, invokeOption, parameters).GetFalseAwaitResult();
}
/// <inheritdoc cref="ITargetRpcClient.InvokeAsync(string, string, Type, IInvokeOption, object[])"/>
[AsyncToSyncWarning]
public static T InvokeT<T>(this ITargetRpcClient client, string targetId, string invokeKey, IInvokeOption invokeOption, params object[] parameters)
{
return (T)(client.Invoke(targetId, invokeKey, typeof(T), invokeOption, parameters));
return (T)(client.InvokeAsync(targetId, invokeKey, typeof(T), invokeOption, parameters).GetFalseAwaitResult());
}
/// <inheritdoc cref="ITargetRpcClient.InvokeAsync(string, string, Type, IInvokeOption, object[])"/>

View File

@@ -90,7 +90,7 @@ public abstract class RpcDispatchProxy<TClient, TAttribute> : DispatchProxy wher
}
default:
{
result = this.GetClient().Invoke(invokeKey, rpcMethod.RealReturnType, invokeOption, ps);
result = this.GetClient().InvokeAsync(invokeKey, rpcMethod.RealReturnType, invokeOption, ps).GetFalseAwaitResult();
break;
}
}