diff --git a/cmd/signozschemamigrator/schema_migrator/traces_migrations.go b/cmd/signozschemamigrator/schema_migrator/traces_migrations.go index e987853e..9fd41a15 100644 --- a/cmd/signozschemamigrator/schema_migrator/traces_migrations.go +++ b/cmd/signozschemamigrator/schema_migrator/traces_migrations.go @@ -536,4 +536,55 @@ var TracesMigrations = []SchemaMigrationRecord{ // no point of down here as we don't use these }, }, + { + MigrationID: 1004, + UpItems: []Operation{ + CreateTableOperation{ + Database: "signoz_traces", + Table: "tag_attributes_v2", + Columns: []Column{ + {Name: "unix_milli", Type: ColumnTypeInt64, Codec: "Delta(8), ZSTD(1)"}, + {Name: "tag_key", Type: ColumnTypeString, Codec: "ZSTD(1)"}, + {Name: "tag_type", Type: LowCardinalityColumnType{ColumnTypeString}, Codec: "ZSTD(1)"}, + {Name: "tag_data_type", Type: LowCardinalityColumnType{ColumnTypeString}, Codec: "ZSTD(1)"}, + {Name: "string_value", Type: ColumnTypeString, Codec: "ZSTD(1)"}, + {Name: "number_value", Type: NullableColumnType{ColumnTypeFloat64}, Codec: "ZSTD(1)"}, + }, + Indexes: []Index{ + {Name: "string_value_index", Expression: "string_value", Type: "ngrambf_v1(4, 1024, 3, 0)", Granularity: 1}, + {Name: "number_value_index", Expression: "number_value", Type: "minmax", Granularity: 1}, + }, + Engine: ReplacingMergeTree{ + MergeTree: MergeTree{ + PartitionBy: "toDate(unix_milli / 1000)", + OrderBy: "(tag_key, tag_type, tag_data_type, string_value, number_value)", + TTL: "toDateTime(unix_milli / 1000) + toIntervalSecond(1296000)", + Settings: TableSettings{ + {Name: "index_granularity", Value: "8192"}, + {Name: "ttl_only_drop_parts", Value: "1"}, + {Name: "allow_nullable_key", Value: "1"}, + }, + }, + }, + }, + CreateTableOperation{ + Database: "signoz_traces", + Table: "distributed_tag_attributes_v2", + Columns: []Column{ + {Name: "unix_milli", Type: ColumnTypeInt64, Codec: "Delta(8), ZSTD(1)"}, + {Name: "tag_key", Type: ColumnTypeString, Codec: "ZSTD(1)"}, + {Name: "tag_type", Type: LowCardinalityColumnType{ColumnTypeString}, Codec: "ZSTD(1)"}, + {Name: "tag_data_type", Type: LowCardinalityColumnType{ColumnTypeString}, Codec: "ZSTD(1)"}, + {Name: "string_value", Type: ColumnTypeString, Codec: "ZSTD(1)"}, + {Name: "number_value", Type: NullableColumnType{ColumnTypeFloat64}, Codec: "ZSTD(1)"}, + }, + Engine: Distributed{ + Database: "signoz_traces", + Table: "tag_attributes_v2", + ShardingKey: "cityHash64(rand())", + }, + }, + }, + DownItems: []Operation{}, + }, } diff --git a/exporter/clickhousetracesexporter/clickhouse_factory.go b/exporter/clickhousetracesexporter/clickhouse_factory.go index 475f218d..2d30a263 100644 --- a/exporter/clickhousetracesexporter/clickhouse_factory.go +++ b/exporter/clickhousetracesexporter/clickhouse_factory.go @@ -117,13 +117,16 @@ func (f *Factory) CreateSpanWriter() (Writer, error) { indexTable: cfg.IndexTable, errorTable: cfg.ErrorTable, attributeTable: cfg.AttributeTable, + attributeTableV2: cfg.AttributeTableV2, attributeKeyTable: cfg.AttributeKeyTable, encoding: cfg.Encoding, exporterId: cfg.ExporterId, - useNewSchema: cfg.UseNewSchema, - indexTableV3: cfg.IndexTableV3, - resourceTableV3: cfg.ResourceTableV3, + useNewSchema: cfg.UseNewSchema, + indexTableV3: cfg.IndexTableV3, + resourceTableV3: cfg.ResourceTableV3, + maxDistinctValues: cfg.MaxDistinctValues, + fetchKeysInterval: cfg.FetchKeysInterval, }) } @@ -141,13 +144,16 @@ func (f *Factory) CreateArchiveSpanWriter() (Writer, error) { indexTable: cfg.IndexTable, errorTable: cfg.ErrorTable, attributeTable: cfg.AttributeTable, + attributeTableV2: cfg.AttributeTableV2, attributeKeyTable: cfg.AttributeKeyTable, encoding: cfg.Encoding, exporterId: cfg.ExporterId, - useNewSchema: cfg.UseNewSchema, - indexTableV3: cfg.IndexTableV3, - resourceTableV3: cfg.ResourceTableV3, + useNewSchema: cfg.UseNewSchema, + indexTableV3: cfg.IndexTableV3, + resourceTableV3: cfg.ResourceTableV3, + maxDistinctValues: cfg.MaxDistinctValues, + fetchKeysInterval: cfg.FetchKeysInterval, }) } diff --git a/exporter/clickhousetracesexporter/config.go b/exporter/clickhousetracesexporter/config.go index 31cd57b2..48f4cda0 100644 --- a/exporter/clickhousetracesexporter/config.go +++ b/exporter/clickhousetracesexporter/config.go @@ -16,12 +16,18 @@ package clickhousetracesexporter import ( "fmt" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper" ) +type AttributesLimits struct { + FetchKeysInterval time.Duration `mapstructure:"fetch_keys_interval" default:"10m"` + MaxDistinctValues int `mapstructure:"max_distinct_values" default:"25000"` +} + // Config defines configuration for tracing exporter. type Config struct { Options `mapstructure:",squash"` @@ -32,6 +38,8 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueConfig `mapstructure:"sending_queue"` UseNewSchema bool `mapstructure:"use_new_schema" default:"false"` + + AttributesLimits AttributesLimits `mapstructure:"attributes_limits"` } var _ component.Config = (*Config)(nil) diff --git a/exporter/clickhousetracesexporter/config_test.go b/exporter/clickhousetracesexporter/config_test.go index fac2df05..7481f45a 100644 --- a/exporter/clickhousetracesexporter/config_test.go +++ b/exporter/clickhousetracesexporter/config_test.go @@ -52,5 +52,9 @@ func Test_loadConfig(t *testing.T) { NumConsumers: 5, QueueSize: 100, }, + AttributesLimits: AttributesLimits{ + FetchKeysInterval: 10 * time.Minute, + MaxDistinctValues: 25000, + }, }) } diff --git a/exporter/clickhousetracesexporter/factory.go b/exporter/clickhousetracesexporter/factory.go index 9784bd4d..14d9543e 100644 --- a/exporter/clickhousetracesexporter/factory.go +++ b/exporter/clickhousetracesexporter/factory.go @@ -16,6 +16,7 @@ package clickhousetracesexporter import ( "context" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" @@ -35,6 +36,10 @@ func createDefaultConfig() component.Config { TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(), QueueConfig: exporterhelper.NewDefaultQueueConfig(), BackOffConfig: configretry.NewDefaultBackOffConfig(), + AttributesLimits: AttributesLimits{ + FetchKeysInterval: 10 * time.Minute, + MaxDistinctValues: 25000, + }, } } diff --git a/exporter/clickhousetracesexporter/options.go b/exporter/clickhousetracesexporter/options.go index 1ef7b14d..6fc84523 100644 --- a/exporter/clickhousetracesexporter/options.go +++ b/exporter/clickhousetracesexporter/options.go @@ -16,6 +16,7 @@ package clickhousetracesexporter import ( "context" + "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/google/uuid" @@ -30,6 +31,7 @@ const ( defaultErrorTable string = "distributed_signoz_error_index_v2" defaultSpansTable string = "distributed_signoz_spans" defaultAttributeTable string = "distributed_span_attributes" + defaultAttributeTableV2 string = "distributed_tag_attributes_v2" defaultAttributeKeyTable string = "distributed_span_attributes_keys" DefaultDurationSortTable string = "durationSort" DefaultDurationSortMVTable string = "durationSortMV" @@ -55,6 +57,7 @@ type namespaceConfig struct { SpansTable string ErrorTable string AttributeTable string + AttributeTableV2 string AttributeKeyTable string DurationSortTable string DurationSortMVTable string @@ -69,6 +72,8 @@ type namespaceConfig struct { UseNewSchema bool IndexTableV3 string ResourceTableV3 string + MaxDistinctValues int + FetchKeysInterval time.Duration } // Connecto defines how to connect to the database @@ -129,6 +134,7 @@ func NewOptions(exporterId uuid.UUID, config Config, primaryNamespace string, us ErrorTable: defaultErrorTable, SpansTable: defaultSpansTable, AttributeTable: defaultAttributeTable, + AttributeTableV2: defaultAttributeTableV2, AttributeKeyTable: defaultAttributeKeyTable, DurationSortTable: DefaultDurationSortTable, DurationSortMVTable: DefaultDurationSortMVTable, @@ -143,6 +149,8 @@ func NewOptions(exporterId uuid.UUID, config Config, primaryNamespace string, us UseNewSchema: useNewSchema, IndexTableV3: defaultIndexTableV3, ResourceTableV3: defaultResourceTableV3, + MaxDistinctValues: config.AttributesLimits.MaxDistinctValues, + FetchKeysInterval: config.AttributesLimits.FetchKeysInterval, }, others: make(map[string]*namespaceConfig, len(otherNamespaces)), } @@ -150,17 +158,20 @@ func NewOptions(exporterId uuid.UUID, config Config, primaryNamespace string, us for _, namespace := range otherNamespaces { if namespace == archiveNamespace { options.others[namespace] = &namespaceConfig{ - namespace: namespace, - Datasource: datasource, - OperationsTable: "", - IndexTable: "", - SpansTable: defaultArchiveSpansTable, - Encoding: defaultEncoding, - Connector: defaultConnector, - ExporterId: exporterId, - UseNewSchema: useNewSchema, - IndexTableV3: defaultIndexTableV3, - ResourceTableV3: defaultResourceTableV3, + namespace: namespace, + Datasource: datasource, + OperationsTable: "", + IndexTable: "", + SpansTable: defaultArchiveSpansTable, + Encoding: defaultEncoding, + Connector: defaultConnector, + ExporterId: exporterId, + UseNewSchema: useNewSchema, + IndexTableV3: defaultIndexTableV3, + ResourceTableV3: defaultResourceTableV3, + AttributeTableV2: defaultAttributeTableV2, + MaxDistinctValues: config.AttributesLimits.MaxDistinctValues, + FetchKeysInterval: config.AttributesLimits.FetchKeysInterval, } } else { options.others[namespace] = &namespaceConfig{namespace: namespace} diff --git a/exporter/clickhousetracesexporter/writer.go b/exporter/clickhousetracesexporter/writer.go index 792f7368..4c3b4683 100644 --- a/exporter/clickhousetracesexporter/writer.go +++ b/exporter/clickhousetracesexporter/writer.go @@ -19,11 +19,13 @@ import ( "encoding/json" "fmt" "strings" + "sync/atomic" "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/SigNoz/signoz-otel-collector/usage" + "github.com/SigNoz/signoz-otel-collector/utils" "github.com/google/uuid" "github.com/jellydator/ttlcache/v3" "go.opencensus.io/stats" @@ -46,6 +48,14 @@ const ( EncodingProto Encoding = "protobuf" ) +type shouldSkipKey struct { + TagKey string `ch:"tag_key"` + TagType string `ch:"tag_type"` + TagDataType string `ch:"tag_data_type"` + StringCount uint64 `ch:"string_count"` + NumberCount uint64 `ch:"number_count"` +} + // SpanWriter for writing spans to ClickHouse type SpanWriter struct { logger *zap.Logger @@ -55,6 +65,7 @@ type SpanWriter struct { errorTable string spansTable string attributeTable string + attributeTableV2 string attributeKeyTable string encoding Encoding exporterId uuid.UUID @@ -65,6 +76,12 @@ type SpanWriter struct { keysCache *ttlcache.Cache[string, struct{}] rfCache *ttlcache.Cache[string, struct{}] + + shouldSkipKeyValue atomic.Value // stores map[string]shouldSkipKey + + maxDistinctValues int + fetchKeysInterval time.Duration + fetchShouldSkipKeysTicker *time.Ticker } type WriterOptions struct { @@ -75,13 +92,16 @@ type WriterOptions struct { indexTable string errorTable string attributeTable string + attributeTableV2 string attributeKeyTable string encoding Encoding exporterId uuid.UUID - indexTableV3 string - resourceTableV3 string - useNewSchema bool + indexTableV3 string + resourceTableV3 string + useNewSchema bool + maxDistinctValues int + fetchKeysInterval time.Duration } // NewSpanWriter returns a SpanWriter for the database @@ -115,6 +135,7 @@ func NewSpanWriter(options WriterOptions) *SpanWriter { errorTable: options.errorTable, spansTable: options.spansTable, attributeTable: options.attributeTable, + attributeTableV2: options.attributeTableV2, attributeKeyTable: options.attributeKeyTable, encoding: options.encoding, exporterId: options.exporterId, @@ -124,11 +145,43 @@ func NewSpanWriter(options WriterOptions) *SpanWriter { useNewSchema: options.useNewSchema, keysCache: keysCache, rfCache: rfCache, + + maxDistinctValues: options.maxDistinctValues, + fetchKeysInterval: options.fetchKeysInterval, + fetchShouldSkipKeysTicker: time.NewTicker(options.fetchKeysInterval), } + go writer.fetchShouldSkipKeys() return writer } +func (e *SpanWriter) fetchShouldSkipKeys() { + for range e.fetchShouldSkipKeysTicker.C { + query := fmt.Sprintf(` + SELECT tag_key, tag_type, tag_data_type, countDistinct(string_value) as string_count, countDistinct(number_value) as number_count + FROM %s.%s + GROUP BY tag_key, tag_type, tag_data_type + HAVING string_count > %d OR number_count > %d`, e.traceDatabase, e.attributeTableV2, e.maxDistinctValues, e.maxDistinctValues) + + e.logger.Info("fetching should skip keys", zap.String("query", query)) + + keys := []shouldSkipKey{} + + err := e.db.Select(context.Background(), &keys, query) + if err != nil { + e.logger.Error("error while fetching should skip keys", zap.Error(err)) + } + + shouldSkipKeys := make(map[string]shouldSkipKey) + for _, key := range keys { + mapKey := utils.MakeKeyForAttributeKeys(key.TagKey, utils.TagType(key.TagType), utils.TagDataType(key.TagDataType)) + e.logger.Debug("adding to should skip keys", zap.String("key", mapKey), zap.Any("string_count", key.StringCount), zap.Any("number_count", key.NumberCount)) + shouldSkipKeys[mapKey] = key + } + e.shouldSkipKeyValue.Store(shouldSkipKeys) + } +} + func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) error { var statement driver.Batch var err error diff --git a/exporter/clickhousetracesexporter/writerV3.go b/exporter/clickhousetracesexporter/writerV3.go index 2aecd89e..60c8ce51 100644 --- a/exporter/clickhousetracesexporter/writerV3.go +++ b/exporter/clickhousetracesexporter/writerV3.go @@ -2,6 +2,7 @@ package clickhousetracesexporter import ( "context" + "errors" "fmt" "strconv" "sync" @@ -137,7 +138,13 @@ func (w *SpanWriter) writeErrorBatchV3(ctx context.Context, batchSpans []*SpanV3 func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) error { var tagKeyStatement driver.Batch var tagStatement driver.Batch + var tagStatementV2 driver.Batch var err error + var shouldSkipKeys map[string]shouldSkipKey + + if keys := w.shouldSkipKeyValue.Load(); keys != nil { + shouldSkipKeys = keys.(map[string]shouldSkipKey) + } defer func() { if tagKeyStatement != nil { @@ -146,6 +153,9 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) if tagStatement != nil { _ = tagStatement.Abort() } + if tagStatementV2 != nil { + _ = tagStatementV2.Abort() + } }() tagStatement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.attributeTable), driver.WithReleaseConnection()) if err != nil { @@ -155,6 +165,10 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) if err != nil { return fmt.Errorf("could not prepare batch for span attributes key table due to error: %w", err) } + tagStatementV2, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.attributeTableV2), driver.WithReleaseConnection()) + if err != nil { + return fmt.Errorf("could not prepare batch for span attributes table v2 due to error: %w", err) + } // create map of span attributes of key, tagType, dataType and isColumn to avoid duplicates in batch mapOfSpanAttributeKeys := make(map[string]struct{}) @@ -198,6 +212,8 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) } // add mapOfSpanAttributeKey to map mapOfSpanAttributeKeys[mapOfSpanAttributeKey] = struct{}{} + v2Key := utils.MakeKeyForAttributeKeys(spanAttribute.Key, utils.TagType(spanAttribute.TagType), utils.TagDataType(spanAttribute.DataType)) + unixMilli := (int64(span.StartTimeUnixNano/1e6) / 3600000) * 3600000 if spanAttribute.DataType == "string" { err = tagStatement.Append( @@ -209,6 +225,18 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) nil, spanAttribute.IsColumn, ) + + if _, ok := shouldSkipKeys[v2Key]; !ok { + err = tagStatementV2.Append( + unixMilli, + spanAttribute.Key, + spanAttribute.TagType, + spanAttribute.DataType, + spanAttribute.StringValue, + nil, + ) + } + } else if spanAttribute.DataType == "float64" { err = tagStatement.Append( time.Unix(0, int64(span.StartTimeUnixNano)), @@ -219,6 +247,16 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) spanAttribute.NumberValue, spanAttribute.IsColumn, ) + if _, ok = shouldSkipKeys[v2Key]; !ok { + err = tagStatementV2.Append( + unixMilli, + spanAttribute.Key, + spanAttribute.TagType, + spanAttribute.DataType, + nil, + spanAttribute.NumberValue, + ) + } } else if spanAttribute.DataType == "bool" { err = tagStatement.Append( time.Unix(0, int64(span.StartTimeUnixNano)), @@ -229,6 +267,16 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) nil, spanAttribute.IsColumn, ) + if _, ok = shouldSkipKeys[v2Key]; !ok { + err = tagStatementV2.Append( + unixMilli, + spanAttribute.Key, + spanAttribute.TagType, + spanAttribute.DataType, + nil, + nil, + ) + } } if err != nil { return fmt.Errorf("could not append span to tag Statement batch due to error: %w", err) @@ -237,7 +285,8 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) } tagStart := time.Now() - err = tagStatement.Send() + err1 := tagStatement.Send() + err2 := tagStatementV2.Send() stats.RecordWithTags(ctx, []tag.Mutator{ tag.Upsert(exporterKey, pipeline.SignalTraces.String()), @@ -245,8 +294,8 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) }, writeLatencyMillis.M(int64(time.Since(tagStart).Milliseconds())), ) - if err != nil { - return fmt.Errorf("could not write to span attributes table due to error: %w", err) + if err1 != nil || err2 != nil { + return fmt.Errorf("could not write to span attributes table due to error: %w", errors.Join(err1, err2)) } tagKeyStart := time.Now()