diff --git a/.github/workflows/pr_run_test_ci.yml b/.github/workflows/pr_run_test_ci.yml index 076c03b71..d73fe8ad3 100644 --- a/.github/workflows/pr_run_test_ci.yml +++ b/.github/workflows/pr_run_test_ci.yml @@ -87,6 +87,11 @@ jobs: analysis-icu analysis-smartcn analysis-kuromoji + - name: Setup ClickHouse + uses: vahid-sohrabloo/clickhouse-action@v1 + with: + # Version of ClickHouse to use + version: latest # optional, default is latest - name: Build and analyze for internal env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any diff --git a/Masa.Framework.sln b/Masa.Framework.sln index 7bb958301..e5b607c0d 100644 --- a/Masa.Framework.sln +++ b/Masa.Framework.sln @@ -709,9 +709,13 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tsc", "tsc", "{6042AE23-A07 EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "DynamicsCRM", "DynamicsCRM", "{64B54122-44F1-4379-9422-953EF706A3A6}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Masa.Utils.DynamicsCrm.Core", "src\Utils\DynamicsCrm\Masa.Utils.DynamicsCrm.Core\Masa.Utils.DynamicsCrm.Core.csproj", "{83310F46-E1C7-4438-B32A-9F6F7EA13FCF}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Masa.Utils.DynamicsCrm.Core", "src\Utils\DynamicsCrm\Masa.Utils.DynamicsCrm.Core\Masa.Utils.DynamicsCrm.Core.csproj", "{83310F46-E1C7-4438-B32A-9F6F7EA13FCF}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Masa.Utils.DynamicsCrm.EntityFrameworkCore", "src\Utils\DynamicsCrm\Masa.Utils.DynamicsCrm.EntityFrameworkCore\Masa.Utils.DynamicsCrm.EntityFrameworkCore.csproj", "{8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Masa.Utils.DynamicsCrm.EntityFrameworkCore", "src\Utils\DynamicsCrm\Masa.Utils.DynamicsCrm.EntityFrameworkCore\Masa.Utils.DynamicsCrm.EntityFrameworkCore.csproj", "{8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Masa.Contrib.StackSdks.Tsc.Clickhouse", "src\Contrib\StackSdks\Masa.Contrib.StackSdks.Tsc.Clickhouse\Masa.Contrib.StackSdks.Tsc.Clickhouse.csproj", "{43389D12-17E1-4F07-9A42-5CFCC24D08B2}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests", "src\Contrib\StackSdks\Tests\Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests\Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests.csproj", "{289BF8C8-968F-4105-A65E-C1C6FD8857F2}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -2569,6 +2573,22 @@ Global {8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC}.Release|Any CPU.Build.0 = Release|Any CPU {8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC}.Release|x64.ActiveCfg = Release|Any CPU {8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC}.Release|x64.Build.0 = Release|Any CPU + {43389D12-17E1-4F07-9A42-5CFCC24D08B2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {43389D12-17E1-4F07-9A42-5CFCC24D08B2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {43389D12-17E1-4F07-9A42-5CFCC24D08B2}.Debug|x64.ActiveCfg = Debug|Any CPU + {43389D12-17E1-4F07-9A42-5CFCC24D08B2}.Debug|x64.Build.0 = Debug|Any CPU + {43389D12-17E1-4F07-9A42-5CFCC24D08B2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {43389D12-17E1-4F07-9A42-5CFCC24D08B2}.Release|Any CPU.Build.0 = Release|Any CPU + {43389D12-17E1-4F07-9A42-5CFCC24D08B2}.Release|x64.ActiveCfg = Release|Any CPU + {43389D12-17E1-4F07-9A42-5CFCC24D08B2}.Release|x64.Build.0 = Release|Any CPU + {289BF8C8-968F-4105-A65E-C1C6FD8857F2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {289BF8C8-968F-4105-A65E-C1C6FD8857F2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {289BF8C8-968F-4105-A65E-C1C6FD8857F2}.Debug|x64.ActiveCfg = Debug|Any CPU + {289BF8C8-968F-4105-A65E-C1C6FD8857F2}.Debug|x64.Build.0 = Debug|Any CPU + {289BF8C8-968F-4105-A65E-C1C6FD8857F2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {289BF8C8-968F-4105-A65E-C1C6FD8857F2}.Release|Any CPU.Build.0 = Release|Any CPU + {289BF8C8-968F-4105-A65E-C1C6FD8857F2}.Release|x64.ActiveCfg = Release|Any CPU + {289BF8C8-968F-4105-A65E-C1C6FD8857F2}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -2921,6 +2941,8 @@ Global {64B54122-44F1-4379-9422-953EF706A3A6} = {5944A182-13B8-4DA6-AEE2-0A01E64A9648} {83310F46-E1C7-4438-B32A-9F6F7EA13FCF} = {64B54122-44F1-4379-9422-953EF706A3A6} {8A51A2A9-FBF4-40DC-AD89-AD3B9D3A50DC} = {64B54122-44F1-4379-9422-953EF706A3A6} + {43389D12-17E1-4F07-9A42-5CFCC24D08B2} = {6042AE23-A07E-4F6F-B1C3-F17617AEB722} + {289BF8C8-968F-4105-A65E-C1C6FD8857F2} = {E4AD67C8-9255-4013-A3C4-962694399770} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {40383055-CC50-4600-AD9A-53C14F620D03} diff --git a/src/BuildingBlocks/StackSdks/Masa.BuildingBlocks.StackSdks.Tsc.Contracts/Model/Aggregate/SimpleAggregateRequestDto.cs b/src/BuildingBlocks/StackSdks/Masa.BuildingBlocks.StackSdks.Tsc.Contracts/Model/Aggregate/SimpleAggregateRequestDto.cs index 1d2f35507..88931700b 100644 --- a/src/BuildingBlocks/StackSdks/Masa.BuildingBlocks.StackSdks.Tsc.Contracts/Model/Aggregate/SimpleAggregateRequestDto.cs +++ b/src/BuildingBlocks/StackSdks/Masa.BuildingBlocks.StackSdks.Tsc.Contracts/Model/Aggregate/SimpleAggregateRequestDto.cs @@ -17,4 +17,9 @@ public class SimpleAggregateRequestDto : BaseRequestDto /// currently support elasticsearch: https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-bucket-datehistogram-aggregation.html /// public string Interval { get; set; } + + /// + /// only fro type Group by, true return type is IEnumerable>,false is IEnumerable + /// + public bool AllValue { get; set; } } diff --git a/src/BuildingBlocks/StackSdks/Masa.BuildingBlocks.StackSdks.Tsc.Contracts/Service/ILogService.cs b/src/BuildingBlocks/StackSdks/Masa.BuildingBlocks.StackSdks.Tsc.Contracts/Service/ILogService.cs index 1cb2a0a03..019f14608 100644 --- a/src/BuildingBlocks/StackSdks/Masa.BuildingBlocks.StackSdks.Tsc.Contracts/Service/ILogService.cs +++ b/src/BuildingBlocks/StackSdks/Masa.BuildingBlocks.StackSdks.Tsc.Contracts/Service/ILogService.cs @@ -10,7 +10,7 @@ public interface ILogService Task> GetMappingAsync(); /// - /// when query type: Count,Sum,Avg and DistinctCount return type is double, DateHistogram return IEnumerable> ,GroupBy return IEnumerable + /// when query type: Count,Sum,Avg and DistinctCount return type is double, DateHistogram return IEnumerable> ,GroupBy return IEnumerable, AllValue is true return IEnumerable> /// /// /// diff --git a/src/BuildingBlocks/StackSdks/Masa.BuildingBlocks.StackSdks.Tsc.Contracts/Service/ITraceService.cs b/src/BuildingBlocks/StackSdks/Masa.BuildingBlocks.StackSdks.Tsc.Contracts/Service/ITraceService.cs index aab8560fa..d91502a9e 100644 --- a/src/BuildingBlocks/StackSdks/Masa.BuildingBlocks.StackSdks.Tsc.Contracts/Service/ITraceService.cs +++ b/src/BuildingBlocks/StackSdks/Masa.BuildingBlocks.StackSdks.Tsc.Contracts/Service/ITraceService.cs @@ -17,4 +17,6 @@ public interface ITraceService /// /// Task AggregateAsync(SimpleAggregateRequestDto query); + + Task GetMaxDelayTraceIdAsync(BaseRequestDto query); } diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/IDbConnectionExtensitions.cs b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/IDbConnectionExtensitions.cs new file mode 100644 index 000000000..8d1ec72d1 --- /dev/null +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/IDbConnectionExtensitions.cs @@ -0,0 +1,529 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +namespace System.Data.Common; + +internal static class IDbConnectionExtensitions +{ + public static PaginatedListBase QueryTrace(this IDbConnection connection, BaseRequestDto query) + { + var (where, parameters, ors) = AppendWhere(query); + var orderBy = AppendOrderBy(query, false); + var countSql = CombineOrs($"select count() as `total` from {MasaStackClickhouseConnection.TraceTable} where {where}", ors); + var total = Convert.ToInt64(ExecuteScalar(connection, $"select sum(`total`) from {countSql}", parameters?.ToArray())); + var start = (query.Page - 1) * query.PageSize; + var result = new PaginatedListBase() { Total = total, Result = new() }; + if (total > 0 && start - total < 0) + { + var querySql = CombineOrs($"select ServiceName,Timestamp,TraceId,SpanId,ParentSpanId,TraceState,SpanKind,Duration,SpanName,Spans,Resources from {MasaStackClickhouseConnection.TraceTable} where {where}", ors, orderBy); + result.Result = Query(connection, $"select * from {querySql} as t limit {start},{query.PageSize}", parameters?.ToArray(), ConvertTraceDto); + } + return result; + } + + public static PaginatedListBase QueryLog(this IDbConnection connection, BaseRequestDto query) + { + var (where, parameters, ors) = AppendWhere(query, false); + var orderBy = AppendOrderBy(query, true); + var countSql = CombineOrs($"select count() as `total` from {MasaStackClickhouseConnection.LogTable} where {where}", ors); + var total = Convert.ToInt64(ExecuteScalar(connection, $"select sum(`total`) from {countSql}", parameters?.ToArray())); + var start = (query.Page - 1) * query.PageSize; + var result = new PaginatedListBase() { Total = total, Result = new() }; + + + if (total > 0 && start - total < 0) + { + var querySql = CombineOrs($"select Timestamp,TraceId,SpanId,TraceFlags,SeverityText,SeverityNumber,ServiceName,Body,Resources,Logs from {MasaStackClickhouseConnection.LogTable} where {where}", ors, orderBy); + result.Result = Query(connection, $"select * from {querySql} as t limit {start},{query.PageSize}", parameters?.ToArray(), ConvertLogDto); + } + return result; + } + + private static string CombineOrs(string sql, IEnumerable ors, string? orderBy = null) + { + if (ors == null || !ors.Any()) + return $"({sql} {orderBy})"; + + var text = new StringBuilder(); + foreach (var or in ors) + { + text.AppendLine($" union all {sql}{or} {orderBy}"); + } + text.Remove(0, 11).Insert(0, '(').Append(')'); + return text.ToString(); + } + + public static List GetMapping(this IDbConnection dbConnection, bool isLog) + { + var type = isLog ? "log" : "trace"; + var result = dbConnection.Query($"select DISTINCT Name from otel_mapping Array join Name where `Type`='{type}_basic' order by Name", default, ConvertToMapping); + if (result == null || !result.Any()) + return default!; + + var attributes = dbConnection.Query($"select DISTINCT concat('Attributes.',Name) from otel_mapping Array join Name where `Type`='{type}_attributes' order by Name", default, ConvertToMapping); + var resources = dbConnection.Query("select DISTINCT concat('Resource.',Name) from otel_mapping Array join Name where `Type`='resource' order by Name", default, ConvertToMapping); + if (attributes != null && attributes.Any()) result.AddRange(attributes); + if (resources != null && resources.Any()) result.AddRange(resources); + + return result; + } + + public static List GetTraceByTraceId(this IDbConnection connection, string traceId) + { + string where = $"TraceId=@TraceId"; + return Query(connection, $"select * from (select Timestamp,TraceId,SpanId,ParentSpanId,TraceState,SpanKind,Duration,SpanName,Spans,Resources from {MasaStackClickhouseConnection.TraceTable} where {where}) as t limit 1000", new IDataParameter[] { new ClickHouseParameter { ParameterName = "TraceId", Value = traceId } }, ConvertTraceDto); + } + + public static string AppendOrderBy(BaseRequestDto query, bool isLog) + { + var str = query.Sort?.IsDesc ?? false ? " desc" : ""; + return $" order by Timestamp{str}"; + } + + public static (string where, List @parameters, List ors) AppendWhere(BaseRequestDto query, bool isTrace = true) + { + var sql = new StringBuilder(); + var @paramerters = new List(); + + if (query.Start > DateTime.MinValue && query.Start < DateTime.MaxValue + && query.End > DateTime.MinValue && query.End < DateTime.MaxValue + && query.End > query.Start) + { + sql.Append($" and Timestamp BETWEEN @Start and @End"); + @paramerters.Add(new ClickHouseParameter() { ParameterName = "Start", Value = query.Start.ToLocalTime(), DbType = DbType.DateTime2 }); + @paramerters.Add(new ClickHouseParameter() { ParameterName = "End", Value = query.End.ToLocalTime(), DbType = DbType.DateTime2 }); + } + if (!string.IsNullOrEmpty(query.Service)) + { + sql.Append(" and ServiceName=@ServiceName"); + @paramerters.Add(new ClickHouseParameter() { ParameterName = "ServiceName", Value = query.Service }); + } + if (!string.IsNullOrEmpty(query.Instance)) + { + sql.Append(" and `Resource.service.instance.id`=@ServiceInstanceId"); + @paramerters.Add(new ClickHouseParameter() { ParameterName = "ServiceInstanceId", Value = query.Instance }); + } + if (isTrace && !string.IsNullOrEmpty(query.Endpoint)) + { + sql.Append(" and `Attributes.http.target`=@HttpTarget"); + @paramerters.Add(new ClickHouseParameter() { ParameterName = "HttpTarget", Value = query.Instance }); + } + var ors = AppendKeyword(query.Keyword, paramerters, isTrace); + AppendConditions(query.Conditions, paramerters, sql, isTrace); + + if (!string.IsNullOrEmpty(query.RawQuery)) + sql.Append($" and ({query.RawQuery})"); + + if (sql.Length > 0) + sql.Remove(0, 4); + return (sql.ToString(), @paramerters, ors); + } + + private static List AppendKeyword(string keyword, List @paramerters, bool isTrace = true) + { + var sqls = new List(); + if (string.IsNullOrEmpty(keyword)) + return sqls; + + //status_code + if (int.TryParse(keyword, out var num) && num != 0 && num - 1000 < 0 && isTrace) + { + sqls.Add(" and `Attributes.http.status_code`=@HttpStatusCode"); + sqls.Add(" and `Attributes.http.request_content_body` like @Keyword"); + paramerters.Add(new ClickHouseParameter() { ParameterName = "HttpStatusCode", Value = num }); + paramerters.Add(new ClickHouseParameter() { ParameterName = "Keyword", Value = $"%{keyword}%" }); + return sqls; + } + + if (isTrace) + { + sqls.Add(" and `Attributes.http.request_content_body` like @Keyword"); + sqls.Add(" and `Attributes.http.response_content_body` like @Keyword"); + sqls.Add(" and `Attributes.exception.message` like @Keyword"); + } + else + { + if (keyword.Equals("error", StringComparison.CurrentCultureIgnoreCase)) + sqls.Add(" and SeverityText='Error'"); + sqls.Add(" and Body like @Keyword"); + sqls.Add(" and `Attributes.exception.message` like @Keyword"); + } + paramerters.Add(new ClickHouseParameter() { ParameterName = "Keyword", Value = $"%{keyword}%" }); + return sqls; + } + + private static void AppendConditions(IEnumerable? conditions, List @paramerters, StringBuilder sql, bool isTrace = true) + { + if (conditions == null || !conditions.Any()) + return; + + foreach (var item in conditions) + { + var name = GetName(item.Name, !isTrace); + + if (item.Value is DateTime time) + { + item.Value = time.ToLocalTime(); + } + if (item.Name.StartsWith("resource.", StringComparison.CurrentCultureIgnoreCase)) + { + var filed = item.Name["resource.".Length..]; + if (string.Equals(filed, "service.name")) + { + AppendField(item, @paramerters, sql, name, "ServiceName"); + } + else if (string.Equals(filed, "service.instance.id")) + { + AppendField(item, @paramerters, sql, name, "ServiceInstanceId"); + } + else if (string.Equals(filed, "service.namespace")) + { + AppendField(item, @paramerters, sql, name, "ServiceNameSpace"); + } + } + else if (item.Name.StartsWith("attributes.", StringComparison.CurrentCultureIgnoreCase)) + { + var filed = item.Name["attributes.".Length..]; + AppendField(item, @paramerters, sql, name, filed.Replace('.', '_')); + } + else + { + AppendField(item, @paramerters, sql, name, name); + } + } + } + + private static void AppendField(FieldConditionDto item, List @paramerters, StringBuilder sql, string fieldName, string paramName) + { + if (item.Value is string str && string.IsNullOrEmpty(str) || item.Value is IEnumerable collects && !collects.Any()) + return; + switch (item.Type) + { + case ConditionTypes.Equal: + { + if (@paramerters.Exists(p => p.ParameterName == paramName)) + break; + ParseWhere(sql, item.Value, paramerters, fieldName, paramName, "="); + } + break; + case ConditionTypes.NotIn: + { + ParseWhere(sql, item.Value, paramerters, fieldName, $"{paramName}s", "not in"); + } + break; + case ConditionTypes.In: + { + ParseWhere(sql, item.Value, paramerters, fieldName, $"{paramName}s", "in"); + } + break; + case ConditionTypes.LessEqual: + { + ParseWhere(sql, item.Value, paramerters, fieldName, $"lte_{paramName}", "<="); + } + break; + case ConditionTypes.GreatEqual: + { + ParseWhere(sql, item.Value, paramerters, fieldName, $"gte_{paramName}", ">="); + } + break; + case ConditionTypes.Less: + { + ParseWhere(sql, item.Value, paramerters, fieldName, $"lt_{paramName}", "<"); + } + break; + case ConditionTypes.Great: + { + ParseWhere(sql, item.Value, paramerters, fieldName, $"gt_{paramName}", ">"); + } + break; + } + } + + private static void ParseWhere(StringBuilder sql, object value, List @paramerters, string fieldName, string paramName, string compare) + { + DbType dbType = value is DateTime ? DbType.DateTime2 : DbType.AnsiString; + sql.Append($" and {fieldName} {compare} @{paramName}"); + @paramerters.Add(new ClickHouseParameter { ParameterName = $"{paramName}", Value = value, DbType = dbType }); + } + + public static object? ExecuteScalar(this IDbConnection dbConnection, string sql, IDataParameter[]? @parameters = null) + { + using var cmd = dbConnection.CreateCommand(); + cmd.CommandText = sql; + if (@parameters != null && @parameters.Any()) + foreach (var p in @parameters) + cmd.Parameters.Add(p); + OpenConnection(dbConnection); + try + { + return cmd.ExecuteScalar(); + } + catch (Exception ex) + { + ServiceExtensitions.Logger?.LogError(ex, "execute sql error:{rawSql}, paramters:{parameters}", sql, parameters); + throw; + } + } + + private static void OpenConnection(IDbConnection dbConnection) + { + switch (dbConnection.State) + { + case ConnectionState.Closed: + dbConnection.Open(); + break; + case ConnectionState.Broken: + dbConnection.Close(); + dbConnection.Open(); + break; + } + } + + public static List Query(this IDbConnection dbConnection, string sql, IDataParameter[]? @parameters, Func parse) + { + using var cmd = dbConnection.CreateCommand(); + cmd.CommandText = sql; + if (@parameters != null && @parameters.Any()) + foreach (var p in @parameters) + cmd.Parameters.Add(p); + OpenConnection(dbConnection); + try + { + using var reader = cmd.ExecuteReader(); + if (reader == null) + return new List(); + var list = new List(); + while (reader.NextResult()) + while (reader.Read()) + { + list.Add(parse.Invoke(reader)); + } + + return list; + } + catch (Exception ex) + { + ServiceExtensitions.Logger?.LogError(ex, "query sql error:{rawSql}, paramters:{parameters}", sql, parameters); + throw; + } + } + + public static MappingResponseDto ConvertToMapping(IDataReader reader) + { + return new MappingResponseDto + { + Name = reader[0].ToString()!, + Type = "string" + }; + } + + public static TraceResponseDto ConvertTraceDto(IDataReader reader) + { + var startTime = Convert.ToDateTime(reader["Timestamp"]); + long ns = Convert.ToInt64(reader["Duration"]); + string resource = reader["Resources"].ToString()!, spans = reader["Spans"].ToString()!; + var result = new TraceResponseDto + { + TraceId = reader["TraceId"].ToString()!, + EndTimestamp = startTime.AddMilliseconds(ns / 1e6), + Kind = reader["SpanKind"].ToString()!, + Name = reader["SpanName"].ToString()!, + ParentSpanId = reader["ParentSpanId"].ToString()!, + SpanId = reader["SpanId"].ToString()!, + Timestamp = startTime + }; + if (!string.IsNullOrEmpty(resource)) + result.Resource = JsonSerializer.Deserialize>(resource)!; + if (!string.IsNullOrEmpty(spans)) + result.Attributes = JsonSerializer.Deserialize>(spans)!; + return result; + } + + public static LogResponseDto ConvertLogDto(IDataReader reader) + { + string resource = reader["Resources"].ToString()!, logs = reader["Logs"].ToString()!; + var result = new LogResponseDto + { + TraceId = reader["TraceId"].ToString()!, + Body = reader["Body"].ToString()!, + SeverityNumber = Convert.ToInt32(reader["SeverityNumber"]), + SeverityText = reader["SeverityText"].ToString()!, + TraceFlags = Convert.ToInt32(reader["TraceFlags"]), + SpanId = reader["SpanId"].ToString()!, + Timestamp = Convert.ToDateTime(reader["Timestamp"]), + }; + if (!string.IsNullOrEmpty(resource)) + result.Resource = JsonSerializer.Deserialize>(resource)!; + if (!string.IsNullOrEmpty(logs)) + result.Attributes = JsonSerializer.Deserialize>(logs)!; + return result; + } + + public static object AggregationQuery(this IDbConnection dbConnection, SimpleAggregateRequestDto requestDto, bool isLog = true) + { + var sql = new StringBuilder("select "); + var append = new StringBuilder(); + var appendWhere = new StringBuilder(); + var name = GetName(requestDto.Name, isLog); + AppendAggtype(requestDto, sql, append, name, out var isScalar); + sql.AppendFormat(" from {0} ", isLog ? MasaStackClickhouseConnection.LogTable : MasaStackClickhouseConnection.TraceTable); + var (where, @paremeters, _) = AppendWhere(requestDto, !isLog); + sql.Append($" where {appendWhere} {where}"); + sql.Append(append); + var paramArray = @paremeters?.ToArray()!; + + if (isScalar) + { + return dbConnection.ExecuteScalar(sql.ToString(), paramArray)!; + } + else + { + return AggTerm(dbConnection, sql.ToString(), paramArray, requestDto.Type, requestDto.AllValue); + } + } + + private static object AggTerm(IDbConnection dbConnection, string sql, IDataParameter[] paramArray, AggregateTypes aggregateTypes, bool isAllValue) + { + var result = dbConnection.Query(sql, paramArray, reader => + { + if (aggregateTypes == AggregateTypes.GroupBy) + { + if (isAllValue) + return KeyValuePair.Create(reader[0].ToString(), Convert.ToInt64(reader[1])); + else + return reader[0]; + } + else + { + var time = Convert.ToDateTime(reader[0]); + var timestamp = new DateTimeOffset(time).ToUnixTimeMilliseconds(); + return KeyValuePair.Create(timestamp, Convert.ToInt64(reader[1])); + } + }); + if (aggregateTypes == AggregateTypes.GroupBy) + { + if (isAllValue) + return result.Select(item => (KeyValuePair)item).ToList(); + else + return result.Select(item => item.ToString()).ToList(); + } + return result; + } + + private static void AppendAggtype(SimpleAggregateRequestDto requestDto, StringBuilder sql, StringBuilder append, string name, out bool isScalar) + { + isScalar = false; + switch (requestDto.Type) + { + case AggregateTypes.Avg: + sql.Append($"AVG({name}) as a"); + isScalar = true; + break; + case AggregateTypes.Count: + sql.Append($"Count({name}) as a"); + isScalar = true; + break; + case AggregateTypes.DistinctCount: + sql.Append($"Count(DISTINCT {name}) as a"); + isScalar = true; + break; + case AggregateTypes.Sum: + sql.Append($"SUM({name}) as a"); + isScalar = true; + break; + case AggregateTypes.GroupBy: + sql.Append($"{name} as a,Count({name}) as b"); + append.Append($" and a<>'' Group By a order by b desc"); + break; + case AggregateTypes.DateHistogram: + sql.Append($"toStartOfInterval({name}, INTERVAL {ConvertInterval(requestDto.Interval)} minute ) as `time`,count() as `count`"); + append.Append($" Group by `time` order by `time`"); + break; + } + } + + private static string GetName(string name, bool isLog) + { + if (name.Equals("@timestamp", StringComparison.CurrentCultureIgnoreCase)) + return "Timestamp"; + + if (!isLog && name.Equals("kind", StringComparison.InvariantCultureIgnoreCase)) + return "SpanKind"; + + if (name.StartsWith("resource.", StringComparison.CurrentCultureIgnoreCase)) + return GetResourceName(name); + + if (name.StartsWith("attributes.", StringComparison.CurrentCultureIgnoreCase)) + return GetAttributeName(name, isLog); + + return name; + } + + private static string GetResourceName(string name) + { + var field = name[("resource.".Length)..]; + if (field.Equals("service.name", StringComparison.CurrentCultureIgnoreCase)) + return "ServiceName"; + + if (field.Equals("service.namespace", StringComparison.CurrentCultureIgnoreCase) || field.Equals("service.instance.id", StringComparison.CurrentCultureIgnoreCase)) + return $"Resource.{field}"; + + return $"ResourceAttributesValues[indexOf(ResourceAttributesKeys,'{field}')]"; + } + + private static string GetAttributeName(string name, bool isLog) + { + var pre = isLog ? "Log" : "Span"; + var field = name[("attributes.".Length)..]; + if (isLog && (field.Equals("exception.message", StringComparison.CurrentCultureIgnoreCase))) + return $"Attributes.{field}"; + + if (!isLog && (field.Equals("http.status_code", StringComparison.CurrentCultureIgnoreCase) + || field.Equals("http.request_content_body", StringComparison.CurrentCultureIgnoreCase) + || field.Equals("http.response_content_body", StringComparison.CurrentCultureIgnoreCase) + || field.Equals("exception.message", StringComparison.CurrentCultureIgnoreCase)) + ) + return $"Attributes.{field}"; + + return $"{pre}AttributesValues[indexOf({pre}AttributesKeys,'{field}')]"; + } + + public static int ConvertInterval(string s) + { + var unit = Regex.Replace(s, @"\d+", "", RegexOptions.IgnoreCase, TimeSpan.FromSeconds(5)); + int t = 1; + switch (unit) + { + case "s": + t = 1; + break; + case "m": + t = 60; + break; + case "h": + t = 3600; + break; + case "d": + t = 3600 * 24; + break; + case "w": + t = 3600 * 24 * 7; + break; + case "month": + t = 3600 * 24 * 30; + break; + } + var num = Convert.ToInt64(s.Replace(unit, "")); + num *= t; + if (num - 60 < 0) + return 1; + return (int)(num / 60); + } + + public static string GetMaxDelayTraceId(this IDbConnection dbConnection, BaseRequestDto requestDto) + { + var (where, parameters, _) = AppendWhere(requestDto); + var text = $"select * from( TraceId from {MasaStackClickhouseConnection.TraceTable} where {where} order by Duration desc) as t limit 1"; + return dbConnection.ExecuteScalar(text, parameters?.ToArray())?.ToString()!; + } +} diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/ServiceExtensitions.cs b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/ServiceExtensitions.cs new file mode 100644 index 000000000..5fdb0ae7a --- /dev/null +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Extensions/ServiceExtensitions.cs @@ -0,0 +1,252 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +namespace Microsoft.Extensions.DependencyInjection; + +public static class ServiceExtensitions +{ + internal static ILogger? Logger { get; private set; } + + public static IServiceCollection AddMASAStackClickhouse(this IServiceCollection services, string connectionStr, string logTable, string traceTable, string? logSourceTable = null, string? traceSourceTable = null) + { + services.AddScoped(services => new MasaStackClickhouseConnection(connectionStr, logTable, traceTable, logSourceTable, traceSourceTable)) + .AddScoped() + .AddScoped(); + Init(services); + return services; + } + + private static void Init(IServiceCollection services, bool createTable = true) + { + var serviceProvider = services.BuildServiceProvider(); + var logfactory = serviceProvider.GetService(); + Logger = logfactory?.CreateLogger("Masa.Contrib.StackSdks.Tsc.Clickhouse"); + var connection = serviceProvider.GetRequiredService(); + if (createTable) + InitTable(connection); + InitMappingTable(connection); + } + + private static void InitTable(MasaStackClickhouseConnection connection) + { + var database = connection.ConnectionSettings?.Database; + database ??= new ClickHouseConnectionSettings(connection.ConnectionString).Database; + + if (Convert.ToInt32(connection.ExecuteScalar($"select * from system.tables where database ='{database}' and name in ['{MasaStackClickhouseConnection.TraceTable}','{MasaStackClickhouseConnection.LogTable}']")) > 0) + return; + + var createTableSqls = new string[]{ + + @$"CREATE TABLE {MasaStackClickhouseConnection.LogTable} +( + `Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)), + `TraceId` String CODEC(ZSTD(1)), + `SpanId` String CODEC(ZSTD(1)), + `TraceFlags` UInt32 CODEC(ZSTD(1)), + `SeverityText` LowCardinality(String) CODEC(ZSTD(1)), + `SeverityNumber` Int32 CODEC(ZSTD(1)), + `ServiceName` LowCardinality(String) CODEC(ZSTD(1)), + `Body` String CODEC(ZSTD(1)), + `ResourceSchemaUrl` String CODEC(ZSTD(1)), + `Resources` String CODEC(ZSTD(1)), + `ScopeSchemaUrl` String CODEC(ZSTD(1)), + `ScopeName` String CODEC(ZSTD(1)), + `ScopeVersion` String CODEC(ZSTD(1)), + `Scopes` String CODEC(ZSTD(1)), + `Logs` String CODEC(ZSTD(1)), + + `Resource.service.namespace` String CODEC(ZSTD(1)), + `Resource.service.version` String CODEC(ZSTD(1)), + `Resource.service.instance.id` String CODEC(ZSTD(1)), + + `Attributes.taskId` String CODEC(ZSTD(1)), + `Attributes.exception.message` String CODEC(ZSTD(1)), + + ResourceAttributesKeys Array(String) CODEC(ZSTD(1)), + ResourceAttributesValues Array(String) CODEC(ZSTD(1)), + LogAttributesKeys Array(String) CODEC(ZSTD(1)), + LogAttributesValues Array(String) CODEC(ZSTD(1)), + + INDEX idx_log_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_log_servicename ServiceName TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_log_serviceinstanceid `Resource.service.instance.id` TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_log_severitytext SeverityText TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_log_taskid `Attributes.taskId` TYPE bloom_filter(0.001) GRANULARITY 1, + + INDEX idx_string_body Body TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1, + INDEX idx_string_exceptionmessage Attributes.exception.message TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1 +) +ENGINE = MergeTree +PARTITION BY toDate(Timestamp) +ORDER BY ( + Timestamp, + `Resource.service.namespace`, + ServiceName + ) +TTL toDateTime(Timestamp) + toIntervalDay(30) +SETTINGS index_granularity = 8192, + ttl_only_drop_parts = 1; +", +@$"CREATE TABLE {MasaStackClickhouseConnection.TraceTable} +( + `Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)), + `TraceId` String CODEC(ZSTD(1)), + `SpanId` String CODEC(ZSTD(1)), + `ParentSpanId` String CODEC(ZSTD(1)), + `TraceState` String CODEC(ZSTD(1)), + `SpanName` LowCardinality(String) CODEC(ZSTD(1)), + `SpanKind` LowCardinality(String) CODEC(ZSTD(1)), + `ServiceName` LowCardinality(String) CODEC(ZSTD(1)), + `Resources` String CODEC(ZSTD(1)), + `ScopeName` String CODEC(ZSTD(1)), + `ScopeVersion` String CODEC(ZSTD(1)), + `Spans` String CODEC(ZSTD(1)), + `Duration` Int64 CODEC(ZSTD(1)), + `StatusCode` LowCardinality(String) CODEC(ZSTD(1)), + `StatusMessage` String CODEC(ZSTD(1)), + `Events.Timestamp` Array(DateTime64(9)) CODEC(ZSTD(1)), + `Events.Name` Array(LowCardinality(String)) CODEC(ZSTD(1)), + `Events.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), + `Links.TraceId` Array(String) CODEC(ZSTD(1)), + `Links.SpanId` Array(String) CODEC(ZSTD(1)), + `Links.TraceState` Array(String) CODEC(ZSTD(1)), + `Links.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), + + `Resource.service.namespace` String CODEC(ZSTD(1)), + `Resource.service.version` String CODEC(ZSTD(1)), + `Resource.service.instance.id` String CODEC(ZSTD(1)), + + `Attributes.http.status_code` String CODEC(ZSTD(1)), + `Attributes.http.response_content_body` String CODEC(ZSTD(1)), + `Attributes.http.request_content_body` String CODEC(ZSTD(1)), + `Attributes.http.target` String CODEC(ZSTD(1)), + `Attributes.exception.message` String CODEC(ZSTD(1)), + + `ResourceAttributesKeys` Array(String) CODEC(ZSTD(1)), + `ResourceAttributesValues` Array(String) CODEC(ZSTD(1)), + `SpanAttributesKeys` Array(String) CODEC(ZSTD(1)), + `SpanAttributesValues` Array(String) CODEC(ZSTD(1)), + + INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_trace_servicename ServiceName TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_trace_servicenamespace Resource.service.namespace TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_trace_serviceinstanceid Resource.service.instance.id TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_trace_statuscode Attributes.http.status_code TYPE bloom_filter(0.001) GRANULARITY 1, + + INDEX idx_string_requestbody Attributes.http.request_content_body TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1, + INDEX idx_string_responsebody Attributes.http.response_content_body TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1, + INDEX idx_string_exceptionmessage Attributes.exception.message TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1 +) +ENGINE = MergeTree +PARTITION BY toDate(Timestamp) +ORDER BY ( + Timestamp, + Resource.service.namespace, + ServiceName + ) +TTL toDateTime(Timestamp) + toIntervalDay(30) +SETTINGS index_granularity = 8192, + ttl_only_drop_parts = 1; +", +$@"CREATE MATERIALIZED VIEW {MasaStackClickhouseConnection.LogTable}_v TO {MasaStackClickhouseConnection.LogTable} +AS +SELECT +Timestamp,TraceId,SpanId,TraceFlags,SeverityText,SeverityNumber,ServiceName,Body,ResourceSchemaUrl,toJSONString(ResourceAttributes) as Resources, +ScopeSchemaUrl,ScopeName,ScopeVersion,toJSONString(ScopeAttributes) as Scopes,toJSONString(LogAttributes) as Logs, +ResourceAttributes['service.namespace'] as `Resource.service.namespace`,ResourceAttributes['service.version'] as `Resource.service.version`, +ResourceAttributes['service.instance.id'] as `Resource.service.instance.id`, +LogAttributes['TaskId'] as `Attributes.taskId`,LogAttributes['exception.message'] as `Attributes.exception.message`, +mapKeys(ResourceAttributes) as ResourceAttributesKeys,mapValues(ResourceAttributes) as ResourceAttributesValues, +mapKeys(LogAttributes) as LogAttributesKeys,mapValues(LogAttributes) as LogAttributesValues +FROM {MasaStackClickhouseConnection.LogSourceTable}; +", +$@"CREATE MATERIALIZED VIEW {MasaStackClickhouseConnection.TraceTable}_v TO {MasaStackClickhouseConnection.TraceTable} +AS +SELECT + Timestamp,TraceId,SpanId,ParentSpanId,TraceState,SpanName,SpanKind,ServiceName,toJSONString(ResourceAttributes) AS Resources, + ScopeName,ScopeVersion,toJSONString(SpanAttributes) AS Spans, + Duration,StatusCode,StatusMessage,Events.Timestamp,Events.Name,Events.Attributes, + Links.TraceId,Links.SpanId,Links.TraceState,Links.Attributes, + + ResourceAttributes['service.namespace'] as `Resource.service.namespace`,ResourceAttributes['service.version'] as `Resource.service.version`, + ResourceAttributes['service.instance.id'] as `Resource.service.instance.id`, + + SpanAttributes['http.status_code'] as `Attributes.http.status_code`, + SpanAttributes['http.response_content_body'] as `Attributes.http.response_content_body`, + SpanAttributes['http.request_content_body'] as `Attributes.http.request_content_body`, + SpanAttributes['http.target'] as `Attributes.http.target`, + SpanAttributes['exception.message'] as `Attributes.exception.message`, + + mapKeys(ResourceAttributes) AS ResourceAttributesKeys, + mapValues(ResourceAttributes) AS ResourceAttributesValues, + mapKeys(SpanAttributes) AS SpanAttributesKeys, + mapValues(SpanAttributes) AS SpanAttributesValues +FROM {MasaStackClickhouseConnection.TraceSourceTable}; +" }; + + foreach (var sql in createTableSqls) + { + ExecuteSql(connection, sql); + } + } + + private static void InitMappingTable(MasaStackClickhouseConnection connection) + { + var database = connection.ConnectionSettings?.Database; + database ??= new ClickHouseConnectionSettings(connection.ConnectionString).Database; + + var list = MasaStackClickhouseConnection.MappingTable.Split('.'); + var mappingTable = list[list.Length - 1]; + if (Convert.ToInt32(connection.ExecuteScalar($"select count() from system.tables where database ='{database}' and name in ['{mappingTable}']")) > 0) + return; + + var initSqls = new string[]{ +$@" +CREATE TABLE {database}.otel_mapping +( + `Name` Array(String), + `Type` String +) +ENGINE = MergeTree +ORDER BY Name +SETTINGS index_granularity = 8192;", +@$"CREATE MATERIALIZED VIEW {database}.v_otel_traces_attribute_mapping to {MasaStackClickhouseConnection.MappingTable} +as +select DISTINCT arraySort(mapKeys(SpanAttributes)) as Name, 'trace_attributes' as Type +from {MasaStackClickhouseConnection.TraceTable}", +$@"CREATE MATERIALIZED VIEW {database}.v_otel_traces_resource_mapping to {MasaStackClickhouseConnection.MappingTable} +as +select DISTINCT arraySort(mapKeys(ResourceAttributes)) as Name, 'trace_resource' as Type +from {MasaStackClickhouseConnection.TraceTable}", +$@"CREATE MATERIALIZED VIEW {database}.v_otel_logs_attribute_mapping to {MasaStackClickhouseConnection.MappingTable} +as +select DISTINCT arraySort(mapKeys(LogAttributes)) as Name, 'log_attributes' as Type +from {MasaStackClickhouseConnection.LogTable}", +$@"CREATE MATERIALIZED VIEW {database}.v_otel_logs_resource_mapping to {MasaStackClickhouseConnection.MappingTable} +as +select DISTINCT arraySort(mapKeys(ResourceAttributes)) as Name, 'log_resource' as Type +from {MasaStackClickhouseConnection.LogTable}", +$@"insert into {MasaStackClickhouseConnection.MappingTable} +values (['Timestamp','TraceId','SpanId','TraceFlag','SeverityText','SeverityNumber','Body'],'log_basic'), +(['Timestamp','TraceId','SpanId','ParentSpanId','TraceState','SpanKind','Duration'],'trace_basic'); +" }; + foreach (var sql in initSqls) + ExecuteSql(connection, sql); + } + + private static void ExecuteSql(MasaStackClickhouseConnection connection, string sql) + { + using var cmd = connection.CreateCommand(); + if (connection.State != ConnectionState.Open) + connection.Open(); + cmd.CommandText = sql; + try + { + cmd.ExecuteNonQuery(); + } + catch (Exception ex) + { + Logger?.LogError(ex, "ExecuteSql {rawSql} error", sql); + } + } +} diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/LogService.cs b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/LogService.cs new file mode 100644 index 000000000..672a45da2 --- /dev/null +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/LogService.cs @@ -0,0 +1,28 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +namespace Masa.Contrib.StackSdks.Tsc.Clickhouse; + +internal class LogService : ILogService +{ + private readonly IDbConnection _dbConnection; + public LogService(MasaStackClickhouseConnection connection) + { + _dbConnection = connection; + } + + public Task AggregateAsync(SimpleAggregateRequestDto query) + { + return Task.FromResult(_dbConnection.AggregationQuery(query)); + } + + public Task> ListAsync(BaseRequestDto query) + { + return Task.FromResult(_dbConnection.QueryLog(query)); + } + + public Task> GetMappingAsync() + { + return Task.FromResult(_dbConnection.GetMapping(true).AsEnumerable()); + } +} diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Masa.Contrib.StackSdks.Tsc.Clickhouse.csproj b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Masa.Contrib.StackSdks.Tsc.Clickhouse.csproj new file mode 100644 index 000000000..357f1b3ef --- /dev/null +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Masa.Contrib.StackSdks.Tsc.Clickhouse.csproj @@ -0,0 +1,20 @@ + + + + net6.0 + enable + enable + + + + + + + + + + + + + + diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Model/MASAStackClickhouseConnection.cs b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Model/MASAStackClickhouseConnection.cs new file mode 100644 index 000000000..3a56c084b --- /dev/null +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Model/MASAStackClickhouseConnection.cs @@ -0,0 +1,44 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +namespace Masa.Contrib.StackSdks.Tsc.Clickhouse; + +internal class MasaStackClickhouseConnection : ClickHouseConnection +{ + public static string LogSourceTable { get; private set; } + + public static string TraceSourceTable { get; private set; } + + public static string LogTable { get; private set; } + + public static string TraceTable { get; private set; } + + public static string MappingTable { get; private set; } + + public MasaStackClickhouseConnection(string connection, string logTable, string traceTable, string? logSourceTable = null, string? traceSourceTable = null) + { + ArgumentNullException.ThrowIfNull(connection); + ArgumentNullException.ThrowIfNull(logTable); + ArgumentNullException.ThrowIfNull(traceTable); + ConnectionString = connection; + logSourceTable ??= "otel_logs"; + traceSourceTable ??= "otel_traces"; + + if (!string.IsNullOrEmpty(ConnectionSettings.Database)) + { + LogTable = $"{ConnectionSettings.Database}.{logTable}"; + TraceTable = $"{ConnectionSettings.Database}.{traceTable}"; + TraceSourceTable = $"{ConnectionSettings.Database}.{traceSourceTable}"; + LogSourceTable = $"{ConnectionSettings.Database}.{logSourceTable}"; + MappingTable = $"{ConnectionSettings.Database}.otel_mapping"; + } + else + { + LogTable = logTable; + TraceTable = traceTable; + TraceSourceTable = traceSourceTable; + LogSourceTable = logSourceTable; + MappingTable = "otel_mapping"; + } + } +} diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Properties/PublishProfiles/FolderProfile.pubxml b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Properties/PublishProfiles/FolderProfile.pubxml new file mode 100644 index 000000000..154e03532 --- /dev/null +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/Properties/PublishProfiles/FolderProfile.pubxml @@ -0,0 +1,13 @@ + + + + + Release + Any CPU + bin\Release\net6.0\publish\ + FileSystem + <_TargetId>Folder + + \ No newline at end of file diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/TraceService.cs b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/TraceService.cs new file mode 100644 index 000000000..8742c5d2e --- /dev/null +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/TraceService.cs @@ -0,0 +1,39 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +namespace Masa.Contrib.StackSdks.Tsc.Clickhouse; + +internal class TraceService : ITraceService +{ + private readonly IDbConnection _dbConnection; + + public TraceService(MasaStackClickhouseConnection connection) + { + _dbConnection = connection; + } + + public Task AggregateAsync(SimpleAggregateRequestDto query) + { + return Task.FromResult(_dbConnection.AggregationQuery(query, false)); + } + + public Task> GetAsync(string traceId) + { + return Task.FromResult(_dbConnection.GetTraceByTraceId(traceId).AsEnumerable()); + } + + public Task GetMaxDelayTraceIdAsync(BaseRequestDto query) + { + return Task.FromResult(_dbConnection.GetMaxDelayTraceId(query)); + } + + public Task> ListAsync(BaseRequestDto query) + { + return Task.FromResult(_dbConnection.QueryTrace(query)); + } + + public Task> ScrollAsync(BaseRequestDto query) + { + return Task.FromResult(_dbConnection.QueryTrace(query)); + } +} diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/_Imports.cs b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/_Imports.cs new file mode 100644 index 000000000..fa2b94e53 --- /dev/null +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Clickhouse/_Imports.cs @@ -0,0 +1,18 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +global using ClickHouse.Ado; +global using Masa.BuildingBlocks.StackSdks.Tsc.Contracts.Log; +global using Masa.BuildingBlocks.StackSdks.Tsc.Contracts.Model; +global using Masa.BuildingBlocks.StackSdks.Tsc.Contracts.Model.Aggregate; +global using Masa.BuildingBlocks.StackSdks.Tsc.Contracts.Service; +global using Masa.BuildingBlocks.StackSdks.Tsc.Contracts.Trace; +global using Masa.Contrib.StackSdks.Tsc.Clickhouse; +global using Masa.Utils.Models; +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Logging; +global using System.Data; +global using System.Data.Common; +global using System.Text; +global using System.Text.Json; +global using System.Text.RegularExpressions; diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Elasticsearch/Extenistions/IElasticClientExtenstion.cs b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Elasticsearch/Extenistions/IElasticClientExtenstion.cs index 87bf59e9c..66f112672 100644 --- a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Elasticsearch/Extenistions/IElasticClientExtenstion.cs +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Elasticsearch/Extenistions/IElasticClientExtenstion.cs @@ -175,6 +175,22 @@ await client.SearchAsync(ElasticConstant.Trace.IndexName, query, (response, q) => result = SetAggregationResult(response, q)); return result; } + + public static async Task GetMaxDelayTraceIdAsync(this IElasticClient client, BaseRequestDto query) + { + string traceId = default!; + await client.SearchAsync(ElasticConstant.Trace.IndexName, query, + (SearchDescriptor searchDescriptor) => searchDescriptor.AddCondition(query, false) + .Sort(sort => sort.Script(script => script.Order(SortOrder.Descending).Script(s => s.Source("doc['EndTimestamp'].value.toEpochSecond()-doc['@timestamp'].value.toEpochSecond()")).Type("number"))) + .Size(1), + (response, q) => + { + var result = SetTraceResult(response); + traceId = result.Result.FirstOrDefault()?.TraceId!; + }); + + return traceId; + } #endregion #region set condition @@ -433,7 +449,7 @@ private static object SetAggregationResult(ISearchResponse response, Sim } else if (item is BucketAggregate bucketAggregate) { - return GetBucketValue(bucketAggregate, aggModel.Type); + return GetBucketValue(bucketAggregate, aggModel.Type, aggModel.AllValue); } } return default!; @@ -444,10 +460,16 @@ private static double GetDouble(ValueAggregate value) return value.Value ?? default; } - private static object GetBucketValue(BucketAggregate value, AggregateTypes type) + private static object GetBucketValue(BucketAggregate value, AggregateTypes type, bool isAll) { if (type == AggregateTypes.GroupBy) - return value.Items.Select(it => ((KeyedBucket)it).Key.ToString()).ToList(); + { + if (isAll) + return value.Items.Select(it => KeyValuePair.Create(((KeyedBucket)it).Key.ToString(), ((KeyedBucket)it).DocCount)).ToList(); + else + return value.Items.Select(it => ((KeyedBucket)it).Key.ToString()).ToList(); + } + else if (type == AggregateTypes.DateHistogram) { var result = new List>(); diff --git a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Elasticsearch/TraceService.cs b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Elasticsearch/TraceService.cs index b4a34f189..c56164656 100644 --- a/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Elasticsearch/TraceService.cs +++ b/src/Contrib/StackSdks/Masa.Contrib.StackSdks.Tsc.Elasticsearch/TraceService.cs @@ -22,6 +22,11 @@ public async Task> GetAsync(string traceId) return (await _client.SearchTraceAsync(new BaseRequestDto { TraceId = traceId, Page = 1, PageSize = ElasticConstant.MaxRecordCount - 1 })).Result; } + public Task GetMaxDelayTraceIdAsync(BaseRequestDto query) + { + return _client.GetMaxDelayTraceIdAsync(query); + } + public Task> ListAsync(BaseRequestDto query) { return _client.SearchTraceAsync(query); diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Common.cs b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Common.cs new file mode 100644 index 000000000..09fee8546 --- /dev/null +++ b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Common.cs @@ -0,0 +1,44 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +namespace Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests; + +internal class Common +{ + public static void InitTable(bool isLog) + { + var name = isLog ? "log" : "trace"; + using var connection = new ClickHouseConnection(Consts.ConnectionString); + connection.Open(); + using var cmd = connection.CreateCommand(); + var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Data/otel_{name}.txt"); + using (var reader = new StreamReader(path)) + { + var sql = reader.ReadToEnd(); + cmd.CommandText = sql; + cmd.ExecuteNonQuery(); + } + path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Data/otel_{name}_data.txt"); + using (var dataReader = new StreamReader(path)) + { + var sql = dataReader.ReadToEnd(); + cmd.CommandText = sql; + cmd.ExecuteNonQuery(); + } + } + + public static void InitTableData(bool isLog) + { + var name = isLog ? "log" : "trace"; + using var connection = new ClickHouseConnection(Consts.ConnectionString); + connection.Open(); + using var cmd = connection.CreateCommand(); + var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Data/otel_{name}_data.txt"); + using (var dataReader = new StreamReader(path)) + { + var sql = dataReader.ReadToEnd(); + cmd.CommandText = sql; + cmd.ExecuteNonQuery(); + } + } +} diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Consts.cs b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Consts.cs new file mode 100644 index 000000000..738be3cc6 --- /dev/null +++ b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Consts.cs @@ -0,0 +1,10 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +namespace Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests; + +internal class Consts +{ + public const string ConnectionString = "Compress=True;CheckCompressedHash=False;Compressor=lz4;SocketTimeout=5000;Host=localhost;Port=9000;Database=default;User=default"; + //public const string ConnectionString = "Compress=True;CheckCompressedHash=False;Compressor=lz4;SocketTimeout=5000;Host=192.168.51.234;Port=19003;Database=default;User=default"; +} diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Data/otel_log.txt b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Data/otel_log.txt new file mode 100644 index 000000000..ce5111e75 --- /dev/null +++ b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Data/otel_log.txt @@ -0,0 +1,57 @@ +CREATE TABLE otel_logs +( + `Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)), + + `TraceId` String CODEC(ZSTD(1)), + + `SpanId` String CODEC(ZSTD(1)), + + `TraceFlags` UInt32 CODEC(ZSTD(1)), + + `SeverityText` LowCardinality(String) CODEC(ZSTD(1)), + + `SeverityNumber` Int32 CODEC(ZSTD(1)), + + `ServiceName` LowCardinality(String) CODEC(ZSTD(1)), + + `Body` String CODEC(ZSTD(1)), + + `ResourceSchemaUrl` String CODEC(ZSTD(1)), + + `ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), + + `ScopeSchemaUrl` String CODEC(ZSTD(1)), + + `ScopeName` String CODEC(ZSTD(1)), + + `ScopeVersion` String CODEC(ZSTD(1)), + + `ScopeAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), + + `LogAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), + + INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, + + INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + + INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + + INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + + INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + + INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + + INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + + INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1 +) +ENGINE = MergeTree +PARTITION BY toDate(Timestamp) +ORDER BY (ServiceName, + SeverityText, + toUnixTimestamp(Timestamp), + TraceId) +TTL toDateTime(Timestamp) + toIntervalDay(30) +SETTINGS index_granularity = 8192, + ttl_only_drop_parts = 1; \ No newline at end of file diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Data/otel_log_data.txt b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Data/otel_log_data.txt new file mode 100644 index 000000000..a8ead19ac --- /dev/null +++ b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Data/otel_log_data.txt @@ -0,0 +1,5 @@ +INSERT INTO otel_logs (`Timestamp`, TraceId, SpanId, TraceFlags, SeverityText, SeverityNumber, ServiceName, Body, ResourceSchemaUrl, ResourceAttributes, ScopeSchemaUrl, ScopeName, ScopeVersion, ScopeAttributes, LogAttributes) +VALUES +('2023-11-02 09:59:21.282', '3a749e0df4bde3713ea47ed0b8efe83f', '97ef48656ca34d09', 1, 'Information', 9, 'service', 'DrinkingCubeProMqOperation received:2120872750', '', '{''service.namespace'':''Staging'',''service.version'':''1.0.0'',''service.instance.id'':''f14beab6-1632-4262-8434-f545e5892ae4'',''telemetry.sdk.name'':''opentelemetry'',''telemetry.sdk.language'':''dotnet'',''telemetry.sdk.version'':''1.5.1'',''service.layer'':''service'',''service.name'':''service''}', '', '', '', '{}', '{''dotnet.ilogger.category'':''Service.DrinkingCubePro.IoTDrinkingCubeProSubService'',''{OriginalFormat}'':''DrinkingCubeProMqOperation received:2120872750'',''SpanId'':''97ef48656ca34d09'',''TraceId'':''3a749e0df4bde3713ea47ed0b8efe83f'',''ParentId'':''0000000000000000''}'), +('2023-11-02 09:59:20.434', 'c119aa7717eea47be25332c82ceeff2e', '2158ac1037130e81', 1, 'Information', 9, 'service', 'DrinkingCubeProMqOperation received:2102801854', '', '{''service.name'':''service'',''service.namespace'':''Test'',''service.version'':''1.0.0'',''service.instance.id'':''c6bcf682-8028-44bd-92d2-188b99658ce3'',''telemetry.sdk.name'':''opentelemetry'',''telemetry.sdk.language'':''dotnet'',''telemetry.sdk.version'':''1.5.1'',''service.layer'':''service''}', '', '', '', '{}', '{''TraceId'':''c119aa7717eea47be25332c82ceeff2e'',''ParentId'':''0000000000000000'',''dotnet.ilogger.category'':''Device.Service.DrinkingCubePro.IoTDrinkingCubeProSubService'',''{OriginalFormat}'':''DrinkingCubeProMqOperation received:2102801854'',''SpanId'':''2158ac1037130e81''}'), +('2023-11-02 09:59:20.375', '9f6eb756b0368219564bbfad593477f7', '6717dd37c6bbe77a', 1, 'Information', 9, 'service', 'KafKa写入{"type":2,"dataType":1,"content":"{\"appProperty\":{\"dataTimestamp\":1698919160336,\"datastream\":\"dp_upload\",\"deviceId\":\"10001\"},\"body\":{\"common\":{\"iot_uid\":\"100008\",\"mode\":\"0\",\"rssi\":\"10,99\",\"serial\":100001},\"data\":\"hVRDJl/wyQHEkxJ5/xNQp/N7aJTowIpyxtxzUvDABVwMPki1u7b85soGhmGBDoi0W+ySz4J8XU1jRzIbrprEcG0dGA9xoobdFL4N0rL+GiI0xxHxSqr/r1xjnscD8P1VR64aJYq9dBXsN37n/R3F+BYkWXagAczqmLUQvDe56ZPQEmnAHM1pL4BanNw9vAu5aShEHg2TR2z1WpG2NOrAug==\"},\"sysProperty\":{\"messageType\":\"deviceDatapoint\",\"productId\":\"300888\"},\"PubTime\":1698919160408}","deviceInfoId":"8F5D73CD-87A4-4128-BD7C-E0ACB9CCAA00","serial":"1684450839","result":"","decodeContent":"{\"curState\":\"2\",\"totalWater\":\"2272\",\"WaterFlow\":\"261\",\"life1\":\"100000\",\"life2\":\"72126\",\"life4\"\"life5\"\"max_life1\":\"100000\",\"max_life2\":\"100000\",\"max_life3\"\"max_life4\"\"max_life5\"\"active1\":\"1267478839\",\"active2\":\"68360197\",\"active3\"\"active4\"\"active5\"\"inWater\":\"105\",\"pureWater\":\"3\",\"alarm\":\"1\",\"Lock\":\"2\",\"inWaterOffset\":\"0\",\"pureWaterOffset\":\"0\",\"Wash\":\"0\",\"totalWater_filter_1\":\"2272\",\"totalWater_filter_2\":\"2272\",\"totalWater_filter_3\"\"totalWater_filter_4\"\"totalWater_filter_5\"\"water_temp\":\"26\",\"work_mode\":\"0\"}","mode":"0","communicationMethod":"","isDeleted""deleterUserId":"00000000-0000-0000-0000-000000000000","deletionTime":"0001-01-01T00:00:00","creationTime":"2023-11-02T09:59:20.354","creatorUserId":"00000000-0000-0000-0000-000000000000","lastModificationTime":"2023-11-02T09:59:20.3692973Z","lastModifierUserId":"00000000-0000-0000-0000-000000000000","id":"38239f21-2629-4a9b-a27e-8e97c6b775f4"}', '', '{''service.layer'':''service'',''service.name'':''service'',''service.namespace'':''Test'',''service.version'':''1.0.0'',''service.instance.id'':''c6bcf682-8028-44bd-92d2-188b99658ce3'',''telemetry.sdk.name'':''opentelemetry'',''telemetry.sdk.language'':''dotnet'',''telemetry.sdk.version'':''1.5.1''}', '', '', '', '{}', '{''RequestId'':''0HMUQMQ237O8Q:00000BF7'',''RequestPath'':''/api/iotsub/MQOperation'',''dotnet.ilogger.category'':''Services.Device.Infrastructure.Repositories.MessageRepository'',''{OriginalFormat}'':''KafKa写入{"type":2,"dataType":1,"content":"{\\"appProperty\\":{\\"dataTimestamp\\":1698919160336,\\"datastream\\":\\"dp_upload\\",\\"deviceId\\":\\"787320964\\"},\\"body\\":{\\"common\\":{\\"iot_uid\\":\\"10000008\\",\\"mode\\":\\"0\\",\\"rssi\\":\\"10,99\\",\\"serial\\":1684450839},\\"data\\":\\"hVRDJl/wyQHEkxJ5/xNQp/N7aJTowIpyxtxzUvDABVwMPki1u7b85soGhmGBDoi0W+ySz4J8XU1jRzIbrprEcG0dGA9xoobdFL4N0rL+GiI0xxHxSqr/r1xjnscD8P1VR64aJYq9dBXsN37n/R3F+BYkWXagAczqmLUQvDe56ZPQEmnAHM1pL4BanNw9vAu5aShEHg2TR2z1WpG2NOrAug==\\"},\\"sysProperty\\":{\\"messageType\\":\\"deviceDatapoint\\",\\"productId\\":\\"300888\\"},\\"PubTime\\":1698919160408}","deviceInfoId":"dcd65148-ab5c-44ec-940a-038aad7114cc","serial":"1684450839","result":"","decodeContent":"{\\"curState\\":\\"2\\",\\"totalWater\\":\\"2272\\",\\"WaterFlow\\":\\"261\\",\\"life1\\":\\"100000\\",\\"life2\\":\\"72126\\",\\"life3\\"\\"life4\\"\\"life5\\"\\"max_life1\\":\\"100000\\",\\"max_life2\\":\\"100000\\",\\"max_life3\\"\\"max_life4\\"\\"max_life5\\"\\"active1\\":\\"1267478839\\",\\"active2\\":\\"68360197\\",\\"active3\\"\\"active4\\"\\"active5\\"\\"inWater\\":\\"105\\",\\"pureWater\\":\\"3\\",\\"alarm\\":\\"1\\",\\"Lock\\":\\"2\\",\\"inWaterOffset\\":\\"0\\",\\"pureWaterOffset\\":\\"0\\",\\"Wash\\":\\"0\\",\\"totalWater_filter_1\\":\\"2272\\",\\"totalWater_filter_2\\":\\"2272\\",\\"totalWater_filter_3\\"\\"totalWater_filter_4\\"\\"totalWater_filter_5\\"\\"water_temp\\":\\"26\\",\\"work_mode\\":\\"0\\"}","mode":"0","communicationMethod":"","isDeleted""deleterUserId":"00000000-0000-0000-0000-000000000000","deletionTime":"0001-01-01T00:00:00","creationTime":"2023-11-02T09:59:20.354","creatorUserId":"00000000-0000-0000-0000-000000000000","lastModificationTime":"2023-11-02T09:59:20.3692973Z","lastModifierUserId":"00000000-0000-0000-0000-000000000000","id":"38239f21-2629-4a9b-a27e-8e97c6b775f4"}'',''SpanId'':''6717dd37c6bbe77a'',''TraceId'':''9f6eb756b0368219564bbfad593477f7'',''ParentId'':''0000000000000000'',''ConnectionId'':''0HMUQMQ237O8Q''}'); diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Data/otel_trace.txt b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Data/otel_trace.txt new file mode 100644 index 000000000..0bc804ed9 --- /dev/null +++ b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Data/otel_trace.txt @@ -0,0 +1,69 @@ +CREATE TABLE otel_traces +( + + `Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)), + + `TraceId` String CODEC(ZSTD(1)), + + `SpanId` String CODEC(ZSTD(1)), + + `ParentSpanId` String CODEC(ZSTD(1)), + + `TraceState` String CODEC(ZSTD(1)), + + `SpanName` LowCardinality(String) CODEC(ZSTD(1)), + + `SpanKind` LowCardinality(String) CODEC(ZSTD(1)), + + `ServiceName` LowCardinality(String) CODEC(ZSTD(1)), + + `ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), + + `ScopeName` String CODEC(ZSTD(1)), + + `ScopeVersion` String CODEC(ZSTD(1)), + + `SpanAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), + + `Duration` Int64 CODEC(ZSTD(1)), + + `StatusCode` LowCardinality(String) CODEC(ZSTD(1)), + + `StatusMessage` String CODEC(ZSTD(1)), + + `Events.Timestamp` Array(DateTime64(9)) CODEC(ZSTD(1)), + + `Events.Name` Array(LowCardinality(String)) CODEC(ZSTD(1)), + + `Events.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), + + `Links.TraceId` Array(String) CODEC(ZSTD(1)), + + `Links.SpanId` Array(String) CODEC(ZSTD(1)), + + `Links.TraceState` Array(String) CODEC(ZSTD(1)), + + `Links.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), + + INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, + + INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + + INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + + INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + + INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + + INDEX idx_duration Duration TYPE minmax GRANULARITY 1 +) +ENGINE = MergeTree +PARTITION BY toDate(Timestamp) +ORDER BY (ServiceName, + SpanName, + toUnixTimestamp(Timestamp), + TraceId) +TTL toDateTime(Timestamp) + toIntervalDay(30) +SETTINGS index_granularity = 8192, + ttl_only_drop_parts = 1; + diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Data/otel_trace_data.txt b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Data/otel_trace_data.txt new file mode 100644 index 000000000..5b16494df --- /dev/null +++ b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Data/otel_trace_data.txt @@ -0,0 +1,5 @@ + INSERT INTO otel_traces (`Timestamp`, TraceId, SpanId, ParentSpanId, TraceState, SpanName, SpanKind, ServiceName, ResourceAttributes, ScopeName, ScopeVersion, SpanAttributes, Duration, StatusCode, StatusMessage, `Events.Timestamp`, `Events.Name`, `Events.Attributes`, `Links.TraceId`, `Links.SpanId`, `Links.TraceState`, `Links.Attributes`) +VALUES +('2023-11-02 09:39:30.865', '2821e37c871ee22bcc10b43ddd7ca657', '402f0ed4040433e6', '1c9a707a29a9e449', '', 'HTTP POST', 'SPAN_KIND_CLIENT', 'service', '{''service.instance.id'':''c56534c5-2f62-48e8-82a5-6d8ecdbdd7de'',''telemetry.sdk.name'':''opentelemetry'',''telemetry.sdk.language'':''dotnet'',''telemetry.sdk.version'':''1.5.1'',''service.layer'':''service'',''service.name'':''service'',''service.namespace'':''Test'',''service.version'':''1.0.0''}', 'OpenTelemetry.Instrumentation.Http.HttpClient', '1.0.0.0', '{''http.flavor'':''1.1'',''host.name'':''service-device-676656994-xbnnq'',''http.request_content_body'':''{"deviceInfoId":"5EF36830-438D-4B7F-BA15-AF3A138412BA","activeTime":"2022-03-26T03:15:56.878289","active1":"","active2":"","active3":"","active4":"","active5":"","life1":"","life2":"","life3":"","life4":"","life5":"","maxLife1":"","maxLife2":"","maxLife3":"","maxLife4":"","maxLife5":"","lock":""}'',''peer.service'':''127.0.0.1:3500'',''http.scheme'':''http'',''http.method'':''POST'',''net.peer.name'':''127.0.0.1'',''net.peer.port'':''3500'',''http.url'':''http://127.0.0.1:3500/v1.0/invoke/service/method/api/filter/ActivateDeviceFilter'',''http.status_code'':''200''}', 4828300, 'STATUS_CODE_UNSET', '', [], [], [], [], [], [], []), +('2023-11-02 09:39:30.865', '2821e37c871ee22bcc10b43ddd7ca657', '402f0ed4040433e6', '1c9a707a29a9e449', '', 'HTTP POST', 'SPAN_KIND_CLIENT', 'service', '{''service.instance.id'':''c56534c5-2f62-48e8-82a5-6d8ecdbdd7de'',''telemetry.sdk.name'':''opentelemetry'',''telemetry.sdk.language'':''dotnet'',''telemetry.sdk.version'':''1.5.1'',''service.layer'':''service'',''service.name'':''service'',''service.namespace'':''Test'',''service.version'':''1.0.0''}', 'OpenTelemetry.Instrumentation.Http.HttpClient', '1.0.0.0', '{''http.flavor'':''1.1'',''host.name'':''service-device-676656994-xbnnq'',''http.request_content_body'':''{"deviceInfoId":"C829B9EE-E911-44F8-8E74-1EF991DA6803","activeTime":"2022-03-26T03:15:56.878289","active1":"","active2":"","active3":"","active4":"","active5":"","life1":"","life2":"","life3":"","life4":"","life5":"","maxLife1":"","maxLife2":"","maxLife3":"","maxLife4":"","maxLife5":"","lock":""}'',''peer.service'':''127.0.0.1:3500'',''http.scheme'':''http'',''http.method'':''POST'',''net.peer.name'':''127.0.0.1'',''net.peer.port'':''3500'',''http.url'':''http://127.0.0.1:3500/v1.0/invoke/service/method/api/filter/ActivateDeviceFilter'',''http.status_code'':''200''}', 4828300, 'STATUS_CODE_UNSET', '', [], [], [], [], [], [], []), +('2023-11-02 09:39:30.865', '2821e37c871ee22bcc10b43ddd7ca657', '402f0ed4040433e6', '1c9a707a29a9e449', '', 'HTTP POST', 'SPAN_KIND_CLIENT', 'service', '{''service.instance.id'':''c56534c5-2f62-48e8-82a5-6d8ecdbdd7de'',''telemetry.sdk.name'':''opentelemetry'',''telemetry.sdk.language'':''dotnet'',''telemetry.sdk.version'':''1.5.1'',''service.layer'':''service'',''service.name'':''service'',''service.namespace'':''Test'',''service.version'':''1.0.0''}', 'OpenTelemetry.Instrumentation.Http.HttpClient', '1.0.0.0', '{''http.flavor'':''1.1'',''host.name'':''service-device-676656994-xbnnq'',''http.request_content_body'':''{"deviceInfoId":"8995f797-0385-4b27-9452-181eb0bb4079","activeTime":"2022-03-26T03:15:56.878289","active1":"","active2":"","active3":"","active4":"","active5":"","life1":"","life2":"","life3":"","life4":"","life5":"","maxLife1":"","maxLife2":"","maxLife3":"","maxLife4":"","maxLife5":"","lock":""}'',''peer.service'':''127.0.0.1:3500'',''http.scheme'':''http'',''http.method'':''POST'',''net.peer.name'':''127.0.0.1'',''net.peer.port'':''3500'',''http.url'':''http://127.0.0.1:3500/v1.0/invoke/service/method/api/filter/ActivateDeviceFilter'',''http.status_code'':''200''}', 4828300, 'STATUS_CODE_UNSET', '', [], [], [], [], [], [], []) \ No newline at end of file diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/LogServiceTests.cs b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/LogServiceTests.cs new file mode 100644 index 000000000..188235712 --- /dev/null +++ b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/LogServiceTests.cs @@ -0,0 +1,89 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +namespace Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests; + +[TestClass] +public class LogServiceTests +{ + private static ILogService logService; + private readonly DateTime startTime= DateTime.Parse("2023-11-02 09:00:00"); + + [ClassInitialize] + public static void Initialized(TestContext testContext) + { + Common.InitTable(true); + Common.InitTable(false); + var services = new ServiceCollection(); + services.AddLogging(builder => builder.AddConsole()); + services.AddMASAStackClickhouse(Consts.ConnectionString, "custom_log", "custom_trace"); + Common.InitTableData(true); + logService = services.BuildServiceProvider().GetRequiredService(); + } + + [TestMethod] + public async Task QueryListTest() + { + var query = new BaseRequestDto + { + Page = 1, + PageSize = 10, + Start = startTime, + End = startTime.AddHours(1), + Keyword="Kafka", + Conditions = new List { + new FieldConditionDto{ + Name="Resource.service.name", + Type= ConditionTypes.Equal, + Value="service" + }, + new FieldConditionDto{ + Name="Resource.service.namespace", + Type=ConditionTypes.NotEqual, + Value="Test" + }, + new FieldConditionDto{ + Name="Resource.service.name", + Type=ConditionTypes.In, + Value=new List{ "service" } + }, + new FieldConditionDto{ + Name="Resource.service.name", + Type= ConditionTypes.NotIn, + Value=new List{"a","b" } + } + }, + }; + var result = await logService.ListAsync(query); + Assert.IsNotNull(result); + } + + [TestMethod] + public async Task MappingTest() + { + var mapping = await logService.GetMappingAsync(); + Assert.IsNotNull(mapping); + } + + [TestMethod] + public async Task AggTest() + { + var request = new SimpleAggregateRequestDto + { + Name = "Resource.service.name", + Type = AggregateTypes.Count, + Start = startTime, + End = startTime.AddHours(1), + }; + var result = await logService.AggregateAsync(request); + Assert.IsNotNull(result); + var num1 = Convert.ToInt64(result); + + request.Name = "Timestamp"; + request.Type = AggregateTypes.DateHistogram; + request.Interval = "5m"; + result = await logService.AggregateAsync(request); + Assert.IsNotNull(result); + Assert.IsTrue(result is IEnumerable); + } +} diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests.csproj b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests.csproj new file mode 100644 index 000000000..7eb4caba2 --- /dev/null +++ b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests.csproj @@ -0,0 +1,56 @@ + + + + net6.0 + enable + enable + false + true + + + + + + + + + + + + Always + + + Always + + + Always + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + Always + + + + \ No newline at end of file diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/TraceServiceTests.cs b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/TraceServiceTests.cs new file mode 100644 index 000000000..85d1b2e53 --- /dev/null +++ b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/TraceServiceTests.cs @@ -0,0 +1,80 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +namespace Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests; + +[TestClass] +public class TraceServiceTests +{ + private static ITraceService traceService; + private readonly DateTime startTime = DateTime.Parse("2023-11-02 09:00:00"); + + [ClassInitialize] + public static void Initialized(TestContext testContext) + { + var services = new ServiceCollection(); + services.AddLogging(builder=>builder.AddConsole()); + services.AddMASAStackClickhouse(Consts.ConnectionString,"custom_log", "custom_trace"); + Common.InitTableData(false); + traceService = services.BuildServiceProvider().GetRequiredService(); + } + + [TestMethod] + public async Task QueryListTest() + { + var query = new BaseRequestDto + { + Page = 1, + PageSize = 10, + Start = startTime, + End = startTime.AddHours(1) + }; + var result = await traceService.ListAsync(query); + Assert.IsNotNull(result); + } + + [TestMethod] + public async Task TraceIdTest() + { + var result = await traceService.GetAsync("3a749e0df4bde3713ea47ed0b8efe83f"); + Assert.IsNotNull(result); + } + + [TestMethod] + public async Task AggTest() + { + var request = new SimpleAggregateRequestDto + { + Name = "Resource.service.name", + Type = AggregateTypes.Count, + Start = startTime, + End = startTime.AddHours(1), + }; + var result = await traceService.AggregateAsync(request); + Assert.IsNotNull(result); + var num1 = Convert.ToInt64(result); + + request.Type = AggregateTypes.DistinctCount; + result = await traceService.AggregateAsync(request); + var num2 = Convert.ToInt64(result); + Assert.IsTrue(num1 - num2 >= 0); + + request.Type = AggregateTypes.GroupBy; + result = await traceService.AggregateAsync(request); + Assert.IsTrue(result is IEnumerable); + + request.Name = "Duration"; + request.Type = AggregateTypes.Avg; + result = await traceService.AggregateAsync(request); + + request.Type = AggregateTypes.Sum; + result = await traceService.AggregateAsync(request); + + request.Name = "Timestamp"; + request.Type = AggregateTypes.DateHistogram; + request.Interval = "5m"; + result = await traceService.AggregateAsync(request); + Assert.IsNotNull(result); + Assert.IsTrue(result is IEnumerable); + } +} diff --git a/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/_Imports.cs b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/_Imports.cs new file mode 100644 index 000000000..27a16da29 --- /dev/null +++ b/src/Contrib/StackSdks/Tests/Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests/_Imports.cs @@ -0,0 +1,10 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +global using ClickHouse.Ado; +global using Masa.BuildingBlocks.StackSdks.Tsc.Contracts.Model; +global using Masa.BuildingBlocks.StackSdks.Tsc.Contracts.Model.Aggregate; +global using Masa.BuildingBlocks.StackSdks.Tsc.Contracts.Service; +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Logging; +global using Microsoft.VisualStudio.TestTools.UnitTesting;