From 748c41539a039c96f79c1a34a5ec9ba23a4fbcdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20=C4=8Cekrli=C4=87?= Date: Fri, 9 Aug 2024 14:50:12 +0200 Subject: [PATCH] On cluster formation, forward node address info to cluster nodes (#161) --- models/request/form_cluster.go | 7 ++++--- node/cluster.go | 38 +++++++++++++++++++++++++++++++--- node/execution_results.go | 5 ++--- node/params.go | 2 ++ node/run.go | 2 +- 5 files changed, 44 insertions(+), 10 deletions(-) diff --git a/models/request/form_cluster.go b/models/request/form_cluster.go index 40ad4772..0261e92a 100644 --- a/models/request/form_cluster.go +++ b/models/request/form_cluster.go @@ -14,9 +14,10 @@ var _ (json.Marshaler) = (*FormCluster)(nil) // FormCluster describes the `MessageFormCluster` request payload. // It is sent on clustered execution of a request. type FormCluster struct { - RequestID string `json:"request_id,omitempty"` - Peers []peer.ID `json:"peers,omitempty"` - Consensus consensus.Type `json:"consensus,omitempty"` + RequestID string `json:"request_id,omitempty"` + Peers []peer.ID `json:"peers,omitempty"` + Consensus consensus.Type `json:"consensus,omitempty"` + ConnectionInfo []peer.AddrInfo `json:"connection_info,omitempty"` } func (FormCluster) Type() string { return blockless.MessageFormCluster } diff --git a/node/cluster.go b/node/cluster.go index fe0cb7b4..4345b671 100644 --- a/node/cluster.go +++ b/node/cluster.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/rs/zerolog/log" @@ -25,6 +26,26 @@ func (n *Node) processFormCluster(ctx context.Context, from peer.ID, req request n.log.Info().Str("request", req.RequestID).Strs("peers", blockless.PeerIDsToStr(req.Peers)).Str("consensus", req.Consensus.String()).Msg("received request to form consensus cluster") + // Add connection info about peers if we're not already connected to them. + for _, addrInfo := range req.ConnectionInfo { + + if n.host.Host.ID() == addrInfo.ID { + continue + } + + if n.host.Network().Connectedness(addrInfo.ID) == network.Connected { + continue + } + + n.log.Debug(). + Any("known", n.host.Network().Peerstore().Addrs(addrInfo.ID)). + Any("received", addrInfo.Addrs). + Stringer("peer", addrInfo.ID). + Msg("received addresses for fellow cluster replica") + + n.host.Network().Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, ClusterAddressTTL) + } + switch req.Consensus { case consensus.Raft: return n.createRaftCluster(ctx, from, req) @@ -72,9 +93,20 @@ func (n *Node) formCluster(ctx context.Context, requestID string, replicas []pee // Create cluster formation request. reqCluster := request.FormCluster{ - RequestID: requestID, - Peers: replicas, - Consensus: consensus, + RequestID: requestID, + Peers: replicas, + Consensus: consensus, + ConnectionInfo: make([]peer.AddrInfo, 0, len(replicas)), + } + + // Add connection info in case replicas don't already know of each other. + for _, replica := range replicas { + addrInfo := peer.AddrInfo{ + ID: replica, + Addrs: n.host.Peerstore().Addrs(replica), + } + + reqCluster.ConnectionInfo = append(reqCluster.ConnectionInfo, addrInfo) } // Request execution from peers. diff --git a/node/execution_results.go b/node/execution_results.go index 6fcbcc46..a0286ca3 100644 --- a/node/execution_results.go +++ b/node/execution_results.go @@ -6,7 +6,6 @@ import ( "sync" "github.com/libp2p/go-libp2p/core/peer" - "github.com/rs/zerolog/log" "github.com/blocklessnetwork/b7s/consensus/pbft" "github.com/blocklessnetwork/b7s/models/execute" @@ -52,13 +51,13 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string, pub, err := sender.ExtractPublicKey() if err != nil { - log.Error().Err(err).Msg("could not derive public key from peer ID") + n.log.Error().Err(err).Msg("could not derive public key from peer ID") return } err = er.VerifySignature(pub) if err != nil { - log.Error().Err(err).Msg("could not verify signature of an execution response") + n.log.Error().Err(err).Msg("could not verify signature of an execution response") return } diff --git a/node/params.go b/node/params.go index 8139a6aa..ef519628 100644 --- a/node/params.go +++ b/node/params.go @@ -15,6 +15,8 @@ const ( DefaultClusterFormationTimeout = 10 * time.Second DefaultConcurrency = 10 + ClusterAddressTTL = 30 * time.Minute + DefaultConsensusAlgorithm = consensus.Raft DefaultAttributeLoadingSetting = false diff --git a/node/run.go b/node/run.go index 95fb68d8..54444ee3 100644 --- a/node/run.go +++ b/node/run.go @@ -86,7 +86,7 @@ func (n *Node) Run(ctx context.Context) error { continue } - n.log.Trace().Str("topic", name).Str("peer", msg.ReceivedFrom.String()).Str("id", msg.ID).Msg("received message") + n.log.Trace().Str("topic", name).Str("peer", msg.ReceivedFrom.String()).Hex("id", []byte(msg.ID)).Msg("received message") // Try to get a slot for processing the request. n.sema <- struct{}{}