Skip to content

feat(reporting): add metrics reporting to the new manager experience #2337

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions api/controllers/install/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,19 @@ func WithStateMachine(stateMachine statemachine.Interface) InstallControllerOpti

func NewInstallController(opts ...InstallControllerOption) (*InstallController, error) {
controller := &InstallController{
store: store.NewMemoryStore(),
rc: runtimeconfig.New(nil),
logger: logger.NewDiscardLogger(),
stateMachine: NewStateMachine(),
store: store.NewMemoryStore(),
rc: runtimeconfig.New(nil),
logger: logger.NewDiscardLogger(),
}

for _, opt := range opts {
opt(controller)
}

if controller.stateMachine == nil {
controller.stateMachine = NewStateMachine(WithStateMachineLogger(controller.logger))
}

if controller.hostUtils == nil {
controller.hostUtils = hostutils.New(
hostutils.WithLogger(controller.logger),
Expand Down
33 changes: 33 additions & 0 deletions api/controllers/install/reporting_handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package install

import (
"context"
"fmt"

"github.com/replicatedhq/embedded-cluster/api/internal/statemachine"
)

// RegisterReportingHandlers attaches the controller's metrics-reporting callbacks to its
// state machine. One handler is registered for each of the target states StateSucceeded,
// StateFailed, StatePreflightsFailed and StatePreflightsFailedBypassed, in that order.
func (c *InstallController) RegisterReportingHandlers() {
	registrations := []struct {
		target  statemachine.State
		handler statemachine.EventHandler
	}{
		{StateSucceeded, c.reportInstallSucceeded},
		{StateFailed, c.reportInstallFailed},
		{StatePreflightsFailed, c.reportPreflightsFailed},
		{StatePreflightsFailedBypassed, c.reportPreflightsBypassed},
	}

	for _, r := range registrations {
		c.stateMachine.RegisterEventHandler(r.target, r.handler)
	}
}

// reportInstallSucceeded is a state machine event handler that tells the metrics reporter
// the installation succeeded. The source and target states are not used.
func (c *InstallController) reportInstallSucceeded(ctx context.Context, _, _ statemachine.State) {
	reporter := c.metricsReporter
	reporter.ReportInstallationSucceeded(ctx)
}

func (c *InstallController) reportInstallFailed(ctx context.Context, from, _ statemachine.State) {
if from == StateInfrastructureInstalling {
c.metricsReporter.ReportInstallationFailed(ctx, fmt.Errorf(c.install.Steps.Infra.Status.Description))

Check failure on line 23 in api/controllers/install/reporting_handlers.go

View workflow job for this annotation

GitHub Actions / Sanitize

non-constant format string in call to fmt.Errorf
}
}

// reportPreflightsFailed is a state machine event handler that forwards the host
// preflight output stored on the install to the metrics reporter as a preflight failure.
// The source and target states are not used.
func (c *InstallController) reportPreflightsFailed(ctx context.Context, _, _ statemachine.State) {
	output := c.install.Steps.HostPreflight.Output
	c.metricsReporter.ReportPreflightsFailed(ctx, output)
}

// reportPreflightsBypassed is a state machine event handler that forwards the host
// preflight output stored on the install to the metrics reporter. The source and target
// states are not used.
//
// NOTE(review): this calls ReportPreflightsFailed, the same reporter method used for the
// non-bypassed failure — confirm whether the reporter offers a dedicated "bypassed" call
// that should be used instead.
func (c *InstallController) reportPreflightsBypassed(ctx context.Context, _, _ statemachine.State) {
	output := c.install.Steps.HostPreflight.Output
	c.metricsReporter.ReportPreflightsFailed(ctx, output)
}
14 changes: 12 additions & 2 deletions api/controllers/install/statemachine.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package install

import "github.com/replicatedhq/embedded-cluster/api/internal/statemachine"
import (
"github.com/replicatedhq/embedded-cluster/api/internal/statemachine"
"github.com/sirupsen/logrus"
)

const (
// StateNew is the initial state of the install process
Expand Down Expand Up @@ -40,6 +43,7 @@ var validStateTransitions = map[statemachine.State][]statemachine.State{

// StateMachineOptions collects the settings NewStateMachine uses to build the install
// state machine.
type StateMachineOptions struct {
	// CurrentState is the state the new state machine starts in.
	CurrentState statemachine.State
	// Logger is handed to the underlying state machine via statemachine.WithLogger.
	Logger logrus.FieldLogger
}

// StateMachineOption is a functional option that modifies the StateMachineOptions used by
// NewStateMachine.
type StateMachineOption func(*StateMachineOptions)
Expand All @@ -50,6 +54,12 @@ func WithCurrentState(currentState statemachine.State) StateMachineOption {
}
}

// WithStateMachineLogger returns an option that stores logger in the Logger field of the
// StateMachineOptions it is applied to.
func WithStateMachineLogger(logger logrus.FieldLogger) StateMachineOption {
	apply := func(opts *StateMachineOptions) {
		opts.Logger = logger
	}
	return apply
}

// NewStateMachine creates a new state machine starting in the New state
func NewStateMachine(opts ...StateMachineOption) statemachine.Interface {
options := &StateMachineOptions{
Expand All @@ -58,5 +68,5 @@ func NewStateMachine(opts ...StateMachineOption) statemachine.Interface {
for _, opt := range opts {
opt(options)
}
return statemachine.New(options.CurrentState, validStateTransitions)
return statemachine.New(options.CurrentState, validStateTransitions, statemachine.WithLogger(options.Logger))
}
89 changes: 86 additions & 3 deletions api/internal/statemachine/statemachine.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package statemachine

import (
"context"
"fmt"
"slices"
"sync"
"time"

"github.com/replicatedhq/embedded-cluster/api/pkg/logger"
"github.com/sirupsen/logrus"
)

// State represents the possible states of the install process
Expand All @@ -28,8 +33,15 @@ type Interface interface {
AcquireLock() (Lock, error)
// IsLockAcquired checks if a lock already exists on the state machine.
IsLockAcquired() bool
// RegisterEventHandler registers an async event handler for reporting events in the state machine.
RegisterEventHandler(targetState State, handler EventHandler)
// UnregisterEventHandler unregisters an async event handler for reporting events in the state machine.
UnregisterEventHandler(targetState State)
}

// EventHandler is a callback run after the state machine has transitioned into a state
// the handler was registered for. It receives a context, the state that was left
// (fromState) and the state that was entered (toState). Used to report state changes.
type EventHandler func(ctx context.Context, fromState, toState State)

type Lock interface {
// Release releases the lock.
Release()
Expand All @@ -41,14 +53,41 @@ type stateMachine struct {
validStateTransitions map[State][]State
lock *lock
mu sync.RWMutex
eventHandlers map[State][]EventHandler
logger logrus.FieldLogger
handlerTimeout time.Duration // Timeout for event handlers to complete, default is 5 seconds
}

// StateMachineOption is a functional option that configures a stateMachine, for example
// its logger (WithLogger) or its event handler timeout (WithHandlerTimeout).
type StateMachineOption func(*stateMachine)

// New creates a new state machine starting in the given state with the given valid state
// transitions.
func New(currentState State, validStateTransitions map[State][]State) *stateMachine {
return &stateMachine{
// transitions and options.
func New(currentState State, validStateTransitions map[State][]State, opts ...StateMachineOption) *stateMachine {
sm := &stateMachine{
currentState: currentState,
validStateTransitions: validStateTransitions,
logger: logger.NewDiscardLogger(),
eventHandlers: make(map[State][]EventHandler),
handlerTimeout: 5 * time.Second,
}

for _, opt := range opts {
opt(sm)
}

return sm
}

// WithLogger returns an option that sets the logger used by the state machine.
//
// A nil logger is ignored so that whatever logger the state machine already has stays in
// place. Without this guard a caller forwarding an unset logger would leave the state
// machine with a nil logger, and any later log call on it would panic.
func WithLogger(logger logrus.FieldLogger) StateMachineOption {
	return func(sm *stateMachine) {
		if logger == nil {
			return
		}
		sm.logger = logger
	}
}

// WithHandlerTimeout returns an option that overrides the state machine's handlerTimeout,
// the duration used for the context passed to event handlers.
func WithHandlerTimeout(handlerTimeout time.Duration) StateMachineOption {
	apply := func(machine *stateMachine) {
		machine.handlerTimeout = handlerTimeout
	}
	return apply
}

Expand Down Expand Up @@ -123,11 +162,55 @@ func (sm *stateMachine) Transition(lock Lock, nextState State) error {
return fmt.Errorf("invalid transition from %s to %s", sm.currentState, nextState)
}

fromState := sm.currentState
sm.currentState = nextState

// Trigger event handlers asynchronously after successful transition
sm.triggerHandlers(fromState, nextState)

return nil
}

// RegisterEventHandler appends handler to the list of handlers kept for targetState.
// Handlers already registered for that state are kept. The write lock is held while the
// handler map is updated.
func (sm *stateMachine) RegisterEventHandler(targetState State, handler EventHandler) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	registered := sm.eventHandlers[targetState]
	sm.eventHandlers[targetState] = append(registered, handler)
}

// UnregisterEventHandler removes every handler registered for targetState. The write lock
// is held while the handler map is updated.
func (sm *stateMachine) UnregisterEventHandler(targetState State) {
	sm.mu.Lock()
	delete(sm.eventHandlers, targetState)
	sm.mu.Unlock()
}

// triggerHandlers runs the event handlers registered for the next state on a separate
// goroutine, so the caller is not blocked by them. It does nothing when no handler is
// registered for next.
//
// The handlers run one after another and share a single context that is cancelled after
// sm.handlerTimeout. A panic in one handler is recovered and logged, and the remaining
// handlers still run.
//
// NOTE(review): sm.eventHandlers is read here without taking sm.mu; presumably the caller
// (Transition) already holds the lock — confirm.
func (sm *stateMachine) triggerHandlers(from, next State) {
	handlers := sm.eventHandlers[next]
	if len(handlers) == 0 {
		return
	}

	// Snapshot the handlers so later registrations or removals cannot change the slice
	// while the goroutine is iterating over it.
	snapshot := slices.Clone(handlers)

	go func(from, to State, handlerList []EventHandler) {
		ctx, cancel := context.WithTimeout(context.Background(), sm.handlerTimeout)
		defer cancel()

		for _, handler := range handlerList {
			// Each handler gets its own function scope so that the deferred recover
			// fires per handler rather than once for the whole loop.
			func() {
				defer func() {
					// Log the panic but don't affect the transition. The nil check
					// keeps a missing logger from causing a second, unrecovered
					// panic inside this deferred function.
					if r := recover(); r != nil && sm.logger != nil {
						sm.logger.Errorf("event handler panic from %s to %s: %v", from, to, r)
					}
				}()
				handler(ctx, from, to)
			}()
		}
	}(from, next, snapshot)
}

func (sm *stateMachine) isValidTransition(currentState State, newState State) bool {
validTransitions, ok := sm.validStateTransitions[currentState]
if !ok {
Expand Down
Loading
Loading