diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index edea20b9c..86b0f260a 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -175,6 +175,23 @@ func (r *recallWantlist) ClearSentAt(c cid.Cid) { delete(r.sentAt, c) } +// Refresh moves wants from the sent list back to the pending list. +// If a want has been sent for longer than the interval, it is moved back to the pending list. +// Returns the number of wants that were refreshed. +func (r *recallWantlist) Refresh(now time.Time, interval time.Duration) int { + var refreshed int + for _, want := range r.sent.Entries() { + sentAt, ok := r.sentAt[want.Cid] + if ok && now.Sub(sentAt) >= interval { + r.pending.Add(want.Cid, want.Priority, want.WantType) + r.sent.Remove(want.Cid) + refreshed++ + } + } + + return refreshed +} + type peerConn struct { p peer.ID network MessageNetwork @@ -476,27 +493,27 @@ func (mq *MessageQueue) rebroadcastWantlist() { mq.rebroadcastIntervalLk.Unlock() // If some wants were transferred from the rebroadcast list - if mq.transferRebroadcastWants() { + if toRebroadcast := mq.transferRebroadcastWants(); toRebroadcast > 0 { // Send them out mq.sendMessage() + log.Infow("Rebroadcasting wants", "amount", toRebroadcast, "peer", mq.p) } } // Transfer wants from the rebroadcast lists into the pending lists. -func (mq *MessageQueue) transferRebroadcastWants() bool { +func (mq *MessageQueue) transferRebroadcastWants() int { mq.wllock.Lock() defer mq.wllock.Unlock() - // Check if there are any wants to rebroadcast - if mq.bcstWants.sent.Len() == 0 && mq.peerWants.sent.Len() == 0 { - return false - } - - // Copy sent wants into pending wants lists - mq.bcstWants.pending.Absorb(mq.bcstWants.sent) - mq.peerWants.pending.Absorb(mq.peerWants.sent) + mq.rebroadcastIntervalLk.Lock() + rebroadcastInterval := mq.rebroadcastInterval + mq.rebroadcastIntervalLk.Unlock() - return true + now := mq.clock.Now() + // Transfer sent wants into pending wants lists + transferred := mq.bcstWants.Refresh(now, rebroadcastInterval) + transferred += mq.peerWants.Refresh(now, rebroadcastInterval) + return transferred } func (mq *MessageQueue) signalWorkReady() { diff --git a/bitswap/client/wantlist/wantlist.go b/bitswap/client/wantlist/wantlist.go index 6cb71eecc..245085af9 100644 --- a/bitswap/client/wantlist/wantlist.go +++ b/bitswap/client/wantlist/wantlist.go @@ -130,13 +130,3 @@ func (w *Wantlist) Entries() []Entry { w.cached = es return es[0:len(es):len(es)] } - -// Absorb all the entries in other into this want list -func (w *Wantlist) Absorb(other *Wantlist) { - // Invalidate the cache up-front to avoid doing any work trying to keep it up-to-date. - w.cached = nil - - for _, e := range other.Entries() { - w.Add(e.Cid, e.Priority, e.WantType) - } -} diff --git a/bitswap/client/wantlist/wantlist_test.go b/bitswap/client/wantlist/wantlist_test.go index 07d4ce415..901fe0d67 100644 --- a/bitswap/client/wantlist/wantlist_test.go +++ b/bitswap/client/wantlist/wantlist_test.go @@ -157,52 +157,6 @@ func TestAddBlockThenRemoveAny(t *testing.T) { } } -func TestAbsort(t *testing.T) { - wl := New() - wl.Add(testcids[0], 5, pb.Message_Wantlist_Block) - wl.Add(testcids[1], 4, pb.Message_Wantlist_Have) - wl.Add(testcids[2], 3, pb.Message_Wantlist_Have) - - wl2 := New() - wl2.Add(testcids[0], 2, pb.Message_Wantlist_Have) - wl2.Add(testcids[1], 1, pb.Message_Wantlist_Block) - - wl.Absorb(wl2) - - e, ok := wl.Contains(testcids[0]) - if !ok { - t.Fatal("expected to have ", testcids[0]) - } - if e.Priority != 5 { - t.Fatal("expected priority 5") - } - if e.WantType != pb.Message_Wantlist_Block { - t.Fatal("expected type ", pb.Message_Wantlist_Block) - } - - e, ok = wl.Contains(testcids[1]) - if !ok { - t.Fatal("expected to have ", testcids[1]) - } - if e.Priority != 1 { - t.Fatal("expected priority 1") - } - if e.WantType != pb.Message_Wantlist_Block { - t.Fatal("expected type ", pb.Message_Wantlist_Block) - } - - e, ok = wl.Contains(testcids[2]) - if !ok { - t.Fatal("expected to have ", testcids[2]) - } - if e.Priority != 3 { - t.Fatal("expected priority 3") - } - if e.WantType != pb.Message_Wantlist_Have { - t.Fatal("expected type ", pb.Message_Wantlist_Have) - } -} - func TestSortEntries(t *testing.T) { wl := New()