diff --git a/peer.go b/peer.go index 37d7ca57f0..8550b907f4 100644 --- a/peer.go +++ b/peer.go @@ -654,8 +654,6 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { } } - cl := t.cl - // Do we actually want this chunk? if t.haveChunk(ppReq) { // panic(fmt.Sprintf("%+v", ppReq)) @@ -692,9 +690,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { p.cancel(req) } - err = func() error { - cl.unlock() - defer cl.lock() + go func() { // Opportunistically do this here while we aren't holding the client lock. recordBlockForSmartBan() concurrentChunkWrites.Add(1) @@ -704,9 +700,26 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // because we want to handle errors synchronously and I haven't thought of a nice way to // defer any concurrency to the storage and have that notify the client of errors. TODO: Do // that instead. - return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece) + err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece) + c.afterChunkWrite(err, msg) }() + return nil +} + +func (c *Peer) afterChunkWrite(err error, msg *pp.Message) { + t := c.t + cl := t.cl + ppReq := newRequestFromMessage(msg) + req := c.t.requestIndexFromRequest(ppReq) + piece := &t.pieces[ppReq.Index] + + t.putChunkPool(msg.Piece) + concurrentChunkWrites.Add(-1) + + cl.lock() + defer cl.unlock() + piece.decrementPendingWrites() if err != nil { @@ -717,7 +730,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // fresh update after pending the failed request. c.updateRequests("Peer.receiveChunk error writing chunk") t.onWriteChunkErr(err) - return nil + return } c.onDirtiedPiece(pieceIndex(ppReq.Index)) @@ -734,8 +747,6 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { cl.event.Broadcast() // We do this because we've written a chunk, and may change PieceState.Partial. t.publishPieceStateChange(pieceIndex(ppReq.Index)) - - return nil } func (c *Peer) onDirtiedPiece(piece pieceIndex) { diff --git a/peerconn.go b/peerconn.go index 232da671be..463260a89a 100644 --- a/peerconn.go +++ b/peerconn.go @@ -836,10 +836,9 @@ func (c *PeerConn) mainReadLoop() (err error) { case pp.Piece: c.doChunkReadStats(int64(len(msg.Piece))) err = c.receiveChunk(&msg) - if len(msg.Piece) == int(t.chunkSize) { - t.chunkPool.Put(&msg.Piece) - } if err != nil { + // receiveChunk only does this if it doesn't error. + t.putChunkPool(msg.Piece) err = fmt.Errorf("receiving chunk: %w", err) } case pp.Cancel: diff --git a/torrent.go b/torrent.go index 757a180c7f..2044003a67 100644 --- a/torrent.go +++ b/torrent.go @@ -3175,3 +3175,9 @@ func (t *Torrent) getFileByPiecesRoot(hash [32]byte) *File { } return nil } + +func (t *Torrent) putChunkPool(b []byte) { + if len(b) == int(t.chunkSize) { + t.chunkPool.Put(&b) + } +}