Skip to content

Commit

Permalink
Refactor systemd listener:
Browse files Browse the repository at this point in the history
* Moved event listener instance and initialization to cmd.go main
* Added a file system watcher to the systemd enabled services directory to be notified of new services from being enabled/disabled so that the event listener can track their status
* Encapsulated the code to listen to service events generated by the systemd event listener so that it is called at the last line of the cmd.go#main function to make sure all observers are ready before start processing the service events
  • Loading branch information
jordigilh committed Aug 11, 2022
1 parent 8fd2b8c commit 623a857
Show file tree
Hide file tree
Showing 14 changed files with 254 additions and 133 deletions.
48 changes: 29 additions & 19 deletions cmd/device-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
os2 "github.com/project-flotta/flotta-device-worker/internal/os"
registration2 "github.com/project-flotta/flotta-device-worker/internal/registration"
"github.com/project-flotta/flotta-device-worker/internal/server"
workload2 "github.com/project-flotta/flotta-device-worker/internal/workload"
"github.com/project-flotta/flotta-device-worker/internal/service"
workload "github.com/project-flotta/flotta-device-worker/internal/workload"

"net"
"os"
Expand All @@ -41,9 +42,9 @@ const (
)

func initSystemdDirectory() error {
systemddir := filepath.Join(os.Getenv("HOME"), ".config/systemd/user/")

// init the flotta user systemd units directory
err := os.MkdirAll(systemddir, 0750)
err := os.MkdirAll(service.SystemdUserServicesFullPath, 0750)
if err != nil {
return err
}
Expand Down Expand Up @@ -141,6 +142,16 @@ func main() {
}
configManager := configuration2.NewConfigurationManager(dataDir)

// Systemd event listener
listener := service.NewDBusEventListener()
eventCh, err := listener.Connect()
if err != nil {
log.Fatal(err)
}
// Start listening to systemd bus events. These events generated from here can start being processed once all observers have been initialized, otherwise
// we risk in missing observers whilst processing the events.
go listener.Listen()

// --- Client metrics configuration ---
metricsStore, err := metrics.NewTSDB(dataDir)
if err != nil {
Expand All @@ -162,16 +173,16 @@ func main() {
dataTransferWatcher := metrics.NewDataTransferMetrics(metricsDaemon)
configManager.RegisterObserver(dataTransferWatcher)

wl, err := workload2.NewWorkloadManager(dataDir, deviceId)
workloadManager, err := workload.NewWorkloadManager(dataDir, deviceId, eventCh)
if err != nil {
log.Fatalf("cannot start Workload Manager. DeviceID: %s; err: %v", deviceId, err)
}

logsWrapper := logs.NewWorkloadsLogsTarget(wl)
logsWrapper := logs.NewWorkloadsLogsTarget(workloadManager)
configManager.RegisterObserver(logsWrapper)
wl.RegisterObserver(logsWrapper)
configManager.RegisterObserver(wl)
wl.RegisterObserver(workloadMetricWatcher)
workloadManager.RegisterObserver(logsWrapper)
configManager.RegisterObserver(workloadManager)
workloadManager.RegisterObserver(workloadMetricWatcher)

remoteWrite := metrics.NewRemoteWrite(dataDir, deviceId, metricsStore)
configManager.RegisterObserver(remoteWrite)
Expand All @@ -189,25 +200,21 @@ func main() {
}
configManager.RegisterObserver(mountManager)

dataMonitor := datatransfer.NewMonitor(wl, configManager)
wl.RegisterObserver(dataMonitor)
dataMonitor := datatransfer.NewMonitor(workloadManager, configManager)
workloadManager.RegisterObserver(dataMonitor)
configManager.RegisterObserver(dataMonitor)
dataMonitor.Start()

if err != nil {
log.Fatalf("cannot start metrics store. DeviceID: %s; err: %v", deviceId, err)
}

reg, err := registration2.NewRegistration(deviceId, &hw, dispatcherClient, configManager, wl)
reg, err := registration2.NewRegistration(deviceId, &hw, dispatcherClient, configManager, workloadManager)
if err != nil {
log.Fatalf("cannot start registration process: DeviceID: %s; err: %v", deviceId, err)
}

hbs := heartbeat2.NewHeartbeatService(dispatcherClient, configManager, wl, &hw, dataMonitor, deviceOs, reg)
hbs := heartbeat2.NewHeartbeatService(dispatcherClient, configManager, workloadManager, &hw, dataMonitor, deviceOs, reg)
configManager.RegisterObserver(hbs)

reg.DeregisterLater(
wl,
workloadManager,
configManager,
hbs,
dataMonitor,
Expand Down Expand Up @@ -239,16 +246,19 @@ func main() {

setupSignalHandler(metricsStore, ansibleManager)

go listenStartGracefulRebootChannel(wl, dataMonitor, systemMetricsWatcher, metricsStore, hbs, ansibleManager,
go listenStartGracefulRebootChannel(workloadManager, dataMonitor, systemMetricsWatcher, metricsStore, hbs, ansibleManager,
gracefulRebootChannel, deviceOs)

// All observers have been registered at this point. Ready to start listening to workload service events generated by changes to the systemd dbus
go workloadManager.ListenServiceEvents()

if err := s.Serve(l); err != nil {
log.Fatalf("cannot start worker server, err: %v", err)
}

}

func listenStartGracefulRebootChannel(wl *workload2.WorkloadManager, dataMonitor *datatransfer.Monitor,
func listenStartGracefulRebootChannel(wl *workload.WorkloadManager, dataMonitor *datatransfer.Monitor,
systemMetricsWatcher *metrics.SystemMetrics, metricsStore *metrics.TSDB, hbs *heartbeat2.Heartbeat,
ansibleManager *ansible.Manager, gracefulRebootChannel chan struct{}, deviceOs *os2.OS) {
// listen to the channel for getting StartGracefulReboot signal
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/digitalocean/godo v1.80.0 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/envoyproxy/protoc-gen-validate v0.6.7 // indirect
github.com/fsnotify/fsnotify v1.5.4
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-openapi/runtime v0.23.1 // indirect
Expand Down
5 changes: 3 additions & 2 deletions internal/ansible/mapping/mock_mapping.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (m *Manager) Update(message models.DeviceConfigurationMessage) error {
}

log.Infof("updating configuration. New config: %s\nOld config: %s", newJson, oldJson)
log.Debugf("[ConfigManager] observers :%v+", m.observers)
for _, observer := range m.observers {
err := observer.Update(message)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/metrics/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type SystemMetrics struct {
}

func NewSystemMetrics(daemon MetricsDaemon) (*SystemMetrics, error) {
nodeExporter, err := service.NewSystemd("node_exporter", nil, service.SystemBus, nil)
nodeExporter, err := service.NewSystemd("node_exporter", nil, service.SystemBus)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions internal/metrics/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (wrkM *WorkloadMetrics) WorkloadRemoved(workloadName string) {
}

func (wrkM *WorkloadMetrics) WorkloadStarted(workloadName string, report []*podman.PodReport) {
log.Infof("Starting target metrics for workload '%s'", workloadName)
for _, workload := range report {
cfg := wrkM.getWorkload(workloadName)
if cfg == nil {
Expand Down
150 changes: 124 additions & 26 deletions internal/service/event_listener.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package service

import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"

"git.sr.ht/~spc/go-log"
"github.com/coreos/go-systemd/v22/dbus"
"github.com/fsnotify/fsnotify"
)

const (
// servicePostfixLength is the length of the string ".service". This value is used to extract the workload name from the service
// coming from systemd
servicePostfixLength = 8

EventStarted EventType = "started"
EventStopped EventType = "stopped"

systemdUserServicesDir = ".config/systemd/user"
defaultTargetWantsRelativePath = "default.target.wants/"

// To avoid confusion, we match the action verb tense coming from systemd sub state, even though it is not consistent
// The only addition to the list is the "removed" case, which we use when a service is removed
// from the system and we receive an empty unitStatus object.
Expand All @@ -34,6 +38,11 @@ const (
removed unitSubState = "removed"
)

var (
SystemdUserServicesFullPath = filepath.Join(os.Getenv("HOME"), systemdUserServicesDir)
enabledServicesFullPath = filepath.Join(SystemdUserServicesFullPath, defaultTargetWantsRelativePath)
)

type unitSubState string

type EventType string
Expand All @@ -43,41 +52,92 @@ type Event struct {
Type EventType
}

type EventListener struct {
type DBusEventListener struct {
observerCh chan *Event
set *dbus.SubscriptionSet
dbusCh <-chan map[string]*dbus.UnitStatus
dbusErrCh <-chan error
fsWatcher *fsnotify.Watcher
}

func NewEventListener() *EventListener {
return &EventListener{}
func NewDBusEventListener() *DBusEventListener {
return &DBusEventListener{}
}

func (e *EventListener) Connect() (chan *Event, error) {
func (e *DBusEventListener) Connect() (<-chan *Event, error) {
conn, err := newDbusConnection(UserBus)
if err != nil {
return nil, err
}
e.set = conn.NewSubscriptionSet()
e.observerCh = make(chan *Event)
if err := e.initializeSubscriptionSet(); err != nil {
return nil, err
}
e.observerCh = make(chan *Event, 1000)
return e.observerCh, nil
}

func (e *EventListener) Listen() chan *Event {
func (e *DBusEventListener) initializeSubscriptionSet() error {
_, err := os.Stat(enabledServicesFullPath)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return e.watchServiceDirectory()
}
return err
}
files, err := os.ReadDir(enabledServicesFullPath)
if err != nil {
return err
}
log.Infof("List of services to be monitored in %s:%s", enabledServicesFullPath, files)
for _, fd := range files {
f := filepath.Join(enabledServicesFullPath, fd.Name())
log.Debugf("Detected service file %s", f)
if !fileExist(f) {
log.Errorf("Hard link %s does not exist or broken link", fd.Name())
continue
}
e.Add(getServiceFileName(fd.Name()))
}
return e.watchServiceDirectory()
}

func (e *DBusEventListener) watchServiceDirectory() error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
e.fsWatcher = watcher

_, err = os.Stat(SystemdUserServicesFullPath)
if err != nil {
return fmt.Errorf("systemd user directory not found: %s", err)
}

go e.watchFSEvents()
return e.fsWatcher.Add(SystemdUserServicesFullPath)
}

func (e *DBusEventListener) Listen() {
e.dbusCh, e.dbusErrCh = e.set.Subscribe()
for {
select {
case msg := <-e.dbusCh:
for name, unit := range msg {
log.Debugf("Captured event for %s: %v+", name, unit)
n := extractWorkloadName(name)
log.Debugf("Captured DBus event for %s: %v+", name, unit)
n, err := extractWorkloadName(name)
if err != nil {
log.Error(err)
continue
}
state := translateUnitSubStatus(unit)
log.Debugf("Systemd service for workload %s transitioned to sub state %s\n", n, state)
switch state {
case running:
log.Debugf("Sending start event to observer channel for workload %s", n)
e.observerCh <- &Event{WorkloadName: n, Type: EventStarted}
case removed, stop, dead, failed, start:
log.Debugf("Sending stop event to observer channel for workload %s", n)
e.observerCh <- &Event{WorkloadName: n, Type: EventStopped}
default:
log.Debugf("Ignoring unit sub state for service %s: %s", name, unit.SubState)
Expand All @@ -90,31 +150,69 @@ func (e *EventListener) Listen() chan *Event {

}

func (e *DBusEventListener) watchFSEvents() {
for {
select {
case event, ok := <-e.fsWatcher.Events:
if !ok {
log.Errorf("Error while watching for file system events at %s:%s", SystemdUserServicesFullPath, event)
continue
}
log.Debugf("Captured file system event:%s", event)
if !isAnEnabledService(event.Name) {
continue
}
svcName := getServiceFileName(event.Name)
switch {
case event.Op&fsnotify.Create == fsnotify.Create:
log.Infof("New service configuration detected at %s", event.Name)
e.Add(svcName)
case event.Op&fsnotify.Remove == fsnotify.Remove:
log.Infof("Service configuration removed at %s", event.Name)
e.Remove(svcName)
}
case err, ok := <-e.fsWatcher.Errors:
if !ok {
log.Errorf("Error detected on file system watcher: %s", err)
return
}
}
}
}

func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState {
if unit == nil {
return removed
}
return unitSubState(unit.SubState)
}

func (e *EventListener) Add(workloadName string) {
name := formatServiceName(workloadName)
log.Debugf("Adding service for events %s", name)
if !e.set.Contains(name) {
e.set.Add(name)
}
func (e *DBusEventListener) Add(serviceName string) {
log.Debugf("Adding service for event listener %s", serviceName)
e.set.Add(serviceName)
}

func (e *EventListener) Remove(workloadName string) {
name := formatServiceName(workloadName)
log.Debugf("Removing service for events %s", name)
e.set.Remove(name)
func (e *DBusEventListener) Remove(serviceName string) {
log.Debugf("Removing service from event listener %s", serviceName)
e.set.Remove(serviceName)
}

func extractWorkloadName(serviceName string) string {
return serviceName[:len(serviceName)-servicePostfixLength]
func extractWorkloadName(serviceName string) (string, error) {
if filepath.Ext(serviceName) != ServiceSuffix {
return "", fmt.Errorf("invalid file name or not a service %s", serviceName)
}
return strings.TrimSuffix(filepath.Base(serviceName), filepath.Ext(serviceName)), nil
}

func fileExist(path string) bool {
_, err := os.Stat(path)
return err == nil
}

func formatServiceName(workloadName string) string {
return fmt.Sprintf("%s.service", workloadName)
func isAnEnabledService(fullPath string) bool {
return filepath.Ext(fullPath) == ServiceSuffix && filepath.Dir(fullPath) == enabledServicesFullPath
}
func getServiceFileName(fullPath string) string {
_, filename := filepath.Split(fullPath)
return filename
}
Loading

0 comments on commit 623a857

Please sign in to comment.