Skip to content

Commit

Permalink
Write received chunks asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Mar 10, 2024
1 parent 3b0a78f commit e0d2eee
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 12 deletions.
29 changes: 20 additions & 9 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,8 +654,6 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
}
}

cl := t.cl

// Do we actually want this chunk?
if t.haveChunk(ppReq) {
// panic(fmt.Sprintf("%+v", ppReq))
Expand Down Expand Up @@ -692,9 +690,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
p.cancel(req)
}

err = func() error {
cl.unlock()
defer cl.lock()
go func() {
// Opportunistically do this here while we aren't holding the client lock.
recordBlockForSmartBan()
concurrentChunkWrites.Add(1)
Expand All @@ -704,9 +700,26 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
// because we want to handle errors synchronously and I haven't thought of a nice way to
// defer any concurrency to the storage and have that notify the client of errors. TODO: Do
// that instead.
return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
c.afterChunkWrite(err, msg)
}()

return nil
}

func (c *Peer) afterChunkWrite(err error, msg *pp.Message) {
t := c.t
cl := t.cl
ppReq := newRequestFromMessage(msg)
req := c.t.requestIndexFromRequest(ppReq)
piece := &t.pieces[ppReq.Index]

t.putChunkPool(msg.Piece)
concurrentChunkWrites.Add(-1)

cl.lock()
defer cl.unlock()

piece.decrementPendingWrites()

if err != nil {
Expand All @@ -717,7 +730,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
// fresh update after pending the failed request.
c.updateRequests("Peer.receiveChunk error writing chunk")
t.onWriteChunkErr(err)
return nil
return
}

c.onDirtiedPiece(pieceIndex(ppReq.Index))
Expand All @@ -734,8 +747,6 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
cl.event.Broadcast()
// We do this because we've written a chunk, and may change PieceState.Partial.
t.publishPieceStateChange(pieceIndex(ppReq.Index))

return nil
}

func (c *Peer) onDirtiedPiece(piece pieceIndex) {
Expand Down
5 changes: 2 additions & 3 deletions peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,10 +836,9 @@ func (c *PeerConn) mainReadLoop() (err error) {
case pp.Piece:
c.doChunkReadStats(int64(len(msg.Piece)))
err = c.receiveChunk(&msg)
if len(msg.Piece) == int(t.chunkSize) {
t.chunkPool.Put(&msg.Piece)
}
if err != nil {
// receiveChunk only does this if it doesn't error.
t.putChunkPool(msg.Piece)
err = fmt.Errorf("receiving chunk: %w", err)
}
case pp.Cancel:
Expand Down
6 changes: 6 additions & 0 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3175,3 +3175,9 @@ func (t *Torrent) getFileByPiecesRoot(hash [32]byte) *File {
}
return nil
}

func (t *Torrent) putChunkPool(b []byte) {
if len(b) == int(t.chunkSize) {
t.chunkPool.Put(&b)
}
}

0 comments on commit e0d2eee

Please sign in to comment.