Skip to content

Commit

Permalink
update node (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
AstaFrode authored Jul 7, 2023
1 parent de5d1ed commit e72eb61
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 121 deletions.
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Config struct {
type Option func(cfg *Config) error

const (
DefaultProtocolPrefix = "/kldr"
DevnetProtocolPrefix = "/kldr-devnet"
TestnetProtocolPrefix = "/kldr-testnet"
MainnetProtocolPrefix = "/kldr-mainnet"
Expand All @@ -39,6 +40,9 @@ const (
//
// This function consumes the config. Do not reuse it (really!).
func (cfg *Config) NewNode(ctx context.Context) (core.P2P, error) {
if cfg.ProtocolPrefix == "" {
cfg.ProtocolPrefix = DefaultProtocolPrefix
}
return core.NewBasicNode(ctx, cfg.ListenPort, cfg.Workspace, cfg.PrivatekeyPath, cfg.BootPeers, cfg.ConnManager, cfg.ProtocolPrefix, cfg.PublicIpv4)
}

Expand Down
200 changes: 82 additions & 118 deletions core/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync/atomic"
"time"

"github.com/CESSProject/p2p-go/out"
"github.com/CESSProject/p2p-go/pb"
ggio "github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -54,15 +55,14 @@ type P2P interface {
// Message protocol
Protocol

// GetRootCtx returns the root context of the host
GetRootCtx() context.Context
// GetCtxRoot returns the root context of the host
GetCtxRoot() context.Context

// GetDiscoverSt returns whether the discovery service is running
GetDiscoverSt() bool
// GetCtxCancelFromRoot returns the cancel context from root context
GetCtxCancelFromRoot() context.Context

// StartDiscover starts the node discovery service, If you have
// already started the service, calling it again will have no effect.
StartDiscover()
// GetCtxQueryFromCtxCancel returns tne query context from cancel context
GetCtxQueryFromCtxCancel() context.Context

// PrivatekeyPath returns the key file location
PrivatekeyPath() string
Expand Down Expand Up @@ -100,12 +100,15 @@ type P2P interface {
// FindPeer searches for a peer with given ID.
DHTFindPeer(peerid string) (peer.AddrInfo, error)

// Close p2p
Close() error

//
GetDiscoveredPeers() <-chan *routing.QueryEvent

// RouteTableFindPeers
RouteTableFindPeers(limit int) (<-chan peer.AddrInfo, error)

// DiscoveredPeer returns the node channel discovered by the host
DiscoveredPeer() <-chan peer.AddrInfo

// GetIdleDataCh returns the idle data channel received by the host
GetIdleDataCh() <-chan string

Expand All @@ -118,28 +121,26 @@ type P2P interface {

// Node type - Implementation of a P2P Host
type Node struct {
ctx context.Context
ctxCancel context.Context
ctxReg context.Context
cancelFunc context.CancelFunc
discoverEvent <-chan *routing.QueryEvent
host host.Host
dir DataDirs
peerPublickey []byte
workspace string
privatekeyPath string
idleTee atomic.Value
serviceTee atomic.Value
idleDataCh chan string
idleTagDataCh chan string
serviceTagDataCh chan string
protocolVersion string
dhtProtocolVersion string
rendezvousVersion string
protocolPrefix string
discoverStat atomic.Uint32
bootstrap []string
discoveredPeerCh chan peer.AddrInfo
ctxRoot context.Context
ctxCancelFromRoot context.Context
ctxQueryFromCtxCancel context.Context
ctxCancelFuncFromRoot context.CancelFunc
discoveredPeerCh <-chan *routing.QueryEvent
host host.Host
dir DataDirs
peerPublickey []byte
workspace string
privatekeyPath string
idleTee atomic.Value
serviceTee atomic.Value
idleDataCh chan string
idleTagDataCh chan string
serviceTagDataCh chan string
protocolVersion string
dhtProtocolVersion string
rendezvousVersion string
protocolPrefix string
bootstrap []string
*dht.IpfsDHT
*drouting.RoutingDiscovery
*protocols
Expand All @@ -164,16 +165,11 @@ func NewBasicNode(
protocolPrefix string,
publicip string,
) (P2P, error) {
if protocolPrefix == "" {
protocolPrefix = "/kldr"
}

//fmt.Println("bootstrap:", bootstrap)
var boots = make([]string, 0)
for _, b := range bootstrap {
bootnodes, err := ParseMultiaddrs(b)
if err != nil {
log.Printf("Err: %v", err)
out.Err(err.Error())
continue
}
boots = append(boots, bootnodes...)
Expand Down Expand Up @@ -246,27 +242,25 @@ func NewBasicNode(
ctxreg, events := routing.RegisterForQueryEvents(ctxcancel)

n := &Node{
ctx: ctx,
ctxCancel: ctxcancel,
ctxReg: ctxreg,
cancelFunc: cancel,
discoverEvent: events,
host: bhost,
workspace: workspace,
privatekeyPath: privatekeypath,
dir: dataDir,
peerPublickey: publickey,
idleDataCh: make(chan string, 1),
idleTagDataCh: make(chan string, 1),
serviceTagDataCh: make(chan string, 1),
protocolVersion: protocolPrefix + p2pProtocolVer,
dhtProtocolVersion: protocolPrefix + dhtProtocolVer,
rendezvousVersion: protocolPrefix + rendezvous,
protocolPrefix: protocolPrefix,
discoverStat: atomic.Uint32{},
bootstrap: boots,
discoveredPeerCh: make(chan peer.AddrInfo, 1000),
protocols: NewProtocol(),
ctxRoot: ctx,
ctxCancelFromRoot: ctxcancel,
ctxQueryFromCtxCancel: ctxreg,
ctxCancelFuncFromRoot: cancel,
discoveredPeerCh: events,
host: bhost,
workspace: workspace,
privatekeyPath: privatekeypath,
dir: dataDir,
peerPublickey: publickey,
idleDataCh: make(chan string, 1),
idleTagDataCh: make(chan string, 1),
serviceTagDataCh: make(chan string, 1),
protocolVersion: protocolPrefix + p2pProtocolVer,
dhtProtocolVersion: protocolPrefix + dhtProtocolVer,
rendezvousVersion: protocolPrefix + rendezvous,
protocolPrefix: protocolPrefix,
bootstrap: boots,
protocols: NewProtocol(),
}

err = n.initDHT()
Expand All @@ -276,23 +270,29 @@ func NewBasicNode(

n.initProtocol(protocolPrefix)

go n.discoverPeers()

return n, nil
}

// FindPeer searches for a peer with given ID.
// DHTFindPeer searches for a peer with given ID.
func (n *Node) DHTFindPeer(peerid string) (peer.AddrInfo, error) {
id, err := peer.Decode(peerid)
if err != nil {
return peer.AddrInfo{}, err
}
return n.IpfsDHT.FindPeer(n.ctxReg, id)
return n.IpfsDHT.FindPeer(n.ctxQueryFromCtxCancel, id)
}

// RouteTableFindPeers
func (n *Node) RouteTableFindPeers(limit int) (<-chan peer.AddrInfo, error) {
return n.RoutingDiscovery.FindPeers(n.ctxReg, n.rendezvousVersion, discovery.Limit(limit))
if limit <= 0 {
return n.RoutingDiscovery.FindPeers(n.ctxQueryFromCtxCancel, n.rendezvousVersion)
}
return n.RoutingDiscovery.FindPeers(n.ctxQueryFromCtxCancel, n.rendezvousVersion, discovery.Limit(limit))
}

// GetDiscoveredPeers
func (n *Node) GetDiscoveredPeers() <-chan *routing.QueryEvent {
return n.discoveredPeerCh
}

// ID returns the (local) peer.ID associated with this Host
Expand Down Expand Up @@ -357,15 +357,14 @@ func (n *Node) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (n

// Close shuts down the host, its Network, and services.
func (n *Node) Close() error {
n.cancelFunc()
err := n.host.Close()
if err != nil {
return err
}
n.ctxCancelFuncFromRoot()
close(n.idleDataCh)
close(n.idleTagDataCh)
close(n.serviceTagDataCh)
close(n.discoveredPeerCh)
return nil
}

Expand All @@ -379,18 +378,10 @@ func (n *Node) EventBus() event.Bus {
return n.host.EventBus()
}

func (n *Node) GetDiscoverSt() bool {
return n.discoverStat.Load() > 0
}

func (n *Node) GetProtocolPrefix() string {
return n.protocolPrefix
}

func (n *Node) StartDiscover() {
go n.discoverPeers()
}

func (n *Node) AddMultiaddrToPeerstore(multiaddr string, t time.Duration) (peer.ID, error) {
time := peerstore.RecentlyConnectedAddrTTL
if t.Seconds() > 0 {
Expand Down Expand Up @@ -420,10 +411,6 @@ func (n *Node) Workspace() string {
return n.workspace
}

func (n *Node) DiscoveredPeer() <-chan peer.AddrInfo {
return n.discoveredPeerCh
}

func (n *Node) GetPeerPublickey() []byte {
return n.peerPublickey
}
Expand All @@ -448,8 +435,16 @@ func (n *Node) SetBootstraps(bootstrap []string) {
n.bootstrap = bootstrap
}

func (n *Node) GetRootCtx() context.Context {
return n.ctx
func (n *Node) GetCtxRoot() context.Context {
return n.ctxRoot
}

func (n *Node) GetCtxCancelFromRoot() context.Context {
return n.ctxCancelFromRoot
}

func (n *Node) GetCtxQueryFromCtxCancel() context.Context {
return n.ctxQueryFromCtxCancel
}

func (n *Node) GetDirs() DataDirs {
Expand Down Expand Up @@ -672,39 +667,6 @@ func verifyWorkspace(ws string) error {
return nil
}

func (n *Node) discoverPeers() {
if n.discoverStat.Load() > 0 {
return
}
ctxCancel, cancel := context.WithCancel(n.ctxReg)
defer func() {
recover()
cancel()
n.discoverStat.Store(0)
}()

n.discoverStat.Add(1)
time.Sleep(time.Second)
if n.discoverStat.Load() != 1 {
return
}

routingDiscovery := drouting.NewRoutingDiscovery(n.IpfsDHT)

dutil.Advertise(n.ctxReg, routingDiscovery, n.rendezvousVersion)

go findPeers(ctxCancel, routingDiscovery, n.rendezvousVersion)

for {
select {
case peer := <-n.discoverEvent:
for _, v := range peer.Responses {
n.discoveredPeerCh <- *v
}
}
}
}

func findPeers(ctx context.Context, routingDiscovery *drouting.RoutingDiscovery, rendezvous string) {
log.Println("Start discover service")

Expand Down Expand Up @@ -759,12 +721,12 @@ func (n *Node) initDHT() error {
// client because we want each peer to maintain its own local copy of the
// DHT, so that the bootstrapping node of the DHT can go down without
// inhibiting future peer discovery.
kademliaDHT, err := dht.New(n.ctxReg, n.host, options...)
kademliaDHT, err := dht.New(n.ctxQueryFromCtxCancel, n.host, options...)
if err != nil {
return err
}

if err = kademliaDHT.Bootstrap(n.ctxReg); err != nil {
if err = kademliaDHT.Bootstrap(n.ctxQueryFromCtxCancel); err != nil {
return err
}

Expand All @@ -775,14 +737,16 @@ func (n *Node) initDHT() error {
}
peerinfo, _ := peer.AddrInfoFromP2pAddr(bootstrapAddr)
kademliaDHT.RoutingTable().PeerAdded(peerinfo.ID)
err = n.host.Connect(n.ctxReg, *peerinfo)
err = n.host.Connect(n.ctxQueryFromCtxCancel, *peerinfo)
if err != nil {
log.Println("Failed to connect to the bootstrap node: ", peerinfo.ID.Pretty())
out.Err(fmt.Sprintf("Connection to boot node failed: %s", peerinfo.ID.Pretty()))
} else {
log.Println("Connected to the bootstrap node: ", peerinfo.ID.Pretty())
out.Ok(fmt.Sprintf("Connection to boot node successful: %s", peerinfo.ID.Pretty()))
}
}

n.IpfsDHT = kademliaDHT
n.RoutingDiscovery = drouting.NewRoutingDiscovery(n.IpfsDHT)
dutil.Advertise(n.ctxQueryFromCtxCancel, n.RoutingDiscovery, n.rendezvousVersion)
return nil
}
4 changes: 4 additions & 0 deletions core/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func GetLocalIp() ([]string, error) {
for _, address := range addrs {
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
ip := ipnet.IP.String()
if ip[len(ip)-1] == byte(49) && ip[len(ip)-3] == byte(48) {
continue
}
result = append(result, ipnet.IP.String())
}
}
Expand Down
Loading

0 comments on commit e72eb61

Please sign in to comment.