Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster discoverability improvement - include addresses of the replicas #161

Merged
merged 1 commit into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions models/request/form_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ var _ (json.Marshaler) = (*FormCluster)(nil)
// FormCluster describes the `MessageFormCluster` request payload.
// It is sent on clustered execution of a request.
type FormCluster struct {
RequestID string `json:"request_id,omitempty"`
Peers []peer.ID `json:"peers,omitempty"`
Consensus consensus.Type `json:"consensus,omitempty"`
RequestID string `json:"request_id,omitempty"`
Peers []peer.ID `json:"peers,omitempty"`
Consensus consensus.Type `json:"consensus,omitempty"`
ConnectionInfo []peer.AddrInfo `json:"connection_info,omitempty"`
}

func (FormCluster) Type() string { return blockless.MessageFormCluster }
Expand Down
38 changes: 35 additions & 3 deletions node/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"sync"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/rs/zerolog/log"

Expand All @@ -25,6 +26,26 @@ func (n *Node) processFormCluster(ctx context.Context, from peer.ID, req request

n.log.Info().Str("request", req.RequestID).Strs("peers", blockless.PeerIDsToStr(req.Peers)).Str("consensus", req.Consensus.String()).Msg("received request to form consensus cluster")

// Add connection info about peers if we're not already connected to them.
for _, addrInfo := range req.ConnectionInfo {

if n.host.Host.ID() == addrInfo.ID {
continue
}

if n.host.Network().Connectedness(addrInfo.ID) == network.Connected {
continue
}

n.log.Debug().
Any("known", n.host.Network().Peerstore().Addrs(addrInfo.ID)).
Any("received", addrInfo.Addrs).
Stringer("peer", addrInfo.ID).
Msg("received addresses for fellow cluster replica")

n.host.Network().Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, ClusterAddressTTL)
}

switch req.Consensus {
case consensus.Raft:
return n.createRaftCluster(ctx, from, req)
Expand Down Expand Up @@ -72,9 +93,20 @@ func (n *Node) formCluster(ctx context.Context, requestID string, replicas []pee

// Create cluster formation request.
reqCluster := request.FormCluster{
RequestID: requestID,
Peers: replicas,
Consensus: consensus,
RequestID: requestID,
Peers: replicas,
Consensus: consensus,
ConnectionInfo: make([]peer.AddrInfo, 0, len(replicas)),
}

// Add connection info in case replicas don't already know of each other.
for _, replica := range replicas {
addrInfo := peer.AddrInfo{
ID: replica,
Addrs: n.host.Peerstore().Addrs(replica),
}

reqCluster.ConnectionInfo = append(reqCluster.ConnectionInfo, addrInfo)
}

// Request execution from peers.
Expand Down
5 changes: 2 additions & 3 deletions node/execution_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/rs/zerolog/log"

"github.com/blocklessnetwork/b7s/consensus/pbft"
"github.com/blocklessnetwork/b7s/models/execute"
Expand Down Expand Up @@ -52,13 +51,13 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string,

pub, err := sender.ExtractPublicKey()
if err != nil {
log.Error().Err(err).Msg("could not derive public key from peer ID")
n.log.Error().Err(err).Msg("could not derive public key from peer ID")
return
}

err = er.VerifySignature(pub)
if err != nil {
log.Error().Err(err).Msg("could not verify signature of an execution response")
n.log.Error().Err(err).Msg("could not verify signature of an execution response")
return
}

Expand Down
2 changes: 2 additions & 0 deletions node/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
DefaultClusterFormationTimeout = 10 * time.Second
DefaultConcurrency = 10

ClusterAddressTTL = 30 * time.Minute

DefaultConsensusAlgorithm = consensus.Raft

DefaultAttributeLoadingSetting = false
Expand Down
2 changes: 1 addition & 1 deletion node/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (n *Node) Run(ctx context.Context) error {
continue
}

n.log.Trace().Str("topic", name).Str("peer", msg.ReceivedFrom.String()).Str("id", msg.ID).Msg("received message")
n.log.Trace().Str("topic", name).Str("peer", msg.ReceivedFrom.String()).Hex("id", []byte(msg.ID)).Msg("received message")

// Try to get a slot for processing the request.
n.sema <- struct{}{}
Expand Down
Loading