Skip to content

Commit

Permalink
Base discover2 (#51)
Browse files Browse the repository at this point in the history
* update discover function

* Increase the buf of node discovery
  • Loading branch information
AstaFrode authored Jul 6, 2023
1 parent 7fb60c4 commit de5d1ed
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 75 deletions.
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Config struct {
Workspace string
PrivatekeyPath string
ProtocolPrefix string
PublicIpv4 string
}

// Option is a libp2p config option that can be given to the libp2p constructor
Expand All @@ -38,7 +39,7 @@ const (
//
// This function consumes the config. Do not reuse it (really!).
func (cfg *Config) NewNode(ctx context.Context) (core.P2P, error) {
return core.NewBasicNode(ctx, cfg.ListenPort, cfg.Workspace, cfg.PrivatekeyPath, cfg.BootPeers, cfg.ConnManager, cfg.ProtocolPrefix)
return core.NewBasicNode(ctx, cfg.ListenPort, cfg.Workspace, cfg.PrivatekeyPath, cfg.BootPeers, cfg.ConnManager, cfg.ProtocolPrefix, cfg.PublicIpv4)
}

// Apply applies the given options to the config, returning the first error
Expand Down
2 changes: 0 additions & 2 deletions core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ const privatekeyFile = ".private"
const AllIpAddress = "0.0.0.0"
const LocalAddress = "127.0.0.1"

const Rendezvous = "/rendezvous/1.0.0"

// byte size
const (
SIZE_1KiB = 1024
Expand Down
4 changes: 2 additions & 2 deletions core/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ type FileProtocol struct {
*Node
}

func (n *Node) NewFileProtocol(protocolPrefix string) *FileProtocol {
func (n *Node) NewFileProtocol() *FileProtocol {
e := FileProtocol{Node: n}
n.SetStreamHandler(protocol.ID(protocolPrefix+FILE_PROTOCOL), e.onFileRequest)
n.SetStreamHandler(protocol.ID(n.protocolPrefix+FILE_PROTOCOL), e.onFileRequest)
return &e
}

Expand Down
159 changes: 102 additions & 57 deletions core/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -83,6 +82,9 @@ type P2P interface {
// GetDhtProtocolVersion returns the host's DHT ProtocolVersion
GetDhtProtocolVersion() string

// GetRendezvousVersion returns the rendezvous protocol
GetRendezvousVersion() string

// GetProtocolPrefix returns protocols prefix
GetProtocolPrefix() string

Expand All @@ -95,6 +97,12 @@ type P2P interface {
// SetBootstraps updates the host's bootstrap list
SetBootstraps(bootstrap []string)

// FindPeer searches for a peer with given ID.
DHTFindPeer(peerid string) (peer.AddrInfo, error)

// RouteTableFindPeers
RouteTableFindPeers(limit int) (<-chan peer.AddrInfo, error)

// DiscoveredPeer returns the node channel discovered by the host
DiscoveredPeer() <-chan peer.AddrInfo

Expand Down Expand Up @@ -127,10 +135,13 @@ type Node struct {
serviceTagDataCh chan string
protocolVersion string
dhtProtocolVersion string
rendezvousVersion string
protocolPrefix string
discoverStat atomic.Uint32
bootstrap []string
discoveredPeerCh chan peer.AddrInfo
*dht.IpfsDHT
*drouting.RoutingDiscovery
*protocols
}

Expand All @@ -151,16 +162,18 @@ func NewBasicNode(
bootstrap []string,
cmgr connmgr.ConnManager,
protocolPrefix string,
publicip string,
) (P2P, error) {

if protocolPrefix == "" {
protocolPrefix = "/kldr"
}

//fmt.Println("bootstrap:", bootstrap)
var boots = make([]string, 0)
for _, b := range bootstrap {
bootnodes, err := ParseMultiaddrs(b)
if err != nil {
log.Printf("Err: %v", err)
continue
}
boots = append(boots, bootnodes...)
Expand All @@ -177,11 +190,19 @@ func NewBasicNode(

var opts []libp2p.Option
var multiaddrs []ma.Multiaddr

externalIp, err := GetExternalIp()
if err == nil {
if publicip == "" {
externalIp, err := GetExternalIp()
if err != nil {
return nil, err
}
extMultiAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", externalIp, port))
multiaddrs = append(multiaddrs, extMultiAddr)
} else {
if !IsIPv4(publicip) {
return nil, errors.New("invalid ipv4")
}
extMultiAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", publicip, port))
multiaddrs = append(multiaddrs, extMultiAddr)
}

addressFactory := func(addrs []ma.Multiaddr) []ma.Multiaddr {
Expand All @@ -199,6 +220,7 @@ func NewBasicNode(
libp2p.ProtocolVersion(protocolPrefix+p2pProtocolVer),
libp2p.DefaultMuxers,
libp2p.AddrsFactory(addressFactory),
libp2p.DefaultEnableRelay,
)

bhost, err := libp2p.New(opts...)
Expand All @@ -207,7 +229,7 @@ func NewBasicNode(
}

if !bhost.ID().MatchesPrivateKey(prvKey) {
return nil, errors.New("")
return nil, errors.New("invalid private key")
}

publickey, err := base58.Decode(bhost.ID().String())
Expand Down Expand Up @@ -239,20 +261,40 @@ func NewBasicNode(
serviceTagDataCh: make(chan string, 1),
protocolVersion: protocolPrefix + p2pProtocolVer,
dhtProtocolVersion: protocolPrefix + dhtProtocolVer,
rendezvousVersion: protocolPrefix + rendezvous,
protocolPrefix: protocolPrefix,
discoverStat: atomic.Uint32{},
bootstrap: boots,
discoveredPeerCh: make(chan peer.AddrInfo, 600),
discoveredPeerCh: make(chan peer.AddrInfo, 1000),
protocols: NewProtocol(),
}

err = n.initDHT()
if err != nil {
return nil, err
}

n.initProtocol(protocolPrefix)

go n.discoverPeers(n.ctxReg, n.host, n.protocolPrefix, n.dhtProtocolVersion, n.bootstrap)
go n.discoverPeers()

return n, nil
}

// FindPeer searches for a peer with given ID.
func (n *Node) DHTFindPeer(peerid string) (peer.AddrInfo, error) {
id, err := peer.Decode(peerid)
if err != nil {
return peer.AddrInfo{}, err
}
return n.IpfsDHT.FindPeer(n.ctxReg, id)
}

// RouteTableFindPeers
func (n *Node) RouteTableFindPeers(limit int) (<-chan peer.AddrInfo, error) {
return n.RoutingDiscovery.FindPeers(n.ctxReg, n.rendezvousVersion, discovery.Limit(limit))
}

// ID returns the (local) peer.ID associated with this Host
func (n *Node) ID() peer.ID {
return n.host.ID()
Expand Down Expand Up @@ -346,7 +388,7 @@ func (n *Node) GetProtocolPrefix() string {
}

func (n *Node) StartDiscover() {
go n.discoverPeers(n.ctxReg, n.host, n.protocolPrefix, n.dhtProtocolVersion, n.bootstrap)
go n.discoverPeers()
}

func (n *Node) AddMultiaddrToPeerstore(multiaddr string, t time.Duration) (peer.ID, error) {
Expand Down Expand Up @@ -394,6 +436,10 @@ func (n *Node) GetDhtProtocolVersion() string {
return n.dhtProtocolVersion
}

func (n *Node) GetRendezvousVersion() string {
return n.rendezvousVersion
}

func (n *Node) GetBootstraps() []string {
return n.bootstrap
}
Expand Down Expand Up @@ -626,11 +672,11 @@ func verifyWorkspace(ws string) error {
return nil
}

func (n *Node) discoverPeers(ctx context.Context, h host.Host, protocolPrefix, dhtProtocolVersion string, bootstrap []string) {
func (n *Node) discoverPeers() {
if n.discoverStat.Load() > 0 {
return
}
ctxCancel, cancel := context.WithCancel(ctx)
ctxCancel, cancel := context.WithCancel(n.ctxReg)
defer func() {
recover()
cancel()
Expand All @@ -643,15 +689,11 @@ func (n *Node) discoverPeers(ctx context.Context, h host.Host, protocolPrefix, d
return
}

kademliaDHT, err := initDHT(ctx, h, dhtProtocolVersion, bootstrap)
if err != nil {
return
}
routingDiscovery := drouting.NewRoutingDiscovery(n.IpfsDHT)

routingDiscovery := drouting.NewRoutingDiscovery(kademliaDHT)
dutil.Advertise(ctx, routingDiscovery, protocolPrefix+Rendezvous)
dutil.Advertise(n.ctxReg, routingDiscovery, n.rendezvousVersion)

go findPeers(ctxCancel, routingDiscovery, protocolPrefix+Rendezvous)
go findPeers(ctxCancel, routingDiscovery, n.rendezvousVersion)

for {
select {
Expand All @@ -664,7 +706,7 @@ func (n *Node) discoverPeers(ctx context.Context, h host.Host, protocolPrefix, d
}

func findPeers(ctx context.Context, routingDiscovery *drouting.RoutingDiscovery, rendezvous string) {
var ok bool
log.Println("Start discover service")

tick := time.NewTicker(time.Minute)
defer tick.Stop()
Expand All @@ -674,70 +716,73 @@ func findPeers(ctx context.Context, routingDiscovery *drouting.RoutingDiscovery,
case <-ctx.Done():
return
case <-tick.C:
peerChan, err := routingDiscovery.FindPeers(ctx, rendezvous, discovery.Limit(300))
if err == nil {
for {
_, ok = <-peerChan
if !ok {
break
}
}
}
routingDiscovery.FindPeers(ctx, rendezvous, discovery.Limit(300))
}
}
}

func (n *Node) initProtocol(protocolPrefix string) {
n.SetProtocolPrefix(protocolPrefix)
n.WriteFileProtocol = n.NewWriteFileProtocol(protocolPrefix)
n.ReadFileProtocol = n.NewReadFileProtocol(protocolPrefix)
n.WriteFileProtocol = n.NewWriteFileProtocol()
n.ReadFileProtocol = n.NewReadFileProtocol()
n.CustomDataTagProtocol = n.NewCustomDataTagProtocol()
n.IdleDataTagProtocol = n.NewIdleDataTagProtocol()
n.FileProtocol = n.NewFileProtocol(protocolPrefix)
n.FileProtocol = n.NewFileProtocol()
n.AggrProofProtocol = n.NewAggrProofProtocol()
n.PushTagProtocol = n.NewPushTagProtocol(protocolPrefix)
n.PushTagProtocol = n.NewPushTagProtocol()
}

func initDHT(ctx context.Context, h host.Host, dhtProtocolVersion string, bootstrap []string) (*dht.IpfsDHT, error) {
func (n *Node) initDHT() error {
var options []dht.Option
options = append(options, dht.V1ProtocolOverride(protocol.ID(dhtProtocolVersion)))

if len(bootstrap) == 0 {
options = append(options, dht.Mode(dht.ModeServer))
options = append(options,
dht.Mode(dht.ModeAutoServer),
dht.V1ProtocolOverride(protocol.ID(n.dhtProtocolVersion)),
dht.Resiliency(10),
)
bootstrap := n.bootstrap
var bootaddrs []peer.AddrInfo
for _, v := range bootstrap {
muladdr, err := ma.NewMultiaddr(v)
if err != nil {
continue
}
addrinfo, err := peer.AddrInfoFromP2pAddr(muladdr)
if err != nil {
continue
}
bootaddrs = append(bootaddrs, *addrinfo)
}
if len(bootaddrs) > 0 {
options = append(options, dht.BootstrapPeers(bootaddrs...))
}

// Start a DHT, for use in peer discovery. We can't just make a new DHT
// client because we want each peer to maintain its own local copy of the
// DHT, so that the bootstrapping node of the DHT can go down without
// inhibiting future peer discovery.
kademliaDHT, err := dht.New(ctx, h, options...)
kademliaDHT, err := dht.New(n.ctxReg, n.host, options...)
if err != nil {
return nil, err
return err
}
if err = kademliaDHT.Bootstrap(ctx); err != nil {
return nil, err

if err = kademliaDHT.Bootstrap(n.ctxReg); err != nil {
return err
}

var wg sync.WaitGroup
for _, peerAddr := range bootstrap {
bootstrapAddr, err := ma.NewMultiaddr(peerAddr)
if err != nil {
continue
}

peerinfo, _ := peer.AddrInfoFromP2pAddr(bootstrapAddr)
wg.Add(1)
go func() {
defer wg.Done()
err := h.Connect(ctx, *peerinfo)
if err != nil {
log.Println("Failed to connect to the bootstrap node: ", peerinfo.ID.Pretty())
} else {
log.Println("Connected to the bootstrap node: ", peerinfo.ID.Pretty())
}
}()
kademliaDHT.RoutingTable().PeerAdded(peerinfo.ID)
err = n.host.Connect(n.ctxReg, *peerinfo)
if err != nil {
log.Println("Failed to connect to the bootstrap node: ", peerinfo.ID.Pretty())
} else {
log.Println("Connected to the bootstrap node: ", peerinfo.ID.Pretty())
}
}
wg.Wait()

return kademliaDHT, nil
n.IpfsDHT = kademliaDHT
n.RoutingDiscovery = drouting.NewRoutingDiscovery(n.IpfsDHT)
return nil
}
4 changes: 2 additions & 2 deletions core/pushTag.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ type PushTagProtocol struct {
*Node
}

func (n *Node) NewPushTagProtocol(protocolPrefix string) *PushTagProtocol {
func (n *Node) NewPushTagProtocol() *PushTagProtocol {
e := PushTagProtocol{Node: n}
n.SetStreamHandler(protocol.ID(protocolPrefix+PushTag_Protocol), e.onPushTagRequest)
n.SetStreamHandler(protocol.ID(n.protocolPrefix+PushTag_Protocol), e.onPushTagRequest)
return &e
}

Expand Down
6 changes: 3 additions & 3 deletions core/readfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ type ReadFileProtocol struct {
requests map[string]*readMsgResp // determine whether it is your own response
}

func (n *Node) NewReadFileProtocol(protocolPrefix string) *ReadFileProtocol {
func (n *Node) NewReadFileProtocol() *ReadFileProtocol {
e := ReadFileProtocol{Node: n, Mutex: new(sync.Mutex), requests: make(map[string]*readMsgResp)}
n.SetStreamHandler(protocol.ID(protocolPrefix+readFileRequest), e.onReadFileRequest)
n.SetStreamHandler(protocol.ID(protocolPrefix+readFileResponse), e.onReadFileResponse)
n.SetStreamHandler(protocol.ID(n.protocolPrefix+readFileRequest), e.onReadFileRequest)
n.SetStreamHandler(protocol.ID(n.protocolPrefix+readFileResponse), e.onReadFileResponse)
return &e
}

Expand Down
1 change: 1 addition & 0 deletions core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
const (
p2pProtocolVer = "/1.0"
dhtProtocolVer = "/kad/1.0"
rendezvous = "/rendezvous/1.0.0"
)

var (
Expand Down
Loading

0 comments on commit de5d1ed

Please sign in to comment.