diff --git a/config/config.go b/config/config.go index 2a264b7..87e8fb9 100644 --- a/config/config.go +++ b/config/config.go @@ -30,6 +30,7 @@ type Config struct { type Option func(cfg *Config) error const ( + DefaultProtocolPrefix = "/kldr" DevnetProtocolPrefix = "/kldr-devnet" TestnetProtocolPrefix = "/kldr-testnet" MainnetProtocolPrefix = "/kldr-mainnet" @@ -39,6 +40,9 @@ const ( // // This function consumes the config. Do not reuse it (really!). func (cfg *Config) NewNode(ctx context.Context) (core.P2P, error) { + if cfg.ProtocolPrefix == "" { + cfg.ProtocolPrefix = DefaultProtocolPrefix + } return core.NewBasicNode(ctx, cfg.ListenPort, cfg.Workspace, cfg.PrivatekeyPath, cfg.BootPeers, cfg.ConnManager, cfg.ProtocolPrefix, cfg.PublicIpv4) } diff --git a/core/node.go b/core/node.go index b0f5ecd..c5b14e4 100644 --- a/core/node.go +++ b/core/node.go @@ -20,6 +20,7 @@ import ( "sync/atomic" "time" + "github.com/CESSProject/p2p-go/out" "github.com/CESSProject/p2p-go/pb" ggio "github.com/gogo/protobuf/io" "github.com/gogo/protobuf/proto" @@ -54,15 +55,14 @@ type P2P interface { // Message protocol Protocol - // GetRootCtx returns the root context of the host - GetRootCtx() context.Context + // GetCtxRoot returns the root context of the host + GetCtxRoot() context.Context - // GetDiscoverSt returns whether the discovery service is running - GetDiscoverSt() bool + // GetCtxCancelFromRoot returns the cancel context from root context + GetCtxCancelFromRoot() context.Context - // StartDiscover starts the node discovery service, If you have - // already started the service, calling it again will have no effect. - StartDiscover() + // GetCtxQueryFromCtxCancel returns tne query context from cancel context + GetCtxQueryFromCtxCancel() context.Context // PrivatekeyPath returns the key file location PrivatekeyPath() string @@ -100,12 +100,15 @@ type P2P interface { // FindPeer searches for a peer with given ID. DHTFindPeer(peerid string) (peer.AddrInfo, error) + // Close p2p + Close() error + + // + GetDiscoveredPeers() <-chan *routing.QueryEvent + // RouteTableFindPeers RouteTableFindPeers(limit int) (<-chan peer.AddrInfo, error) - // DiscoveredPeer returns the node channel discovered by the host - DiscoveredPeer() <-chan peer.AddrInfo - // GetIdleDataCh returns the idle data channel received by the host GetIdleDataCh() <-chan string @@ -118,28 +121,26 @@ type P2P interface { // Node type - Implementation of a P2P Host type Node struct { - ctx context.Context - ctxCancel context.Context - ctxReg context.Context - cancelFunc context.CancelFunc - discoverEvent <-chan *routing.QueryEvent - host host.Host - dir DataDirs - peerPublickey []byte - workspace string - privatekeyPath string - idleTee atomic.Value - serviceTee atomic.Value - idleDataCh chan string - idleTagDataCh chan string - serviceTagDataCh chan string - protocolVersion string - dhtProtocolVersion string - rendezvousVersion string - protocolPrefix string - discoverStat atomic.Uint32 - bootstrap []string - discoveredPeerCh chan peer.AddrInfo + ctxRoot context.Context + ctxCancelFromRoot context.Context + ctxQueryFromCtxCancel context.Context + ctxCancelFuncFromRoot context.CancelFunc + discoveredPeerCh <-chan *routing.QueryEvent + host host.Host + dir DataDirs + peerPublickey []byte + workspace string + privatekeyPath string + idleTee atomic.Value + serviceTee atomic.Value + idleDataCh chan string + idleTagDataCh chan string + serviceTagDataCh chan string + protocolVersion string + dhtProtocolVersion string + rendezvousVersion string + protocolPrefix string + bootstrap []string *dht.IpfsDHT *drouting.RoutingDiscovery *protocols @@ -164,16 +165,11 @@ func NewBasicNode( protocolPrefix string, publicip string, ) (P2P, error) { - if protocolPrefix == "" { - protocolPrefix = "/kldr" - } - - //fmt.Println("bootstrap:", bootstrap) var boots = make([]string, 0) for _, b := range bootstrap { bootnodes, err := ParseMultiaddrs(b) if err != nil { - log.Printf("Err: %v", err) + out.Err(err.Error()) continue } boots = append(boots, bootnodes...) @@ -246,27 +242,25 @@ func NewBasicNode( ctxreg, events := routing.RegisterForQueryEvents(ctxcancel) n := &Node{ - ctx: ctx, - ctxCancel: ctxcancel, - ctxReg: ctxreg, - cancelFunc: cancel, - discoverEvent: events, - host: bhost, - workspace: workspace, - privatekeyPath: privatekeypath, - dir: dataDir, - peerPublickey: publickey, - idleDataCh: make(chan string, 1), - idleTagDataCh: make(chan string, 1), - serviceTagDataCh: make(chan string, 1), - protocolVersion: protocolPrefix + p2pProtocolVer, - dhtProtocolVersion: protocolPrefix + dhtProtocolVer, - rendezvousVersion: protocolPrefix + rendezvous, - protocolPrefix: protocolPrefix, - discoverStat: atomic.Uint32{}, - bootstrap: boots, - discoveredPeerCh: make(chan peer.AddrInfo, 1000), - protocols: NewProtocol(), + ctxRoot: ctx, + ctxCancelFromRoot: ctxcancel, + ctxQueryFromCtxCancel: ctxreg, + ctxCancelFuncFromRoot: cancel, + discoveredPeerCh: events, + host: bhost, + workspace: workspace, + privatekeyPath: privatekeypath, + dir: dataDir, + peerPublickey: publickey, + idleDataCh: make(chan string, 1), + idleTagDataCh: make(chan string, 1), + serviceTagDataCh: make(chan string, 1), + protocolVersion: protocolPrefix + p2pProtocolVer, + dhtProtocolVersion: protocolPrefix + dhtProtocolVer, + rendezvousVersion: protocolPrefix + rendezvous, + protocolPrefix: protocolPrefix, + bootstrap: boots, + protocols: NewProtocol(), } err = n.initDHT() @@ -276,23 +270,29 @@ func NewBasicNode( n.initProtocol(protocolPrefix) - go n.discoverPeers() - return n, nil } -// FindPeer searches for a peer with given ID. +// DHTFindPeer searches for a peer with given ID. func (n *Node) DHTFindPeer(peerid string) (peer.AddrInfo, error) { id, err := peer.Decode(peerid) if err != nil { return peer.AddrInfo{}, err } - return n.IpfsDHT.FindPeer(n.ctxReg, id) + return n.IpfsDHT.FindPeer(n.ctxQueryFromCtxCancel, id) } // RouteTableFindPeers func (n *Node) RouteTableFindPeers(limit int) (<-chan peer.AddrInfo, error) { - return n.RoutingDiscovery.FindPeers(n.ctxReg, n.rendezvousVersion, discovery.Limit(limit)) + if limit <= 0 { + return n.RoutingDiscovery.FindPeers(n.ctxQueryFromCtxCancel, n.rendezvousVersion) + } + return n.RoutingDiscovery.FindPeers(n.ctxQueryFromCtxCancel, n.rendezvousVersion, discovery.Limit(limit)) +} + +// GetDiscoveredPeers +func (n *Node) GetDiscoveredPeers() <-chan *routing.QueryEvent { + return n.discoveredPeerCh } // ID returns the (local) peer.ID associated with this Host @@ -357,15 +357,14 @@ func (n *Node) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (n // Close shuts down the host, its Network, and services. func (n *Node) Close() error { - n.cancelFunc() err := n.host.Close() if err != nil { return err } + n.ctxCancelFuncFromRoot() close(n.idleDataCh) close(n.idleTagDataCh) close(n.serviceTagDataCh) - close(n.discoveredPeerCh) return nil } @@ -379,18 +378,10 @@ func (n *Node) EventBus() event.Bus { return n.host.EventBus() } -func (n *Node) GetDiscoverSt() bool { - return n.discoverStat.Load() > 0 -} - func (n *Node) GetProtocolPrefix() string { return n.protocolPrefix } -func (n *Node) StartDiscover() { - go n.discoverPeers() -} - func (n *Node) AddMultiaddrToPeerstore(multiaddr string, t time.Duration) (peer.ID, error) { time := peerstore.RecentlyConnectedAddrTTL if t.Seconds() > 0 { @@ -420,10 +411,6 @@ func (n *Node) Workspace() string { return n.workspace } -func (n *Node) DiscoveredPeer() <-chan peer.AddrInfo { - return n.discoveredPeerCh -} - func (n *Node) GetPeerPublickey() []byte { return n.peerPublickey } @@ -448,8 +435,16 @@ func (n *Node) SetBootstraps(bootstrap []string) { n.bootstrap = bootstrap } -func (n *Node) GetRootCtx() context.Context { - return n.ctx +func (n *Node) GetCtxRoot() context.Context { + return n.ctxRoot +} + +func (n *Node) GetCtxCancelFromRoot() context.Context { + return n.ctxCancelFromRoot +} + +func (n *Node) GetCtxQueryFromCtxCancel() context.Context { + return n.ctxQueryFromCtxCancel } func (n *Node) GetDirs() DataDirs { @@ -672,39 +667,6 @@ func verifyWorkspace(ws string) error { return nil } -func (n *Node) discoverPeers() { - if n.discoverStat.Load() > 0 { - return - } - ctxCancel, cancel := context.WithCancel(n.ctxReg) - defer func() { - recover() - cancel() - n.discoverStat.Store(0) - }() - - n.discoverStat.Add(1) - time.Sleep(time.Second) - if n.discoverStat.Load() != 1 { - return - } - - routingDiscovery := drouting.NewRoutingDiscovery(n.IpfsDHT) - - dutil.Advertise(n.ctxReg, routingDiscovery, n.rendezvousVersion) - - go findPeers(ctxCancel, routingDiscovery, n.rendezvousVersion) - - for { - select { - case peer := <-n.discoverEvent: - for _, v := range peer.Responses { - n.discoveredPeerCh <- *v - } - } - } -} - func findPeers(ctx context.Context, routingDiscovery *drouting.RoutingDiscovery, rendezvous string) { log.Println("Start discover service") @@ -759,12 +721,12 @@ func (n *Node) initDHT() error { // client because we want each peer to maintain its own local copy of the // DHT, so that the bootstrapping node of the DHT can go down without // inhibiting future peer discovery. - kademliaDHT, err := dht.New(n.ctxReg, n.host, options...) + kademliaDHT, err := dht.New(n.ctxQueryFromCtxCancel, n.host, options...) if err != nil { return err } - if err = kademliaDHT.Bootstrap(n.ctxReg); err != nil { + if err = kademliaDHT.Bootstrap(n.ctxQueryFromCtxCancel); err != nil { return err } @@ -775,14 +737,16 @@ func (n *Node) initDHT() error { } peerinfo, _ := peer.AddrInfoFromP2pAddr(bootstrapAddr) kademliaDHT.RoutingTable().PeerAdded(peerinfo.ID) - err = n.host.Connect(n.ctxReg, *peerinfo) + err = n.host.Connect(n.ctxQueryFromCtxCancel, *peerinfo) if err != nil { - log.Println("Failed to connect to the bootstrap node: ", peerinfo.ID.Pretty()) + out.Err(fmt.Sprintf("Connection to boot node failed: %s", peerinfo.ID.Pretty())) } else { - log.Println("Connected to the bootstrap node: ", peerinfo.ID.Pretty()) + out.Ok(fmt.Sprintf("Connection to boot node successful: %s", peerinfo.ID.Pretty())) } } + n.IpfsDHT = kademliaDHT n.RoutingDiscovery = drouting.NewRoutingDiscovery(n.IpfsDHT) + dutil.Advertise(n.ctxQueryFromCtxCancel, n.RoutingDiscovery, n.rendezvousVersion) return nil } diff --git a/core/utils.go b/core/utils.go index 1399620..b4aefcc 100644 --- a/core/utils.go +++ b/core/utils.go @@ -82,6 +82,10 @@ func GetLocalIp() ([]string, error) { for _, address := range addrs { if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { if ipnet.IP.To4() != nil { + ip := ipnet.IP.String() + if ip[len(ip)-1] == byte(49) && ip[len(ip)-3] == byte(48) { + continue + } result = append(result, ipnet.IP.String()) } } diff --git a/examples/node/main.go b/examples/node/main.go index 99abcd0..d07678c 100644 --- a/examples/node/main.go +++ b/examples/node/main.go @@ -13,9 +13,11 @@ import ( "log" "os" "strconv" + "time" p2pgo "github.com/CESSProject/p2p-go" "github.com/CESSProject/p2p-go/core" + "github.com/CESSProject/p2p-go/out" ) type Nnode struct { @@ -54,11 +56,30 @@ func main() { fmt.Println(nnode.Addrs(), nnode.ID()) + nnode.RouteTableFindPeers(0) + + tick := time.NewTicker(time.Second * 18) + defer tick.Stop() + + tickdht := time.NewTicker(time.Minute) + defer tickdht.Stop() + for { select { - case peer := <-nnode.DiscoveredPeer(): - log.Println("found: ", peer.ID.Pretty()) + case peer := <-nnode.GetDiscoveredPeers(): + tick.Reset(time.Second * 18) + for _, v := range peer.Responses { + log.Println("found: ", v.ID.Pretty(), v.Addrs) + } + case <-tick.C: + out.Tip("-------------------------RouteTableFindPeers----------------------") + nnode.RouteTableFindPeers(0) + case <-tickdht.C: + if _, err := nnode.DHTFindPeer(os.Args[2]); err == nil { + out.Tip(fmt.Sprintf("++++++++++++++++++++++++++++++++++++dht found: %s", os.Args[2])) + } else { + out.Tip(fmt.Sprintf("++++++++++++++++++++++++++++++++++++dht not found: %s", os.Args[2])) + } } } - } diff --git a/out/out.go b/out/out.go new file mode 100644 index 0000000..176dc0f --- /dev/null +++ b/out/out.go @@ -0,0 +1,74 @@ +/* + Copyright (C) CESS. All rights reserved. + Copyright (C) Cumulus Encrypted Storage System. All rights reserved. + + SPDX-License-Identifier: Apache-2.0 +*/ + +package out + +import ( + "fmt" + "time" +) + +const ( + HiBlack = iota + 90 + HiRed + HiGreen + HiYellow + HiBlue + HiPurple + HiCyan + HiWhite +) + +const ( + OkPrompt = "OK" + WarnPrompt = "!!" + ErrPrompt = "XX" + InputPrompt = ">>" + TipPrompt = "++" +) + +const TimeFormat = "2006-01-02 15:04:05" + +func Input(msg string) { + fmt.Println(textInput(), msg) +} + +func Tip(msg string) { + fmt.Println(textTip(), fmt.Sprintf("%v %s", time.Now().Format(TimeFormat), msg)) +} + +func Err(msg string) { + fmt.Println(textErr(), fmt.Sprintf("%v %s", time.Now().Format(TimeFormat), msg)) +} + +func Warn(msg string) { + fmt.Println(textWarn(), fmt.Sprintf("%v %s", time.Now().Format(TimeFormat), msg)) +} + +func Ok(msg string) { + fmt.Println(textOk(), fmt.Sprintf("%v %s", time.Now().Format(TimeFormat), msg)) +} + +func textTip() string { + return fmt.Sprintf("\x1b[0;%dm%s\x1b[0m", HiGreen, TipPrompt) +} + +func textInput() string { + return fmt.Sprintf("\x1b[0;%dm%s\x1b[0m", HiBlue, InputPrompt) +} + +func textErr() string { + return fmt.Sprintf("\x1b[0;%dm%s\x1b[0m", HiRed, ErrPrompt) +} + +func textOk() string { + return fmt.Sprintf("\x1b[0;%dm%s\x1b[0m", HiGreen, OkPrompt) +} + +func textWarn() string { + return fmt.Sprintf("\x1b[0;%dm%s\x1b[0m", HiYellow, WarnPrompt) +}