Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop node simulation in replay #48

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
SavedState/
SavedState*/
**/launch.json
**/*.flow
**/*.log
Expand Down
14 changes: 10 additions & 4 deletions consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,11 +756,16 @@ func (p *Pbft) GetProposal() *Proposal {
return p.state.proposal
}

// GetCurrentView returns a copy of the current view (sequence and round).
// A copy is returned so callers cannot mutate the consensus-internal state.
func (p *Pbft) GetCurrentView() *View {
	return p.state.view.Copy()
}

// getNextMessage reads a new message from the message queue
func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageReq, bool) {
timeoutCh := time.After(timeout)
timeoutCh := p.notifier.CreateTimeoutChannel(timeout)
for {
msg, discards := p.notifier.ReadNextMessage(p)
msg, discards := p.notifier.ReadNextMessage(p, timeoutCh)
// send the discard messages
p.logger.Printf("[TRACE] Current state %s, number of prepared messages: %d, number of committed messages %d", PbftState(p.state.state), p.state.numPrepared(), p.state.numCommitted())

Expand All @@ -775,8 +780,9 @@ func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageR
return msg, true
}

// wait until there is a new message or
// someone closes the stopCh (i.e. timeout for round change)
// wait until there is a new message,
// someone closes the timeoutCh (i.e. timeout for round change) or
// cancel function is triggered
select {
case <-timeoutCh:
span.AddEvent("Timeout")
Expand Down
9 changes: 9 additions & 0 deletions e2e/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import (
"github.com/0xPolygon/pbft-consensus"
)

// Action type identifiers recorded as meta data in the replay .flow file.
const (
	// DropNode marks the point at which a node was stopped.
	DropNode string = "DropNode"
	// RevertDropNode marks the point at which a previously dropped node was restarted.
	RevertDropNode string = "RevertDropNode"
	// LastSequence records a node's final view when the cluster state is saved.
	LastSequence string = "LastSequence"
)

// RevertFunc undoes a previously applied action (e.g. restarts a dropped node).
type RevertFunc func()

type FunctionalAction interface {
Expand Down Expand Up @@ -35,10 +41,13 @@ func (dn *DropNodeAction) Apply(c *Cluster) RevertFunc {
log.Printf("Dropping node: '%s'.", nodeToStop)

c.StopNode(nodeToStop.name)
view := nodeToStop.GetCurrentView()
c.replayMessageNotifier.HandleAction(DropNode, nodeToStop.name, view.Sequence, view.Round)

return func() {
log.Printf("Reverting stopped node %v\n", nodeToStop.name)
nodeToStop.Start()
c.replayMessageNotifier.HandleAction(RevertDropNode, nodeToStop.name, c.GetMaxHeight()+1, 0)
}
}

Expand Down
69 changes: 49 additions & 20 deletions e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,6 @@ func NewPBFTCluster(t *testing.T, config *ClusterConfig, hook ...transportHook)
createBackend: config.CreateBackend,
}

err = c.replayMessageNotifier.SaveMetaData(&names)
if err != nil {
log.Printf("[WARNING] Could not write node meta data to replay messages file. Reason: %v", err)
}

for _, name := range names {
trace := c.tracer.Tracer(name)
n, _ := newPBFTNode(name, config, names, trace, tt)
Expand Down Expand Up @@ -286,6 +281,11 @@ func (n *node) GetNodeHeight() uint64 {
return uint64(n.getSyncIndex()) + 1
}

// GetCurrentView gets the current view from the node's underlying PBFT state machine.
func (n *node) GetCurrentView() *pbft.View {
	return n.pbft.GetCurrentView()
}

func (c *Cluster) syncWithNetwork(nodeID string) (uint64, int64) {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down Expand Up @@ -389,6 +389,7 @@ func (c *Cluster) Stop() {
n.Stop()
}
}
c.SaveState()
if err := c.tracer.Shutdown(context.Background()); err != nil {
panic("failed to shutdown TracerProvider")
}
Expand All @@ -398,6 +399,38 @@ func (c *Cluster) GetTransportHook() transportHook {
return c.transport.getHook()
}

// SaveState saves messages and action meta data to corresponding .flow files
// by flushing both the cached gossip messages and the cached action meta data.
func (c *Cluster) SaveState() {
	c.saveMessages()
	c.saveMetaData()
}

// saveMessages flushes all cached gossip messages to the .flow file.
// A write failure is logged as a warning only; it never aborts the run.
func (c *Cluster) saveMessages() {
	if err := c.replayMessageNotifier.SaveState(); err != nil {
		log.Printf("[WARNING] Could not write state to file. Reason: %v", err)
	}
}

// saveMetaData saves all cached actions meta data to the .flow file.
// For every node it records a LastSequence action holding the node's
// current view (sequence and round), then persists the collected meta
// data together with the node names.
//
// NOTE(review): assumes c.GetNodesMap() does not itself acquire c.lock,
// otherwise this would deadlock — confirm against the Cluster accessors.
func (c *Cluster) saveMetaData() {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Fetch the map once and pre-size the name slice to avoid
	// repeated accessor calls and slice growth inside the loop.
	nodes := c.GetNodesMap()
	nodeNames := make([]string, 0, len(nodes))
	for name, node := range nodes {
		nodeNames = append(nodeNames, name)
		view := node.GetCurrentView()
		c.replayMessageNotifier.HandleAction(LastSequence, name, view.Sequence, view.Round)
	}

	if err := c.replayMessageNotifier.SaveMetaData(nodeNames); err != nil {
		log.Printf("[WARNING] Could not write node meta data to replay messages file. Reason: %v", err)
	}
}

type node struct {
// index of node synchronization with the cluster
localSyncIndex int64
Expand Down Expand Up @@ -521,10 +554,7 @@ func (n *node) Start() {

// start the execution
n.pbft.Run(ctx)
err := n.c.replayMessageNotifier.SaveState()
if err != nil {
log.Printf("[WARNING] Could not write state to file. Reason: %v", err)
}
n.c.saveMessages()

switch n.pbft.GetState() {
case pbft.SyncState:
Expand All @@ -536,6 +566,7 @@ func (n *node) Start() {
n.setSyncIndex(currentSyncIndex + 1)
default:
// stopped
n.c.saveMessages()
return
}
}
Expand Down Expand Up @@ -716,29 +747,27 @@ func (v *valString) Len() int {
// ReplayNotifier is an interface that expands the StateNotifier with additional methods for saving and loading replay messages
type ReplayNotifier interface {
pbft.StateNotifier
SaveMetaData(nodeNames *[]string) error
SaveMetaData(nodeNames []string) error
SaveState() error
HandleMessage(to pbft.NodeID, message *pbft.MessageReq)
HandleAction(actionType, data string, sequence, round uint64)
}

// DefaultReplayNotifier is a null object implementation of ReplayNotifier interface
type DefaultReplayNotifier struct {
}

// HandleTimeout implements StateNotifier interface
func (n *DefaultReplayNotifier) HandleTimeout(to pbft.NodeID, msgType pbft.MsgType, view *pbft.View) {
}

// ReadNextMessage is an implementation of StateNotifier interface
func (n *DefaultReplayNotifier) ReadNextMessage(p *pbft.Pbft) (*pbft.MessageReq, []*pbft.MessageReq) {
return p.ReadMessageWithDiscards()
pbft.DefaultStateNotifier
}

// SaveMetaData is an implementation of ReplayNotifier interface
func (n *DefaultReplayNotifier) SaveMetaData(nodeNames *[]string) error { return nil }
func (n *DefaultReplayNotifier) SaveMetaData(nodeNames []string) error {
return nil
}

// SaveState is an implementation of ReplayNotifier interface;
// a deliberate no-op in this default (null object) notifier.
func (n *DefaultReplayNotifier) SaveState() error { return nil }

// HandleMessage is an implementation of ReplayNotifier interface; no-op by default.
func (n *DefaultReplayNotifier) HandleMessage(to pbft.NodeID, message *pbft.MessageReq) {}

// HandleAction is an implementation of ReplayNotifier interface; no-op by default.
func (n *DefaultReplayNotifier) HandleAction(actionType, data string, sequence, round uint64) {}
44 changes: 44 additions & 0 deletions e2e/fuzz/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
- [FUZZ FRAMEWORK](#fuzz-framework)
- [Daemon](#daemon)
- [Replay Messages](#replay-messages)
- [Known issues](#known-issues)

# FUZZ FRAMEWORK

Fuzz framework is an automated software testing framework that involves providing invalid, unexpected, or random data as inputs to a computer program. The program is then monitored for exceptions such as crashes, failing built-in code assertions, or potential memory leaks.

Fuzz framework enables building randomized test sets upon PBFT consensus algorithm. The end goal of the fuzz framework is to ensure that even under stress conditions, we achieve linearity on the PBFT consensus protocol, this is, under some time bounds and conditions (i.e. we cannot produce a block, if there are zero nodes active), a new block is being produced.

## Daemon
Fuzz daemon represents a test runner for fuzz/randomized tests. It is a separate go module/process, which runs alongside the cluster of Pbft nodes.
It runs a Go routine and a single cluster with predefined number of nodes and randomly picks some subset of predefined actions and applies those to the cluster.

Supported actions that can be simulated in fuzz daemon are:
1. Drop Node - simulates dropping of a node in cluster where a single node goes offline, and is not communicating with the rest of the network.
2. Partitions - simulates grouping of nodes in separate partitions, where each partition only communicates with its member nodes.

To run fuzz daemon (runner) run the following command:

`go run ./e2e/fuzz/cmd/main.go fuzz-run -nodes={numberOfNodes} -duration={duration}`

e.g., `go run ./e2e/fuzz/cmd/main.go fuzz-run -nodes=5 -duration=1h`

To log node execution to separate log files for easier problem analysis, set environment variable `E2E_LOG_TO_FILES` to `true` before running the `fuzz-run` command. Logs are stored in the parent folder from which `fuzz-run` command was called, inside the `logs-{timestamp}` subfolder. Each node will have its own .log file where name of the file will be the name of the corresponding node.

By default, when running the `fuzz-run` command, two `.flow` files will be saved containing all the messages that were gossiped during the fuzz daemon execution, alongside some meta data about actions that were simulated during the fuzz run. Files are saved inside the parent folder from which the `fuzz-run` command was called, inside the `SavedState` subfolder. The file containing gossiped messages is named `messages-{timestamp}.flow`, and the file containing node and action meta data is called `metaData-{timestamp}.flow`. These files are needed to replay the fuzz execution using the **Replay Messages** feature.

## Replay Messages
Fuzz framework relies on a randomized set of predefined actions, which are applied and reverted on a cluster of **PBFT** nodes. Since its execution is non-deterministic and an algorithm failure can be discovered at any time, it is of the utmost importance to have a trace of the triggers which led to the failure state, and the ability to replay those triggers on demand an arbitrary number of times. This feature enables in-depth analysis of the failure.

Replay takes the provided `messages.flow` and `metaData.flow` files, creates a cluster with the appropriate number of nodes having the same names as in the fuzz run, pushes all the messages into the appropriate node queues and starts the cluster. This enables a quick replay of a previously run execution and a way to analyze the problem that occurred, on demand.

To run replay messages, run the following command:

`go run ./e2e/fuzz/cmd/main.go replay-messages -filesDirectory={directoryWhereFlowFilesAreStored}`

e.g., `go run ./e2e/fuzz/cmd/main.go replay-messages -filesDirectory=../SavedData`

**NOTE**: Replay does not save .flow files on its execution.

## Known issues
When saving messages that are gossiped during the execution of the fuzz daemon, messages will be sorted by sequence, but they will not be in the order in which they were gossiped, since the `Gossip` method sends messages to each node asynchronously in a separate go routine per receiver. This means that a `PrePrepare` message may not be first in the `.flow` file, since all the other messages are also sent in separate routines. This neither has an effect on replay nor causes an issue, since messages are separated into different queues in the **PBFT** state machine depending on the message type (`PrePrepare`, `Prepare`, `Commit`, `RoundChange`).
24 changes: 21 additions & 3 deletions e2e/fuzz/replay/replay_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,24 @@ import (
"github.com/0xPolygon/pbft-consensus"
)

// MetaData is a single meta data record persisted to the .flow file,
// describing one fuzz action (or the last sequence) for one node.
type MetaData struct {
	DataType string `json:"actionType"`
	Data     string `json:"data"`
	Sequence uint64 `json:"sequence"`
	Round    uint64 `json:"round"`
}

// NewMetaData creates a new MetaData object populated with the given
// action type, payload and view coordinates.
func NewMetaData(dataType, data string, sequence, round uint64) *MetaData {
	md := MetaData{
		DataType: dataType,
		Data:     data,
		Sequence: sequence,
		Round:    round,
	}
	return &md
}

// ReplayMessage is a struct that represents a single json object in .flow file
type ReplayMessage struct {
To pbft.NodeID `json:"to"`
Expand All @@ -31,10 +49,10 @@ func NewReplayTimeoutMessage(to pbft.NodeID, msgType pbft.MsgType, view *pbft.Vi
}
}

// ConvertToByteArrays converts ReplayMessage slice to JSON representation and return it back as slice of byte arrays
func ConvertToByteArrays(messages []*ReplayMessage) ([][]byte, error) {
// ConvertToByteArrays is a generic method that converts provided slice to JSON representation and returns it back as slice of byte arrays
func ConvertToByteArrays(items []interface{}) ([][]byte, error) {
var allRawMessages [][]byte
for _, message := range messages {
for _, message := range items {
currentRawMessage, err := json.Marshal(message)
if err != nil {
return allRawMessages, err
Expand Down
Loading