Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: utilize p2p.ExternalAddress properly for dialing #3581

Merged
merged 12 commits into from
Feb 7, 2025
47 changes: 37 additions & 10 deletions tm2/pkg/bft/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
"sync"
"time"

goErrors "errors"

"github.com/gnolang/gno/tm2/pkg/bft/appconn"
"github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/file"
"github.com/gnolang/gno/tm2/pkg/p2p/conn"
Expand Down Expand Up @@ -604,12 +602,10 @@
}

// Start the transport.
lAddr := n.config.P2P.ExternalAddress
if lAddr == "" {
lAddr = n.config.P2P.ListenAddress
}
// The listen address for the transport needs to be an address within reach of the machine NIC
listenAddress := p2pTypes.NetAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress)

addr, err := p2pTypes.NewNetAddressFromString(p2pTypes.NetAddressString(n.nodeKey.ID(), lAddr))
addr, err := p2pTypes.NewNetAddressFromString(listenAddress)
if err != nil {
return fmt.Errorf("unable to parse network address, %w", err)
}
Expand Down Expand Up @@ -903,7 +899,7 @@

nodeInfo := p2pTypes.NodeInfo{
VersionSet: vset,
PeerID: nodeKey.ID(),
NetAddress: nil, // The shared address depends on the configuration
Network: genDoc.ChainID,
Version: version.Version,
Channels: []byte{
Expand All @@ -918,13 +914,44 @@
},
}

// Make sure the discovery channel is shared with peers
// in case peer discovery is enabled
if config.P2P.PeerExchange {
nodeInfo.Channels = append(nodeInfo.Channels, discovery.Channel)
}

// Grab the supplied listen address.
// This address needs to be valid, but it can be unspecified.
// If the listen address is unspecified (port / IP unbound),
// then this address cannot be used by peers for dialing
addr, err := p2pTypes.NewNetAddressFromString(
p2pTypes.NetAddressString(nodeKey.ID(), config.P2P.ListenAddress),
)
if err != nil {
return p2pTypes.NodeInfo{}, fmt.Errorf("unable to parse network address, %w", err)
}

Check warning on line 932 in tm2/pkg/bft/node/node.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/bft/node/node.go#L931-L932

Added lines #L931 - L932 were not covered by tests

// Use the transport listen address as the advertised address
nodeInfo.NetAddress = addr

// Prepare the advertised dial address (if any)
// for the node, which other peers can use to dial
if config.P2P.ExternalAddress != "" {
addr, err = p2pTypes.NewNetAddressFromString(
p2pTypes.NetAddressString(
nodeKey.ID(),
config.P2P.ExternalAddress,
),
)
if err != nil {
return p2pTypes.NodeInfo{}, fmt.Errorf("invalid p2p external address: %w", err)
}

Check warning on line 948 in tm2/pkg/bft/node/node.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/bft/node/node.go#L940-L948

Added lines #L940 - L948 were not covered by tests

nodeInfo.NetAddress = addr

Check warning on line 950 in tm2/pkg/bft/node/node.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/bft/node/node.go#L950

Added line #L950 was not covered by tests
}

