Skip to content

Commit

Permalink
v2 torrent piece hashing
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Feb 28, 2024
1 parent d03d872 commit e9355d9
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 38 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444
github.com/anacrolix/envpprof v1.3.0
github.com/anacrolix/fuse v0.2.0
github.com/anacrolix/generics v0.0.0-20230911070922-5dd7545c6b13
github.com/anacrolix/generics v0.0.2-0.20240227122613-f95486179cab
github.com/anacrolix/go-libutp v1.3.1
github.com/anacrolix/log v0.14.6-0.20231202035202-ed7a02cad0b4
github.com/anacrolix/missinggo v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ github.com/anacrolix/envpprof v1.3.0 h1:WJt9bpuT7A/CDCxPOv/eeZqHWlle/Y0keJUvc6tc
github.com/anacrolix/envpprof v1.3.0/go.mod h1:7QIG4CaX1uexQ3tqd5+BRa/9e2D02Wcertl6Yh0jCB0=
github.com/anacrolix/fuse v0.2.0 h1:pc+To78kI2d/WUjIyrsdqeJQAesuwpGxlI3h1nAv3Do=
github.com/anacrolix/fuse v0.2.0/go.mod h1:Kfu02xBwnySDpH3N23BmrP3MDfwAQGRLUCj6XyeOvBQ=
github.com/anacrolix/generics v0.0.0-20230911070922-5dd7545c6b13 h1:qwOprPTDMM3BASJRf84mmZnTXRsPGGJ8xoHKQS7m3so=
github.com/anacrolix/generics v0.0.0-20230911070922-5dd7545c6b13/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
github.com/anacrolix/generics v0.0.2-0.20240227122613-f95486179cab h1:MvuAC/UJtcohN6xWc8zYXSZfllh1LVNepQ0R3BCX5I4=
github.com/anacrolix/generics v0.0.2-0.20240227122613-f95486179cab/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
github.com/anacrolix/go-libutp v1.3.1 h1:idJzreNLl+hNjGC3ZnUOjujEaryeOGgkwHLqSGoige0=
github.com/anacrolix/go-libutp v1.3.1/go.mod h1:heF41EC8kN0qCLMokLBVkB8NXiLwx3t8R8810MTNI5o=
github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU=
Expand Down
71 changes: 71 additions & 0 deletions merkle/hash.go
Original file line number Diff line number Diff line change
@@ -1 +1,72 @@
package merkle

import (
"crypto/sha256"
"hash"
)

func NewHash() *Hash {
return &Hash{
nextBlock: sha256.New(),
}
}

type Hash struct {
blocks [][32]byte
nextBlock hash.Hash
written int
}

func (h *Hash) remaining() int {
return BlockSize - h.written
}

func (h *Hash) Write(p []byte) (n int, err error) {
for len(p) > 0 {
var n1 int
n1, err = h.nextBlock.Write(p[:min(len(p), h.remaining())])
n += n1
h.written += n1
p = p[n1:]
if h.remaining() == 0 {
h.blocks = append(h.blocks, h.nextBlockSum())
h.nextBlock.Reset()
h.written = 0
}
if err != nil {
break
}
}
return
}

func (h *Hash) nextBlockSum() (sum [32]byte) {
h.nextBlock.Sum(sum[:0])
return
}

func (h *Hash) Sum(b []byte) []byte {
blocks := h.blocks
if h.written != 0 {
blocks = append(blocks, h.nextBlockSum())
}
n := int(RoundUpToPowerOfTwo(uint(len(blocks))))
blocks = append(blocks, make([][32]byte, n-len(blocks))...)
sum := Root(blocks)
return append(b, sum[:]...)
}

func (h *Hash) Reset() {
h.blocks = h.blocks[:0]
h.nextBlock.Reset()
}

func (h *Hash) Size() int {
return 32
}

func (h *Hash) BlockSize() int {
return h.nextBlock.BlockSize()
}

var _ hash.Hash = (*Hash)(nil)
8 changes: 7 additions & 1 deletion merkle/merkle.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ import (
"math/bits"
)

// The leaf block size for BitTorrent v2 Merkle trees.
const BlockSize = 1 << 14 // 16KiB

