Skip to content

Commit

Permalink
adds ability for nodes to use a timeout and -1 for all nodes responde…
Browse files Browse the repository at this point in the history
…d in a group

Signed-off-by: Derek Anderson <[email protected]>
  • Loading branch information
dmikey committed Feb 8, 2024
1 parent d2a26af commit 6b6db44
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
4 changes: 4 additions & 0 deletions models/execute/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type Config struct {

// NodeCount specifies how many nodes should execute this request.
NodeCount int `json:"number_of_nodes,omitempty"`

// When should the execution timeout
Timeout int `json:"timeout,omitempty"`

// Consensus algorithm to use. Raft and PBFT are supported at this moment.
ConsensusAlgorithm string `json:"consensus_algorithm,omitempty"`

Expand Down
4 changes: 2 additions & 2 deletions node/head_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, payload []b
// The returned map contains execution results, mapped to the peer IDs of peers who reported them.
func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Request, subgroup string) (codes.Code, execute.ResultMap, execute.Cluster, error) {

nodeCount := 1
nodeCount := -1
if req.Config.NodeCount > 1 {
nodeCount = req.Config.NodeCount
}
Expand All @@ -90,7 +90,7 @@ func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Re
log.Info().Msg("processing execution request")

// Phase 1. - Issue roll call to nodes.
reportingPeers, err := n.executeRollCall(ctx, requestID, req.FunctionID, nodeCount, consensusAlgo, subgroup, req.Config.Attributes)
reportingPeers, err := n.executeRollCall(ctx, requestID, req.FunctionID, nodeCount, consensusAlgo, subgroup, req.Config.Attributes, req.Config.Timeout)
if err != nil {
code := codes.Error
if errors.Is(err, blockless.ErrRollCallTimeout) {
Expand Down
19 changes: 17 additions & 2 deletions node/roll_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/libp2p/go-libp2p/core/peer"

Expand Down Expand Up @@ -110,6 +111,7 @@ func (n *Node) executeRollCall(
consensus consensus.Type,
topic string,
attributes *execute.Attributes,
timeout int,
) ([]peer.ID, error) {

// Create a logger with relevant context.
Expand All @@ -128,7 +130,12 @@ func (n *Node) executeRollCall(
log.Info().Msg("roll call published")

// Limit for how long we wait for responses.
tctx, exCancel := context.WithTimeout(ctx, n.cfg.RollCallTimeout)
t := n.cfg.RollCallTimeout
if(timeout > 0) {
t = time.Duration(timeout) * time.Second
}

tctx, exCancel := context.WithTimeout(ctx, t)
defer exCancel()

// Peers that have reported on roll call.
Expand All @@ -140,6 +147,12 @@ rollCallResponseLoop:
// Request timed out.
case <-tctx.Done():

// -1 means we'll take any peers reporting
if (len(reportingPeers) > 1 && nodeCount == -1) {
log.Info().Msg("enough peers reported for roll call")
break rollCallResponseLoop
}

log.Warn().Msg("roll call timed out")
return nil, blockless.ErrRollCallTimeout

Expand All @@ -161,7 +174,9 @@ rollCallResponseLoop:
log.Info().Str("peer", reply.From.String()).Msg("roll called peer chosen for execution")

reportingPeers = append(reportingPeers, reply.From)
if len(reportingPeers) >= nodeCount {

// -1 means we'll take any peers reporting
if len(reportingPeers) >= nodeCount && nodeCount != -1 {
log.Info().Msg("enough peers reported for roll call")
break rollCallResponseLoop
}
Expand Down

0 comments on commit 6b6db44

Please sign in to comment.