-
Notifications
You must be signed in to change notification settings - Fork 203
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix race condition in otelcol processors #2027
base: main
Are you sure you want to change the base?
Conversation
…e_on_attribute_value to transform processor
f07ea86
to
122a517
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this, I think that's a step in the right direction.
The logic was not super clear to me at first in the scheduler, the idea is that the components are created in the paused state so they are not paused on the first run but are always resumed after being started because they are paused when the scheduler stops running, right? Maybe we could have this explanation in the scheduler code
// called before. See Pause for more details. | ||
func (c *Consumer) Resume() { | ||
c.pauseMut.Unlock() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If pause is called by accident on a paused consumer or resume on a running consumer, it will be dramatic. Should the consumer holds its state (smth like "isPaused") to avoid locking if its already paused or avoid resuming if it's already running for extra safety?
@@ -90,6 +115,10 @@ func (cs *Scheduler) Run(ctx context.Context) error { | |||
case <-ctx.Done(): | |||
return nil | |||
case <-cs.newComponentsCh: | |||
if !firstRun { // do not pause on first run | |||
cs.onPause() | |||
firstRun = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
firstRun cannot be set to false inside of the !firstRun statement. I think you should set is right after it, right?
@@ -100,6 +129,7 @@ func (cs *Scheduler) Run(ctx context.Context) error { | |||
|
|||
level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components)) | |||
components = cs.startComponents(ctx, host, components...) | |||
cs.onResume() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the component was created with New() instead of NewPaused() then this will panic because on the first run you will try to unlock the mut that's not locked
PR Description
We have a general issue with OTel components where consumers may be used before the Start functions in OTel have finished running. This is because in OTel Start functions are non-blocking and sometimes do work to set things up, like it was the case for batch_processor. In Alloy, however we have Run function that is blocking for the lifetime of the component. As soon as it's called, we consider the component Running. In OTel, however, the Start function should be called and exit to consider a component running.
The solution here is to pause the Consumer until we are sure that the OTel component scheduler has called
Start
on all OTel components. The consumer will block any attempts to feed data to it.Which issue(s) this PR fixes
Notes to the Reviewer
PR Checklist