Commit

Merge pull request #224 from safing/fix/metrics-modules-api
Fix metrics, modules, api
dhaavi authored Oct 13, 2023
2 parents 7872911 + be48ba3 commit 83b7095
Showing 10 changed files with 133 additions and 116 deletions.
2 changes: 1 addition & 1 deletion api/authentication.go
@@ -151,7 +151,7 @@ func authenticateRequest(w http.ResponseWriter, r *http.Request, targetHandler h
switch requiredPermission { //nolint:exhaustive
case NotFound:
// Not found.
tracer.Trace("api: authenticated handler reported: not found")
tracer.Debug("api: no API endpoint registered for this path")
http.Error(w, "Not found.", http.StatusNotFound)
return nil
case NotSupported:
1 change: 1 addition & 0 deletions api/router.go
@@ -235,6 +235,7 @@ func (mh *mainHandler) handle(w http.ResponseWriter, r *http.Request) error {
http.Error(lrw, "Method not allowed.", http.StatusMethodNotAllowed)
return nil
default:
tracer.Debug("api: no handler registered for this path")
http.Error(lrw, "Not found.", http.StatusNotFound)
return nil
}
5 changes: 0 additions & 5 deletions metrics/api.go
@@ -11,7 +11,6 @@ import (
"github.com/safing/portbase/api"
"github.com/safing/portbase/config"
"github.com/safing/portbase/log"
"github.com/safing/portbase/utils"
)

func registerAPI() error {
@@ -140,11 +139,7 @@ func writeMetricsTo(ctx context.Context, url string) error {
)
}

var metricsPusherDone = utils.NewBroadcastFlag()

func metricsWriter(ctx context.Context) error {
defer metricsPusherDone.NotifyAndReset()

pushURL := pushOption()
ticker := module.NewSleepyTicker(1*time.Minute, 0)
defer ticker.Stop()
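For context, a minimal, dependency-free sketch of what the pusher loop looks like after this change: it simply returns when its context is cancelled and no longer notifies a broadcast flag, because shutdown now waits on the registry write lock instead (see metrics/module.go below). `pushMetrics`, the plain `time.Ticker`, and the localhost URL are illustrative stand-ins, not the module's `writeMetricsTo`, `SleepyTicker`, or configured push URL.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pushMetrics is a hypothetical stand-in for writeMetricsTo; the real code
// POSTs the metrics in Prometheus text format to the configured push URL.
func pushMetrics(ctx context.Context, url string) error {
	fmt.Println("pushing metrics to", url)
	return nil
}

// metricsWriter sketches the push loop after this change: no broadcast flag,
// just a ticker-driven loop that exits when the context is cancelled.
func metricsWriter(ctx context.Context, pushURL string, interval time.Duration) error {
	ticker := time.NewTicker(interval) // the real code uses module.NewSleepyTicker(1*time.Minute, 0)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if err := pushMetrics(ctx, pushURL); err != nil {
				return err
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	_ = metricsWriter(ctx, "http://localhost:9091", time.Second)
}
```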
2 changes: 1 addition & 1 deletion metrics/metrics_host.go
@@ -16,7 +16,7 @@ import (

const hostStatTTL = 1 * time.Second

func registeHostMetrics() (err error) {
func registerHostMetrics() (err error) {
// Register load average metrics.
_, err = NewGauge("host/load/avg/1", nil, getFloat64HostStat(LoadAvg1), &Options{Name: "Host Load Avg 1min", Permission: api.PermitUser})
if err != nil {
7 changes: 7 additions & 0 deletions metrics/metrics_info.go
@@ -3,10 +3,13 @@ package metrics
import (
"runtime"
"strings"
"sync/atomic"

"github.com/safing/portbase/info"
)

var reportedStart atomic.Bool

func registerInfoMetric() error {
meta := info.GetInfo()
_, err := NewGauge(
@@ -26,6 +29,10 @@ func registerInfoMetric() error {
"comment": commentOption(),
},
func() float64 {
// Report as 0 the first time in order to detect (re)starts.
if reportedStart.CompareAndSwap(false, true) {
return 0
}
return 1
},
nil,
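The gauge function above is the whole restart-detection change: the first report after process start returns 0, every later one returns 1, so a scraper sees a 0 to 1 step whenever the process (re)starts. A standalone sketch of the pattern using only `sync/atomic`; `infoGaugeValue` is an illustrative name for the anonymous gauge function.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

var reportedStart atomic.Bool

// infoGaugeValue mirrors the anonymous gauge function above: the first call
// after process start flips the flag and reports 0, every later call reports 1.
func infoGaugeValue() float64 {
	if reportedStart.CompareAndSwap(false, true) {
		return 0
	}
	return 1
}

func main() {
	fmt.Println(infoGaugeValue()) // 0: first report marks the (re)start
	fmt.Println(infoGaugeValue()) // 1: all subsequent reports
}
```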
2 changes: 1 addition & 1 deletion metrics/metrics_logs.go
@@ -5,7 +5,7 @@ import (
"github.com/safing/portbase/log"
)

func registeLogMetrics() (err error) {
func registerLogMetrics() (err error) {
_, err = NewFetchingCounter(
"logs/warning/total",
nil,
24 changes: 12 additions & 12 deletions metrics/module.go
@@ -5,7 +5,6 @@ import (
"fmt"
"sort"
"sync"
"time"

"github.com/safing/portbase/modules"
)
@@ -59,11 +58,11 @@ func start() error {
return err
}

if err := registeHostMetrics(); err != nil {
if err := registerHostMetrics(); err != nil {
return err
}

if err := registeLogMetrics(); err != nil {
if err := registerLogMetrics(); err != nil {
return err
}

@@ -82,16 +81,13 @@ func stop() error {
// Wait until the metrics pusher is done, as it may have started reporting
// and may report a higher number than we store to disk. For persistent
// metrics it can then happen that the first report is lower than the
// previous report, making prometheus think that al that happened since the
// previous report, making prometheus think that all that happened since the
// last report, due to the automatic restart detection.
done := metricsPusherDone.NewFlag()
done.Refresh()
if !done.IsSet() {
select {
case <-done.Signal():
case <-time.After(10 * time.Second):
}
}

// The registry is read locked when writing metrics.
// Write lock the registry to make sure all writes are finished.
registryLock.Lock()
registryLock.Unlock() //nolint:staticcheck

storePersistentMetrics()

@@ -120,6 +116,10 @@ func register(m Metric) error {
// Set flag that first metric is now registered.
firstMetricRegistered = true

if module.Status() < modules.StatusStarting {
return fmt.Errorf("registering metric %q too early", m.ID())
}

return nil
}
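The Lock/Unlock pair added to stop() above is a write-lock barrier: since every metrics report holds the registry read lock, briefly taking the write lock waits for all in-flight reports to finish, which is what replaces the removed broadcast-flag wait. A small self-contained sketch of the idea; `reportMetrics` is illustrative, not a function from this module.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

var registryLock sync.RWMutex

// reportMetrics stands in for a metrics writer: the real registry is read
// locked for the duration of a report.
func reportMetrics() {
	registryLock.RLock()
	defer registryLock.RUnlock()
	time.Sleep(100 * time.Millisecond) // simulate writing a report
}

func main() {
	go reportMetrics()
	time.Sleep(10 * time.Millisecond) // let the reader acquire the read lock

	// The barrier used in stop(): acquiring the write lock blocks until every
	// in-flight, read-locked report has finished, so the values persisted next
	// cannot be overtaken by a report that is still running.
	registryLock.Lock()
	registryLock.Unlock() //nolint:staticcheck

	fmt.Println("all in-flight reports finished; safe to persist metrics")
}
```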

4 changes: 2 additions & 2 deletions metrics/persistence.go
@@ -25,7 +25,7 @@ var (
})

// ErrAlreadyInitialized is returned when trying to initialize an option
// more than once.
// more than once or if the time window for initializing is over.
ErrAlreadyInitialized = errors.New("already initialized")
)

@@ -55,7 +55,7 @@ func EnableMetricPersistence(key string) error {

// Load metrics from storage.
var err error
storage, err = getMetricsStorage(key)
storage, err = getMetricsStorage(storageKey)
switch {
case err == nil:
// Continue.
90 changes: 43 additions & 47 deletions modules/modules.go
@@ -57,10 +57,11 @@ type Module struct { //nolint:maligned
// start
startComplete chan struct{}
// stop
Ctx context.Context
cancelCtx func()
stopFlag *abool.AtomicBool
stopComplete chan struct{}
Ctx context.Context
cancelCtx func()
stopFlag *abool.AtomicBool
stopCompleted *abool.AtomicBool
stopComplete chan struct{}

// workers/tasks
ctrlFuncRunning *abool.AtomicBool
@@ -255,12 +256,10 @@ func (m *Module) checkIfStopComplete() {
atomic.LoadInt32(m.taskCnt) == 0 &&
atomic.LoadInt32(m.microTaskCnt) == 0 {

m.Lock()
defer m.Unlock()

if m.stopComplete != nil {
if m.stopCompleted.SetToIf(false, true) {
m.Lock()
defer m.Unlock()
close(m.stopComplete)
m.stopComplete = nil
}
}
}
@@ -283,60 +282,56 @@ func (m *Module) stop(reports chan *report) {
// Reset start/stop signal channels.
m.startComplete = make(chan struct{})
m.stopComplete = make(chan struct{})
m.stopCompleted.SetTo(false)

// Make a copy of the stop channel.
stopComplete := m.stopComplete

// Set status and cancel context.
// Set status.
m.status = StatusStopping
m.stopFlag.Set()
m.cancelCtx()

go m.stopAllTasks(reports, stopComplete)
go m.stopAllTasks(reports)
}

func (m *Module) stopAllTasks(reports chan *report, stopComplete chan struct{}) {
// start shutdown function
var stopFnError error
stopFuncRunning := abool.New()
if m.stopFn != nil {
stopFuncRunning.Set()
go func() {
stopFnError = m.runCtrlFn("stop module", m.stopFn)
stopFuncRunning.UnSet()
m.checkIfStopComplete()
}()
} else {
m.checkIfStopComplete()
}
func (m *Module) stopAllTasks(reports chan *report) {
// Manually set the control function flag in order to stop completion by race
// condition before stop function has even started.
m.ctrlFuncRunning.Set()

// Set stop flag for everyone checking this flag before we activate any stop trigger.
m.stopFlag.Set()

// Cancel the context to notify all workers and tasks.
m.cancelCtx()

// Start stop function.
stopFnError := m.startCtrlFn("stop module", m.stopFn)

// wait for results
select {
case <-stopComplete:
// case <-time.After(moduleStopTimeout):
case <-m.stopComplete:
// Complete!
case <-time.After(moduleStopTimeout):
log.Warningf(
"%s: timed out while waiting for stopfn/workers/tasks to finish: stopFn=%v/%v workers=%d tasks=%d microtasks=%d, continuing shutdown...",
"%s: timed out while waiting for stopfn/workers/tasks to finish: stopFn=%v workers=%d tasks=%d microtasks=%d, continuing shutdown...",
m.Name,
stopFuncRunning.IsSet(), m.ctrlFuncRunning.IsSet(),
m.ctrlFuncRunning.IsSet(),
atomic.LoadInt32(m.workerCnt),
atomic.LoadInt32(m.taskCnt),
atomic.LoadInt32(m.microTaskCnt),
)
}

// collect error
// Check for stop fn status.
var err error
if stopFuncRunning.IsNotSet() && stopFnError != nil {
err = stopFnError
}
// set status
if err != nil {
m.Error(
fmt.Sprintf("%s:stop-failed", m.Name),
fmt.Sprintf("Stopping module %s failed", m.Name),
fmt.Sprintf("Failed to stop module: %s", err.Error()),
)
select {
case err = <-stopFnError:
if err != nil {
// Set error as module error.
m.Error(
fmt.Sprintf("%s:stop-failed", m.Name),
fmt.Sprintf("Stopping module %s failed", m.Name),
fmt.Sprintf("Failed to stop module: %s", err.Error()),
)
}
default:
}

// Always set to offline in order to let other modules shutdown in order.
@@ -384,7 +379,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
Name: name,
enabled: abool.NewBool(false),
enabledAsDependency: abool.NewBool(false),
sleepMode: abool.NewBool(true),
sleepMode: abool.NewBool(true), // Change (for init) is triggered below.
sleepWaitingChannel: make(chan time.Time),
prepFn: prep,
startFn: start,
Expand All @@ -393,6 +388,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
Ctx: ctx,
cancelCtx: cancelCtx,
stopFlag: abool.NewBool(false),
stopCompleted: abool.NewBool(true),
ctrlFuncRunning: abool.NewBool(false),
workerCnt: &workerCnt,
taskCnt: &taskCnt,
Expand All @@ -401,7 +397,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
depNames: dependencies,
}

// Sleep mode is disabled by default
// Sleep mode is disabled by default.
newModule.Sleep(false)

return newModule
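The new stopCompleted flag guards the close of stopComplete so that concurrent calls to checkIfStopComplete close the channel exactly once. A minimal sketch of that pattern; it uses `sync/atomic.Bool` instead of the `abool` package the module actually uses, and `moduleSketch` is an illustrative type, not the real `Module`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// moduleSketch models only the stop-completion signaling: stopCompleted
// guarantees that stopComplete is closed exactly once, even when
// checkIfStopComplete is called concurrently from several workers.
type moduleSketch struct {
	sync.Mutex
	stopCompleted atomic.Bool
	stopComplete  chan struct{}
}

func (m *moduleSketch) checkIfStopComplete() {
	if m.stopCompleted.CompareAndSwap(false, true) {
		m.Lock()
		defer m.Unlock()
		close(m.stopComplete)
	}
}

func main() {
	m := &moduleSketch{stopComplete: make(chan struct{})}

	// Many concurrent completion checks, but only one close().
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.checkIfStopComplete()
		}()
	}
	wg.Wait()

	<-m.stopComplete
	fmt.Println("stop completed exactly once")
}
```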