diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 1cc2f601a4..de6c9364ad 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -36,7 +36,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [ '1.22' ] + go-version: [ '1.21', '1.22' ] fail-fast: false steps: - uses: actions/checkout@v4 diff --git a/go.mod b/go.mod index 06b9972db6..401a533c6f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/anacrolix/torrent -go 1.22 +go 1.21.4 + +toolchain go1.22.5 require ( github.com/RoaringBitmap/roaring v1.2.3 diff --git a/peer.go b/peer.go index 733aa018ef..4174971077 100644 --- a/peer.go +++ b/peer.go @@ -468,6 +468,8 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) { return cn.peerImpl._request(ppReq), nil } +var peerUpdateRequestsPeerCancelReason = "Peer.cancel" + func (me *Peer) cancel(r RequestIndex) { if !me.deleteRequest(r) { panic("request not existing should have been guarded") @@ -480,7 +482,7 @@ func (me *Peer) cancel(r RequestIndex) { } me.decPeakRequests() if me.isLowOnRequests() { - me.updateRequests("Peer.cancel") + me.updateRequests(peerUpdateRequestsPeerCancelReason) } } @@ -566,6 +568,8 @@ func runSafeExtraneous(f func()) { } } +var peerUpdateRequestsRemoteRejectReason = "Peer.remoteRejectedRequest" + // Returns true if it was valid to reject the request. func (c *Peer) remoteRejectedRequest(r RequestIndex) bool { if c.deleteRequest(r) { @@ -574,7 +578,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool { return false } if c.isLowOnRequests() { - c.updateRequests("Peer.remoteRejectedRequest") + c.updateRequests(peerUpdateRequestsRemoteRejectReason) } c.decExpectedChunkReceive(r) return true diff --git a/webseed-peer.go b/webseed-peer.go index ca915f3861..e34a8bd692 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -27,6 +27,7 @@ type webseedPeer struct { client webseed.Client activeRequests map[Request]webseed.Request requesterCond sync.Cond + updateRequestor *time.Timer lastUnhandledErr time.Time } @@ -72,7 +73,6 @@ func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { } func (ws *webseedPeer) _request(r Request) bool { - ws.requesterCond.Signal() return true } @@ -91,15 +91,17 @@ func (ws *webseedPeer) doRequest(r Request) error { func (ws *webseedPeer) requester(i int) { ws.requesterCond.L.Lock() defer ws.requesterCond.L.Unlock() -start: + for !ws.peer.closed.IsSet() { // Restart is set if we don't need to wait for the requestCond before trying again. restart := false + ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool { r := ws.peer.t.requestIndexToRequest(x) if _, ok := ws.activeRequests[r]; ok { return true } + err := ws.doRequest(r) ws.requesterCond.L.Unlock() if err != nil && !errors.Is(err, context.Canceled) { @@ -117,10 +119,38 @@ start: ws.requesterCond.L.Lock() return false }) - if restart { - goto start + + if !restart { + if !ws.peer.t.dataDownloadDisallowed.Bool() && ws.peer.isLowOnRequests() && len(ws.peer.getDesiredRequestState().Requests.requestIndexes) > 0 { + if ws.updateRequestor == nil { + ws.updateRequestor = time.AfterFunc(updateRequestsTimerDuration, func() { requestUpdate(ws) }) + } + } + + ws.requesterCond.Wait() + + if ws.updateRequestor != nil { + ws.updateRequestor.Stop() + ws.updateRequestor = nil + } + } + } +} + +func requestUpdate(ws *webseedPeer) { + if ws != nil { + if !ws.peer.closed.IsSet() { + if len(ws.peer.getDesiredRequestState().Requests.requestIndexes) > 0 { + if ws.peer.isLowOnRequests() { + if time.Since(ws.peer.lastRequestUpdate) > updateRequestsTimerDuration { + ws.peer.updateRequests(peerUpdateRequestsTimerReason) + return + } + } + + ws.requesterCond.Signal() + } } - ws.requesterCond.Wait() } } @@ -142,6 +172,7 @@ func (ws *webseedPeer) handleUpdateRequests() { ws.peer.t.cl.lock() defer ws.peer.t.cl.unlock() ws.peer.maybeUpdateActualRequestState() + ws.requesterCond.Signal() }() }