-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Migrate to use systemd events #217
base: main
Are you sure you want to change the base?
Conversation
9e198e5
to
cb3229e
Compare
Failing to properly configure the file permissions for |
Fixed in project-flotta/flotta-operator#290 . It is not a showstopper for this PR as it only impacts when running a custom RPM for the end to end tests. |
aae74c6
to
c6b74ca
Compare
internal/service/event_listener.go
Outdated
func (e *EventListener) Add(workloadName string) { | ||
name := fmt.Sprintf("%s.service", workloadName) | ||
log.Debugf("Adding service for events %s", name) | ||
if !e.set.Contains(name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In which case this might happen?
An attempt to add an already created service should be translated to a 'replace', but don't think we should meet this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's a safeguard just in case the service was already part of the filter. In theory this should never happen based on the nominal flow. If a service is replaced it will be first removed and then readded by the events.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ended up removing it with the new logic to watch for file system events. The case where the service could be added twice is not possible anymore.
internal/service/event_listener.go
Outdated
} | ||
|
||
func (e *EventListener) Add(workloadName string) { | ||
name := fmt.Sprintf("%s.service", workloadName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pls pull fmt.Sprintf("%s.service", workloadName)
to its method to be reused by Remove
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
internal/service/systemd.go
Outdated
@@ -20,12 +20,20 @@ const ( | |||
DefaultRestartTimeout = 15 | |||
TimerSuffix = ".timer" | |||
ServiceSuffix = ".service" | |||
DefaultNameSeparator = "-" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deleted. I wonder why no checks flagged this in the PR workflow.
internal/workload/wrapper.go
Outdated
|
||
err := svc.Stop() | ||
if err != nil { | ||
log.Errorf("unable to stop service %s:%s", workloadName, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/unable/Unable ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
internal/workload/wrapper.go
Outdated
if err != nil { | ||
return nil | ||
log.Errorf("unable to remove service from serviceManager %s:%s", workloadName, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/unable/Unable ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@jordigilh what I mentioned yesterday regarding restarts: Initialize all as normal, and deploy a workload; you will get the info here: After all the pods are running, you need to restart yggdrasild
there is no notification that the workloadStarted, so the metrics are not updated at all, and it'll not scrape as it should. |
That's weird, I'm actually seeing it:
Look at the last line:
Which is generated in this line: That's the only workload available for the device:
|
maybe the channel is not initialized at that time? |
In that case the message should not have been received by the wrapper. In my case I'm testing with a workload and an edgedevice that has no metrics nor logs, so maybe that's why there are no other messages.
Notice that there are no additional messages for metrics or logs after |
This is how I'm testing, and that should be received, but it's not. |
Confirmed. The problem is that the observers are not instantiated before the events are triggered. I will refactor the code to init the observers before the event listener is instantiated |
3a6a7c8
to
4427bfb
Compare
@eloycoto can you try again?. I made it Eloy's |
bb407e0
to
362dcd5
Compare
362dcd5
to
33657e0
Compare
@masayag @eloycoto I've refactored the code and removed the FS Notify logic. Instead the event listener is now an Observer and taps into the device config as source of truth to be aware of which services to watch events for. Take a look at the last commit if you don't want to be bothered with the rest. One thing that still bothers me is that we don't have any means to track other enabled user services, in case somehow a new service is added to the user outside of the agent's control. |
internal/service/event_listener.go
Outdated
removed unitSubState = "removed" | ||
) | ||
|
||
var () |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pls remove it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
d14c973
to
c825ed8
Compare
cmd/device-worker/main.go
Outdated
@@ -20,7 +20,8 @@ import ( | |||
os2 "github.com/project-flotta/flotta-device-worker/internal/os" | |||
registration2 "github.com/project-flotta/flotta-device-worker/internal/registration" | |||
"github.com/project-flotta/flotta-device-worker/internal/server" | |||
workload2 "github.com/project-flotta/flotta-device-worker/internal/workload" | |||
"github.com/project-flotta/flotta-device-worker/internal/service" | |||
workload "github.com/project-flotta/flotta-device-worker/internal/workload" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this workload alias is not needed at all, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, will remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
internal/service/event_listener.go
Outdated
log.Infof("Starting DBus event listener") | ||
conn, err := newDbusConnection(UserBus) | ||
if err != nil { | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be this formarted with a message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was not sure what would be the added value. Any suggestion to what to append?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok I added this:
fmt.Errorf("error while starting event listener: %v", err)
svcName := DefaultServiceName(wl.Name) | ||
if !e.contains(svcName) { | ||
e.add(svcName) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The else conditions shouldn't raise an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok will add.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
log.Debugf("Captured DBus event for %s: %v+", name, unit) | ||
n, err := extractWorkloadName(name) | ||
if err != nil { | ||
log.Error(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add a wrapper error message here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any suggestions? The message already contains the service name
continue | ||
} | ||
state := translateUnitSubStatus(unit) | ||
log.Debugf("Systemd service for workload %s transitioned to sub state %s\n", n, state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will change the level to Info
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can be too verbose with a few workloads in place. I had them all in Info and it was dumping too much info. Keep in mind that for every change in the unit's status it will print this message, even when the changes are to be ignored because the new sub-state is not relevant.
I had this enabled earlier and it was dumping too much information.
The changes in state are already captured in here:
log.Infof("Service for workload %s started", event.WorkloadName) |
log.Debugf("Sending stop event to observer channel for workload %s", n) | ||
e.eventCh <- &Event{WorkloadName: n, Type: EventStopped} | ||
default: | ||
log.Debugf("Ignoring unit sub state for service %s: %s", name, unit.SubState) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be an error? What cases are missing here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a few sub states we ignore because they are not relevant to our needs:
- post-start
- post-stop
- pre-start
...
The ones we capture are the ones we want to take action.
internal/service/systemd.go
Outdated
@@ -267,7 +262,8 @@ func (s *systemd) Stop() error { | |||
} | |||
|
|||
func (s *systemd) Enable() error { | |||
conn, err := newDbusConnection(s.Rootless) | |||
log.Debugf("Enabling service %s", s.Name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enabling systemd service %v
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done with %s
internal/service/systemd.go
Outdated
@@ -278,7 +274,8 @@ func (s *systemd) Enable() error { | |||
} | |||
|
|||
func (s *systemd) Disable() error { | |||
conn, err := newDbusConnection(s.Rootless) | |||
log.Debugf("Disabling service %s", s.Name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Disabling systemd service
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good
internal/workload/podman/podman.go
Outdated
@@ -15,9 +15,9 @@ import ( | |||
"github.com/blang/semver" | |||
"github.com/go-openapi/swag" | |||
"github.com/project-flotta/flotta-device-worker/internal/service" | |||
api "github.com/project-flotta/flotta-device-worker/internal/workload/api" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this alias is not needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
acknowledged
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
internal/workload/wrapper.go
Outdated
log.Infof("Service for workload %s started", event.WorkloadName) | ||
report, err := w.podManager.GetPodReportForPodName(event.WorkloadName) | ||
if err != nil { | ||
log.Errorf("unable to get pod report for workload %s:%v", event.WorkloadName, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will workload '%s' as line 364
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
understood
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
internal/workload/wrapper.go
Outdated
observer.WorkloadStarted(event.WorkloadName, []*podman.PodReport{report}) | ||
} | ||
case service.EventStopped: | ||
log.Infof("Service for workload %s stopped", event.WorkloadName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'%s'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
understood
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
* Moved event listener instance and initialization to cmd.go main * Added a file system watcher to the systemd enabled services directory to be notified of new services from being enabled/disabled so that the event listener can track their status * Encapsulated the code to listen to service events generated by the systemd event listener so that it is called at the last line of the cmd.go#main function to make sure all observers are ready before start processing the service events
…ceived to avoid potential race condition between file system event and dbus event
c825ed8
to
7c39f0a
Compare
@eloycoto I've addressed some of your comments, there are a few that I've replied. PTAL when you have spare time. |
7c39f0a
to
20e0fb5
Compare
20e0fb5
to
13b2b9e
Compare
- Use- One routine to watch for service files to monitor
- Another routine to watch for status of those services.
fsNotify
to capture file system changes in the.config/systemd/user/
directory; whenever a new change is made to this subdirectory, a routine in flotta receives the event and determines if it's an enabled or disabled service or something else. If it's an enabled service, for instance, it will add the service to the list of services to watch in the event listener, which will trigger an event from the dbus to notify the status of the service. It might look cumbersome but it's consistent with systemd:String()
function to theObserver
interface inconfiguration.go
to help listing their names when debuging to display the observers registered instead of their memory references.The
services.json
file remains as it is required to identify which service files are linked to a workload. We could get the information from systemd's service unit definition, but I don't want to extend this PR more than it is already, as it is starting to look like a behemoth, when it was supposed to be horse size.Note: I tried to create an interface layer between dbus and the event listener so I could add unit tests. Unfortunately the initial effort translated into creating extra complexity because the dbus package is not providing interfaces. Suggestions are welcome.