Skip to content

Commit

Permalink
On cluster formation, forward node address info to cluster nodes (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maelkum authored Aug 9, 2024
1 parent af0c229 commit 748c415
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 10 deletions.
7 changes: 4 additions & 3 deletions models/request/form_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ var _ (json.Marshaler) = (*FormCluster)(nil)
// FormCluster describes the `MessageFormCluster` request payload.
// It is sent on clustered execution of a request.
type FormCluster struct {
RequestID string `json:"request_id,omitempty"`
Peers []peer.ID `json:"peers,omitempty"`
Consensus consensus.Type `json:"consensus,omitempty"`
RequestID string `json:"request_id,omitempty"`
Peers []peer.ID `json:"peers,omitempty"`
Consensus consensus.Type `json:"consensus,omitempty"`
ConnectionInfo []peer.AddrInfo `json:"connection_info,omitempty"`
}

func (FormCluster) Type() string { return blockless.MessageFormCluster }
Expand Down
38 changes: 35 additions & 3 deletions node/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"sync"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/rs/zerolog/log"

Expand All @@ -25,6 +26,26 @@ func (n *Node) processFormCluster(ctx context.Context, from peer.ID, req request

n.log.Info().Str("request", req.RequestID).Strs("peers", blockless.PeerIDsToStr(req.Peers)).Str("consensus", req.Consensus.String()).Msg("received request to form consensus cluster")

// Add connection info about peers if we're not already connected to them.
for _, addrInfo := range req.ConnectionInfo {

if n.host.Host.ID() == addrInfo.ID {
continue
}

if n.host.Network().Connectedness(addrInfo.ID) == network.Connected {
continue
}

n.log.Debug().
Any("known", n.host.Network().Peerstore().Addrs(addrInfo.ID)).
Any("received", addrInfo.Addrs).
Stringer("peer", addrInfo.ID).
Msg("received addresses for fellow cluster replica")

n.host.Network().Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, ClusterAddressTTL)
}

switch req.Consensus {
case consensus.Raft:
return n.createRaftCluster(ctx, from, req)
Expand Down Expand Up @@ -72,9 +93,20 @@ func (n *Node) formCluster(ctx context.Context, requestID string, replicas []pee

// Create cluster formation request.
reqCluster := request.FormCluster{
RequestID: requestID,
Peers: replicas,
Consensus: consensus,
RequestID: requestID,
Peers: replicas,
Consensus: consensus,
ConnectionInfo: make([]peer.AddrInfo, 0, len(replicas)),
}

// Add connection info in case replicas don't already know of each other.
for _, replica := range replicas {
addrInfo := peer.AddrInfo{
ID: replica,
Addrs: n.host.Peerstore().Addrs(replica),
}

reqCluster.ConnectionInfo = append(reqCluster.ConnectionInfo, addrInfo)
}

// Request execution from peers.
Expand Down
5 changes: 2 additions & 3 deletions node/execution_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/rs/zerolog/log"

"github.com/blocklessnetwork/b7s/consensus/pbft"
"github.com/blocklessnetwork/b7s/models/execute"
Expand Down Expand Up @@ -52,13 +51,13 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string,

pub, err := sender.ExtractPublicKey()
if err != nil {
log.Error().Err(err).Msg("could not derive public key from peer ID")
n.log.Error().Err(err).Msg("could not derive public key from peer ID")
return
}

err = er.VerifySignature(pub)
if err != nil {
log.Error().Err(err).Msg("could not verify signature of an execution response")
n.log.Error().Err(err).Msg("could not verify signature of an execution response")
return
}

Expand Down
2 changes: 2 additions & 0 deletions node/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
DefaultClusterFormationTimeout = 10 * time.Second
DefaultConcurrency = 10

ClusterAddressTTL = 30 * time.Minute

DefaultConsensusAlgorithm = consensus.Raft

DefaultAttributeLoadingSetting = false
Expand Down
2 changes: 1 addition & 1 deletion node/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (n *Node) Run(ctx context.Context) error {
continue
}

n.log.Trace().Str("topic", name).Str("peer", msg.ReceivedFrom.String()).Str("id", msg.ID).Msg("received message")
n.log.Trace().Str("topic", name).Str("peer", msg.ReceivedFrom.String()).Hex("id", []byte(msg.ID)).Msg("received message")

// Try to get a slot for processing the request.
n.sema <- struct{}{}
Expand Down

0 comments on commit 748c415

Please sign in to comment.