From 4b680350fe2f9bf3c6ee9addf7355b5cf871b963 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Thu, 11 Jul 2024 23:38:22 +0200 Subject: [PATCH 1/5] Notifiee - on connection, don't use the peerstore as it will be empty on the initial connect --- node/notifiee.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/node/notifiee.go b/node/notifiee.go index 1fcc43a8..2a103721 100644 --- a/node/notifiee.go +++ b/node/notifiee.go @@ -2,6 +2,7 @@ package node import ( "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/rs/zerolog" @@ -29,21 +30,31 @@ func (n *connectionNotifiee) Connected(network network.Network, conn network.Con peerID := conn.RemotePeer() maddr := conn.RemoteMultiaddr() laddr := conn.LocalMultiaddr() - addrInfo := network.Peerstore().PeerInfo(peerID) - n.log.Debug(). - Str("peer", peerID.String()). - Str("remote_address", maddr.String()). - Str("local_address", laddr.String()). - Interface("addr_info", addrInfo). - Msg("peer connected") + // We could save only the multiaddress from which we receive this connection. However, we could theoretically have multiple connections + and there's no reason to limit ourselves to a single address. peer := blockless.Peer{ ID: peerID, MultiAddr: maddr.String(), - AddrInfo: addrInfo, + // AddrInfo struct basically repeats the above info (multiaddress). + AddrInfo: peer.AddrInfo{ + ID: peerID, + Addrs: make([]multiaddr.Multiaddr, 0), + }, + } + + for _, conn := range network.ConnsToPeer(conn.RemotePeer()) { + peer.AddrInfo.Addrs = append(peer.AddrInfo.Addrs, conn.RemoteMultiaddr()) } + n.log.Debug(). + Str("peer", peerID.String()). + Str("remote_address", maddr.String()). + Str("local_address", laddr.String()). + Any("addr_info", peer.AddrInfo). + Msg("peer connected") + // Store the peer info.
err := n.store.SavePeer(peer) if err != nil { @@ -53,7 +64,6 @@ func (n *connectionNotifiee) Connected(network network.Network, conn network.Con func (n *connectionNotifiee) Disconnected(_ network.Network, conn network.Conn) { - // TODO: Check - do we want to remove peer after he's been disconnected. maddr := conn.RemoteMultiaddr() laddr := conn.LocalMultiaddr() From ddfb5f8a9d4c0c09f827fbe3c0b356fc325040cc Mon Sep 17 00:00:00 2001 From: Maelkum Date: Fri, 12 Jul 2024 13:47:35 +0200 Subject: [PATCH 2/5] Slight changes for peer discovery loop --- host/dht.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/host/dht.go b/host/dht.go index 0d5b8378..7bd6b574 100644 --- a/host/dht.go +++ b/host/dht.go @@ -31,12 +31,17 @@ func (h *Host) DiscoverPeers(ctx context.Context, topic string) error { connected := uint(0) findPeers: for { - peers, err := discovery.FindPeers(ctx, topic) + h.log.Trace().Msg("starting peer discovery") + + // Using a list instead of a channel. If this starts getting too large switch back. + peers, err := util.FindPeers(ctx, discovery, topic) if err != nil { return fmt.Errorf("could not find peers: %w", err) } - for peer := range peers { + h.log.Trace().Int("count", len(peers)).Str("topic", topic).Msg("discovered peers") + + for _, peer := range peers { // Skip self. if peer.ID == h.ID() { continue @@ -50,10 +55,7 @@ findPeers: err = h.Connect(ctx, peer) if err != nil { - h.log.Debug(). - Err(err). - Str("peer", peer.String()). 
- Msg("could not connect to peer") + h.log.Debug().Err(err).Str("peer", peer.ID.String()).Msg("could not connect to discovered peer") continue } @@ -115,6 +117,7 @@ func (h *Host) initDHT(ctx context.Context) (*dht.IpfsDHT, error) { added := uint(0) addLimit := h.cfg.DialBackPeersLimit + var dialbackPeers []blockless.Peer for _, peer := range h.cfg.DialBackPeers { peer := peer @@ -130,7 +133,7 @@ func (h *Host) initDHT(ctx context.Context) (*dht.IpfsDHT, error) { ma, err := multiaddr.NewMultiaddr(peer.MultiAddr) if err != nil { h.log.Warn().Str("peer", peer.ID.String()).Str("addr", peer.MultiAddr).Msg("invalid multiaddress for dial-back peer, skipping") - break + continue } h.log.Debug().Str("peer", peer.ID.String()).Msg("using last known multiaddress for dial-back peer") @@ -140,10 +143,12 @@ func (h *Host) initDHT(ctx context.Context) (*dht.IpfsDHT, error) { h.log.Debug().Str("peer", peer.ID.String()).Interface("addr_info", peer.AddrInfo).Msg("adding dial-back peer") - bootNodes = append(bootNodes, peer) + dialbackPeers = append(dialbackPeers, peer) added++ } + bootNodes = append(bootNodes, dialbackPeers...) + // Connect to the bootstrap nodes. 
var wg sync.WaitGroup for _, bootNode := range bootNodes { @@ -164,9 +169,13 @@ func (h *Host) initDHT(ctx context.Context) (*dht.IpfsDHT, error) { err := h.Host.Connect(ctx, peerAddr) if err != nil { if err.Error() != errNoGoodAddresses { - h.log.Error().Err(err).Str("peer", peerAddr.ID.String()).Interface("addr_info", peerAddr).Msg("could not connect to bootstrap node") + h.log.Error().Err(err).Str("peer", peer.ID.String()).Interface("addr_info", peerAddr).Msg("could not connect to bootstrap node") } + + return } + + h.log.Debug().Str("peer", peer.ID.String()).Any("addr_info", peerAddr).Msg("connected to known peer") }(bootNode) } From 6ccb9482c545ab0c487ffa52aa0c23e0a1f05ae9 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Fri, 12 Jul 2024 15:42:03 +0200 Subject: [PATCH 3/5] Improve mechanism for connecting to boot nodes - allow treating failure to connect to boot nodes as a halting error - retry connection to boot nodes periodically in the background - decouple boot nodes/dialback peers from peer discovery --- cmd/node/main.go | 1 + config/config.go | 19 +-- host/config.go | 10 ++ host/{dht.go => discovery.go} | 250 +++++++++++++++++++++------------- host/params.go | 2 + node/run.go | 7 +- 6 files changed, 186 insertions(+), 103 deletions(-) rename host/{dht.go => discovery.go} (55%) diff --git a/cmd/node/main.go b/cmd/node/main.go index c1003e17..1d521a37 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -127,6 +127,7 @@ func run() int { host.WithDialBackWebsocketPort(cfg.Connectivity.WebsocketDialbackPort), host.WithWebsocket(cfg.Connectivity.Websocket), host.WithWebsocketPort(cfg.Connectivity.WebsocketPort), + host.WithMustReachBootNodes(cfg.Connectivity.MustReachBootNodes), } if !cfg.Connectivity.NoDialbackPeers { diff --git a/config/config.go b/config/config.go index afb9e13b..c2967e0a 100644 --- a/config/config.go +++ b/config/config.go @@ -59,15 +59,16 @@ type Log struct { // Connectivity describes the libp2p host that the node will use. 
type Connectivity struct { - Address string `koanf:"address" flag:"address,a"` - Port uint `koanf:"port" flag:"port,p"` - PrivateKey string `koanf:"private-key" flag:"private-key"` - DialbackAddress string `koanf:"dialback-address" flag:"dialback-address"` - DialbackPort uint `koanf:"dialback-port" flag:"dialback-port"` - Websocket bool `koanf:"websocket" flag:"websocket,w"` - WebsocketPort uint `koanf:"websocket-port" flag:"websocket-port"` + Address string `koanf:"address" flag:"address,a"` + Port uint `koanf:"port" flag:"port,p"` + PrivateKey string `koanf:"private-key" flag:"private-key"` + DialbackAddress string `koanf:"dialback-address" flag:"dialback-address"` + DialbackPort uint `koanf:"dialback-port" flag:"dialback-port"` + Websocket bool `koanf:"websocket" flag:"websocket,w"` + WebsocketPort uint `koanf:"websocket-port" flag:"websocket-port"` WebsocketDialbackPort uint `koanf:"websocket-dialback-port" flag:"websocket-dialback-port"` - NoDialbackPeers bool `koanf:"no-dialback-peers" flag:"no-dialback-peers"` + NoDialbackPeers bool `koanf:"no-dialback-peers" flag:"no-dialback-peers"` + MustReachBootNodes bool `koanf:"must-reach-boot-nodes" flag:"must-reach-boot-nodes"` } type Head struct { @@ -139,6 +140,8 @@ func getFlagDescription(flag string) string { return "memory limit (kB) for Blockless Functions" case "no-dialback-peers": return "start without dialing back peers from previous runs" + case "must-reach-boot-nodes": + return "halt node if we fail to reach boot nodes on start" default: return "" } diff --git a/host/config.go b/host/config.go index dd946d17..35580dbc 100644 --- a/host/config.go +++ b/host/config.go @@ -15,6 +15,7 @@ var defaultConfig = Config{ DialBackPeersLimit: 100, DiscoveryInterval: 10 * time.Second, Websocket: false, + MustReachBootNodes: defaultMustReachBootNodes, } // Config represents the Host configuration. 
@@ -33,6 +34,8 @@ type Config struct { DialBackAddress string DialBackPort uint DialBackWebsocketPort uint + + MustReachBootNodes bool } // WithPrivateKey specifies the private key for the Host. @@ -108,3 +111,10 @@ func WithWebsocketPort(port uint) func(*Config) { cfg.WebsocketPort = port } } + +// WithMustReachBootNodes specifies if we should treat failure to reach boot nodes as a halting error. +func WithMustReachBootNodes(b bool) func(*Config) { + return func(cfg *Config) { + cfg.MustReachBootNodes = b + } +} diff --git a/host/dht.go b/host/discovery.go similarity index 55% rename from host/dht.go rename to host/discovery.go index 7bd6b574..9ad01ff6 100644 --- a/host/dht.go +++ b/host/discovery.go @@ -3,7 +3,6 @@ package host import ( "context" "fmt" - "sync" "time" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -11,10 +10,164 @@ import ( "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/libp2p/go-libp2p/p2p/discovery/util" "github.com/multiformats/go-multiaddr" + "golang.org/x/sync/errgroup" "github.com/blocklessnetwork/b7s/models/blockless" ) +func (h *Host) ConnectToKnownPeers(ctx context.Context) error { + + err := h.ConnectToBootNodes(ctx) + if err != nil { + return fmt.Errorf("could not connect to bootstrap nodes: %w", err) + } + + err = h.ConnectToDialbackPeers(ctx) + if err != nil { + h.log.Warn().Err(err).Msg("could not connect to dialback peers") + } + + // Spin up a goroutine to maintain connections to boot nodes in the background. + // In case boot nodes drops out, we want to connect back to it. 
+ go func(ctx context.Context) { + ticker := time.NewTicker(h.cfg.DiscoveryInterval) + for { + select { + case <-ticker.C: + err := h.ConnectToBootNodes(ctx) + if err != nil { + h.log.Warn().Err(err).Msg("could not connect to boot nodes") + } + + case <-ctx.Done(): + ticker.Stop() + h.log.Info().Msg("stopping boot node monitoring") + } + } + }(ctx) + + return nil +} + +func (h *Host) ConnectToBootNodes(ctx context.Context) error { + + // Bootstrap nodes we try to connect to on start. + var peers []blockless.Peer + for _, addr := range h.cfg.BootNodes { + + addrInfo, err := peer.AddrInfoFromP2pAddr(addr) + if err != nil { + + if h.cfg.MustReachBootNodes { + return fmt.Errorf("could not get boot node address info (address: %s): %w", addr.String(), err) + } + + h.log.Warn().Err(err).Str("address", addr.String()).Msg("could not get address info for boot node - skipping") + continue + } + + node := blockless.Peer{ + ID: addrInfo.ID, + AddrInfo: *addrInfo, + } + + peers = append(peers, node) + } + + err := h.connectToPeers(ctx, peers) + if err != nil { + if h.cfg.MustReachBootNodes { + return fmt.Errorf("could not connect to bootstrap nodes: %w", err) + } + + h.log.Error().Err(err).Msg("could not connect to bootstrap nodes") + } + + return nil +} + +func (h *Host) ConnectToDialbackPeers(ctx context.Context) error { + + // Dial-back peers are peers we're familiar with from before. + // We may want to limit the number of dial back peers we use. + added := uint(0) + addLimit := h.cfg.DialBackPeersLimit + + var peers []blockless.Peer + for _, peer := range h.cfg.DialBackPeers { + + // If the limit of dial-back peers is set and we've reached it - stop now. + if addLimit != 0 && added >= addLimit { + h.log.Info().Uint("limit", addLimit).Msg("reached limit for dial-back peers") + break + } + + // This should not happen anymore as we should have addresses, but in case it did - use the last known multiaddress. 
+ if len(peer.AddrInfo.Addrs) == 0 { + + ma, err := multiaddr.NewMultiaddr(peer.MultiAddr) + if err != nil { + h.log.Warn().Str("peer", peer.ID.String()).Str("addr", peer.MultiAddr).Msg("invalid multiaddress for dial-back peer, skipping") + continue + } + + h.log.Debug().Str("peer", peer.ID.String()).Msg("using last known multiaddress for dial-back peer") + + peer.AddrInfo.Addrs = []multiaddr.Multiaddr{ma} + } + + peers = append(peers, peer) + added++ + } + + err := h.connectToPeers(ctx, peers) + if err != nil { + return fmt.Errorf("could not connect to dial-back peers: %w", err) + } + + return nil +} + +func (h *Host) connectToPeers(ctx context.Context, peers []blockless.Peer) error { + + // Connect to the given peers concurrently, skipping self and peers we already have a connection to. + var errGroup errgroup.Group + for _, peer := range peers { + peer := peer + + // Should not happen other than misconfig, but we shouldn't dial self. + if peer.ID == h.ID() { + continue + } + + // Skip peers we're already connected to. + connections := h.Network().ConnsToPeer(peer.ID) + if len(connections) > 0 { + continue + } + + errGroup.Go(func() error { + err := h.Host.Connect(ctx, peer.AddrInfo) + // Log errors because error group Wait() method will return only the first non-nil error. We would like to be aware of all of them. + if err != nil { + h.log.Error().Err(err).Str("peer", peer.ID.String()).Any("addr_info", peer.AddrInfo).Msg("could not connect to bootstrap node") + return err + } + + h.log.Debug().Str("peer", peer.ID.String()).Any("addr_info", peer.AddrInfo).Msg("connected to peer") + return nil + }) + } + + // Wait until we know the outcome of all connection attempts. + err := errGroup.Wait() + if err != nil { + return fmt.Errorf("some connections failed: %w", err) + } + + return nil +} + func (h *Host) DiscoverPeers(ctx context.Context, topic string) error { // Initialize DHT. @@ -34,6 +187,7 @@ findPeers: h.log.Trace().Msg("starting peer discovery") // Using a list instead of a channel.
If this starts getting too large switch back. + // TODO: There's an upper limit config option, set a sane default. peers, err := util.FindPeers(ctx, discovery, topic) if err != nil { return fmt.Errorf("could not find peers: %w", err) @@ -59,7 +213,7 @@ findPeers: continue } - h.log.Info().Str("peer", peer.ID.String()).Msg("connected to peer") + h.log.Info().Str("peer", peer.ID.String()).Msg("connected to discovered peer") connected++ @@ -90,97 +244,5 @@ func (h *Host) initDHT(ctx context.Context) (*dht.IpfsDHT, error) { return nil, fmt.Errorf("could not bootstrap the DHT: %w", err) } - // Nodes we will try to connect to on start. - var bootNodes []blockless.Peer - - // Add explicitly specified nodes first. - for _, addr := range h.cfg.BootNodes { - addr := addr - - addrInfo, err := peer.AddrInfoFromP2pAddr(addr) - if err != nil { - h.log.Warn().Err(err).Str("address", addr.String()).Msg("could not get addrinfo for boot node - skipping") - continue - } - - node := blockless.Peer{ - ID: addrInfo.ID, - AddrInfo: *addrInfo, - } - - bootNodes = append(bootNodes, node) - } - - // Add the dial-back peers to the list of bootstrap nodes if they do not already exist. - - // We may want to limit the number of dial back peers we use. - added := uint(0) - addLimit := h.cfg.DialBackPeersLimit - - var dialbackPeers []blockless.Peer - for _, peer := range h.cfg.DialBackPeers { - peer := peer - - // If the limit of dial-back peers is set and we've reached it - stop now. - if addLimit != 0 && added >= addLimit { - h.log.Info().Uint("limit", addLimit).Msg("reached limit for dial-back peers") - break - } - - // If we don't have any addresses, add the multiaddress we (hopefully) do have - last one we received a connection from. 
- if len(peer.AddrInfo.Addrs) == 0 { - - ma, err := multiaddr.NewMultiaddr(peer.MultiAddr) - if err != nil { - h.log.Warn().Str("peer", peer.ID.String()).Str("addr", peer.MultiAddr).Msg("invalid multiaddress for dial-back peer, skipping") - continue - } - - h.log.Debug().Str("peer", peer.ID.String()).Msg("using last known multiaddress for dial-back peer") - - peer.AddrInfo.Addrs = []multiaddr.Multiaddr{ma} - } - - h.log.Debug().Str("peer", peer.ID.String()).Interface("addr_info", peer.AddrInfo).Msg("adding dial-back peer") - - dialbackPeers = append(dialbackPeers, peer) - added++ - } - - bootNodes = append(bootNodes, dialbackPeers...) - - // Connect to the bootstrap nodes. - var wg sync.WaitGroup - for _, bootNode := range bootNodes { - bootNode := bootNode - - // Skip peers we're already connected to (perhaps a dial-back peer was also a boot node). - connections := h.Network().ConnsToPeer(bootNode.ID) - if len(connections) > 0 { - continue - } - - wg.Add(1) - go func(peer blockless.Peer) { - defer wg.Done() - - peerAddr := peer.AddrInfo - - err := h.Host.Connect(ctx, peerAddr) - if err != nil { - if err.Error() != errNoGoodAddresses { - h.log.Error().Err(err).Str("peer", peer.ID.String()).Interface("addr_info", peerAddr).Msg("could not connect to bootstrap node") - } - - return - } - - h.log.Debug().Str("peer", peer.ID.String()).Any("addr_info", peerAddr).Msg("connected to known peer") - }(bootNode) - } - - // Wait until we know the outcome of all connection attempts. - wg.Wait() - return kademliaDHT, nil } diff --git a/host/params.go b/host/params.go index 331c767d..0ee3c2e9 100644 --- a/host/params.go +++ b/host/params.go @@ -3,4 +3,6 @@ package host const ( // Sentinel error for DHT. errNoGoodAddresses = "no good addresses" + + defaultMustReachBootNodes = false ) diff --git a/node/run.go b/node/run.go index ce6b6f0f..c52af9a0 100644 --- a/node/run.go +++ b/node/run.go @@ -17,7 +17,12 @@ import ( // Run will start the main loop for the node. 
func (n *Node) Run(ctx context.Context) error { - err := n.subscribeToTopics(ctx) + err := n.host.ConnectToKnownPeers(ctx) + if err != nil { + return fmt.Errorf("could not connect to known peers: %w", err) + } + + err = n.subscribeToTopics(ctx) if err != nil { return fmt.Errorf("could not subscribe to topics: %w", err) } From 51b9b886bcb2e21954e1ec17c5df2ffad69c286d Mon Sep 17 00:00:00 2001 From: Maelkum Date: Fri, 12 Jul 2024 16:03:40 +0200 Subject: [PATCH 4/5] Decouple boot node reachability interval from peer discovery interval --- host/config.go | 23 ++++++++++++++++------- host/discovery.go | 4 ++-- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/host/config.go b/host/config.go index 35580dbc..5ac81e1c 100644 --- a/host/config.go +++ b/host/config.go @@ -10,12 +10,13 @@ import ( // defaultConfig used to create Host. var defaultConfig = Config{ - PrivateKey: "", - ConnectionThreshold: 20, - DialBackPeersLimit: 100, - DiscoveryInterval: 10 * time.Second, - Websocket: false, - MustReachBootNodes: defaultMustReachBootNodes, + PrivateKey: "", + ConnectionThreshold: 20, + DialBackPeersLimit: 100, + DiscoveryInterval: 10 * time.Second, + Websocket: false, + BootNodesReachabilityCheckInterval: 1 * time.Minute, + MustReachBootNodes: defaultMustReachBootNodes, } // Config represents the Host configuration. @@ -35,7 +36,8 @@ type Config struct { DialBackPort uint DialBackWebsocketPort uint - MustReachBootNodes bool + BootNodesReachabilityCheckInterval time.Duration + MustReachBootNodes bool } // WithPrivateKey specifies the private key for the Host. @@ -118,3 +120,10 @@ func WithMustReachBootNodes(b bool) func(*Config) { cfg.MustReachBootNodes = b } } + +// WithBootNodesReachabilityInterval specifies how often should we recheck and reconnect to boot nodes. 
+func WithBootNodesReachabilityInterval(d time.Duration) func(cfg *Config) { + return func(cfg *Config) { + cfg.BootNodesReachabilityCheckInterval = d + } +} diff --git a/host/discovery.go b/host/discovery.go index 9ad01ff6..2273d963 100644 --- a/host/discovery.go +++ b/host/discovery.go @@ -30,7 +30,7 @@ func (h *Host) ConnectToKnownPeers(ctx context.Context) error { // Spin up a goroutine to maintain connections to boot nodes in the background. // In case boot nodes drops out, we want to connect back to it. go func(ctx context.Context) { - ticker := time.NewTicker(h.cfg.DiscoveryInterval) + ticker := time.NewTicker(h.cfg.BootNodesReachabilityCheckInterval) for { select { case <-ticker.C: err := h.ConnectToBootNodes(ctx) if err != nil { h.log.Warn().Err(err).Msg("could not connect to boot nodes") } @@ -41,7 +41,7 @@ case <-ctx.Done(): ticker.Stop() - h.log.Info().Msg("stopping boot node monitoring") + h.log.Debug().Msg("stopping boot node reachability monitoring"); return } } }(ctx) From 129eeed4ba846cf97cd2c952e4d02b67315c1eb9 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Mon, 15 Jul 2024 17:54:49 +0200 Subject: [PATCH 5/5] On node start subscribe to topics first --- node/run.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/node/run.go b/node/run.go index c52af9a0..95fb68d8 100644 --- a/node/run.go +++ b/node/run.go @@ -17,14 +17,14 @@ import ( // Run will start the main loop for the node. func (n *Node) Run(ctx context.Context) error { - err := n.host.ConnectToKnownPeers(ctx) + err := n.subscribeToTopics(ctx) if err != nil { - return fmt.Errorf("could not connect to known peers: %w", err) + return fmt.Errorf("could not subscribe to topics: %w", err) } - err = n.subscribeToTopics(ctx) + err = n.host.ConnectToKnownPeers(ctx) if err != nil { - return fmt.Errorf("could not subscribe to topics: %w", err) + return fmt.Errorf("could not connect to known peers: %w", err) } // Sync functions now in case they were removed from the storage.