From 0136857a038330fc88dc7b389f42b4554f5ccb16 Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 10 Sep 2024 14:15:25 +0530 Subject: [PATCH 1/5] tcp: fix metrics for multiple calls to Close (#2953) --- p2p/transport/tcp/metrics.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/p2p/transport/tcp/metrics.go b/p2p/transport/tcp/metrics.go index 4434b037bf..213ee2200a 100644 --- a/p2p/transport/tcp/metrics.go +++ b/p2p/transport/tcp/metrics.go @@ -208,7 +208,9 @@ type tracingConn struct { isClient bool manet.Conn - tcpConn *tcp.Conn + tcpConn *tcp.Conn + closeOnce sync.Once + closeErr error } func newTracingConn(c manet.Conn, isClient bool) (*tracingConn, error) { @@ -236,8 +238,11 @@ func (c *tracingConn) getDirection() string { } func (c *tracingConn) Close() error { - collector.ClosedConn(c, c.getDirection()) - return c.Conn.Close() + c.closeOnce.Do(func() { + collector.ClosedConn(c, c.getDirection()) + c.closeErr = c.Conn.Close() + }) + return c.closeErr } func (c *tracingConn) getTCPInfo() (*tcpinfo.Info, error) { From 6694d4a45740c1771c047e2d72fd7ebd72effcc3 Mon Sep 17 00:00:00 2001 From: gopherfarm Date: Tue, 10 Sep 2024 17:13:54 +0800 Subject: [PATCH 2/5] fix: use quic.Version instead of the deprecated quic.VersionNumber (#2955) --- p2p/transport/quic/listener.go | 4 ++-- p2p/transport/quic/transport.go | 2 +- p2p/transport/quic/virtuallistener.go | 12 ++++++------ p2p/transport/quicreuse/config.go | 2 +- p2p/transport/quicreuse/connmgr.go | 2 +- p2p/transport/quicreuse/quic_multiaddr.go | 6 +++--- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/p2p/transport/quic/listener.go b/p2p/transport/quic/listener.go index 0c69741358..f90bdf53f0 100644 --- a/p2p/transport/quic/listener.go +++ b/p2p/transport/quic/listener.go @@ -23,11 +23,11 @@ type listener struct { rcmgr network.ResourceManager privKey ic.PrivKey localPeer peer.ID - localMultiaddrs map[quic.VersionNumber]ma.Multiaddr + localMultiaddrs map[quic.Version]ma.Multiaddr } func newListener(ln quicreuse.Listener, t *transport, localPeer peer.ID, key ic.PrivKey, rcmgr network.ResourceManager) (listener, error) { - localMultiaddrs := make(map[quic.VersionNumber]ma.Multiaddr) + localMultiaddrs := make(map[quic.Version]ma.Multiaddr) for _, addr := range ln.Multiaddrs() { if _, err := addr.ValueForProtocol(ma.P_QUIC_V1); err == nil { localMultiaddrs[quic.Version1] = addr diff --git a/p2p/transport/quic/transport.go b/p2p/transport/quic/transport.go index f0862d22e9..4d3d9e551d 100644 --- a/p2p/transport/quic/transport.go +++ b/p2p/transport/quic/transport.go @@ -327,7 +327,7 @@ func (t *transport) Listen(addr ma.Multiaddr) (tpt.Listener, error) { acceptRunner = &acceptLoopRunner{ acceptSem: make(chan struct{}, 1), - muxer: make(map[quic.VersionNumber]chan acceptVal), + muxer: make(map[quic.Version]chan acceptVal), } } diff --git a/p2p/transport/quic/virtuallistener.go b/p2p/transport/quic/virtuallistener.go index 8aa2a0c1e6..7927225567 100644 --- a/p2p/transport/quic/virtuallistener.go +++ b/p2p/transport/quic/virtuallistener.go @@ -16,7 +16,7 @@ const acceptBufferPerVersion = 4 type virtualListener struct { *listener udpAddr string - version quic.VersionNumber + version quic.Version t *transport acceptRunnner *acceptLoopRunner acceptChan chan acceptVal @@ -46,11 +46,11 @@ type acceptLoopRunner struct { acceptSem chan struct{} muxerMu sync.Mutex - muxer map[quic.VersionNumber]chan acceptVal + muxer map[quic.Version]chan acceptVal muxerClosed bool } -func (r *acceptLoopRunner) AcceptForVersion(v quic.VersionNumber) chan acceptVal { +func (r *acceptLoopRunner) AcceptForVersion(v quic.Version) chan acceptVal { r.muxerMu.Lock() defer r.muxerMu.Unlock() @@ -64,7 +64,7 @@ func (r *acceptLoopRunner) AcceptForVersion(v quic.VersionNumber) chan acceptVal return ch } -func (r *acceptLoopRunner) RmAcceptForVersion(v quic.VersionNumber, err error) { +func (r *acceptLoopRunner) RmAcceptForVersion(v quic.Version, err error) { r.muxerMu.Lock() defer r.muxerMu.Unlock() @@ -98,7 +98,7 @@ func (r *acceptLoopRunner) sendErrAndClose(err error) { // innerAccept is the inner logic of the Accept loop. Assume caller holds the // acceptSemaphore. May return both a nil conn and nil error if it didn't find a // conn with the expected version -func (r *acceptLoopRunner) innerAccept(l *listener, expectedVersion quic.VersionNumber, bufferedConnChan chan acceptVal) (tpt.CapableConn, error) { +func (r *acceptLoopRunner) innerAccept(l *listener, expectedVersion quic.Version, bufferedConnChan chan acceptVal) (tpt.CapableConn, error) { select { // Check if we have a buffered connection first from an earlier Accept call case v, ok := <-bufferedConnChan: @@ -150,7 +150,7 @@ func (r *acceptLoopRunner) innerAccept(l *listener, expectedVersion quic.Version return nil, nil } -func (r *acceptLoopRunner) Accept(l *listener, expectedVersion quic.VersionNumber, bufferedConnChan chan acceptVal) (tpt.CapableConn, error) { +func (r *acceptLoopRunner) Accept(l *listener, expectedVersion quic.Version, bufferedConnChan chan acceptVal) (tpt.CapableConn, error) { for { var conn tpt.CapableConn var err error diff --git a/p2p/transport/quicreuse/config.go b/p2p/transport/quicreuse/config.go index 92c881ef7c..62f8919c8b 100644 --- a/p2p/transport/quicreuse/config.go +++ b/p2p/transport/quicreuse/config.go @@ -12,7 +12,7 @@ var quicConfig = &quic.Config{ MaxStreamReceiveWindow: 10 * (1 << 20), // 10 MB MaxConnectionReceiveWindow: 15 * (1 << 20), // 15 MB KeepAlivePeriod: 15 * time.Second, - Versions: []quic.VersionNumber{quic.Version1}, + Versions: []quic.Version{quic.Version1}, // We don't use datagrams (yet), but this is necessary for WebTransport EnableDatagrams: true, } diff --git a/p2p/transport/quicreuse/connmgr.go b/p2p/transport/quicreuse/connmgr.go index 8e4c61b0bc..c3aa0fa046 100644 --- a/p2p/transport/quicreuse/connmgr.go +++ b/p2p/transport/quicreuse/connmgr.go @@ -232,7 +232,7 @@ func (c *ConnManager) DialQUIC(ctx context.Context, raddr ma.Multiaddr, tlsConf if v == quic.Version1 { // The endpoint has explicit support for QUIC v1, so we'll only use that version. - quicConf.Versions = []quic.VersionNumber{quic.Version1} + quicConf.Versions = []quic.Version{quic.Version1} } else { return nil, errors.New("unknown QUIC version") } diff --git a/p2p/transport/quicreuse/quic_multiaddr.go b/p2p/transport/quicreuse/quic_multiaddr.go index 3da4721b5b..85b282df2e 100644 --- a/p2p/transport/quicreuse/quic_multiaddr.go +++ b/p2p/transport/quicreuse/quic_multiaddr.go @@ -13,7 +13,7 @@ var ( quicV1MA = ma.StringCast("/quic-v1") ) -func ToQuicMultiaddr(na net.Addr, version quic.VersionNumber) (ma.Multiaddr, error) { +func ToQuicMultiaddr(na net.Addr, version quic.Version) (ma.Multiaddr, error) { udpMA, err := manet.FromNetAddr(na) if err != nil { return nil, err @@ -26,8 +26,8 @@ func ToQuicMultiaddr(na net.Addr, version quic.VersionNumber) (ma.Multiaddr, err } } -func FromQuicMultiaddr(addr ma.Multiaddr) (*net.UDPAddr, quic.VersionNumber, error) { - var version quic.VersionNumber +func FromQuicMultiaddr(addr ma.Multiaddr) (*net.UDPAddr, quic.Version, error) { + var version quic.Version var partsBeforeQUIC []ma.Multiaddr ma.ForEach(addr, func(c ma.Component) bool { switch c.Protocol().Code { From 3a720d91b20a400bf92e2c921b4b9103a8e4efb6 Mon Sep 17 00:00:00 2001 From: Piotr Galar Date: Sat, 14 Sep 2024 07:40:30 +0200 Subject: [PATCH 3/5] chore: parameterise s3 build cache setup (#2948) --- .github/workflows/interop-test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/interop-test.yml b/.github/workflows/interop-test.yml index dc25f84bca..e51c51cddd 100644 --- a/.github/workflows/interop-test.yml +++ b/.github/workflows/interop-test.yml @@ -30,6 +30,6 @@ jobs: with: test-filter: go-libp2p-head extra-versions: ${{ github.workspace }}/test-plans/ping-version.json - s3-cache-bucket: libp2p-by-tf-aws-bootstrap - s3-access-key-id: ${{ vars.TEST_PLANS_BUILD_CACHE_KEY_ID }} - s3-secret-access-key: ${{ secrets.TEST_PLANS_BUILD_CACHE_KEY }} + s3-cache-bucket: ${{ vars.S3_LIBP2P_BUILD_CACHE_BUCKET_NAME }} + s3-access-key-id: ${{ vars.S3_LIBP2P_BUILD_CACHE_AWS_ACCESS_KEY_ID }} + s3-secret-access-key: ${{ secrets.S3_LIBP2P_BUILD_CACHE_AWS_SECRET_ACCESS_KEY }} From 6f2ec90f378759ea86a9ca559f8e02c8c4f3e0a8 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 17 Sep 2024 11:19:53 -0700 Subject: [PATCH 4/5] peerstore: better GC in membacked peerstore (#2960) --------- Co-authored-by: sukun --- core/peerstore/peerstore.go | 67 ++-- p2p/host/basic/basic_host.go | 55 +-- p2p/host/peerstore/pstoremem/addr_book.go | 336 ++++++++++-------- .../peerstore/pstoremem/addr_book_test.go | 200 +++++++++++ p2p/host/peerstore/pstoremem/inmem_test.go | 30 ++ 5 files changed, 476 insertions(+), 212 deletions(-) create mode 100644 p2p/host/peerstore/pstoremem/addr_book_test.go diff --git a/core/peerstore/peerstore.go b/core/peerstore/peerstore.go index 0ef09df9fe..10469e72cb 100644 --- a/core/peerstore/peerstore.go +++ b/core/peerstore/peerstore.go @@ -121,20 +121,10 @@ type AddrBook interface { PeersWithAddrs() peer.IDSlice } -// CertifiedAddrBook manages "self-certified" addresses for remote peers. -// Self-certified addresses are contained in peer.PeerRecords -// which are wrapped in a record.Envelope and signed by the peer -// to whom they belong. +// CertifiedAddrBook manages signed peer records and "self-certified" addresses +// contained within them. +// Use this interface with an `AddrBook`. // -// Certified addresses (CA) are generally more secure than uncertified -// addresses (UA). Consequently, CAs beat and displace UAs. When the -// peerstore learns CAs for a peer, it will reject UAs for the same peer -// (as long as the former haven't expired). -// Furthermore, peer records act like sequenced snapshots of CAs. Therefore, -// processing a peer record that's newer than the last one seen overwrites -// all addresses with the incoming ones. -// -// This interface is most useful when combined with AddrBook. // To test whether a given AddrBook / Peerstore implementation supports // certified addresses, callers should use the GetCertifiedAddrBook helper or // type-assert on the CertifiedAddrBook interface: @@ -143,41 +133,28 @@ type AddrBook interface { // cab.ConsumePeerRecord(signedPeerRecord, aTTL) // } type CertifiedAddrBook interface { - // ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in - // a record.Envelope), which will expire after the given TTL. - // - // The 'accepted' return value indicates that the record was successfully processed - // and integrated into the CertifiedAddrBook state. If 'accepted' is false but no - // error is returned, it means that the record was ignored, most likely because - // a newer record exists for the same peer. - // - // Signed records added via this method will be stored without - // alteration as long as the address TTLs remain valid. The Envelopes - // containing the PeerRecords can be retrieved by calling GetPeerRecord(peerID). - // - // If the signed PeerRecord belongs to a peer that already has certified - // addresses in the CertifiedAddrBook, the new addresses will replace the - // older ones, if the new record has a higher sequence number than the - // existing record. Attempting to add a peer record with a - // sequence number that's <= an existing record for the same peer will not - // result in an error, but the record will be ignored, and the 'accepted' - // bool return value will be false. + // ConsumePeerRecord stores a signed peer record and the contained addresses for + // for ttl duration. + // The addresses contained in the signed peer record will expire after ttl. If any + // address is already present in the peer store, it'll expire at max of existing ttl and + // provided ttl. + // The signed peer record itself will be expired when all the addresses associated with the peer, + // self-certified or not, are removed from the AddrBook. + // To delete the signed peer record, use `AddrBook.UpdateAddrs`,`AddrBook.SetAddrs`, or + // `AddrBook.ClearAddrs` with ttl 0. + // Note: Future calls to ConsumePeerRecord will not expire self-certified addresses from the + // previous calls. // - // If the CertifiedAddrBook is also an AddrBook (which is most likely the case), - // adding certified addresses for a peer will *replace* any - // existing non-certified addresses for that peer, and only the certified - // addresses will be returned from AddrBook.Addrs thereafter. + // The `accepted` return value indicates that the record was successfully processed. If + // `accepted` is false but no error is returned, it means that the record was ignored, most + // likely because a newer record exists for the same peer with a greater seq value. // - // Likewise, once certified addresses have been added for a given peer, - // any non-certified addresses added via AddrBook.AddAddrs or - // AddrBook.SetAddrs will be ignored. AddrBook.SetAddrs may still be used - // to update the TTL of certified addresses that have previously been - // added via ConsumePeerRecord. + // The Envelopes containing the signed peer records can be retrieved by calling + // GetPeerRecord(peerID). ConsumePeerRecord(s *record.Envelope, ttl time.Duration) (accepted bool, err error) - // GetPeerRecord returns an Envelope containing a PeerRecord for the - // given peer id, if one exists. - // Returns nil if no signed PeerRecord exists for the peer. + // GetPeerRecord returns an Envelope containing a peer record for the + // peer, or nil if no record exists. GetPeerRecord(p peer.ID) *record.Envelope } @@ -196,7 +173,7 @@ func GetCertifiedAddrBook(ab AddrBook) (cab CertifiedAddrBook, ok bool) { // KeyBook tracks the keys of Peers. type KeyBook interface { - // PubKey stores the public key of a peer. + // PubKey returns the public key of a peer. PubKey(peer.ID) ic.PubKey // AddPubKey stores the public key of a peer. diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 7b7f8855fb..b5d252e9d2 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -495,9 +495,12 @@ func (h *BasicHost) SignalAddressChange() { } } -func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated { +func (h *BasicHost) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated { + if prev == nil && current == nil { + return nil + } prevmap := make(map[string]ma.Multiaddr, len(prev)) - evt := event.EvtLocalAddressesUpdated{Diffs: true} + evt := &event.EvtLocalAddressesUpdated{Diffs: true} addrsAdded := false for _, addr := range prev { @@ -524,7 +527,19 @@ func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddresses return nil } - return &evt + // Our addresses have changed. Make a new signed peer record. + if !h.disableSignedPeerRecord { + // add signed peer record to the event + sr, err := h.makeSignedPeerRecord(current) + if err != nil { + log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err) + // drop this change + return nil + } + evt.SignedPeerRecord = sr + } + + return evt } func (h *BasicHost) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) { @@ -548,34 +563,27 @@ func (h *BasicHost) background() { var lastAddrs []ma.Multiaddr emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) { - // nothing to do if both are nil..defensive check - if currentAddrs == nil && lastAddrs == nil { - return - } - - changeEvt := makeUpdatedAddrEvent(lastAddrs, currentAddrs) - + changeEvt := h.makeUpdatedAddrEvent(lastAddrs, currentAddrs) if changeEvt == nil { return } - + // Our addresses have changed. + // store the signed peer record in the peer store. if !h.disableSignedPeerRecord { - // add signed peer record to the event - sr, err := h.makeSignedPeerRecord(currentAddrs) - if err != nil { - log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err) - return - } - changeEvt.SignedPeerRecord = sr - - // persist the signed record to the peerstore - if _, err := h.caBook.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil { + if _, err := h.caBook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil { log.Errorf("failed to persist signed peer record in peer store, err=%s", err) return } } + // update host addresses in the peer store + removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed)) + for _, ua := range changeEvt.Removed { + removedAddrs = append(removedAddrs, ua.Address) + } + h.Peerstore().SetAddrs(h.ID(), currentAddrs, peerstore.PermanentAddrTTL) + h.Peerstore().SetAddrs(h.ID(), removedAddrs, 0) - // emit addr change event on the bus + // emit addr change event if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil { log.Warnf("error emitting event for updated addrs: %s", err) } @@ -587,11 +595,10 @@ func (h *BasicHost) background() { defer ticker.Stop() for { + // Update our local IP addresses before checking our current addresses. if len(h.network.ListenAddresses()) > 0 { h.updateLocalIpAddr() } - // Request addresses anyways because, technically, address filters still apply. - // The underlying AllAddrs call is effectively a no-op. curr := h.Addrs() emitAddrChange(curr, lastAddrs) lastAddrs = curr diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index 209937ca83..89b87bdb47 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -1,6 +1,7 @@ package pstoremem import ( + "container/heap" "context" "fmt" "sort" @@ -18,13 +19,16 @@ import ( var log = logging.Logger("peerstore") type expiringAddr struct { - Addr ma.Multiaddr - TTL time.Duration - Expires time.Time + Addr ma.Multiaddr + TTL time.Duration + Expiry time.Time + Peer peer.ID + // to sort by expiry time + heapIndex int } func (e *expiringAddr) ExpiredBy(t time.Time) bool { - return !t.Before(e.Expires) + return !t.Before(e.Expiry) } type peerRecordState struct { @@ -32,24 +36,89 @@ type peerRecordState struct { Seq uint64 } -type addrSegments [256]*addrSegment +// Essentially Go stdlib's Priority Queue example +var _ heap.Interface = &peerAddrs{} -type addrSegment struct { - sync.RWMutex +type peerAddrs struct { + Addrs map[peer.ID]map[string]*expiringAddr // peer.ID -> addr.Bytes() -> *expiringAddr + expiringHeap []*expiringAddr +} - // Use pointers to save memory. Maps always leave some fraction of their - // space unused. storing the *values* directly in the map will - // drastically increase the space waste. In our case, by 6x. - addrs map[peer.ID]map[string]*expiringAddr +func newPeerAddrs() peerAddrs { + return peerAddrs{ + Addrs: make(map[peer.ID]map[string]*expiringAddr), + } +} - signedPeerRecords map[peer.ID]*peerRecordState +func (pa *peerAddrs) Len() int { return len(pa.expiringHeap) } +func (pa *peerAddrs) Less(i, j int) bool { + return pa.expiringHeap[i].Expiry.Before(pa.expiringHeap[j].Expiry) +} +func (pa *peerAddrs) Swap(i, j int) { + pa.expiringHeap[i], pa.expiringHeap[j] = pa.expiringHeap[j], pa.expiringHeap[i] + pa.expiringHeap[i].heapIndex = i + pa.expiringHeap[j].heapIndex = j +} +func (pa *peerAddrs) Push(x any) { + a := x.(*expiringAddr) + if _, ok := pa.Addrs[a.Peer]; !ok { + pa.Addrs[a.Peer] = make(map[string]*expiringAddr) + } + pa.Addrs[a.Peer][string(a.Addr.Bytes())] = a + a.heapIndex = len(pa.expiringHeap) + pa.expiringHeap = append(pa.expiringHeap, a) +} +func (pa *peerAddrs) Pop() any { + a := pa.expiringHeap[len(pa.expiringHeap)-1] + a.heapIndex = -1 + pa.expiringHeap = pa.expiringHeap[0 : len(pa.expiringHeap)-1] + + if m, ok := pa.Addrs[a.Peer]; ok { + delete(m, string(a.Addr.Bytes())) + if len(m) == 0 { + delete(pa.Addrs, a.Peer) + } + } + return a +} + +func (pa *peerAddrs) Fix(a *expiringAddr) { + heap.Fix(pa, a.heapIndex) +} + +func (pa *peerAddrs) Delete(a *expiringAddr) { + heap.Remove(pa, a.heapIndex) + a.heapIndex = -1 + if m, ok := pa.Addrs[a.Peer]; ok { + delete(m, string(a.Addr.Bytes())) + if len(m) == 0 { + delete(pa.Addrs, a.Peer) + } + } } -func (segments *addrSegments) get(p peer.ID) *addrSegment { - if len(p) == 0 { // it's not terribly useful to use an empty peer ID, but at least we should not panic - return segments[0] +func (pa *peerAddrs) FindAddr(p peer.ID, addrBytes ma.Multiaddr) (*expiringAddr, bool) { + if m, ok := pa.Addrs[p]; ok { + v, ok := m[string(addrBytes.Bytes())] + return v, ok } - return segments[p[len(p)-1]] + return nil, false +} + +func (pa *peerAddrs) NextExpiry() time.Time { + if len(pa.expiringHeap) == 0 { + return time.Time{} + } + return pa.expiringHeap[0].Expiry +} + +func (pa *peerAddrs) PopIfExpired(now time.Time) (*expiringAddr, bool) { + // Use `!Before` instead of `After` to ensure that we expire *at* now, and not *just after now*. + if len(pa.expiringHeap) > 0 && !now.Before(pa.NextExpiry()) { + a := heap.Pop(pa) + return a.(*expiringAddr), true + } + return nil, false } type clock interface { @@ -64,7 +133,10 @@ func (rc realclock) Now() time.Time { // memoryAddrBook manages addresses. type memoryAddrBook struct { - segments addrSegments + mu sync.RWMutex + // TODO bound the number of not connected addresses we store. + addrs peerAddrs + signedPeerRecords map[peer.ID]*peerRecordState refCount sync.WaitGroup cancel func() @@ -80,17 +152,11 @@ func NewAddrBook() *memoryAddrBook { ctx, cancel := context.WithCancel(context.Background()) ab := &memoryAddrBook{ - segments: func() (ret addrSegments) { - for i := range ret { - ret[i] = &addrSegment{ - addrs: make(map[peer.ID]map[string]*expiringAddr), - signedPeerRecords: make(map[peer.ID]*peerRecordState)} - } - return ret - }(), - subManager: NewAddrSubManager(), - cancel: cancel, - clock: realclock{}, + addrs: newPeerAddrs(), + signedPeerRecords: make(map[peer.ID]*peerRecordState), + subManager: NewAddrSubManager(), + cancel: cancel, + clock: realclock{}, } ab.refCount.Add(1) go ab.background(ctx) @@ -109,7 +175,7 @@ func WithClock(clock clock) AddrBookOption { // background periodically schedules a gc func (mab *memoryAddrBook) background(ctx context.Context) { defer mab.refCount.Done() - ticker := time.NewTicker(1 * time.Hour) + ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() for { @@ -131,37 +197,22 @@ func (mab *memoryAddrBook) Close() error { // gc garbage collects the in-memory address book. func (mab *memoryAddrBook) gc() { now := mab.clock.Now() - for _, s := range mab.segments { - s.Lock() - for p, amap := range s.addrs { - for k, addr := range amap { - if addr.ExpiredBy(now) { - delete(amap, k) - } - } - if len(amap) == 0 { - delete(s.addrs, p) - delete(s.signedPeerRecords, p) - } + mab.mu.Lock() + defer mab.mu.Unlock() + for { + ea, ok := mab.addrs.PopIfExpired(now) + if !ok { + return } - s.Unlock() + mab.maybeDeleteSignedPeerRecordUnlocked(ea.Peer) } } func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice { - // deduplicate, since the same peer could have both signed & unsigned addrs - set := make(map[peer.ID]struct{}) - for _, s := range mab.segments { - s.RLock() - for pid, amap := range s.addrs { - if len(amap) > 0 { - set[pid] = struct{}{} - } - } - s.RUnlock() - } - peers := make(peer.IDSlice, 0, len(set)) - for pid := range set { + mab.mu.RLock() + defer mab.mu.RUnlock() + peers := make(peer.IDSlice, 0, len(mab.addrs.Addrs)) + for pid := range mab.addrs.Addrs { peers = append(peers, pid) } return peers @@ -172,20 +223,13 @@ func (mab *memoryAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati mab.AddAddrs(p, []ma.Multiaddr{addr}, ttl) } -// AddAddrs gives memoryAddrBook addresses to use, with a given ttl -// (time-to-live), after which the address is no longer valid. +// AddAddrs adds `addrs` for peer `p`, which will expire after the given `ttl`. // This function never reduces the TTL or expiration of an address. func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { - // if we have a valid peer record, ignore unsigned addrs - // peerRec := mab.GetPeerRecord(p) - // if peerRec != nil { - // return - // } mab.addAddrs(p, addrs, ttl) } -// ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in -// a record.Envelope), which will expire after the given TTL. +// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire after the given TTL. // See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details. func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) { r, err := recordEnvelope.Record() @@ -200,42 +244,43 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt return false, fmt.Errorf("signing key does not match PeerID in PeerRecord") } - // ensure seq is greater than, or equal to, the last received - s := mab.segments.get(rec.PeerID) - s.Lock() - defer s.Unlock() - lastState, found := s.signedPeerRecords[rec.PeerID] + mab.mu.Lock() + defer mab.mu.Unlock() + + // ensure seq is greater than or equal to the last received + lastState, found := mab.signedPeerRecords[rec.PeerID] if found && lastState.Seq > rec.Seq { return false, nil } - s.signedPeerRecords[rec.PeerID] = &peerRecordState{ + mab.signedPeerRecords[rec.PeerID] = &peerRecordState{ Envelope: recordEnvelope, Seq: rec.Seq, } - mab.addAddrsUnlocked(s, rec.PeerID, rec.Addrs, ttl, true) + mab.addAddrsUnlocked(rec.PeerID, rec.Addrs, ttl) return true, nil } +func (mab *memoryAddrBook) maybeDeleteSignedPeerRecordUnlocked(p peer.ID) { + if len(mab.addrs.Addrs[p]) == 0 { + delete(mab.signedPeerRecords, p) + } +} + func (mab *memoryAddrBook) addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { - s := mab.segments.get(p) - s.Lock() - defer s.Unlock() + mab.mu.Lock() + defer mab.mu.Unlock() - mab.addAddrsUnlocked(s, p, addrs, ttl, false) + mab.addAddrsUnlocked(p, addrs, ttl) } -func (mab *memoryAddrBook) addAddrsUnlocked(s *addrSegment, p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, signed bool) { +func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { + defer mab.maybeDeleteSignedPeerRecordUnlocked(p) + // if ttl is zero, exit. nothing to do. if ttl <= 0 { return } - amap, ok := s.addrs[p] - if !ok { - amap = make(map[string]*expiringAddr) - s.addrs[p] = amap - } - exp := mab.clock.Now().Add(ttl) for _, addr := range addrs { // Remove suffix of /p2p/peer-id from address @@ -248,21 +293,25 @@ func (mab *memoryAddrBook) addAddrsUnlocked(s *addrSegment, p peer.ID, addrs []m log.Warnf("Was passed p2p address with a different peerId. found: %s, expected: %s", addrPid, p) continue } - // find the highest TTL and Expiry time between - // existing records and function args - a, found := amap[string(addr.Bytes())] // won't allocate. + a, found := mab.addrs.FindAddr(p, addr) if !found { // not found, announce it. - entry := &expiringAddr{Addr: addr, Expires: exp, TTL: ttl} - amap[string(addr.Bytes())] = entry + entry := &expiringAddr{Addr: addr, Expiry: exp, TTL: ttl, Peer: p} + heap.Push(&mab.addrs, entry) mab.subManager.BroadcastAddr(p, addr) } else { // update ttl & exp to whichever is greater between new and existing entry + var changed bool if ttl > a.TTL { + changed = true a.TTL = ttl } - if exp.After(a.Expires) { - a.Expires = exp + if exp.After(a.Expiry) { + changed = true + a.Expiry = exp + } + if changed { + mab.addrs.Fix(a) } } } @@ -276,15 +325,10 @@ func (mab *memoryAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati // SetAddrs sets the ttl on addresses. This clears any TTL there previously. // This is used when we receive the best estimate of the validity of an address. func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { - s := mab.segments.get(p) - s.Lock() - defer s.Unlock() + mab.mu.Lock() + defer mab.mu.Unlock() - amap, ok := s.addrs[p] - if !ok { - amap = make(map[string]*expiringAddr) - s.addrs[p] = amap - } + defer mab.maybeDeleteSignedPeerRecordUnlocked(p) exp := mab.clock.Now().Add(ttl) for _, addr := range addrs { @@ -297,15 +341,22 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du log.Warnf("was passed p2p address with a different peerId, found: %s wanted: %s", addrPid, p) continue } - aBytes := addr.Bytes() - key := string(aBytes) - // re-set all of them for new ttl. - if ttl > 0 { - amap[key] = &expiringAddr{Addr: addr, Expires: exp, TTL: ttl} - mab.subManager.BroadcastAddr(p, addr) + if a, found := mab.addrs.FindAddr(p, addr); found { + if ttl > 0 { + a.Addr = addr + a.Expiry = exp + a.TTL = ttl + mab.addrs.Fix(a) + mab.subManager.BroadcastAddr(p, addr) + } else { + mab.addrs.Delete(a) + } } else { - delete(amap, key) + if ttl > 0 { + heap.Push(&mab.addrs, &expiringAddr{Addr: addr, Expiry: exp, TTL: ttl, Peer: p}) + mab.subManager.BroadcastAddr(p, addr) + } } } } @@ -313,23 +364,20 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du // UpdateAddrs updates the addresses associated with the given peer that have // the given oldTTL to have the given newTTL. func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { - s := mab.segments.get(p) - s.Lock() - defer s.Unlock() - exp := mab.clock.Now().Add(newTTL) - amap, found := s.addrs[p] - if !found { - return - } + mab.mu.Lock() + defer mab.mu.Unlock() - for k, a := range amap { + defer mab.maybeDeleteSignedPeerRecordUnlocked(p) + + exp := mab.clock.Now().Add(newTTL) + for _, a := range mab.addrs.Addrs[p] { if oldTTL == a.TTL { if newTTL == 0 { - delete(amap, k) + mab.addrs.Delete(a) } else { a.TTL = newTTL - a.Expires = exp - amap[k] = a + a.Expiry = exp + mab.addrs.Fix(a) } } } @@ -337,11 +385,12 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t // Addrs returns all known (and valid) addresses for a given peer func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr { - s := mab.segments.get(p) - s.RLock() - defer s.RUnlock() - - return validAddrs(mab.clock.Now(), s.addrs[p]) + mab.mu.RLock() + defer mab.mu.RUnlock() + if _, ok := mab.addrs.Addrs[p]; !ok { + return nil + } + return validAddrs(mab.clock.Now(), mab.addrs.Addrs[p]) } func validAddrs(now time.Time, amap map[string]*expiringAddr) []ma.Multiaddr { @@ -354,7 +403,6 @@ func validAddrs(now time.Time, amap map[string]*expiringAddr) []ma.Multiaddr { good = append(good, m.Addr) } } - return good } @@ -362,18 +410,18 @@ func validAddrs(now time.Time, amap map[string]*expiringAddr) []ma.Multiaddr { // given peer id, if one exists. // Returns nil if no signed PeerRecord exists for the peer. func (mab *memoryAddrBook) GetPeerRecord(p peer.ID) *record.Envelope { - s := mab.segments.get(p) - s.RLock() - defer s.RUnlock() - - // although the signed record gets garbage collected when all addrs inside it are expired, - // we may be in between the expiration time and the GC interval - // so, we check to see if we have any valid signed addrs before returning the record - if len(validAddrs(mab.clock.Now(), s.addrs[p])) == 0 { + mab.mu.RLock() + defer mab.mu.RUnlock() + + if _, ok := mab.addrs.Addrs[p]; !ok { + return nil + } + // The record may have expired, but not gargage collected. + if len(validAddrs(mab.clock.Now(), mab.addrs.Addrs[p])) == 0 { return nil } - state := s.signedPeerRecords[p] + state := mab.signedPeerRecords[p] if state == nil { return nil } @@ -382,26 +430,28 @@ func (mab *memoryAddrBook) GetPeerRecord(p peer.ID) *record.Envelope { // ClearAddrs removes all previously stored addresses func (mab *memoryAddrBook) ClearAddrs(p peer.ID) { - s := mab.segments.get(p) - s.Lock() - defer s.Unlock() + mab.mu.Lock() + defer mab.mu.Unlock() - delete(s.addrs, p) - delete(s.signedPeerRecords, p) + delete(mab.signedPeerRecords, p) + for _, a := range mab.addrs.Addrs[p] { + mab.addrs.Delete(a) + } } // AddrStream returns a channel on which all new addresses discovered for a // given peer ID will be published. func (mab *memoryAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr { - s := mab.segments.get(p) - s.RLock() - defer s.RUnlock() + var initial []ma.Multiaddr - baseaddrslice := s.addrs[p] - initial := make([]ma.Multiaddr, 0, len(baseaddrslice)) - for _, a := range baseaddrslice { - initial = append(initial, a.Addr) + mab.mu.RLock() + if m, ok := mab.addrs.Addrs[p]; ok { + initial = make([]ma.Multiaddr, 0, len(m)) + for _, a := range m { + initial = append(initial, a.Addr) + } } + mab.mu.RUnlock() return mab.subManager.AddrStream(ctx, p, initial) } diff --git a/p2p/host/peerstore/pstoremem/addr_book_test.go b/p2p/host/peerstore/pstoremem/addr_book_test.go new file mode 100644 index 0000000000..963c4552cf --- /dev/null +++ b/p2p/host/peerstore/pstoremem/addr_book_test.go @@ -0,0 +1,200 @@ +package pstoremem + +import ( + "container/heap" + "fmt" + "math/rand" + "slices" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" +) + +func TestPeerAddrsNextExpiry(t *testing.T) { + paa := newPeerAddrs() + pa := &paa + a1 := ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1") + a2 := ma.StringCast("/ip4/1.2.3.4/udp/2/quic-v1") + + // t1 is before t2 + t1 := time.Time{}.Add(1 * time.Second) + t2 := time.Time{}.Add(2 * time.Second) + heap.Push(pa, &expiringAddr{Addr: a1, Expiry: t1, TTL: 10 * time.Second, Peer: "p1"}) + heap.Push(pa, &expiringAddr{Addr: a2, Expiry: t2, TTL: 10 * time.Second, Peer: "p2"}) + + if pa.NextExpiry() != t1 { + t.Fatal("expiry should be set to t1, got", pa.NextExpiry()) + } +} + +func peerAddrsInput(n int) []*expiringAddr { + expiringAddrs := make([]*expiringAddr, n) + for i := 0; i < n; i++ { + port := i % 65535 + a := ma.StringCast(fmt.Sprintf("/ip4/1.2.3.4/udp/%d/quic-v1", port)) + e := time.Time{}.Add(time.Duration(i) * time.Second) + p := peer.ID(fmt.Sprintf("p%d", i)) + expiringAddrs[i] = &expiringAddr{Addr: a, Expiry: e, TTL: 10 * time.Second, Peer: p} + } + return expiringAddrs +} + +func TestPeerAddrsHeapProperty(t *testing.T) { + paa := newPeerAddrs() + pa := &paa + + const N = 10000 + expiringAddrs := peerAddrsInput(N) + for i := 0; i < N; i++ { + heap.Push(pa, expiringAddrs[i]) + } + + for i := 0; i < N; i++ { + ea, ok := pa.PopIfExpired(expiringAddrs[i].Expiry) + require.True(t, ok, "pos: %d", i) + require.Equal(t, ea.Addr, expiringAddrs[i].Addr) + + ea, ok = pa.PopIfExpired(expiringAddrs[i].Expiry) + require.False(t, ok) + require.Nil(t, ea) + } +} + +func TestPeerAddrsHeapPropertyDeletions(t *testing.T) { + paa := newPeerAddrs() + pa := &paa + + const N = 10000 + expiringAddrs := peerAddrsInput(N) + for i := 0; i < N; i++ { + heap.Push(pa, expiringAddrs[i]) + } + + // delete every 3rd element + for i := 0; i < N; i += 3 { + paa.Delete(expiringAddrs[i]) + } + + for i := 0; i < N; i++ { + ea, ok := pa.PopIfExpired(expiringAddrs[i].Expiry) + if i%3 == 0 { + require.False(t, ok) + require.Nil(t, ea) + } else { + require.True(t, ok) + require.Equal(t, ea.Addr, expiringAddrs[i].Addr) + } + + ea, ok = pa.PopIfExpired(expiringAddrs[i].Expiry) + require.False(t, ok) + require.Nil(t, ea) + } +} + +func TestPeerAddrsHeapPropertyUpdates(t *testing.T) { + paa := newPeerAddrs() + pa := &paa + + const N = 10000 + expiringAddrs := peerAddrsInput(N) + for i := 0; i < N; i++ { + heap.Push(pa, expiringAddrs[i]) + } + + // update every 3rd element to expire at the end + var endElements []ma.Multiaddr + for i := 0; i < N; i += 3 { + expiringAddrs[i].Expiry = time.Time{}.Add(1000_000 * time.Second) + pa.Fix(expiringAddrs[i]) + endElements = append(endElements, expiringAddrs[i].Addr) + } + + for i := 0; i < N; i++ { + if i%3 == 0 { + continue // skip the elements at the end + } + ea, ok := pa.PopIfExpired(expiringAddrs[i].Expiry) + require.True(t, ok, "pos: %d", i) + require.Equal(t, ea.Addr, expiringAddrs[i].Addr) + + ea, ok = pa.PopIfExpired(expiringAddrs[i].Expiry) + require.False(t, ok) + require.Nil(t, ea) + } + + for len(endElements) > 0 { + ea, ok := pa.PopIfExpired(time.Time{}.Add(1000_000 * time.Second)) + require.True(t, ok) + require.Contains(t, endElements, ea.Addr) + endElements = slices.DeleteFunc(endElements, func(a ma.Multiaddr) bool { return ea.Addr.Equal(a) }) + } +} + +// TestPeerAddrsExpiry tests for multiple element expiry with PopIfExpired. +func TestPeerAddrsExpiry(t *testing.T) { + const T = 100_000 + for x := 0; x < T; x++ { + paa := newPeerAddrs() + pa := &paa + // Try a lot of random inputs. + // T > 5*((5^5)*5) (=15k) + // So this should test for all possible 5 element inputs. + const N = 5 + expiringAddrs := peerAddrsInput(N) + for i := 0; i < N; i++ { + expiringAddrs[i].Expiry = time.Time{}.Add(time.Duration(1+rand.Intn(N)) * time.Second) + } + for i := 0; i < N; i++ { + heap.Push(pa, expiringAddrs[i]) + } + + expiry := time.Time{}.Add(time.Duration(1+rand.Intn(N)) * time.Second) + expected := []ma.Multiaddr{} + for i := 0; i < N; i++ { + if !expiry.Before(expiringAddrs[i].Expiry) { + expected = append(expected, expiringAddrs[i].Addr) + } + } + got := []ma.Multiaddr{} + for { + ea, ok := pa.PopIfExpired(expiry) + if !ok { + break + } + got = append(got, ea.Addr) + } + expiries := []int{} + for i := 0; i < N; i++ { + expiries = append(expiries, expiringAddrs[i].Expiry.Second()) + } + require.ElementsMatch(t, expected, got, "failed for input: element expiries: %v, expiry: %v", expiries, expiry.Second()) + } +} + +func BenchmarkPeerAddrs(b *testing.B) { + sizes := [...]int{1, 10, 100, 1000, 10_000, 100_000, 1000_000} + for _, sz := range sizes { + b.Run(fmt.Sprintf("%d", sz), func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + paa := newPeerAddrs() + pa := &paa + expiringAddrs := peerAddrsInput(sz) + for i := 0; i < sz; i++ { + heap.Push(pa, expiringAddrs[i]) + } + b.StartTimer() + for { + _, ok := pa.PopIfExpired(expiringAddrs[len(expiringAddrs)-1].Expiry) + if !ok { + break + } + } + } + }) + } + +} diff --git a/p2p/host/peerstore/pstoremem/inmem_test.go b/p2p/host/peerstore/pstoremem/inmem_test.go index 097a1fd25e..f87d6e4f45 100644 --- a/p2p/host/peerstore/pstoremem/inmem_test.go +++ b/p2p/host/peerstore/pstoremem/inmem_test.go @@ -1,10 +1,14 @@ package pstoremem import ( + "strconv" "testing" + "time" + "github.com/libp2p/go-libp2p/core/peer" pstore "github.com/libp2p/go-libp2p/core/peerstore" pt "github.com/libp2p/go-libp2p/p2p/host/peerstore/test" + "github.com/multiformats/go-multiaddr" mockClock "github.com/benbjohnson/clock" "github.com/stretchr/testify/require" @@ -82,3 +86,29 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), ) } + +func BenchmarkGC(b *testing.B) { + clock := mockClock.NewMock() + ps, err := NewPeerstore(WithClock(clock)) + require.NoError(b, err) + defer ps.Close() + + peerCount := 100_000 + addrsPerPeer := 32 + + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + for i := 0; i < peerCount; i++ { + id := peer.ID(strconv.Itoa(i)) + addrs := make([]multiaddr.Multiaddr, addrsPerPeer) + for j := 0; j < addrsPerPeer; j++ { + addrs[j] = multiaddr.StringCast("/ip4/1.2.3.4/tcp/" + strconv.Itoa(j)) + } + ps.AddAddrs(id, addrs, 24*time.Hour) + } + clock.Add(25 * time.Hour) + b.StartTimer() + ps.gc() + } +} From cb4f7cb9774cc12995e29603e8be60e5dd16d1c4 Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 24 Sep 2024 20:07:51 +0530 Subject: [PATCH 5/5] Release v0.36.4 --- version.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.json b/version.json index 844c2c1201..706bab2ee2 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.36.3" + "version": "v0.36.4" }