func Root(hashes [][sha256.Size]byte) [sha256.Size]byte {
if len(hashes) <= 1 {
switch len(hashes) {
case 0:
return sha256.Sum256(nil)
case 1:
return hashes[0]
}
numHashes := uint(len(hashes))
Expand Down
5 changes: 3 additions & 2 deletions metainfo/bep52.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ func ValidatePieceLayers(
if !ok {
// BEP 52: "For each file in the file tree that is larger than the piece size it
// contains one string value.". The reference torrent creator in
// https://blog.libtorrent.org/2020/09/bittorrent-v2/ also has this. I'm not sure what
// harm it causes if it's present anyway, possibly it won't be useful to us.
// https://blog.libtorrent.org/2020/09/bittorrent-v2/ also has this. If a file is equal
// to or smaller than the piece length, we can just use the pieces root instead of the
// piece layer hash.
if ft.File.Length > pieceLength {
err = fmt.Errorf("no piece layers for file %q", path)
}
Expand Down
104 changes: 72 additions & 32 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/anacrolix/torrent/merkle"
"github.com/anacrolix/torrent/types/infohash"
infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
"hash"
"io"
"math/rand"
"net/netip"
Expand Down Expand Up @@ -423,22 +424,35 @@ func (t *Torrent) AddPieceLayers(layers map[string]string) (err error) {
return
}
compactLayer, ok := layers[string(f.piecesRoot.Value[:])]
if !ok {
continue
}
var hashes [][32]byte
hashes, err = merkle.CompactLayerToSliceHashes(compactLayer)
if err != nil {
err = fmt.Errorf("bad piece layers for file %q: %w", f, err)
return
if ok {
hashes, err = merkle.CompactLayerToSliceHashes(compactLayer)
if err != nil {
err = fmt.Errorf("bad piece layers for file %q: %w", f, err)
return
}
} else if f.length > t.info.PieceLength {
// BEP 52 is pretty strongly worded about this, even though we should be able to
// recover: If a v2 torrent is added by magnet link or infohash, we need to fetch piece
// layers ourselves anyway, and that's how we can recover from this.
t.logger.Levelf(log.Warning, "no piece layers for file %q", f)
continue
} else {
hashes = [][32]byte{f.piecesRoot.Value}
}
if len(hashes) != f.numPieces() {
err = fmt.Errorf("file %q: got %v hashes expected %v", f, len(hashes), f.numPieces())
return
}
for i := range f.numPieces() {
p := t.piece(f.BeginPieceIndex() + i)
p.hashV2.Set(hashes[i])
pi := f.BeginPieceIndex() + i
p := t.piece(pi)
// See Torrent.onSetInfo. We want to trigger an initial check if appropriate, if we
// didn't yet have a piece hash (can occur with v2 when we don't start with piece
// layers).
if !p.hashV2.Set(hashes[i]).Ok && p.hash == nil {
t.queueInitialPieceCheck(pi)
}
}
}
return nil
Expand Down Expand Up @@ -521,10 +535,7 @@ func (t *Torrent) onSetInfo() {
p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
t.addRequestOrderPiece(i)
t.updatePieceCompletion(i)
if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
// t.logger.Printf("piece %s completion unknown, queueing check", p)
t.queuePieceCheck(i)
}
t.queueInitialPieceCheck(i)
}
t.cl.event.Broadcast()
close(t.gotMetainfoC)
Expand Down Expand Up @@ -1057,28 +1068,39 @@ func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWr
}

func (t *Torrent) hashPiece(piece pieceIndex) (
ret metainfo.Hash,
correct bool,
// These are peers that sent us blocks that differ from what we hash here.
differingPeers map[bannableAddr]struct{},
err error,
) {
p := t.piece(piece)
p.waitNoPendingWrites()
storagePiece := t.pieces[piece].Storage()

// Does the backend want to do its own hashing?
if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
var sum metainfo.Hash
// log.Printf("A piece decided to self-hash: %d", piece)
sum, err = i.SelfHash()
missinggo.CopyExact(&ret, sum)
return
storagePiece := p.Storage()

var h hash.Hash
if p.hash != nil {
h = pieceHash.New()

// Does the backend want to do its own hashing?
if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
var sum metainfo.Hash
// log.Printf("A piece decided to self-hash: %d", piece)
sum, err = i.SelfHash()
correct = sum == *p.hash
// Can't do smart banning without reading the piece. The smartBanCache is still cleared
// in pieceHasher regardless.
return
}

} else if p.hashV2.Ok {
h = merkle.NewHash()
} else {
panic("no hash")
}

hash := pieceHash.New()
const logPieceContents = false
smartBanWriter := t.smartBanBlockCheckingWriter(piece)
writers := []io.Writer{hash, smartBanWriter}
writers := []io.Writer{h, smartBanWriter}
var examineBuf bytes.Buffer
if logPieceContents {
writers = append(writers, &examineBuf)
Expand All @@ -1089,7 +1111,23 @@ func (t *Torrent) hashPiece(piece pieceIndex) (
}
smartBanWriter.Flush()
differingPeers = smartBanWriter.badPeers
missinggo.CopyExact(&ret, hash.Sum(nil))
if p.hash != nil {
var sum [20]byte
n := len(h.Sum(sum[:0]))
if n != 20 {
panic(n)
}
correct = sum == *p.hash
} else if p.hashV2.Ok {
var sum [32]byte
n := len(h.Sum(sum[:0]))
if n != 32 {
panic(n)
}
correct = sum == p.hashV2.Value
} else {
panic("no hash")
}
return
}

Expand Down Expand Up @@ -2169,10 +2207,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
} else {
log.Fmsg(
"piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
).AddValues(t, p).LogLevel(

log.Debug, t.logger)

).AddValues(t, p).LogLevel(log.Info, t.logger)
pieceHashedNotCorrect.Add(1)
}
}
Expand Down Expand Up @@ -2368,8 +2403,7 @@ func (t *Torrent) dropBannedPeers() {

func (t *Torrent) pieceHasher(index pieceIndex) {
p := t.piece(index)
sum, failedPeers, copyErr := t.hashPiece(index)
correct := sum == *p.hash
correct, failedPeers, copyErr := t.hashPiece(index)
switch copyErr {
case nil, io.EOF:
default:
Expand Down Expand Up @@ -2411,6 +2445,12 @@ func (t *Torrent) peersAsSlice() (ret []*Peer) {
return
}

func (t *Torrent) queueInitialPieceCheck(i pieceIndex) {
if !t.initialPieceCheckDisabled && !t.piece(i).storageCompletionOk {
t.queuePieceCheck(i)
}
}

func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
piece := t.piece(pieceIndex)
if piece.hash == nil && !piece.hashV2.Ok {
Expand Down

0 comments on commit e9355d9

Please sign in to comment.