Skip to content

Commit

Permalink
Discard PBFT executions for too small clusters + fix PBFT result acco…
Browse files Browse the repository at this point in the history
…unting
  • Loading branch information
Maelkum committed Oct 10, 2024
1 parent 84eb6d3 commit ea3f0fa
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 5 deletions.
18 changes: 18 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consensus

import (
"fmt"
"strings"
)

// Type identifies consensus protocols suported by Blockless.
Expand Down Expand Up @@ -31,3 +32,20 @@ func (t Type) Valid() bool {
return false
}
}

func Parse(s string) (Type, error) {

if s == "" {
return 0, nil
}

switch strings.ToLower(s) {
case "raft":
return Raft, nil

case "pbft":
return PBFT, nil
}

return 0, fmt.Errorf("unknown consensus value (%s)", s)
}
19 changes: 19 additions & 0 deletions models/request/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package request

import (
"encoding/json"
"fmt"
"time"

"github.com/blocklessnetwork/b7s/consensus"
"github.com/blocklessnetwork/b7s/consensus/pbft"
"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/models/codes"
"github.com/blocklessnetwork/b7s/models/execute"
Expand Down Expand Up @@ -44,3 +47,19 @@ func (e Execute) MarshalJSON() ([]byte, error) {
}
return json.Marshal(rec)
}

func (e Execute) Valid() error {

c, err := consensus.Parse(e.Config.ConsensusAlgorithm)
if err != nil {
return fmt.Errorf("could not parse consensus algorithm: %w", err)
}

if c == consensus.PBFT &&
e.Config.NodeCount > 0 &&
e.Config.NodeCount < pbft.MinimumReplicaCount {
return fmt.Errorf("minimum %v nodes needed for PBFT consensus", pbft.MinimumReplicaCount)
}

return nil
}
7 changes: 6 additions & 1 deletion models/response/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Execute struct {
Signature string `json:"signature,omitempty"`

// Used to communicate the reason for failure to the user.
Message string `json:"message,omitempty"`
ErrorMessage string `json:"message,omitempty"`
}

func (e *Execute) WithResults(r execute.ResultMap) *Execute {
Expand All @@ -43,6 +43,11 @@ func (e *Execute) WithCluster(c execute.Cluster) *Execute {
return e
}

func (e *Execute) WithErrorMessage(err error) *Execute {
e.ErrorMessage = err.Error()
return e
}

func (Execute) Type() string { return blockless.MessageExecuteResponse }

func (e Execute) MarshalJSON() ([]byte, error) {
Expand Down
2 changes: 2 additions & 0 deletions node/execution_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string,
result.peers = append(result.peers, sender)
result.metadata[sender] = exres.Metadata

results[reskey] = result

if uint(len(result.peers)) >= count {
n.log.Info().Str("request", requestID).Int("peers", len(peers)).Uint("matching_results", count).Msg("have enough matching results")
exCancel()
Expand Down
13 changes: 11 additions & 2 deletions node/head_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ import (
// NOTE: head node typically receives execution requests from the REST API. This message handling is not cognizant of subgroups.
func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, req request.Execute) error {

err := req.Valid()
if err != nil {
err = n.send(ctx, from, req.Response(codes.Invalid).WithErrorMessage(err))
if err != nil {
return fmt.Errorf("could not send response: %w", err)
}
return nil
}

requestID := newRequestID()

log := n.log.With().Str("request", req.RequestID).Str("peer", from.String()).Str("function", req.FunctionID).Logger()
Expand All @@ -35,7 +44,7 @@ func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, req request
res := req.Response(code).WithResults(results).WithCluster(cluster)
// Communicate the reason for failure in these cases.
if errors.Is(err, blockless.ErrRollCallTimeout) || errors.Is(err, blockless.ErrExecutionNotEnoughNodes) {
res.Message = err.Error()
res.ErrorMessage = err.Error()
}

// Send the response, whatever it may be (success or failure).
Expand Down Expand Up @@ -70,7 +79,7 @@ func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Re
// Create a logger with relevant context.
log := n.log.With().Str("request", requestID).Str("function", req.FunctionID).Int("node_count", nodeCount).Logger()

consensusAlgo, err := parseConsensusAlgorithm(req.Config.ConsensusAlgorithm)
consensusAlgo, err := consensus.Parse(req.Config.ConsensusAlgorithm)
if err != nil {
log.Error().Str("value", req.Config.ConsensusAlgorithm).Str("default", n.cfg.DefaultConsensus.String()).Err(err).Msg("could not parse consensus algorithm from the user request, using default")
consensusAlgo = n.cfg.DefaultConsensus
Expand Down
9 changes: 7 additions & 2 deletions node/roll_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/consensus"
"github.com/blocklessnetwork/b7s/consensus/pbft"
"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/models/codes"
"github.com/blocklessnetwork/b7s/models/execute"
Expand Down Expand Up @@ -94,7 +95,7 @@ func (n *Node) executeRollCall(
requestID string,
functionID string,
nodeCount int,
consensus consensus.Type,
consensusAlgo consensus.Type,
topic string,
attributes *execute.Attributes,
timeout int,
Expand All @@ -108,7 +109,7 @@ func (n *Node) executeRollCall(
n.rollCall.create(requestID)
defer n.rollCall.remove(requestID)

err := n.publishRollCall(ctx, requestID, functionID, consensus, topic, attributes)
err := n.publishRollCall(ctx, requestID, functionID, consensusAlgo, topic, attributes)
if err != nil {
return nil, fmt.Errorf("could not publish roll call: %w", err)
}
Expand Down Expand Up @@ -169,6 +170,10 @@ rollCallResponseLoop:
}
}

if consensusAlgo == consensus.PBFT && len(reportingPeers) < pbft.MinimumReplicaCount {
return nil, fmt.Errorf("not enough peers reported for PBFT consensus (have: %v, need: %v)", len(reportingPeers), pbft.MinimumReplicaCount)
}

return reportingPeers, nil
}

Expand Down

0 comments on commit ea3f0fa

Please sign in to comment.