Move from OpenCensus to OTel API (#452)
srikanthccv authored Dec 23, 2024
1 parent 9966c2a commit cc3d7be
Showing 19 changed files with 588 additions and 391 deletions.
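Before the per-file diffs, here is the shape of the migration in isolation: a minimal, self-contained sketch (not taken from the commit) of the OTel pattern that replaces the OpenCensus measure/view/tag trio throughout. Bucket boundaries now travel with the instrument itself, and tags become attributes on each Record call. The otel.Meter global lookup, the main wrapper, the bucket values, and the "distributed_logs" table name are illustrative assumptions; in the collector the Meter comes from exporter.Settings.MeterProvider, as the diffs below show.

```go
package main

import (
	"context"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

func main() {
	// In the collector this Meter comes from exporter.Settings.MeterProvider;
	// the global lookup here is only to make the sketch runnable.
	meter := otel.Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhouselogsexporter")

	// Bucket boundaries are declared on the instrument, so no view.Register
	// boilerplate is needed at factory-construction time.
	durationHistogram, err := meter.Float64Histogram(
		"exporter_db_write_latency",
		metric.WithUnit("ms"),
		metric.WithExplicitBucketBoundaries(250, 500, 750, 1000, 2000, 4000, 8000, 16000, 30000),
	)
	if err != nil {
		panic(err)
	}

	start := time.Now()
	// ... write a batch to ClickHouse ...
	durationHistogram.Record(
		context.Background(),
		float64(time.Since(start).Milliseconds()),
		metric.WithAttributes(
			attribute.String("exporter", "logs"),          // was tag.Upsert(exporterKey, ...)
			attribute.String("table", "distributed_logs"), // hypothetical table name
		),
	)
}
```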
33 changes: 25 additions & 8 deletions exporter/clickhouselogsexporter/exporter.go
@@ -38,9 +38,12 @@ import (
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

@@ -77,6 +80,8 @@ type clickhouseLogsExporter struct {
wg *sync.WaitGroup
closeChan chan struct{}

durationHistogram metric.Float64Histogram

useNewSchema bool

keysCache *ttlcache.Cache[string, struct{}]
@@ -88,7 +93,10 @@ type clickhouseLogsExporter struct {
fetchShouldSkipKeysTicker *time.Ticker
}

func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, error) {
func newExporter(set exporter.Settings, cfg *Config) (*clickhouseLogsExporter, error) {
logger := set.Logger
meter := set.MeterProvider.Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhouselogsexporter")

if err := cfg.Validate(); err != nil {
return nil, err
}
@@ -118,6 +126,14 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro
return nil, err
}

durationHistogram, err := meter.Float64Histogram(
"exporter_db_write_latency",
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(250, 500, 750, 1000, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000, 15000, 25000, 30000),
)
if err != nil {
return nil, err
}
// keys cache is used to avoid duplicate inserts for the same attribute key.
keysCache := ttlcache.New[string, struct{}](
ttlcache.WithTTL[string, struct{}](240*time.Minute),
@@ -136,7 +152,6 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro
go rfCache.Start()

return &clickhouseLogsExporter{
id: id,
db: client,
insertLogsSQL: insertLogsSQL,
insertLogsSQLV2: insertLogsSQLV2,
@@ -146,6 +161,7 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
useNewSchema: cfg.UseNewSchema,
durationHistogram: durationHistogram,
keysCache: keysCache,
rfCache: rfCache,
maxDistinctValues: cfg.AttributesLimits.MaxDistinctValues,
@@ -557,12 +573,13 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
// store the duration taken to send the data
for i := 0; i < chLen; i++ {
sendDuration := <-chDuration
stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(exporterKey, pipeline.SignalLogs.String()),
tag.Upsert(tableKey, sendDuration.Name),
},
writeLatencyMillis.M(int64(sendDuration.duration.Milliseconds())),
e.durationHistogram.Record(
ctx,
float64(sendDuration.duration.Milliseconds()),
metric.WithAttributes(
attribute.String("table", sendDuration.Name),
attribute.String("exporter", pipeline.SignalLogs.String()),
),
)
}

22 changes: 1 addition & 21 deletions exporter/clickhouselogsexporter/factory.go
@@ -19,9 +19,6 @@ import (
"fmt"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
@@ -37,25 +34,8 @@ const (
tableName = "logs"
)

var (
writeLatencyMillis = stats.Int64("exporter_db_write_latency", "Time taken (in millis) for exporter to write batch", "ms")
exporterKey = tag.MustNewKey("exporter")
tableKey = tag.MustNewKey("table")
)

// NewFactory creates a factory for the ClickHouse logs exporter.
func NewFactory() exporter.Factory {
writeLatencyDistribution := view.Distribution(100, 250, 500, 750, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 512000)

writeLatencyView := &view.View{
Name: "exporter_db_write_latency",
Measure: writeLatencyMillis,
Description: writeLatencyMillis.Description(),
TagKeys: []tag.Key{exporterKey, tableKey},
Aggregation: writeLatencyDistribution,
}

view.Register(writeLatencyView)

return exporter.NewFactory(
component.MustNewType(typeStr),
@@ -84,7 +64,7 @@ func createLogsExporter(
cfg component.Config,
) (exporter.Logs, error) {
c := cfg.(*Config)
exporter, err := newExporter(set.Logger, c)
exporter, err := newExporter(set, c)
if err != nil {
return nil, fmt.Errorf("cannot configure clickhouse logs exporter: %w", err)
}
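With the migration, the hand-rolled view registration above disappears: the explicit buckets are attached to the instrument, and anyone who needs different buckets can override them where the MeterProvider is built. As a hedged sketch (assuming the go.opentelemetry.io/otel/sdk/metric package; this is not code from the commit), an SDK view can reshape the histogram much like the deleted writeLatencyView did:

```go
package main

import (
	"context"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	// Match the instrument by name and swap in custom buckets, playing the
	// role of the removed OpenCensus writeLatencyView. Values are examples.
	view := sdkmetric.NewView(
		sdkmetric.Instrument{Name: "exporter_db_write_latency"},
		sdkmetric.Stream{
			Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
				Boundaries: []float64{100, 250, 500, 750, 1000, 2000, 4000, 8000},
			},
		},
	)

	mp := sdkmetric.NewMeterProvider(sdkmetric.WithView(view))
	defer func() { _ = mp.Shutdown(context.Background()) }()

	// Hand mp to whatever constructs exporter.Settings so the exporters'
	// set.MeterProvider picks up the customized aggregation.
}
```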
100 changes: 67 additions & 33 deletions exporter/clickhousemetricsexporter/clickhouse.go
@@ -31,11 +31,14 @@ import (
"github.com/jellydator/ttlcache/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/sirupsen/logrus"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pipeline"
semconv "go.opentelemetry.io/collector/semconv/v1.13.0"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/base"
"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/utils/timeseries"
@@ -61,7 +64,7 @@ const (
// clickHouse implements storage interface for the ClickHouse.
type clickHouse struct {
conn clickhouse.Conn
l *logrus.Entry
l *zap.Logger
database string
maxTimeSeriesInQuery int

@@ -80,6 +83,8 @@ type clickHouse struct {
mWrittenTimeSeries prometheus.Counter

exporterID uuid.UUID

durationHistogram metric.Float64Histogram
}

type ClickHouseParams struct {
@@ -92,10 +97,13 @@ type ClickHouseParams struct {
WriteTSToV4 bool
DisableV2 bool
ExporterId uuid.UUID
Settings exporter.Settings
}

func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
l := logrus.WithField("component", "clickhouse")

logger := params.Settings.Logger
meter := params.Settings.MeterProvider.Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter")

options, err := clickhouse.ParseDSN(params.DSN)

@@ -124,9 +132,19 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
)
go cache.Start()

durationHistogram, err := meter.Float64Histogram(
"exporter_db_write_latency",
metric.WithDescription("Time taken to write data to ClickHouse"),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(250, 500, 750, 1000, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000, 15000, 25000, 30000),
)
if err != nil {
return nil, err
}

ch := &clickHouse{
conn: conn,
l: l,
l: logger,
database: options.Auth.Database,
maxTimeSeriesInQuery: params.MaxTimeSeriesInQuery,
cache: cache,
@@ -139,10 +157,11 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
Name: "written_time_series",
Help: "Number of written time series.",
}),
watcherInterval: params.WatcherInterval,
writeTSToV4: params.WriteTSToV4,
disableV2: params.DisableV2,
exporterID: params.ExporterId,
watcherInterval: params.WatcherInterval,
writeTSToV4: params.WriteTSToV4,
disableV2: params.DisableV2,
exporterID: params.ExporterId,
durationHistogram: durationHistogram,
}

go func() {
@@ -176,20 +195,20 @@ func (ch *clickHouse) shardCountWatcher(ctx context.Context) {

ch.timeSeriesRW.Lock()
if ch.prevShardCount != shardCount {
ch.l.Infof("Shard count changed from %d to %d. Resetting time series map.", ch.prevShardCount, shardCount)
ch.l.Info("Shard count changed. Resetting time series map.", zap.Uint64("prev", ch.prevShardCount), zap.Uint64("current", shardCount))
ch.timeSeries = make(map[uint64]struct{})
}
ch.prevShardCount = shardCount
ch.timeSeriesRW.Unlock()
return nil
}()
if err != nil {
ch.l.Error(err)
ch.l.Error("error getting shard count", zap.Error(err))
}

select {
case <-ctx.Done():
ch.l.Warn(ctx.Err())
ch.l.Warn("shard count watcher stopped", zap.Error(ctx.Err()))
return
case <-ticker.C:
}
@@ -254,7 +273,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
fingerprintToName[f][envLabel] = env
}
if len(fingerprints) != len(timeSeries) {
ch.l.Debugf("got %d fingerprints, but only %d of them were unique time series", len(fingerprints), len(timeSeries))
ch.l.Debug("got fingerprints, but only unique time series", zap.Int("fingerprints", len(fingerprints)), zap.Int("time series", len(timeSeries)))
}

// find new time series
@@ -299,11 +318,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE),
ch.durationHistogram.Record(
ctx,
float64(time.Since(start).Milliseconds()),
metric.WithAttributes(
attribute.String("exporter", pipeline.SignalMetrics.String()),
attribute.String("table", DISTRIBUTED_TIME_SERIES_TABLE),
),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()

@@ -339,11 +361,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
}
start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_SAMPLES_TABLE),
ch.durationHistogram.Record(
ctx,
float64(time.Since(start).Milliseconds()),
metric.WithAttributes(
attribute.String("exporter", pipeline.SignalMetrics.String()),
attribute.String("table", DISTRIBUTED_SAMPLES_TABLE),
),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()
if err != nil {
@@ -396,11 +421,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_SAMPLES_TABLE_V4),
ch.durationHistogram.Record(
ctx,
float64(time.Since(start).Milliseconds()),
metric.WithAttributes(
attribute.String("exporter", pipeline.SignalMetrics.String()),
attribute.String("table", DISTRIBUTED_SAMPLES_TABLE_V4),
),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()

@@ -451,11 +479,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE_V4),
ch.durationHistogram.Record(
ctx,
float64(time.Since(start).Milliseconds()),
metric.WithAttributes(
attribute.String("exporter", pipeline.SignalMetrics.String()),
attribute.String("table", DISTRIBUTED_TIME_SERIES_TABLE_V4),
),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()

@@ -467,7 +498,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
n := len(newTimeSeries)
if n != 0 {
ch.mWrittenTimeSeries.Add(float64(n))
ch.l.Debugf("Wrote %d new time series.", n)
ch.l.Debug("wrote new time series", zap.Int("count", n))
}

err = func() error {
@@ -540,11 +571,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_EXP_HIST_TABLE),
ch.durationHistogram.Record(
ctx,
float64(time.Since(start).Milliseconds()),
metric.WithAttributes(
attribute.String("exporter", pipeline.SignalMetrics.String()),
attribute.String("table", DISTRIBUTED_EXP_HIST_TABLE),
),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()
if err != nil {
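The same time-and-record sequence now appears after every statement.Send() in Write(), once per destination table. A hypothetical helper (not part of this commit) could factor it out; timedSend and its signature are assumptions for illustration:

```go
package clickhousemetricsexporter

import (
	"context"
	"time"

	"go.opentelemetry.io/collector/pipeline"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

// timedSend runs send, then records its latency against the given table,
// mirroring the block repeated for each table in Write().
func timedSend(ctx context.Context, hist metric.Float64Histogram, table string, send func() error) error {
	start := time.Now()
	err := send()
	hist.Record(
		ctx,
		float64(time.Since(start).Milliseconds()),
		metric.WithAttributes(
			attribute.String("exporter", pipeline.SignalMetrics.String()),
			attribute.String("table", table),
		),
	)
	return err
}
```

Each call site would then collapse to something like err = timedSend(ctx, ch.durationHistogram, DISTRIBUTED_SAMPLES_TABLE, statement.Send).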
1 change: 1 addition & 0 deletions exporter/clickhousemetricsexporter/exporter.go
@@ -95,6 +95,7 @@ func NewPrwExporter(cfg *Config, set exporter.Settings) (*PrwExporter, error) {
WriteTSToV4: cfg.WriteTSToV4,
DisableV2: cfg.DisableV2,
ExporterId: id,
Settings: set,
}
ch, err := NewClickHouse(params)
if err != nil {
20 changes: 0 additions & 20 deletions exporter/clickhousemetricsexporter/factory.go
@@ -20,9 +20,6 @@ import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
@@ -36,26 +33,9 @@ const (
typeStr = "clickhousemetricswrite"
)

var (
writeLatencyMillis = stats.Int64("exporter_db_write_latency", "Time taken (in millis) for exporter to write batch", "ms")
exporterKey = tag.MustNewKey("exporter")
tableKey = tag.MustNewKey("table")
)

// NewFactory creates a new ClickHouse metrics exporter (based on the Prometheus Remote Write exporter).
func NewFactory() exporter.Factory {

writeLatencyDistribution := view.Distribution(100, 250, 500, 750, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 512000)

writeLatencyView := &view.View{
Name: "exporter_db_write_latency",
Measure: writeLatencyMillis,
Description: writeLatencyMillis.Description(),
TagKeys: []tag.Key{exporterKey, tableKey},
Aggregation: writeLatencyDistribution,
}

view.Register(writeLatencyView)
return exporter.NewFactory(
component.MustNewType(typeStr),
createDefaultConfig,
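One practical consequence of recording through the MeterProvider in exporter.Settings is testability: a manual reader can capture the histogram without any OpenCensus global state. A hedged sketch, assuming the go.opentelemetry.io/otel/sdk/metric packages (this test is not part of the commit):

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	ctx := context.Background()

	// An in-memory reader stands in for the collector's real telemetry pipeline.
	reader := sdkmetric.NewManualReader()
	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	meter := mp.Meter("test")

	hist, err := meter.Float64Histogram("exporter_db_write_latency", metric.WithUnit("ms"))
	if err != nil {
		panic(err)
	}
	hist.Record(ctx, 42)

	// Collect and inspect what the exporter would have emitted.
	var rm metricdata.ResourceMetrics
	if err := reader.Collect(ctx, &rm); err != nil {
		panic(err)
	}
	fmt.Println(rm.ScopeMetrics[0].Metrics[0].Name) // exporter_db_write_latency
}
```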
(The remaining 14 changed files are not shown here.)