Drop node simulation in replay
goran-ethernal committed Apr 5, 2022
1 parent 14cda84 commit 63486e9
Showing 12 changed files with 604 additions and 232 deletions.
14 changes: 10 additions & 4 deletions consensus.go
@@ -756,11 +756,16 @@ func (p *Pbft) GetProposal() *Proposal {
return p.state.proposal
}

// GetCurrentView returns the current view
func (p *Pbft) GetCurrentView() *View {
return p.state.view.Copy()
}

// getNextMessage reads a new message from the message queue
func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageReq, bool) {
-timeoutCh := time.After(timeout)
+timeoutCh := p.notifier.CreateTimeoutChannel(timeout)
for {
-msg, discards := p.notifier.ReadNextMessage(p)
+msg, discards := p.notifier.ReadNextMessage(p, timeoutCh)
// send the discard messages
p.logger.Printf("[TRACE] Current state %s, number of prepared messages: %d, number of committed messages %d", PbftState(p.state.state), p.state.numPrepared(), p.state.numCommitted())

@@ -775,8 +780,9 @@ func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageR
return msg, true
}

-// wait until there is a new message or
-// someone closes the stopCh (i.e. timeout for round change)
+// wait until there is a new message,
+// someone closes the timeoutCh (i.e. timeout for round change) or
+// the cancel function is triggered
select {
case <-timeoutCh:
span.AddEvent("Timeout")
42 changes: 42 additions & 0 deletions e2e/actions.go
@@ -1,13 +1,52 @@
package e2e

import (
"encoding/json"
"log"
"math/rand"
"time"

"github.com/0xPolygon/pbft-consensus"
)

const (
DropNode string = "DropNode"
RevertDropNode string = "RevertDropNode"
Partition string = "Partition"
LastSequence string = "LastSequence"
)

// MetaData is a struct that holds data about fuzz actions that happened and need to be saved in .flow file
type MetaData struct {
DataType string `json:"actionType"`
Data string `json:"data"`
Sequence uint64 `json:"sequence"`
Round uint64 `json:"round"`
}

// NewMetaData creates a new MetaData object
func NewMetaData(dataType, data string, sequence, round uint64) *MetaData {
return &MetaData{
DataType: dataType,
Data: data,
Sequence: sequence,
Round: round,
}
}

// ConvertActionsToByteArrays converts a MetaData slice to its JSON representation and returns it back as a slice of byte arrays
func ConvertActionsToByteArrays(actions []*MetaData) ([][]byte, error) {
var allRawActions [][]byte
for _, message := range actions {
currentRawAction, err := json.Marshal(message)
if err != nil {
return allRawActions, err
}
allRawActions = append(allRawActions, currentRawAction)
}
}

type RevertFunc func()

type FunctionalAction interface {
@@ -35,10 +74,13 @@ func (dn *DropNodeAction) Apply(c *Cluster) RevertFunc {
log.Printf("Dropping node: '%s'.", nodeToStop)

c.StopNode(nodeToStop.name)
view := nodeToStop.GetCurrentView()
c.replayMessageNotifier.HandleAction(NewMetaData(DropNode, nodeToStop.name, view.Sequence, view.Round))

return func() {
log.Printf("Reverting stopped node %v\n", nodeToStop.name)
nodeToStop.Start()
c.replayMessageNotifier.HandleAction(NewMetaData(RevertDropNode, nodeToStop.name, c.GetMaxHeight()+1, 0))
}
}

75 changes: 55 additions & 20 deletions e2e/framework.go
@@ -133,11 +133,6 @@ func NewPBFTCluster(t *testing.T, config *ClusterConfig, hook ...transportHook)
createBackend: config.CreateBackend,
}

-err = c.replayMessageNotifier.SaveMetaData(&names)
-if err != nil {
-log.Printf("[WARNING] Could not write node meta data to replay messages file. Reason: %v", err)
-}

for _, name := range names {
trace := c.tracer.Tracer(name)
n, _ := newPBFTNode(name, config, names, trace, tt)
@@ -286,6 +281,11 @@ func (n *node) GetNodeHeight() uint64 {
return uint64(n.getSyncIndex()) + 1
}

// GetCurrentView gets current view from the state machine
func (n *node) GetCurrentView() *pbft.View {
return n.pbft.GetCurrentView()
}

func (c *Cluster) syncWithNetwork(nodeID string) (uint64, int64) {
c.lock.Lock()
defer c.lock.Unlock()
@@ -389,6 +389,7 @@ func (c *Cluster) Stop() {
n.Stop()
}
}
c.SaveState()
if err := c.tracer.Shutdown(context.Background()); err != nil {
panic("failed to shutdown TracerProvider")
}
@@ -398,6 +399,44 @@ func (c *Cluster) GetTransportHook() transportHook {
return c.transport.getHook()
}

// SaveState saves messages and action meta data to corresponding .flow files
func (c *Cluster) SaveState() {
c.saveMessages()
c.saveMetaData()
}

// saveMessages saves all cached messages to .flow file
func (c *Cluster) saveMessages() {
err := c.replayMessageNotifier.SaveState()
if err != nil {
log.Printf("[WARNING] Could not write state to file. Reason: %v", err)
}
}

// saveMetaData saves all cached action meta data to the .flow file
func (c *Cluster) saveMetaData() {
c.lock.Lock()
defer c.lock.Unlock()

var nodeNames []string
var lastSequences []*MetaData
for name, node := range c.GetNodesMap() {
nodeNames = append(nodeNames, name)
view := node.GetCurrentView()
lastSequences = append(lastSequences, &MetaData{
DataType: LastSequence,
Data: name,
Sequence: view.Sequence,
Round: view.Round,
})
}

err := c.replayMessageNotifier.SaveMetaData(nodeNames, lastSequences)
if err != nil {
log.Printf("[WARNING] Could not write node meta data to replay messages file. Reason: %v", err)
}
}

type node struct {
// index of node synchronization with the cluster
localSyncIndex int64
@@ -521,10 +560,7 @@ func (n *node) Start() {

// start the execution
n.pbft.Run(ctx)
-err := n.c.replayMessageNotifier.SaveState()
-if err != nil {
-log.Printf("[WARNING] Could not write state to file. Reason: %v", err)
-}
+n.c.saveMessages()

switch n.pbft.GetState() {
case pbft.SyncState:
@@ -536,6 +572,7 @@
n.setSyncIndex(currentSyncIndex + 1)
default:
// stopped
n.c.saveMessages()
return
}
}
@@ -716,29 +753,27 @@ func (v *valString) Len() int {
// ReplayNotifier is an interface that extends the StateNotifier with additional methods for saving and loading replay messages
type ReplayNotifier interface {
pbft.StateNotifier
-SaveMetaData(nodeNames *[]string) error
+SaveMetaData(nodeNames []string, metaData []*MetaData) error
SaveState() error
HandleMessage(to pbft.NodeID, message *pbft.MessageReq)
HandleAction(action *MetaData)
}

// DefaultReplayNotifier is a null object implementation of ReplayNotifier interface
type DefaultReplayNotifier struct {
-}
-
-// HandleTimeout implements StateNotifier interface
-func (n *DefaultReplayNotifier) HandleTimeout(to pbft.NodeID, msgType pbft.MsgType, view *pbft.View) {
-}
-
-// ReadNextMessage is an implementation of StateNotifier interface
-func (n *DefaultReplayNotifier) ReadNextMessage(p *pbft.Pbft) (*pbft.MessageReq, []*pbft.MessageReq) {
-return p.ReadMessageWithDiscards()
+pbft.DefaultStateNotifier
}

// SaveMetaData is an implementation of ReplayNotifier interface
-func (n *DefaultReplayNotifier) SaveMetaData(nodeNames *[]string) error { return nil }
+func (n *DefaultReplayNotifier) SaveMetaData(nodeNames []string, metaData []*MetaData) error {
+return nil
+}

// SaveState is an implementation of ReplayNotifier interface
func (n *DefaultReplayNotifier) SaveState() error { return nil }

// HandleMessage is an implementation of ReplayNotifier interface
func (n *DefaultReplayNotifier) HandleMessage(to pbft.NodeID, message *pbft.MessageReq) {}

// HandleAction is an implementation of ReplayNotifier interface
func (n *DefaultReplayNotifier) HandleAction(action *MetaData) {}
45 changes: 45 additions & 0 deletions e2e/fuzz/README.md
@@ -0,0 +1,45 @@
- [FUZZ FRAMEWORK](#fuzz-framework)
- [Daemon](#daemon)
- [Replay Messages](#replay-messages)
- [Known issues](#known-issues)

# FUZZ FRAMEWORK

Fuzz framework is an automated software testing framework that involves providing invalid, unexpected, or random data as inputs to a computer program. The program is then monitored for exceptions such as crashes, failing built-in code assertions, or potential memory leaks.

Fuzz framework enables building randomized test sets on top of the PBFT consensus algorithm. The end goal of the fuzz framework is to ensure that we achieve linearity of the PBFT consensus protocol even under stress conditions; that is, under some time bounds and conditions (e.g., we cannot produce a block if there are zero active nodes), a new block is produced.

## Daemon
Fuzz daemon represents a test runner for fuzz/randomized tests. It is a separate Go module/process which runs alongside the cluster of Pbft nodes.
It runs a Go routine and a single cluster with a predefined number of nodes, randomly picks some subset of the predefined actions, and applies them to the cluster.

Supported actions that can be simulated by the fuzz daemon are listed below (a minimal sketch of the apply/revert pattern these actions follow appears after the list):
1. Drop Node - simulates dropping of a node in the cluster, where a single node goes offline and stops communicating with the rest of the network.
2. Partitions - simulates grouping of nodes into separate partitions, where each partition only communicates with its member nodes.
3. Flow Map - simulates a message routing mechanism where some of the nodes within the cluster are fully connected, whereas others are only partially connected (namely, only to a subset of peer nodes).
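
Below is a minimal, self-contained sketch of that apply/revert pattern. It assumes, as `DropNodeAction.Apply` in `e2e/actions.go` does, that applying an action returns a `RevertFunc` which undoes it; the `cluster` and `action` types here are simplified stand-ins for the framework's `Cluster` and `FunctionalAction`, whose full definitions are not shown in this commit.

```go
package main

import "fmt"

// RevertFunc undoes a previously applied fuzz action (mirrors e2e/actions.go).
type RevertFunc func()

// cluster is a toy stand-in for the framework's Cluster type.
type cluster struct {
	running map[string]bool
}

// action mirrors the shape of the framework's FunctionalAction interface;
// the real interface body is not shown in this diff, so this is an assumption.
type action interface {
	Apply(c *cluster) RevertFunc
}

// dropNode stops one node and hands back a closure that restarts it.
type dropNode struct {
	name string
}

func (d dropNode) Apply(c *cluster) RevertFunc {
	c.running[d.name] = false // simulate c.StopNode(...)
	return func() {
		c.running[d.name] = true // simulate nodeToStop.Start()
	}
}

func main() {
	c := &cluster{running: map[string]bool{"node_0": true, "node_1": true}}
	var a action = dropNode{name: "node_0"}
	revert := a.Apply(c)
	fmt.Println("node_0 running:", c.running["node_0"]) // false
	revert()
	fmt.Println("node_0 running:", c.running["node_0"]) // true
}
```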

To run the fuzz daemon (runner), run the following command:

`go run ./e2e/fuzz/cmd/main.go fuzz-run -nodes={numberOfNodes} -duration={duration}`

e.g., `go run ./e2e/fuzz/cmd/main.go fuzz-run -nodes=5 -duration=1h`

To log node execution to separate log files for easier problem analysis, set the environment variable `E2E_LOG_TO_FILES` to `true` before running the `fuzz-run` command. Logs are stored in the parent folder from which the `fuzz-run` command was called, inside the `logs-{timestamp}` subfolder. Each node has its own `.log` file, whose name is the name of the corresponding node.

By default, when running the `fuzz-run` command, two `.flow` files are saved: one containing all the messages that were gossiped during the fuzz daemon execution, and one containing meta data about the actions that were simulated during the fuzz run. Files are saved inside the `SavedState` subfolder of the parent folder from which the `fuzz-run` command was called. The file containing gossiped messages is named `messages-{timestamp}.flow`, and the file containing node and action meta data is named `metaData-{timestamp}.flow`. These files are needed to replay the fuzz execution using the **Replay Messages** feature.
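
As an illustration, the sketch below marshals a single `MetaData` entry the same way `ConvertActionsToByteArrays` in `e2e/actions.go` does, showing the JSON shape of one action record in the meta data `.flow` file; the concrete node name and numbers are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MetaData mirrors the struct in e2e/actions.go, including its JSON tags.
type MetaData struct {
	DataType string `json:"actionType"`
	Data     string `json:"data"`
	Sequence uint64 `json:"sequence"`
	Round    uint64 `json:"round"`
}

func main() {
	// A hypothetical DropNode action recorded at sequence 42, round 0.
	m := MetaData{DataType: "DropNode", Data: "node_3", Sequence: 42, Round: 0}
	raw, err := json.Marshal(m)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))
	// Output: {"actionType":"DropNode","data":"node_3","sequence":42,"round":0}
}
```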

## Replay Messages
Fuzz framework relies on a randomized set of predefined actions, which are applied to and reverted on a **PolyBFT** node cluster. Since its execution is non-deterministic and an algorithm failure can be discovered at any time, it is of utmost importance to have a trace of the triggers that led to the failure state, and the ability to replay those triggers on demand an arbitrary number of times. This feature enables in-depth analysis of the failure.

Replay takes the provided `messages.flow` and `metaData.flow` files, creates a cluster with the appropriate number of nodes carrying the same names as in the fuzz run, pushes all the messages into the appropriate node queues, and starts the cluster. This enables quick replay of a previously run execution and on-demand analysis of the problem that occurred.
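
A simplified sketch of that queueing step, under the assumption that replay groups messages by their target node before the cluster starts; `replayMessage` here is a stand-in, not the framework's `ReplayMessage` type.

```go
package main

import "fmt"

// replayMessage is a simplified stand-in for a message read from messages.flow.
type replayMessage struct {
	To   string
	Type string
}

func main() {
	// Messages as they might be read from a messages.flow file (made up).
	msgs := []replayMessage{
		{To: "node_0", Type: "PrePrepare"},
		{To: "node_1", Type: "Prepare"},
		{To: "node_0", Type: "Prepare"},
	}

	// Push every message into the queue of the node it was addressed to,
	// before the cluster is started.
	queues := make(map[string][]replayMessage)
	for _, m := range msgs {
		queues[m.To] = append(queues[m.To], m)
	}
	fmt.Println(queues)
}
```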

To run replay messages, run the following command:

`go run ./e2e/fuzz/cmd/main.go replay-messages -messagesFile={fullPathToMessagesFile} -metaDataFile={fullPathToMetaDataFile}`

e.g., `go run ./e2e/fuzz/cmd/main.go replay-messages -messagesFile=../SavedData/messages.flow -metaDataFile=../SavedData/metaData.flow`

**NOTE**: Replay does not save `.flow` files during its execution.

## Known issues
When saving messages that are gossiped during the execution of the fuzz daemon, messages are sorted by sequence but not stored in the order in which they were gossiped, since the `Gossip` method sends messages to each node asynchronously, in a separate goroutine per receiver. This means that a `PrePrepare` message may not be first in the `.flow` file, since all the other messages are also sent in separate routines. This neither affects nor causes an issue on replay, since messages are separated into different queues in the **PolyBFT** state machine depending on the message type (`PrePrepare`, `Prepare`, `Commit`, `RoundChange`).
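
The self-contained sketch below reproduces the effect: like `Gossip`, it delivers one message per receiver in its own goroutine, so the recorded arrival order differs between runs. The recorder and node names are illustrative only, not the framework's API.

```go
package main

import (
	"fmt"
	"sync"
)

// gossip delivers msg to every receiver in its own goroutine, mirroring the
// asynchronous fan-out described above (illustrative, not the real Gossip).
func gossip(msg string, receivers []string, deliver func(to, msg string)) {
	var wg sync.WaitGroup
	for _, to := range receivers {
		wg.Add(1)
		go func(to string) { // one goroutine per receiver
			defer wg.Done()
			deliver(to, msg)
		}(to)
	}
	wg.Wait()
}

func main() {
	var (
		mu    sync.Mutex
		order []string
	)
	deliver := func(to, msg string) {
		mu.Lock()
		defer mu.Unlock()
		order = append(order, to+"<-"+msg)
	}
	gossip("PrePrepare", []string{"node_0", "node_1", "node_2"}, deliver)
	fmt.Println(order) // element order varies from run to run
}
```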
4 changes: 2 additions & 2 deletions e2e/fuzz/replay/replay_message.go
@@ -31,8 +31,8 @@ func NewReplayTimeoutMessage(to pbft.NodeID, msgType pbft.MsgType, view *pbft.Vi
}
}

-// ConvertToByteArrays converts ReplayMessage slice to JSON representation and return it back as slice of byte arrays
-func ConvertToByteArrays(messages []*ReplayMessage) ([][]byte, error) {
+// ConvertMessagesToByteArrays converts ReplayMessage slice to JSON representation and returns it back as slice of byte arrays
+func ConvertMessagesToByteArrays(messages []*ReplayMessage) ([][]byte, error) {
var allRawMessages [][]byte
for _, message := range messages {
currentRawMessage, err := json.Marshal(message)
