diff --git a/cmd/node/main.go b/cmd/node/main.go
index c1003e17..1d521a37 100644
--- a/cmd/node/main.go
+++ b/cmd/node/main.go
@@ -127,6 +127,7 @@ func run() int {
 		host.WithDialBackWebsocketPort(cfg.Connectivity.WebsocketDialbackPort),
 		host.WithWebsocket(cfg.Connectivity.Websocket),
 		host.WithWebsocketPort(cfg.Connectivity.WebsocketPort),
+		host.WithMustReachBootNodes(cfg.Connectivity.MustReachBootNodes),
 	}
 
 	if !cfg.Connectivity.NoDialbackPeers {
diff --git a/config/config.go b/config/config.go
index afb9e13b..c2967e0a 100644
--- a/config/config.go
+++ b/config/config.go
@@ -59,15 +59,16 @@ type Log struct {
 
 // Connectivity describes the libp2p host that the node will use.
 type Connectivity struct {
-	Address string `koanf:"address" flag:"address,a"`
-	Port uint `koanf:"port" flag:"port,p"`
-	PrivateKey string `koanf:"private-key" flag:"private-key"`
-	DialbackAddress string `koanf:"dialback-address" flag:"dialback-address"`
-	DialbackPort uint `koanf:"dialback-port" flag:"dialback-port"`
-	Websocket bool `koanf:"websocket" flag:"websocket,w"`
-	WebsocketPort uint `koanf:"websocket-port" flag:"websocket-port"`
+	Address               string `koanf:"address" flag:"address,a"`
+	Port                  uint   `koanf:"port" flag:"port,p"`
+	PrivateKey            string `koanf:"private-key" flag:"private-key"`
+	DialbackAddress       string `koanf:"dialback-address" flag:"dialback-address"`
+	DialbackPort          uint   `koanf:"dialback-port" flag:"dialback-port"`
+	Websocket             bool   `koanf:"websocket" flag:"websocket,w"`
+	WebsocketPort         uint   `koanf:"websocket-port" flag:"websocket-port"`
 	WebsocketDialbackPort uint   `koanf:"websocket-dialback-port" flag:"websocket-dialback-port"`
-	NoDialbackPeers bool `koanf:"no-dialback-peers" flag:"no-dialback-peers"`
+	NoDialbackPeers       bool   `koanf:"no-dialback-peers" flag:"no-dialback-peers"`
+	MustReachBootNodes    bool   `koanf:"must-reach-boot-nodes" flag:"must-reach-boot-nodes"`
 }
 
 type Head struct {
@@ -139,6 +140,8 @@ func getFlagDescription(flag string) string {
 		return "memory limit (kB) for Blockless Functions"
 	case "no-dialback-peers":
 		return "start without dialing back peers from previous runs"
+	case "must-reach-boot-nodes":
+		return "halt node if we fail to reach boot nodes on start"
 	default:
 		return ""
 	}
diff --git a/host/config.go b/host/config.go
index dd946d17..5ac81e1c 100644
--- a/host/config.go
+++ b/host/config.go
@@ -10,11 +10,13 @@ import (
 
 // defaultConfig used to create Host.
 var defaultConfig = Config{
-	PrivateKey:          "",
-	ConnectionThreshold: 20,
-	DialBackPeersLimit:  100,
-	DiscoveryInterval:   10 * time.Second,
-	Websocket:           false,
+	PrivateKey:                         "",
+	ConnectionThreshold:                20,
+	DialBackPeersLimit:                 100,
+	DiscoveryInterval:                  10 * time.Second,
+	Websocket:                          false,
+	BootNodesReachabilityCheckInterval: 1 * time.Minute,
+	MustReachBootNodes:                 defaultMustReachBootNodes,
 }
 
 // Config represents the Host configuration.
@@ -33,6 +35,9 @@ type Config struct {
 	DialBackAddress       string
 	DialBackPort          uint
 	DialBackWebsocketPort uint
+
+	BootNodesReachabilityCheckInterval time.Duration
+	MustReachBootNodes                 bool
 }
 
 // WithPrivateKey specifies the private key for the Host.
@@ -108,3 +113,17 @@ func WithWebsocketPort(port uint) func(*Config) {
 		cfg.WebsocketPort = port
 	}
 }
+
+// WithMustReachBootNodes specifies whether we should treat failure to reach boot nodes as a halting error.
+func WithMustReachBootNodes(b bool) func(*Config) {
+	return func(cfg *Config) {
+		cfg.MustReachBootNodes = b
+	}
+}
+
+// WithBootNodesReachabilityInterval specifies how often we should recheck and reconnect to boot nodes.
+func WithBootNodesReachabilityInterval(d time.Duration) func(*Config) {
+	return func(cfg *Config) {
+		cfg.BootNodesReachabilityCheckInterval = d
+	}
+}
diff --git a/host/dht.go b/host/dht.go
deleted file mode 100644
index 0d5b8378..00000000
--- a/host/dht.go
+++ /dev/null
@@ -1,177 +0,0 @@
-package host
-
-import (
-	"context"
-	"fmt"
-	"sync"
-	"time"
-
-	dht "github.com/libp2p/go-libp2p-kad-dht"
-	"github.com/libp2p/go-libp2p/core/peer"
-	"github.com/libp2p/go-libp2p/p2p/discovery/routing"
-	"github.com/libp2p/go-libp2p/p2p/discovery/util"
-	"github.com/multiformats/go-multiaddr"
-
-	"github.com/blocklessnetwork/b7s/models/blockless"
-)
-
-func (h *Host) DiscoverPeers(ctx context.Context, topic string) error {
-
-	// Initialize DHT.
-	dht, err := h.initDHT(ctx)
-	if err != nil {
-		return fmt.Errorf("could not initialize DHT: %w", err)
-	}
-
-	discovery := routing.NewRoutingDiscovery(dht)
-	util.Advertise(ctx, discovery, topic)
-
-	h.log.Debug().Msg("host started peer discovery")
-
-	connected := uint(0)
-findPeers:
-	for {
-		peers, err := discovery.FindPeers(ctx, topic)
-		if err != nil {
-			return fmt.Errorf("could not find peers: %w", err)
-		}
-
-		for peer := range peers {
-			// Skip self.
-			if peer.ID == h.ID() {
-				continue
-			}
-
-			// Skip peers we're already connected to.
-			connections := h.Network().ConnsToPeer(peer.ID)
-			if len(connections) > 0 {
-				continue
-			}
-
-			err = h.Connect(ctx, peer)
-			if err != nil {
-				h.log.Debug().
-					Err(err).
-					Str("peer", peer.String()).
-					Msg("could not connect to peer")
-				continue
-			}
-
-			h.log.Info().Str("peer", peer.ID.String()).Msg("connected to peer")
-
-			connected++
-
-			// Stop when we have reached connection threshold.
-			if connected >= h.cfg.ConnectionThreshold {
-				break findPeers
-			}
-		}
-
-		time.Sleep(h.cfg.DiscoveryInterval)
-	}
-
-	h.log.Info().Msg("peer discovery complete")
-	return nil
-}
-
-func (h *Host) initDHT(ctx context.Context) (*dht.IpfsDHT, error) {
-
-	// Start a DHT for use in peer discovery. Set the DHT to server mode.
-	kademliaDHT, err := dht.New(ctx, h.Host, dht.Mode(dht.ModeServer))
-	if err != nil {
-		return nil, fmt.Errorf("could not create DHT: %w", err)
-	}
-
-	// Bootstrap the DHT.
-	err = kademliaDHT.Bootstrap(ctx)
-	if err != nil {
-		return nil, fmt.Errorf("could not bootstrap the DHT: %w", err)
-	}
-
-	// Nodes we will try to connect to on start.
-	var bootNodes []blockless.Peer
-
-	// Add explicitly specified nodes first.
-	for _, addr := range h.cfg.BootNodes {
-		addr := addr
-
-		addrInfo, err := peer.AddrInfoFromP2pAddr(addr)
-		if err != nil {
-			h.log.Warn().Err(err).Str("address", addr.String()).Msg("could not get addrinfo for boot node - skipping")
-			continue
-		}
-
-		node := blockless.Peer{
-			ID:       addrInfo.ID,
-			AddrInfo: *addrInfo,
-		}
-
-		bootNodes = append(bootNodes, node)
-	}
-
-	// Add the dial-back peers to the list of bootstrap nodes if they do not already exist.
-
-	// We may want to limit the number of dial back peers we use.
-	added := uint(0)
-	addLimit := h.cfg.DialBackPeersLimit
-
-	for _, peer := range h.cfg.DialBackPeers {
-		peer := peer
-
-		// If the limit of dial-back peers is set and we've reached it - stop now.
-		if addLimit != 0 && added >= addLimit {
-			h.log.Info().Uint("limit", addLimit).Msg("reached limit for dial-back peers")
-			break
-		}
-
-		// If we don't have any addresses, add the multiaddress we (hopefully) do have - last one we received a connection from.
-		if len(peer.AddrInfo.Addrs) == 0 {
-
-			ma, err := multiaddr.NewMultiaddr(peer.MultiAddr)
-			if err != nil {
-				h.log.Warn().Str("peer", peer.ID.String()).Str("addr", peer.MultiAddr).Msg("invalid multiaddress for dial-back peer, skipping")
-				break
-			}
-
-			h.log.Debug().Str("peer", peer.ID.String()).Msg("using last known multiaddress for dial-back peer")
-
-			peer.AddrInfo.Addrs = []multiaddr.Multiaddr{ma}
-		}
-
-		h.log.Debug().Str("peer", peer.ID.String()).Interface("addr_info", peer.AddrInfo).Msg("adding dial-back peer")
-
-		bootNodes = append(bootNodes, peer)
-		added++
-	}
-
-	// Connect to the bootstrap nodes.
-	var wg sync.WaitGroup
-	for _, bootNode := range bootNodes {
-		bootNode := bootNode
-
-		// Skip peers we're already connected to (perhaps a dial-back peer was also a boot node).
-		connections := h.Network().ConnsToPeer(bootNode.ID)
-		if len(connections) > 0 {
-			continue
-		}
-
-		wg.Add(1)
-		go func(peer blockless.Peer) {
-			defer wg.Done()
-
-			peerAddr := peer.AddrInfo
-
-			err := h.Host.Connect(ctx, peerAddr)
-			if err != nil {
-				if err.Error() != errNoGoodAddresses {
-					h.log.Error().Err(err).Str("peer", peerAddr.ID.String()).Interface("addr_info", peerAddr).Msg("could not connect to bootstrap node")
-				}
-			}
-		}(bootNode)
-	}
-
-	// Wait until we know the outcome of all connection attempts.
-	wg.Wait()
-
-	return kademliaDHT, nil
-}
diff --git a/host/discovery.go b/host/discovery.go
new file mode 100644
index 00000000..2273d963
--- /dev/null
+++ b/host/discovery.go
@@ -0,0 +1,249 @@
+package host
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	dht "github.com/libp2p/go-libp2p-kad-dht"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/p2p/discovery/routing"
+	"github.com/libp2p/go-libp2p/p2p/discovery/util"
+	"github.com/multiformats/go-multiaddr"
+	"golang.org/x/sync/errgroup"
+
+	"github.com/blocklessnetwork/b7s/models/blockless"
+)
+
+func (h *Host) ConnectToKnownPeers(ctx context.Context) error {
+
+	err := h.ConnectToBootNodes(ctx)
+	if err != nil {
+		return fmt.Errorf("could not connect to bootstrap nodes: %w", err)
+	}
+
+	err = h.ConnectToDialbackPeers(ctx)
+	if err != nil {
+		h.log.Warn().Err(err).Msg("could not connect to dialback peers")
+	}
+
+	// Spin up a goroutine to maintain connections to boot nodes in the background.
+	// If a boot node drops out, we want to reconnect to it.
+	go func(ctx context.Context) {
+		ticker := time.NewTicker(h.cfg.BootNodesReachabilityCheckInterval)
+		for {
+			select {
+			case <-ticker.C:
+				err := h.ConnectToBootNodes(ctx)
+				if err != nil {
+					h.log.Warn().Err(err).Msg("could not connect to boot nodes")
+				}
+
+			case <-ctx.Done():
+				ticker.Stop()
+				h.log.Debug().Msg("stopping boot node reachability monitoring")
+				return
+			}
+		}
+	}(ctx)
+
+	return nil
+}
+
+func (h *Host) ConnectToBootNodes(ctx context.Context) error {
+
+	// Bootstrap nodes we try to connect to on start.
+	var peers []blockless.Peer
+	for _, addr := range h.cfg.BootNodes {
+
+		addrInfo, err := peer.AddrInfoFromP2pAddr(addr)
+		if err != nil {
+
+			if h.cfg.MustReachBootNodes {
+				return fmt.Errorf("could not get boot node address info (address: %s): %w", addr.String(), err)
+			}
+
+			h.log.Warn().Err(err).Str("address", addr.String()).Msg("could not get address info for boot node - skipping")
+			continue
+		}
+
+		node := blockless.Peer{
+			ID:       addrInfo.ID,
+			AddrInfo: *addrInfo,
+		}
+
+		peers = append(peers, node)
+	}
+
+	err := h.connectToPeers(ctx, peers)
+	if err != nil {
+		if h.cfg.MustReachBootNodes {
+			return fmt.Errorf("could not connect to bootstrap nodes: %w", err)
+		}
+
+		h.log.Error().Err(err).Msg("could not connect to bootstrap nodes")
+	}
+
+	return nil
+}
+
+func (h *Host) ConnectToDialbackPeers(ctx context.Context) error {
+
+	// Dial-back peers are peers we know from previous runs.
+	// We may want to limit the number of dial-back peers we use.
+	added := uint(0)
+	addLimit := h.cfg.DialBackPeersLimit
+
+	var peers []blockless.Peer
+	for _, peer := range h.cfg.DialBackPeers {
+
+		// If the limit of dial-back peers is set and we've reached it - stop now.
+		if addLimit != 0 && added >= addLimit {
+			h.log.Info().Uint("limit", addLimit).Msg("reached limit for dial-back peers")
+			break
+		}
+
+		// This should no longer happen since we now store addresses, but if it does, use the last known multiaddress.
+		if len(peer.AddrInfo.Addrs) == 0 {
+
+			ma, err := multiaddr.NewMultiaddr(peer.MultiAddr)
+			if err != nil {
+				h.log.Warn().Str("peer", peer.ID.String()).Str("addr", peer.MultiAddr).Msg("invalid multiaddress for dial-back peer, skipping")
+				continue
+			}
+
+			h.log.Debug().Str("peer", peer.ID.String()).Msg("using last known multiaddress for dial-back peer")
+
+			peer.AddrInfo.Addrs = []multiaddr.Multiaddr{ma}
+		}
+
+		peers = append(peers, peer)
+		added++
+	}
+
+	err := h.connectToPeers(ctx, peers)
+	if err != nil {
+		return fmt.Errorf("could not connect to dial-back peers: %w", err)
+	}
+
+	return nil
+}
+
+func (h *Host) connectToPeers(ctx context.Context, peers []blockless.Peer) error {
+
+	// Connect to the given peers in parallel.
+	var errGroup errgroup.Group
+	for _, peer := range peers {
+		peer := peer
+
+		// Should not happen short of misconfiguration, but we should not dial ourselves.
+		if peer.ID == h.ID() {
+			continue
+		}
+
+		// Skip peers we're already connected to.
+		connections := h.Network().ConnsToPeer(peer.ID)
+		if len(connections) > 0 {
+			continue
+		}
+
+		errGroup.Go(func() error {
+			err := h.Host.Connect(ctx, peer.AddrInfo)
+			// Log errors here because the errgroup Wait() method returns only the first non-nil error, and we want to be aware of all of them.
+			if err != nil {
+				h.log.Error().Err(err).Str("peer", peer.ID.String()).Any("addr_info", peer.AddrInfo).Msg("could not connect to peer")
+				return err
+			}
+
+			h.log.Debug().Str("peer", peer.ID.String()).Any("addr_info", peer.AddrInfo).Msg("connected to peer")
+			return nil
+		})
+	}
+
+	// Wait until we know the outcome of all connection attempts.
+	err := errGroup.Wait()
+	if err != nil {
+		return fmt.Errorf("some connections failed: %w", err)
+	}
+
+	return nil
+}
+
+func (h *Host) DiscoverPeers(ctx context.Context, topic string) error {
+
+	// Initialize DHT.
+	dht, err := h.initDHT(ctx)
+	if err != nil {
+		return fmt.Errorf("could not initialize DHT: %w", err)
+	}
+
+	discovery := routing.NewRoutingDiscovery(dht)
+	util.Advertise(ctx, discovery, topic)
+
+	h.log.Debug().Msg("host started peer discovery")
+
+	connected := uint(0)
+findPeers:
+	for {
+		h.log.Trace().Msg("starting peer discovery")
+
+		// Using a list instead of a channel. If the number of peers starts getting too large, switch back.
+		// TODO: There's an upper limit config option, set a sane default.
+		peers, err := util.FindPeers(ctx, discovery, topic)
+		if err != nil {
+			return fmt.Errorf("could not find peers: %w", err)
+		}
+
+		h.log.Trace().Int("count", len(peers)).Str("topic", topic).Msg("discovered peers")
+
+		for _, peer := range peers {
+			// Skip self.
+			if peer.ID == h.ID() {
+				continue
+			}
+
+			// Skip peers we're already connected to.
+			connections := h.Network().ConnsToPeer(peer.ID)
+			if len(connections) > 0 {
+				continue
+			}
+
+			err = h.Connect(ctx, peer)
+			if err != nil {
+				h.log.Debug().Err(err).Str("peer", peer.ID.String()).Msg("could not connect to discovered peer")
+				continue
+			}
+
+			h.log.Info().Str("peer", peer.ID.String()).Msg("connected to discovered peer")
+
+			connected++
+
+			// Stop when we have reached connection threshold.
+			if connected >= h.cfg.ConnectionThreshold {
+				break findPeers
+			}
+		}
+
+		time.Sleep(h.cfg.DiscoveryInterval)
+	}
+
+	h.log.Info().Msg("peer discovery complete")
+	return nil
+}
+
+func (h *Host) initDHT(ctx context.Context) (*dht.IpfsDHT, error) {
+
+	// Start a DHT for use in peer discovery. Set the DHT to server mode.
+	kademliaDHT, err := dht.New(ctx, h.Host, dht.Mode(dht.ModeServer))
+	if err != nil {
+		return nil, fmt.Errorf("could not create DHT: %w", err)
+	}
+
+	// Bootstrap the DHT.
+	err = kademliaDHT.Bootstrap(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("could not bootstrap the DHT: %w", err)
+	}
+
+	return kademliaDHT, nil
+}
diff --git a/host/params.go b/host/params.go
index 331c767d..0ee3c2e9 100644
--- a/host/params.go
+++ b/host/params.go
@@ -3,4 +3,6 @@ package host
 const (
 	// Sentinel error for DHT.
 	errNoGoodAddresses = "no good addresses"
+
+	defaultMustReachBootNodes = false
 )
diff --git a/node/notifiee.go b/node/notifiee.go
index 1fcc43a8..2a103721 100644
--- a/node/notifiee.go
+++ b/node/notifiee.go
@@ -2,6 +2,7 @@ package node
 
 import (
 	"github.com/libp2p/go-libp2p/core/network"
+	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/multiformats/go-multiaddr"
 	"github.com/rs/zerolog"
 
@@ -29,21 +30,31 @@ func (n *connectionNotifiee) Connected(network network.Network, conn network.Con
 	peerID := conn.RemotePeer()
 	maddr := conn.RemoteMultiaddr()
 	laddr := conn.LocalMultiaddr()
-	addrInfo := network.Peerstore().PeerInfo(peerID)
 
-	n.log.Debug().
-		Str("peer", peerID.String()).
-		Str("remote_address", maddr.String()).
-		Str("local_address", laddr.String()).
-		Interface("addr_info", addrInfo).
-		Msg("peer connected")
 
+	// We could save only the multiaddress from which we receive this connection. However, we could theoretically have multiple connections
+	// and there's no reason to limit ourselves to a single address.
 	peer := blockless.Peer{
 		ID:        peerID,
 		MultiAddr: maddr.String(),
-		AddrInfo:  addrInfo,
+		// AddrInfo struct basically repeats the above info (multiaddress).
+		AddrInfo: peer.AddrInfo{
+			ID:    peerID,
+			Addrs: make([]multiaddr.Multiaddr, 0),
+		},
+	}
+
+	for _, c := range network.ConnsToPeer(peerID) {
+		peer.AddrInfo.Addrs = append(peer.AddrInfo.Addrs, c.RemoteMultiaddr())
 	}
 
+	n.log.Debug().
+ Str("peer", peerID.String()). + Str("remote_address", maddr.String()). + Str("local_address", laddr.String()). + Any("addr_info", peer.AddrInfo). + Msg("peer connected") + // Store the peer info. err := n.store.SavePeer(peer) if err != nil { @@ -53,7 +64,6 @@ func (n *connectionNotifiee) Connected(network network.Network, conn network.Con func (n *connectionNotifiee) Disconnected(_ network.Network, conn network.Conn) { - // TODO: Check - do we want to remove peer after he's been disconnected. maddr := conn.RemoteMultiaddr() laddr := conn.LocalMultiaddr() diff --git a/node/run.go b/node/run.go index ce6b6f0f..95fb68d8 100644 --- a/node/run.go +++ b/node/run.go @@ -22,6 +22,11 @@ func (n *Node) Run(ctx context.Context) error { return fmt.Errorf("could not subscribe to topics: %w", err) } + err = n.host.ConnectToKnownPeers(ctx) + if err != nil { + return fmt.Errorf("could not connect to known peers: %w", err) + } + // Sync functions now in case they were removed from the storage. err = n.fstore.Sync(false) if err != nil {