From 960945b230daf04af40602fc7f78e29bb4d2c217 Mon Sep 17 00:00:00 2001
From: Kacper Zienkiewicz
Date: Thu, 8 Aug 2024 11:18:01 +0200
Subject: [PATCH] [#61710] linux-client: generic goroutine scheduler for log collection

---
 devices/linux-client/telemetry/data.go        |   6 +
 devices/linux-client/telemetry/telemetry.go   | 199 ++++++++++++++++++
 .../linux-client/telemetry/telemetry_test.go  | 190 +++++++++++++++++
 devices/linux-client/telemetry/writers.go     | 120 +++++++++++
 4 files changed, 515 insertions(+)
 create mode 100644 devices/linux-client/telemetry/data.go
 create mode 100644 devices/linux-client/telemetry/telemetry.go
 create mode 100644 devices/linux-client/telemetry/telemetry_test.go
 create mode 100644 devices/linux-client/telemetry/writers.go

diff --git a/devices/linux-client/telemetry/data.go b/devices/linux-client/telemetry/data.go
new file mode 100644
index 0000000..2b31bb3
--- /dev/null
+++ b/devices/linux-client/telemetry/data.go
@@ -0,0 +1,6 @@
+package telemetry
+
+// LoggerArgs holds the arguments passed to a logger function through its LoggerContext
+type LoggerArgs struct {
+	Placeholder bool // Easily extensible
+}
diff --git a/devices/linux-client/telemetry/telemetry.go b/devices/linux-client/telemetry/telemetry.go
new file mode 100644
index 0000000..3e9fddd
--- /dev/null
+++ b/devices/linux-client/telemetry/telemetry.go
@@ -0,0 +1,199 @@
+package telemetry
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+)
+
+type LoggerContext struct {
+	Args LoggerArgs // Arguments passed to the logger
+
+	Done <-chan struct{} // Closed by the scheduler to request termination; should only be used by a PersistentLogger
+
+}
+
+/*** Logger function types ***/
+
+/* Recurring logger function */
+type RecurringLogger func(ctx LoggerContext)
+/*
+Purpose:
+	Designed for short, one-shot data collection runs that
+	are scheduled periodically by the manager.
+Behavior:
+	Logs should be collected promptly and the function
+	should not block for extended periods. Prolonged blocking
+	can result in missed log intervals.
+*/
+
+/* Persistent logger function */
+type PersistentLogger func(ctx LoggerContext)
+/*
+Purpose:
+	Suitable for stream-based log data, where logging starts
+	once and continues until explicitly stopped. It is not
+	scheduled periodically and must handle its own termination.
+Behavior:
+	Requires cooperation with the scheduler: the logging
+	goroutine should terminate promptly once `ctx.Done` is closed.
+*/
+
+type Logger interface {
+	log(ctx LoggerContext)
+}
+
+func (r RecurringLogger) log(ctx LoggerContext) {
+	r(ctx)
+}
+
+func (p PersistentLogger) log(ctx LoggerContext) {
+	go p(ctx)
+}
+
+type logTask struct {
+	done     chan struct{}
+	interval time.Duration
+	logger   Logger
+	args     LoggerArgs
+
+	mu      sync.Mutex
+	running bool
+}
+
+func (task *logTask) runRecurringLogger(ctx LoggerContext) {
+	ticks := time.Tick(task.interval)
+
+loop:
+	for {
+		select {
+		case <-ticks:
+			task.logger.log(ctx)
+		case <-task.done:
+			break loop
+		}
+	}
+}
+
+func (task *logTask) runPersistentLogger(ctx LoggerContext) {
+	done := make(chan struct{})
+	ctx.Done = done
+	task.logger.log(ctx)
+
+loop:
+	for {
+		select {
+		case <-task.done:
+			close(done)
+			break loop
+		}
+	}
+}
+
+func (task *logTask) start() {
+	task.mu.Lock()
+	defer task.mu.Unlock()
+
+	if task.running {
+		return
+	}
+
+	task.done = make(chan struct{})
+	task.running = true
+	ctx := LoggerContext{Args: task.args, Done: nil}
+
+	switch task.logger.(type) {
+	case RecurringLogger:
+		go task.runRecurringLogger(ctx)
+	case PersistentLogger:
+		go task.runPersistentLogger(ctx)
+	}
+
+	task.running = true
+}
+
+func (task *logTask) stop() {
+	task.mu.Lock()
+	defer task.mu.Unlock()
+
+	if !task.running {
+		return
+	}
+	close(task.done)
+	task.running = false
+}
+
+func (task *logTask) isRunning() bool {
+	task.mu.Lock()
+	defer task.mu.Unlock()
+	return task.running
+}
+
+type LogManager struct {
+	tasks map[string]*logTask
+	mu    sync.Mutex
+}
+
+func MakeLogManager() *LogManager {
+	return &LogManager{
+		tasks: make(map[string]*logTask),
+	}
+}
+
+func (manager *LogManager) AddTask(name string, logger Logger, interval ...time.Duration) error {
+	manager.mu.Lock()
+	defer manager.mu.Unlock()
+
+	if _, exists := manager.tasks[name]; exists {
+		return errors.New(fmt.Sprintf("task \"%s\" already exists", name))
+	}
+	var task *logTask
+	switch logger.(type) {
+	case RecurringLogger:
+		if len(interval) == 0 {
+			task = &logTask{logger: logger, interval: time.Second}
+		} else {
+			task = &logTask{logger: logger, interval: interval[0]}
+		}
+	case PersistentLogger:
+		task = &logTask{logger: logger, interval: 0}
+	}
+	manager.tasks[name] = task
+
+	return nil
+}
+
+func (manager *LogManager) StartTask(name string, args LoggerArgs) error {
+	manager.mu.Lock()
+	defer manager.mu.Unlock()
+
+	if task, exists := manager.tasks[name]; exists {
+		if task.isRunning() {
+			return errors.New(fmt.Sprintf("task \"%s\" is already running", name))
+		}
+		task.args = args
+		task.start()
+	} else {
+		return errors.New(fmt.Sprintf("task \"%s\" does not exist", name))
+	}
+
+	return nil
+}
+
+func (manager *LogManager) StopTask(name string) error {
+	manager.mu.Lock()
+	defer manager.mu.Unlock()
+
+	if task, exists := manager.tasks[name]; exists {
+		if task.isRunning() {
+			task.stop()
+		} else {
+			return errors.New(fmt.Sprintf("task \"%s\" is not running", name))
+		}
+	} else {
+		return errors.New(fmt.Sprintf("task \"%s\" does not exist", name))
+	}
+
+	return nil
+}
diff --git a/devices/linux-client/telemetry/telemetry_test.go b/devices/linux-client/telemetry/telemetry_test.go
new file mode 100644
index 0000000..46a34b4
--- /dev/null
+++ b/devices/linux-client/telemetry/telemetry_test.go
@@ -0,0 +1,190 @@
+package telemetry
+
+import (
+	"fmt"
+	"math/rand"
+	"regexp"
+	"sync"
+	"testing"
+	"time"
+)
+
+func nop() {
+	i := 0
+	switch i {
+	case 0:
+	case 1:
+		fmt.Println("nop")
+	}
+}
+
+func persistentLogger(ctx LoggerContext) {
+	ticks := time.Tick(time.Millisecond)
+	for {
+		select {
+		case <-ticks:
+			nop()
+		case <-ctx.Done:
+			return
+		}
+	}
+}
+
+func recurringLogger(ctx LoggerContext) {
+	nop()
+}
+
+func randBool() bool {
+	rand.Seed(time.Now().UnixNano())
+	return rand.Intn(2) == 1
+}
+
+func TestDoesNotExist(t *testing.T) {
+	lm := MakeLogManager()
+	ld := LoggerArgs{}
+
+	name := "DoesntExist"
+	want := regexp.MustCompile(`\btask "` + name + `" does not exist\b`)
+
+	err := lm.StartTask(name, ld)
+	if err == nil || !want.MatchString(err.Error()) {
+		t.Fatalf(`.StartTask("DoesntExist") = %v, want match for %#q`, err, want)
+	}
+
+	if err = lm.StopTask(name); err == nil || !want.MatchString(err.Error()) {
+		t.Fatalf(`.StopTask("DoesntExist") = %v, want match for %#q`, err, want)
+	}
+}
+
+func TestNotRunning(t *testing.T) {
+	lm := MakeLogManager()
+
+	name := "NotRunning"
+	want := regexp.MustCompile(`\btask "` + name + `" is not running\b`)
+	var rl RecurringLogger = func(ctx LoggerContext) {
+		nop()
+	}
+
+	lm.AddTask(name, rl)
+	if err := lm.StopTask(name); err == nil || !want.MatchString(err.Error()) {
+		t.Fatalf(`.StopTask("NotRunning") = %v, want match for %#q`, err, want)
+
+	}
+}
+
+func TestAlreadyRunning(t *testing.T) {
+	lm := MakeLogManager()
+	ld := LoggerArgs{}
+
+	name := "AlreadyRunning"
+	want := regexp.MustCompile(`\btask "` + name + `" is already running\b`)
+	var rl RecurringLogger = func(ctx LoggerContext) {
+		nop()
+	}
+
+	lm.AddTask(name, rl)
+	lm.StartTask(name, ld)
+	if err := lm.StartTask(name, ld); err == nil || !want.MatchString(err.Error()) {
+		t.Fatalf(`.StartTask("AlreadyRunning") = %v, want match for %#q`, err, want)
+	}
+	lm.StopTask(name)
+}
+
+func TestAlreadyExists(t *testing.T) {
+	lm := MakeLogManager()
+
+	name := "AlreadyExists"
+	want := regexp.MustCompile(`\btask "` + name + `" already exists\b`)
+	var rl RecurringLogger = func(ctx LoggerContext) {
+		nop()
+	}
+	lm.AddTask(name, rl)
+	if err := lm.AddTask(name, rl); err == nil || !want.MatchString(err.Error()) {
+		t.Fatalf(`.AddTask("AlreadyExists") = %v, want match for %#q`, err, want)
+	}
+}
+
+func FuzzRecurringLogger(f *testing.F) {
+	lm := MakeLogManager()
+	ld := LoggerArgs{}
+	var rl RecurringLogger = recurringLogger
+
+	f.Add("name")
+	f.Fuzz(func(t *testing.T, name string) {
+		lm.AddTask(name, rl)
+		lm.StartTask(name, ld)
+		lm.StopTask(name)
+	})
+}
+
+func FuzzPersistentLogger(f *testing.F) {
+	lm := MakeLogManager()
+	ld := LoggerArgs{}
+	var pl PersistentLogger = persistentLogger
+
+	f.Add("name")
+	f.Fuzz(func(t *testing.T, name string) {
+		lm.AddTask(name, pl)
+		lm.StartTask(name, ld)
+		lm.StopTask(name)
+	})
+}
+
+func FuzzRandomOrder(f *testing.F) {
+	lm := MakeLogManager()
+
+	rand.Seed(time.Now().UnixNano())
+
+	funcs := []func(*LogManager, string){
+		add,
+		start,
+		stop,
+	}
+
+	f.Add("name")
+	f.Fuzz(func(t *testing.T, name string) {
+		var wg sync.WaitGroup
+		shuffledFuncs := shuffleFuncs(funcs)
+
+		for _, f := range shuffledFuncs {
+			wg.Add(1)
+			go func(f func(*LogManager, string)) {
+				defer wg.Done()
+				f(lm, name)
+				if rand.Float64() < 0.5 {
+					f(lm, name)
+				}
+			}(f)
+		}
+		wg.Wait()
+	})
+
+}
+
+func add(lm *LogManager, name string) {
+	if randBool() {
+		var pl PersistentLogger = persistentLogger
+		lm.AddTask(name, pl)
+	} else {
+		var rl RecurringLogger = recurringLogger
+		lm.AddTask(name, rl)
+	}
+}
+
+func start(lm *LogManager, name string) {
+	ld := LoggerArgs{}
+	lm.StartTask(name, ld)
+}
+
+func stop(lm *LogManager, name string) {
+	lm.StopTask(name)
+}
+
+func shuffleFuncs(funcs []func(*LogManager, string)) []func(*LogManager, string) {
+	shuffled := make([]func(*LogManager, string), len(funcs))
+	copy(shuffled, funcs)
+	rand.Shuffle(len(shuffled), func(i, j int) {
+		shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
+	})
+	return shuffled
+}
diff --git a/devices/linux-client/telemetry/writers.go b/devices/linux-client/telemetry/writers.go
new file mode 100644
index 0000000..83d2b72
--- /dev/null
+++ b/devices/linux-client/telemetry/writers.go
@@ -0,0 +1,120 @@
+package telemetry
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"path/filepath"
+	"regexp"
+	"sort"
+	"time"
+)
+
+type SizeLimitWriter struct {
+	filePath string
+	maxSize  uint64
+	maxFiles uint64
+	fp       *os.File
+	closed   bool
+}
+
+func MakeSizeLimitWriter(filePath string, maxSize uint64, maxFiles uint64) (*SizeLimitWriter, error) {
+	fp, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+	if err != nil {
+		return nil, err
+	}
+
+	w := SizeLimitWriter{filePath: filePath, maxSize: maxSize, maxFiles: maxFiles, fp: fp}
+	return &w, nil
+}
+
+func (w *SizeLimitWriter) Write(output []byte) (int, error) {
+	// Refuse writes after .Close() has been called
+	if w.closed {
+		return 0, errors.New(".Close() was already called")
+	}
+
+	// Reject buffers larger than the maximum size of a single log file
+	if uint64(len(output)) > w.maxSize {
+		return 0, errors.New("input buffer larger than the maximum size of the log file")
+	}
+
+	// Rotate if the new buffer would push the current file over the size limit
+	fi, err := w.fp.Stat()
+	if err != nil {
+		return 0, err
+	}
+	if uint64(fi.Size())+uint64(len(output)) > w.maxSize {
+		if w.fp, err = rotate(w.fp, w.filePath, w.maxFiles); err != nil {
+			return 0, err
+		}
+	}
+
+	// Do the write
+	n, err := w.fp.Write(output)
+	return n, err
+}
+
+func (w *SizeLimitWriter) Close() {
+	w.fp.Close()
+	w.closed = true
+}
+
+func rotate(fp *os.File, filePath string, maxFiles uint64) (*os.File, error) {
+	// Close the current file
+	if fp != nil {
+		err := fp.Close()
+		fp = nil
+		if err != nil {
+			return fp, err
+		}
+	}
+
+	// Rename the old file, appending the current Unix epoch to its name
+	_, err := os.Stat(filePath)
+	if err == nil {
+		err = os.Rename(filePath, filePath+"."+fmt.Sprintf("%d", time.Now().Unix()))
+		if err != nil {
+			return fp, err
+		}
+	}
+
+	// List the files in the log directory
+	dir := filepath.Dir(filePath)
+	entries, err := os.ReadDir(dir)
+	if err != nil {
+		return fp, err
+	}
+
+	// Regexp matching rotated log filenames (base name plus Unix epoch suffix)
+	re := regexp.MustCompile(regexp.QuoteMeta(filepath.Base(filePath)) + `\.[0-9]+`)
+
+	// Keep only regular files whose names match; drop directories and everything else
+	i := 0
+	for _, e := range entries {
+		s := e.Name()
+		fi, statErr := os.Stat(dir + "/" + s)
+		if statErr == nil && !fi.IsDir() && re.MatchString(s) {
+			entries[i] = e
+			i++
+		}
+	}
+	for j := i; j < len(entries); j++ {
+		entries[j] = nil
+	}
+	entries = entries[:i]
+
+	// Sort by name and remove the oldest rotated files if there are too many of them
+	if uint64(len(entries)) >= maxFiles {
+		sort.Slice(entries, func(i, j int) bool {
+			return entries[i].Name() < entries[j].Name()
+		})
+		for i := 0; uint64(i) != uint64(len(entries))-maxFiles+1; i++ {
+			os.Remove(dir + "/" + entries[i].Name())
+		}
+	}
+
+	// Create a new file for writing and return
+	fp, err = os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY, 0644)
+	return fp, err
+}
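
Usage note (not part of the diff above): the sketch below shows one way a caller could wire the scheduler and the rotating writer together, for example in a hypothetical devices/linux-client/telemetry/example_test.go. The log path, size limits, task names, and intervals are illustrative assumptions, not values taken from this patch.

package telemetry

import (
	"log"
	"time"
)

// ExampleLogManager registers one recurring and one persistent logger with the
// scheduler and routes the recurring output through a SizeLimitWriter.
func ExampleLogManager() {
	// Rotating sink: at most 1 MiB per file, keep up to 5 files (illustrative limits).
	w, err := MakeSizeLimitWriter("/tmp/telemetry.log", 1<<20, 5)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	manager := MakeLogManager()

	// Recurring logger: invoked by the scheduler every 10 seconds; it must return quickly.
	var heartbeat RecurringLogger = func(ctx LoggerContext) {
		w.Write([]byte(time.Now().Format(time.RFC3339) + " heartbeat\n"))
	}

	// Persistent logger: started once, expected to return when ctx.Done is closed.
	var stream PersistentLogger = func(ctx LoggerContext) {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				log.Println("stream sample")
			case <-ctx.Done:
				return
			}
		}
	}

	manager.AddTask("heartbeat", heartbeat, 10*time.Second)
	manager.AddTask("stream", stream)

	manager.StartTask("heartbeat", LoggerArgs{})
	manager.StartTask("stream", LoggerArgs{})

	time.Sleep(30 * time.Second)

	manager.StopTask("stream")
	manager.StopTask("heartbeat")
}

StopTask closes the task's internal done channel; for a PersistentLogger the scheduler translates that into closing ctx.Done, so both kinds of logger are stopped through the same call.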