diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go index 77a5d4bd6f7..2ae489880a8 100644 --- a/cmd/bee/cmd/cmd.go +++ b/cmd/bee/cmd/cmd.go @@ -33,6 +33,7 @@ const ( optionNameP2PAddr = "p2p-addr" optionNameNATAddr = "nat-addr" optionNameP2PWSEnable = "p2p-ws-enable" + optionNameP2PQUICEnable = "p2p-quic-enable" optionNameDebugAPIEnable = "debug-api-enable" optionNameDebugAPIAddr = "debug-api-addr" optionNameBootnodes = "bootnode" @@ -249,6 +250,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().String(optionNameAPIAddr, ":1633", "HTTP API listen address") cmd.Flags().String(optionNameP2PAddr, ":1634", "P2P listen address") cmd.Flags().String(optionNameNATAddr, "", "NAT exposed address") + cmd.Flags().Bool(optionNameP2PQUICEnable, true, "enable P2P QUIC transport") cmd.Flags().Bool(optionNameP2PWSEnable, false, "enable P2P WebSocket transport") cmd.Flags().StringSlice(optionNameBootnodes, []string{""}, "initial nodes to connect to") cmd.Flags().Bool(optionNameDebugAPIEnable, false, "enable debug HTTP API") diff --git a/cmd/bee/cmd/start.go b/cmd/bee/cmd/start.go index 504e1ab562d..920054e7188 100644 --- a/cmd/bee/cmd/start.go +++ b/cmd/bee/cmd/start.go @@ -303,6 +303,7 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo DebugAPIAddr: debugAPIAddr, Addr: c.config.GetString(optionNameP2PAddr), NATAddr: c.config.GetString(optionNameNATAddr), + EnableQUIC: c.config.GetBool(optionNameP2PQUICEnable), EnableWS: c.config.GetBool(optionNameP2PWSEnable), WelcomeMessage: c.config.GetString(optionWelcomeMessage), Bootnodes: networkConfig.bootNodes, diff --git a/openapi/SwarmCommon.yaml b/openapi/SwarmCommon.yaml index ce3f988e3dc..c84095bc5a5 100644 --- a/openapi/SwarmCommon.yaml +++ b/openapi/SwarmCommon.yaml @@ -320,7 +320,18 @@ components: type: array nullable: false items: - $ref: "#/components/schemas/Address" + type: object + properties: + address: + $ref: "#/components/schemas/Address" + fullNode: + type: boolean + underlay: + type: array + nullable: true + items: + $ref: "SwarmCommon.yaml#/components/schemas/MultiAddress" + BlockListedPeers: type: array diff --git a/pkg/api/api.go b/pkg/api/api.go index 3ecea7ad000..3696a24830a 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethersphere/bee/pkg/accounting" + "github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/auth" "github.com/ethersphere/bee/pkg/crypto" "github.com/ethersphere/bee/pkg/feeds" @@ -176,6 +177,7 @@ type Service struct { chequebook chequebook.Service pseudosettle settlement.Interface pingpong pingpong.Interface + addressBook addressbook.Getter batchStore postage.Storer stamperStore storage.Store @@ -247,6 +249,7 @@ type ExtraOptions struct { Steward steward.Interface SyncStatus func() (bool, error) NodeStatus *status.Service + AddressBook addressbook.Getter } func New( @@ -318,6 +321,7 @@ func (s *Service) Configure(signer crypto.Signer, auth auth.Authenticator, trace s.postageContract = e.PostageContract s.steward = e.Steward s.stakingContract = e.Staking + s.addressBook = e.AddressBook s.pingpong = e.Pingpong s.topologyDriver = e.TopologyDriver diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 8655853474f..ddb9d8b78f8 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/common" accountingmock "github.com/ethersphere/bee/pkg/accounting/mock" + "github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/api" "github.com/ethersphere/bee/pkg/auth" mockauth "github.com/ethersphere/bee/pkg/auth/mock" @@ -169,6 +170,8 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. chequebook := chequebookmock.NewChequebook(o.ChequebookOpts...) ln := lightnode.NewContainer(o.Overlay) + addressBook := addressbook.New(statestore.NewStateStore()) + transaction := transactionmock.New(o.TransactionOpts...) storeRecipient := statestore.NewStateStore() @@ -199,6 +202,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. SyncStatus: o.SyncStatus, Staking: o.StakingContract, NodeStatus: o.NodeStatus, + AddressBook: addressBook, } // By default bee mode is set to full mode. diff --git a/pkg/api/p2p_test.go b/pkg/api/p2p_test.go index e184906734a..336a4f7a1be 100644 --- a/pkg/api/p2p_test.go +++ b/pkg/api/p2p_test.go @@ -35,6 +35,7 @@ func TestAddresses(t *testing.T) { addresses := []multiaddr.Multiaddr{ mustMultiaddr(t, "/ip4/127.0.0.1/tcp/7071/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb"), mustMultiaddr(t, "/ip4/192.168.0.101/tcp/7071/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb"), + mustMultiaddr(t, "/ip4/127.0.0.1/udp/7071/quic-v1/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb"), } ethereumAddress := common.HexToAddress("abcd") diff --git a/pkg/api/peer.go b/pkg/api/peer.go index a6dfc8f7aeb..678a76a4f7d 100644 --- a/pkg/api/peer.go +++ b/pkg/api/peer.go @@ -79,8 +79,10 @@ func (s *Service) peerDisconnectHandler(w http.ResponseWriter, r *http.Request) // Peer holds information about a Peer. type Peer struct { - Address swarm.Address `json:"address"` - FullNode bool `json:"fullNode"` + Address swarm.Address `json:"address"` + Underlay multiaddr.Multiaddr `json:"underlay,omitempty"` + FullNode bool `json:"fullNode"` + Connections []multiaddr.Multiaddr `json:"connections,omitempty"` } type BlockListedPeer struct { @@ -99,7 +101,7 @@ type blockListedPeersResponse struct { func (s *Service) peersHandler(w http.ResponseWriter, _ *http.Request) { jsonhttp.OK(w, peersResponse{ - Peers: mapPeers(s.p2p.Peers()), + Peers: s.mapPeers(s.p2p.Peers()), }) } @@ -118,13 +120,22 @@ func (s *Service) blocklistedPeersHandler(w http.ResponseWriter, _ *http.Request }) } -func mapPeers(peers []p2p.Peer) (out []Peer) { +func (s *Service) mapPeers(peers []p2p.Peer) (out []Peer) { out = make([]Peer, 0, len(peers)) for _, peer := range peers { - out = append(out, Peer{ - Address: peer.Address, - FullNode: peer.FullNode, - }) + bzz, err := s.addressBook.Get(peer.Address) + if err != nil { + s.logger.Debug("get bzz address from addressbook", "address", peer.Address, "error", err) + } + p := Peer{ + Address: peer.Address, + FullNode: peer.FullNode, + Connections: peer.Connections, + } + if bzz != nil { + p.Underlay = bzz.Underlay + } + out = append(out, p) } return } diff --git a/pkg/hive/hive.go b/pkg/hive/hive.go index 7d129446754..e8300245644 100644 --- a/pkg/hive/hive.go +++ b/pkg/hive/hive.go @@ -304,6 +304,8 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) { wg.Done() }() + s.logger.Debug("received peer", "overlay", swarm.NewAddress(newPeer.Overlay), "underlay", multiUnderlay) + ctx, cancel := context.WithTimeout(ctx, pingTimeout) defer cancel() diff --git a/pkg/node/node.go b/pkg/node/node.go index ad96a94ac88..8005dc92e37 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -131,6 +131,7 @@ type Options struct { DebugAPIAddr string Addr string NATAddr string + EnableQUIC bool EnableWS bool WelcomeMessage string Bootnodes []string @@ -639,6 +640,7 @@ func NewBee( p2ps, err := libp2p.New(ctx, signer, networkID, swarmAddress, addr, addressbook, stateStore, lightNodes, logger, tracer, libp2p.Options{ PrivateKey: libp2pPrivateKey, NATAddr: o.NATAddr, + EnableQUIC: o.EnableQUIC, EnableWS: o.EnableWS, WelcomeMessage: o.WelcomeMessage, FullNode: o.FullNodeMode, @@ -1104,6 +1106,7 @@ func NewBee( Steward: steward, SyncStatus: syncStatusFn, NodeStatus: nodeStatus, + AddressBook: addressbook, } if o.APIAddr != "" { diff --git a/pkg/p2p/discover.go b/pkg/p2p/discover.go index 98687273d48..bd603b7b7a8 100644 --- a/pkg/p2p/discover.go +++ b/pkg/p2p/discover.go @@ -8,7 +8,8 @@ import ( "context" "errors" "fmt" - "math/rand" + "sort" + "strings" ma "github.com/multiformats/go-multiaddr" madns "github.com/multiformats/go-multiaddr-dns" @@ -35,9 +36,11 @@ func Discover(ctx context.Context, addr ma.Multiaddr, f func(ma.Multiaddr) (bool return false, errors.New("non-resolvable API endpoint") } - rand.Shuffle(len(addrs), func(i, j int) { - addrs[i], addrs[j] = addrs[j], addrs[i] + // Prioritize QUIC transport connections over the others. + sort.Slice(addrs, func(i, j int) bool { + return strings.Contains(addrs[i].String(), "/quic") }) + for _, addr := range addrs { stopped, err := Discover(ctx, addr, f) if err != nil { diff --git a/pkg/p2p/libp2p/connections_test.go b/pkg/p2p/libp2p/connections_test.go index 6f560e3ed7d..9179d87039b 100644 --- a/pkg/p2p/libp2p/connections_test.go +++ b/pkg/p2p/libp2p/connections_test.go @@ -1129,6 +1129,37 @@ func TestReachabilityUpdate(t *testing.T) { } } +func TestConnectWithEnabledQUICTransports(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + n1, overlay1 := newService(t, 1, libp2pServiceOpts{ + libp2pOpts: libp2p.Options{ + EnableQUIC: true, + FullNode: true, + }, + }) + + n2, overlay2 := newService(t, 1, libp2pServiceOpts{ + libp2pOpts: libp2p.Options{ + EnableQUIC: true, + FullNode: true, + }, + }) + + addr1 := serviceUnderlayAddress(t, n1, ma.P_QUIC_V1) + if addr1 == nil { + t.Fatal("QUIC protocol address not found") + } + + if _, err := n2.Connect(ctx, addr1); err != nil { + t.Fatal(err) + } + + expectPeers(t, n2, overlay1) + expectPeersEventually(t, n1, overlay2) +} + func testUserAgentLogLine(t *testing.T, logs *buffer, substring string) { t.Helper() diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index d7250a517fb..838da3fca23 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -17,46 +17,46 @@ import ( "sync" "time" + ocprom "contrib.go.opencensus.io/exporter/prometheus" "github.com/ethersphere/bee" "github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/bzz" beecrypto "github.com/ethersphere/bee/pkg/crypto" "github.com/ethersphere/bee/pkg/log" + m2 "github.com/ethersphere/bee/pkg/metrics" "github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/blocklist" "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/breaker" - handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake" + "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake" "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/reacher" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology/lightnode" "github.com/ethersphere/bee/pkg/tracing" - libp2p "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" - network "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/network" libp2ppeer "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - protocol "github.com/libp2p/go-libp2p/core/protocol" - autonat "github.com/libp2p/go-libp2p/p2p/host/autonat" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/host/autonat" basichost "github.com/libp2p/go-libp2p/p2p/host/basic" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" + rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager" lp2pswarm "github.com/libp2p/go-libp2p/p2p/net/swarm" libp2pping "github.com/libp2p/go-libp2p/p2p/protocol/ping" + libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" "github.com/libp2p/go-libp2p/p2p/transport/tcp" ws "github.com/libp2p/go-libp2p/p2p/transport/websocket" - ma "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multistream" - "go.uber.org/atomic" - - ocprom "contrib.go.opencensus.io/exporter/prometheus" - m2 "github.com/ethersphere/bee/pkg/metrics" - rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" + "golang.org/x/exp/maps" ) // loggerName is the tree path name of the logger for this package. @@ -122,6 +122,7 @@ type lightnodes interface { type Options struct { PrivateKey *ecdsa.PrivateKey NATAddr string + EnableQUIC bool EnableWS bool FullNode bool LightNodeLimit int @@ -156,6 +157,9 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay var listenAddrs []string if ip4Addr != "" { listenAddrs = append(listenAddrs, fmt.Sprintf("/ip4/%s/tcp/%s", ip4Addr, port)) + if o.EnableQUIC { + listenAddrs = append(listenAddrs, fmt.Sprintf("/ip4/%s/udp/%s/quic-v1", ip4Addr, port)) + } if o.EnableWS { listenAddrs = append(listenAddrs, fmt.Sprintf("/ip4/%s/tcp/%s/ws", ip4Addr, port)) } @@ -163,6 +167,9 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay if ip6Addr != "" { listenAddrs = append(listenAddrs, fmt.Sprintf("/ip6/%s/tcp/%s", ip6Addr, port)) + if o.EnableQUIC { + listenAddrs = append(listenAddrs, fmt.Sprintf("/ip6/%s/udp/%s/quic-v1", ip6Addr, port)) + } if o.EnableWS { listenAddrs = append(listenAddrs, fmt.Sprintf("/ip6/%s/tcp/%s/ws", ip6Addr, port)) } @@ -244,7 +251,9 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay transports := []libp2p.Option{ libp2p.Transport(tcp.NewTCPTransport, tcp.DisableReuseport()), } - + if o.EnableQUIC { + transports = append(transports, libp2p.Transport(libp2pquic.NewTransport)) + } if o.EnableWS { transports = append(transports, libp2p.Transport(ws.New)) } @@ -644,24 +653,26 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { return nil } -func (s *Service) Addresses() (addreses []ma.Multiaddr, err error) { +func (s *Service) Addresses() ([]ma.Multiaddr, error) { + addresses := make(map[string]ma.Multiaddr) for _, addr := range s.host.Addrs() { a, err := buildUnderlayAddress(addr, s.host.ID()) if err != nil { return nil, err } - - addreses = append(addreses, a) + addresses[a.String()] = a } - if s.natAddrResolver != nil && len(addreses) > 0 { - a, err := s.natAddrResolver.Resolve(addreses[0]) - if err != nil { - return nil, err + if s.natAddrResolver != nil { + for _, addr := range addresses { + a, err := s.natAddrResolver.Resolve(addr) + if err != nil { + return nil, err + } + addresses[a.String()] = a } - addreses = append(addreses, a) } - return addreses, nil + return maps.Values(addresses), nil } func (s *Service) NATManager() basichost.NATManager { @@ -695,7 +706,7 @@ func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration, reaso } func buildHostAddress(peerID libp2ppeer.ID) (ma.Multiaddr, error) { - return ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerID.Pretty())) + return ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerID)) } func buildUnderlayAddress(addr ma.Multiaddr, peerID libp2ppeer.ID) (ma.Multiaddr, error) { diff --git a/pkg/p2p/libp2p/libp2p_test.go b/pkg/p2p/libp2p/libp2p_test.go index 378d4dbc9d3..769694f382e 100644 --- a/pkg/p2p/libp2p/libp2p_test.go +++ b/pkg/p2p/libp2p/libp2p_test.go @@ -152,12 +152,30 @@ func expectPeersEventually(t *testing.T, s *libp2p.Service, addrs ...swarm.Addre } } -func serviceUnderlayAddress(t *testing.T, s *libp2p.Service) multiaddr.Multiaddr { +// serviceUnderlayAddress returns the first multiaddress of the service that +// contains the given protocol code. If no protocol code is given, the first +// address is returned. +func serviceUnderlayAddress(t *testing.T, s *libp2p.Service, codes ...int) multiaddr.Multiaddr { t.Helper() addrs, err := s.Addresses() if err != nil { t.Fatal(err) } - return addrs[0] + + if len(codes) == 0 { + return addrs[0] + } + + for _, addr := range addrs { + for _, proto := range addr.Protocols() { + for _, code := range codes { + if proto.Code == code { + return addr + } + } + } + } + + return nil } diff --git a/pkg/p2p/libp2p/peer.go b/pkg/p2p/libp2p/peer.go index c8eeb1578c2..f951664cbf4 100644 --- a/pkg/p2p/libp2p/peer.go +++ b/pkg/p2p/libp2p/peer.go @@ -119,10 +119,14 @@ func (r *peerRegistry) peers() []p2p.Peer { r.mu.RLock() peers := make([]p2p.Peer, 0, len(r.overlays)) for p, a := range r.overlays { - peers = append(peers, p2p.Peer{ + peer := p2p.Peer{ Address: a, FullNode: r.full[p], - }) + } + for conn := range r.connections[p] { + peer.Connections = append(peer.Connections, conn.RemoteMultiaddr()) + } + peers = append(peers, peer) } r.mu.RUnlock() sort.Slice(peers, func(i, j int) bool { diff --git a/pkg/p2p/libp2p/static_resolver.go b/pkg/p2p/libp2p/static_resolver.go index c5416862e1b..04ac10b0677 100644 --- a/pkg/p2p/libp2p/static_resolver.go +++ b/pkg/p2p/libp2p/static_resolver.go @@ -56,20 +56,26 @@ func (r *staticAddressResolver) Resolve(observedAddress ma.Multiaddr) (ma.Multia return observedAddress, nil } - var multiProto string + var elements []string if r.multiProto != "" { - multiProto = r.multiProto + elements = append(elements, r.multiProto) } else { - multiProto = strings.Join(observedAddrSplit[:3], "/") + elements = append(elements, observedAddrSplit[:3]...) } - var port string + elements = append(elements, observedAddrSplit[3]) + if r.port != "" { - port = r.port + elements = append(elements, r.port) } else { - port = observedAddrSplit[4] + elements = append(elements, observedAddrSplit[4]) } - a, err := ma.NewMultiaddr(multiProto + "/" + observedAddrSplit[3] + "/" + port) + + if len(observedAddrSplit) > 5 { + elements = append(elements, observedAddrSplit[5:]...) + } + + a, err := ma.NewMultiaddr(strings.Join(elements, "/")) if err != nil { return nil, err } diff --git a/pkg/p2p/libp2p/static_resolver_test.go b/pkg/p2p/libp2p/static_resolver_test.go index 992d7500004..282788c1012 100644 --- a/pkg/p2p/libp2p/static_resolver_test.go +++ b/pkg/p2p/libp2p/static_resolver_test.go @@ -33,6 +33,12 @@ func TestStaticAddressResolver(t *testing.T) { observableAddress: "/ip4/127.0.0.1/tcp/7071/p2p/16Uiu2HAkyyGKpjBiCkVqCKoJa6RzzZw9Nr7hGogsMPcdad1KyMmd", want: "/ip4/192.168.1.34/tcp/7071/p2p/16Uiu2HAkyyGKpjBiCkVqCKoJa6RzzZw9Nr7hGogsMPcdad1KyMmd", }, + { + name: "replace ip v4 with quic", + natAddr: "192.168.1.34:", + observableAddress: "/ip4/127.0.0.1/udp/7071/quic-v1/p2p/16Uiu2HAkyyGKpjBiCkVqCKoJa6RzzZw9Nr7hGogsMPcdad1KyMmd", + want: "/ip4/192.168.1.34/udp/7071/quic-v1/p2p/16Uiu2HAkyyGKpjBiCkVqCKoJa6RzzZw9Nr7hGogsMPcdad1KyMmd", + }, { name: "replace ip v6", natAddr: "[2001:db8::8a2e:370:1111]:", @@ -105,13 +111,13 @@ func TestStaticAddressResolver(t *testing.T) { if err != nil { t.Fatal(err) } - got, err := r.Resolve(observableAddress) + have, err := r.Resolve(observableAddress) if err != nil { t.Fatal(err) } - if got.String() != tc.want { - t.Errorf("got %s, want %s", got, tc.want) + if have.String() != tc.want { + t.Errorf("Resolve(%s):\nhave %s\nwant %s", observableAddress, have, tc.want) } }) } diff --git a/pkg/p2p/p2p.go b/pkg/p2p/p2p.go index 4f7897be592..b42eee7bc06 100644 --- a/pkg/p2p/p2p.go +++ b/pkg/p2p/p2p.go @@ -191,6 +191,7 @@ type Peer struct { Address swarm.Address FullNode bool EthereumAddress []byte + Connections []ma.Multiaddr } // BlockListedPeer holds information about a Peer that is blocked. diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 277eeba331a..86262485cea 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -13,6 +13,8 @@ import ( "math/big" "math/rand" "path/filepath" + "sort" + "strings" "sync" "time" @@ -35,8 +37,8 @@ const loggerName = "kademlia" const ( maxConnAttempts = 1 // when there is maxConnAttempts failed connect calls for a given peer it is considered non-connectable - maxBootNodeAttempts = 3 // how many attempts to dial to boot-nodes before giving up - maxNeighborAttempts = 3 // how many attempts to dial to boot-nodes before giving up + maxBootNodeAttempts = 3 // how many attempts to dial to one boot-node before giving up + maxNeighborAttempts = 3 // how many attempts to dial to neighbor before giving up addPeerBatchSize = 500 @@ -135,6 +137,11 @@ func newKadOptions(o Options) kadOptions { LowWaterMark: defaultValInt(o.LowWaterMark, defaultLowWaterMark), } + // Prioritize QUIC transport connections over the others. + sort.Slice(ko.Bootnodes, func(i, j int) bool { + return strings.Contains(ko.Bootnodes[i].String(), "/quic") + }) + if ko.SaturationFunc == nil { ko.SaturationFunc = makeSaturationFunc(ko) } @@ -774,54 +781,51 @@ func (k *Kad) previouslyConnected() []swarm.Address { func (k *Kad) connectBootNodes(ctx context.Context) { loggerV1 := k.logger.V(1).Register() - var attempts, connected int - totalAttempts := maxBootNodeAttempts * len(k.opt.Bootnodes) - ctx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() - for _, addr := range k.opt.Bootnodes { - if attempts >= totalAttempts || connected >= 3 { - return - } - - if _, err := p2p.Discover(ctx, addr, func(addr ma.Multiaddr) (stop bool, err error) { - loggerV1.Debug("connecting to bootnode", "bootnode_address", addr) - if attempts >= maxBootNodeAttempts { - return true, nil - } - bzzAddress, err := k.p2p.Connect(ctx, addr) - - attempts++ - k.metrics.TotalBootNodesConnectionAttempts.Inc() - - if err != nil { - if !errors.Is(err, p2p.ErrAlreadyConnected) { + var connected int + for i := maxBootNodeAttempts; i > 0; i-- { + for _, addr := range k.opt.Bootnodes { + stop, err := p2p.Discover(ctx, addr, func(addr ma.Multiaddr) (bool, error) { + loggerV1.Debug("connecting to bootnode", "bootnode_address", addr) + k.metrics.TotalBootNodesConnectionAttempts.Inc() + bzzAddress, err := k.p2p.Connect(ctx, addr) + if err != nil { + if !errors.Is(err, p2p.ErrAlreadyConnected) { + k.logger.Debug("connect to bootnode failed", "bootnode_address", addr, "error", err) + k.logger.Warning("connect to bootnode failed", "bootnode_address", addr) + return false, err + } k.logger.Debug("connect to bootnode failed", "bootnode_address", addr, "error", err) - k.logger.Warning("connect to bootnode failed", "bootnode_address", addr) + return false, nil + } + + if err := k.onConnected(ctx, bzzAddress.Overlay); err != nil { return false, err } - k.logger.Debug("connect to bootnode failed", "bootnode_address", addr, "error", err) - return false, nil - } - if err := k.onConnected(ctx, bzzAddress.Overlay); err != nil { - return false, err + k.metrics.TotalOutboundConnections.Inc() + k.collector.Record(bzzAddress.Overlay, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionOutbound)) + loggerV1.Debug("connected to bootnode", "bootnode_address", addr) + connected++ + + // connect to max 3 bootnodes + return connected >= 3, nil + }) + if err != nil && !errors.Is(err, context.Canceled) { + k.logger.Debug("discover to bootnode failed", "bootnode_address", addr, "error", err) + k.logger.Warning("discover to bootnode failed", "bootnode_address", addr) + } + if stop { + return } - - k.metrics.TotalOutboundConnections.Inc() - k.collector.Record(bzzAddress.Overlay, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionOutbound)) - loggerV1.Debug("connected to bootnode", "bootnode_address", addr) - connected++ - - // connect to max 3 bootnodes - return connected >= 3, nil - }); err != nil && !errors.Is(err, context.Canceled) { - k.logger.Debug("discover to bootnode failed", "bootnode_address", addr, "error", err) - k.logger.Warning("discover to bootnode failed", "bootnode_address", addr) - return } } + + if connected == 0 { + k.logger.Warning("could not connect to any bootnode", "bootnode_addresses", k.opt.Bootnodes) + } } // binSaturated indicates whether a certain bin is saturated or not.