Files
TouchSocket/RRQMSocket.RPC.JsonRpc/Socket/JsonRpcClient.cs
2021-08-13 21:41:12 +08:00

382 lines
15 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//------------------------------------------------------------------------------
// 此代码版权除特别声明或在RRQMCore.XREF命名空间的代码归作者本人若汝棋茗所有
// 源代码使用协议遵循本仓库的开源协议及附加协议若本仓库没有设置则按MIT开源协议授权
// CSDN博客https://blog.csdn.net/qq_40374647
// 哔哩哔哩视频https://space.bilibili.com/94253567
// Gitee源代码仓库https://gitee.com/RRQM_Home
// Github源代码仓库https://github.com/RRQM
// 交流QQ群234762506
// 感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using RRQMCore.ByteManager;
using RRQMCore.Exceptions;
using RRQMCore.Helper;
using RRQMCore.Run;
using RRQMCore.XREF.Newtonsoft.Json;
using RRQMCore.XREF.Newtonsoft.Json.Linq;
using RRQMSocket.Http;
using RRQMSocket.RPC.RRQMRPC;
using System;
using System.Text;
namespace RRQMSocket.RPC.JsonRpc
{
/// <summary>
/// JsonRpc客户端
/// </summary>
public class JsonRpcClient : TcpClient, IJsonRpcClient
{
private int maxPackageSize;
private JsonRpcProtocolType protocolType;
private RRQMWaitHandle<WaitResult> waitHandle;
/// <summary>
/// 构造函数
/// </summary>
public JsonRpcClient()
{
waitHandle = new RRQMWaitHandle<WaitResult>();
}
/// <summary>
/// 最大数据包长度
/// </summary>
public int MaxPackageSize
{
get { return maxPackageSize; }
}
/// <summary>
/// 协议类型
/// </summary>
public JsonRpcProtocolType ProtocolType
{
get { return protocolType; }
}
/// <summary>
/// RPC调用
/// </summary>
/// <param name="method">方法名</param>
/// <param name="invokeOption">调用配置</param>
/// <param name="parameters">参数</param>
/// <param name="types"></param>
/// <exception cref="RRQMTimeoutException"></exception>
/// <exception cref="RRQMRPCInvokeException"></exception>
/// <exception cref="RRQMException"></exception>
/// <returns></returns>
public T Invoke<T>(string method, InvokeOption invokeOption, ref object[] parameters, Type[] types)
{
JsonRpcWaitContext context = new JsonRpcWaitContext();
WaitData<WaitResult> waitData = this.waitHandle.GetWaitData(context);
ByteBlock byteBlock = this.BytePool.GetByteBlock(this.BufferLength);
if (invokeOption == null)
{
invokeOption = InvokeOption.WaitInvoke;
}
try
{
JObject jobject = new JObject();
jobject.Add("jsonrpc", JToken.FromObject("2.0"));
jobject.Add("method", JToken.FromObject(method));
jobject.Add("params", JToken.FromObject(parameters));
if (invokeOption.FeedbackType == FeedbackType.WaitInvoke)
{
jobject.Add("id", JToken.FromObject(context.Sign.ToString()));
}
else
{
jobject.Add("id", null);
}
switch (this.protocolType)
{
case JsonRpcProtocolType.Tcp:
{
byteBlock.Write(Encoding.UTF8.GetBytes(jobject.ToString(Formatting.None)));
break;
}
case JsonRpcProtocolType.Http:
{
HttpRequest httpRequest = new HttpRequest();
httpRequest.Method = "POST";
httpRequest.FromJson(jobject.ToString(Formatting.None));
httpRequest.Build(byteBlock);
}
break;
}
switch (invokeOption.FeedbackType)
{
case FeedbackType.OnlySend:
{
this.SendAsync(byteBlock);
}
break;
case FeedbackType.WaitSend:
case FeedbackType.WaitInvoke:
{
this.Send(byteBlock);
}
break;
default:
break;
}
}
catch (Exception ex)
{
throw ex;
}
finally
{
byteBlock.Dispose();
}
switch (invokeOption.FeedbackType)
{
case FeedbackType.OnlySend:
case FeedbackType.WaitSend:
{
this.waitHandle.Destroy(waitData);
return default;
}
case FeedbackType.WaitInvoke:
{
waitData.Wait(invokeOption.Timeout);
JsonRpcWaitContext resultContext = (JsonRpcWaitContext)waitData.WaitResult;
this.waitHandle.Destroy(waitData);
if (resultContext.Status == 0)
{
throw new RRQMTimeoutException("等待结果超时");
}
if (resultContext.error != null)
{
throw new RRQMRPCException(resultContext.error.message);
}
try
{
if (typeof(T).IsPrimitive || typeof(T) == typeof(string))
{
return (T)resultContext.Return.ToString().ParseToType(typeof(T));
}
return JsonConvert.DeserializeObject<T>(resultContext.Return.ToString());
}
catch (Exception ex)
{
throw ex;
}
}
default:
return default;
}
}
/// <summary>
/// RPC调用
/// </summary>
/// <param name="method">方法名</param>
/// <param name="invokeOption">调用配置</param>
/// <param name="parameters">参数</param>
/// <param name="types"></param>
/// <exception cref="RRQMTimeoutException"></exception>
/// <exception cref="RRQMRPCInvokeException"></exception>
/// <exception cref="RRQMException"></exception>
public void Invoke(string method, InvokeOption invokeOption, ref object[] parameters, Type[] types)
{
JsonRpcWaitContext context = new JsonRpcWaitContext();
WaitData<WaitResult> waitData = this.waitHandle.GetWaitData(context);
ByteBlock byteBlock = this.BytePool.GetByteBlock(this.BufferLength);
if (invokeOption == null)
{
invokeOption = InvokeOption.WaitInvoke;
}
try
{
JObject jobject = new JObject();
jobject.Add("jsonrpc", JToken.FromObject("2.0"));
jobject.Add("method", JToken.FromObject(method));
jobject.Add("params", JToken.FromObject(parameters));
if (invokeOption.FeedbackType == FeedbackType.WaitInvoke)
{
jobject.Add("id", JToken.FromObject(context.Sign.ToString()));
}
else
{
jobject.Add("id", null);
}
switch (this.protocolType)
{
case JsonRpcProtocolType.Tcp:
{
byteBlock.Write(Encoding.UTF8.GetBytes(jobject.ToString(Formatting.None)));
break;
}
case JsonRpcProtocolType.Http:
{
HttpRequest httpRequest = new HttpRequest();
httpRequest.Method = "POST";
httpRequest.FromJson(jobject.ToString(Formatting.None));
httpRequest.Build(byteBlock);
}
break;
}
switch (invokeOption.FeedbackType)
{
case FeedbackType.OnlySend:
{
this.SendAsync(byteBlock);
}
break;
case FeedbackType.WaitSend:
case FeedbackType.WaitInvoke:
{
this.Send(byteBlock);
}
break;
default:
break;
}
}
catch (Exception ex)
{
throw ex;
}
finally
{
byteBlock.Dispose();
}
switch (invokeOption.FeedbackType)
{
case FeedbackType.OnlySend:
case FeedbackType.WaitSend:
{
this.waitHandle.Destroy(waitData);
return;
}
case FeedbackType.WaitInvoke:
{
waitData.Wait(invokeOption.Timeout);
JsonRpcWaitContext resultContext = (JsonRpcWaitContext)waitData.WaitResult;
this.waitHandle.Destroy(waitData);
if (resultContext.Status == 0)
{
throw new RRQMTimeoutException("等待结果超时");
}
if (resultContext.error != null)
{
throw new RRQMRPCException(resultContext.error.message);
}
return;
}
default:
return;
}
}
/// <summary>
/// RPC调用
/// </summary>
/// <param name="method">方法名</param>
/// <param name="invokeOption">调用配置</param>
/// <param name="parameters">参数</param>
/// <exception cref="RRQMTimeoutException"></exception>
/// <exception cref="RRQMRPCInvokeException"></exception>
/// <exception cref="RRQMException"></exception>
public void Invoke(string method, InvokeOption invokeOption, params object[] parameters)
{
this.Invoke(method, invokeOption, ref parameters, null);
}
/// <summary>
/// RPC调用
/// </summary>
/// <param name="method">方法名</param>
/// <param name="invokeOption">调用配置</param>
/// <param name="parameters">参数</param>
/// <exception cref="RRQMTimeoutException"></exception>
/// <exception cref="RRQMRPCInvokeException"></exception>
/// <exception cref="RRQMException"></exception>
/// <returns></returns>
public T Invoke<T>(string method, InvokeOption invokeOption, params object[] parameters)
{
return this.Invoke<T>(method, invokeOption, ref parameters, null);
}
/// <summary>
/// 处理数据
/// </summary>
/// <param name="byteBlock"></param>
/// <param name="obj"></param>
protected override void HandleReceivedData(ByteBlock byteBlock, object obj)
{
switch (this.protocolType)
{
case JsonRpcProtocolType.Tcp:
{
string jsonString = Encoding.UTF8.GetString(byteBlock.Buffer, 0, byteBlock.Len);
JsonResponseContext responseContext = (JsonResponseContext)JsonConvert.DeserializeObject(jsonString, typeof(JsonResponseContext));
if (responseContext != null)
{
JsonRpcWaitContext waitContext = new JsonRpcWaitContext();
waitContext.Status = 1;
waitContext.Sign = int.Parse(responseContext.id);
waitContext.error = responseContext.error;
waitContext.Return = responseContext.result;
this.waitHandle.SetRun(waitContext);
}
break;
}
case JsonRpcProtocolType.Http:
{
HttpResponse httpResponse = (HttpResponse)obj;
JsonResponseContext responseContext = (JsonResponseContext)JsonConvert.DeserializeObject(httpResponse.Body, typeof(JsonResponseContext));
if (responseContext != null)
{
JsonRpcWaitContext waitContext = new JsonRpcWaitContext();
waitContext.Status = 1;
waitContext.Sign = int.Parse(responseContext.id);
waitContext.error = responseContext.error;
waitContext.Return = responseContext.result;
this.waitHandle.SetRun(waitContext);
}
break;
}
}
}
/// <summary>
/// 载入配置
/// </summary>
/// <param name="clientConfig"></param>
protected override void LoadConfig(TcpClientConfig clientConfig)
{
base.LoadConfig(clientConfig);
this.maxPackageSize = (int)clientConfig.GetValue(JsonRpcClientConfig.MaxPackageSizeProperty);
this.protocolType = (JsonRpcProtocolType)clientConfig.GetValue(JsonRpcClientConfig.ProtocolTypeProperty);
switch (this.protocolType)
{
case JsonRpcProtocolType.Tcp:
base.SetDataHandlingAdapter(new TerminatorDataHandlingAdapter(this.maxPackageSize, "\r\n"));
break;
case JsonRpcProtocolType.Http:
base.SetDataHandlingAdapter(new HttpDataHandlingAdapter(this.maxPackageSize, HttpType.Client));
break;
}
}
}
}