From 3d39528b8f50c916125e65ac220b7d1859111611 Mon Sep 17 00:00:00 2001 From: Yusheng Li Date: Mon, 23 Dec 2024 23:46:00 +0800 Subject: [PATCH] Revert "refactor tracing" (#73) --- config.yml | 9 ++++---- config/config.go | 3 +-- config/config_test.go | 40 ++++++++++++++++++------------------ config/metrics.go | 31 ++++++++++++++++++++++------ config/opentelemetry.go | 25 ---------------------- config/tracing.go | 3 --- db/dao/attempt_dao.go | 5 +++-- db/dao/attempt_detail_dao.go | 4 ++-- db/dao/dao.go | 34 +++++++++++++++--------------- db/db.go | 3 ++- dispatcher/dispatcher.go | 3 ++- docker-compose.yml | 1 + pkg/metrics/metrics.go | 4 ++-- pkg/metrics/opentelemetry.go | 3 ++- pkg/queue/redis/redis.go | 9 +++++--- pkg/taskqueue/redis.go | 9 +++++--- test/tracing/worker_test.go | 4 ++-- worker/deliverer/http.go | 4 ++-- worker/worker.go | 2 +- 19 files changed, 99 insertions(+), 97 deletions(-) delete mode 100644 config/opentelemetry.go diff --git a/config.yml b/config.yml index 51bf53b..17ac221 100644 --- a/config.yml +++ b/config.yml @@ -73,8 +73,8 @@ metrics: attributes: # global attributes for each metric env: prod #exports: [ opentelemetry ] # list of enabled vendor exports. supported value are opentelemetry - push_interval: 10 # interval(in seconds) at which metrics are sent to the OpenTelemetry Collector opentelemetry: + push_interval: 10 # interval(in seconds) at which metrics are sent to the OpenTelemetry Collector protocol: http/protobuf # supported value are http/protobuf, grpc endpoint: http://localhost:4318/v1/metrics # http/protobuf(http://localhost:4318/v1/metrics), grpc(localhost:4317) @@ -83,9 +83,10 @@ metrics: #------------------------------------------------------------------------------ tracing: enabled: false - attributes: # global attributes for each trace + attributes: # global attributes for each trace env: prod sampling_rate: 1.0 opentelemetry: - protocol: http/protobuf # supported value are http/protobuf, grpc - endpoint: http://localhost:4318/v1/traces # http/protobuf(http://localhost:4318/v1/traces), grpc(localhost:4317) + protocol: http/protobuf # supported value are http/protobuf, grpc + endpoint: http://localhost:4318/v1/traces # http/protobuf(http://localhost:4318/v1/traces), grpc(localhost:4317) + \ No newline at end of file diff --git a/config/config.go b/config/config.go index 2a57269..72c1094 100644 --- a/config/config.go +++ b/config/config.go @@ -58,6 +58,7 @@ func (cfg Config) Validate() error { if err := cfg.Metrics.Validate(); err != nil { return err } + if err := cfg.Tracing.Validate(); err != nil { return err } @@ -74,7 +75,6 @@ func Init() (*Config, error) { if err != nil { return nil, err } - return &cfg, nil } @@ -98,6 +98,5 @@ func InitWithFile(filename string) (*Config, error) { if err != nil { return nil, err } - return &cfg, nil } diff --git a/config/config_test.go b/config/config_test.go index eee9dda..044c3e1 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -172,11 +172,11 @@ func TestMetricsConfig(t *testing.T) { { desc: "sanity", cfg: MetricsConfig{ - Attributes: nil, - Exports: nil, - PushInterval: 1, - Opentelemetry: Opentelemetry{ - Protocol: "http/protobuf", + Attributes: nil, + Exports: nil, + OpenTelemetry: Opentelemetry{ + PushInterval: 1, + Protocol: "http/protobuf", }, }, expectedValidateErr: nil, @@ -184,11 +184,11 @@ func TestMetricsConfig(t *testing.T) { { desc: "invalid export", cfg: MetricsConfig{ - Attributes: nil, - Exports: []Export{"unknown"}, - PushInterval: 1, - Opentelemetry: Opentelemetry{ - Protocol: "http/protobuf", + Attributes: nil, + Exports: []Export{"unknown"}, + OpenTelemetry: Opentelemetry{ + PushInterval: 1, + Protocol: "http/protobuf", }, }, expectedValidateErr: errors.New("invalid export: unknown"), @@ -196,11 +196,11 @@ func TestMetricsConfig(t *testing.T) { { desc: "invalid export", cfg: MetricsConfig{ - Attributes: nil, - Exports: nil, - PushInterval: 1, - Opentelemetry: Opentelemetry{ - Protocol: "unknown", + Attributes: nil, + Exports: nil, + OpenTelemetry: Opentelemetry{ + PushInterval: 1, + Protocol: "unknown", }, }, expectedValidateErr: errors.New("invalid protocol: unknown"), @@ -208,11 +208,11 @@ func TestMetricsConfig(t *testing.T) { { desc: "invalid PushInterval", cfg: MetricsConfig{ - Attributes: nil, - Exports: nil, - PushInterval: 61, - Opentelemetry: Opentelemetry{ - Protocol: "http/protobuf", + Attributes: nil, + Exports: nil, + OpenTelemetry: Opentelemetry{ + PushInterval: 61, + Protocol: "http/protobuf", }, }, expectedValidateErr: errors.New("interval must be in the range [1, 60]"), diff --git a/config/metrics.go b/config/metrics.go index f35f602..67bd7e2 100644 --- a/config/metrics.go +++ b/config/metrics.go @@ -8,12 +8,11 @@ import ( type MetricsConfig struct { Attributes Map `yaml:"attributes" envconfig:"ATTRIBUTES"` Exports []Export `yaml:"exports" envconfig:"EXPORTS"` - PushInterval uint32 `yaml:"push_interval" default:"10"` - Opentelemetry Opentelemetry `yaml:"opentelemetry"` + OpenTelemetry Opentelemetry `yaml:"opentelemetry" envconfig:"OPENTELEMETRY"` } func (cfg *MetricsConfig) Validate() error { - if err := cfg.Opentelemetry.Validate(); err != nil { + if err := cfg.OpenTelemetry.Validate(); err != nil { return err } for _, export := range cfg.Exports { @@ -21,9 +20,6 @@ func (cfg *MetricsConfig) Validate() error { return fmt.Errorf("invalid export: %s", export) } } - if cfg.PushInterval < 1 || cfg.PushInterval > 60 { - return fmt.Errorf("interval must be in the range [1, 60]") - } return nil } @@ -32,3 +28,26 @@ type Export string const ( ExportOpenTelemetry Export = "opentelemetry" ) + +type OtlpProtocol string + +const ( + OtlpProtocolGRPC OtlpProtocol = "grpc" + OtlpProtocolHTTP OtlpProtocol = "http/protobuf" +) + +type Opentelemetry struct { + PushInterval uint32 `yaml:"push_interval" default:"10"` + Protocol OtlpProtocol `yaml:"protocol" envconfig:"PROTOCOL" default:"http/protobuf"` + Endpoint string `yaml:"endpoint" envconfig:"ENDPOINT" default:"http://localhost:4318/v1/metrics"` +} + +func (cfg Opentelemetry) Validate() error { + if cfg.PushInterval < 1 || cfg.PushInterval > 60 { + return fmt.Errorf("interval must be in the range [1, 60]") + } + if !slices.Contains([]OtlpProtocol{OtlpProtocolGRPC, OtlpProtocolHTTP}, cfg.Protocol) { + return fmt.Errorf("invalid protocol: %s", cfg.Protocol) + } + return nil +} diff --git a/config/opentelemetry.go b/config/opentelemetry.go deleted file mode 100644 index 692774b..0000000 --- a/config/opentelemetry.go +++ /dev/null @@ -1,25 +0,0 @@ -package config - -import ( - "fmt" - "slices" -) - -type OtlpProtocol string - -const ( - OtlpProtocolGRPC OtlpProtocol = "grpc" - OtlpProtocolHTTP OtlpProtocol = "http/protobuf" -) - -type Opentelemetry struct { - Protocol OtlpProtocol `yaml:"protocol" envconfig:"PROTOCOL" default:"http/protobuf"` - Endpoint string `yaml:"endpoint" envconfig:"ENDPOINT" default:"http://localhost:4318/v1/metrics"` -} - -func (cfg Opentelemetry) Validate() error { - if !slices.Contains([]OtlpProtocol{OtlpProtocolGRPC, OtlpProtocolHTTP}, cfg.Protocol) { - return fmt.Errorf("invalid protocol: %s", cfg.Protocol) - } - return nil -} diff --git a/config/tracing.go b/config/tracing.go index 1186f6e..a5b1014 100644 --- a/config/tracing.go +++ b/config/tracing.go @@ -15,8 +15,5 @@ func (cfg TracingConfig) Validate() error { if cfg.SamplingRate > 1 || cfg.SamplingRate < 0 { return errors.New("sampling_rate must be in the range [0, 1]") } - if err := cfg.Opentelemetry.Validate(); err != nil { - return err - } return nil } diff --git a/db/dao/attempt_dao.go b/db/dao/attempt_dao.go index f2f48f1..7c350db 100644 --- a/db/dao/attempt_dao.go +++ b/db/dao/attempt_dao.go @@ -3,8 +3,8 @@ package dao import ( "context" "fmt" - sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" + sq "github.com/Masterminds/squirrel" "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/pkg/tracing" @@ -64,8 +64,9 @@ func (dao *attemptDao) UpdateStatusBatch(ctx context.Context, status entities.At } func (dao *attemptDao) UpdateErrorCode(ctx context.Context, id string, status entities.AttemptStatus, code entities.AttemptErrorCode) error { - ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.updateErrorCode", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.updateErrorCode", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) defer span.End() + ctx = tracingCtx _, err := dao.update(ctx, id, map[string]interface{}{ "status": status, diff --git a/db/dao/attempt_detail_dao.go b/db/dao/attempt_detail_dao.go index cac460d..91b2b41 100644 --- a/db/dao/attempt_detail_dao.go +++ b/db/dao/attempt_detail_dao.go @@ -29,9 +29,9 @@ func NewAttemptDetailDao(db *sqlx.DB, workspace bool) AttemptDetailDAO { } func (dao *attemptDetailDao) Upsert(ctx context.Context, attemptDetail *entities.AttemptDetail) error { - ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.upsert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.upsert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - + ctx = tracingCtx now := time.Now() values := []interface{}{attemptDetail.ID, attemptDetail.RequestHeaders, attemptDetail.RequestBody, attemptDetail.ResponseHeaders, attemptDetail.ResponseBody, now, now, attemptDetail.WorkspaceId} diff --git a/db/dao/dao.go b/db/dao/dao.go index 6fac04c..3cf98b4 100644 --- a/db/dao/dao.go +++ b/db/dao/dao.go @@ -6,8 +6,8 @@ import ( "encoding/json" "errors" "fmt" - sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" + sq "github.com/Masterminds/squirrel" "github.com/webhookx-io/webhookx/config" "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/db/errs" @@ -93,9 +93,9 @@ func (dao *DAO[T]) UnsafeDB(ctx context.Context) Queryable { } func (dao *DAO[T]) Get(ctx context.Context, id string) (entity *T, err error) { - ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.get", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.get", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - + ctx = tracingCtx builder := psql.Select("*").From(dao.opts.Table).Where(sq.Eq{"id": id}) if dao.workspace { wid := ucontext.GetWorkspaceID(ctx) @@ -128,9 +128,9 @@ func (dao *DAO[T]) selectByField(ctx context.Context, field string, value string } func (dao *DAO[T]) Delete(ctx context.Context, id string) (bool, error) { - ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.delete", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.delete", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - + ctx = tracingCtx builder := psql.Delete(dao.opts.Table).Where(sq.Eq{"id": id}) if dao.workspace { wid := ucontext.GetWorkspaceID(ctx) @@ -153,9 +153,9 @@ func (dao *DAO[T]) Delete(ctx context.Context, id string) (bool, error) { } func (dao *DAO[T]) Page(ctx context.Context, q query.Queryer) (list []*T, total int64, err error) { - ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.page", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.page", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - + ctx = tracingCtx total, err = dao.Count(ctx, q.WhereMap()) if err != nil { return @@ -165,9 +165,9 @@ func (dao *DAO[T]) Page(ctx context.Context, q query.Queryer) (list []*T, total } func (dao *DAO[T]) Count(ctx context.Context, where map[string]interface{}) (total int64, err error) { - ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.count", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.count", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - + ctx = tracingCtx builder := psql.Select("COUNT(*)").From(dao.opts.Table) if len(where) > 0 { builder = builder.Where(where) @@ -183,9 +183,9 @@ func (dao *DAO[T]) Count(ctx context.Context, where map[string]interface{}) (tot } func (dao *DAO[T]) List(ctx context.Context, q query.Queryer) (list []*T, err error) { - ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.list", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.list", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - + ctx = tracingCtx builder := psql.Select("*").From(dao.opts.Table) where := q.WhereMap() if len(where) > 0 { @@ -230,9 +230,9 @@ func travel(entity interface{}, fn func(field reflect.StructField, value reflect } func (dao *DAO[T]) Insert(ctx context.Context, entity *T) error { - ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - + ctx = tracingCtx columns := make([]string, 0) values := make([]interface{}, 0) travel(entity, func(f reflect.StructField, v reflect.Value) { @@ -257,9 +257,9 @@ func (dao *DAO[T]) Insert(ctx context.Context, entity *T) error { } func (dao *DAO[T]) BatchInsert(ctx context.Context, entities []*T) error { - ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.batch_insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.batch_insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - + ctx = tracingCtx if len(entities) == 0 { return nil } @@ -328,9 +328,9 @@ func (dao *DAO[T]) update(ctx context.Context, id string, maps map[string]interf } func (dao *DAO[T]) Update(ctx context.Context, entity *T) error { - ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.update", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.update", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - + ctx = tracingCtx var id string builder := psql.Update(dao.opts.Table) travel(entity, func(f reflect.StructField, v reflect.Value) { diff --git a/db/db.go b/db/db.go index e629ccf..ed61bea 100644 --- a/db/db.go +++ b/db/db.go @@ -65,8 +65,9 @@ func (db *DB) Ping() error { } func (db *DB) TX(ctx context.Context, fn func(ctx context.Context) error) error { - ctx, span := tracing.Start(ctx, "db.transaction", trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, "db.transaction", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() + ctx = tracingCtx tx, err := db.DB.Beginx() if err != nil { diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index afbf68c..971a6d0 100644 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -47,8 +47,9 @@ func (d *Dispatcher) DispatchBatch(ctx context.Context, events []*entities.Event } func (d *Dispatcher) dispatchBatch(ctx context.Context, events []*entities.Event) (int, error) { - ctx, span := tracing.Start(ctx, "dispatcher.dispatch", trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, "dispatcher.dispatch", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() + ctx = tracingCtx if len(events) == 0 { return 0, nil diff --git a/docker-compose.yml b/docker-compose.yml index e6925d6..72d8ace 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,6 +27,7 @@ services: WEBHOOKX_PROXY_LISTEN: 0.0.0.0:8081 WEBHOOKX_PROXY_QUEUE_REDIS_HOST: redis WEBHOOKX_PROXY_QUEUE_REDIS_PORT: 6379 + ports: - "8080:8080" - "8081:8081" diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index dc3d4af..2cea523 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -60,8 +60,8 @@ func New(cfg config.MetricsConfig) (*Metrics, error) { } if len(cfg.Exports) > 0 { - m.Interval = time.Second * time.Duration(cfg.PushInterval) - err := SetupOpentelemetry(m.ctx, cfg.Attributes, cfg.Opentelemetry, m) + m.Interval = time.Second * time.Duration(cfg.OpenTelemetry.PushInterval) + err := SetupOpentelemetry(m.ctx, cfg.Attributes, cfg.OpenTelemetry, m) if err != nil { return nil, err } diff --git a/pkg/metrics/opentelemetry.go b/pkg/metrics/opentelemetry.go index ab26e88..c6c234c 100644 --- a/pkg/metrics/opentelemetry.go +++ b/pkg/metrics/opentelemetry.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/semconv/v1.26.0" + "time" ) const ( @@ -62,7 +63,7 @@ func SetupOpentelemetry(ctx context.Context, attributes map[string]string, cfg c } opts := []metric.PeriodicReaderOption{ - metric.WithInterval(metrics.Interval), + metric.WithInterval(time.Second * time.Duration(cfg.PushInterval)), } meterProvider := metric.NewMeterProvider( diff --git a/pkg/queue/redis/redis.go b/pkg/queue/redis/redis.go index 795393c..63e33dc 100644 --- a/pkg/queue/redis/redis.go +++ b/pkg/queue/redis/redis.go @@ -56,8 +56,9 @@ func NewRedisQueue(opts RedisQueueOptions, logger *zap.SugaredLogger, metrics *m } func (q *RedisQueue) Enqueue(ctx context.Context, message *queue.Message) error { - ctx, span := tracing.Start(ctx, "redis.queue.enqueue", trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, "redis.queue.enqueue", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() + ctx = tracingCtx args := &redis.XAddArgs{ Stream: q.stream, @@ -92,8 +93,9 @@ func toMessage(values map[string]interface{}) *queue.Message { } func (q *RedisQueue) Dequeue(ctx context.Context, opt *queue.Options) ([]*queue.Message, error) { - ctx, span := tracing.Start(ctx, "redis.queue.dequeue", trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, "redis.queue.dequeue", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() + ctx = tracingCtx var count int64 = 1 if opt != nil && opt.Count != 0 { @@ -136,8 +138,9 @@ func (q *RedisQueue) Dequeue(ctx context.Context, opt *queue.Options) ([]*queue. } func (q *RedisQueue) Delete(ctx context.Context, messages []*queue.Message) error { - ctx, span := tracing.Start(ctx, "redis.queue.delete", trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, "redis.queue.delete", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() + ctx = tracingCtx ids := make([]string, 0, len(messages)) for _, message := range messages { diff --git a/pkg/taskqueue/redis.go b/pkg/taskqueue/redis.go index afa78e2..465611e 100644 --- a/pkg/taskqueue/redis.go +++ b/pkg/taskqueue/redis.go @@ -93,8 +93,9 @@ func NewRedisQueue(opts RedisTaskQueueOptions, logger *zap.SugaredLogger, metric } func (q *RedisTaskQueue) Add(ctx context.Context, tasks []*TaskMessage) error { - ctx, span := tracing.Start(ctx, "taskqueue.redis.add", trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, "taskqueue.redis.add", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() + ctx = tracingCtx members := make([]redis.Z, 0, len(tasks)) strs := make([]interface{}, 0, len(tasks)*2) @@ -118,8 +119,9 @@ func (q *RedisTaskQueue) Add(ctx context.Context, tasks []*TaskMessage) error { } func (q *RedisTaskQueue) Get(ctx context.Context, opts *GetOptions) ([]*TaskMessage, error) { - ctx, span := tracing.Start(ctx, "taskqueue.redis.get", trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, "taskqueue.redis.get", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() + ctx = tracingCtx keys := []string{q.queue, q.queueData, q.invisibleQueue} argv := []interface{}{ @@ -150,8 +152,9 @@ func (q *RedisTaskQueue) Get(ctx context.Context, opts *GetOptions) ([]*TaskMess } func (q *RedisTaskQueue) Delete(ctx context.Context, task *TaskMessage) error { - ctx, span := tracing.Start(ctx, "taskqueue.redis.delete", trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := tracing.Start(ctx, "taskqueue.redis.delete", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() + ctx = tracingCtx q.log.Debugf("[redis-queue]: delete task %s", task.ID) pipeline := q.c.Pipeline() diff --git a/test/tracing/worker_test.go b/test/tracing/worker_test.go index 9a4ca4b..2f31522 100644 --- a/test/tracing/worker_test.go +++ b/test/tracing/worker_test.go @@ -54,7 +54,7 @@ var _ = Describe("tracing worker", Ordered, func() { } expectedScopeSpans := map[string]map[string]string{ "worker.submit": {}, - "worker.handle_task": {}, + "worker.handle": {}, "dao.endpoints.get": {}, "dao.plugins.list": {}, "dao.events.get": {}, @@ -107,7 +107,7 @@ var _ = Describe("tracing worker", Ordered, func() { } // make sure worker handle full trace - traceID := trace.getTraceIDBySpanName("worker.handle_task") + traceID := trace.getTraceIDBySpanName("worker.handle") if traceID == "" { fmt.Printf("trace id not exist") return false diff --git a/worker/deliverer/http.go b/worker/deliverer/http.go index f574184..8edd023 100644 --- a/worker/deliverer/http.go +++ b/worker/deliverer/http.go @@ -34,9 +34,9 @@ func timing(fn func()) time.Duration { } func (d *HTTPDeliverer) Deliver(ctx context.Context, req *Request) (res *Response) { - ctx, span := tracing.Start(ctx, "worker.deliver", trace.WithSpanKind(trace.SpanKindClient)) + tracingCtx, span := tracing.Start(ctx, "worker.deliver", trace.WithSpanKind(trace.SpanKindClient)) defer span.End() - + ctx = tracingCtx timeout := req.Timeout if timeout == 0 { timeout = d.defaultTimeout diff --git a/worker/worker.go b/worker/worker.go index 2afd51e..157775a 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -212,7 +212,7 @@ func (w *Worker) processRequeue() { func (w *Worker) handleTask(ctx context.Context, task *taskqueue.TaskMessage) error { if w.tracer != nil { - tracingCtx, span := w.tracer.Start(ctx, "worker.handle_task", trace.WithSpanKind(trace.SpanKindServer)) + tracingCtx, span := w.tracer.Start(ctx, "worker.handle", trace.WithSpanKind(trace.SpanKindServer)) defer span.End() ctx = tracingCtx }