Daisy: Consolidate reading of serial port output. #1243
Skipping CI for Draft Pull Request.
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: EricEdens
for _, cb := range callbacks {
	subscribers = append(subscribers, cb.channel)
	if cb.pollingInterval < pollingFrequency {
		cb.pollingInterval = pollingFrequency
Related to the above comment: what if a watcher is registered after start? Should the polling interval be updated in Watch() as well?
My intention was to disallow late subscribers, specifically by not sending them updates if they call subscribe after watch, as that would be a clearer failure compared to sending them partial logs.
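The "no late subscribers" rule described above could be made explicit in code. The following is a minimal sketch, not the PR's implementation: `strictWatcher`, `watch`, `start`, and `errLateSubscriber` are hypothetical names, and returning an error is an assumed variant (the PR discussion mentions withholding updates or panicking instead).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errLateSubscriber is a hypothetical error; the PR itself describes
// withholding updates (or panicking) rather than returning an error.
var errLateSubscriber = errors.New("watch called after start")

// strictWatcher is an illustrative stand-in, not Daisy's SerialOutputWatcher.
type strictWatcher struct {
	mu      sync.Mutex
	started bool
	subs    []chan string
}

// watch registers a subscriber, but only before start has been called.
func (w *strictWatcher) watch() (<-chan string, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.started {
		return nil, errLateSubscriber
	}
	ch := make(chan string, 8)
	w.subs = append(w.subs, ch)
	return ch, nil
}

// start marks the watcher as running; later watch calls are rejected.
func (w *strictWatcher) start() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.started = true
}

func main() {
	w := &strictWatcher{}
	_, early := w.watch()
	w.start()
	_, late := w.watch()
	fmt.Println("early subscription error:", early)
	fmt.Println("late subscription error:", late)
}
```

Rejecting the call outright makes the failure visible at the call site, which is one way to get the "clearer failure" the comment asks for.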
/hold
(*watcher).Watch(name, serialPortToArchive, c, serialPortPollInterval)
(*watcher).start(name)
StepCreateInstance is starting the watcher, while StepWaitForInstance is registering to watch the same instance. Does that mean this code relies on CreateInstance.logSerialOutput always being called after WaitForInstance.validateForWaitForInstancesSignal()? Can this order of calls be changed in the future, leading to a panic()?
Can we have all watcher.start() run in a single place, after we're sure steps have registered themselves as watchers?
I'm glad you brought this up, as the current approach feels fragile to me as well. Any ideas on where that central place might be?
Since serial port output can start as soon as an instance is created, it makes sense to have start() in step_create_instance.go. I'm not sure we can have start() somewhere else and ensure no serial output is lost.
Because of this, in the future, we might have subscribers call Watch() after start(). If we do allow this (currently it panics), we can handle it in a couple of ways:
- buffer output on the watcher side (up to a point, to allow some later subscribers without having to store the whole output for the duration of the execution), or
- allow subscribers to know whether output has already started for a given port/instance name before subscribing (and whether any of it will be missed), and let them decide if they can live with this or not (by panicking on their own). If there was no output before a late subscriber calls Watch(), then nothing is lost and things should work as expected.
There are probably other possible solutions.
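The two options above (a bounded buffer, and telling a late subscriber whether output was missed) can be combined. Here is a rough sketch under stated assumptions: `replayWatcher`, its fields, and the `limit` parameter are all hypothetical and do not come from the Daisy codebase.

```go
package main

import (
	"fmt"
	"sync"
)

// replayWatcher keeps at most `limit` recent chunks so that late subscribers
// can catch up. All names here are hypothetical, for illustration only.
type replayWatcher struct {
	mu      sync.Mutex
	limit   int
	backlog []string
	dropped bool // true once any chunk has fallen out of the backlog
	subs    []chan string
}

// publish records a chunk in the bounded backlog and forwards it to subscribers.
func (w *replayWatcher) publish(chunk string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.backlog = append(w.backlog, chunk)
	if len(w.backlog) > w.limit {
		w.backlog = w.backlog[1:]
		w.dropped = true
	}
	for _, ch := range w.subs {
		ch <- chunk
	}
}

// watch replays the backlog to the new subscriber and reports whether some
// earlier output is no longer available, so the caller can decide what to do.
func (w *replayWatcher) watch(buffer int) (<-chan string, bool) {
	w.mu.Lock()
	defer w.mu.Unlock()
	// Capacity covers the replayed backlog plus room for new chunks.
	ch := make(chan string, len(w.backlog)+buffer)
	for _, chunk := range w.backlog {
		ch <- chunk
	}
	w.subs = append(w.subs, ch)
	return ch, w.dropped
}

func main() {
	w := &replayWatcher{limit: 2}
	for _, c := range []string{"a", "b", "c"} {
		w.publish(c)
	}
	ch, missed := w.watch(4)
	w.publish("d")
	fmt.Println("missed earlier output:", missed)
	for i := 0; i < 3; i++ {
		fmt.Println(<-ch)
	}
}
```

In this sketch `publish` blocks if a subscriber's channel is full; a real implementation would need a policy for slow consumers.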
I've designed this system to specifically disallow late subscribers for two reasons:
- We don't need it now.
- We don't have an upcoming use case for it.
Am I missing something here? Is there a use case that you're aware of?
Regarding how Watch and start are wired into the current steps, I've addressed potential fragility in three ways:
- I've documented the behavior: https://github.com/GoogleCloudPlatform/compute-image-tools/pull/1243/files#diff-364879f77dafcc960afbb3cc4ded0dabR35
- If someone inadvertently performs a late subscription, they won't receive updates, so it will fail loudly.
- If someone changes step_wait_for_instances_signal.go and moves Watch to an incorrect location, then tests will fail (unit and integration).
Regarding your two potential solutions (buffer or indicate that polling has already started), I think those are fantastic options if we need to support late subscribers. Do you have a use case in mind, though?
In my opinion, "Watch" and "start" should be bound together. Regardless of whether "start" has already been called, it should be safe to call it again blindly, with no side effects.
This is also a reason to allow "late subscribe": the watcher may already have been started.
It's not a good idea to always rely on create_instance to start the watcher, because we should definitely be able to wait for a signal from an instance that was not created in the current Daisy workflow.
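An idempotent start, as suggested above, might look roughly like the following. This is only a sketch: `starter`, `launch`, and the boolean return value are hypothetical and are not part of the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// starter launches polling for an instance at most once, no matter how many
// callers ask. The names are hypothetical and not taken from Daisy.
type starter struct {
	mu      sync.Mutex
	started map[string]bool
	launch  func(name string) // hypothetical hook that begins polling
}

// start is safe to call repeatedly; it reports whether this call did the launch.
func (s *starter) start(name string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.started[name] { // reading from a nil map is safe in Go
		return false
	}
	if s.started == nil {
		s.started = make(map[string]bool)
	}
	s.started[name] = true
	s.launch(name)
	return true
}

func main() {
	launches := 0
	s := &starter{launch: func(name string) { launches++ }}
	fmt.Println(s.start("inst-1"), s.start("inst-1"), s.start("inst-2"))
	fmt.Println("launches:", launches)
}
```

With a guard like this, both the create step and the wait step could call start without depending on which one runs first.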
/retest
@@ -51,73 +57,46 @@ func (ci *CreateInstances) UnmarshalJSON(b []byte) error {
 	return nil
 }

-func logSerialOutput(ctx context.Context, s *Step, ii InstanceInterface, ib *InstanceBase, port int64, interval time.Duration) {
+func logSerialOutput(s *Step, name string, watcher *SerialOutputWatcher, wcProvider func() io.WriteCloser) {
It's not clear what 'name' represents on a first read.
// SerialOutputWatcher returns a SerialOutputWatcher that can be used to subscribe to the
// serial output of instances managed by Daisy.
func (w *Workflow) SerialOutputWatcher() *SerialOutputWatcher {
Is it thread safe?
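One common way to make a lazily created accessor safe for concurrent callers is sync.Once. The sketch below uses hypothetical `workflow` and `serialWatcher` types; it does not show how the PR's Workflow.SerialOutputWatcher is actually implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// serialWatcher and workflow are hypothetical stand-ins for illustration.
type serialWatcher struct {
	label string
}

type workflow struct {
	watcherOnce sync.Once
	watcher     *serialWatcher
}

// serialOutputWatcher lazily creates the watcher exactly once, even when it
// is called from several goroutines at the same time.
func (w *workflow) serialOutputWatcher() *serialWatcher {
	w.watcherOnce.Do(func() {
		w.watcher = &serialWatcher{label: "shared"}
	})
	return w.watcher
}

func main() {
	wf := &workflow{}
	results := make([]*serialWatcher, 8)
	var wg sync.WaitGroup
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = wf.serialOutputWatcher()
		}(i)
	}
	wg.Wait()
	same := true
	for _, r := range results {
		if r != results[0] {
			same = false
		}
	}
	fmt.Println("all callers share one watcher:", same)
}
```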
Daisy reads serial port output (serial output) in two places: once in createInstances for log archival, and once in waitForInstancesSignal for control flow. These readers are independent: they both make GCP API calls, manage their own errors, and perform their own retries.
Uncoordinated access causes two errors:
This change introduces a pubsub mechanism to allow a single serial port reader to broadcast updates to multiple consumers.
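As a rough illustration of the idea (not the PR's code), a single producer can fan each chunk of serial output out to every registered consumer. `broadcaster`, `subscribe`, `publish`, and `shutdown` are hypothetical names, and the strings stand in for serial output read from the GCP API.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster fans out chunks from one reader to every registered subscriber.
// All names here are hypothetical and are not taken from the Daisy codebase.
type broadcaster struct {
	mu          sync.Mutex
	subscribers []chan string
}

// subscribe registers a new consumer and returns its receive-only channel.
func (b *broadcaster) subscribe(buffer int) <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, buffer)
	b.subscribers = append(b.subscribers, ch)
	return ch
}

// publish sends one chunk of output to every subscriber.
func (b *broadcaster) publish(chunk string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subscribers {
		ch <- chunk
	}
}

// shutdown closes every subscriber channel to signal the end of output.
func (b *broadcaster) shutdown() {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subscribers {
		close(ch)
	}
	b.subscribers = nil
}

func main() {
	b := &broadcaster{}
	archive := b.subscribe(4) // e.g. the log archival consumer
	signal := b.subscribe(4)  // e.g. the wait-for-signal consumer
	for _, chunk := range []string{"booting", "BuildSuccess"} {
		b.publish(chunk)
	}
	b.shutdown()
	for c := range archive {
		fmt.Println("archive:", c)
	}
	for c := range signal {
		fmt.Println("signal:", c)
	}
}
```

Only the producer would call the API, so errors and retries live in one place while each consumer just reads from its own channel.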
Fixes #1160