Skip to content

Commit

Permalink
Merge pull request #6769 from onflow/janez/expose-node-component-mana…
Browse files Browse the repository at this point in the history
…gement

Expose node component management
  • Loading branch information
janezpodhostnik authored Jan 2, 2025
2 parents a7656ea + e6fb7ad commit 0f4013e
Show file tree
Hide file tree
Showing 15 changed files with 206 additions and 103 deletions.
4 changes: 3 additions & 1 deletion cmd/access/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"context"

"github.com/onflow/flow-go/cmd"
nodebuilder "github.com/onflow/flow-go/cmd/access/node_builder"
"github.com/onflow/flow-go/model/flow"
Expand All @@ -24,5 +26,5 @@ func main() {
if err != nil {
builder.Logger.Fatal().Err(err).Send()
}
node.Run()
node.Run(context.Background())
}
3 changes: 2 additions & 1 deletion cmd/collection/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -646,7 +647,7 @@ func main() {
if err != nil {
nodeBuilder.Logger.Fatal().Err(err).Send()
}
node.Run()
node.Run(context.Background())
}

// createQCContractClient creates QC contract client
Expand Down
3 changes: 2 additions & 1 deletion cmd/consensus/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -933,7 +934,7 @@ func main() {
if err != nil {
nodeBuilder.Logger.Fatal().Err(err).Send()
}
node.Run()
node.Run(context.Background())
}

func loadBeaconPrivateKey(dir string, myID flow.Identifier) (*encodable.RandomBeaconPrivKey, error) {
Expand Down
4 changes: 3 additions & 1 deletion cmd/execution/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"context"

"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/model/flow"
)
Expand All @@ -19,5 +21,5 @@ func main() {
if err != nil {
exeBuilder.FlowNodeBuilder.Logger.Fatal().Err(err).Send()
}
node.Run()
node.Run(context.Background())
}
4 changes: 3 additions & 1 deletion cmd/ghost/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"context"

"github.com/spf13/pflag"

"github.com/onflow/flow-go/cmd"
Expand Down Expand Up @@ -45,5 +47,5 @@ func main() {
if err != nil {
nodeBuilder.Logger.Fatal().Err(err).Send()
}
node.Run()
node.Run(context.Background())
}
66 changes: 51 additions & 15 deletions cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,61 @@ type Node interface {
// Run initiates all common components (logger, database, protocol state etc.)
// then starts each component. It also sets up a channel to gracefully shut
// down each component if a SIGINT is received.
Run()
// The context can also be used to signal the node to shutdown.
Run(ctx context.Context)
}

// FlowNodeImp is created by the FlowNodeBuilder with all components ready to be started.
// The Run function starts all the components, and is blocked until either a termination
// signal is received or a irrecoverable error is encountered.
type FlowNodeImp struct {
component.Component
NodeImp
*NodeConfig
}

// NodeImp can be used to create a node instance from:
// - a logger: to be used during startup and shutdown
// - a component: that will be started with Run
// - a cleanup function: that will be called after the component has been stopped
// - a fatal error handler: to handle any error received from the component
type NodeImp struct {
component.Component
logger zerolog.Logger
postShutdown func() error
fatalHandler func(error)
}

