From ea3f0fa185c27840e0e3def7bb8681409d841692 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Thu, 10 Oct 2024 11:58:35 +0200 Subject: [PATCH] Discard PBFT executions for too small clusters + fix PBFT result accounting --- consensus/consensus.go | 18 ++++++++++++++++++ models/request/execute.go | 19 +++++++++++++++++++ models/response/execute.go | 7 ++++++- node/execution_results.go | 2 ++ node/head_execute.go | 13 +++++++++++-- node/roll_call.go | 9 +++++++-- 6 files changed, 63 insertions(+), 5 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index faf387ef..05b4dad6 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -2,6 +2,7 @@ package consensus import ( "fmt" + "strings" ) // Type identifies consensus protocols suported by Blockless. @@ -31,3 +32,20 @@ func (t Type) Valid() bool { return false } } + +func Parse(s string) (Type, error) { + + if s == "" { + return 0, nil + } + + switch strings.ToLower(s) { + case "raft": + return Raft, nil + + case "pbft": + return PBFT, nil + } + + return 0, fmt.Errorf("unknown consensus value (%s)", s) +} diff --git a/models/request/execute.go b/models/request/execute.go index ba32c6a3..64be9cbc 100644 --- a/models/request/execute.go +++ b/models/request/execute.go @@ -2,8 +2,11 @@ package request import ( "encoding/json" + "fmt" "time" + "github.com/blocklessnetwork/b7s/consensus" + "github.com/blocklessnetwork/b7s/consensus/pbft" "github.com/blocklessnetwork/b7s/models/blockless" "github.com/blocklessnetwork/b7s/models/codes" "github.com/blocklessnetwork/b7s/models/execute" @@ -44,3 +47,19 @@ func (e Execute) MarshalJSON() ([]byte, error) { } return json.Marshal(rec) } + +func (e Execute) Valid() error { + + c, err := consensus.Parse(e.Config.ConsensusAlgorithm) + if err != nil { + return fmt.Errorf("could not parse consensus algorithm: %w", err) + } + + if c == consensus.PBFT && + e.Config.NodeCount > 0 && + e.Config.NodeCount < pbft.MinimumReplicaCount { + return fmt.Errorf("minimum %v nodes needed for PBFT consensus", pbft.MinimumReplicaCount) + } + + return nil +} diff --git a/models/response/execute.go b/models/response/execute.go index f2174439..52eb51ed 100644 --- a/models/response/execute.go +++ b/models/response/execute.go @@ -30,7 +30,7 @@ type Execute struct { Signature string `json:"signature,omitempty"` // Used to communicate the reason for failure to the user. - Message string `json:"message,omitempty"` + ErrorMessage string `json:"message,omitempty"` } func (e *Execute) WithResults(r execute.ResultMap) *Execute { @@ -43,6 +43,11 @@ func (e *Execute) WithCluster(c execute.Cluster) *Execute { return e } +func (e *Execute) WithErrorMessage(err error) *Execute { + e.ErrorMessage = err.Error() + return e +} + func (Execute) Type() string { return blockless.MessageExecuteResponse } func (e Execute) MarshalJSON() ([]byte, error) { diff --git a/node/execution_results.go b/node/execution_results.go index a0286ca3..7b232c39 100644 --- a/node/execution_results.go +++ b/node/execution_results.go @@ -89,6 +89,8 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string, result.peers = append(result.peers, sender) result.metadata[sender] = exres.Metadata + results[reskey] = result + if uint(len(result.peers)) >= count { n.log.Info().Str("request", requestID).Int("peers", len(peers)).Uint("matching_results", count).Msg("have enough matching results") exCancel() diff --git a/node/head_execute.go b/node/head_execute.go index 6256eb37..d2a25232 100644 --- a/node/head_execute.go +++ b/node/head_execute.go @@ -21,6 +21,15 @@ import ( // NOTE: head node typically receives execution requests from the REST API. This message handling is not cognizant of subgroups. func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, req request.Execute) error { + err := req.Valid() + if err != nil { + err = n.send(ctx, from, req.Response(codes.Invalid).WithErrorMessage(err)) + if err != nil { + return fmt.Errorf("could not send response: %w", err) + } + return nil + } + requestID := newRequestID() log := n.log.With().Str("request", req.RequestID).Str("peer", from.String()).Str("function", req.FunctionID).Logger() @@ -35,7 +44,7 @@ func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, req request res := req.Response(code).WithResults(results).WithCluster(cluster) // Communicate the reason for failure in these cases. if errors.Is(err, blockless.ErrRollCallTimeout) || errors.Is(err, blockless.ErrExecutionNotEnoughNodes) { - res.Message = err.Error() + res.ErrorMessage = err.Error() } // Send the response, whatever it may be (success or failure). @@ -70,7 +79,7 @@ func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Re // Create a logger with relevant context. log := n.log.With().Str("request", requestID).Str("function", req.FunctionID).Int("node_count", nodeCount).Logger() - consensusAlgo, err := parseConsensusAlgorithm(req.Config.ConsensusAlgorithm) + consensusAlgo, err := consensus.Parse(req.Config.ConsensusAlgorithm) if err != nil { log.Error().Str("value", req.Config.ConsensusAlgorithm).Str("default", n.cfg.DefaultConsensus.String()).Err(err).Msg("could not parse consensus algorithm from the user request, using default") consensusAlgo = n.cfg.DefaultConsensus diff --git a/node/roll_call.go b/node/roll_call.go index bb111cfa..40b6b433 100644 --- a/node/roll_call.go +++ b/node/roll_call.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/blocklessnetwork/b7s/consensus" + "github.com/blocklessnetwork/b7s/consensus/pbft" "github.com/blocklessnetwork/b7s/models/blockless" "github.com/blocklessnetwork/b7s/models/codes" "github.com/blocklessnetwork/b7s/models/execute" @@ -94,7 +95,7 @@ func (n *Node) executeRollCall( requestID string, functionID string, nodeCount int, - consensus consensus.Type, + consensusAlgo consensus.Type, topic string, attributes *execute.Attributes, timeout int, @@ -108,7 +109,7 @@ func (n *Node) executeRollCall( n.rollCall.create(requestID) defer n.rollCall.remove(requestID) - err := n.publishRollCall(ctx, requestID, functionID, consensus, topic, attributes) + err := n.publishRollCall(ctx, requestID, functionID, consensusAlgo, topic, attributes) if err != nil { return nil, fmt.Errorf("could not publish roll call: %w", err) } @@ -169,6 +170,10 @@ rollCallResponseLoop: } } + if consensusAlgo == consensus.PBFT && len(reportingPeers) < pbft.MinimumReplicaCount { + return nil, fmt.Errorf("not enough peers reported for PBFT consensus (have: %v, need: %v)", len(reportingPeers), pbft.MinimumReplicaCount) + } + return reportingPeers, nil }