From 63486e906b54ffe1998f8183c35e0e06636dee18 Mon Sep 17 00:00:00 2001
From: Goran Rojovic
Date: Fri, 1 Apr 2022 15:50:06 +0200
Subject: [PATCH] Drop node simulation in replay

---
 consensus.go                                |  14 +-
 e2e/actions.go                              |  42 +++++
 e2e/framework.go                            |  75 +++++---
 e2e/fuzz/README.md                          |  45 +++++
 e2e/fuzz/replay/replay_message.go           |   4 +-
 e2e/fuzz/replay/replay_message_command.go   |  67 +++----
 e2e/fuzz/replay/replay_message_persister.go | 187 ++++++++++++++------
 e2e/fuzz/replay/replay_message_reader.go    | 184 +++++++++----------
 e2e/fuzz/replay/replay_messages_notifier.go |  46 +++--
 e2e/fuzz/replay/replay_node_execution.go    | 153 ++++++++++++++++
 e2e/fuzz/runner.go                          |   2 +
 state.go                                    |  17 +-
 12 files changed, 604 insertions(+), 232 deletions(-)
 create mode 100644 e2e/fuzz/README.md
 create mode 100644 e2e/fuzz/replay/replay_node_execution.go

diff --git a/consensus.go b/consensus.go
index 6d399d43..ae4f0e63 100644
--- a/consensus.go
+++ b/consensus.go
@@ -756,11 +756,16 @@ func (p *Pbft) GetProposal() *Proposal {
 	return p.state.proposal
 }
 
+// GetCurrentView returns the current view
+func (p *Pbft) GetCurrentView() *View {
+	return p.state.view.Copy()
+}
+
 // getNextMessage reads a new message from the message queue
 func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageReq, bool) {
-	timeoutCh := time.After(timeout)
+	timeoutCh := p.notifier.CreateTimeoutChannel(timeout)
 	for {
-		msg, discards := p.notifier.ReadNextMessage(p)
+		msg, discards := p.notifier.ReadNextMessage(p, timeoutCh)
 		// send the discard messages
 		p.logger.Printf("[TRACE] Current state %s, number of prepared messages: %d, number of committed messages %d", PbftState(p.state.state), p.state.numPrepared(), p.state.numCommitted())
 
@@ -775,8 +780,9 @@ func (p *Pbft) getNextMessage(span trace.Span, timeout time.Duration) (*MessageR
 			return msg, true
 		}
 
-		// wait until there is a new message or
-		// someone closes the stopCh (i.e. timeout for round change)
+		// wait until there is a new message,
+		// someone closes the timeoutCh (i.e. timeout for round change) or
+		// the cancel function is triggered
 		select {
 		case <-timeoutCh:
 			span.AddEvent("Timeout")
diff --git a/e2e/actions.go b/e2e/actions.go
index c4a7fd1d..2dd5d485 100644
--- a/e2e/actions.go
+++ b/e2e/actions.go
@@ -1,6 +1,7 @@
 package e2e
 
 import (
+	"encoding/json"
 	"log"
 	"math/rand"
 	"time"
@@ -8,6 +9,44 @@ import (
 	"github.com/0xPolygon/pbft-consensus"
 )
 
+const (
+	DropNode       string = "DropNode"
+	RevertDropNode string = "RevertDropNode"
+	Partition      string = "Partition"
+	LastSequence   string = "LastSequence"
+)
+
+// MetaData is a struct that holds data about fuzz actions that happened and need to be saved in the .flow file
+type MetaData struct {
+	DataType string `json:"actionType"`
+	Data     string `json:"data"`
+	Sequence uint64 `json:"sequence"`
+	Round    uint64 `json:"round"`
+}
+
+// NewMetaData creates a new MetaData object
+func NewMetaData(dataType, data string, sequence, round uint64) *MetaData {
+	return &MetaData{
+		DataType: dataType,
+		Data:     data,
+		Sequence: sequence,
+		Round:    round,
+	}
+}
+
+// ConvertActionsToByteArrays converts a MetaData slice to its JSON representation and returns it as a slice of byte arrays
+func ConvertActionsToByteArrays(actions []*MetaData) ([][]byte, error) {
+	var allRawActions [][]byte
+	for _, message := range actions {
+		currentRawAction, err := json.Marshal(message)
+		if err != nil {
+			return allRawActions, err
+		}
+		allRawActions = append(allRawActions, currentRawAction)
+	}
+	return allRawActions, nil
+}
+
 type RevertFunc func()
 
 type FunctionalAction interface {
@@ -35,10 +74,13 @@ func (dn *DropNodeAction) Apply(c *Cluster) RevertFunc {
 	log.Printf("Dropping node: '%s'.", nodeToStop)
 	c.StopNode(nodeToStop.name)
 
+	view := nodeToStop.GetCurrentView()
+	c.replayMessageNotifier.HandleAction(NewMetaData(DropNode, nodeToStop.name, view.Sequence, view.Round))
+
 	return func() {
 		log.Printf("Reverting stopped node %v\n", nodeToStop.name)
 		nodeToStop.Start()
+		c.replayMessageNotifier.HandleAction(NewMetaData(RevertDropNode, nodeToStop.name, c.GetMaxHeight()+1, 0))
 	}
 }
diff --git a/e2e/framework.go b/e2e/framework.go
index a5be8cda..dd702965 100644
--- a/e2e/framework.go
+++ b/e2e/framework.go
@@ -133,11 +133,6 @@ func NewPBFTCluster(t *testing.T, config *ClusterConfig, hook ...transportHook)
 		createBackend: config.CreateBackend,
 	}
 
-	err = c.replayMessageNotifier.SaveMetaData(&names)
-	if err != nil {
-		log.Printf("[WARNING] Could not write node meta data to replay messages file. Reason: %v", err)
-	}
-
 	for _, name := range names {
 		trace := c.tracer.Tracer(name)
 		n, _ := newPBFTNode(name, config, names, trace, tt)
@@ -286,6 +281,11 @@ func (n *node) GetNodeHeight() uint64 {
 	return uint64(n.getSyncIndex()) + 1
 }
 
+// GetCurrentView gets the current view from the state machine
+func (n *node) GetCurrentView() *pbft.View {
+	return n.pbft.GetCurrentView()
+}
+
 func (c *Cluster) syncWithNetwork(nodeID string) (uint64, int64) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
@@ -389,6 +389,7 @@ func (c *Cluster) Stop() {
 			n.Stop()
 		}
 	}
+	c.SaveState()
 	if err := c.tracer.Shutdown(context.Background()); err != nil {
 		panic("failed to shutdown TracerProvider")
 	}
@@ -398,6 +399,44 @@ func (c *Cluster) GetTransportHook() transportHook {
 	return c.transport.getHook()
 }
 
+// SaveState saves messages and action meta data to the corresponding .flow files
+func (c *Cluster) SaveState() {
+	c.saveMessages()
+	c.saveMetaData()
+}
+
+// saveMessages saves all cached messages to the .flow file
+func (c *Cluster) saveMessages() {
+	err := c.replayMessageNotifier.SaveState()
+	if err != nil {
+		log.Printf("[WARNING] Could not write state to file. Reason: %v", err)
+	}
+}
+
+// saveMetaData saves all cached action meta data to the .flow file
+func (c *Cluster) saveMetaData() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	var nodeNames []string
+	var lastSequences []*MetaData
+	for name, node := range c.GetNodesMap() {
+		nodeNames = append(nodeNames, name)
+		view := node.GetCurrentView()
+		lastSequences = append(lastSequences, &MetaData{
+			DataType: LastSequence,
+			Data:     name,
+			Sequence: view.Sequence,
+			Round:    view.Round,
+		})
+	}
+
+	err := c.replayMessageNotifier.SaveMetaData(nodeNames, lastSequences)
+	if err != nil {
+		log.Printf("[WARNING] Could not write node meta data to replay messages file. Reason: %v", err)
+	}
+}
+
 type node struct {
 	// index of node synchronization with the cluster
 	localSyncIndex int64
@@ -521,10 +560,7 @@ func (n *node) Start() {
 			// start the execution
 			n.pbft.Run(ctx)
-			err := n.c.replayMessageNotifier.SaveState()
-			if err != nil {
-				log.Printf("[WARNING] Could not write state to file. Reason: %v", err)
-			}
+			n.c.saveMessages()
 
 			switch n.pbft.GetState() {
 			case pbft.SyncState:
@@ -536,6 +572,7 @@ func (n *node) Start() {
 				n.setSyncIndex(currentSyncIndex + 1)
 			default: // stopped
+				n.c.saveMessages()
 				return
 			}
 		}
@@ -716,29 +753,27 @@ func (v *valString) Len() int {
 // ReplayNotifier is an interface that expands the StateNotifier with additional methods for saving and loading replay messages
 type ReplayNotifier interface {
 	pbft.StateNotifier
-	SaveMetaData(nodeNames *[]string) error
+	SaveMetaData(nodeNames []string, metaData []*MetaData) error
 	SaveState() error
 	HandleMessage(to pbft.NodeID, message *pbft.MessageReq)
+	HandleAction(action *MetaData)
 }
 
 // DefaultReplayNotifier is a null object implementation of ReplayNotifier interface
 type DefaultReplayNotifier struct {
-}
-
-// HandleTimeout implements StateNotifier interface
-func (n *DefaultReplayNotifier) HandleTimeout(to pbft.NodeID, msgType pbft.MsgType, view *pbft.View) {
-}
-
-// ReadNextMessage is an implementation of StateNotifier interface
-func (n *DefaultReplayNotifier) ReadNextMessage(p *pbft.Pbft) (*pbft.MessageReq, []*pbft.MessageReq) {
-	return p.ReadMessageWithDiscards()
+	pbft.DefaultStateNotifier
 }
 
 // SaveMetaData is an implementation of ReplayNotifier interface
-func (n *DefaultReplayNotifier) SaveMetaData(nodeNames *[]string) error { return nil }
+func (n *DefaultReplayNotifier) SaveMetaData(nodeNames []string, metaData []*MetaData) error {
+	return nil
+}
 
 // SaveState is an implementation of ReplayNotifier interface
 func (n *DefaultReplayNotifier) SaveState() error { return nil }
 
 // HandleMessage is an implementation of ReplayNotifier interface
 func (n *DefaultReplayNotifier) HandleMessage(to pbft.NodeID, message *pbft.MessageReq) {}
+
+// HandleAction is an implementation of ReplayNotifier interface
+func (n *DefaultReplayNotifier) HandleAction(action *MetaData) {}
diff --git a/e2e/fuzz/README.md b/e2e/fuzz/README.md
new file mode 100644
index 00000000..4ad9fa8d
--- /dev/null
+++ b/e2e/fuzz/README.md
@@ -0,0 +1,45 @@
+- [FUZZ FRAMEWORK](#fuzz-framework)
+  - [Daemon](#daemon)
+  - [Replay Messages](#replay-messages)
+  - [Known issues](#known-issues)
+
+# FUZZ FRAMEWORK
+
+The fuzz framework is an automated software testing framework that involves providing invalid, unexpected, or random data as inputs to a computer program. The program is then monitored for exceptions such as crashes, failing built-in code assertions, or potential memory leaks.
+
+The fuzz framework enables building randomized test sets on top of the PBFT consensus algorithm. Its end goal is to ensure that, even under stress conditions, we achieve linearity on the PBFT consensus protocol; that is, under certain time bounds and conditions (e.g. a block cannot be produced if zero nodes are active), a new block is produced.
+
+## Daemon
+The fuzz daemon is a test runner for fuzz/randomized tests. It is a separate Go module/process which runs alongside the cluster of Pbft nodes.
+It runs a goroutine and a single cluster with a predefined number of nodes, randomly picks a subset of the predefined actions, and applies them to the cluster.
+
+Supported actions that can be simulated by the fuzz daemon are:
+1. Drop Node - simulates dropping a node in the cluster, where a single node goes offline and stops communicating with the rest of the network.
+2. Partitions - simulates grouping of nodes into separate partitions, where each partition only communicates with its member nodes.
+3. Flow Map - simulates a message routing mechanism where some of the nodes within the cluster are fully connected, whereas others are only partially connected (namely, only to a subset of the peer nodes).
+
+To run the fuzz daemon (runner), run the following command:
+
+`go run ./e2e/fuzz/cmd/main.go fuzz-run -nodes={numberOfNodes} -duration={duration}`
+
+e.g., `go run ./e2e/fuzz/cmd/main.go fuzz-run -nodes=5 -duration=1h`
+
+To log node execution to separate log files for easier problem analysis, set the environment variable `E2E_LOG_TO_FILES` to `true` before running the `fuzz-run` command. Logs are stored in the parent folder from which the `fuzz-run` command was called, inside the `logs-{timestamp}` subfolder. Each node will have its own .log file, where the name of the file is the name of the corresponding node.
+
+By default, when running the `fuzz-run` command, two `.flow` files will be saved containing all the messages that were gossiped during the fuzz daemon execution, alongside some meta data about the actions that were simulated during the fuzz run. The files are saved inside the parent folder from which the `fuzz-run` command was called, inside the `SavedState` subfolder. The file containing the gossiped messages is named `messages_{timestamp}.flow`, and the file containing the node and action meta data is named `metaData_{timestamp}.flow` (see the sketch of its layout at the end of this README). These files are needed to replay the fuzz execution using the **Replay Messages** feature.
+
+## Replay Messages
+The fuzz framework relies on a randomized set of predefined actions, which are applied to and reverted from the **PolyBFT** nodes cluster. Since its execution is non-deterministic and an algorithm failure can be discovered at any time, it is of utmost importance to have a trace of the triggers which led to the failure state, and the ability to replay those triggers on demand an arbitrary number of times. This enables in-depth analysis of the failure.
+
+Replay takes the provided `messages.flow` and `metaData.flow` files, creates a cluster with the appropriate number of nodes with the same names as in the fuzz run, pushes all the messages into the appropriate node queues, and starts the cluster. This enables a quick replay of a previously run execution and a way to analyze the problem that occurred, on demand.
+
+To run replay messages, run the following command:
+
+`go run ./e2e/fuzz/cmd/main.go replay-messages -messagesFile={fullPathToMessagesFile} -metaDataFile={fullPathToMetaDataFile}`
+
+e.g., `go run ./e2e/fuzz/cmd/main.go replay-messages -messagesFile=../SavedState/messages.flow -metaDataFile=../SavedState/metaData.flow`
+
+**NOTE**: Replay does not save .flow files on its execution.
+
+## Known issues
+When saving messages that are gossiped during the execution of the fuzz daemon, messages will be sorted by sequence, but they will not be in the order in which they were gossiped, since the `Gossip` method sends messages to each node asynchronously in a separate goroutine per receiver. This means that a `PrePrepare` message may not be first in the `.flow` file, since all the other messages are also sent in separate routines. This does not cause an issue on replay, since messages are separated into different queues in the **PolyBFT** state machine depending on the message type (`PrePrepare`, `Prepare`, `Commit`, `RoundChange`).
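+
+For reference, below is a minimal sketch of what a `metaData_{timestamp}.flow` file might contain (the node names and values here are hypothetical, not taken from a real run). The first line is the JSON array of node names; each following line is one JSON-encoded entry matching the `MetaData` struct (`actionType`, `data`, `sequence`, `round`): the `LastSequence` entries (one per node, most omitted here for brevity), followed by any `DropNode`/`RevertDropNode` actions that were simulated:
+
+```json
+["node_0", "node_1", "node_2", "node_3", "node_4"]
+{"actionType":"LastSequence","data":"node_0","sequence":20,"round":0}
+{"actionType":"DropNode","data":"node_3","sequence":10,"round":0}
+{"actionType":"RevertDropNode","data":"node_3","sequence":15,"round":0}
+```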
\ No newline at end of file
diff --git a/e2e/fuzz/replay/replay_message.go b/e2e/fuzz/replay/replay_message.go
index 247a5e7e..9dc92fee 100644
--- a/e2e/fuzz/replay/replay_message.go
+++ b/e2e/fuzz/replay/replay_message.go
@@ -31,8 +31,8 @@ func NewReplayTimeoutMessage(to pbft.NodeID, msgType pbft.MsgType, view *pbft.Vi
 	}
 }
 
-// ConvertToByteArrays converts ReplayMessage slice to JSON representation and return it back as slice of byte arrays
-func ConvertToByteArrays(messages []*ReplayMessage) ([][]byte, error) {
+// ConvertMessagesToByteArrays converts a ReplayMessage slice to its JSON representation and returns it as a slice of byte arrays
+func ConvertMessagesToByteArrays(messages []*ReplayMessage) ([][]byte, error) {
 	var allRawMessages [][]byte
 	for _, message := range messages {
 		currentRawMessage, err := json.Marshal(message)
diff --git a/e2e/fuzz/replay/replay_message_command.go b/e2e/fuzz/replay/replay_message_command.go
index 302c1658..6f6a60f7 100644
--- a/e2e/fuzz/replay/replay_message_command.go
+++ b/e2e/fuzz/replay/replay_message_command.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"log"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/0xPolygon/pbft-consensus"
@@ -18,18 +17,20 @@ import (
 type ReplayMessageCommand struct {
 	UI cli.Ui
 
-	filePath string
+	messagesFilePath string
+	metaDataFilePath string
 }
 
 // Help implements the cli.Command interface
-func (fc *ReplayMessageCommand) Help() string {
+func (rmc *ReplayMessageCommand) Help() string {
 	return `Runs the message and timeouts replay for analysis and testing purposes based on provided .flow file.
 
-	Usage: replay-messages -file={fullPathToFlowFile}
+	Usage: replay-messages -messagesFile={fullPathToFlowFile} -metaDataFile={fullPathToFlowFile}
 
 	Options:
 
-	-file - Full path to .flow file containing messages and timeouts to be replayed by the fuzz framework`
+	-messagesFile - Full path to the .flow file containing messages and timeouts to be replayed by the fuzz framework
+	-metaDataFile - Full path to the .flow file containing meta data for nodes and fuzz actions`
 }
 
 // Synopsis implements the cli.Command interface
@@ -45,17 +46,16 @@ func (rmc *ReplayMessageCommand) Run(args []string) int {
 		return 1
 	}
 
-	messageReader := &replayMessageReader{
-		msgProcessingDone: make(chan string),
-	}
+	messageReader := &replayMessageReader{}
+	nodeExecutionHandler := NewNodeExecutionHandler()
 
-	err = messageReader.openFile(rmc.filePath)
+	err = messageReader.openFiles(rmc.messagesFilePath, rmc.metaDataFilePath)
 	if err != nil {
 		rmc.UI.Error(err.Error())
 		return 1
 	}
 
-	nodeNames, err := messageReader.readNodeMetaData()
+	nodeNames, err := messageReader.readNodeMetaData(nodeExecutionHandler)
 	if err != nil {
 		rmc.UI.Error(err.Error())
 		return 1
@@ -67,47 +67,27 @@ func (rmc *ReplayMessageCommand) Run(args []string) int {
 		prefix = (nodeNames[0])[:i]
 	}
 
-	replayMessagesNotifier := NewReplayMessagesNotifierWithReader(messageReader)
+	replayMessagesNotifier := NewReplayMessagesNotifierForReplay(nodeExecutionHandler)
 
 	nodesCount := len(nodeNames)
+	var cluster *e2e.Cluster
 	config := &e2e.ClusterConfig{
 		Count:                 nodesCount,
 		Name:                  "fuzz_cluster",
 		Prefix:                prefix,
 		ReplayMessageNotifier: replayMessagesNotifier,
 		RoundTimeout:          e2e.GetPredefinedTimeout(time.Millisecond),
-		TransportHandler:      func(to pbft.NodeID, msg *pbft.MessageReq) { replayMessagesNotifier.HandleMessage(to, msg) },
+		TransportHandler:      func(to pbft.NodeID, msg *pbft.MessageReq) { /* we do not gossip messages in replay */ },
 		CreateBackend:         func() e2e.IntegrationBackend { return &ReplayBackend{messageReader: messageReader} },
 	}
 
-	cluster := e2e.NewPBFTCluster(nil, config)
+	cluster = e2e.NewPBFTCluster(nil, config)
 
-	messageReader.readMessages(cluster)
+	messageReader.readMessages(cluster, nodeExecutionHandler)
 	messageReader.closeFile()
 
-	var wg sync.WaitGroup
-	wg.Add(1)
-
-	nodesDone := make(map[string]bool, nodesCount)
-	go func() {
-		for {
-			select {
-			case nodeDone := <-messageReader.msgProcessingDone:
-				nodesDone[nodeDone] = true
-				cluster.StopNode(nodeDone)
-				if len(nodesDone) == nodesCount {
-					wg.Done()
-					return
-				}
-			default:
-				continue
-			}
-		}
-	}()
-
-	cluster.Start()
-	wg.Wait()
-	cluster.Stop()
+	nodeExecutionHandler.startActionSimulation(cluster)
+	nodeExecutionHandler.stopActionSimulation(cluster)
 
 	rmc.UI.Info("Done with execution")
 	if err = replayMessagesNotifier.CloseFile(); err != nil {
@@ -121,7 +101,8 @@ func (rmc *ReplayMessageCommand) Run(args []string) int {
 // NewFlagSet implements the FuzzCLICommand interface and creates a new flag set for command arguments
 func (rmc *ReplayMessageCommand) NewFlagSet() *flag.FlagSet {
 	flagSet := flag.NewFlagSet("replay-messages", flag.ContinueOnError)
-	flagSet.StringVar(&rmc.filePath, "file", "", "Full path to .flow file containing messages and timeouts to be replayed by the fuzz framework")
+	flagSet.StringVar(&rmc.messagesFilePath, "messagesFile", "", "Full path to .flow file containing messages and timeouts to be replayed by the fuzz framework")
+	flagSet.StringVar(&rmc.metaDataFilePath, "metaDataFile", "", "Full path to .flow file containing meta data for nodes and fuzz actions")
 
 	return flagSet
 }
@@ -134,10 +115,16 @@ func (rmc *ReplayMessageCommand) validateInput(args []string) error {
 		return err
 	}
 
-	if rmc.filePath == "" {
-		err = errors.New("provided file path is empty")
+	if rmc.messagesFilePath == "" {
+		err = errors.New("provided messages file path is empty")
 		return err
 	}
+
+	if rmc.metaDataFilePath == "" {
+		err = errors.New("provided meta data file path is empty")
+		return err
+	}
+
 	return nil
 }
diff --git a/e2e/fuzz/replay/replay_message_persister.go b/e2e/fuzz/replay/replay_message_persister.go
index 9697c38e..4c438512 100644
--- a/e2e/fuzz/replay/replay_message_persister.go
+++ b/e2e/fuzz/replay/replay_message_persister.go
@@ -4,31 +4,60 @@ import (
 	"bufio"
 	"encoding/json"
 	"fmt"
+	"log"
 	"os"
 	"path/filepath"
+	"strings"
 	"sync"
 	"time"
+
+	"github.com/0xPolygon/pbft-consensus/e2e"
 )
 
 const directoryPath = "../SavedState"
 
+// messagePersister is an interface for storing messages, node and action meta data to .flow files
+type messagePersister interface {
+	saveMetaData(nodeNames []string, lastSequences []*e2e.MetaData) error
+	saveCachedMessages() error
+	addAction(action *e2e.MetaData)
+	addMessage(message *ReplayMessage)
+	closeFile() error
+}
+
+// defaultMessagePersister is a default implementation of the messagePersister interface
+type defaultMessagePersister struct{}
+
+func (d *defaultMessagePersister) saveMetaData(nodeNames []string, lastSequences []*e2e.MetaData) error {
+	return nil
+}
+func (d *defaultMessagePersister) saveCachedMessages() error         { return nil }
+func (d *defaultMessagePersister) addAction(action *e2e.MetaData)    {}
+func (d *defaultMessagePersister) addMessage(message *ReplayMessage) {}
+func (d *defaultMessagePersister) closeFile() error                  { return nil }
+
 // replayMessagePersister encapsulates logic for saving messages in .flow file
 type replayMessagePersister struct {
-	lock     sync.Mutex
-	messages []*ReplayMessage
-	file     *os.File
+	lock         sync.Mutex
+	timestamp    string
+	messages     []*ReplayMessage
+	messagesFile *os.File
+	metaDataFile *os.File
+	actions      []*e2e.MetaData
 }
 
 // saveMetaData saves node meta data to .flow file
-func (r *replayMessagePersister) saveMetaData(nodeNames *[]string) error {
+func (r *replayMessagePersister) saveMetaData(nodeNames []string, lastSequences []*e2e.MetaData) error {
 	var err error
-	if err = r.createFile(); err != nil {
-		return err
+	var file *os.File
+	if r.metaDataFile == nil {
+		if file, err = r.createFile(MetaDataFilePrefix); err != nil {
+			return err
+		}
+		r.metaDataFile = file
 	}
 
-	bufWriter := bufio.NewWriter(r.file)
-	defer bufWriter.Flush()
-
+	bufWriter := bufio.NewWriterSize(r.metaDataFile, maxCharactersPerLine)
 	currentRawMessage, err := json.Marshal(nodeNames)
 	if err != nil {
 		return err
@@ -39,7 +68,16 @@ func (r *replayMessagePersister) saveMetaData(nodeNames *[]string) error {
 		return err
 	}
 
-	_, err = bufWriter.Write([]byte("\n"))
+	bufWriter.Write([]byte("\n"))
+
+	raw, err := e2e.ConvertActionsToByteArrays(lastSequences)
+	if err != nil {
+		log.Printf("[ERROR] An error occurred while converting last sequences to byte arrays")
+	} else {
+		r.writeToFile(raw, bufWriter)
+	}
+
+	r.saveActions(bufWriter)
 
 	return err
 }
@@ -50,48 +88,27 @@ func (r *replayMessagePersister) saveCachedMessages() error {
 	defer r.lock.Unlock()
 
 	var err error
-	if err = r.createFile(); err != nil {
-		return err
+	var file *os.File
+	if r.messagesFile == nil {
+		if file, err = r.createFile(MessagesFilePrefix); err != nil {
+			return err
+		}
+		r.messagesFile = file
 	}
 
 	if r.messages != nil {
-		err = r.saveMessages(r.file)
+		err = r.saveMessages()
 	}
 
 	return err
 }
 
-// createFile creates a .flow file to save messages and timeouts on the predifined location
-func (r *replayMessagePersister) createFile() error {
-	if r.file == nil {
-		if _, err := os.Stat(directoryPath); os.IsNotExist(err) {
-			err := os.Mkdir(directoryPath, 0777)
-			if err != nil {
-				return err
-			}
-		}
-
-		path, err := filepath.Abs(directoryPath)
-		if err != nil {
-			return err
-		}
-
-		file, err := os.OpenFile(filepath.Join(path, fmt.Sprintf("%v_%v.flow", FileName, time.Now().Format(time.RFC3339))), os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
-		if err != nil {
-			return err
-		}
-		r.file = file
-	}
-
-	return nil
-}
+// addAction adds an action that happened in fuzz to the cache that will be saved to the .flow file on cluster Stop
+func (r *replayMessagePersister) addAction(action *e2e.MetaData) {
+	r.lock.Lock()
+	defer r.lock.Unlock()
 
-// closeFile closes file created by the ReplayMessagesHandler if it is open
-func (r *replayMessagePersister) closeFile() error {
-	if r.file != nil {
-		return r.file.Close()
-	}
-	return nil
+	r.actions = append(r.actions, action)
 }
 
 // addMessage adds a message from sequence to messages cache that will be written to .flow file
@@ -102,17 +119,39 @@ func (r *replayMessagePersister) addMessage(message *ReplayMessage) {
 }
 
 // saveMessages saves ReplayMessages to the JSON file within the pre-defined directory.
-func (r *replayMessagePersister) saveMessages(fileWriter *os.File) error {
-	rawMessages, err := ConvertToByteArrays(r.messages)
+func (r *replayMessagePersister) saveMessages() error {
+	rawMessages, err := ConvertMessagesToByteArrays(r.messages)
 	if err != nil {
 		return err
 	}
 
-	bufWriter := bufio.NewWriterSize(fileWriter, maxCharactersPerLine)
-	defer bufWriter.Flush()
+	bufWriter := bufio.NewWriterSize(r.messagesFile, maxCharactersPerLine)
+	r.writeToFile(rawMessages, bufWriter)
+	r.messages = nil
 
-	for _, rawMessage := range rawMessages {
-		_, err = bufWriter.Write(rawMessage)
+	return nil
+}
+
+// saveActions saves cached actions to .flow file
+func (r *replayMessagePersister) saveActions(bufWriter *bufio.Writer) {
+	rawActions, err := e2e.ConvertActionsToByteArrays(r.actions)
+	if err != nil {
+		log.Printf("[ERROR] An error occurred while converting actions to byte arrays")
+		return
+	}
+
+	if len(rawActions) == 0 { // no actions were done in fuzz
+		return
+	}
+
+	r.writeToFile(rawActions, bufWriter)
+}
+
+// writeToFile writes data to the designated file using a bufio writer
+func (r *replayMessagePersister) writeToFile(data [][]byte, bufWriter *bufio.Writer) error {
+
+	for _, rawMessage := range data {
+		_, err := bufWriter.Write(rawMessage)
 		if err != nil {
 			return err
 		}
@@ -123,6 +162,54 @@ func (r *replayMessagePersister) saveMessages(fileWriter *os.File) error {
 		}
 	}
 
-	r.messages = nil
+	bufWriter.Flush()
 
 	return nil
 }
+
+// createFile creates a .flow file to save messages and timeouts on the predefined location
+func (r *replayMessagePersister) createFile(filePrefix string) (*os.File, error) {
+	if _, err := os.Stat(directoryPath); os.IsNotExist(err) {
+		err := os.Mkdir(directoryPath, 0777)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	path, err := filepath.Abs(directoryPath)
+	if err != nil {
+		return nil, err
+	}
+
+	if r.timestamp == "" {
+		r.timestamp = time.Now().Format(time.RFC3339)
+	}
+
+	file, err := os.OpenFile(filepath.Join(path, fmt.Sprintf("%v_%v.flow", filePrefix, r.timestamp)), os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
+	if err != nil {
+		return nil, err
+	}
+
+	return file, err
+}
+
+// closeFile closes the files created by the ReplayMessagesHandler if they are open
+func (r *replayMessagePersister) closeFile() error {
+	var errStrings []string
+	if r.messagesFile != nil {
+		if err := r.messagesFile.Close(); err != nil {
+			errStrings = append(errStrings, err.Error())
+		}
+	}
+
+	if r.metaDataFile != nil {
+		if err := r.metaDataFile.Close(); err != nil {
+			errStrings = append(errStrings, err.Error())
+		}
+	}
+
+	if len(errStrings) > 0 {
+		return fmt.Errorf(strings.Join(errStrings, "\n"))
+	}
+
+	return nil
+}
diff --git a/e2e/fuzz/replay/replay_message_reader.go b/e2e/fuzz/replay/replay_message_reader.go
index c866dfa8..d9438950 100644
--- a/e2e/fuzz/replay/replay_message_reader.go
+++ b/e2e/fuzz/replay/replay_message_reader.go
@@ -4,9 +4,10 @@ import (
 	"bufio"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"log"
 	"os"
-	"sync"
+	"strings"
 
 	"github.com/0xPolygon/pbft-consensus"
 	"github.com/0xPolygon/pbft-consensus/e2e"
 )
 
 const (
 	maxCharactersPerLine = 2048 * 1024 // Increase Scanner buffer size to 2MB per line
-	messageChunkSize     = 200
+	messageChunkSize     = 200 // Size of a chunk of messages being loaded from .flow file
 )
 
-type sequenceMessages struct {
-	sequence uint64
-	messages []*pbft.MessageReq
-}
-
 // replayMessageReader encapsulates logic for reading messages from flow file
 type replayMessageReader struct {
-	lock                   sync.Mutex
-	file                   *os.File
-	scanner                *bufio.Scanner
-	msgProcessingDone      chan string
-	nodesDoneWithExecution map[pbft.NodeID]bool
-	lastSequenceMessages   map[pbft.NodeID]*sequenceMessages
-	prePrepareMessages     map[uint64]*pbft.MessageReq
+	messagesFile       *os.File
+	metaDataFile       *os.File
+	prePrepareMessages map[uint64]*pbft.MessageReq
 }
 
-// openFile opens the file on provided location
-func (r *replayMessageReader) openFile(filePath string) error {
-	_, err := os.Stat(filePath)
+// openFiles opens the .flow files on the provided locations
+func (r *replayMessageReader) openFiles(messagesFilePath, metaDataFilePath string) error {
+	_, err := os.Stat(messagesFilePath)
 	if err != nil {
 		return err
 	}
 
-	r.file, err = os.Open(filePath)
+	r.messagesFile, err = os.Open(messagesFilePath)
 	if err != nil {
 		return err
 	}
 
-	r.scanner = bufio.NewScanner(r.file)
+	_, err = os.Stat(metaDataFilePath)
+	if err != nil {
+		return err
+	}
 
-	buffer := []byte{}
-	r.scanner.Buffer(buffer, maxCharactersPerLine)
+	r.metaDataFile, err = os.Open(metaDataFilePath)
+	if err != nil {
+		return err
+	}
 
 	return nil
 }
 
-// closeFile closes the opened .flow file
+// closeFile closes the opened .flow files
 func (r *replayMessageReader) closeFile() error {
-	if r.file != nil {
-		return r.file.Close()
+	var errStrings []string
+	if r.messagesFile != nil {
+		if err := r.messagesFile.Close(); err != nil {
+			errStrings = append(errStrings, err.Error())
+		}
 	}
+
+	if r.metaDataFile != nil {
+		if err := r.metaDataFile.Close(); err != nil {
+			errStrings = append(errStrings, err.Error())
+		}
+	}
+
+	if len(errStrings) > 0 {
+		return fmt.Errorf(strings.Join(errStrings, "\n"))
+	}
+
 	return nil
 }
 
-// readNodeMetaData reads the first line of .flow file which should be a list of nodes
-func (r *replayMessageReader) readNodeMetaData() ([]string, error) {
+// readNodeMetaData reads the node names and actions from the metaData.flow file
+func (r *replayMessageReader) readNodeMetaData(nodeExecutionHandler *replayNodeExecutionHandler) ([]string, error) {
+	scanner := bufio.NewScanner(r.metaDataFile)
+	buffer := []byte{}
+	scanner.Buffer(buffer, maxCharactersPerLine)
+
 	var nodeNames []string
-	r.scanner.Scan() // first line carries the node names needed to create appropriate number of nodes for replay
-	err := json.Unmarshal(r.scanner.Bytes(), &nodeNames)
+	scanner.Scan() // first line carries the node names needed to create the appropriate number of nodes for replay
+	err := json.Unmarshal(scanner.Bytes(), &nodeNames)
+	nodeCount := len(nodeNames)
 	if err != nil {
 		return nil, err
-	} else if len(nodeNames) == 0 {
+	} else if nodeCount == 0 {
 		err = errors.New("no nodes were found in .flow file, so no cluster will be started")
+		return nil, err
+	}
+
+	nodeExecutionHandler.dropNodeActions = make(map[pbft.NodeID]map[uint64]*e2e.MetaData, nodeCount)
+	nodeExecutionHandler.revertDropNodeActions = make(map[pbft.NodeID]map[uint64]*e2e.MetaData, nodeCount)
+	nodeExecutionHandler.lastSequencesByNode = make(map[pbft.NodeID]*e2e.MetaData, nodeCount)
+
+	for _, name := range nodeNames {
+		nodeExecutionHandler.dropNodeActions[pbft.NodeID(name)] = make(map[uint64]*e2e.MetaData)
+		nodeExecutionHandler.revertDropNodeActions[pbft.NodeID(name)] = make(map[uint64]*e2e.MetaData)
+	}
+
+	for scanner.Scan() {
+		var action *e2e.MetaData
+		if err := json.Unmarshal(scanner.Bytes(), &action); err != nil {
+			log.Printf("[ERROR] An error occurred while unmarshalling an action from the .flow file. Reason: %v", err)
+			return nodeNames, err
+		}
+
+		switch action.DataType {
+		case e2e.DropNode:
+			nodeExecutionHandler.dropNodeActions[pbft.NodeID(action.Data)][action.Sequence] = action
+		case e2e.RevertDropNode:
+			nodeExecutionHandler.revertDropNodeActions[pbft.NodeID(action.Data)][action.Sequence] = action
+		case e2e.LastSequence:
+			nodeExecutionHandler.lastSequencesByNode[pbft.NodeID(action.Data)] = action
+		default:
+			panic("unknown action data type read from metaData.flow file")
+		}
 	}
 
 	return nodeNames, err
 }
 
 // readMessages reads messages from open .flow file and pushes them to appropriate nodes
-func (r *replayMessageReader) readMessages(cluster *e2e.Cluster) {
+func (r *replayMessageReader) readMessages(cluster *e2e.Cluster, nodeExecutionHandler *replayNodeExecutionHandler) {
 	nodes := cluster.GetNodesMap()
-	nodesCount := len(nodes)
-	r.nodesDoneWithExecution = make(map[pbft.NodeID]bool, nodesCount)
-	r.lastSequenceMessages = make(map[pbft.NodeID]*sequenceMessages, nodesCount)
-	r.prePrepareMessages = make(map[uint64]*pbft.MessageReq)
 
 	messagesChannel := make(chan []*ReplayMessage)
 	doneChannel := make(chan struct{})
 
+	r.prePrepareMessages = make(map[uint64]*pbft.MessageReq)
 	r.startChunkReading(messagesChannel, doneChannel)
 
-	nodeMessages := make(map[pbft.NodeID]map[uint64][]*pbft.MessageReq, nodesCount)
-	for _, n := range nodes {
-		nodeMessages[pbft.NodeID(n.GetName())] = make(map[uint64][]*pbft.MessageReq)
-	}
 	isDone := false
LOOP:
@@ -101,11 +138,9 @@ LOOP:
 			for _, message := range messages {
 				node, exists := nodes[string(message.To)]
 				if !exists {
-					log.Printf("[WARNING] Could not find node: %v to push message from .flow file.\n", message.To)
+					log.Printf("[WARNING] Could not find node: %v to push message from .flow file", message.To)
 				} else {
 					node.PushMessageInternal(message.Message)
-					nodeMessages[message.To][message.Message.View.Sequence] = append(nodeMessages[message.To][message.Message.View.Sequence], message.Message)
-
 					if !isTimeoutMessage(message.Message) && message.Message.Type == pbft.MessageReq_Preprepare {
 						if _, isPrePrepareAdded := r.prePrepareMessages[message.Message.View.Sequence]; !isPrePrepareAdded {
 							r.prePrepareMessages[message.Message.View.Sequence] = message.Message
@@ -114,20 +149,6 @@ LOOP:
 				}
 			}
 		case <-doneChannel:
-			for name, n := range nodeMessages {
-				nodeLastSequence := uint64(0)
-				for sequence := range n {
-					if nodeLastSequence < sequence {
-						nodeLastSequence = sequence
-					}
-				}
-
-				r.lastSequenceMessages[name] = &sequenceMessages{
-					sequence: nodeLastSequence,
-					messages: nodeMessages[name][nodeLastSequence],
-				}
-			}
-
 			isDone = true
 			break LOOP
 		}
@@ -137,12 +158,16 @@ LOOP:
 // startChunkReading reads messages from .flow file in chunks
 func (r *replayMessageReader) startChunkReading(messagesChannel chan []*ReplayMessage, doneChannel chan struct{}) {
 	go func() {
+		scanner := bufio.NewScanner(r.messagesFile)
+		buffer := []byte{}
+		scanner.Buffer(buffer, maxCharactersPerLine)
+
 		messages := make([]*ReplayMessage, 0)
 		i := 0
-		for r.scanner.Scan() {
+		for scanner.Scan() {
 			var message *ReplayMessage
-			if err := json.Unmarshal(r.scanner.Bytes(), &message); err != nil {
-				log.Printf("[ERROR] Error happened on unmarshalling a message in .flow file. Reason: %v.\n", err)
+			if err := json.Unmarshal(scanner.Bytes(), &message); err != nil {
+				log.Printf("[ERROR] An error occurred while unmarshalling a message from the .flow file. Reason: %v", err)
 				return
 			}
 
@@ -164,44 +189,3 @@ func (r *replayMessageReader) startChunkReading(messagesChannel chan []*ReplayMe
 		doneChannel <- struct{}{}
 	}()
 }
-
-// checkIfDoneWithExecution checks if node finished with processing all the messages from .flow file
-func (r *replayMessageReader) checkIfDoneWithExecution(validatorId pbft.NodeID, msg *pbft.MessageReq) {
-	if msg.View.Sequence > r.lastSequenceMessages[validatorId].sequence ||
-		(msg.View.Sequence == r.lastSequenceMessages[validatorId].sequence && r.areMessagesFromLastSequenceProcessed(msg, validatorId)) {
-		r.lock.Lock()
-		if _, isDone := r.nodesDoneWithExecution[validatorId]; !isDone {
-			r.nodesDoneWithExecution[validatorId] = true
-			r.msgProcessingDone <- string(validatorId)
-		}
-		r.lock.Unlock()
-	}
-}
-
-// areMessagesFromLastSequenceProcessed checks if all the messages from the last sequence of given node are processed so that the node can be stoped
-func (r *replayMessageReader) areMessagesFromLastSequenceProcessed(msg *pbft.MessageReq, validatorId pbft.NodeID) bool {
-	lastSequenceMessages := r.lastSequenceMessages[validatorId]
-
-	lastSequenceMessagesCount := len(lastSequenceMessages.messages)
-	if lastSequenceMessagesCount > 0 {
-		messageIndexToRemove := -1
-		for i, message := range lastSequenceMessages.messages {
-			if msg.Equal(message) {
-				messageIndexToRemove = i
-				break
-			}
-		}
-
-		if messageIndexToRemove != -1 {
-			lastSequenceMessages.messages = append(lastSequenceMessages.messages[:messageIndexToRemove], lastSequenceMessages.messages[messageIndexToRemove+1:]...)
-			lastSequenceMessagesCount = len(lastSequenceMessages.messages)
-		}
-	}
-
-	return lastSequenceMessagesCount == 0
-}
-
-// isTimeoutMessage checks if message in .flow file represents a timeout
-func isTimeoutMessage(message *pbft.MessageReq) bool {
-	return message.Hash == nil && message.Proposal == nil && message.Seal == nil && message.From == ""
-}
diff --git a/e2e/fuzz/replay/replay_messages_notifier.go b/e2e/fuzz/replay/replay_messages_notifier.go
index 3c930c9e..83899869 100644
--- a/e2e/fuzz/replay/replay_messages_notifier.go
+++ b/e2e/fuzz/replay/replay_messages_notifier.go
@@ -1,17 +1,21 @@
 package replay
 
 import (
+	"time"
+
 	"github.com/0xPolygon/pbft-consensus"
+	"github.com/0xPolygon/pbft-consensus/e2e"
 )
 
 const (
-	FileName = "messages"
+	MessagesFilePrefix = "messages"
+	MetaDataFilePrefix = "metaData"
 )
 
 // ReplayMessagesNotifier is a struct that implements ReplayNotifier interface
 type ReplayMessagesNotifier struct {
-	messagePersister *replayMessagePersister
-	messageReader    *replayMessageReader
+	messagePersister     messagePersister
+	nodeExecutionHandler *replayNodeExecutionHandler
 }
 
 // NewReplayMessagesNotifierWithPersister creates a new messages notifier with messages persister (required when fuzz-run is executed to save messages to file)
@@ -21,17 +25,17 @@ func NewReplayMessagesNotifierWithPersister() *ReplayMessagesNotifier {
 	}
 }
 
-// NewReplayMessagesNotifierWithReader creates a new messages notifier with messages reader (required when replay-messages is executed to read messages from file)
-func NewReplayMessagesNotifierWithReader(r *replayMessageReader) *ReplayMessagesNotifier {
+// NewReplayMessagesNotifierForReplay creates a new messages notifier with the plugins needed for replay
+func NewReplayMessagesNotifierForReplay(nodeExecutionHandler *replayNodeExecutionHandler) *ReplayMessagesNotifier {
 	return &ReplayMessagesNotifier{
-		messageReader:    r,
-		messagePersister: &replayMessagePersister{},
+		messagePersister:     &defaultMessagePersister{},
+		nodeExecutionHandler: nodeExecutionHandler,
 	}
 }
 
 // SaveMetaData saves node meta data to .flow file
-func (r *ReplayMessagesNotifier) SaveMetaData(nodeNames *[]string) error {
-	return r.messagePersister.saveMetaData(nodeNames)
+func (r *ReplayMessagesNotifier) SaveMetaData(nodeNames []string, lastSequences []*e2e.MetaData) error {
+	return r.messagePersister.saveMetaData(nodeNames, lastSequences)
 }
 
 // SaveState saves currently cached messages and timeouts to .flow file
@@ -50,14 +54,18 @@ func (r *ReplayMessagesNotifier) HandleTimeout(to pbft.NodeID, msgType pbft.MsgT
 }
 
 // ReadNextMessage is an implementation of StateNotifier interface
-func (r *ReplayMessagesNotifier) ReadNextMessage(p *pbft.Pbft) (*pbft.MessageReq, []*pbft.MessageReq) {
+func (r *ReplayMessagesNotifier) ReadNextMessage(p *pbft.Pbft, timeoutChannel chan time.Time) (*pbft.MessageReq, []*pbft.MessageReq) {
 	msg, discards := p.ReadMessageWithDiscards()
 
-	if r.messageReader != nil && msg != nil {
-		if isTimeoutMessage(msg) {
-			return nil, nil
+	if r.nodeExecutionHandler != nil {
+		validatorId := p.GetValidatorId()
+		if msg != nil {
+			if r.nodeExecutionHandler.checkIsTimeout(validatorId, msg, timeoutChannel) {
+				return nil, nil
+			}
 		} else {
-			r.messageReader.checkIfDoneWithExecution(p.GetValidatorId(), msg)
+			// if the node has no more messages for the given round and sequence in its queue, it was dropped in fuzz
+			r.nodeExecutionHandler.checkIfShouldDrop(validatorId, p.GetCurrentView(), timeoutChannel)
 		}
 	}
 
@@ -68,3 +76,13 @@ func (r *ReplayMessagesNotifier) ReadNextMessage(p *pbft.Pbft) (*pbft.MessageReq
 func (r *ReplayMessagesNotifier) CloseFile() error {
 	return r.messagePersister.closeFile()
 }
+
+// CreateTimeoutChannel is an implementation of StateNotifier interface
+func (r *ReplayMessagesNotifier) CreateTimeoutChannel(timeout time.Duration) chan time.Time {
+	return make(chan time.Time)
+}
+
+// HandleAction is an implementation of ReplayNotifier interface
+func (r *ReplayMessagesNotifier) HandleAction(action *e2e.MetaData) {
+	r.messagePersister.addAction(action)
+}
diff --git a/e2e/fuzz/replay/replay_node_execution.go b/e2e/fuzz/replay/replay_node_execution.go
new file mode 100644
index 00000000..b3b24d74
--- /dev/null
+++ b/e2e/fuzz/replay/replay_node_execution.go
@@ -0,0 +1,153 @@
+package replay
+
+import (
+	"fmt"
+	"log"
+	"sync"
+	"time"
+
+	"github.com/0xPolygon/pbft-consensus"
+	"github.com/0xPolygon/pbft-consensus/e2e"
+)
+
+// replayNodeExecutionHandler handles the simulation of fuzz actions and node execution during replay
+type replayNodeExecutionHandler struct {
+	actionLock             sync.Mutex
+	nodesExecutionLock     sync.Mutex
+	lastSequencesByNode    map[pbft.NodeID]*e2e.MetaData
+	dropNodeActions        map[pbft.NodeID]map[uint64]*e2e.MetaData
+	revertDropNodeActions  map[pbft.NodeID]map[uint64]*e2e.MetaData
+	nodesDoneWithExecution map[pbft.NodeID]bool
+	droppedNodes           map[pbft.NodeID]uint64
+	cluster                *e2e.Cluster
+	msgExecutionDone       chan string
+	dropNode               chan string
+}
+
+// NewNodeExecutionHandler creates a new replayNodeExecutionHandler
+func NewNodeExecutionHandler() *replayNodeExecutionHandler {
+	return &replayNodeExecutionHandler{
+		droppedNodes:     make(map[pbft.NodeID]uint64),
+		msgExecutionDone: make(chan string),
+		dropNode:         make(chan string),
+	}
+}
+
+// startActionSimulation starts the cluster and simulates the fuzz actions in replay that were saved in the meta data file
+func (r *replayNodeExecutionHandler) startActionSimulation(cluster *e2e.Cluster) {
+	r.cluster = cluster
+	revertTicker := time.NewTicker(10 * time.Millisecond)
+	nodesCount := len(cluster.GetNodes())
+	r.nodesDoneWithExecution = make(map[pbft.NodeID]bool, nodesCount)
+	stoppedNodes := make(map[string]bool, nodesCount)
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		maxHeight := uint64(0)
+		for {
+			select {
+			case <-revertTicker.C:
+				currentMaxHeight := cluster.GetMaxHeight() + 1
+				if maxHeight < currentMaxHeight {
+					maxHeight = currentMaxHeight
+				}
+				r.checkForNodesToBeRestarted(maxHeight, cluster)
+			case nodeDone := <-r.msgExecutionDone:
+				cluster.StopNode(nodeDone)
+				stoppedNodes[nodeDone] = true
+				if len(stoppedNodes) == nodesCount {
+					wg.Done()
+					return
+				}
+			case nodeToDrop := <-r.dropNode:
+				cluster.StopNode(nodeToDrop)
+			default:
+				continue
+			}
+		}
+	}()
+
+	cluster.Start()
+	wg.Wait()
+}
+
+// stopActionSimulation stops the cluster and replay execution
+func (r *replayNodeExecutionHandler) stopActionSimulation(cluster *e2e.Cluster) {
+	cluster.Stop()
+}
+
+// checkIfDoneWithExecution checks if the node has finished processing all of its messages from the .flow file
+func (r *replayNodeExecutionHandler) checkIfDoneWithExecution(validatorId pbft.NodeID, sequence, round uint64) bool {
+	lastSequence := r.lastSequencesByNode[validatorId]
+	if sequence > lastSequence.Sequence ||
+		(sequence == lastSequence.Sequence && round == lastSequence.Round) {
+		r.nodesExecutionLock.Lock()
+		defer r.nodesExecutionLock.Unlock()
+
+		if _, isDone := r.nodesDoneWithExecution[validatorId]; !isDone {
+			r.nodesDoneWithExecution[validatorId] = true
+			r.msgExecutionDone <- string(validatorId)
+		}
+		return true
+	}
+
+	return false
+}
+
+// checkForNodesToBeRestarted checks if any node needs to be restarted when a certain sequence is reached
+func (r *replayNodeExecutionHandler) checkForNodesToBeRestarted(sequence uint64, cluster *e2e.Cluster) {
+	r.actionLock.Lock()
+	defer r.actionLock.Unlock()
+
+	for node, sequenceDropped := range r.droppedNodes {
+		if _, hasSequence := r.revertDropNodeActions[node][sequence]; hasSequence && sequence >= sequenceDropped {
+			log.Printf("[REPLAY] Restarting node: %v in sequence: %v.\n", node, sequence)
+			delete(r.droppedNodes, node)
+			cluster.StartNode(string(node))
+		}
+	}
+}
+
+// checkIsTimeout checks if a message is a timeout and triggers the timeout channel of the provided validator
+func (r *replayNodeExecutionHandler) checkIsTimeout(validatorId pbft.NodeID, msg *pbft.MessageReq, timeoutChannel chan time.Time) bool {
+	if isTimeoutMessage(msg) {
+		go func() {
+			log.Printf("[REPLAY] A timeout occurred in node: %v, sequence: %v, round: %v.\n", validatorId, msg.View.Sequence, msg.View.Round)
+			timeoutChannel <- time.Now()
+		}()
+		return true
+	}
+
+	return false
+}
+
+// checkIfShouldDrop checks if the node should be dropped or stopped in the given sequence and round
+func (r *replayNodeExecutionHandler) checkIfShouldDrop(validatorId pbft.NodeID, view *pbft.View, timeoutChannel chan time.Time) {
+	if _, exists := r.dropNodeActions[validatorId][view.Sequence]; exists {
+		log.Printf("[REPLAY] Dropping node: %v in sequence: %v.\n", validatorId, view.Sequence)
+		r.nodesExecutionLock.Lock()
+		defer r.nodesExecutionLock.Unlock()
+
+		if r.lastSequencesByNode[validatorId].Sequence <= view.Sequence {
+			// it is dropped in the last sequence, and there are no messages for it after this point, so it is done with execution
+			if _, isDone := r.nodesDoneWithExecution[validatorId]; !isDone {
+				r.nodesDoneWithExecution[validatorId] = true
+				r.msgExecutionDone <- string(validatorId)
+			}
+		} else {
+			r.droppedNodes[validatorId] = view.Sequence
+			r.dropNode <- string(validatorId)
+		}
+	} else {
+		if !r.checkIfDoneWithExecution(validatorId, view.Sequence, view.Round) {
+			panic(fmt.Sprintf("node: %v is in an incorrect state in replay. It should be either dropped or done.", validatorId))
+		}
+	}
+}
+
+// isTimeoutMessage checks if a message in the .flow file represents a timeout
+func isTimeoutMessage(message *pbft.MessageReq) bool {
+	return message.Hash == nil && message.Proposal == nil && message.Seal == nil && message.From == ""
+}
diff --git a/e2e/fuzz/runner.go b/e2e/fuzz/runner.go
index 58cd220d..8d78ffc5 100644
--- a/e2e/fuzz/runner.go
+++ b/e2e/fuzz/runner.go
@@ -108,6 +108,8 @@ func validateNodes(c *e2e.Cluster) {
 		for _, n := range c.Nodes() {
 			log.Printf("Node: %v, running: %v, locked: %v, height: %v, proposal: %v\n", n.GetName(), n.IsRunning(), n.IsLocked(), n.GetNodeHeight(), n.GetProposal())
 		}
+
+		c.SaveState()
 		panic("Desired height not reached.")
 	}
 	log.Println("Cluster validation done.")
diff --git a/state.go b/state.go
index 2a644eeb..84a5b90f 100644
--- a/state.go
+++ b/state.go
@@ -382,7 +382,9 @@ type StateNotifier interface {
 	// HandleTimeout notifies that a timeout occured while getting next message
 	HandleTimeout(to NodeID, msgType MsgType, view *View)
 	// ReadNextMessage reads the next message from message queue of the state machine
-	ReadNextMessage(p *Pbft) (*MessageReq, []*MessageReq)
+	ReadNextMessage(p *Pbft, timeoutChannel chan time.Time) (*MessageReq, []*MessageReq)
+	// CreateTimeoutChannel creates a channel on which a timeout will be signaled
+	CreateTimeoutChannel(timeout time.Duration) chan time.Time
 }
 
 // DefaultStateNotifier is a null object implementation of StateNotifier interface
@@ -393,6 +395,17 @@ type DefaultStateNotifier struct {
 
 // HandleTimeout implements StateNotifier interface
 func (d *DefaultStateNotifier) HandleTimeout(to NodeID, msgType MsgType, view *View) {}
 
 // ReadNextMessage is an implementation of StateNotifier interface
-func (d *DefaultStateNotifier) ReadNextMessage(p *Pbft) (*MessageReq, []*MessageReq) {
+func (d *DefaultStateNotifier) ReadNextMessage(p *Pbft, timeoutChannel chan time.Time) (*MessageReq, []*MessageReq) {
 	return p.ReadMessageWithDiscards()
 }
+
+// CreateTimeoutChannel is an implementation of StateNotifier interface
+func (d *DefaultStateNotifier) CreateTimeoutChannel(timeout time.Duration) chan time.Time {
+	timeoutChannel := make(chan time.Time, 1) // buffered, like time.After, so the timer goroutine does not leak if the timeout is never read
+
+	time.AfterFunc(timeout, func() {
+		timeoutChannel <- time.Now()
+	})
+
+	return timeoutChannel
+}