// NewNode returns a new node instance
func NewNode(component component.Component, cfg *NodeConfig, logger zerolog.Logger, cleanup func() error, handleFatal func(error)) Node {
func NewNode(
component component.Component,
cfg *NodeConfig,
logger zerolog.Logger,
cleanup func() error,
handleFatal func(error),
) Node {
return &FlowNodeImp{
NodeConfig: cfg,
NodeImp: NewBaseNode(
component,
logger.With().
Str("node_role", cfg.BaseConfig.NodeRole).
Hex("spork_id", logging.ID(cfg.SporkID)).
Logger(),
cleanup,
handleFatal,
),
}
}

// NewBaseNode returns a new base node instance
func NewBaseNode(
component component.Component,
logger zerolog.Logger,
cleanup func() error,
handleFatal func(error),
) NodeImp {
return NodeImp{
Component: component,
NodeConfig: cfg,
logger: logger,
postShutdown: cleanup,
fatalHandler: handleFatal,
Expand All @@ -51,13 +87,10 @@ func NewNode(component component.Component, cfg *NodeConfig, logger zerolog.Logg
// which point it gracefully shuts down.
// Any unhandled irrecoverable errors thrown in child components will propagate up to here and
// result in a fatal error.
func (node *FlowNodeImp) Run() {
// Cancelling this context notifies all child components that it's time to shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func (node *NodeImp) Run(ctx context.Context) {

// Block until node is shutting down
err := node.run(ctx, cancel)
err := node.run(ctx)

// Any error received is considered fatal.
if err != nil {
Expand All @@ -73,14 +106,18 @@ func (node *FlowNodeImp) Run() {
node.logger.Error().Err(err).Msg("error encountered during cleanup")
}

node.logger.Info().Msgf("%s node shutdown complete", node.BaseConfig.NodeRole)
node.logger.Info().Msg("node shutdown complete")
}

// run starts the node and blocks until a SIGINT/SIGTERM is received or an error is encountered.
// It returns:
// - nil if a termination signal is received, and all components have been gracefully stopped.
// - error if a irrecoverable error is received
func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) error {
// - error if an irrecoverable error is received
func (node *NodeImp) run(ctx context.Context) error {
// Cancelling this context notifies all child components that it's time to shut down
ctx, shutdown := context.WithCancel(ctx)
defer shutdown()

// Components will pass unhandled irrecoverable errors to this channel via signalerCtx (or a
// child context). Any errors received on this channel should halt the node.
signalerCtx, errChan := irrecoverable.WithSignaler(ctx)
Expand All @@ -97,8 +134,7 @@ func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) e
select {
case <-node.Ready():
node.logger.Info().
Hex("spork_id", logging.ID(node.SporkID)).
Msgf("%s node startup complete", node.BaseConfig.NodeRole)
Msg("node startup complete")
case <-ctx.Done():
}
}()
Expand All @@ -118,7 +154,7 @@ func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) e

// 3: Shut down
// Send shutdown signal to components
node.logger.Info().Msgf("%s node shutting down", node.BaseConfig.NodeRole)
node.logger.Info().Msg("node shutting down")
shutdown()

// Block here until all components have stopped or an irrecoverable error is received.
Expand Down
17 changes: 10 additions & 7 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ import (
const NotSet = "not set"

type BuilderFunc func(nodeConfig *NodeConfig) error
type ReadyDoneFactory func(node *NodeConfig) (module.ReadyDoneAware, error)

// ReadyDoneFactory is a function that returns a ReadyDoneAware component or an error if
// the factory cannot create the component
type ReadyDoneFactory[Input any] func(input Input) (module.ReadyDoneAware, error)

// NodeBuilder declares the initialization methods needed to bootstrap up a Flow node
type NodeBuilder interface {
Expand Down Expand Up @@ -73,7 +76,7 @@ type NodeBuilder interface {
// The ReadyDoneFactory may return either a `Component` or `ReadyDoneAware` instance.
// In both cases, the object is started according to its interface when the node is run,
// and the node will wait for the component to exit gracefully.
Component(name string, f ReadyDoneFactory) NodeBuilder
Component(name string, f ReadyDoneFactory[*NodeConfig]) NodeBuilder

// DependableComponent adds a new component to the node that conforms to the ReadyDoneAware
// interface. The builder will wait until all of the components in the dependencies list are ready
Expand All @@ -86,15 +89,15 @@ type NodeBuilder interface {
// IMPORTANT: Dependable components are started in parallel with no guaranteed run order, so all
// dependencies must be initialized outside of the ReadyDoneFactory, and their `Ready()` method
// MUST be idempotent.
DependableComponent(name string, f ReadyDoneFactory, dependencies *DependencyList) NodeBuilder
DependableComponent(name string, f ReadyDoneFactory[*NodeConfig], dependencies *DependencyList) NodeBuilder

// RestartableComponent adds a new component to the node that conforms to the ReadyDoneAware
// interface, and calls the provided error handler when an irrecoverable error is encountered.
// Use RestartableComponent if the component is not critical to the node's safe operation and
// can/should be independently restarted when an irrecoverable error is encountered.
//
// Any irrecoverable errors thrown by the component will be passed to the provided error handler.
RestartableComponent(name string, f ReadyDoneFactory, errorHandler component.OnError) NodeBuilder
RestartableComponent(name string, f ReadyDoneFactory[*NodeConfig], errorHandler component.OnError) NodeBuilder

// ShutdownFunc adds a callback function that is called after all components have exited.
// All shutdown functions are called regardless of errors returned by previous callbacks. Any
Expand Down Expand Up @@ -299,16 +302,16 @@ func DefaultBaseConfig() *BaseConfig {
// DependencyList is a slice of ReadyDoneAware implementations that are used by DependableComponent
// to define the list of dependencies that must be ready before starting the component.
type DependencyList struct {
components []module.ReadyDoneAware
Components []module.ReadyDoneAware
}

func NewDependencyList(components ...module.ReadyDoneAware) *DependencyList {
return &DependencyList{
components: components,
Components: components,
}
}

// Add adds a new ReadyDoneAware implementation to the list of dependencies.
func (d *DependencyList) Add(component module.ReadyDoneAware) {
d.components = append(d.components, component)
d.Components = append(d.Components, component)
}
49 changes: 44 additions & 5 deletions cmd/node_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"errors"
"os"
"syscall"
Expand Down Expand Up @@ -42,7 +43,7 @@ func TestRunShutsDownCleanly(t *testing.T) {

finished := make(chan struct{})
go func() {
node.Run()
node.Run(context.Background())
close(finished)
}()

Expand All @@ -62,6 +63,44 @@ func TestRunShutsDownCleanly(t *testing.T) {
}, testLogger.logs)
})

t.Run("Run shuts down gracefully on context cancel", func(t *testing.T) {
testLogger.Reset()
manager := component.NewComponentManagerBuilder().
AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
testLogger.Log("worker starting up")
ready()
testLogger.Log("worker startup complete")

<-ctx.Done()
testLogger.Log("worker shutting down")
testLogger.Log("worker shutdown complete")
}).
Build()
node := NewNode(manager, nodeConfig, logger, postShutdown, fatalHandler)

ctx, cancel := context.WithCancel(context.Background())

finished := make(chan struct{})
go func() {
node.Run(ctx)
close(finished)
}()

<-node.Ready()

cancel()

<-finished

assert.Equal(t, []string{
"worker starting up",
"worker startup complete",
"worker shutting down",
"worker shutdown complete",
"running cleanup",
}, testLogger.logs)
})

t.Run("Run encounters error during postShutdown", func(t *testing.T) {
testLogger.Reset()
manager := component.NewComponentManagerBuilder().
Expand All @@ -82,7 +121,7 @@ func TestRunShutsDownCleanly(t *testing.T) {

finished := make(chan struct{})
go func() {
node.Run()
node.Run(context.Background())
close(finished)
}()

Expand Down Expand Up @@ -123,7 +162,7 @@ func TestRunShutsDownCleanly(t *testing.T) {

finished := make(chan struct{})
go func() {
node.Run()
node.Run(context.Background())
close(finished)
}()

Expand Down Expand Up @@ -157,7 +196,7 @@ func TestRunShutsDownCleanly(t *testing.T) {

finished := make(chan struct{})
go func() {
node.Run()
node.Run(context.Background())
close(finished)
}()

Expand Down Expand Up @@ -191,7 +230,7 @@ func TestRunShutsDownCleanly(t *testing.T) {

finished := make(chan struct{})
go func() {
node.Run()
node.Run(context.Background())
close(finished)
}()

Expand Down
4 changes: 3 additions & 1 deletion cmd/observer/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"context"

nodebuilder "github.com/onflow/flow-go/cmd/observer/node_builder"
)

Expand All @@ -22,5 +24,5 @@ func main() {
if err != nil {
anb.Logger.Fatal().Err(err).Send()
}
node.Run()
node.Run(context.Background())
}
Loading

0 comments on commit 0f4013e

Please sign in to comment.