Skip to content

Commit

Permalink
fix: fix query in and not in error;
Browse files Browse the repository at this point in the history
     fix init table sql;
     fix query time using UTC time
  • Loading branch information
Qinyouzeng committed Nov 16, 2023
1 parent 30cca58 commit d600609
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 232 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,9 @@ public static class ServiceExtensitions
{
internal static ILogger? Logger { get; private set; }

public static IServiceCollection AddMASAStackClickhouse(this IServiceCollection services, string connectionStr, string? logTable = null, string? traceTable = null)
public static IServiceCollection AddMASAStackClickhouse(this IServiceCollection services, string connectionStr, string logTable, string traceTable, string? logSourceTable = null, string? traceSourceTable = null)
{
services.AddScoped(services => new MasaStackClickhouseConnection(connectionStr, logTable, traceTable))
.AddScoped<ILogService, LogService>()
.AddScoped<ITraceService, TraceService>();
Init(services, false);
return services;
}

public static IServiceCollection AddMASAStackClickhouse(this IServiceCollection services, string connectionStr, string logTable, string traceTable, string logSourceTable, string traceSourceTable)
{
services.AddScoped(services => new MasaStackClickhouseConnection(connectionStr, logTable, logSourceTable, traceTable, traceSourceTable))
services.AddScoped(services => new MasaStackClickhouseConnection(connectionStr, logTable, traceTable, logSourceTable, traceSourceTable))
.AddScoped<ILogService, LogService>()
.AddScoped<ITraceService, TraceService>();
Init(services);
Expand All @@ -41,152 +32,156 @@ private static void InitTable(MasaStackClickhouseConnection connection)
var database = connection.ConnectionSettings?.Database;
database ??= new ClickHouseConnectionSettings(connection.ConnectionString).Database;

if (Convert.ToInt32(connection.ExecuteScalar($"select * from system.tables where database ='{database}' and name in ['{MasaStackClickhouseConnection.TraceTable}','{MasaStackClickhouseConnection.LogTable}']")) > 0)
if (Convert.ToInt32(connection.ExecuteScalar($"select count() from system.tables where database ='{database}' and name in ['{MasaStackClickhouseConnection.TraceTable.Split('.')[1]}','{MasaStackClickhouseConnection.LogTable.Split('.')[1]}']")) > 0)
return;

var createTableSqls = new string[]{ @$"CREATE TABLE {MasaStackClickhouseConnection.LogTable}
(
var createTableSqls = new string[]{

@$"CREATE TABLE {MasaStackClickhouseConnection.LogTable}
(
`Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)),
`TraceId` String CODEC(ZSTD(1)),
`SpanId` String CODEC(ZSTD(1)),
`TraceFlags` UInt32 CODEC(ZSTD(1)),
`SeverityText` LowCardinality(String) CODEC(ZSTD(1)),
`SeverityNumber` Int32 CODEC(ZSTD(1)),
`ServiceName` LowCardinality(String) CODEC(ZSTD(1)),
`Body` String CODEC(ZSTD(1)),
`ResourceSchemaUrl` String CODEC(ZSTD(1)),
`ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
`Resources` String CODEC(ZSTD(1)),
`ScopeSchemaUrl` String CODEC(ZSTD(1)),
`ScopeName` String CODEC(ZSTD(1)),
`ScopeVersion` String CODEC(ZSTD(1)),
`ScopeAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
`LogAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1
`Scopes` String CODEC(ZSTD(1)),
`Logs` String CODEC(ZSTD(1)),
`Resource.service.namespace` String CODEC(ZSTD(1)),
`Resource.service.version` String CODEC(ZSTD(1)),
`Resource.service.instance.id` String CODEC(ZSTD(1)),
`Attributes.TaskId` String CODEC(ZSTD(1)),
`Attributes.exception.message` String CODEC(ZSTD(1)),
ResourceAttributesKeys Array(String) CODEC(ZSTD(1)),
ResourceAttributesValues Array(String) CODEC(ZSTD(1)),
LogAttributesKeys Array(String) CODEC(ZSTD(1)),
LogAttributesValues Array(String) CODEC(ZSTD(1)),
INDEX idx_log_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_log_servicename ServiceName TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_log_serviceinstanceid `Resource.service.instance.id` TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_log_severitytext SeverityText TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_log_taskid `Attributes.TaskId` TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_string_body Body TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1,
INDEX idx_string_exceptionmessage Attributes.exception.message TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(Timestamp)
PARTITION BY toDate(Timestamp)
ORDER BY (
Timestamp,
ServiceName,
SeverityText,
TraceId,
SpanId
)
Timestamp,
`Resource.service.namespace`,
ServiceName
)
TTL toDateTime(Timestamp) + toIntervalDay(30)
SETTINGS index_granularity = 8192,
ttl_only_drop_parts = 1;
",
@$"CREATE TABLE {MasaStackClickhouseConnection.TraceTable}
(
`Timestamp` DateTime64(9) CODEC(Delta(8),
ZSTD(1)),
`Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)),
`TraceId` String CODEC(ZSTD(1)),
`SpanId` String CODEC(ZSTD(1)),
`ParentSpanId` String CODEC(ZSTD(1)),
`TraceState` String CODEC(ZSTD(1)),
`SpanName` LowCardinality(String) CODEC(ZSTD(1)),
`SpanKind` LowCardinality(String) CODEC(ZSTD(1)),
`ServiceName` LowCardinality(String) CODEC(ZSTD(1)),
`ResourceAttributes` Map(LowCardinality(String),
String) CODEC(ZSTD(1)),
`Resources` String CODEC(ZSTD(1)),
`ScopeName` String CODEC(ZSTD(1)),
`ScopeVersion` String CODEC(ZSTD(1)),
`SpanAttributes` Map(LowCardinality(String),
String) CODEC(ZSTD(1)),
`Spans` String CODEC(ZSTD(1)),
`Duration` Int64 CODEC(ZSTD(1)),
`StatusCode` LowCardinality(String) CODEC(ZSTD(1)),
`StatusMessage` String CODEC(ZSTD(1)),
`Events.Timestamp` Array(DateTime64(9)) CODEC(ZSTD(1)),
`Events.Name` Array(LowCardinality(String)) CODEC(ZSTD(1)),
`Events.Attributes` Array(Map(LowCardinality(String),
String)) CODEC(ZSTD(1)),
`Events.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)),
`Links.TraceId` Array(String) CODEC(ZSTD(1)),
`Links.SpanId` Array(String) CODEC(ZSTD(1)),
`Links.TraceState` Array(String) CODEC(ZSTD(1)),
`Links.Attributes` Array(Map(LowCardinality(String),
String)) CODEC(ZSTD(1)),
`Links.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)),
`Resource.service.namespace` String CODEC(ZSTD(1)),
`Resource.service.version` String CODEC(ZSTD(1)),
`Resource.service.instance.id` String CODEC(ZSTD(1)),
`Attributes.http.status_code` String CODEC(ZSTD(1)),
`Attributes.http.response_content_body` String CODEC(ZSTD(1)),
`Attributes.http.request_content_body` String CODEC(ZSTD(1)),
`Attributes.http.target` String CODEC(ZSTD(1)),
`Attributes.exception.message` String CODEC(ZSTD(1)),
`ResourceAttributesKeys` Array(String) CODEC(ZSTD(1)),
`ResourceAttributesValues` Array(String) CODEC(ZSTD(1)),
`SpanAttributesKeys` Array(String) CODEC(ZSTD(1)),
`SpanAttributesValues` Array(String) CODEC(ZSTD(1)),
INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_duration Duration TYPE minmax GRANULARITY 1
INDEX idx_trace_servicename ServiceName TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_trace_servicenamespace Resource.service.namespace TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_trace_serviceinstanceid Resource.service.instance.id TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_trace_statuscode Attributes.http.status_code TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_string_requestbody Attributes.http.request_content_body TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1,
INDEX idx_string_responsebody Attributes.http.response_content_body TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1,
INDEX idx_string_exceptionmessage Attributes.exception.message TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(Timestamp)
PARTITION BY toDate(Timestamp)
ORDER BY (
Timestamp,
ServiceName,
TraceId
Timestamp,
Resource.service.namespace,
ServiceName
)
TTL toDateTime(Timestamp) + toIntervalDay(30)
SETTINGS index_granularity = 8192,
ttl_only_drop_parts = 1;
",
$@"CREATE MATERIALIZED VIEW {MasaStackClickhouseConnection.LogTable}_v TO {MasaStackClickhouseConnection.LogTable}
AS
SELECT * FROM {MasaStackClickhouseConnection.LogSourceTable};
SELECT
Timestamp,TraceId,SpanId,TraceFlags,SeverityText,SeverityNumber,ServiceName,Body,ResourceSchemaUrl,toJSONString(ResourceAttributes) as Resources,
ScopeSchemaUrl,ScopeName,ScopeVersion,toJSONString(ScopeAttributes) as Scopes,toJSONString(LogAttributes) as Logs,
ResourceAttributes['service.namespace'] as `Resource.service.namespace`,ResourceAttributes['service.version'] as `Resource.service.version`,
ResourceAttributes['service.instance.id'] as `Resource.service.instance.id`,
LogAttributes['TaskId'] as `Attributes.TaskId`,LogAttributes['exception.message'] as `Attributes.exception.message`,
mapKeys(ResourceAttributes) as ResourceAttributesKeys,mapValues(ResourceAttributes) as ResourceAttributesValues,
mapKeys(LogAttributes) as LogAttributesKeys,mapValues(LogAttributes) as LogAttributesValues
FROM {MasaStackClickhouseConnection.LogSourceTable};
",
$@"CREATE MATERIALIZED VIEW {MasaStackClickhouseConnection.TraceTable}_v TO {MasaStackClickhouseConnection.TraceTable}
AS
SELECT * FROM {MasaStackClickhouseConnection.TraceSourceTable};
SELECT
Timestamp,TraceId,SpanId,ParentSpanId,TraceState,SpanName,SpanKind,ServiceName,toJSONString(ResourceAttributes) AS Resources,
ScopeName,ScopeVersion,toJSONString(SpanAttributes) AS Spans,
Duration,StatusCode,StatusMessage,Events.Timestamp,Events.Name,Events.Attributes,
Links.TraceId,Links.SpanId,Links.TraceState,Links.Attributes,
ResourceAttributes['service.namespace'] as `Resource.service.namespace`,ResourceAttributes['service.version'] as `Resource.service.version`,
ResourceAttributes['service.instance.id'] as `Resource.service.instance.id`,
SpanAttributes['http.status_code'] as `Attributes.http.status_code`,
SpanAttributes['http.response_content_body'] as `Attributes.http.response_content_body`,
SpanAttributes['http.request_content_body'] as `Attributes.http.request_content_body`,
SpanAttributes['http.target'] as `Attributes.http.target`,
SpanAttributes['exception.message'] as `Attributes.exception.message`,
mapKeys(ResourceAttributes) AS ResourceAttributesKeys,
mapValues(ResourceAttributes) AS ResourceAttributesValues,
mapKeys(SpanAttributes) AS SpanAttributesKeys,
mapValues(SpanAttributes) AS SpanAttributesValues
FROM {MasaStackClickhouseConnection.TraceSourceTable};
" };

foreach (var sql in createTableSqls)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,15 @@ internal class MasaStackClickhouseConnection : ClickHouseConnection

public static string MappingTable { get; private set; }

public MasaStackClickhouseConnection(string connection, string? logTable = null, string? traceTable = null)
{
ArgumentNullException.ThrowIfNull(connection);
ConnectionString = connection;
logTable ??= "otel_logs";
traceTable ??= "otel_traces";
if (!string.IsNullOrEmpty(ConnectionSettings.Database))
{
LogTable = $"{ConnectionSettings.Database}.{logTable}";
TraceTable = $"{ConnectionSettings.Database}.{traceTable}";
MappingTable = $"{ConnectionSettings.Database}.otel_mapping";
}
else
{
LogTable = logTable;
TraceTable = traceTable;
MappingTable = "otel_mapping";
}
}

public MasaStackClickhouseConnection(string connection, string logTable, string logSourceTable, string traceTable, string traceSourceTable)
public MasaStackClickhouseConnection(string connection, string logTable, string traceTable, string? logSourceTable = null, string? traceSourceTable = null)
{
ArgumentNullException.ThrowIfNull(connection);
ArgumentNullException.ThrowIfNull(logTable);
ArgumentNullException.ThrowIfNull(logSourceTable);
ArgumentNullException.ThrowIfNull(traceTable);
ArgumentNullException.ThrowIfNull(traceSourceTable);
ConnectionString = connection;
logSourceTable ??= "otel_logs";
traceSourceTable ??= "otel_traces";

if (!string.IsNullOrEmpty(ConnectionSettings.Database))
{
LogTable = $"{ConnectionSettings.Database}.{logTable}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
global using Masa.Utils.Models;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.Logging;
global using System.Collections;
global using System.Data;
global using System.Data.Common;
global using System.Text;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Masa.Contrib.StackSdks.Tsc.Clickhouse.Tests;

internal class Common
{
public static void InitTableData(bool isLog)
public static void InitTable(bool isLog)
{
var name = isLog ? "log" : "trace";
using var connection = new ClickHouseConnection(Consts.ConnectionString);
Expand All @@ -26,4 +26,19 @@ public static void InitTableData(bool isLog)
cmd.ExecuteNonQuery();
}
}

public static void InitTableData(bool isLog)
{
var name = isLog ? "log" : "trace";
using var connection = new ClickHouseConnection(Consts.ConnectionString);
connection.Open();
using var cmd = connection.CreateCommand();
var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Data/otel_{name}_data.txt");
using (var dataReader = new StreamReader(path))
{
var sql = dataReader.ReadToEnd();
cmd.CommandText = sql;
cmd.ExecuteNonQuery();
}
}
}

This file was deleted.

Loading

0 comments on commit d600609

Please sign in to comment.