Skip to content

Commit

Permalink
Fix race condition in otelcol processors
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed Nov 4, 2024
1 parent 0bd2ebb commit f07ea86
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 14 deletions.
32 changes: 32 additions & 0 deletions internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
type Consumer struct {
ctx context.Context

// pauseMut is used to implement Pause & Resume semantics. When a write lock is held - this consumer is paused.
// See Pause method for more info.
pauseMut sync.RWMutex

mut sync.RWMutex
metricsConsumer otelconsumer.Metrics
logsConsumer otelconsumer.Logs
Expand All @@ -36,6 +40,13 @@ func New(ctx context.Context) *Consumer {
return &Consumer{ctx: ctx}
}

// NewPaused is like New, but returns a Consumer that is paused by calling Pause method.
func NewPaused(ctx context.Context) *Consumer {
c := New(ctx)
c.Pause()
return c
}

// Capabilities implements otelconsumer.baseConsumer.
func (c *Consumer) Capabilities() otelconsumer.Capabilities {
return otelconsumer.Capabilities{
Expand All @@ -52,6 +63,9 @@ func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
return c.ctx.Err()
}

c.pauseMut.RLock() // wait until resumed
defer c.pauseMut.RUnlock()

c.mut.RLock()
defer c.mut.RUnlock()

Expand All @@ -73,6 +87,9 @@ func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error
return c.ctx.Err()
}

c.pauseMut.RLock() // wait until resumed
defer c.pauseMut.RUnlock()

c.mut.RLock()
defer c.mut.RUnlock()

Expand All @@ -94,6 +111,9 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return c.ctx.Err()
}

c.pauseMut.RLock() // wait until resumed
defer c.pauseMut.RUnlock()

c.mut.RLock()
defer c.mut.RUnlock()

Expand All @@ -119,3 +139,15 @@ func (c *Consumer) SetConsumers(t otelconsumer.Traces, m otelconsumer.Metrics, l
c.logsConsumer = l
c.tracesConsumer = t
}

// Pause will stop the consumer until Resume is called. While paused, the calls to Consume* methods will block.
// After calling Pause once, it must not be called again until Resume is called.
func (c *Consumer) Pause() {
c.pauseMut.Lock()
}

// Resume will revert the Pause call and the consumer will continue to work. Resume must not be called if Pause wasn't
// called before. See Pause for more details.
func (c *Consumer) Resume() {
c.pauseMut.Unlock()
}
34 changes: 32 additions & 2 deletions internal/component/otelcol/internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/runtime/logging/level"
otelcomponent "go.opentelemetry.io/collector/component"
"go.uber.org/multierr"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/runtime/logging/level"
)

// Scheduler implements manages a set of OpenTelemetry Collector components.
Expand All @@ -39,6 +40,11 @@ type Scheduler struct {

// newComponentsCh is written to when schedComponents gets updated.
newComponentsCh chan struct{}

// onPause is called when scheduler is making changes to running components.
onPause func()
// onResume is called when scheduler is done making changes to running components.
onResume func()
}

// New creates a new unstarted Scheduler. Call Run to start it, and call
Expand All @@ -47,6 +53,20 @@ func New(l log.Logger) *Scheduler {
return &Scheduler{
log: l,
newComponentsCh: make(chan struct{}, 1),
onPause: func() {},
onResume: func() {},
}
}

// NewWithPauseCallbacks is like New, but allows to specify onPause and onResume callbacks. The scheduler is assumed to
// start paused and only when its components are scheduled, it will call onResume. From then on, each update to running
// components via Schedule method will trigger a call to onPause and then onResume.
func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler {
return &Scheduler{
log: l,
newComponentsCh: make(chan struct{}, 1),
onPause: onPause,
onResume: onResume,
}
}

Expand Down Expand Up @@ -75,11 +95,16 @@ func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Componen
// Run starts the Scheduler. Run will watch for schedule components to appear
// and run them, terminating previously running components if they exist.
func (cs *Scheduler) Run(ctx context.Context) error {
firstRun := true
var components []otelcomponent.Component

// Make sure we terminate all of our running components on shutdown.
defer func() {
if !firstRun { // always handle the callbacks correctly
cs.onPause()
}
cs.stopComponents(context.Background(), components...)
cs.onResume()
}()

// Wait for a write to cs.newComponentsCh. The initial list of components is
Expand All @@ -90,6 +115,10 @@ func (cs *Scheduler) Run(ctx context.Context) error {
case <-ctx.Done():
return nil
case <-cs.newComponentsCh:
if !firstRun { // do not pause on first run
cs.onPause()
firstRun = false
}
// Stop the old components before running new scheduled ones.
cs.stopComponents(ctx, components...)

Expand All @@ -100,6 +129,7 @@ func (cs *Scheduler) Run(ctx context.Context) error {

level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components))
components = cs.startComponents(ctx, host, components...)
cs.onResume()
}
}
}
Expand Down
25 changes: 13 additions & 12 deletions internal/component/otelcol/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ import (
"errors"
"os"

"github.com/prometheus/client_golang/prometheus"
otelcomponent "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
otelextension "go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/pipeline"
otelprocessor "go.opentelemetry.io/collector/processor"
sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/metric"

"github.com/grafana/alloy/internal/build"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/otelcol"
Expand All @@ -18,16 +29,6 @@ import (
"github.com/grafana/alloy/internal/component/otelcol/internal/scheduler"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/internal/util/zapadapter"
"github.com/prometheus/client_golang/prometheus"
otelcomponent "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
otelextension "go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/pipeline"
otelprocessor "go.opentelemetry.io/collector/processor"
sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/metric"
)

// Arguments is an extension of component.Arguments which contains necessary
Expand Down Expand Up @@ -94,7 +95,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc

ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.New(ctx)
consumer := lazyconsumer.NewPaused(ctx)

// Create a lazy collector where metrics from the upstream component will be
// forwarded.
Expand All @@ -116,7 +117,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc
factory: f,
consumer: consumer,

sched: scheduler.New(opts.Logger),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,

liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID),
Expand Down

0 comments on commit f07ea86

Please sign in to comment.