diff --git a/merkle/merkle.go b/merkle/merkle.go index ab54af6a2b..171555510d 100644 --- a/merkle/merkle.go +++ b/merkle/merkle.go @@ -47,3 +47,7 @@ func CompactLayerToSliceHashes(compactLayer string) (hashes [][sha256.Size]byte, func RoundUpToPowerOfTwo(n uint) (ret uint) { return 1 << bits.Len(n-1) } + +func Log2RoundingUp(n uint) (ret uint) { + return uint(bits.Len(n - 1)) +} diff --git a/misc.go b/misc.go index 8f82c2a0f2..42c516f0ea 100644 --- a/misc.go +++ b/misc.go @@ -157,16 +157,6 @@ func maxInt(as ...int) int { return ret } -func min(as ...int64) int64 { - ret := as[0] - for _, a := range as[1:] { - if a < ret { - ret = a - } - } - return ret -} - func minInt(as ...int) int { ret := as[0] for _, a := range as[1:] { diff --git a/peer_protocol/msg.go b/peer_protocol/msg.go index f1b1f10e83..b08bb5380e 100644 --- a/peer_protocol/msg.go +++ b/peer_protocol/msg.go @@ -9,16 +9,20 @@ import ( ) // This is a lazy union representing all the possible fields for messages. Go doesn't have ADTs, and -// I didn't choose to use type-assertions. +// I didn't choose to use type-assertions. Fields are ordered to minimize struct size and padding. type Message struct { - Keepalive bool - Type MessageType - Index, Begin, Length Integer + PiecesRoot [32]byte Piece []byte Bitfield []bool - ExtendedID ExtensionNumber ExtendedPayload []byte + Hashes [][32]byte + Index, Begin, Length Integer + BaseLayer Integer + ProofLayers Integer Port uint16 + Type MessageType + ExtendedID ExtensionNumber + Keepalive bool } var _ interface { @@ -58,7 +62,21 @@ func (msg Message) MustMarshalBinary() []byte { } func (msg Message) MarshalBinary() (data []byte, err error) { + // It might look like you could have a pool of buffers and preallocate the message length + // prefix, but because we have to return []byte, it becomes non-trivial to make this fast. You + // will need a benchmark. var buf bytes.Buffer + mustWrite := func(data any) { + err := binary.Write(&buf, binary.BigEndian, data) + if err != nil { + panic(err) + } + } + writeConsecutive := func(data ...any) { + for _, d := range data { + mustWrite(d) + } + } if !msg.Keepalive { err = buf.WriteByte(byte(msg.Type)) if err != nil { @@ -99,6 +117,9 @@ func (msg Message) MarshalBinary() (data []byte, err error) { _, err = buf.Write(msg.ExtendedPayload) case Port: err = binary.Write(&buf, binary.BigEndian, msg.Port) + case HashRequest: + buf.Write(msg.PiecesRoot[:]) + writeConsecutive(msg.BaseLayer, msg.Index, msg.Length, msg.ProofLayers) default: err = fmt.Errorf("unknown message type: %v", msg.Type) } diff --git a/peerconn.go b/peerconn.go index 59a31ad4a7..74e5e757e6 100644 --- a/peerconn.go +++ b/peerconn.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "github.com/anacrolix/torrent/merkle" "io" "math/rand" "net" @@ -79,6 +80,8 @@ type PeerConn struct { peerRequestDataAllocLimiter alloclim.Limiter outstandingHolepunchingRendezvous map[netip.AddrPort]struct{} + + sentHashRequests map[hashRequest]struct{} } func (cn *PeerConn) pexStatus() string { @@ -336,6 +339,7 @@ func (cn *PeerConn) fillWriteBuffer() { // knowledge of write buffers. return } + cn.requestMissingHashes() cn.maybeUpdateActualRequestState() if cn.pex.IsEnabled() { if flow := cn.pex.Share(cn.write); !flow { @@ -1214,3 +1218,73 @@ func (pc *PeerConn) WriteExtendedMessage(extName pp.ExtensionName, payload []byt }) return nil } + +func (pc *PeerConn) requestMissingHashes() { + if !pc.t.haveInfo() { + return + } + info := pc.t.info + baseLayer := pp.Integer(merkle.Log2RoundingUp(merkle.RoundUpToPowerOfTwo( + uint((pc.t.usualPieceSize() + merkle.BlockSize - 1) / merkle.BlockSize)), + )) + for _, file := range info.UpvertedFiles() { + piecesRoot := file.PiecesRoot.Unwrap() + fileNumPieces := int((file.Length + info.PieceLength - 1) / info.PieceLength) + proofLayers := pp.Integer(0) + // We would be requesting the leaves, the file must be short enough that we can just do with + // the pieces root as the piece hash. + if fileNumPieces <= 1 { + continue + } + for index := 0; index < fileNumPieces; index += 512 { + // Minimizing to the number of pieces in a file conflicts with the BEP. + length := merkle.RoundUpToPowerOfTwo(uint(min(512, fileNumPieces-index))) + if length < 2 { + // This should have been filtered out by baseLayer and pieces root as piece hash + // checks. + panic(length) + } + msg := pp.Message{ + Type: pp.HashRequest, + PiecesRoot: piecesRoot, + BaseLayer: baseLayer, + Index: pp.Integer(index), + Length: pp.Integer(length), + ProofLayers: proofLayers, + } + hr := hashRequestFromMessage(msg) + if generics.MapContains(pc.sentHashRequests, hr) { + continue + } + pc.write(msg) + generics.MakeMapIfNil(&pc.sentHashRequests) + pc.sentHashRequests[hr] = struct{}{} + } + } +} + +type hashRequest struct { + piecesRoot [32]byte + baseLayer, index, length, proofLayers pp.Integer +} + +func (hr hashRequest) toMessage() pp.Message { + return pp.Message{ + Type: pp.HashRequest, + PiecesRoot: hr.piecesRoot, + BaseLayer: hr.baseLayer, + Index: hr.index, + Length: hr.length, + ProofLayers: hr.proofLayers, + } +} + +func hashRequestFromMessage(m pp.Message) hashRequest { + return hashRequest{ + piecesRoot: m.PiecesRoot, + baseLayer: m.BaseLayer, + index: m.Index, + length: m.Length, + proofLayers: m.ProofLayers, + } +} diff --git a/piecestate.go b/piecestate.go index 089adca440..9e67907bd9 100644 --- a/piecestate.go +++ b/piecestate.go @@ -18,6 +18,9 @@ type PieceState struct { // Some of the piece has been obtained. Partial bool + + // The v2 hash for the piece layer is missing. + MissingPieceLayerHash bool } // Represents a series of consecutive pieces with the same state. diff --git a/torrent.go b/torrent.go index 3c925ae577..8a403e16ab 100644 --- a/torrent.go +++ b/torrent.go @@ -416,6 +416,12 @@ func (t *Torrent) makePieces() { if numFiles != 1 { panic(fmt.Sprintf("%v:%v", beginFile, endFile)) } + if t.info.HasV2() { + file := piece.mustGetOnlyFile() + if file.numPieces() == 1 { + piece.hashV2.Set(file.piecesRoot.Unwrap()) + } + } } } } @@ -647,6 +653,9 @@ func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) { if !ret.Complete && t.piecePartiallyDownloaded(index) { ret.Partial = true } + if t.info.HasV2() && !p.hashV2.Ok { + ret.MissingPieceLayerHash = true + } return } @@ -745,6 +754,9 @@ func (psr PieceStateRun) String() (ret string) { if !psr.Ok { ret += "?" } + if psr.MissingPieceLayerHash { + ret += "h" + } return }