Skip to content

Commit

Permalink
[#61710] linux-client: generic goroutine scheduler for log collection
Browse files Browse the repository at this point in the history
  • Loading branch information
Kacper Zienkiewicz committed Aug 8, 2024
1 parent 78cbbb6 commit 960945b
Show file tree
Hide file tree
Showing 4 changed files with 515 additions and 0 deletions.
6 changes: 6 additions & 0 deletions devices/linux-client/telemetry/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package telemetry

// Struct that's passed to a logger function within the LoggerContext
type LoggerArgs struct {
Placeholder bool // Easily extensible
}
199 changes: 199 additions & 0 deletions devices/linux-client/telemetry/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package telemetry

import (
"errors"
"fmt"
"sync"
"time"
)

type LoggerContext struct {
Args LoggerArgs // Arguments

Done <-chan struct{} // Should only be used by a function of type PersistentLoggerFunc

}

/*** Logger function types ***/

/* Recurring logger function */
type RecurringLogger func(ctx LoggerContext)
/*
Purpose:
Designed for one-time, periodic data collection,
scheduled by the manager.
Bahavior:
Logs should be collected promptly and the function
should not block for extended periods. Prolonged blocking
can result in missed log intervals.
*/

/* Persistent logger function */
type PersistentLogger func(ctx LoggerContext)
/*
Purpose:
Suitable for stream-based log data, where logging begins
once and continues until explicitly stopped. Not-scheduled,
needs to take care of its own termination.
Behavior:
Requires continuous cooperation with the scheduler. The logging
goroutine should terminate when `ctx.Done` is signaled.
*/

type Logger interface {
log(ctx LoggerContext)
}

func (r RecurringLogger) log(ctx LoggerContext) {
r(ctx)
}

func (p PersistentLogger) log(ctx LoggerContext) {
go p(ctx)
}

type logTask struct {
done chan struct{}
interval time.Duration
logger Logger
args LoggerArgs

mu sync.Mutex
running bool
}

func (task *logTask) runRecurringLogger(ctx LoggerContext) {
ticks := time.Tick(task.interval)

loop:
for {
select {
case <-ticks:
task.logger.log(ctx)
case <-task.done:
break loop
}
}
}

func (task *logTask) runPersistentLogger(ctx LoggerContext) {
done := make(chan struct{})
ctx.Done = done
task.logger.log(ctx)

loop:
for {
select {
case <-task.done:
close(done)
break loop
}
}
}

func (task *logTask) start() {
task.mu.Lock()
defer task.mu.Unlock()

if task.running {
return
}

task.done = make(chan struct{})
task.running = true
ctx := LoggerContext{Args: task.args, Done: nil}

switch task.logger.(type) {
case RecurringLogger:
go task.runRecurringLogger(ctx)
case PersistentLogger:
go task.runPersistentLogger(ctx)
}

task.running = true
}

func (task *logTask) stop() {
task.mu.Lock()
defer task.mu.Unlock()

if !task.running {
return
}
close(task.done)
task.running = false
}

func (task *logTask) isRunning() bool {
task.mu.Lock()
defer task.mu.Unlock()
return task.running
}

type LogManager struct {
tasks map[string]*logTask
mu sync.Mutex
}

func MakeLogManager() *LogManager {
return &LogManager{
tasks: make(map[string]*logTask),
}
}

func (manager *LogManager) AddTask(name string, logger Logger, interval ...time.Duration) error {
manager.mu.Lock()
defer manager.mu.Unlock()

if _, exists := manager.tasks[name]; exists {
return errors.New(fmt.Sprintf("task \"%s\" already exists", name))
}
var task *logTask
switch logger.(type) {
case RecurringLogger:
if len(interval) == 0 {
task = &logTask{logger: logger, interval: time.Second}
} else {
task = &logTask{logger: logger, interval: interval[0]}
}
case PersistentLogger:
task = &logTask{logger: logger, interval: 0}
}
manager.tasks[name] = task

return nil
}

func (manager *LogManager) StartTask(name string, args LoggerArgs) error {
manager.mu.Lock()
defer manager.mu.Unlock()

if task, exists := manager.tasks[name]; exists {
if task.isRunning() {
return errors.New(fmt.Sprintf("task \"%s\" is already running", name))
}
task.args = args
task.start()
} else {
return errors.New(fmt.Sprintf("task \"%s\" does not exist", name))
}

return nil
}

func (manager *LogManager) StopTask(name string) error {
manager.mu.Lock()
defer manager.mu.Unlock()

if task, exists := manager.tasks[name]; exists {
if task.isRunning() {
task.stop()
} else {
return errors.New(fmt.Sprintf("task \"%s\" is not running", name))
}
} else {
return errors.New(fmt.Sprintf("task \"%s\" does not exist", name))
}

return nil
}
190 changes: 190 additions & 0 deletions devices/linux-client/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package telemetry

import (
"fmt"
"math/rand"
"regexp"
"sync"
"testing"
"time"
)

func nop() {
i := 0
switch i {
case 0:
case 1:
fmt.Println("nop")
}
}

func persistentLogger(ctx LoggerContext) {
ticks := time.Tick(time.Millisecond)
for {
select {
case <-ticks:
nop()
case <-ctx.Done:
return
}
}
}

func recurringLogger(ctx LoggerContext) {
nop()
}

func randBool() bool {
rand.Seed(time.Now().UnixNano())
return rand.Intn(2) == 1
}

func TestDoesNotExist(t *testing.T) {
lm := MakeLogManager()
ld := LoggerArgs{}

name := "DoesntExist"
want := regexp.MustCompile(`\btask "` + name + `" does not exist\b`)

err := lm.StartTask(name, ld)
if err == nil || !want.MatchString(err.Error()) {
t.Fatalf(`.StartTask("DoesntExist") = %v, want match for %#q`, err, want)
}

if err = lm.StopTask(name); err == nil || !want.MatchString(err.Error()) {
t.Fatalf(`.StopTask("DoesntExist") = %v, want match for %#q`, err, want)
}
}

func TestNotRunning(t *testing.T) {
lm := MakeLogManager()

name := "NotRunning"
want := regexp.MustCompile(`\btask "` + name + `" is not running\b`)
var rl RecurringLogger = func(ctx LoggerContext) {
nop()
}

lm.AddTask(name, rl)
if err := lm.StopTask(name); err == nil || !want.MatchString(err.Error()) {
t.Fatalf(`.StopTask("NotRunning") = %v, want match for %#q`, err, want)

}
}

func TestAlreadyRunning(t *testing.T) {
lm := MakeLogManager()
ld := LoggerArgs{}

name := "AlreadyRunning"
want := regexp.MustCompile(`\btask "` + name + `" is already running\b`)
var rl RecurringLogger = func(ctx LoggerContext) {
nop()
}

lm.AddTask(name, rl)
lm.StartTask(name, ld)
if err := lm.StartTask(name, ld); err == nil || !want.MatchString(err.Error()) {
t.Fatalf(`.StartTask("AlreadyRunning") = %v, want match for %#q`, err, want)
}
lm.StopTask(name)
}

func TestAlreadyExists(t *testing.T) {
lm := MakeLogManager()

name := "AlreadyExists"
want := regexp.MustCompile(`\btask "` + name + `" already exists\b`)
var rl RecurringLogger = func(ctx LoggerContext) {
nop()
}
lm.AddTask(name, rl)
if err := lm.AddTask(name, rl); err == nil || !want.MatchString(err.Error()) {
t.Fatalf(`.AddTask("AlreadyExists") = %v, want match for %#q`, err, want)
}
}

func FuzzRecurringLogger(f *testing.F) {
lm := MakeLogManager()
ld := LoggerArgs{}
var rl RecurringLogger = recurringLogger

f.Add("name")
f.Fuzz(func(t *testing.T, name string) {
lm.AddTask(name, rl)
lm.StartTask(name, ld)
lm.StopTask(name)
})
}

func FuzzPersistentLogger(f *testing.F) {
lm := MakeLogManager()
ld := LoggerArgs{}
var pl PersistentLogger = persistentLogger

f.Add("name")
f.Fuzz(func(t *testing.T, name string) {
lm.AddTask(name, pl)
lm.StartTask(name, ld)
lm.StopTask(name)
})
}

func FuzzRandomOrder(f *testing.F) {
lm := MakeLogManager()

rand.Seed(time.Now().UnixNano())

funcs := []func(*LogManager, string){
add,
start,
stop,
}

f.Add("name")
f.Fuzz(func(t *testing.T, name string) {
var wg sync.WaitGroup
shuffledFuncs := shuffleFuncs(funcs)

for _, f := range shuffledFuncs {
wg.Add(1)
go func(f func(*LogManager, string)) {
defer wg.Done()
f(lm, name)
if rand.Float64() < 0.5 {
f(lm, name)
}
}(f)
}
wg.Wait()
})

}

func add(lm *LogManager, name string) {
if randBool() {
var pl PersistentLogger = persistentLogger
lm.AddTask(name, pl)
} else {
var rl RecurringLogger = recurringLogger
lm.AddTask(name, rl)
}
}

func start(lm *LogManager, name string) {
ld := LoggerArgs{}
lm.StartTask(name, ld)
}

func stop(lm *LogManager, name string) {
lm.StopTask(name)
}

func shuffleFuncs(funcs []func(*LogManager, string)) []func(*LogManager, string) {
shuffled := make([]func(*LogManager, string), len(funcs))
copy(shuffled, funcs)
rand.Shuffle(len(shuffled), func(i, j int) {
shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
})
return shuffled
}
Loading

0 comments on commit 960945b

Please sign in to comment.