diff --git a/cmd/signozschemamigrator/schema_migrator/logs_migrations.go b/cmd/signozschemamigrator/schema_migrator/logs_migrations.go
index ba565a8a..48671a62 100644
--- a/cmd/signozschemamigrator/schema_migrator/logs_migrations.go
+++ b/cmd/signozschemamigrator/schema_migrator/logs_migrations.go
@@ -80,4 +80,55 @@ ORDER BY name ASC`,
 			},
 		},
 	},
+	{
+		MigrationID: 1001, // creates signoz_logs.tag_attributes_v2 plus its Distributed counterpart
+		UpItems: []Operation{
+			CreateTableOperation{
+				Database: "signoz_logs",
+				Table:    "tag_attributes_v2",
+				Columns: []Column{
+					{Name: "unix_milli", Type: ColumnTypeInt64, Codec: "Delta(8), ZSTD(1)"}, // the logs exporter writes an hour-truncated epoch-millisecond value here
+					{Name: "tag_key", Type: ColumnTypeString, Codec: "ZSTD(1)"},
+					{Name: "tag_type", Type: LowCardinalityColumnType{ColumnTypeString}, Codec: "ZSTD(1)"},
+					{Name: "tag_data_type", Type: LowCardinalityColumnType{ColumnTypeString}, Codec: "ZSTD(1)"},
+					{Name: "string_value", Type: ColumnTypeString, Codec: "ZSTD(1)"},
+					{Name: "number_value", Type: NullableColumnType{ColumnTypeFloat64}, Codec: "ZSTD(1)"}, // the logs exporter appends nil here for string and bool attributes
+				},
+				Indexes: []Index{
+					{Name: "string_value_index", Expression: "string_value", Type: "ngrambf_v1(4, 1024, 3, 0)", Granularity: 1},
+					{Name: "number_value_index", Expression: "number_value", Type: "minmax", Granularity: 1},
+				},
+				Engine: ReplacingMergeTree{
+					MergeTree: MergeTree{
+						PartitionBy: "toDate(unix_milli / 1000)", // one partition per day
+						OrderBy:     "(tag_key, tag_type, tag_data_type, string_value, number_value)",
+						TTL:         "toDateTime(unix_milli / 1000) + toIntervalSecond(1296000)", // 1296000 s = 15 days
+						Settings: TableSettings{
+							{Name: "index_granularity", Value: "8192"},
+							{Name: "ttl_only_drop_parts", Value: "1"},
+							{Name: "allow_nullable_key", Value: "1"}, // number_value is Nullable and is part of ORDER BY
+						},
+					},
+				},
+			},
+			CreateTableOperation{
+				Database: "signoz_logs",
+				Table:    "distributed_tag_attributes_v2",
+				Columns: []Column{ // same column list as tag_attributes_v2 above
+					{Name: "unix_milli", Type: ColumnTypeInt64, Codec: "Delta(8), ZSTD(1)"},
+					{Name: "tag_key", Type: ColumnTypeString, Codec: "ZSTD(1)"},
+					{Name: "tag_type", Type: LowCardinalityColumnType{ColumnTypeString}, Codec: "ZSTD(1)"},
+					{Name: "tag_data_type", Type: LowCardinalityColumnType{ColumnTypeString}, Codec: "ZSTD(1)"},
+					{Name: "string_value", Type: ColumnTypeString, Codec: "ZSTD(1)"},
+					{Name: "number_value", Type: NullableColumnType{ColumnTypeFloat64}, Codec: "ZSTD(1)"},
+				},
+				Engine: Distributed{ // fronts signoz_logs.tag_attributes_v2
+					Database:    "signoz_logs",
+					Table:       "tag_attributes_v2",
+					ShardingKey: "cityHash64(rand())",
+				},
+			},
+		},
+		DownItems: []Operation{}, // no rollback operations are declared for this migration
+	},
 }
diff --git a/exporter/clickhouselogsexporter/config.go b/exporter/clickhouselogsexporter/config.go
index 98bdd662..74aff29b 100644
--- a/exporter/clickhouselogsexporter/config.go
+++ b/exporter/clickhouselogsexporter/config.go
@@ -16,12 +16,18 @@ package clickhouselogsexporter
 
 import (
 	"errors"
+	"time"
 
 	"go.opentelemetry.io/collector/config/configretry"
 	"go.opentelemetry.io/collector/exporter/exporterhelper"
 	"go.uber.org/multierr"
 )
 
+type AttributesLimits struct { // limits the logs exporter applies when recording attribute values
+	FetchKeysInterval time.Duration `mapstructure:"fetch_keys_interval" default:"10m"`   // period of the exporter's should-skip-keys refresh ticker
+	MaxDistinctValues int           `mapstructure:"max_distinct_values" default:"25000"` // distinct-value count above which a key lands on the skip list
+}
+
 // Config defines configuration for ClickHouse exporter.
 type Config struct {
 	exporterhelper.TimeoutConfig `mapstructure:",squash"`
@@ -33,6 +39,8 @@ type Config struct {
 	// For http protocol reference: [mailru/go-clickhouse/#dsn](https://github.com/mailru/go-clickhouse/#dsn).
DSN string `mapstructure:"dsn"`
 	UseNewSchema bool `mapstructure:"use_new_schema" default:"false"`
+
+	AttributesLimits AttributesLimits `mapstructure:"attributes_limits"` // newExporter copies both fields into the exporter
 }
 
 var (
diff --git a/exporter/clickhouselogsexporter/config_test.go b/exporter/clickhouselogsexporter/config_test.go
index 074e867b..be6000d9 100644
--- a/exporter/clickhouselogsexporter/config_test.go
+++ b/exporter/clickhouselogsexporter/config_test.go
@@ -63,6 +63,10 @@ func TestLoadConfig(t *testing.T) {
 			NumConsumers: 10,
 			QueueSize: 100,
 		},
+		AttributesLimits: AttributesLimits{ // same values createDefaultConfig sets
+			FetchKeysInterval: 10 * time.Minute,
+			MaxDistinctValues: 25000,
+		},
 	})
 
 	defaultCfg.(*Config).UseNewSchema = true
diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go
index ac6f958d..18ca0260 100644
--- a/exporter/clickhouselogsexporter/exporter.go
+++ b/exporter/clickhouselogsexporter/exporter.go
@@ -23,6 +23,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/ClickHouse/clickhouse-go/v2"
@@ -36,6 +37,7 @@ import (
 	"go.opencensus.io/stats"
 	"go.opencensus.io/stats/view"
 	"go.opencensus.io/tag"
+	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pipeline"
@@ -45,6 +47,7 @@ import (
 const (
 	DISTRIBUTED_LOGS_TABLE = "distributed_logs"
 	DISTRIBUTED_TAG_ATTRIBUTES = "distributed_tag_attributes"
+	DISTRIBUTED_TAG_ATTRIBUTES_V2 = "distributed_tag_attributes_v2" // insert target of tagStatementV2 and source table of the should-skip-keys query
 	DISTRIBUTED_LOGS_TABLE_V2 = "distributed_logs_v2"
 	DISTRIBUTED_LOGS_RESOURCE_V2 = "distributed_logs_v2_resource"
 	DISTRIBUTED_LOGS_ATTRIBUTE_KEYS = "distributed_logs_attribute_keys"
@@ -52,6 +55,14 @@ const (
 	DISTRIBUTED_LOGS_RESOURCE_V2_SECONDS = 1800
 )
 
+type shouldSkipKey struct { // one result row of the over-limit query issued by fetchShouldSkipKeys
+	TagKey      string `ch:"tag_key"`
+	TagType     string `ch:"tag_type"`
+	TagDataType string `ch:"tag_data_type"`
+	StringCount uint64 `ch:"string_count"` // countDistinct(string_value) for the key
+	NumberCount uint64 `ch:"number_count"` // countDistinct(number_value) for the key
+}
+
 type
clickhouseLogsExporter struct {
 	id uuid.UUID
 	db clickhouse.Conn
@@ -70,6 +81,11 @@ type clickhouseLogsExporter struct {
 
 	keysCache *ttlcache.Cache[string, struct{}]
 	rfCache *ttlcache.Cache[string, struct{}]
+
+	shouldSkipKeyValue        atomic.Value  // stores map[string]shouldSkipKey
+	maxDistinctValues         int           // distinct-value threshold used in the should-skip-keys query
+	fetchKeysInterval         time.Duration // refresh period of the should-skip key set
+	fetchShouldSkipKeysTicker *time.Ticker  // created in Start, stopped in Shutdown
 }
 
 func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, error) {
@@ -120,25 +136,87 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro
 	go rfCache.Start()
 
 	return &clickhouseLogsExporter{
-		id:              id,
-		db:              client,
-		insertLogsSQL:   insertLogsSQL,
-		insertLogsSQLV2: insertLogsSQLV2,
-		logger:          logger,
-		cfg:             cfg,
-		usageCollector:  collector,
-		wg:              new(sync.WaitGroup),
-		closeChan:       make(chan struct{}),
-		useNewSchema:    cfg.UseNewSchema,
-		keysCache:       keysCache,
-		rfCache:         rfCache,
+		id:                id,
+		db:                client,
+		insertLogsSQL:     insertLogsSQL,
+		insertLogsSQLV2:   insertLogsSQLV2,
+		logger:            logger,
+		cfg:               cfg,
+		usageCollector:    collector,
+		wg:                new(sync.WaitGroup),
+		closeChan:         make(chan struct{}),
+		useNewSchema:      cfg.UseNewSchema,
+		keysCache:         keysCache,
+		rfCache:           rfCache,
+		maxDistinctValues: cfg.AttributesLimits.MaxDistinctValues,
+		fetchKeysInterval: cfg.AttributesLimits.FetchKeysInterval,
 	}, nil
 }
 
+// Start validates the refresh interval, creates the ticker and launches the
+// background goroutine that keeps the should-skip key set up to date. It is
+// registered with the exporter helper through exporterhelper.WithStart.
+func (e *clickhouseLogsExporter) Start(ctx context.Context, host component.Host) error {
+	// time.NewTicker panics on a non-positive interval; fail startup with a
+	// regular error instead.
+	if e.fetchKeysInterval <= 0 {
+		return fmt.Errorf("attributes_limits.fetch_keys_interval must be positive, got %s", e.fetchKeysInterval)
+	}
+	e.fetchShouldSkipKeysTicker = time.NewTicker(e.fetchKeysInterval)
+	go e.fetchShouldSkipKeys()
+	return nil
+}
+
+// fetchShouldSkipKeys refreshes the should-skip key set once immediately and
+// then on every tick until closeChan is closed by Shutdown. Ticker.Stop does
+// not close the ticker channel, so ranging over it would never return.
+func (e *clickhouseLogsExporter) fetchShouldSkipKeys() {
+	e.refreshShouldSkipKeys() // do not wait a full interval before the limits take effect
+	for {
+		select {
+		case <-e.closeChan:
+			return
+		case <-e.fetchShouldSkipKeysTicker.C:
+			e.refreshShouldSkipKeys()
+		}
+	}
+}
+
+// refreshShouldSkipKeys queries the tag attributes table for every
+// (tag_key, tag_type, tag_data_type) whose distinct string or number value
+// count exceeds maxDistinctValues and publishes the result atomically. When
+// the query fails the previously published set is kept.
+func (e *clickhouseLogsExporter) refreshShouldSkipKeys() {
+	query := fmt.Sprintf(`
+		SELECT tag_key, tag_type, tag_data_type, countDistinct(string_value) as string_count, countDistinct(number_value) as number_count
+		FROM %s.%s
+		GROUP BY tag_key, tag_type, tag_data_type
+		HAVING string_count > %d OR number_count > %d`, databaseName, DISTRIBUTED_TAG_ATTRIBUTES_V2, e.maxDistinctValues, e.maxDistinctValues)
+
+	e.logger.Info("fetching should skip keys", zap.String("query", query))
+
+	var keys []shouldSkipKey
+	if err := e.db.Select(context.Background(), &keys, query); err != nil {
+		e.logger.Error("error while fetching should skip keys", zap.Error(err))
+		return // storing an empty map here would silently lift every limit
+	}
+
+	shouldSkipKeys := make(map[string]shouldSkipKey, len(keys))
+	for _, key := range keys {
+		mapKey := utils.MakeKeyForAttributeKeys(key.TagKey, utils.TagType(key.TagType), utils.TagDataType(key.TagDataType))
+		e.logger.Debug("adding to should skip keys", zap.String("key", mapKey), zap.Any("string_count", key.StringCount), zap.Any("number_count", key.NumberCount))
+		shouldSkipKeys[mapKey] = key
+	}
+	e.shouldSkipKeyValue.Store(shouldSkipKeys)
+}
+
 // Shutdown will shutdown the exporter.
 func (e *clickhouseLogsExporter) Shutdown(_ context.Context) error {
 	close(e.closeChan)
 	e.wg.Wait()
+	if e.fetchShouldSkipKeysTicker != nil { // nil when Start never ran or rejected the interval
+		e.fetchShouldSkipKeysTicker.Stop()
+	}
 	if e.usageCollector != nil {
 		e.usageCollector.Stop()
 	}
@@ -236,14 +314,6 @@ func tsBucket(ts int64, bucketSize int64) int64 {
 	return (int64(ts) / int64(bucketSize)) * int64(bucketSize)
 }
 
-func makeKeyForRFCache(bucketTs int64, fingerprint string) string {
-	var v strings.Builder
-	v.WriteString(strconv.Itoa(int(bucketTs)))
-	v.WriteString(":")
-	v.WriteString(fingerprint)
-	return v.String()
-}
-
 func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.Logs) error {
 	resourcesSeen := map[int64]map[string]string{}
 
@@ -251,10 +321,16 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 	var insertResourcesStmtV2 driver.Batch
 	var statement driver.Batch
 	var tagStatement driver.Batch
+	var tagStatementV2 driver.Batch // NOTE(review): confirm the deferred cleanup below also aborts this batch
 	var attributeKeysStmt driver.Batch
 	var resourceKeysStmt driver.Batch
 	var err error
 
+	var shouldSkipKeys map[string]shouldSkipKey // stays nil until the first successful refresh; nil-map lookups are safe
+	if loaded, ok := e.shouldSkipKeyValue.Load().(map[string]shouldSkipKey); ok {
+		shouldSkipKeys = loaded
+	}
+
 	defer func() {
 		if statement != nil {
 			_ =
statement.Abort()
@@ -281,9 +333,9 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 		return errors.New("shutdown has been called")
 	default:
 		start := time.Now()
-		chLen := 4
+		chLen := 5 // bumped by one alongside the extra tag_attributes_v2 send; NOTE(review): chLen's use is outside this diff — confirm
 		if !e.useNewSchema {
-			chLen = 5
+			chLen = 6 // old-schema path, likewise one higher than before
 			statement, err = e.db.PrepareBatch(ctx, e.insertLogsSQL, driver.WithReleaseConnection())
 			if err != nil {
 				return fmt.Errorf("PrepareBatch:%w", err)
@@ -295,6 +347,11 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 			return fmt.Errorf("PrepareTagBatch:%w", err)
 		}
 
+		tagStatementV2, err = e.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", databaseName, DISTRIBUTED_TAG_ATTRIBUTES_V2), driver.WithReleaseConnection()) // batch for the distributed_tag_attributes_v2 table
+		if err != nil {
+			return fmt.Errorf("PrepareTagBatchV2:%w", err)
+		}
+
 		attributeKeysStmt, err = e.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", databaseName, DISTRIBUTED_LOGS_ATTRIBUTE_KEYS), driver.WithReleaseConnection())
 		if err != nil {
 			return fmt.Errorf("PrepareAttributeKeysBatch:%w", err)
@@ -328,7 +385,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 			}
 			resourceJson := string(serializedRes)
 
-			err = e.addAttrsToTagStatement(tagStatement, attributeKeysStmt, resourceKeysStmt, utils.TagTypeResource, resources, e.useNewSchema)
+			err = e.addAttrsToTagStatement(tagStatement, tagStatementV2, attributeKeysStmt, resourceKeysStmt, utils.TagTypeResource, resources, e.useNewSchema, shouldSkipKeys) // shouldSkipKeys is the snapshot loaded at the top of pushToClickhouse
 			if err != nil {
 				return err
 			}
@@ -344,7 +401,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 				scopeAttributes := attributesToSlice(scope.Attributes(), true)
 				scopeMap := attributesToMap(scope.Attributes(), true)
 
-				err := e.addAttrsToTagStatement(tagStatement, attributeKeysStmt, resourceKeysStmt, utils.TagTypeScope, scopeAttributes, e.useNewSchema)
+				err := e.addAttrsToTagStatement(tagStatement, tagStatementV2, attributeKeysStmt, resourceKeysStmt, utils.TagTypeScope, scopeAttributes, e.useNewSchema, shouldSkipKeys)
 				if err != nil {
 					return err
 				}
@@ -388,7 +445,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 					attributes := attributesToSlice(r.Attributes(), false)
 					attrsMap := attributesToMap(r.Attributes(), false)
 
-					err = e.addAttrsToTagStatement(tagStatement, attributeKeysStmt, resourceKeysStmt, utils.TagTypeAttribute, attributes, e.useNewSchema)
+					err = e.addAttrsToTagStatement(tagStatement, tagStatementV2, attributeKeysStmt, resourceKeysStmt, utils.TagTypeAttribute, attributes, e.useNewSchema, shouldSkipKeys)
 					if err != nil {
 						return err
 					}
@@ -467,7 +524,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 		for bucketTs, resources := range resourcesSeen {
 			for resourceLabels, fingerprint := range resources {
 				// if a resource fingerprint is seen in the bucket already, skip inserting it again.
-				key := makeKeyForRFCache(bucketTs, fingerprint)
+				key := utils.MakeKeyForRFCache(bucketTs, fingerprint) // shared utils helper replaces the local makeKeyForRFCache removed by this diff
 				if e.rfCache.Get(key) != nil {
 					e.logger.Debug("resource fingerprint already present in cache, skipping", zap.String("key", key))
 					continue
@@ -493,6 +550,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
 		go send(insertResourcesStmtV2, DISTRIBUTED_LOGS_RESOURCE_V2, chDuration, chErr, &wg)
 		go send(attributeKeysStmt, DISTRIBUTED_LOGS_ATTRIBUTE_KEYS, chDuration, chErr, &wg)
 		go send(resourceKeysStmt, DISTRIBUTED_LOGS_RESOURCE_KEYS, chDuration, chErr, &wg)
+		go send(tagStatementV2, DISTRIBUTED_TAG_ATTRIBUTES_V2, chDuration, chErr, &wg) // sent in its own goroutine like the batches above; wg.Wait below joins them
 		wg.Wait()
 		close(chErr)
 
@@ -579,10 +637,6 @@ func getStringifiedBody(body pcommon.Value) string {
 	return strBody
 }
 
-func makeCacheKeyForAttributeKeys(tagKey string, tagType utils.TagType, tagDataType utils.TagDataType) string {
-	return fmt.Sprintf("%s:%s:%s", tagKey, tagType, tagDataType)
-}
-
 func (e *clickhouseLogsExporter) addAttrsToAttributeKeysStatement(
 	attributeKeysStmt driver.Batch,
 	resourceKeysStmt driver.Batch,
@@
-590,7 +644,7 @@ func (e *clickhouseLogsExporter) addAttrsToAttributeKeysStatement(
 	tagType utils.TagType,
 	datatype utils.TagDataType,
 ) {
-	cacheKey := makeCacheKeyForAttributeKeys(key, tagType, datatype)
+	cacheKey := utils.MakeKeyForAttributeKeys(key, tagType, datatype) // "key:tagType:datatype", the same format that keys the should-skip map
 	// skip if the key is already present
 	if item := e.keysCache.Get(cacheKey); item != nil {
 		e.logger.Debug("key already present in cache, skipping", zap.String("key", key))
@@ -614,13 +668,21 @@ func (e *clickhouseLogsExporter) addAttrsToAttributeKeysStatement(
 
 func (e *clickhouseLogsExporter) addAttrsToTagStatement(
 	statement driver.Batch,
+	tagStatementV2 driver.Batch, // batch for the tag_attributes_v2 table
 	attributeKeysStmt driver.Batch,
 	resourceKeysStmt driver.Batch,
 	tagType utils.TagType,
 	attrs attributesToSliceResponse,
 	useNewSchema bool,
+	shouldSkipKeys map[string]shouldSkipKey, // keys whose values are not recorded; may be nil (nil-map lookups report ok == false)
 ) error {
+	unixMilli := (time.Now().UnixMilli() / 3600000) * 3600000 // current time truncated to the hour (3600000 ms); shared by every v2 row of this call
 	for i, v := range attrs.StringKeys {
+		key := utils.MakeKeyForAttributeKeys(v, tagType, utils.TagDataTypeString)
+		if _, ok := shouldSkipKeys[key]; ok {
+			e.logger.Debug("key has been skipped", zap.String("key", key))
+			continue // skips the v1 row, the attribute-key registration and the v2 row; NOTE(review): confirm key registration should be skipped too
+		}
 		err := statement.Append(
 			time.Now(),
 			v,
@@ -631,9 +693,20 @@ func (e *clickhouseLogsExporter) addAttrsToTagStatement(
 			nil,
 		)
 		if err != nil {
-			return fmt.Errorf("could not append string attribute to batch, err: %s", err)
+			return fmt.Errorf("could not append string attribute to batch, err: %w", err) // %w keeps the cause reachable through errors.Is / errors.As
 		}
 		e.addAttrsToAttributeKeysStatement(attributeKeysStmt, resourceKeysStmt, v, tagType, utils.TagDataTypeString)
+		err = tagStatementV2.Append(
+			unixMilli,
+			v,
+			tagType,
+			utils.TagDataTypeString,
+			attrs.StringValues[i],
+			nil, // number_value is left unset for string attributes
+		)
+		if err != nil {
+			return fmt.Errorf("could not append string attribute to batch, err: %w", err)
+		}
 	}
 
 	intTypeName := "int64"
@@ -641,6 +714,12 @@ func (e *clickhouseLogsExporter) addAttrsToTagStatement(
 		intTypeName = "float64"
 	}
 	for i, v := range attrs.IntKeys {
+		key := utils.MakeKeyForAttributeKeys(v, tagType, utils.TagDataTypeNumber) // int and float attributes share the Number data type in the skip key
+		if _, ok := shouldSkipKeys[key]; ok {
+			e.logger.Debug("key has been skipped", zap.String("key", key))
+			continue
+		}
+
 		err := statement.Append(
 			time.Now(),
 			v,
@@ -651,11 +730,27 @@ func (e *clickhouseLogsExporter) addAttrsToTagStatement(
 			attrs.IntValues[i],
 		)
 		if err != nil {
-			return fmt.Errorf("could not append number attribute to batch, err: %s", err)
+			return fmt.Errorf("could not append number attribute to batch, err: %w", err)
 		}
 		e.addAttrsToAttributeKeysStatement(attributeKeysStmt, resourceKeysStmt, v, tagType, utils.TagDataTypeNumber)
+		err = tagStatementV2.Append(
+			unixMilli,
+			v,
+			tagType,
+			utils.TagDataTypeNumber,
+			nil, // string_value is left unset for number attributes
+			attrs.IntValues[i],
+		)
+		if err != nil {
+			return fmt.Errorf("could not append number attribute to batch, err: %w", err)
+		}
 	}
 	for i, v := range attrs.FloatKeys {
+		key := utils.MakeKeyForAttributeKeys(v, tagType, utils.TagDataTypeNumber)
+		if _, ok := shouldSkipKeys[key]; ok {
+			e.logger.Debug("key has been skipped", zap.String("key", key))
+			continue
+		}
 		err := statement.Append(
 			time.Now(),
 			v,
@@ -666,11 +761,27 @@ func (e *clickhouseLogsExporter) addAttrsToTagStatement(
 			attrs.FloatValues[i],
 		)
 		if err != nil {
-			return fmt.Errorf("could not append number attribute to batch, err: %s", err)
+			return fmt.Errorf("could not append number attribute to batch, err: %w", err)
 		}
 		e.addAttrsToAttributeKeysStatement(attributeKeysStmt, resourceKeysStmt, v, tagType, utils.TagDataTypeNumber)
+		err = tagStatementV2.Append(
+			unixMilli,
+			v,
+			tagType,
+			utils.TagDataTypeNumber,
+			nil,
+			attrs.FloatValues[i],
+		)
+		if err != nil {
+			return fmt.Errorf("could not append number attribute to batch, err: %w", err)
+		}
 	}
 	for _, v := range attrs.BoolKeys {
+		key := utils.MakeKeyForAttributeKeys(v, tagType, utils.TagDataTypeBool)
+		if _, ok := shouldSkipKeys[key]; ok {
+			e.logger.Debug("key has been skipped", zap.String("key", key))
+			continue
+		}
 		err := statement.Append(
 			time.Now(),
 			v,
@@ -681,9 +792,20 @@ func (e
*clickhouseLogsExporter) addAttrsToTagStatement(
 			nil,
 		)
 		if err != nil {
-			return fmt.Errorf("could not append bool attribute to batch, err: %s", err)
+			return fmt.Errorf("could not append bool attribute to batch, err: %w", err) // %w keeps the cause reachable through errors.Is / errors.As
 		}
 		e.addAttrsToAttributeKeysStatement(attributeKeysStmt, resourceKeysStmt, v, tagType, utils.TagDataTypeBool)
+		err = tagStatementV2.Append(
+			unixMilli,
+			v,
+			tagType,
+			utils.TagDataTypeBool,
+			nil, // bool attributes record only the key: no string_value ...
+			nil, // ... and no number_value
+		)
+		if err != nil {
+			return fmt.Errorf("could not append bool attribute to batch, err: %w", err)
+		}
 	}
 	return nil
 }
diff --git a/exporter/clickhouselogsexporter/factory.go b/exporter/clickhouselogsexporter/factory.go
index 798c1a6d..59809bee 100644
--- a/exporter/clickhouselogsexporter/factory.go
+++ b/exporter/clickhouselogsexporter/factory.go
@@ -17,6 +17,7 @@ package clickhouselogsexporter
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"go.opencensus.io/stats"
 	"go.opencensus.io/stats/view"
@@ -68,6 +69,10 @@ func createDefaultConfig() component.Config {
 		TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(),
 		QueueConfig: exporterhelper.NewDefaultQueueConfig(),
 		BackOffConfig: configretry.NewDefaultBackOffConfig(),
+		AttributesLimits: AttributesLimits{
+			FetchKeysInterval: 10 * time.Minute, // default refresh period of the should-skip key set
+			MaxDistinctValues: 25000,            // default distinct-value threshold per key
+		},
 	}
 }
 
@@ -89,6 +94,7 @@ func createLogsExporter(
 		set,
 		cfg,
 		exporter.pushLogsData,
+		exporterhelper.WithStart(exporter.Start), // Start creates the ticker and launches the should-skip key refresher
 		exporterhelper.WithShutdown(exporter.Shutdown),
 		exporterhelper.WithTimeout(c.TimeoutConfig),
 		exporterhelper.WithQueue(c.QueueConfig),
diff --git a/utils/utils.go b/utils/utils.go
index 7161d753..db3e6f7c 100644
--- a/utils/utils.go
+++ b/utils/utils.go
@@ -59,3 +59,13 @@ func MakeKeyForRFCache(bucketTs int64, fingerprint string) string {
 	v.WriteString(fingerprint)
 	return v.String()
 }
+
+func MakeKeyForAttributeKeys(tagKey string, tagType TagType, tagDataType TagDataType) string { // returns "tagKey:tagType:tagDataType"
+	var key strings.Builder
+	key.WriteString(tagKey)
+	key.WriteString(":")
+	key.WriteString(string(tagType))
+	key.WriteString(":")
+	key.WriteString(string(tagDataType))
+	return key.String() // same layout the removed fmt.Sprintf("%s:%s:%s", ...) helper produced
+}