Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to use systemd events #217

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 33 additions & 23 deletions cmd/device-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
os2 "github.com/project-flotta/flotta-device-worker/internal/os"
registration2 "github.com/project-flotta/flotta-device-worker/internal/registration"
"github.com/project-flotta/flotta-device-worker/internal/server"
workload2 "github.com/project-flotta/flotta-device-worker/internal/workload"
"github.com/project-flotta/flotta-device-worker/internal/service"
"github.com/project-flotta/flotta-device-worker/internal/workload"

"net"
"os"
Expand All @@ -33,17 +34,21 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

var yggdDispatchSocketAddr string
var gracefulRebootChannel chan struct{}

const (
defaultDataDir = "/var/local/yggdrasil"
defaultDataDir = "/var/local/yggdrasil"
systemdUserServicesDir = ".config/systemd/user"
)

var (
yggdDispatchSocketAddr string
gracefulRebootChannel chan struct{}
systemdUserServicesFullPath = filepath.Join(os.Getenv("HOME"), systemdUserServicesDir)
)

func initSystemdDirectory() error {
systemddir := filepath.Join(os.Getenv("HOME"), ".config/systemd/user/")

// init the flotta user systemd units directory
err := os.MkdirAll(systemddir, 0750)
err := os.MkdirAll(systemdUserServicesFullPath, 0750)
if err != nil {
return err
}
Expand Down Expand Up @@ -149,6 +154,12 @@ func main() {
}
configManager := configuration2.NewConfigurationManager(dataDir)

// Systemd event dbusEventListener
dbusEventListener := service.NewDBusEventListener()
// Start listening to systemd bus events. These events generated from here can start being processed once all observers have been initialized, otherwise
// we risk in missing observers whilst processing the events.
configManager.RegisterObserver(dbusEventListener)

// --- Client metrics configuration ---
metricsStore, err := metrics.NewTSDB(dataDir)
if err != nil {
Expand All @@ -170,16 +181,16 @@ func main() {
dataTransferWatcher := metrics.NewDataTransferMetrics(metricsDaemon)
configManager.RegisterObserver(dataTransferWatcher)

wl, err := workload2.NewWorkloadManager(dataDir, deviceId)
workloadManager, err := workload.NewWorkloadManager(dataDir, deviceId, dbusEventListener.GetEventChannel())
if err != nil {
log.Fatalf("cannot start Workload Manager. DeviceID: %s; err: %v", deviceId, err)
}

logsWrapper := logs.NewWorkloadsLogsTarget(wl)
logsWrapper := logs.NewWorkloadsLogsTarget(workloadManager)
configManager.RegisterObserver(logsWrapper)
wl.RegisterObserver(logsWrapper)
configManager.RegisterObserver(wl)
wl.RegisterObserver(workloadMetricWatcher)
workloadManager.RegisterObserver(logsWrapper)
configManager.RegisterObserver(workloadManager)
workloadManager.RegisterObserver(workloadMetricWatcher)

remoteWrite := metrics.NewRemoteWrite(dataDir, deviceId, metricsStore)
configManager.RegisterObserver(remoteWrite)
Expand All @@ -197,25 +208,21 @@ func main() {
}
configManager.RegisterObserver(mountManager)

dataMonitor := datatransfer.NewMonitor(wl, configManager)
wl.RegisterObserver(dataMonitor)
dataMonitor := datatransfer.NewMonitor(workloadManager, configManager)
workloadManager.RegisterObserver(dataMonitor)
configManager.RegisterObserver(dataMonitor)
dataMonitor.Start()

if err != nil {
log.Fatalf("cannot start metrics store. DeviceID: %s; err: %v", deviceId, err)
}

reg, err := registration2.NewRegistration(deviceId, &hw, dispatcherClient, configManager, wl)
reg, err := registration2.NewRegistration(deviceId, &hw, dispatcherClient, configManager, workloadManager)
if err != nil {
log.Fatalf("cannot start registration process: DeviceID: %s; err: %v", deviceId, err)
}

hbs := heartbeat2.NewHeartbeatService(dispatcherClient, configManager, wl, &hw, dataMonitor, deviceOs, reg)
hbs := heartbeat2.NewHeartbeatService(dispatcherClient, configManager, workloadManager, &hw, dataMonitor, deviceOs, reg)
configManager.RegisterObserver(hbs)

reg.DeregisterLater(
wl,
workloadManager,
configManager,
hbs,
dataMonitor,
Expand Down Expand Up @@ -247,16 +254,19 @@ func main() {

setupSignalHandler(metricsStore, ansibleManager)

go listenStartGracefulRebootChannel(wl, dataMonitor, systemMetricsWatcher, metricsStore, hbs, ansibleManager,
go listenStartGracefulRebootChannel(workloadManager, dataMonitor, systemMetricsWatcher, metricsStore, hbs, ansibleManager,
gracefulRebootChannel, deviceOs)

// All observers have been registered at this point. Ready to start listening to workload service events generated by changes to the systemd dbus
go workloadManager.ListenServiceEvents()

if err := s.Serve(l); err != nil {
log.Fatalf("cannot start worker server, err: %v", err)
}

}

func listenStartGracefulRebootChannel(wl *workload2.WorkloadManager, dataMonitor *datatransfer.Monitor,
func listenStartGracefulRebootChannel(wl *workload.WorkloadManager, dataMonitor *datatransfer.Monitor,
systemMetricsWatcher *metrics.SystemMetrics, metricsStore *metrics.TSDB, hbs *heartbeat2.Heartbeat,
ansibleManager *ansible.Manager, gracefulRebootChannel chan struct{}, deviceOs *os2.OS) {
// listen to the channel for getting StartGracefulReboot signal
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/digitalocean/godo v1.80.0 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/envoyproxy/protoc-gen-validate v0.6.7 // indirect
github.com/fsnotify/fsnotify v1.5.4
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-openapi/runtime v0.23.1 // indirect
Expand Down
6 changes: 4 additions & 2 deletions internal/ansible/ansible_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,10 @@ var _ = Describe("Ansible Runner", func() {
modTime1 := time.Now().Add(-3 * time.Hour)
modTime2 := time.Now().Add(-2 * time.Hour)

p1Sha := ansibleManager.MappingRepository.GetSha256(p1)
p2Sha := ansibleManager.MappingRepository.GetSha256(p2)
p1Sha, err := ansibleManager.MappingRepository.GetSha256(p1)
Expect(err).ToNot(HaveOccurred())
p2Sha, err := ansibleManager.MappingRepository.GetSha256(p2)
Expect(err).ToNot(HaveOccurred())

p1Path := path.Join(configDir, p1Sha)
p2Path := path.Join(configDir, p2Sha)
Expand Down
25 changes: 18 additions & 7 deletions internal/ansible/mapping/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type mapping struct {

//go:generate mockgen -package=mapping -destination=mock_mapping.go . MappingRepository
type MappingRepository interface {
GetSha256(fileContent []byte) string
GetSha256(fileContent []byte) (string, error)
Add(fileContent []byte, modTime time.Time) error
Remove(fileContent []byte) error
RemoveMappingFile() error
Expand Down Expand Up @@ -85,17 +85,24 @@ func (m *mappingRepository) GetAll() map[int]string {
return all
}

func (m *mappingRepository) GetSha256(fileContent []byte) string {
func (m *mappingRepository) GetSha256(fileContent []byte) (string, error) {
h := sha256.New()
h.Write(fileContent)
return fmt.Sprintf("%x", h.Sum(nil))
_, err := h.Write(fileContent)
if err != nil {
return "", err
}
return fmt.Sprintf("%x", h.Sum(nil)), nil
}

func (m *mappingRepository) Add(fileContent []byte, modTime time.Time) error {
m.lock.Lock()
defer m.lock.Unlock()
filePath := path.Join(m.configDir, m.GetSha256(fileContent))
err := os.WriteFile(filePath, []byte(fileContent), 0600)
sha, err := m.GetSha256(fileContent)
if err != nil {
return err
}
filePath := path.Join(m.configDir, sha)
err = os.WriteFile(filePath, []byte(fileContent), 0600)

if err != nil {
return err
Expand All @@ -111,7 +118,11 @@ func (m *mappingRepository) Remove(fileContent []byte) error {
m.lock.Lock()
defer m.lock.Unlock()

filePath := path.Join(m.configDir, m.GetSha256(fileContent))
sha, err := m.GetSha256(fileContent)
if err != nil {
return err
}
filePath := path.Join(m.configDir, sha)
modTime := m.pathToModTime[filePath]
delete(m.modTimeToPath, modTime)
delete(m.pathToModTime, filePath)
Expand Down
20 changes: 14 additions & 6 deletions internal/ansible/mapping/mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,19 @@ var _ = Describe("Mapping", func() {
repo, err = mapping.NewMappingRepository(dir)
Expect(err).ToNot(HaveOccurred())

sha256Test = repo.GetSha256([]byte("test"))
sha256Test, err = repo.GetSha256([]byte("test"))
Expect(err).ToNot(HaveOccurred())
filePathTest = path.Join(configDir, sha256Test)
})
AfterEach(func() {
err := repo.RemoveMappingFile()
Expect(err).ToNot(HaveOccurred())
})
It("sha256 Generation", func() {
s1 := repo.GetSha256([]byte("AAA"))
s2 := repo.GetSha256([]byte("AAA"))
s1, err := repo.GetSha256([]byte("AAA"))
Expect(err).ToNot(HaveOccurred())
s2, err := repo.GetSha256([]byte("AAA"))
Expect(err).ToNot(HaveOccurred())
Expect(s1).To(Equal(s2))
})
It("Should be created empty", func() {
Expand Down Expand Up @@ -86,12 +89,17 @@ var _ = Describe("Mapping", func() {

It("Should persist mappings", func() {
// given
filePath1 := path.Join(configDir, repo.GetSha256([]byte("test-one")))
filePath2 := path.Join(configDir, repo.GetSha256([]byte("test-two")))
sha, err := repo.GetSha256([]byte("test-one"))
Expect(err).ToNot(HaveOccurred())
filePath1 := path.Join(configDir, sha)
Expect(err).ToNot(HaveOccurred())
sha, err = repo.GetSha256([]byte("test-two"))
filePath2 := path.Join(configDir, sha)
Expect(err).ToNot(HaveOccurred())
modTime1 := time.Now()
modTime2 := modTime1.Add(1 * time.Minute)

err := repo.Add([]byte("test-one"), modTime1)
err = repo.Add([]byte("test-one"), modTime1)
Expect(err).ToNot(HaveOccurred())
Expect(repo.GetModTime(filePath1)).To(Equal(modTime1.UnixNano()))
Expect(repo.GetFilePath(modTime1)).To(Equal(filePath1))
Expand Down
5 changes: 3 additions & 2 deletions internal/ansible/mapping/mock_mapping.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion internal/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var (
type Observer interface {
Init(configuration models.DeviceConfigurationMessage) error
Update(configuration models.DeviceConfigurationMessage) error
String() string
}

type Manager struct {
Expand All @@ -48,10 +49,10 @@ type Manager struct {
func NewConfigurationManager(dataDir string) *Manager {
deviceConfigFile := path.Join(dataDir, "device-config.json")
log.Infof("device config file: %s", deviceConfigFile)
file, err := os.ReadFile(deviceConfigFile) //#nosec
var deviceConfiguration models.DeviceConfigurationMessage
initialConfig := atomic.Value{}
initialConfig.Store(false)
file, err := os.ReadFile(deviceConfigFile) //#nosec
if err != nil {
log.Error(err)
deviceConfiguration = defaultDeviceConfigurationMessage
Expand Down Expand Up @@ -146,6 +147,7 @@ func (m *Manager) Update(message models.DeviceConfigurationMessage) error {
}

log.Infof("updating configuration. New config: %s\nOld config: %s", newJson, oldJson)
log.Debugf("[ConfigManager] observers :%v+", m.observers)
for _, observer := range m.observers {
err := observer.Update(message)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions internal/configuration/configuration_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 0 additions & 40 deletions internal/heartbeat/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"reflect"
"sync"
"time"

"github.com/openshift/assisted-installer-agent/src/util"
"github.com/project-flotta/flotta-device-worker/internal/ansible"
os2 "github.com/project-flotta/flotta-device-worker/internal/os"
"github.com/project-flotta/flotta-device-worker/internal/registration"
Expand Down Expand Up @@ -759,44 +757,6 @@ func (d *DispatcherFailing) GetConfig(ctx context.Context, in *pb.Empty, opts ..
return nil, nil
}

func NewFilledInterfaceMock(mtu int, name string, macAddr string, flags net.Flags, addrs []string, isPhysical bool, isBonding bool, isVlan bool, speedMbps int64) *util.MockInterface {
hwAddr, _ := net.ParseMAC(macAddr)
ret := util.MockInterface{}
ret.On("IsPhysical").Return(isPhysical)
if isPhysical || isBonding || isVlan {
ret.On("Name").Return(name)
ret.On("MTU").Return(mtu)
ret.On("HardwareAddr").Return(hwAddr)
ret.On("Flags").Return(flags)
ret.On("Addrs").Return(toAddresses(addrs), nil)
ret.On("SpeedMbps").Return(speedMbps)
}
if !isPhysical {
ret.On("IsBonding").Return(isBonding)
}
if !(isPhysical || isBonding) {
ret.On("IsVlan").Return(isVlan)
}

return &ret
}

func toAddresses(addrs []string) []net.Addr {
ret := make([]net.Addr, 0)
for _, a := range addrs {
ret = append(ret, str2Addr(a))
}
return ret
}

func str2Addr(addrStr string) net.Addr {
ip, ipnet, err := net.ParseCIDR(addrStr)
if err != nil {
return &net.IPNet{}
}
return &net.IPNet{IP: ip, Mask: ipnet.Mask}
}

func initHwMock(hwMock *hardware.MockHardware, configManager *configuration.Manager, hostname string, ipv4 []string) (*gomock.Call, *gomock.Call, *gomock.Call) {
var m models.HardwareInfo
configManager.GetDeviceConfiguration().Heartbeat.HardwareProfile.Scope = heartbeat.ScopeDelta
Expand Down
4 changes: 4 additions & 0 deletions internal/logs/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,7 @@ func (w *WorkloadsLogsTarget) WorkloadStarted(workloadName string, report []*pod
log.Error("Cannot start workload logs", err)
}
}

func (w *WorkloadsLogsTarget) String() string {
return "workload logs"
}
4 changes: 4 additions & 0 deletions internal/metrics/remote_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,3 +532,7 @@ func toTimeSeries(series []Series) (timeSeries []prompb.TimeSeries, lowest time.
highest = fromDbTime(maxt)
return
}

func (r *RemoteWrite) String() string {
return "remote write"
}
2 changes: 1 addition & 1 deletion internal/metrics/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type SystemMetrics struct {
}

func NewSystemMetrics(daemon MetricsDaemon) (*SystemMetrics, error) {
nodeExporter, err := service.NewSystemdRootless("node_exporter", nil, false)
nodeExporter, err := service.NewSystemd("node_exporter", nil, service.SystemBus)
if err != nil {
return nil, err
}
Expand Down
Loading