diff --git a/.gitignore b/.gitignore
index e111e089..404bb981 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,4 @@
-SavedState/
+SavedState*/
 **/launch.json
 **/*.flow
 **/*.log
diff --git a/consensus.go b/consensus.go
index 6d399d43..5a2f5bae 100644
--- a/consensus.go
+++ b/consensus.go
@@ -756,11 +756,16 @@ func (p *Pbft) GetProposal() *Proposal {
 	return p.state.proposal
 }
 
+// GetCurrentView returns a copy of the current view
+func (p *Pbft) GetCurrentView() *View {
+	return p.state.view.Copy()
+}
+
 // getNextMessage reads a new message from the message queue
 func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageReq, bool) {
-	timeoutCh := time.After(timeout)
+	timeoutCh := p.notifier.CreateTimeoutChannel(timeout)
 	for {
-		msg, discards := p.notifier.ReadNextMessage(p)
+		msg, discards := p.notifier.ReadNextMessage(p, timeoutCh)
 		// send the discard messages
 		p.logger.Printf("[TRACE] Current state %s, number of prepared messages: %d, number of committed messages %d", PbftState(p.state.state), p.state.numPrepared(), p.state.numCommitted())
@@ -775,8 +780,9 @@ func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageR
 		return msg, true
 	}
 
-	// wait until there is a new message or
-	// someone closes the stopCh (i.e. timeout for round change)
+	// wait until there is a new message,
+	// someone closes the timeoutCh (i.e. timeout for round change), or
+	// the cancel function is triggered
 	select {
 	case <-timeoutCh:
 		span.AddEvent("Timeout")
diff --git a/e2e/actions.go b/e2e/actions.go
index c4a7fd1d..b0a4d290 100644
--- a/e2e/actions.go
+++ b/e2e/actions.go
@@ -8,6 +8,12 @@ import (
 	"github.com/0xPolygon/pbft-consensus"
 )
 
+const (
+	DropNode       string = "DropNode"
+	RevertDropNode string = "RevertDropNode"
+	LastSequence   string = "LastSequence"
+)
+
 type RevertFunc func()
 
 type FunctionalAction interface {
@@ -35,10 +41,13 @@ func (dn *DropNodeAction) Apply(c *Cluster) RevertFunc {
 	log.Printf("Dropping node: '%s'.", nodeToStop)
 	c.StopNode(nodeToStop.name)
 
+	view := nodeToStop.GetCurrentView()
+	c.replayMessageNotifier.HandleAction(DropNode, nodeToStop.name, view.Sequence, view.Round)
 	return func() {
 		log.Printf("Reverting stopped node %v\n", nodeToStop.name)
 		nodeToStop.Start()
+		c.replayMessageNotifier.HandleAction(RevertDropNode, nodeToStop.name, c.GetMaxHeight()+1, 0)
 	}
 }
diff --git a/e2e/framework.go b/e2e/framework.go
index a5be8cda..faa9621c 100644
--- a/e2e/framework.go
+++ b/e2e/framework.go
@@ -133,11 +133,6 @@ func NewPBFTCluster(t *testing.T, config *ClusterConfig, hook ...transportHook)
 		createBackend: config.CreateBackend,
 	}
 
-	err = c.replayMessageNotifier.SaveMetaData(&names)
-	if err != nil {
-		log.Printf("[WARNING] Could not write node meta data to replay messages file. Reason: %v", err)
-	}
-
 	for _, name := range names {
 		trace := c.tracer.Tracer(name)
 		n, _ := newPBFTNode(name, config, names, trace, tt)
@@ -286,6 +281,11 @@ func (n *node) GetNodeHeight() uint64 {
 	return uint64(n.getSyncIndex()) + 1
 }
 
+// GetCurrentView gets the current view from the state machine
+func (n *node) GetCurrentView() *pbft.View {
+	return n.pbft.GetCurrentView()
+}
+
 func (c *Cluster) syncWithNetwork(nodeID string) (uint64, int64) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
@@ -389,6 +389,7 @@ func (c *Cluster) Stop() {
 			n.Stop()
 		}
 	}
+	c.SaveState()
 	if err := c.tracer.Shutdown(context.Background()); err != nil {
 		panic("failed to shutdown TracerProvider")
 	}
@@ -398,6 +399,38 @@ func (c *Cluster) GetTransportHook() transportHook {
 	return c.transport.getHook()
 }
 
+// SaveState saves messages and action meta data to the corresponding .flow files
+func (c *Cluster) SaveState() {
+	c.saveMessages()
+	c.saveMetaData()
+}
+
+// saveMessages saves all cached messages to a .flow file
+func (c *Cluster) saveMessages() {
+	err := c.replayMessageNotifier.SaveState()
+	if err != nil {
+		log.Printf("[WARNING] Could not write state to file. Reason: %v", err)
+	}
+}
+
+// saveMetaData saves the meta data of all cached actions to a .flow file
+func (c *Cluster) saveMetaData() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	var nodeNames []string
+	for name, node := range c.GetNodesMap() {
+		nodeNames = append(nodeNames, name)
+		view := node.GetCurrentView()
+		c.replayMessageNotifier.HandleAction(LastSequence, name, view.Sequence, view.Round)
+	}
+
+	err := c.replayMessageNotifier.SaveMetaData(nodeNames)
+	if err != nil {
+		log.Printf("[WARNING] Could not write node meta data to replay messages file. Reason: %v", err)
+	}
+}
+
 type node struct {
 	// index of node synchronization with the cluster
 	localSyncIndex int64
@@ -521,10 +554,7 @@ func (n *node) Start() {
 			// start the execution
 			n.pbft.Run(ctx)
-			err := n.c.replayMessageNotifier.SaveState()
-			if err != nil {
-				log.Printf("[WARNING] Could not write state to file. Reason: %v", err)
-			}
+			n.c.saveMessages()
 
 			switch n.pbft.GetState() {
 			case pbft.SyncState:
@@ -536,6 +566,7 @@ func (n *node) Start() {
 				n.setSyncIndex(currentSyncIndex + 1)
 			default: // stopped
+				n.c.saveMessages()
 				return
 			}
 		}
@@ -716,29 +747,27 @@ func (v *valString) Len() int {
 
 // ReplayNotifier is an interface that expands the StateNotifier with additional methods for saving and loading replay messages
 type ReplayNotifier interface {
 	pbft.StateNotifier
-	SaveMetaData(nodeNames *[]string) error
+	SaveMetaData(nodeNames []string) error
 	SaveState() error
 	HandleMessage(to pbft.NodeID, message *pbft.MessageReq)
+	HandleAction(actionType, data string, sequence, round uint64)
 }
 
 // DefaultReplayNotifier is a null object implementation of the ReplayNotifier interface
 type DefaultReplayNotifier struct {
-}
-
-// HandleTimeout implements StateNotifier interface
-func (n *DefaultReplayNotifier) HandleTimeout(to pbft.NodeID, msgType pbft.MsgType, view *pbft.View) {
-}
-
-// ReadNextMessage is an implementation of StateNotifier interface
-func (n *DefaultReplayNotifier) ReadNextMessage(p *pbft.Pbft) (*pbft.MessageReq, []*pbft.MessageReq) {
-	return p.ReadMessageWithDiscards()
+	pbft.DefaultStateNotifier
 }
 
 // SaveMetaData is an implementation of the ReplayNotifier interface
-func (n *DefaultReplayNotifier) SaveMetaData(nodeNames *[]string) error { return nil }
+func (n *DefaultReplayNotifier) SaveMetaData(nodeNames []string) error {
+	return nil
+}
 
 // SaveState is an implementation of the ReplayNotifier interface
 func (n *DefaultReplayNotifier) SaveState() error { return nil }
 
 // HandleMessage is an implementation of the ReplayNotifier interface
 func (n *DefaultReplayNotifier) HandleMessage(to pbft.NodeID, message *pbft.MessageReq) {}
+
+// HandleAction is an implementation of the ReplayNotifier interface
+func (n *DefaultReplayNotifier) HandleAction(actionType, data string, sequence, round uint64) {}
diff --git a/e2e/fuzz/README.md b/e2e/fuzz/README.md
new file mode 100644
index 00000000..609f23aa
--- /dev/null
+++ b/e2e/fuzz/README.md
@@ -0,0 +1,44 @@
+- [FUZZ FRAMEWORK](#fuzz-framework)
+  - [Daemon](#daemon)
+  - [Replay Messages](#replay-messages)
+  - [Known issues](#known-issues)
+
+# FUZZ FRAMEWORK
+
+The fuzz framework is an automated software testing framework that involves providing invalid, unexpected, or random data as inputs to a computer program. The program is then monitored for exceptions such as crashes, failing built-in code assertions, or potential memory leaks.
+
+The fuzz framework enables building randomized test sets on top of the PBFT consensus algorithm. Its end goal is to ensure that we achieve linearity of the PBFT consensus protocol even under stress conditions; that is, under certain time bounds and conditions (e.g., we cannot produce a block if zero nodes are active), a new block is eventually produced.
+
+## Daemon
+The fuzz daemon is a test runner for fuzz/randomized tests. It is a separate Go module/process that runs alongside a cluster of PBFT nodes. It runs a goroutine with a single cluster of a predefined number of nodes, randomly picks some subset of the predefined actions, and applies them to the cluster.
+
+The actions that can currently be simulated by the fuzz daemon are listed below (a sketch of a custom action follows the list):
+1. Drop Node - simulates a node dropping out of the cluster, where a single node goes offline and stops communicating with the rest of the network.
+2. Partitions - simulates the grouping of nodes into separate partitions, where each partition communicates only with its member nodes.
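+
+For illustration, a custom action is a type in the `e2e` package that implements the `FunctionalAction` interface from `e2e/actions.go`. The following sketch shows only the `Apply` method, modeled on the existing `DropNodeAction`; the `RestartFirstNodeAction` name and the choice of victim node are hypothetical, and the interface may require further methods beyond `Apply`:
+
+```go
+// RestartFirstNodeAction is a hypothetical example action: it stops the
+// first node in the cluster and returns a RevertFunc that starts it
+// again, mirroring what DropNodeAction does.
+type RestartFirstNodeAction struct{}
+
+func (a *RestartFirstNodeAction) Apply(c *Cluster) RevertFunc {
+	nodeToStop := c.Nodes()[0] // hypothetical choice of the victim node
+	c.StopNode(nodeToStop.GetName())
+
+	return func() {
+		nodeToStop.Start()
+	}
+}
+```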
+
+To run the fuzz daemon (runner), run the following command:
+
+`go run ./e2e/fuzz/cmd/main.go fuzz-run -nodes={numberOfNodes} -duration={duration}`
+
+e.g., `go run ./e2e/fuzz/cmd/main.go fuzz-run -nodes=5 -duration=1h`
+
+To log node execution to separate log files for easier problem analysis, set the environment variable `E2E_LOG_TO_FILES` to `true` before running the `fuzz-run` command. Logs are stored inside the `logs-{timestamp}` subfolder of the folder from which the `fuzz-run` command was called. Each node gets its own `.log` file, named after the corresponding node.
+
+By default, running the `fuzz-run` command saves two `.flow` files: one containing all the messages that were gossiped during the fuzz daemon execution, and one containing meta data about the nodes and the actions that were simulated. Files are saved inside the `SavedState_{timestamp}` subfolder of the folder from which the `fuzz-run` command was called. The file containing the gossiped messages is named `messages.flow`, and the file containing the node and action meta data is named `metaData.flow`. These files are needed to replay the fuzz execution using the **Replay Messages** feature.
+
+## Replay Messages
+The fuzz framework relies on a randomized set of predefined actions that are applied to, and reverted on, the **PBFT** node cluster. Since its execution is non-deterministic and an algorithm failure can be discovered at any time, it is of utmost importance to have a trace of the triggers that led to the failure state, and the ability to replay those triggers on demand an arbitrary number of times. This enables in-depth analysis of the failure.
+
+Replay takes the provided `messages.flow` and `metaData.flow` files, creates a cluster with the appropriate number of nodes (named the same as in the fuzz run), pushes all the messages into the appropriate node queues, and starts the cluster. This enables a quick replay of a previously run execution and on-demand analysis of the problem that occurred.
+
+To run the message replay, run the following command:
+
+`go run ./e2e/fuzz/cmd/main.go replay-messages -filesDirectory={directoryWhereFlowFilesAreStored}`
+
+e.g., `go run ./e2e/fuzz/cmd/main.go replay-messages -filesDirectory=../SavedState_{timestamp}`
+
+**NOTE**: Replay does not save new `.flow` files during its execution.
+
+## Known issues
+When saving the messages gossiped during the execution of the fuzz daemon, messages are sorted by sequence, but they will not be in the exact order in which they were gossiped, since the `Gossip` method sends messages to each node asynchronously, in a separate goroutine per receiver. This means that a `PrePrepare` message may not come first in the `.flow` file, since all the other messages are sent in separate goroutines as well. This neither affects nor causes an issue in replay, since the **PBFT** state machine separates messages into different queues depending on the message type (`PrePrepare`, `Prepare`, `Commit`, `RoundChange`).
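+
+For reference, the `.flow` files are newline-delimited JSON, one object per line. A minimal standalone sketch for inspecting a `messages.flow` file outside the framework could look like the following (the import path is assumed from the module layout and the file path is hypothetical; the enlarged scanner buffer mirrors `maxCharactersPerLine` in the replay reader):
+
+```go
+package main
+
+import (
+	"bufio"
+	"encoding/json"
+	"log"
+	"os"
+
+	"github.com/0xPolygon/pbft-consensus/e2e/fuzz/replay"
+)
+
+func main() {
+	// hypothetical path to a saved fuzz run
+	file, err := os.Open("SavedState_2021-11-01T12:00:00Z/messages.flow")
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer file.Close()
+
+	scanner := bufio.NewScanner(file)
+	scanner.Buffer([]byte{}, 2048*1024) // .flow lines can exceed the default 64KB scanner limit
+
+	for scanner.Scan() {
+		var msg replay.ReplayMessage
+		if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil {
+			log.Fatalf("corrupted .flow line: %v", err)
+		}
+		log.Printf("to: %s, sequence: %d", msg.To, msg.Message.View.Sequence)
+	}
+}
+```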
\ No newline at end of file
diff --git a/e2e/fuzz/replay/replay_message.go b/e2e/fuzz/replay/replay_message.go
index 247a5e7e..a11bd13a 100644
--- a/e2e/fuzz/replay/replay_message.go
+++ b/e2e/fuzz/replay/replay_message.go
@@ -6,6 +6,24 @@ import (
 	"github.com/0xPolygon/pbft-consensus"
 )
 
+// MetaData holds data about fuzz actions that happened and need to be saved in a .flow file
+type MetaData struct {
+	DataType string `json:"actionType"`
+	Data     string `json:"data"`
+	Sequence uint64 `json:"sequence"`
+	Round    uint64 `json:"round"`
+}
+
+// NewMetaData creates a new MetaData object
+func NewMetaData(dataType, data string, sequence, round uint64) *MetaData {
+	return &MetaData{
+		DataType: dataType,
+		Data:     data,
+		Sequence: sequence,
+		Round:    round,
+	}
+}
+
 // ReplayMessage is a struct that represents a single json object in .flow file
 type ReplayMessage struct {
 	To pbft.NodeID `json:"to"`
@@ -31,10 +49,10 @@ func NewReplayTimeoutMessage(to pbft.NodeID, msgType pbft.MsgType, view *pbft.Vi
 	}
 }
 
-// ConvertToByteArrays converts ReplayMessage slice to JSON representation and return it back as slice of byte arrays
-func ConvertToByteArrays(messages []*ReplayMessage) ([][]byte, error) {
+// ConvertToByteArrays converts the provided slice to its JSON representation and returns it back as a slice of byte arrays
+func ConvertToByteArrays(items []interface{}) ([][]byte, error) {
 	var allRawMessages [][]byte
-	for _, message := range messages {
+	for _, message := range items {
 		currentRawMessage, err := json.Marshal(message)
 		if err != nil {
 			return allRawMessages, err
diff --git a/e2e/fuzz/replay/replay_message_command.go b/e2e/fuzz/replay/replay_message_command.go
index 302c1658..54543c3a 100644
--- a/e2e/fuzz/replay/replay_message_command.go
+++ b/e2e/fuzz/replay/replay_message_command.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"log"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/0xPolygon/pbft-consensus"
@@ -18,18 +17,18 @@ import (
 type ReplayMessageCommand struct {
 	UI cli.Ui
 
-	filePath string
+	filesDirectory string
 }
 
 // Help implements the cli.Command interface
-func (fc *ReplayMessageCommand) Help() string {
+func (rmc *ReplayMessageCommand) Help() string {
 	return `Runs the message and timeouts replay for analysis and testing purposes based on provided .flow file.
-	Usage: replay-messages -file={fullPathToFlowFile}
+	Usage: replay-messages -filesDirectory={directoryWhereFlowFilesAreStored}
 
 	Options:
 
-	-file - Full path to .flow file containing messages and timeouts to be replayed by the fuzz framework`
+	-filesDirectory - Directory containing the .flow files with messages and action meta data to be replayed by the fuzz framework`
 }
 
 // Synopsis implements the cli.Command interface
@@ -45,17 +44,16 @@ func (rmc *ReplayMessageCommand) Run(args []string) int {
 		return 1
 	}
 
-	messageReader := &replayMessageReader{
-		msgProcessingDone: make(chan string),
-	}
+	messageReader := &replayMessageReader{}
+	nodeExecutionHandler := NewNodeExecutionHandler()
 
-	err = messageReader.openFile(rmc.filePath)
+	err = messageReader.openFiles(rmc.filesDirectory)
 	if err != nil {
 		rmc.UI.Error(err.Error())
 		return 1
 	}
 
-	nodeNames, err := messageReader.readNodeMetaData()
+	nodeNames, err := messageReader.readNodeMetaData(nodeExecutionHandler)
 	if err != nil {
 		rmc.UI.Error(err.Error())
 		return 1
@@ -67,47 +65,27 @@ func (rmc *ReplayMessageCommand) Run(args []string) int {
 		prefix = (nodeNames[0])[:i]
 	}
 
-	replayMessagesNotifier := NewReplayMessagesNotifierWithReader(messageReader)
+	replayMessagesNotifier := NewReplayMessagesNotifierForReplay(nodeExecutionHandler)
 
 	nodesCount := len(nodeNames)
+	var cluster *e2e.Cluster
 	config := &e2e.ClusterConfig{
 		Count:                 nodesCount,
 		Name:                  "fuzz_cluster",
 		Prefix:                prefix,
 		ReplayMessageNotifier: replayMessagesNotifier,
 		RoundTimeout:          e2e.GetPredefinedTimeout(time.Millisecond),
-		TransportHandler:      func(to pbft.NodeID, msg *pbft.MessageReq) { replayMessagesNotifier.HandleMessage(to, msg) },
+		TransportHandler:      func(to pbft.NodeID, msg *pbft.MessageReq) { /* we do not gossip messages in replay */ },
 		CreateBackend:         func() e2e.IntegrationBackend { return &ReplayBackend{messageReader: messageReader} },
 	}
 
-	cluster := e2e.NewPBFTCluster(nil, config)
+	cluster = e2e.NewPBFTCluster(nil, config)
 
-	messageReader.readMessages(cluster)
+	messageReader.readMessages(cluster, nodeExecutionHandler)
 	messageReader.closeFile()
 
-	var wg sync.WaitGroup
-	wg.Add(1)
-
-	nodesDone := make(map[string]bool, nodesCount)
-	go func() {
-		for {
-			select {
-			case nodeDone := <-messageReader.msgProcessingDone:
-				nodesDone[nodeDone] = true
-				cluster.StopNode(nodeDone)
-				if len(nodesDone) == nodesCount {
-					wg.Done()
-					return
-				}
-			default:
-				continue
-			}
-		}
-	}()
-
-	cluster.Start()
-	wg.Wait()
-	cluster.Stop()
+	nodeExecutionHandler.startActionSimulation(cluster)
+	nodeExecutionHandler.stopActionSimulation(cluster)
 
 	rmc.UI.Info("Done with execution")
 	if err = replayMessagesNotifier.CloseFile(); err != nil {
@@ -121,7 +99,7 @@ func (rmc *ReplayMessageCommand) Run(args []string) int {
 // NewFlagSet implements the FuzzCLICommand interface and creates a new flag set for command arguments
 func (rmc *ReplayMessageCommand) NewFlagSet() *flag.FlagSet {
 	flagSet := flag.NewFlagSet("replay-messages", flag.ContinueOnError)
-	flagSet.StringVar(&rmc.filePath, "file", "", "Full path to .flow file containing messages and timeouts to be replayed by the fuzz framework")
+	flagSet.StringVar(&rmc.filesDirectory, "filesDirectory", "", "Directory containing .flow files with messages and action meta data to be replayed by the fuzz framework")
 
 	return flagSet
 }
@@ -134,10 +112,11 @@ func (rmc *ReplayMessageCommand) validateInput(args []string) error {
 		return err
 	}
 
-	if rmc.filePath == "" {
-		err = errors.New("provided file path is empty")
+	if rmc.filesDirectory == "" {
+		err = errors.New("provided files directory is empty")
 		return err
 	}
+
 	return nil
 }
diff --git a/e2e/fuzz/replay/replay_message_persister.go b/e2e/fuzz/replay/replay_message_persister.go
index 9697c38e..30c6e7aa 100644
--- a/e2e/fuzz/replay/replay_message_persister.go
+++ b/e2e/fuzz/replay/replay_message_persister.go
@@ -4,31 +4,58 @@ import (
 	"bufio"
 	"encoding/json"
 	"fmt"
+	"log"
 	"os"
 	"path/filepath"
+	"strings"
 	"sync"
 	"time"
 )
 
 const directoryPath = "../SavedState"
 
+// messagePersister is an interface for storing messages, node and action meta data to .flow files
+type messagePersister interface {
+	saveMetaData(nodeNames []string) error
+	saveCachedMessages() error
+	addAction(action *MetaData)
+	addMessage(message *ReplayMessage)
+	closeFile() error
+}
+
+// defaultMessagePersister is a null object implementation of the messagePersister interface
+type defaultMessagePersister struct{}
+
+func (d *defaultMessagePersister) saveMetaData(nodeNames []string) error { return nil }
+func (d *defaultMessagePersister) saveCachedMessages() error             { return nil }
+func (d *defaultMessagePersister) addAction(action *MetaData)            {}
+func (d *defaultMessagePersister) addMessage(message *ReplayMessage)     {}
+func (d *defaultMessagePersister) closeFile() error                      { return nil }
+
 // replayMessagePersister encapsulates logic for saving messages in .flow file
 type replayMessagePersister struct {
-	lock     sync.Mutex
-	messages []*ReplayMessage
-	file     *os.File
+	lock         sync.Mutex
+	timestamp    string
+	messages     []interface{}
+	messagesFile *os.File
+	metaDataFile *os.File
+	actions      []interface{}
 }
 
 // saveMetaData saves node meta data to .flow file
-func (r *replayMessagePersister) saveMetaData(nodeNames *[]string) error {
+func (r *replayMessagePersister) saveMetaData(nodeNames []string) error {
 	var err error
-	if err = r.createFile(); err != nil {
-		return err
+	var file *os.File
+	if r.metaDataFile == nil {
+		if file, err = r.createFile(MetaDataFilePrefix); err != nil {
+			return err
+		}
+		r.metaDataFile = file
 	}
 
-	bufWriter := bufio.NewWriter(r.file)
-	defer bufWriter.Flush()
-
+	bufWriter := bufio.NewWriterSize(r.metaDataFile, maxCharactersPerLine)
+	defer bufWriter.Flush()
+
 	currentRawMessage, err := json.Marshal(nodeNames)
 	if err != nil {
 		return err
@@ -39,7 +66,9 @@
 		return err
 	}
 
 	_, err = bufWriter.Write([]byte("\n"))
+
+	r.saveActions(bufWriter)
 
 	return err
 }
@@ -50,48 +79,27 @@ func (r *replayMessagePersister) saveCachedMessages() error {
 	defer r.lock.Unlock()
 
 	var err error
-	if err = r.createFile(); err != nil {
-		return err
+	var file *os.File
+	if r.messagesFile == nil {
+		if file, err = r.createFile(MessagesFilePrefix); err != nil {
+			return err
+		}
+		r.messagesFile = file
 	}
 
 	if r.messages != nil {
-		err = r.saveMessages(r.file)
+		err = r.saveMessages()
 	}
 
 	return err
 }
 
-// createFile creates a .flow file to save messages and timeouts on the predifined location
-func (r *replayMessagePersister) createFile() error {
-	if r.file == nil {
-		if _, err := os.Stat(directoryPath); os.IsNotExist(err) {
-			err := os.Mkdir(directoryPath, 0777)
-			if err != nil {
-				return err
-			}
-		}
-
-		path, err := filepath.Abs(directoryPath)
-		if err != nil {
-			return err
-		}
-
-		file, err := os.OpenFile(filepath.Join(path, fmt.Sprintf("%v_%v.flow", FileName, time.Now().Format(time.RFC3339))), os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
-		if err != nil {
-			return err
-		}
-		r.file = file
-	}
-
-	return nil
-}
+// addAction adds an action that happened in fuzz to the cache that will be saved to a .flow file on cluster stop
+func (r *replayMessagePersister) addAction(action *MetaData) {
+	r.lock.Lock()
+	defer r.lock.Unlock()
 
-// closeFile closes file created by the ReplayMessagesHandler if it is open
-func (r *replayMessagePersister) closeFile() error {
-	if r.file != nil {
-		return r.file.Close()
-	}
-	return nil
+	r.actions = append(r.actions, action)
 }
 
 // addMessage adds a message from sequence to messages cache that will be written to .flow file
@@ -102,17 +110,39 @@ func (r *replayMessagePersister) addMessage(message *ReplayMessage) {
 }
 
 // saveMessages saves ReplayMessages to the JSON file within the pre-defined directory.
-func (r *replayMessagePersister) saveMessages(fileWriter *os.File) error {
+func (r *replayMessagePersister) saveMessages() error {
 	rawMessages, err := ConvertToByteArrays(r.messages)
 	if err != nil {
 		return err
 	}
 
-	bufWriter := bufio.NewWriterSize(fileWriter, maxCharactersPerLine)
-	defer bufWriter.Flush()
+	bufWriter := bufio.NewWriterSize(r.messagesFile, maxCharactersPerLine)
+	err = r.writeToFile(rawMessages, bufWriter)
+	r.messages = nil
+
+	return err
+}
 
-	for _, rawMessage := range rawMessages {
-		_, err = bufWriter.Write(rawMessage)
+// saveActions saves the cached actions to the .flow file
+func (r *replayMessagePersister) saveActions(bufWriter *bufio.Writer) {
+	rawActions, err := ConvertToByteArrays(r.actions)
+	if err != nil {
+		log.Printf("[ERROR] An error occurred while converting actions to byte arrays")
+		return
+	}
+
+	if len(rawActions) == 0 { // no actions were executed in fuzz
+		return
+	}
+
+	r.writeToFile(rawActions, bufWriter)
+}
+
+// writeToFile writes the given data to the designated file using a buffered writer
+func (r *replayMessagePersister) writeToFile(data [][]byte, bufWriter *bufio.Writer) error {
+	for _, rawMessage := range data {
+		_, err := bufWriter.Write(rawMessage)
 		if err != nil {
 			return err
 		}
@@ -123,6 +153,55 @@ func (r *replayMessagePersister) saveMessages(fileWriter *os.File) error {
 		}
 	}
 
-	r.messages = nil
+	return bufWriter.Flush()
+}
+
+// createFile creates a .flow file for saving messages and timeouts at the predefined location
+func (r *replayMessagePersister) createFile(filePrefix string) (*os.File, error) {
+	if r.timestamp == "" {
+		r.timestamp = time.Now().Format(time.RFC3339)
+	}
+
+	directory := fmt.Sprintf("%v_%v", directoryPath, r.timestamp)
+	if _, err := os.Stat(directory); os.IsNotExist(err) {
+		err := os.Mkdir(directory, 0777)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	path, err := filepath.Abs(directory)
+	if err != nil {
+		return nil, err
+	}
+
+	file, err := os.OpenFile(filepath.Join(path, fmt.Sprintf("%v.flow", filePrefix)), os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
+	if err != nil {
+		return nil, err
+	}
+
+	return file, nil
+}
+
+// closeFile closes the files created by the ReplayMessagesHandler if they are open
+func (r *replayMessagePersister) closeFile() error {
+	var errStrings []string
+	if r.messagesFile != nil {
+		if err := r.messagesFile.Close(); err != nil {
+			errStrings = append(errStrings, err.Error())
+		}
+	}
+
+	if r.metaDataFile != nil {
+		if err := r.metaDataFile.Close(); err != nil {
+			errStrings = append(errStrings, err.Error())
+		}
+	}
+
+	if len(errStrings) > 0 {
+		return fmt.Errorf("%s", strings.Join(errStrings, "\n"))
+	}
+
 	return nil
 }
diff --git a/e2e/fuzz/replay/replay_message_reader.go b/e2e/fuzz/replay/replay_message_reader.go
index c866dfa8..4478f100 100644
--- a/e2e/fuzz/replay/replay_message_reader.go
+++ b/e2e/fuzz/replay/replay_message_reader.go
@@ -4,9 +4,11 @@ import (
 	"bufio"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"log"
 	"os"
-	"sync"
+	"path/filepath"
+	"strings"
 
 	"github.com/0xPolygon/pbft-consensus"
 	"github.com/0xPolygon/pbft-consensus/e2e"
@@ -14,84 +16,127 @@ import (
 const (
 	maxCharactersPerLine = 2048 * 1024 // Increase Scanner buffer size to 2MB per line
-	messageChunkSize     = 200
+	messageChunkSize     = 200 // Size of a chunk of messages being loaded from .flow file
 )
 
-type sequenceMessages struct {
-	sequence uint64
-	messages []*pbft.MessageReq
-}
-
 // replayMessageReader encapsulates logic for reading messages from flow file
 type replayMessageReader struct {
-	lock                   sync.Mutex
-	file                   *os.File
-	scanner                *bufio.Scanner
-	msgProcessingDone      chan string
-	nodesDoneWithExecution map[pbft.NodeID]bool
-	lastSequenceMessages   map[pbft.NodeID]*sequenceMessages
-	prePrepareMessages     map[uint64]*pbft.MessageReq
+	messagesFile       *os.File
+	metaDataFile       *os.File
+	prePrepareMessages map[uint64]*pbft.MessageReq
 }
 
-// openFile opens the file on provided location
-func (r *replayMessageReader) openFile(filePath string) error {
-	_, err := os.Stat(filePath)
+// openFiles opens the .flow files at the provided location
+func (r *replayMessageReader) openFiles(filesDirectory string) error {
+	_, err := os.Stat(filesDirectory)
 	if err != nil {
 		return err
 	}
 
-	r.file, err = os.Open(filePath)
+	messagesFilePath := filepath.Join(filesDirectory, fmt.Sprintf("%v.flow", MessagesFilePrefix))
+	_, err = os.Stat(messagesFilePath)
 	if err != nil {
 		return err
 	}
 
-	r.scanner = bufio.NewScanner(r.file)
+	r.messagesFile, err = os.Open(messagesFilePath)
+	if err != nil {
+		return err
+	}
 
-	buffer := []byte{}
-	r.scanner.Buffer(buffer, maxCharactersPerLine)
+	metaDataFilePath := filepath.Join(filesDirectory, fmt.Sprintf("%v.flow", MetaDataFilePrefix))
+	_, err = os.Stat(metaDataFilePath)
+	if err != nil {
+		return err
+	}
+
+	r.metaDataFile, err = os.Open(metaDataFilePath)
+	if err != nil {
+		return err
+	}
 
 	return nil
 }
 
-// closeFile closes the opened .flow file
+// closeFile closes the opened .flow files
 func (r *replayMessageReader) closeFile() error {
-	if r.file != nil {
-		return r.file.Close()
+	var errStrings []string
+	if r.messagesFile != nil {
+		if err := r.messagesFile.Close(); err != nil {
+			errStrings = append(errStrings, err.Error())
+		}
 	}
+
+	if r.metaDataFile != nil {
+		if err := r.metaDataFile.Close(); err != nil {
+			errStrings = append(errStrings, err.Error())
+		}
+	}
+
+	if len(errStrings) > 0 {
+		return fmt.Errorf("%s", strings.Join(errStrings, "\n"))
+	}
+
 	return nil
 }
 
-// readNodeMetaData reads the first line of .flow file which should be a list of nodes
-func (r *replayMessageReader) readNodeMetaData() ([]string, error) {
+// readNodeMetaData reads the node names and actions from the metaData.flow file
+func (r *replayMessageReader) readNodeMetaData(nodeExecutionHandler *replayNodeExecutionHandler) ([]string, error) {
+	scanner := bufio.NewScanner(r.metaDataFile)
+	buffer := []byte{}
+	scanner.Buffer(buffer, maxCharactersPerLine)
+
 	var nodeNames []string
-	r.scanner.Scan() // first line carries the node names needed to create appropriate number of nodes for replay
-	err := json.Unmarshal(r.scanner.Bytes(), &nodeNames)
+	scanner.Scan() // first line carries the node names needed to create the appropriate number of nodes for replay
+	err := json.Unmarshal(scanner.Bytes(), &nodeNames)
+	nodeCount := len(nodeNames)
 	if err != nil {
 		return nil, err
-	} else if len(nodeNames) == 0 {
+	} else if nodeCount == 0 {
 		err = errors.New("no nodes were found in .flow file, so no cluster will be started")
+		return nil, err
+	}
+
+	nodeExecutionHandler.dropNodeActions = make(map[pbft.NodeID]map[uint64]*MetaData, nodeCount)
+	nodeExecutionHandler.revertDropNodeActions = make(map[pbft.NodeID]map[uint64]*MetaData, nodeCount)
+	nodeExecutionHandler.lastSequencesByNode = make(map[pbft.NodeID]*MetaData, nodeCount)
+
+	for _, name := range nodeNames {
+		nodeExecutionHandler.dropNodeActions[pbft.NodeID(name)] = make(map[uint64]*MetaData)
+		nodeExecutionHandler.revertDropNodeActions[pbft.NodeID(name)] = make(map[uint64]*MetaData)
+	}
+
+	for scanner.Scan() {
+		var action *MetaData
+		if err := json.Unmarshal(scanner.Bytes(), &action); err != nil {
+			log.Printf("[ERROR] Error happened on unmarshalling action in .flow file. Reason: %v", err)
+			return nodeNames, err
+		}
+
+		switch action.DataType {
+		case e2e.DropNode:
+			nodeExecutionHandler.dropNodeActions[pbft.NodeID(action.Data)][action.Sequence] = action
+		case e2e.RevertDropNode:
+			nodeExecutionHandler.revertDropNodeActions[pbft.NodeID(action.Data)][action.Sequence] = action
+		case e2e.LastSequence:
+			nodeExecutionHandler.lastSequencesByNode[pbft.NodeID(action.Data)] = action
+		default:
+			panic("unknown action data type read from metaData.flow file")
+		}
 	}
 
 	return nodeNames, err
 }
 
 // readMessages reads messages from open .flow file and pushes them to appropriate nodes
-func (r *replayMessageReader) readMessages(cluster *e2e.Cluster) {
+func (r *replayMessageReader) readMessages(cluster *e2e.Cluster, nodeExecutionHandler *replayNodeExecutionHandler) {
 	nodes := cluster.GetNodesMap()
-	nodesCount := len(nodes)
-	r.nodesDoneWithExecution = make(map[pbft.NodeID]bool, nodesCount)
-	r.lastSequenceMessages = make(map[pbft.NodeID]*sequenceMessages, nodesCount)
-	r.prePrepareMessages = make(map[uint64]*pbft.MessageReq)
 
 	messagesChannel := make(chan []*ReplayMessage)
 	doneChannel := make(chan struct{})
 
+	r.prePrepareMessages = make(map[uint64]*pbft.MessageReq)
 	r.startChunkReading(messagesChannel, doneChannel)
 
-	nodeMessages := make(map[pbft.NodeID]map[uint64][]*pbft.MessageReq, nodesCount)
-	for _, n := range nodes {
-		nodeMessages[pbft.NodeID(n.GetName())] = make(map[uint64][]*pbft.MessageReq)
-	}
 	isDone := false
 LOOP:
@@ -101,11 +146,9 @@ LOOP:
 			for _, message := range messages {
 				node, exists := nodes[string(message.To)]
 				if !exists {
-					log.Printf("[WARNING] Could not find node: %v to push message from .flow file.\n", message.To)
+					log.Printf("[WARNING] Could not find node: %v to push message from .flow file", message.To)
 				} else {
 					node.PushMessageInternal(message.Message)
-					nodeMessages[message.To][message.Message.View.Sequence] = append(nodeMessages[message.To][message.Message.View.Sequence], message.Message)
-
 					if !isTimeoutMessage(message.Message) && message.Message.Type == pbft.MessageReq_Preprepare {
 						if _, isPrePrepareAdded := r.prePrepareMessages[message.Message.View.Sequence]; !isPrePrepareAdded {
 							r.prePrepareMessages[message.Message.View.Sequence] = message.Message
@@ -114,20 +157,6 @@ LOOP:
 				}
 			}
 		case <-doneChannel:
-			for name, n := range nodeMessages {
-				nodeLastSequence := uint64(0)
-				for sequence := range n {
-					if nodeLastSequence < sequence {
-						nodeLastSequence = sequence
-					}
-				}
-
-				r.lastSequenceMessages[name] = &sequenceMessages{
-					sequence: nodeLastSequence,
-					messages: nodeMessages[name][nodeLastSequence],
-				}
-			}
-
 			isDone = true
 			break LOOP
 		}
@@ -137,12 +166,16 @@ LOOP:
 // startChunkReading reads messages from .flow file in chunks
 func (r *replayMessageReader) startChunkReading(messagesChannel chan []*ReplayMessage, doneChannel chan struct{}) {
 	go func() {
+		scanner := bufio.NewScanner(r.messagesFile)
+		buffer := []byte{}
+		scanner.Buffer(buffer, maxCharactersPerLine)
+
 		messages := make([]*ReplayMessage, 0)
 		i := 0
-		for r.scanner.Scan() {
+		for scanner.Scan() {
 			var message *ReplayMessage
-			if err := json.Unmarshal(r.scanner.Bytes(), &message); err != nil {
-				log.Printf("[ERROR] Error happened on unmarshalling a message in .flow file. Reason: %v.\n", err)
+			if err := json.Unmarshal(scanner.Bytes(), &message); err != nil {
+				log.Printf("[ERROR] Error happened on unmarshalling a message in .flow file. Reason: %v", err)
 				return
 			}
 
@@ -164,44 +197,3 @@ func (r *replayMessageReader) startChunkReading(messagesChannel chan []*ReplayMe
 		doneChannel <- struct{}{}
 	}()
 }
-
-// checkIfDoneWithExecution checks if node finished with processing all the messages from .flow file
-func (r *replayMessageReader) checkIfDoneWithExecution(validatorId pbft.NodeID, msg *pbft.MessageReq) {
-	if msg.View.Sequence > r.lastSequenceMessages[validatorId].sequence ||
-		(msg.View.Sequence == r.lastSequenceMessages[validatorId].sequence && r.areMessagesFromLastSequenceProcessed(msg, validatorId)) {
-		r.lock.Lock()
-		if _, isDone := r.nodesDoneWithExecution[validatorId]; !isDone {
-			r.nodesDoneWithExecution[validatorId] = true
-			r.msgProcessingDone <- string(validatorId)
-		}
-		r.lock.Unlock()
-	}
-}
-
-// areMessagesFromLastSequenceProcessed checks if all the messages from the last sequence of given node are processed so that the node can be stoped
-func (r *replayMessageReader) areMessagesFromLastSequenceProcessed(msg *pbft.MessageReq, validatorId pbft.NodeID) bool {
-	lastSequenceMessages := r.lastSequenceMessages[validatorId]
-
-	lastSequenceMessagesCount := len(lastSequenceMessages.messages)
-	if lastSequenceMessagesCount > 0 {
-		messageIndexToRemove := -1
-		for i, message := range lastSequenceMessages.messages {
-			if msg.Equal(message) {
-				messageIndexToRemove = i
-				break
-			}
-		}
-
-		if messageIndexToRemove != -1 {
-			lastSequenceMessages.messages = append(lastSequenceMessages.messages[:messageIndexToRemove], lastSequenceMessages.messages[messageIndexToRemove+1:]...)
-			lastSequenceMessagesCount = len(lastSequenceMessages.messages)
-		}
-	}
-
-	return lastSequenceMessagesCount == 0
-}
-
-// isTimeoutMessage checks if message in .flow file represents a timeout
-func isTimeoutMessage(message *pbft.MessageReq) bool {
-	return message.Hash == nil && message.Proposal == nil && message.Seal == nil && message.From == ""
-}
diff --git a/e2e/fuzz/replay/replay_messages_notifier.go b/e2e/fuzz/replay/replay_messages_notifier.go
index 3c930c9e..3d4d08e7 100644
--- a/e2e/fuzz/replay/replay_messages_notifier.go
+++ b/e2e/fuzz/replay/replay_messages_notifier.go
@@ -1,17 +1,20 @@
 package replay
 
 import (
+	"time"
+
 	"github.com/0xPolygon/pbft-consensus"
 )
 
 const (
-	FileName = "messages"
+	MessagesFilePrefix = "messages"
+	MetaDataFilePrefix = "metaData"
 )
 
 // ReplayMessagesNotifier is a struct that implements ReplayNotifier interface
 type ReplayMessagesNotifier struct {
-	messagePersister *replayMessagePersister
-	messageReader    *replayMessageReader
+	messagePersister     messagePersister
+	nodeExecutionHandler *replayNodeExecutionHandler
 }
 
 // NewReplayMessagesNotifierWithPersister creates a new messages notifier with messages persister (required when fuzz-run is executed to save messages to file)
@@ -21,16 +24,16 @@ func NewReplayMessagesNotifierWithPersister() *ReplayMessagesNotifier {
 	}
 }
 
-// NewReplayMessagesNotifierWithReader creates a new messages notifier with messages reader (required when replay-messages is executed to read messages from file)
-func NewReplayMessagesNotifierWithReader(r *replayMessageReader) *ReplayMessagesNotifier {
+// NewReplayMessagesNotifierForReplay creates a new messages notifier with the plugins needed for replay
+func NewReplayMessagesNotifierForReplay(nodeExecutionHandler *replayNodeExecutionHandler) *ReplayMessagesNotifier {
 	return &ReplayMessagesNotifier{
-		messageReader:    r,
-		messagePersister: &replayMessagePersister{},
+		messagePersister:     &defaultMessagePersister{},
+		nodeExecutionHandler: nodeExecutionHandler,
 	}
 }
 
 // SaveMetaData saves node meta data to .flow file
-func (r *ReplayMessagesNotifier) SaveMetaData(nodeNames *[]string) error {
+func (r *ReplayMessagesNotifier) SaveMetaData(nodeNames []string) error {
 	return r.messagePersister.saveMetaData(nodeNames)
 }
@@ -50,14 +53,19 @@ func (r *ReplayMessagesNotifier) HandleTimeout(to pbft.NodeID, msgType pbft.MsgT
 }
 
 // ReadNextMessage is an implementation of StateNotifier interface
-func (r *ReplayMessagesNotifier) ReadNextMessage(p *pbft.Pbft) (*pbft.MessageReq, []*pbft.MessageReq) {
+func (r *ReplayMessagesNotifier) ReadNextMessage(p *pbft.Pbft, timeoutChannel chan time.Time) (*pbft.MessageReq, []*pbft.MessageReq) {
 	msg, discards := p.ReadMessageWithDiscards()
 
-	if r.messageReader != nil && msg != nil {
-		if isTimeoutMessage(msg) {
-			return nil, nil
+	if r.nodeExecutionHandler != nil {
+		validatorId := p.GetValidatorId()
+		if msg != nil {
+			r.nodeExecutionHandler.checkForNodesToBeRestarted(msg.View.Sequence)
+			if r.nodeExecutionHandler.checkIsTimeout(validatorId, msg, timeoutChannel) {
+				return nil, nil
+			}
 		} else {
-			r.messageReader.checkIfDoneWithExecution(p.GetValidatorId(), msg)
+			// if the node has no more messages for the given round and sequence in its queue, it was dropped in fuzz
+			r.nodeExecutionHandler.checkIfShouldDrop(validatorId, p.GetCurrentView(), timeoutChannel)
 		}
 	}
 
@@ -68,3 +76,13 @@ func (r *ReplayMessagesNotifier) ReadNextMessage(p *pbft.Pbft) (*pbft.MessageReq
 func (r *ReplayMessagesNotifier) CloseFile() error {
 	return r.messagePersister.closeFile()
 }
+
+// CreateTimeoutChannel is an implementation of StateNotifier interface
+func (r *ReplayMessagesNotifier) CreateTimeoutChannel(timeout time.Duration) chan time.Time {
+	return make(chan time.Time)
+}
+
+// HandleAction is an implementation of the ReplayNotifier interface
+func (r *ReplayMessagesNotifier) HandleAction(actionType, data string, sequence, round uint64) {
+	r.messagePersister.addAction(NewMetaData(actionType, data, sequence, round))
+}
diff --git a/e2e/fuzz/replay/replay_node_execution.go b/e2e/fuzz/replay/replay_node_execution.go
new file mode 100644
index 00000000..95e7b611
--- /dev/null
+++ b/e2e/fuzz/replay/replay_node_execution.go
@@ -0,0 +1,153 @@
+package replay
+
+import (
+	"fmt"
+	"log"
+	"sync"
+	"time"
+
+	"github.com/0xPolygon/pbft-consensus"
+	"github.com/0xPolygon/pbft-consensus/e2e"
+)
+
+// replayNodeExecutionHandler handles the simulation of fuzz actions and the node execution during replay
+type replayNodeExecutionHandler struct {
+	actionLock             sync.Mutex
+	nodesExecutionLock     sync.Mutex
+	lastSequencesByNode    map[pbft.NodeID]*MetaData
+	dropNodeActions        map[pbft.NodeID]map[uint64]*MetaData
+	revertDropNodeActions  map[pbft.NodeID]map[uint64]*MetaData
+	nodesDoneWithExecution map[pbft.NodeID]bool
+	droppedNodes           map[pbft.NodeID]uint64
+	cluster                *e2e.Cluster
+	msgExecutionDone       chan string
+	dropNode               chan string
+}
+
+// NewNodeExecutionHandler creates a new replayNodeExecutionHandler
+func NewNodeExecutionHandler() *replayNodeExecutionHandler {
+	return &replayNodeExecutionHandler{
+		droppedNodes:     make(map[pbft.NodeID]uint64),
+		msgExecutionDone: make(chan string),
+		dropNode:         make(chan string),
+	}
+}
+
+// startActionSimulation starts the cluster and simulates the fuzz actions in replay that were saved in the meta data file
+func (r *replayNodeExecutionHandler) startActionSimulation(cluster *e2e.Cluster) {
+	r.cluster = cluster
+	revertTicker := time.NewTicker(10 * time.Millisecond)
+	nodesCount := len(cluster.GetNodes())
+	r.nodesDoneWithExecution = make(map[pbft.NodeID]bool, nodesCount)
+	stoppedNodes := make(map[string]bool, nodesCount)
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		defer revertTicker.Stop()
+
+		maxHeight := uint64(0)
+		for {
+			select {
+			case <-revertTicker.C:
+				currentMaxHeight := cluster.GetMaxHeight() + 1
+				if maxHeight < currentMaxHeight {
+					maxHeight = currentMaxHeight
+				}
+				r.checkForNodesToBeRestarted(maxHeight)
+			case nodeDone := <-r.msgExecutionDone:
+				cluster.StopNode(nodeDone)
+				stoppedNodes[nodeDone] = true
+				if len(stoppedNodes) == nodesCount {
+					wg.Done()
+					return
+				}
+			case nodeToDrop := <-r.dropNode:
+				cluster.StopNode(nodeToDrop)
+			}
+		}
+	}()
+
+	cluster.Start()
+	wg.Wait()
+}
+
+// stopActionSimulation stops the cluster and the replay execution
+func (r *replayNodeExecutionHandler) stopActionSimulation(cluster *e2e.Cluster) {
+	cluster.Stop()
+}
+
+// checkIfDoneWithExecution checks if a node has finished processing all of its messages from the .flow file
+func (r *replayNodeExecutionHandler) checkIfDoneWithExecution(validatorId pbft.NodeID, sequence, round uint64) bool {
+	lastSequence := r.lastSequencesByNode[validatorId]
+	if sequence > lastSequence.Sequence ||
+		(sequence == lastSequence.Sequence && round == lastSequence.Round) {
+		r.nodesExecutionLock.Lock()
+		defer r.nodesExecutionLock.Unlock()
+
+		if _, isDone := r.nodesDoneWithExecution[validatorId]; !isDone {
+			r.nodesDoneWithExecution[validatorId] = true
+			r.msgExecutionDone <- string(validatorId)
+		}
+
+		return true
+	}
+
+	return false
+}
+
+// checkForNodesToBeRestarted checks if any node needs to be restarted when a certain sequence is reached
+func (r *replayNodeExecutionHandler) checkForNodesToBeRestarted(sequence uint64) {
+	r.actionLock.Lock()
+	defer r.actionLock.Unlock()
+
+	for node, droppedSequence := range r.droppedNodes {
+		if _, hasSequence := r.revertDropNodeActions[node][sequence]; hasSequence && sequence >= droppedSequence {
+			log.Printf("[REPLAY] Restarting node: %v in sequence: %v.", node, sequence)
+			delete(r.droppedNodes, node)
+			r.cluster.StartNode(string(node))
+		}
+	}
+}
+
+// checkIsTimeout checks if a message is a timeout and triggers the timeout channel of the provided validator
+func (r *replayNodeExecutionHandler) checkIsTimeout(validatorId pbft.NodeID, msg *pbft.MessageReq, timeoutChannel chan time.Time) bool {
+	if isTimeoutMessage(msg) {
+		go func() {
+			log.Printf("[REPLAY] A timeout occurred in node: %v, sequence: %v, round: %v.", validatorId, msg.View.Sequence, msg.View.Round)
+			timeoutChannel <- time.Now()
+		}()
+
+		return true
+	}
+
+	return false
+}
+
+// checkIfShouldDrop checks if a node should be dropped or stopped in the given sequence and round
+func (r *replayNodeExecutionHandler) checkIfShouldDrop(validatorId pbft.NodeID, view *pbft.View, timeoutChannel chan time.Time) {
+	if _, exists := r.dropNodeActions[validatorId][view.Sequence]; exists {
+		log.Printf("[REPLAY] Dropping node: %v in sequence: %v.", validatorId, view.Sequence)
+		r.nodesExecutionLock.Lock()
+		defer r.nodesExecutionLock.Unlock()
+
+		if r.lastSequencesByNode[validatorId].Sequence <= view.Sequence {
+			// the node is dropped in its last sequence and there are no messages for it after this point, so it is done with execution
+			if _, isDone := r.nodesDoneWithExecution[validatorId]; !isDone {
+				r.nodesDoneWithExecution[validatorId] = true
+				r.msgExecutionDone <- string(validatorId)
+			}
+		} else {
+			r.droppedNodes[validatorId] = view.Sequence
+			r.dropNode <- string(validatorId)
+		}
+	} else {
+		if !r.checkIfDoneWithExecution(validatorId, view.Sequence, view.Round) {
+			panic(fmt.Sprintf("node: %v is in an incorrect state in replay. It should be either dropped or done.", validatorId))
+		}
+	}
+}
+
+// isTimeoutMessage checks if a message in the .flow file represents a timeout
+func isTimeoutMessage(message *pbft.MessageReq) bool {
+	return message.Hash == nil && message.Proposal == nil && message.Seal == nil && message.From == ""
+}
diff --git a/e2e/fuzz/runner.go b/e2e/fuzz/runner.go
index 58cd220d..8d78ffc5 100644
--- a/e2e/fuzz/runner.go
+++ b/e2e/fuzz/runner.go
@@ -108,6 +108,8 @@ func validateNodes(c *e2e.Cluster) {
 		for _, n := range c.Nodes() {
 			log.Printf("Node: %v, running: %v, locked: %v, height: %v, proposal: %v\n", n.GetName(), n.IsRunning(), n.IsLocked(), n.GetNodeHeight(), n.GetProposal())
 		}
+
+		c.SaveState()
 		panic("Desired height not reached.")
 	}
 	log.Println("Cluster validation done.")
diff --git a/state.go b/state.go
index 2a644eeb..84a5b90f 100644
--- a/state.go
+++ b/state.go
@@ -382,7 +382,9 @@ type StateNotifier interface {
 	// HandleTimeout notifies that a timeout occured while getting next message
 	HandleTimeout(to NodeID, msgType MsgType, view *View)
 	// ReadNextMessage reads the next message from message queue of the state machine
-	ReadNextMessage(p *Pbft) (*MessageReq, []*MessageReq)
+	ReadNextMessage(p *Pbft, timeoutChannel chan time.Time) (*MessageReq, []*MessageReq)
+	// CreateTimeoutChannel creates a channel on which a timeout will be signaled
+	CreateTimeoutChannel(timeout time.Duration) chan time.Time
 }
 
 // DefaultStateNotifier is a null object implementation of StateNotifier interface
@@ -393,6 +395,17 @@ type DefaultStateNotifier struct {
 func (d *DefaultStateNotifier) HandleTimeout(to NodeID, msgType MsgType, view *View) {}
 
 // ReadNextMessage is an implementation of StateNotifier interface
-func (d *DefaultStateNotifier) ReadNextMessage(p *Pbft) (*MessageReq, []*MessageReq) {
+func (d *DefaultStateNotifier) ReadNextMessage(p *Pbft, timeoutChannel chan time.Time) (*MessageReq, []*MessageReq) {
 	return p.ReadMessageWithDiscards()
 }
+
+// CreateTimeoutChannel is an implementation of StateNotifier interface
+func (d *DefaultStateNotifier) CreateTimeoutChannel(timeout time.Duration) chan time.Time {
+	// the channel is buffered so that the timer goroutine does not leak
+	// if the timeout fires after the consumer has already moved on
+	timeoutChannel := make(chan time.Time, 1)
+
+	time.AfterFunc(timeout, func() {
+		timeoutChannel <- time.Now()
+	})
+
+	return timeoutChannel
+}