优化QuestDb IHttpClientFactory及IServiceCollection相关逻辑

This commit is contained in:
d4ilys
2025-10-16 09:49:57 +08:00
parent 704bb5fc6b
commit 3b10c5f428
3 changed files with 89 additions and 70 deletions

View File

@@ -21,6 +21,7 @@ using System.Threading.Tasks;
using System.Web;
using FreeSql.Provider.QuestDb;
using System.Net;
using Microsoft.Extensions.DependencyInjection;
public static partial class QuestDbGlobalExtensions
{
@@ -35,8 +36,51 @@ public static partial class QuestDbGlobalExtensions
private static readonly QuestDbAdo _questDbAdo = new QuestDbAdo();
public static FreeSqlBuilder UseQuestDbRestAPI(this FreeSqlBuilder build, string host, string username = "",
string password = "") => RestAPIExtension.UseQuestDbRestAPI(build, host, username, password);
/// <summary>
/// 启动QuestDb Http功能
/// </summary>
/// <param name="builder"></param>
/// <param name="host"></param>
/// <param name="username"></param>
/// <param name="password"></param>
/// <returns></returns>
public static FreeSqlBuilder UseQuestDbRestAPI(this FreeSqlBuilder builder, string host, string username = "",
string password = "")
{
//初始化容器添加HttpClient
ServiceContainer.Initialize(service =>
{
service.AddHttpClient("QuestDb", client => client.BaseAddress = new Uri(host))
.ConfigurePrimaryHttpMessageHandler(handlerBuilder =>
{
//忽略SSL验证
return new HttpClientHandler
{
ClientCertificateOptions = ClientCertificateOption.Manual,
ServerCertificateCustomValidationCallback =
(httpRequestMessage, cert, certChain, policyErrors) => true
};
});
var description = new QuestResetApiFeatures()
{
BaseAddress = host
};
if (!string.IsNullOrWhiteSpace(username) && !string.IsNullOrWhiteSpace(password))
{
var base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"));
description.BasicToken = $"Basic {base64}";
}
service.AddSingleton(description);
});
//RestApi需要无参数
builder.UseNoneCommandParameter(true);
return builder;
}
/// <summary>
/// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。
@@ -50,7 +94,7 @@ public static partial class QuestDbGlobalExtensions
public static ISelect<T1> LatestOn<T1, TKey>(this ISelect<T1> select, Expression<Func<T1, DateTime?>> timestamp,
Expression<Func<T1, TKey>> partition)
{
LatestOnExtension.InternelImpl(timestamp, partition);
LatestOnExtension.InternalImpl(timestamp, partition);
return select;
}
@@ -67,7 +111,7 @@ public static partial class QuestDbGlobalExtensions
Expression<Func<T1, DateTime?>> timestamp,
Expression<Func<T1, TKey>> partition) where T2 : class
{
LatestOnExtension.InternelImpl(timestamp, partition);
LatestOnExtension.InternalImpl(timestamp, partition);
return select;
}
@@ -84,7 +128,7 @@ public static partial class QuestDbGlobalExtensions
Expression<Func<T1, DateTime?>> timestamp,
Expression<Func<T1, TKey>> partition) where T2 : class where T3 : class
{
LatestOnExtension.InternelImpl(timestamp, partition);
LatestOnExtension.InternalImpl(timestamp, partition);
return select;
}
@@ -101,7 +145,7 @@ public static partial class QuestDbGlobalExtensions
Expression<Func<T1, DateTime?>> timestamp,
Expression<Func<T1, TKey>> partition) where T2 : class where T3 : class where T4 : class
{
LatestOnExtension.InternelImpl(timestamp, partition);
LatestOnExtension.InternalImpl(timestamp, partition);
return select;
}
@@ -162,19 +206,19 @@ public static partial class QuestDbGlobalExtensions
public static async Task<int> ExecuteQuestDbBulkCopyAsync<T>(this IInsert<T> that,
string dateFormat = "yyyy/M/d H:mm:ss") where T : class
{
//思路:通过提供的RestAPI imp实现快速复制
if (string.IsNullOrWhiteSpace(RestAPIExtension.BaseUrl))
var features = ServiceContainer.GetService<QuestResetApiFeatures>();
if (string.IsNullOrWhiteSpace(features.BaseAddress))
{
throw new Exception(
"BulkCopy功能需要启用RestAPI启用方式new FreeSqlBuilder().UseQuestDbRestAPI(\"localhost:9000\", \"username\", \"password\")");
@"BulkCopy功能需要启用RestAPI启用方式new FreeSqlBuilder().UseQuestDbRestAPI(""localhost:9000"", ""username"", ""password"")");
}
var result = 0;
try
{
var client = QuestDbContainer.GetService<IHttpClientFactory>().CreateClient();
var boundary = "---------------" + DateTime.Now.Ticks.ToString("x");
var boundary = $"---------------{DateTime.Now.Ticks:x}";
var list = new List<Hashtable>();
var insert = that as QuestDbInsert<T>;
var name = insert.InternalTableRuleInvoke(); //获取表名
@@ -199,7 +243,7 @@ public static partial class QuestDbGlobalExtensions
}
});
var schema = JsonConvert.SerializeObject(list);
using (MemoryStream stream = new MemoryStream())
using (var stream = new MemoryStream())
{
//写入CSV文件
using (var writer = new StreamWriter(stream))
@@ -208,29 +252,27 @@ public static partial class QuestDbGlobalExtensions
await csv.WriteRecordsAsync(insert._source);
}
var client = features.HttpClient;
var httpContent = new MultipartFormDataContent(boundary);
if (!string.IsNullOrWhiteSpace(RestAPIExtension.authorization))
client.DefaultRequestHeaders.Add("Authorization", RestAPIExtension.authorization);
if (!string.IsNullOrWhiteSpace(features.BasicToken))
client.DefaultRequestHeaders.Add("Authorization", features.BasicToken);
httpContent.Add(new StringContent(schema), "schema");
httpContent.Add(new ByteArrayContent(stream.ToArray()), "data");
//boundary带双引号 可能导致服务器错误情况
httpContent.Headers.Remove("Content-Type");
httpContent.Headers.TryAddWithoutValidation("Content-Type",
"multipart/form-data; boundary=" + boundary);
$"multipart/form-data; boundary={boundary}");
var httpResponseMessage =
await client.PostAsync($"{RestAPIExtension.BaseUrl}/imp?name={name}", httpContent);
await client.PostAsync($"imp?name={name}", httpContent);
var readAsStringAsync = await httpResponseMessage.Content.ReadAsStringAsync();
var splitByLine = SplitByLine(readAsStringAsync);
foreach (var s in splitByLine)
foreach (var strings in from s in splitByLine
where s.Contains("Rows")
select s.Split('|')
into strings
where strings[1].Trim() == "Rows imported"
select strings)
{
if (s.Contains("Rows"))
{
var strings = s.Split('|');
if (strings[1].Trim() == "Rows imported")
{
result = Convert.ToInt32(strings[2].Trim());
}
}
result = Convert.ToInt32(strings[2].Trim());
}
}
}
@@ -249,7 +291,8 @@ public static partial class QuestDbGlobalExtensions
/// <param name="insert"></param>
/// <param name="dateFormat">导入时,时间格式 默认:yyyy/M/d H:mm:ss</param>
/// <returns></returns>
public static int ExecuteQuestDbBulkCopy<T>(this IInsert<T> insert, string dateFormat = "yyyy/M/d H:mm:ss") where T : class
public static int ExecuteQuestDbBulkCopy<T>(this IInsert<T> insert, string dateFormat = "yyyy/M/d H:mm:ss")
where T : class
{
return ExecuteQuestDbBulkCopyAsync(insert, dateFormat).ConfigureAwait(false).GetAwaiter().GetResult();
}
@@ -294,7 +337,7 @@ static class LatestOnExtension
LatestOnString.Value = string.Empty;
}
internal static void InternelImpl<T1, TKey>(Expression<Func<T1, DateTime?>> timestamp,
internal static void InternalImpl<T1, TKey>(Expression<Func<T1, DateTime?>> timestamp,
Expression<Func<T1, TKey>> partition)
{
IsExistence.Value = true;
@@ -308,42 +351,22 @@ static class LatestOnExtension
}
}
static class RestAPIExtension
class QuestResetApiFeatures
{
internal static string BaseUrl = string.Empty;
internal static string authorization = string.Empty;
internal string BaseAddress { get; set; }
internal static async Task<string> ExecAsync(string sql)
internal string BasicToken { get; set; }
internal HttpClient HttpClient => ServiceContainer.GetService<IHttpClientFactory>().CreateClient("QuestDb");
internal async Task<string> ExecAsync(string sql)
{
//HTTP GET 执行SQL
var result = string.Empty;
var client = QuestDbContainer.GetService<IHttpClientFactory>().CreateClient();
var url = $"{BaseUrl}/exec?query={HttpUtility.UrlEncode(sql)}";
if (!string.IsNullOrWhiteSpace(authorization))
client.DefaultRequestHeaders.Add("Authorization", authorization);
var httpResponseMessage = await client.GetAsync(url);
result = await httpResponseMessage.Content.ReadAsStringAsync();
var url = $"exec?query={HttpUtility.UrlEncode(sql)}";
if (!string.IsNullOrWhiteSpace(BasicToken))
HttpClient.DefaultRequestHeaders.Add("Authorization", BasicToken);
var httpResponseMessage = await HttpClient.GetAsync(url);
var result = await httpResponseMessage.Content.ReadAsStringAsync();
return result;
}
internal static FreeSqlBuilder UseQuestDbRestAPI(FreeSqlBuilder buider, string host, string username = "",
string password = "")
{
BaseUrl = host;
if (BaseUrl.EndsWith("/"))
BaseUrl = BaseUrl.Remove(BaseUrl.Length - 1);
if (!BaseUrl.ToLower().StartsWith("http"))
BaseUrl = $"http://{BaseUrl}";
//生成TOKEN
if (!string.IsNullOrWhiteSpace(username) && !string.IsNullOrWhiteSpace(password))
{
var base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"));
authorization = $"Basic {base64}";
}
//RestApi需要无参数
buider.UseNoneCommandParameter(true);
return buider;
}
}

View File

@@ -118,10 +118,6 @@ namespace FreeSql.QuestDb
Select0Provider._dicMethodDataReaderGetValue[typeof(Guid)] =
typeof(DbDataReader).GetMethod("GetGuid", new Type[] { typeof(int) });
QuestDbContainer.Initialize(service =>
{
service.AddHttpClient();
});
}
}

View File

@@ -1,21 +1,21 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
namespace FreeSql.Provider.QuestDb
{
internal class QuestDbContainer
internal class ServiceContainer
{
//作用于HttpClientFatory
private static IServiceCollection Services;
private static IServiceCollection _services;
internal static IServiceProvider ServiceProvider { get; private set; }
internal static void Initialize(Action<IServiceCollection> service)
{
Services = new ServiceCollection();
service?.Invoke(Services);
ServiceProvider = Services.BuildServiceProvider();
_services = new ServiceCollection();
service?.Invoke(_services);
ServiceProvider = _services.BuildServiceProvider();
}
internal static T GetService<T>()