// Validate the node info
err := nodeInfo.Validate()
if err != nil && !goErrors.Is(err, p2pTypes.ErrUnspecifiedIP) {
if err := nodeInfo.Validate(); err != nil {
return p2pTypes.NodeInfo{}, fmt.Errorf("unable to validate node info, %w", err)
}

Expand Down
14 changes: 7 additions & 7 deletions tm2/pkg/internal/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@
VersionSet: versionset.VersionSet{
versionset.VersionInfo{Name: "p2p", Version: "v0.0.0"},
},
PeerID: key.ID(),
Network: "testing",
Software: "p2ptest",
Version: "v1.2.3-rc.0-deadbeef",
Channels: cfg.Channels,
Moniker: fmt.Sprintf("node-%d", index),
NetAddress: addr,
Network: "testing",
Software: "p2ptest",
Version: "v1.2.3-rc.0-deadbeef",
Channels: cfg.Channels,
Moniker: fmt.Sprintf("node-%d", index),
Other: p2pTypes.NodeInfoOther{
TxIndex: "off",
RPCAddress: fmt.Sprintf("127.0.0.1:%d", 0),
Expand Down Expand Up @@ -231,7 +231,7 @@
func (mp *Peer) Send(_ byte, _ []byte) bool { return true }
func (mp *Peer) NodeInfo() p2pTypes.NodeInfo {
return p2pTypes.NodeInfo{
PeerID: mp.id,
NetAddress: mp.addr,

Check warning on line 234 in tm2/pkg/internal/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/internal/p2p/p2p.go#L234

Added line #L234 was not covered by tests
}
}
func (mp *Peer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} }
Expand Down
23 changes: 18 additions & 5 deletions tm2/pkg/p2p/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,15 @@

// Validate the message
if err := msg.ValidateBasic(); err != nil {
r.Logger.Error("unable to validate discovery message", "err", err)
r.Logger.Warn("unable to validate discovery message", "err", err)

return
}

switch msg := msg.(type) {
case *Request:
if err := r.handleDiscoveryRequest(peer); err != nil {
r.Logger.Error("unable to handle discovery request", "err", err)
r.Logger.Warn("unable to handle discovery request", "err", err)

Check warning on line 171 in tm2/pkg/p2p/discovery/discovery.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/p2p/discovery/discovery.go#L171

Added line #L171 was not covered by tests
}
case *Response:
// Make the peers available for dialing on the switch
Expand All @@ -186,9 +186,21 @@
peers = make([]*types.NetAddress, 0, len(localPeers))
)

// Exclude the private peers from being shared
// Exclude the private peers from being shared,
// as well as peers who are not dialable
localPeers = slices.DeleteFunc(localPeers, func(p p2p.PeerConn) bool {
return p.IsPrivate()
var (
// Private peers are peers whose information is kept private to the node
privatePeer = p.IsPrivate()
// The reason we don't validate the net address with .Routable()
// is because of legacy logic that supports local loopbacks as advertised
// peer addresses. Introducing a .Routable() constraint will filter all
// local loopback addresses shared by peers, and will cause local deployments
// (and unit test deployments) to break and require additional setup
invalidDialAddress = p.NodeInfo().DialAddress().Validate() != nil
)

return privatePeer || invalidDialAddress
})

// Check if there is anything to share,
Expand All @@ -207,7 +219,8 @@
}

for _, p := range localPeers {
peers = append(peers, p.SocketAddr())
// Make sure only routable peers are shared
peers = append(peers, p.NodeInfo().DialAddress())
}

