From 029ddb6dfc94f3339ef7d966fad25390939d91be Mon Sep 17 00:00:00 2001 From: AstaFrode Date: Thu, 18 Apr 2024 22:34:13 +0800 Subject: [PATCH] U online (#111) * add online * update NewDHT * update node --- core/node.go | 90 +++++++++++++------- core/online.go | 133 ++++++++++++++++++++++++++++++ core/protocol.go | 2 + examples/online/example_online.go | 58 +++++++++++++ 4 files changed, 251 insertions(+), 32 deletions(-) create mode 100644 core/online.go create mode 100644 examples/online/example_online.go diff --git a/core/node.go b/core/node.go index 0a99081..7bb41a9 100644 --- a/core/node.go +++ b/core/node.go @@ -66,6 +66,9 @@ type P2P interface { // GetProtocolVersion returns the ProtocolVersion of the host GetProtocolVersion() string + // + GetProtocolPrefix() string + // GetDhtProtocolVersion returns the host's DHT ProtocolVersion GetDhtProtocolVersion() string @@ -333,12 +336,27 @@ func NewPeerNode(ctx context.Context, cfg *config.Config) (*PeerNode, error) { return nil, fmt.Errorf("[NewDHT] %v", err) } - peer_node.dir, err = mkdir(cfg.Workspace) - if err != nil { - return nil, err - } + if len(boots) > 0 { + peer_node.dir, err = mkdir(cfg.Workspace) + if err != nil { + return nil, err + } - peer_node.initProtocol(cfg.ProtocolPrefix) + peer_node.initProtocol(cfg.ProtocolPrefix) + + for _, v := range boots { + bootstrapAddr, err := ma.NewMultiaddr(v) + if err != nil { + continue + } + peerinfo, err := peer.AddrInfoFromP2pAddr(bootstrapAddr) + if err != nil { + continue + } + peer_node.Connect(ctx, *peerinfo) + peer_node.OnlineAction(peerinfo.ID) + } + } return peer_node, nil } @@ -438,6 +456,10 @@ func (n *PeerNode) GetProtocolVersion() string { return n.protocolVersion } +func (n *PeerNode) GetProtocolPrefix() string { + return n.protocolPrefix +} + func (n *PeerNode) GetDhtProtocolVersion() string { return n.dhtProtocolVersion } @@ -641,6 +663,7 @@ func (n *PeerNode) initProtocol(protocolPrefix string) { n.ReadFileProtocol = n.NewReadFileProtocol() n.ReadDataProtocol = n.NewReadDataProtocol() n.ReadDataStatProtocol = n.NewReadDataStatProtocol() + n.OnlineProtocol = n.NewOnlineProtocol() } func NewDHT(ctx context.Context, h host.Host, bucketsize int, version string, boot_nodes []string, protocolPrefix, dhtProtocol string) (*dht.IpfsDHT, string, string, error) { @@ -669,36 +692,39 @@ func NewDHT(ctx context.Context, h host.Host, bucketsize int, version string, bo return nil, "", "", err } - netenv := "" - for _, peerAddr := range boot_nodes { - bootstrapAddr, err := ma.NewMultiaddr(peerAddr) - if err != nil { - continue - } - peerinfo, err := peer.AddrInfoFromP2pAddr(bootstrapAddr) - if err != nil { - continue - } - err = h.Connect(ctx, *peerinfo) - if err == nil { - out.Ok(fmt.Sprintf("Connect to the boot node: %s", peerinfo.ID.String())) - switch peerinfo.ID.String() { - case "12D3KooWS8a18xoBzwkmUsgGBctNo6QCr6XCpUDR946mTBBUTe83", - "12D3KooWDWeiiqbpNGAqA5QbDTdKgTtwX8LCShWkTpcyxpRf2jA9", - "12D3KooWNcTWWuUWKhjTVDF1xZ38yCoHXoF4aDjnbjsNpeVwj33U": - netenv = "testnet" - case "12D3KooWGDk9JJ5F6UPNuutEKSbHrTXnF5eSn3zKaR27amgU6o9S", - "12D3KooWEGeAp1MvvUrBYQtb31FE1LPg7aHsd1LtTXn6cerZTBBd", - "12D3KooWRm2sQg65y2ZgCUksLsjWmKbBtZ4HRRsGLxbN76XTtC8T": - netenv = "devnet" - default: - netenv = "mainnet" + if len(boot_nodes) > 0 { + netenv := "" + for _, peerAddr := range boot_nodes { + bootstrapAddr, err := ma.NewMultiaddr(peerAddr) + if err != nil { + continue + } + peerinfo, err := peer.AddrInfoFromP2pAddr(bootstrapAddr) + if err != nil { + continue + } + err = h.Connect(ctx, *peerinfo) + if err == nil { + out.Ok(fmt.Sprintf("Connect to the boot node: %s", peerinfo.ID.String())) + switch peerinfo.ID.String() { + case "12D3KooWS8a18xoBzwkmUsgGBctNo6QCr6XCpUDR946mTBBUTe83", + "12D3KooWDWeiiqbpNGAqA5QbDTdKgTtwX8LCShWkTpcyxpRf2jA9", + "12D3KooWNcTWWuUWKhjTVDF1xZ38yCoHXoF4aDjnbjsNpeVwj33U": + netenv = "testnet" + case "12D3KooWGDk9JJ5F6UPNuutEKSbHrTXnF5eSn3zKaR27amgU6o9S", + "12D3KooWEGeAp1MvvUrBYQtb31FE1LPg7aHsd1LtTXn6cerZTBBd", + "12D3KooWRm2sQg65y2ZgCUksLsjWmKbBtZ4HRRsGLxbN76XTtC8T": + netenv = "devnet" + default: + netenv = "mainnet" + } + return kademliaDHT, bootstrapAddr.String(), netenv, nil } - - return kademliaDHT, bootstrapAddr.String(), netenv, nil } + } else { + return kademliaDHT, "", "", nil } - return kademliaDHT, "", netenv, fmt.Errorf("failed to connect to all boot nodes") + return kademliaDHT, "", "", fmt.Errorf("failed to connect to all boot nodes") } func buildPrimaryResourceManager() (network.ResourceManager, error) { diff --git a/core/online.go b/core/online.go new file mode 100644 index 0000000..b1cae2d --- /dev/null +++ b/core/online.go @@ -0,0 +1,133 @@ +/* + Copyright (C) CESS. All rights reserved. + Copyright (C) Cumulus Encrypted Storage System. All rights reserved. + + SPDX-License-Identifier: Apache-2.0 +*/ + +package core + +import ( + "errors" + "io" + "sync" + "time" + + "github.com/CESSProject/p2p-go/pb" + + "github.com/google/uuid" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "google.golang.org/protobuf/proto" +) + +// pattern: /protocol-name/request-or-response-message/version +const OnlineRequest = "/online/req/v0" +const OnlineResponse = "/online/resp/v0" + +type onlineResp struct { + ch chan bool + *pb.MessageData +} + +type OnlineProtocol struct { // local host + *PeerNode + *sync.Mutex + requests map[string]*onlineResp // determine whether it is your own response +} + +func (n *PeerNode) NewOnlineProtocol() *OnlineProtocol { + e := OnlineProtocol{PeerNode: n, Mutex: new(sync.Mutex), requests: make(map[string]*onlineResp)} + n.SetStreamHandler(protocol.ID(n.protocolPrefix+OnlineResponse), e.onOnlineResponse) + return &e +} + +func (e *protocols) OnlineAction(id peer.ID) error { + var err error + var ok bool + // create message data + req := &pb.MessageData{ + Id: uuid.New().String(), + NodeId: e.OnlineProtocol.ID().String(), + } + + // store request so response handler has access to it + respChan := make(chan bool, 1) + + e.OnlineProtocol.Lock() + for { + if _, ok := e.OnlineProtocol.requests[req.Id]; ok { + req.Id = uuid.New().String() + continue + } + e.OnlineProtocol.requests[req.Id] = &onlineResp{ + ch: respChan, + MessageData: &pb.MessageData{ + Id: req.Id, + }, + } + break + } + e.OnlineProtocol.Unlock() + + defer func() { + e.OnlineProtocol.Lock() + delete(e.OnlineProtocol.requests, req.Id) + close(respChan) + e.OnlineProtocol.Unlock() + }() + + timeout := time.NewTicker(P2PWriteReqRespTime) + defer timeout.Stop() + + err = e.OnlineProtocol.SendProtoMessage(id, protocol.ID(e.ProtocolPrefix+OnlineRequest), req) + if err != nil { + return err + } + + // wait response + timeout.Reset(P2PWriteReqRespTime) + select { + case ok = <-respChan: + if !ok { + return errors.New(ERR_RespFailure) + } + return nil + case <-timeout.C: + return errors.New(ERR_RespTimeOut) + } +} + +// remote peer response handler +func (e *OnlineProtocol) onOnlineResponse(s network.Stream) { + defer s.Close() + + data := &pb.MessageData{} + buf, err := io.ReadAll(s) + if err != nil { + s.Reset() + return + } + + // unmarshal it + err = proto.Unmarshal(buf, data) + if err != nil { + s.Reset() + return + } + + if data.NodeId == "" { + s.Reset() + return + } + + // locate request data and remove it if found + e.OnlineProtocol.Lock() + defer e.OnlineProtocol.Unlock() + + _, ok := e.requests[data.Id] + if ok { + e.requests[data.Id].ch <- true + } +} diff --git a/core/protocol.go b/core/protocol.go index 7bd01b3..146b2cc 100644 --- a/core/protocol.go +++ b/core/protocol.go @@ -16,6 +16,7 @@ type Protocol interface { ReadFileAction(id peer.ID, roothash, datahash, path string, size int64) error ReadDataAction(id peer.ID, roothash, datahash, path string, size int64) error ReadDataStatAction(id peer.ID, roothash string, datahash string) (uint64, error) + OnlineAction(id peer.ID) error } type protocols struct { @@ -24,6 +25,7 @@ type protocols struct { *ReadFileProtocol *ReadDataProtocol *ReadDataStatProtocol + *OnlineProtocol } func NewProtocol() *protocols { diff --git a/examples/online/example_online.go b/examples/online/example_online.go new file mode 100644 index 0000000..32fbc57 --- /dev/null +++ b/examples/online/example_online.go @@ -0,0 +1,58 @@ +/* + Copyright (C) CESS. All rights reserved. + Copyright (C) Cumulus Encrypted Storage System. All rights reserved. + + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "flag" + "fmt" + "log" + "time" + + p2pgo "github.com/CESSProject/p2p-go" + "github.com/libp2p/go-libp2p/core/peer" + ma "github.com/multiformats/go-multiaddr" +) + +func main() { + ctx := context.Background() + sourcePort1 := flag.Int("p1", 15000, "Source port number") + // To construct a simple host with all the default settings, just use `New` + h1, err := p2pgo.New( + ctx, + p2pgo.PrivatekeyFile(".private1"), + p2pgo.ListenPort(*sourcePort1), // regular tcp connections + p2pgo.Workspace("."), + p2pgo.BootPeers([]string{"_dnsaddr.boot-bucket-devnet.cess.cloud"}), + ) + if err != nil { + log.Println("[p2pgo.New]", err) + return + } + defer h1.Close() + + log.Println("node1:", h1.Addrs(), h1.ID()) + + remote := "/ip4/127.0.0.1/tcp/8001/p2p/12D3KooWGDk9JJ5F6UPNuutEKSbHrTXnF5eSn3zKaR27amgU6o9S" + + maddr, err := ma.NewMultiaddr(remote) + if err != nil { + log.Println("[ma.NewMultiaddr]", err) + return + } + // Extract the peer ID from the multiaddr. + info, err := peer.AddrInfoFromP2pAddr(maddr) + if err != nil { + log.Println("[peer.AddrInfoFromP2pAddr]", err) + return + } + h1.Peerstore().AddAddr(info.ID, maddr, time.Hour) + + err = h1.OnlineAction(info.ID) + fmt.Println(err) +}