Skip to content

Commit

Permalink
Send hash requests for missing v2 hashes
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Feb 29, 2024
1 parent 3d1adc6 commit 7761868
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 15 deletions.
4 changes: 4 additions & 0 deletions merkle/merkle.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,7 @@ func CompactLayerToSliceHashes(compactLayer string) (hashes [][sha256.Size]byte,
func RoundUpToPowerOfTwo(n uint) (ret uint) {
return 1 << bits.Len(n-1)
}

func Log2RoundingUp(n uint) (ret uint) {
return uint(bits.Len(n - 1))
}
10 changes: 0 additions & 10 deletions misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,6 @@ func maxInt(as ...int) int {
return ret
}

func min(as ...int64) int64 {
ret := as[0]
for _, a := range as[1:] {
if a < ret {
ret = a
}
}
return ret
}

func minInt(as ...int) int {
ret := as[0]
for _, a := range as[1:] {
Expand Down
31 changes: 26 additions & 5 deletions peer_protocol/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@ import (
)

// This is a lazy union representing all the possible fields for messages. Go doesn't have ADTs, and
// I didn't choose to use type-assertions.
// I didn't choose to use type-assertions. Fields are ordered to minimize struct size and padding.
type Message struct {
Keepalive bool
Type MessageType
Index, Begin, Length Integer
PiecesRoot [32]byte
Piece []byte
Bitfield []bool
ExtendedID ExtensionNumber
ExtendedPayload []byte
Hashes [][32]byte
Index, Begin, Length Integer
BaseLayer Integer
ProofLayers Integer
Port uint16
Type MessageType
ExtendedID ExtensionNumber
Keepalive bool
}

var _ interface {
Expand Down Expand Up @@ -58,7 +62,21 @@ func (msg Message) MustMarshalBinary() []byte {
}

func (msg Message) MarshalBinary() (data []byte, err error) {
// It might look like you could have a pool of buffers and preallocate the message length
// prefix, but because we have to return []byte, it becomes non-trivial to make this fast. You
// will need a benchmark.
var buf bytes.Buffer
mustWrite := func(data any) {
err := binary.Write(&buf, binary.BigEndian, data)
if err != nil {
panic(err)
}
}
writeConsecutive := func(data ...any) {
for _, d := range data {
mustWrite(d)
}
}
if !msg.Keepalive {
err = buf.WriteByte(byte(msg.Type))
if err != nil {
Expand Down Expand Up @@ -99,6 +117,9 @@ func (msg Message) MarshalBinary() (data []byte, err error) {
_, err = buf.Write(msg.ExtendedPayload)
case Port:
err = binary.Write(&buf, binary.BigEndian, msg.Port)
case HashRequest:
buf.Write(msg.PiecesRoot[:])
writeConsecutive(msg.BaseLayer, msg.Index, msg.Length, msg.ProofLayers)
default:
err = fmt.Errorf("unknown message type: %v", msg.Type)
}
Expand Down
74 changes: 74 additions & 0 deletions peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"errors"
"fmt"
"github.com/anacrolix/torrent/merkle"
"io"
"math/rand"
"net"
Expand Down Expand Up @@ -79,6 +80,8 @@ type PeerConn struct {
peerRequestDataAllocLimiter alloclim.Limiter

outstandingHolepunchingRendezvous map[netip.AddrPort]struct{}

sentHashRequests map[hashRequest]struct{}
}

func (cn *PeerConn) pexStatus() string {
Expand Down Expand Up @@ -336,6 +339,7 @@ func (cn *PeerConn) fillWriteBuffer() {
// knowledge of write buffers.
return
}
cn.requestMissingHashes()
cn.maybeUpdateActualRequestState()
if cn.pex.IsEnabled() {
if flow := cn.pex.Share(cn.write); !flow {
Expand Down Expand Up @@ -1214,3 +1218,73 @@ func (pc *PeerConn) WriteExtendedMessage(extName pp.ExtensionName, payload []byt
})
return nil
}

func (pc *PeerConn) requestMissingHashes() {
if !pc.t.haveInfo() {
return
}
info := pc.t.info
baseLayer := pp.Integer(merkle.Log2RoundingUp(merkle.RoundUpToPowerOfTwo(
uint((pc.t.usualPieceSize() + merkle.BlockSize - 1) / merkle.BlockSize)),
))
for _, file := range info.UpvertedFiles() {
piecesRoot := file.PiecesRoot.Unwrap()
fileNumPieces := int((file.Length + info.PieceLength - 1) / info.PieceLength)
proofLayers := pp.Integer(0)
// We would be requesting the leaves, the file must be short enough that we can just do with
// the pieces root as the piece hash.
if fileNumPieces <= 1 {
continue
}
for index := 0; index < fileNumPieces; index += 512 {
// Minimizing to the number of pieces in a file conflicts with the BEP.
length := merkle.RoundUpToPowerOfTwo(uint(min(512, fileNumPieces-index)))
if length < 2 {
// This should have been filtered out by baseLayer and pieces root as piece hash
// checks.
panic(length)
}
msg := pp.Message{
Type: pp.HashRequest,
PiecesRoot: piecesRoot,
BaseLayer: baseLayer,
Index: pp.Integer(index),
Length: pp.Integer(length),
ProofLayers: proofLayers,
}
hr := hashRequestFromMessage(msg)
if generics.MapContains(pc.sentHashRequests, hr) {
continue
}
pc.write(msg)
generics.MakeMapIfNil(&pc.sentHashRequests)
pc.sentHashRequests[hr] = struct{}{}
}
}
}

type hashRequest struct {
piecesRoot [32]byte
baseLayer, index, length, proofLayers pp.Integer
}

func (hr hashRequest) toMessage() pp.Message {
return pp.Message{
Type: pp.HashRequest,
PiecesRoot: hr.piecesRoot,
BaseLayer: hr.baseLayer,
Index: hr.index,
Length: hr.length,
ProofLayers: hr.proofLayers,
}
}

func hashRequestFromMessage(m pp.Message) hashRequest {
return hashRequest{
piecesRoot: m.PiecesRoot,
baseLayer: m.BaseLayer,
index: m.Index,
length: m.Length,
proofLayers: m.ProofLayers,
}
}
3 changes: 3 additions & 0 deletions piecestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ type PieceState struct {

// Some of the piece has been obtained.
Partial bool

// The v2 hash for the piece layer is missing.
MissingPieceLayerHash bool
}

// Represents a series of consecutive pieces with the same state.
Expand Down
12 changes: 12 additions & 0 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,12 @@ func (t *Torrent) makePieces() {
if numFiles != 1 {
panic(fmt.Sprintf("%v:%v", beginFile, endFile))
}
if t.info.HasV2() {
file := piece.mustGetOnlyFile()
if file.numPieces() == 1 {
piece.hashV2.Set(file.piecesRoot.Unwrap())
}
}
}
}
}
Expand Down Expand Up @@ -647,6 +653,9 @@ func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
if !ret.Complete && t.piecePartiallyDownloaded(index) {
ret.Partial = true
}
if t.info.HasV2() && !p.hashV2.Ok {
ret.MissingPieceLayerHash = true
}
return
}

Expand Down Expand Up @@ -745,6 +754,9 @@ func (psr PieceStateRun) String() (ret string) {
if !psr.Ok {
ret += "?"
}
if psr.MissingPieceLayerHash {
ret += "h"
}
return
}

Expand Down

0 comments on commit 7761868

Please sign in to comment.