// Create the response, and marshal
Expand Down
6 changes: 3 additions & 3 deletions tm2/pkg/p2p/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestReactor_DiscoveryResponse(t *testing.T) {

slices.ContainsFunc(resp.Peers, func(addr *types.NetAddress) bool {
for _, localP := range peers {
if localP.SocketAddr().Equals(*addr) {
if localP.NodeInfo().DialAddress().Equals(*addr) {
return true
}
}
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestReactor_DiscoveryResponse(t *testing.T) {

slices.ContainsFunc(resp.Peers, func(addr *types.NetAddress) bool {
for _, localP := range peers {
if localP.SocketAddr().Equals(*addr) {
if localP.NodeInfo().DialAddress().Equals(*addr) {
return true
}
}
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestReactor_DiscoveryResponse(t *testing.T) {
peerAddrs := make([]*types.NetAddress, 0, len(peers))

for _, p := range peers {
peerAddrs = append(peerAddrs, p.SocketAddr())
peerAddrs = append(peerAddrs, p.NodeInfo().DialAddress())
}

// Prepare the message
Expand Down
2 changes: 1 addition & 1 deletion tm2/pkg/p2p/mock/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func GeneratePeers(t *testing.T, count int) []*Peer {
},
NodeInfoFn: func() types.NodeInfo {
return types.NodeInfo{
PeerID: key.ID(),
NetAddress: addr,
}
},
SocketAddrFn: func() *types.NetAddress {
Expand Down
2 changes: 1 addition & 1 deletion tm2/pkg/p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (p *peer) OnStop() {

// ID returns the peer's ID - the hex encoded hash of its pubkey.
func (p *peer) ID() types.ID {
return p.nodeInfo.PeerID
return p.nodeInfo.ID()
}

// NodeInfo returns a copy of the peer's NodeInfo.
Expand Down
4 changes: 3 additions & 1 deletion tm2/pkg/p2p/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ func TestPeer_Properties(t *testing.T) {
},
},
nodeInfo: types.NodeInfo{
PeerID: id,
NetAddress: &types.NetAddress{
ID: id,
},
},
connInfo: &ConnInfo{
Outbound: testCase.outbound,
Expand Down
61 changes: 40 additions & 21 deletions tm2/pkg/p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,57 +407,76 @@
peersToDial = make([]*types.NetAddress, 0)
)

// Gather addresses of persistent peers that are missing or
// not already in the dial queue
sw.persistentPeers.Range(func(key, value any) bool {
var (
id = key.(types.ID)
addr = value.(*types.NetAddress)
)

// Check if the peer is part of the peer set
// or is scheduled for dialing
if peers.Has(id) || sw.dialQueue.Has(addr) {
return true
if !peers.Has(id) && !sw.dialQueue.Has(addr) {
peersToDial = append(peersToDial, addr)
}

peersToDial = append(peersToDial, addr)

return true
})

if len(peersToDial) == 0 {
// No persistent peers are missing
// No persistent peers need dialing
return
}

// Calculate the dial items
// Prepare dial items with the appropriate backoff
dialItems := make([]dial.Item, 0, len(peersToDial))
for _, p := range peersToDial {
item := getBackoffItem(p.ID)
for _, addr := range peersToDial {
item := getBackoffItem(addr.ID)

if item == nil {
dialItem := dial.Item{
Time: time.Now(),
Address: p,
}
// First attempt
now := time.Now()

dialItems = append(dialItems,
dial.Item{
Time: now,
Address: addr,
},
)

dialItems = append(dialItems, dialItem)
setBackoffItem(p.ID, &backoffItem{dialItem.Time, 0})
setBackoffItem(addr.ID, &backoffItem{
lastDialTime: now,
attempts: 0,
})

continue
}

setBackoffItem(p.ID, &backoffItem{
lastDialTime: time.Now().Add(
// Subsequent attempt: apply backoff
var (
attempts = item.attempts + 1
dialTime = time.Now().Add(

Check warning on line 457 in tm2/pkg/p2p/switch.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/p2p/switch.go#L455-L457

Added lines #L455 - L457 were not covered by tests
calculateBackoff(
item.attempts,
time.Second,
10*time.Minute,
),
),
attempts: item.attempts + 1,
)
)

dialItems = append(dialItems,
dial.Item{
Time: dialTime,
Address: addr,
},
)

setBackoffItem(addr.ID, &backoffItem{
lastDialTime: dialTime,
attempts: attempts,

Check warning on line 475 in tm2/pkg/p2p/switch.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/p2p/switch.go#L463-L475

Added lines #L463 - L475 were not covered by tests
})
}

// Add the peers to the dial queue
// Add these items to the dial queue
sw.dialItems(dialItems...)
}

Expand Down
2 changes: 1 addition & 1 deletion tm2/pkg/p2p/switch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func TestMultiplexSwitch_DialPeers(t *testing.T) {
// as the transport (node)
p.NodeInfoFn = func() types.NodeInfo {
return types.NodeInfo{
PeerID: addr.ID,
NetAddress: &addr,
}
}

Expand Down
1 change: 1 addition & 0 deletions tm2/pkg/p2p/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (mt *MultiplexTransport) Close() error {
}

mt.cancelFn()

return mt.listener.Close()
}

Expand Down
8 changes: 4 additions & 4 deletions tm2/pkg/p2p/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestMultiplexTransport_Accept(t *testing.T) {

ni := types.NodeInfo{
Network: network, // common network
PeerID: id,
NetAddress: na,
Version: "v1.0.0-rc.0",
Moniker: fmt.Sprintf("node-%d", index),
VersionSet: make(versionset.VersionSet, 0), // compatible version set
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestMultiplexTransport_Accept(t *testing.T) {

ni := types.NodeInfo{
Network: chainID,
PeerID: id,
NetAddress: na,
Version: "v1.0.0-rc.0",
Moniker: fmt.Sprintf("node-%d", index),
VersionSet: make(versionset.VersionSet, 0), // compatible version set
Expand Down Expand Up @@ -389,7 +389,7 @@ func TestMultiplexTransport_Accept(t *testing.T) {

ni := types.NodeInfo{
Network: network, // common network
PeerID: key.ID(),
NetAddress: na,
Version: "v1.0.0-rc.0",
Moniker: fmt.Sprintf("node-%d", index),
VersionSet: make(versionset.VersionSet, 0), // compatible version set
Expand Down Expand Up @@ -467,7 +467,7 @@ func TestMultiplexTransport_Accept(t *testing.T) {

ni := types.NodeInfo{
Network: network, // common network
PeerID: key.ID(),
NetAddress: na,
Version: "v1.0.0-rc.0",
Moniker: fmt.Sprintf("node-%d", index),
VersionSet: make(versionset.VersionSet, 0), // compatible version set
Expand Down
Loading
Loading