Skip to content

Commit

Permalink
Revert "refactor tracing" (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
vm-001 authored Dec 23, 2024
1 parent 510d952 commit 3d39528
Show file tree
Hide file tree
Showing 19 changed files with 99 additions and 97 deletions.
9 changes: 5 additions & 4 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ metrics:
attributes: # global attributes for each metric
env: prod
#exports: [ opentelemetry ] # list of enabled vendor exports. supported value are opentelemetry
push_interval: 10 # interval(in seconds) at which metrics are sent to the OpenTelemetry Collector
opentelemetry:
push_interval: 10 # interval(in seconds) at which metrics are sent to the OpenTelemetry Collector
protocol: http/protobuf # supported value are http/protobuf, grpc
endpoint: http://localhost:4318/v1/metrics # http/protobuf(http://localhost:4318/v1/metrics), grpc(localhost:4317)

Expand All @@ -83,9 +83,10 @@ metrics:
#------------------------------------------------------------------------------
tracing:
enabled: false
attributes: # global attributes for each trace
attributes: # global attributes for each trace
env: prod
sampling_rate: 1.0
opentelemetry:
protocol: http/protobuf # supported value are http/protobuf, grpc
endpoint: http://localhost:4318/v1/traces # http/protobuf(http://localhost:4318/v1/traces), grpc(localhost:4317)
protocol: http/protobuf # supported value are http/protobuf, grpc
endpoint: http://localhost:4318/v1/traces # http/protobuf(http://localhost:4318/v1/traces), grpc(localhost:4317)

3 changes: 1 addition & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (cfg Config) Validate() error {
if err := cfg.Metrics.Validate(); err != nil {
return err
}

if err := cfg.Tracing.Validate(); err != nil {
return err
}
Expand All @@ -74,7 +75,6 @@ func Init() (*Config, error) {
if err != nil {
return nil, err
}

return &cfg, nil
}

Expand All @@ -98,6 +98,5 @@ func InitWithFile(filename string) (*Config, error) {
if err != nil {
return nil, err
}

return &cfg, nil
}
40 changes: 20 additions & 20 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,47 +172,47 @@ func TestMetricsConfig(t *testing.T) {
{
desc: "sanity",
cfg: MetricsConfig{
Attributes: nil,
Exports: nil,
PushInterval: 1,
Opentelemetry: Opentelemetry{
Protocol: "http/protobuf",
Attributes: nil,
Exports: nil,
OpenTelemetry: Opentelemetry{
PushInterval: 1,
Protocol: "http/protobuf",
},
},
expectedValidateErr: nil,
},
{
desc: "invalid export",
cfg: MetricsConfig{
Attributes: nil,
Exports: []Export{"unknown"},
PushInterval: 1,
Opentelemetry: Opentelemetry{
Protocol: "http/protobuf",
Attributes: nil,
Exports: []Export{"unknown"},
OpenTelemetry: Opentelemetry{
PushInterval: 1,
Protocol: "http/protobuf",
},
},
expectedValidateErr: errors.New("invalid export: unknown"),
},
{
desc: "invalid export",
cfg: MetricsConfig{
Attributes: nil,
Exports: nil,
PushInterval: 1,
Opentelemetry: Opentelemetry{
Protocol: "unknown",
Attributes: nil,
Exports: nil,
OpenTelemetry: Opentelemetry{
PushInterval: 1,
Protocol: "unknown",
},
},
expectedValidateErr: errors.New("invalid protocol: unknown"),
},
{
desc: "invalid PushInterval",
cfg: MetricsConfig{
Attributes: nil,
Exports: nil,
PushInterval: 61,
Opentelemetry: Opentelemetry{
Protocol: "http/protobuf",
Attributes: nil,
Exports: nil,
OpenTelemetry: Opentelemetry{
PushInterval: 61,
Protocol: "http/protobuf",
},
},
expectedValidateErr: errors.New("interval must be in the range [1, 60]"),
Expand Down
31 changes: 25 additions & 6 deletions config/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,18 @@ import (
type MetricsConfig struct {
Attributes Map `yaml:"attributes" envconfig:"ATTRIBUTES"`
Exports []Export `yaml:"exports" envconfig:"EXPORTS"`
PushInterval uint32 `yaml:"push_interval" default:"10"`
Opentelemetry Opentelemetry `yaml:"opentelemetry"`
OpenTelemetry Opentelemetry `yaml:"opentelemetry" envconfig:"OPENTELEMETRY"`
}

func (cfg *MetricsConfig) Validate() error {
if err := cfg.Opentelemetry.Validate(); err != nil {
if err := cfg.OpenTelemetry.Validate(); err != nil {
return err
}
for _, export := range cfg.Exports {
if !slices.Contains([]Export{ExportOpenTelemetry}, export) {
return fmt.Errorf("invalid export: %s", export)
}
}
if cfg.PushInterval < 1 || cfg.PushInterval > 60 {
return fmt.Errorf("interval must be in the range [1, 60]")
}
return nil
}

Expand All @@ -32,3 +28,26 @@ type Export string
const (
ExportOpenTelemetry Export = "opentelemetry"
)

type OtlpProtocol string

const (
OtlpProtocolGRPC OtlpProtocol = "grpc"
OtlpProtocolHTTP OtlpProtocol = "http/protobuf"
)

type Opentelemetry struct {
PushInterval uint32 `yaml:"push_interval" default:"10"`
Protocol OtlpProtocol `yaml:"protocol" envconfig:"PROTOCOL" default:"http/protobuf"`
Endpoint string `yaml:"endpoint" envconfig:"ENDPOINT" default:"http://localhost:4318/v1/metrics"`
}

func (cfg Opentelemetry) Validate() error {
if cfg.PushInterval < 1 || cfg.PushInterval > 60 {
return fmt.Errorf("interval must be in the range [1, 60]")
}
if !slices.Contains([]OtlpProtocol{OtlpProtocolGRPC, OtlpProtocolHTTP}, cfg.Protocol) {
return fmt.Errorf("invalid protocol: %s", cfg.Protocol)
}
return nil
}
25 changes: 0 additions & 25 deletions config/opentelemetry.go

This file was deleted.

3 changes: 0 additions & 3 deletions config/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,5 @@ func (cfg TracingConfig) Validate() error {
if cfg.SamplingRate > 1 || cfg.SamplingRate < 0 {
return errors.New("sampling_rate must be in the range [0, 1]")
}
if err := cfg.Opentelemetry.Validate(); err != nil {
return err
}
return nil
}
5 changes: 3 additions & 2 deletions db/dao/attempt_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package dao
import (
"context"
"fmt"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
sq "github.com/Masterminds/squirrel"
"github.com/webhookx-io/webhookx/constants"
"github.com/webhookx-io/webhookx/db/entities"
"github.com/webhookx-io/webhookx/pkg/tracing"
Expand Down Expand Up @@ -64,8 +64,9 @@ func (dao *attemptDao) UpdateStatusBatch(ctx context.Context, status entities.At
}

func (dao *attemptDao) UpdateErrorCode(ctx context.Context, id string, status entities.AttemptStatus, code entities.AttemptErrorCode) error {
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.updateErrorCode", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.updateErrorCode", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
ctx = tracingCtx

_, err := dao.update(ctx, id, map[string]interface{}{
"status": status,
Expand Down
4 changes: 2 additions & 2 deletions db/dao/attempt_detail_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ func NewAttemptDetailDao(db *sqlx.DB, workspace bool) AttemptDetailDAO {
}

func (dao *attemptDetailDao) Upsert(ctx context.Context, attemptDetail *entities.AttemptDetail) error {
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.upsert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.upsert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

ctx = tracingCtx
now := time.Now()
values := []interface{}{attemptDetail.ID, attemptDetail.RequestHeaders, attemptDetail.RequestBody, attemptDetail.ResponseHeaders, attemptDetail.ResponseBody, now, now, attemptDetail.WorkspaceId}

Expand Down
34 changes: 17 additions & 17 deletions db/dao/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"encoding/json"
"errors"
"fmt"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
sq "github.com/Masterminds/squirrel"
"github.com/webhookx-io/webhookx/config"
"github.com/webhookx-io/webhookx/constants"
"github.com/webhookx-io/webhookx/db/errs"
Expand Down Expand Up @@ -93,9 +93,9 @@ func (dao *DAO[T]) UnsafeDB(ctx context.Context) Queryable {
}

func (dao *DAO[T]) Get(ctx context.Context, id string) (entity *T, err error) {
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.get", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.get", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

ctx = tracingCtx
builder := psql.Select("*").From(dao.opts.Table).Where(sq.Eq{"id": id})
if dao.workspace {
wid := ucontext.GetWorkspaceID(ctx)
Expand Down Expand Up @@ -128,9 +128,9 @@ func (dao *DAO[T]) selectByField(ctx context.Context, field string, value string
}

func (dao *DAO[T]) Delete(ctx context.Context, id string) (bool, error) {
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.delete", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.delete", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

ctx = tracingCtx
builder := psql.Delete(dao.opts.Table).Where(sq.Eq{"id": id})
if dao.workspace {
wid := ucontext.GetWorkspaceID(ctx)
Expand All @@ -153,9 +153,9 @@ func (dao *DAO[T]) Delete(ctx context.Context, id string) (bool, error) {
}

func (dao *DAO[T]) Page(ctx context.Context, q query.Queryer) (list []*T, total int64, err error) {
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.page", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.page", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

ctx = tracingCtx
total, err = dao.Count(ctx, q.WhereMap())
if err != nil {
return
Expand All @@ -165,9 +165,9 @@ func (dao *DAO[T]) Page(ctx context.Context, q query.Queryer) (list []*T, total
}

func (dao *DAO[T]) Count(ctx context.Context, where map[string]interface{}) (total int64, err error) {
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.count", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.count", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

ctx = tracingCtx
builder := psql.Select("COUNT(*)").From(dao.opts.Table)
if len(where) > 0 {
builder = builder.Where(where)
Expand All @@ -183,9 +183,9 @@ func (dao *DAO[T]) Count(ctx context.Context, where map[string]interface{}) (tot
}

func (dao *DAO[T]) List(ctx context.Context, q query.Queryer) (list []*T, err error) {
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.list", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.list", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

ctx = tracingCtx
builder := psql.Select("*").From(dao.opts.Table)
where := q.WhereMap()
if len(where) > 0 {
Expand Down Expand Up @@ -230,9 +230,9 @@ func travel(entity interface{}, fn func(field reflect.StructField, value reflect
}

func (dao *DAO[T]) Insert(ctx context.Context, entity *T) error {
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

ctx = tracingCtx
columns := make([]string, 0)
values := make([]interface{}, 0)
travel(entity, func(f reflect.StructField, v reflect.Value) {
Expand All @@ -257,9 +257,9 @@ func (dao *DAO[T]) Insert(ctx context.Context, entity *T) error {
}

func (dao *DAO[T]) BatchInsert(ctx context.Context, entities []*T) error {
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.batch_insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.batch_insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

ctx = tracingCtx
if len(entities) == 0 {
return nil
}
Expand Down Expand Up @@ -328,9 +328,9 @@ func (dao *DAO[T]) update(ctx context.Context, id string, maps map[string]interf
}

func (dao *DAO[T]) Update(ctx context.Context, entity *T) error {
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.update", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.update", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

ctx = tracingCtx
var id string
builder := psql.Update(dao.opts.Table)
travel(entity, func(f reflect.StructField, v reflect.Value) {
Expand Down
3 changes: 2 additions & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ func (db *DB) Ping() error {
}

func (db *DB) TX(ctx context.Context, fn func(ctx context.Context) error) error {
ctx, span := tracing.Start(ctx, "db.transaction", trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, span := tracing.Start(ctx, "db.transaction", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
ctx = tracingCtx

tx, err := db.DB.Beginx()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func (d *Dispatcher) DispatchBatch(ctx context.Context, events []*entities.Event
}

func (d *Dispatcher) dispatchBatch(ctx context.Context, events []*entities.Event) (int, error) {
ctx, span := tracing.Start(ctx, "dispatcher.dispatch", trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, span := tracing.Start(ctx, "dispatcher.dispatch", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
ctx = tracingCtx

if len(events) == 0 {
return 0, nil
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ services:
WEBHOOKX_PROXY_LISTEN: 0.0.0.0:8081
WEBHOOKX_PROXY_QUEUE_REDIS_HOST: redis
WEBHOOKX_PROXY_QUEUE_REDIS_PORT: 6379

ports:
- "8080:8080"
- "8081:8081"
Expand Down
4 changes: 2 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func New(cfg config.MetricsConfig) (*Metrics, error) {
}

if len(cfg.Exports) > 0 {
m.Interval = time.Second * time.Duration(cfg.PushInterval)
err := SetupOpentelemetry(m.ctx, cfg.Attributes, cfg.Opentelemetry, m)
m.Interval = time.Second * time.Duration(cfg.OpenTelemetry.PushInterval)
err := SetupOpentelemetry(m.ctx, cfg.Attributes, cfg.OpenTelemetry, m)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/metrics/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/semconv/v1.26.0"
"time"
)

const (
Expand Down Expand Up @@ -62,7 +63,7 @@ func SetupOpentelemetry(ctx context.Context, attributes map[string]string, cfg c
}

opts := []metric.PeriodicReaderOption{
metric.WithInterval(metrics.Interval),
metric.WithInterval(time.Second * time.Duration(cfg.PushInterval)),
}

meterProvider := metric.NewMeterProvider(
Expand Down
Loading

0 comments on commit 3d39528

Please sign in to comment.