From f47f9d021978c4eb48df94faaaf2a0d8ab758df7 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 10 Dec 2019 16:42:14 +0100 Subject: [PATCH 01/16] bmt, param: Introduce SectionHasher interface, implement in bmt --- bmt/bmt.go | 223 +++++++++++++++++++++++++++++++--------- bmt/bmt_test.go | 100 +++++++++++++----- param/hash.go | 7 ++ param/io.go | 19 ++++ storage/chunker_test.go | 3 +- storage/common_test.go | 3 +- storage/hasherstore.go | 5 +- storage/swarmhasher.go | 4 +- storage/types.go | 6 +- 9 files changed, 287 insertions(+), 83 deletions(-) create mode 100644 param/hash.go create mode 100644 param/io.go diff --git a/bmt/bmt.go b/bmt/bmt.go index 18eab5a2bc..d20e2c1aa3 100644 --- a/bmt/bmt.go +++ b/bmt/bmt.go @@ -18,11 +18,17 @@ package bmt import ( + "context" + "encoding/binary" + "errors" "fmt" "hash" "strings" "sync" "sync/atomic" + + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/param" ) /* @@ -60,6 +66,10 @@ const ( PoolSize = 8 ) +var ( + zeroSpan = make([]byte, 8) +) + // BaseHasherFunc is a hash.Hash constructor function used for the base hash of the BMT. // implemented by Keccak256 SHA3 sha3.NewLegacyKeccak256 type BaseHasherFunc func() hash.Hash @@ -75,8 +85,10 @@ type BaseHasherFunc func() hash.Hash // the tree and itself in a state reusable for hashing a new chunk // - generates and verifies segment inclusion proofs (TODO:) type Hasher struct { - pool *TreePool // BMT resource pool - bmt *tree // prebuilt BMT resource for flowcontrol and proofs + pool *TreePool // BMT resource pool + bmt *tree // prebuilt BMT resource for flowcontrol and proofs + size int // bytes written to Hasher since last Reset() + cursor int // cursor to write to on next Write() call } // New creates a reusable BMT Hasher that @@ -276,14 +288,56 @@ func newTree(segmentSize, depth int, hashfunc func() hash.Hash) *tree { } } -// methods needed to implement hash.Hash +// Implements param.SectionWriter +func (h *Hasher) SetWriter(_ param.SectionWriterFunc) param.SectionWriter { + log.Warn("Synchasher does not currently support SectionWriter chaining") + return h +} + +// Implements param.SectionWriter +func (h *Hasher) SectionSize() int { + return h.pool.SegmentSize +} + +// Implements param.SectionWriter +func (h *Hasher) SetLength(length int) { +} + +// Implements param.SectionWriter +func (h *Hasher) SetSpan(length int) { + span := LengthToSpan(length) + h.getTree().span = span +} + +// Implements storage.SwarmHash +func (h *Hasher) SetSpanBytes(b []byte) { + t := h.getTree() + t.span = make([]byte, 8) + copy(t.span, b) +} + +// Implements param.SectionWriter +func (h *Hasher) Branches() int { + return h.pool.SegmentCount +} + +// Implements param.SectionWriter +func (h *Hasher) Init(_ context.Context, _ func(error)) { +} -// Size returns the size +// Size returns the digest size +// Implements hash.Hash in param.SectionWriter func (h *Hasher) Size() int { return h.pool.SegmentSize } +// Seek sets the section that will be written to on the next Write() +func (h *Hasher) SeekSection(offset int) { + h.cursor = offset +} + // BlockSize returns the block size +// Implements hash.Hash in param.SectionWriter func (h *Hasher) BlockSize() int { return 2 * h.pool.SegmentSize } @@ -293,31 +347,35 @@ func (h *Hasher) BlockSize() int { // hash.Hash interface Sum method appends the byte slice to the underlying // data before it calculates and returns the hash of the chunk // caller must make sure Sum is not called concurrently with Write, writeSection +// Implements hash.Hash in 
param.SectionWriter func (h *Hasher) Sum(b []byte) (s []byte) { t := h.getTree() + if h.size == 0 && t.offset == 0 { + h.releaseTree() + return h.pool.zerohashes[h.pool.Depth] + } // write the last section with final flag set to true go h.writeSection(t.cursor, t.section, true, true) // wait for the result s = <-t.result + if t.span == nil { + t.span = LengthToSpan(h.size) + } span := t.span // release the tree resource back to the pool h.releaseTree() - // b + sha3(span + BMT(pure_chunk)) - if len(span) == 0 { - return append(b, s...) - } return doSum(h.pool.hasher(), b, span, s) } -// methods needed to implement the SwarmHash and the io.Writer interfaces - // Write calls sequentially add to the buffer to be hashed, // with every full segment calls writeSection in a go routine +// Implements hash.Hash (io.Writer) in param.SectionWriter func (h *Hasher) Write(b []byte) (int, error) { l := len(b) if l == 0 || l > h.pool.Size { return 0, nil } + h.size += len(b) t := h.getTree() secsize := 2 * h.pool.SegmentSize // calculate length of missing bit to complete current open section @@ -359,20 +417,13 @@ func (h *Hasher) Write(b []byte) (int, error) { } // Reset needs to be called before writing to the hasher +// Implements hash.Hash in param.SectionWriter func (h *Hasher) Reset() { + h.cursor = 0 + h.size = 0 h.releaseTree() } -// methods needed to implement the SwarmHash interface - -// ResetWithLength needs to be called before writing to the hasher -// the argument is supposed to be the byte slice binary representation of -// the length of the data subsumed under the hash, i.e., span -func (h *Hasher) ResetWithLength(span []byte) { - h.Reset() - h.getTree().span = span -} - // releaseTree gives back the Tree to the pool whereby it unlocks // it resets tree, segment and index func (h *Hasher) releaseTree() { @@ -395,30 +446,30 @@ func (h *Hasher) releaseTree() { } // NewAsyncWriter extends Hasher with an interface for concurrent segment/section writes +// TODO: Instead of explicitly setting double size of segment should be dynamic and chunked internally. 
If not, we have to keep different bmt hashers generation functions for different purposes in the same instance, or cope with added complexity of bmt hasher generation functions having to receive parameters func (h *Hasher) NewAsyncWriter(double bool) *AsyncHasher { secsize := h.pool.SegmentSize if double { secsize *= 2 } + seccount := h.pool.SegmentCount + if double { + seccount /= 2 + } write := func(i int, section []byte, final bool) { h.writeSection(i, section, double, final) } return &AsyncHasher{ - Hasher: h, - double: double, - secsize: secsize, - write: write, + Hasher: h, + double: double, + secsize: secsize, + seccount: seccount, + write: write, + jobSize: 0, + sought: true, } } -// SectionWriter is an asynchronous segment/section writer interface -type SectionWriter interface { - Reset() // standard init to be called before reuse - Write(index int, data []byte) // write into section of index - Sum(b []byte, length int, span []byte) []byte // returns the hash of the buffer - SectionSize() int // size of the async section unit to use -} - // AsyncHasher extends BMT Hasher with an asynchronous segment/section writer interface // AsyncHasher is unsafe and does not check indexes and section data lengths // it must be used with the right indexes and length and the right number of sections @@ -434,33 +485,94 @@ type SectionWriter interface { // * it will not leak processes if not all sections are written but it blocks // and keeps the resource which can be released calling Reset() type AsyncHasher struct { - *Hasher // extends the Hasher - mtx sync.Mutex // to lock the cursor access - double bool // whether to use double segments (call Hasher.writeSection) - secsize int // size of base section (size of hash or double) - write func(i int, section []byte, final bool) + *Hasher // extends the Hasher + mtx sync.Mutex // to lock the cursor access + double bool // whether to use double segments (call Hasher.writeSection) + secsize int // size of base section (size of hash or double) + seccount int // base section count + write func(i int, section []byte, final bool) + errFunc func(error) + all bool // if all written in one go, temporary workaround + sought bool + jobSize int } -// methods needed to implement AsyncWriter +// Implements param.SectionWriter +// TODO context should be implemented all across (ie original TODO in TreePool.reserve()) +func (sw *AsyncHasher) Init(_ context.Context, errFunc func(error)) { + sw.errFunc = errFunc +} + +// Implements param.SectionWriter +func (sw *AsyncHasher) Reset() { + sw.sought = true + sw.jobSize = 0 + sw.all = false + sw.Hasher.Reset() +} + +func (sw *AsyncHasher) SetLength(length int) { + sw.jobSize = length +} + +// Implements param.SectionWriter +func (sw *AsyncHasher) SetWriter(_ param.SectionWriterFunc) param.SectionWriter { + sw.errFunc(errors.New("Asynchasher does not currently support SectionWriter chaining")) + return sw +} // SectionSize returns the size of async section unit to use +// Implements param.SectionWriter func (sw *AsyncHasher) SectionSize() int { return sw.secsize } +// DigestSize returns the branching factor, which is equivalent to the size of the BMT input +// Implements param.SectionWriter +func (sw *AsyncHasher) Branches() int { + return sw.seccount +} + +// SeekSection sets the cursor where the next Write() will write +// It locks the cursor until Write() is called; if no Write() is called, it will hang. 
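+//
+// An illustrative pairing (caller code is hypothetical; "sections" is an
+// assumed slice of section buffers, not part of this change):
+//
+//   sw.SeekSection(i)      // locks the cursor at section i
+//   sw.Write(sections[i])  // writes section i and releases the lock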
+// Implements param.SectionWriter +func (sw *AsyncHasher) SeekSection(offset int) { + sw.mtx.Lock() + sw.Hasher.SeekSection(offset) +} + +// Write writes to the current position cursor of the Hasher +// The cursor must first be manually set with SeekSection() +// The method will NOT advance the cursor. +// Implements hash.hash in param.SectionWriter +func (sw *AsyncHasher) Write(section []byte) (int, error) { + defer sw.mtx.Unlock() + sw.Hasher.size += len(section) + return sw.writeSection(sw.Hasher.cursor, section) +} + // Write writes the i-th section of the BMT base // this function can and is meant to be called concurrently // it sets max segment threadsafely -func (sw *AsyncHasher) Write(i int, section []byte) { - sw.mtx.Lock() - defer sw.mtx.Unlock() +func (sw *AsyncHasher) writeSection(i int, section []byte) (int, error) { + // TODO: Temporary workaround for chunkwise write + if i < 0 { + sw.Hasher.cursor = 0 + sw.Hasher.Reset() + sw.Hasher.SetLength(len(section)) + sw.Hasher.Write(section) + sw.all = true + return len(section), nil + } + //sw.mtx.Lock() // this lock is now set in SeekSection + // defer sw.mtk.Unlock() // this unlock is still left in Write() t := sw.getTree() // cursor keeps track of the rightmost section written so far // if index is lower than cursor then just write non-final section as is if i < t.cursor { // if index is not the rightmost, safe to write section go sw.write(i, section, false) - return + return len(section), nil } // if there is a previous rightmost section safe to write section if t.offset > 0 { @@ -470,7 +582,7 @@ func (sw *AsyncHasher) Write(i int, section []byte) { t.section = make([]byte, sw.secsize) copy(t.section, section) go sw.write(i, t.section, true) - return + return len(section), nil } // the rightmost section just changed, so we write the previous one as non-final go sw.write(t.cursor, t.section, false) @@ -481,6 +593,7 @@ func (sw *AsyncHasher) Write(i int, section []byte) { t.offset = i*sw.secsize + 1 t.section = make([]byte, sw.secsize) copy(t.section, section) + return len(section), nil } // Sum can be called any time once the length and the span is known @@ -492,12 +605,20 @@ func (sw *AsyncHasher) Write(i int, section []byte) { // length: known length of the input (unsafe; undefined if out of range) // meta: metadata to hash together with BMT root for the final digest // e.g., span for protection against existential forgery -func (sw *AsyncHasher) Sum(b []byte, length int, meta []byte) (s []byte) { +// +// Implements hash.hash in param.SectionWriter +func (sw *AsyncHasher) Sum(b []byte) (s []byte) { + if sw.all { + return sw.Hasher.Sum(nil) + } sw.mtx.Lock() t := sw.getTree() + length := sw.jobSize if length == 0 { + sw.releaseTree() sw.mtx.Unlock() s = sw.pool.zerohashes[sw.pool.Depth] + return } else { // for non-zero input the rightmost section is written to the tree asynchronously // if the actual last section has been written (t.cursor == length/t.secsize) @@ -515,15 +636,13 @@ func (sw *AsyncHasher) Sum(b []byte, length int, meta []byte) (s []byte) { } // relesase the tree back to the pool sw.releaseTree() - // if no meta is given just append digest to b - if len(meta) == 0 { - return append(b, s...) 
- } + meta := t.span // hash together meta and BMT root hash using the pools return doSum(sw.pool.hasher(), b, meta, s) } // writeSection writes the hash of i-th section into level 1 node of the BMT tree +// TODO: h.size increases even on multiple writes to the same section of a section func (h *Hasher) writeSection(i int, section []byte, double bool, final bool) { // select the leaf node for the section var n *node @@ -688,3 +807,11 @@ func calculateDepthFor(n int) (d int) { } return d + 1 } + +// creates a binary span size representation +// to pass to bmt.SectionWriter +func LengthToSpan(length int) []byte { + spanBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(spanBytes, uint64(length)) + return spanBytes +} diff --git a/bmt/bmt_test.go b/bmt/bmt_test.go index fc020eb7c2..1cfd611a22 100644 --- a/bmt/bmt_test.go +++ b/bmt/bmt_test.go @@ -26,10 +26,15 @@ import ( "testing" "time" + "github.com/ethersphere/swarm/param" "github.com/ethersphere/swarm/testutil" "golang.org/x/crypto/sha3" ) +func init() { + testutil.Init() +} + // the actual data length generated (could be longer than max datalength of the BMT) const BufferSize = 4128 @@ -141,10 +146,10 @@ func TestHasherEmptyData(t *testing.T) { defer pool.Drain(0) bmt := New(pool) rbmt := NewRefHasher(hasher, count) - refHash := rbmt.Hash(data) - expHash := syncHash(bmt, nil, data) - if !bytes.Equal(expHash, refHash) { - t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash) + expHash := rbmt.Hash(data) + resHash := syncHash(bmt, 0, data) + if !bytes.Equal(expHash, resHash) { + t.Fatalf("hash mismatch with reference. expected %x, got %x", resHash, expHash) } }) } @@ -197,15 +202,19 @@ func TestAsyncCorrectness(t *testing.T) { bmt := New(pool) d := data[:n] rbmt := NewRefHasher(hasher, count) - exp := rbmt.Hash(d) - got := syncHash(bmt, nil, d) + expNoMeta := rbmt.Hash(d) + h := hasher() + h.Write(zeroSpan) + h.Write(expNoMeta) + exp := h.Sum(nil) + got := syncHash(bmt, 0, d) if !bytes.Equal(got, exp) { - t.Fatalf("wrong sync hash for datalength %v: expected %x (ref), got %x", n, exp, got) + t.Fatalf("wrong sync hash (syncpart) for datalength %v: expected %x (ref), got %x", n, exp, got) } sw := bmt.NewAsyncWriter(double) - got = asyncHashRandom(sw, nil, d, wh) + got = asyncHashRandom(sw, 0, d, wh) if !bytes.Equal(got, exp) { - t.Fatalf("wrong async hash for datalength %v: expected %x, got %x", n, exp, got) + t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got) } } }) @@ -288,8 +297,12 @@ func TestBMTWriterBuffers(t *testing.T) { bmt := New(pool) data := testutil.RandomBytes(1, n) rbmt := NewRefHasher(hasher, count) - refHash := rbmt.Hash(data) - expHash := syncHash(bmt, nil, data) + refNoMetaHash := rbmt.Hash(data) + h := hasher() + h.Write(zeroSpan) + h.Write(refNoMetaHash) + refHash := h.Sum(nil) + expHash := syncHash(bmt, 0, data) if !bytes.Equal(expHash, refHash) { t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash) } @@ -308,6 +321,7 @@ func TestBMTWriterBuffers(t *testing.T) { return fmt.Errorf("incorrect read. expected %v bytes, got %v", buflen, read) } } + bmt.SetSpan(0) hash := bmt.Sum(nil) if !bytes.Equal(hash, expHash) { return fmt.Errorf("hash mismatch. 
expected %x, got %x", hash, expHash) @@ -346,11 +360,16 @@ func testHasherCorrectness(bmt *Hasher, hasher BaseHasherFunc, d []byte, n, coun if len(d) < n { n = len(d) } - binary.BigEndian.PutUint64(span, uint64(n)) + binary.LittleEndian.PutUint64(span, uint64(n)) data := d[:n] rbmt := NewRefHasher(hasher, count) - exp := sha3hash(span, rbmt.Hash(data)) - got := syncHash(bmt, span, data) + var exp []byte + if n == 0 { + exp = bmt.pool.zerohashes[bmt.pool.Depth] + } else { + exp = sha3hash(span, rbmt.Hash(data)) + } + got := syncHash(bmt, n, data) if !bytes.Equal(got, exp) { return fmt.Errorf("wrong hash: expected %x, got %x", exp, got) } @@ -460,7 +479,7 @@ func benchmarkBMT(t *testing.B, n int) { t.ReportAllocs() t.ResetTimer() for i := 0; i < t.N; i++ { - syncHash(bmt, nil, data) + syncHash(bmt, 0, data) } } @@ -478,7 +497,7 @@ func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) { t.ReportAllocs() t.ResetTimer() for i := 0; i < t.N; i++ { - asyncHash(bmt, nil, n, wh, idxs, segments) + asyncHash(bmt, 0, n, wh, idxs, segments) } } @@ -498,7 +517,7 @@ func benchmarkPool(t *testing.B, poolsize, n int) { go func() { defer wg.Done() bmt := New(pool) - syncHash(bmt, nil, data) + syncHash(bmt, 0, data) }() } wg.Wait() @@ -519,8 +538,9 @@ func benchmarkRefHasher(t *testing.B, n int) { } // Hash hashes the data and the span using the bmt hasher -func syncHash(h *Hasher, span, data []byte) []byte { - h.ResetWithLength(span) +func syncHash(h *Hasher, spanLength int, data []byte) []byte { + h.Reset() + h.SetSpan(spanLength) h.Write(data) return h.Sum(nil) } @@ -547,23 +567,27 @@ func splitAndShuffle(secsize int, data []byte) (idxs []int, segments [][]byte) { } // splits the input data performs a random shuffle to mock async section writes -func asyncHashRandom(bmt SectionWriter, span []byte, data []byte, wh whenHash) (s []byte) { +func asyncHashRandom(bmt param.SectionWriter, spanLength int, data []byte, wh whenHash) (s []byte) { idxs, segments := splitAndShuffle(bmt.SectionSize(), data) - return asyncHash(bmt, span, len(data), wh, idxs, segments) + return asyncHash(bmt, spanLength, len(data), wh, idxs, segments) } -// mock for async section writes for BMT SectionWriter +// mock for async section writes for param.SectionWriter // requires a permutation (a random shuffle) of list of all indexes of segments // and writes them in order to the appropriate section // the Sum function is called according to the wh parameter (first, last, random [relative to segment writes]) -func asyncHash(bmt SectionWriter, span []byte, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) { +func asyncHash(bmt param.SectionWriter, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) { bmt.Reset() if l == 0 { - return bmt.Sum(nil, l, span) + bmt.SetLength(l) + bmt.SetSpan(spanLength) + return bmt.Sum(nil) } c := make(chan []byte, 1) hashf := func() { - c <- bmt.Sum(nil, l, span) + bmt.SetLength(l) + bmt.SetSpan(spanLength) + c <- bmt.Sum(nil) } maxsize := len(idxs) var r int @@ -571,13 +595,35 @@ func asyncHash(bmt SectionWriter, span []byte, l int, wh whenHash, idxs []int, s r = rand.Intn(maxsize) } for i, idx := range idxs { - bmt.Write(idx, segments[idx]) + bmt.SeekSection(idx) + bmt.Write(segments[idx]) if (wh == first || wh == random) && i == r { go hashf() } } if wh == last { - return bmt.Sum(nil, l, span) + bmt.SetLength(l) + bmt.SetSpan(spanLength) + return bmt.Sum(nil) } return <-c } + +// TestUseSyncAsOrdinaryHasher verifies that the bmt.Hasher can be used with 
the hash.Hash interface +func TestUseSyncAsOrdinaryHasher(t *testing.T) { + hasher := sha3.NewLegacyKeccak256 + pool := NewTreePool(hasher, segmentCount, PoolSize) + bmt := New(pool) + bmt.Write([]byte("foo")) + res := bmt.Sum(nil) + refh := NewRefHasher(hasher, 128) + resh := refh.Hash([]byte("foo")) + hsub := hasher() + span := LengthToSpan(3) + hsub.Write(span) + hsub.Write(resh) + refRes := hsub.Sum(nil) + if !bytes.Equal(res, refRes) { + t.Fatalf("normalhash; expected %x, got %x", refRes, res) + } +} diff --git a/param/hash.go b/param/hash.go new file mode 100644 index 0000000000..114fa0ad6c --- /dev/null +++ b/param/hash.go @@ -0,0 +1,7 @@ +package param + +import "golang.org/x/crypto/sha3" + +var ( + HashFunc = sha3.NewLegacyKeccak256 +) diff --git a/param/io.go b/param/io.go new file mode 100644 index 0000000000..3eb14bce67 --- /dev/null +++ b/param/io.go @@ -0,0 +1,19 @@ +package param + +import ( + "context" + "hash" +) + +type SectionWriterFunc func(ctx context.Context) SectionWriter + +type SectionWriter interface { + hash.Hash + Init(ctx context.Context, errFunc func(error)) // errFunc is used for asynchronous components to signal error and termination + SetWriter(hashFunc SectionWriterFunc) SectionWriter // chain another SectionWriter the current instance + SeekSection(section int) // sets cursor that next Write() will write to + SetLength(length int) // set total number of bytes that will be written to SectionWriter + SetSpan(length int) // set data span of chunk + SectionSize() int // section size of this SectionWriter + Branches() int // branch factor of this SectionWriter +} diff --git a/storage/chunker_test.go b/storage/chunker_test.go index fd1af937f2..3e1158d13f 100644 --- a/storage/chunker_test.go +++ b/storage/chunker_test.go @@ -151,7 +151,8 @@ func TestSha3ForCorrectness(t *testing.T) { rawSha3Output := rawSha3.Sum(nil) sha3FromMakeFunc := MakeHashFunc(SHA3Hash)() - sha3FromMakeFunc.ResetWithLength(input[:8]) + sha3FromMakeFunc.Reset() + sha3FromMakeFunc.SetSpanBytes(input[:8]) sha3FromMakeFunc.Write(input[8:]) sha3FromMakeFuncOutput := sha3FromMakeFunc.Sum(nil) diff --git a/storage/common_test.go b/storage/common_test.go index a65a686943..e625cd8091 100644 --- a/storage/common_test.go +++ b/storage/common_test.go @@ -151,7 +151,8 @@ func testStoreCorrect(m ChunkStore, n int, t *testing.T) { } hasher := MakeHashFunc(DefaultHash)() data := chunk.Data() - hasher.ResetWithLength(data[:8]) + hasher.Reset() + hasher.SetSpanBytes(data[:8]) hasher.Write(data[8:]) exp := hasher.Sum(nil) if !bytes.Equal(h, exp) { diff --git a/storage/hasherstore.go b/storage/hasherstore.go index 4890219a15..d81ffba5aa 100644 --- a/storage/hasherstore.go +++ b/storage/hasherstore.go @@ -184,8 +184,9 @@ func (h *hasherStore) startWait(ctx context.Context) { func (h *hasherStore) createHash(chunkData ChunkData) Address { hasher := h.hashFunc() - hasher.ResetWithLength(chunkData[:8]) // 8 bytes of length - hasher.Write(chunkData[8:]) // minus 8 []byte length + hasher.Reset() + hasher.SetSpanBytes(chunkData[:8]) // 8 bytes of length + hasher.Write(chunkData[8:]) // minus 8 []byte length return hasher.Sum(nil) } diff --git a/storage/swarmhasher.go b/storage/swarmhasher.go index fae03f0c72..0cbc12556c 100644 --- a/storage/swarmhasher.go +++ b/storage/swarmhasher.go @@ -28,14 +28,14 @@ const ( type SwarmHash interface { hash.Hash - ResetWithLength([]byte) + SetSpanBytes([]byte) } type HashWithLength struct { hash.Hash } -func (h *HashWithLength) ResetWithLength(length []byte) { +func (h 
*HashWithLength) SetSpanBytes(length []byte) { h.Reset() h.Write(length) } diff --git a/storage/types.go b/storage/types.go index a4b102a62c..9fa258495d 100644 --- a/storage/types.go +++ b/storage/types.go @@ -93,7 +93,8 @@ func GenerateRandomChunk(dataSize int64) Chunk { sdata := make([]byte, dataSize+8) rand.Read(sdata[8:]) binary.LittleEndian.PutUint64(sdata[:8], uint64(dataSize)) - hasher.ResetWithLength(sdata[:8]) + hasher.Reset() + hasher.SetSpanBytes(sdata[:8]) hasher.Write(sdata[8:]) return NewChunk(hasher.Sum(nil), sdata) } @@ -202,7 +203,8 @@ func (v *ContentAddressValidator) Validate(ch Chunk) bool { } hasher := v.Hasher() - hasher.ResetWithLength(data[:8]) + hasher.Reset() + hasher.SetSpanBytes(data[:8]) hasher.Write(data[8:]) hash := hasher.Sum(nil) From ee1ad2cd21265c5c687ba56a38902fd5d0084153 Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 11 Dec 2019 10:12:41 +0100 Subject: [PATCH 02/16] bmt: Cleanup --- bmt/bmt.go | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/bmt/bmt.go b/bmt/bmt.go index d20e2c1aa3..43bfdda29a 100644 --- a/bmt/bmt.go +++ b/bmt/bmt.go @@ -85,10 +85,12 @@ type BaseHasherFunc func() hash.Hash // the tree and itself in a state reusable for hashing a new chunk // - generates and verifies segment inclusion proofs (TODO:) type Hasher struct { - pool *TreePool // BMT resource pool - bmt *tree // prebuilt BMT resource for flowcontrol and proofs - size int // bytes written to Hasher since last Reset() - cursor int // cursor to write to on next Write() call + pool *TreePool // BMT resource pool + bmt *tree // prebuilt BMT resource for flowcontrol and proofs + size int // bytes written to Hasher since last Reset() + cursor int // cursor to write to on next Write() call + errFunc func(error) + ctx context.Context } // New creates a reusable BMT Hasher that @@ -322,7 +324,9 @@ func (h *Hasher) Branches() int { } // Implements param.SectionWriter -func (h *Hasher) Init(_ context.Context, _ func(error)) { +func (h *Hasher) Init(ctx context.Context, errFunc func(error)) { + h.errFunc = errFunc + h.ctx = ctx } // Size returns the digest size @@ -466,7 +470,6 @@ func (h *Hasher) NewAsyncWriter(double bool) *AsyncHasher { seccount: seccount, write: write, jobSize: 0, - sought: true, } } @@ -493,19 +496,11 @@ type AsyncHasher struct { write func(i int, section []byte, final bool) errFunc func(error) all bool // if all written in one go, temporary workaround - sought bool jobSize int } -// Implements param.SectionWriter -// TODO context should be implemented all across (ie original TODO in TreePool.reserve()) -func (sw *AsyncHasher) Init(_ context.Context, errFunc func(error)) { - sw.errFunc = errFunc -} - // Implements param.SectionWriter func (sw *AsyncHasher) Reset() { - sw.sought = true sw.jobSize = 0 sw.all = false sw.Hasher.Reset() From 679b8110b7d46f5ee42ca33c1f2a1f6290cb2a3b Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 11 Dec 2019 10:32:39 +0100 Subject: [PATCH 03/16] bmt, param: Improve comments --- bmt/bmt.go | 61 ++++++++++++++++++++++++----------------------------- param/io.go | 2 +- 2 files changed, 28 insertions(+), 35 deletions(-) diff --git a/bmt/bmt.go b/bmt/bmt.go index 43bfdda29a..01f604b143 100644 --- a/bmt/bmt.go +++ b/bmt/bmt.go @@ -296,61 +296,56 @@ func (h *Hasher) SetWriter(_ param.SectionWriterFunc) param.SectionWriter { return h } -// Implements param.SectionWriter +// SectionSize implements param.SectionWriter func (h *Hasher) SectionSize() int { return h.pool.SegmentSize } -// Implements 
param.SectionWriter +// SetLength implements param.SectionWriter func (h *Hasher) SetLength(length int) { } -// Implements param.SectionWriter +// SetSpan implements param.SectionWriter func (h *Hasher) SetSpan(length int) { span := LengthToSpan(length) h.getTree().span = span } -// Implements storage.SwarmHash +// SetSpanBytes implements storage.SwarmHash func (h *Hasher) SetSpanBytes(b []byte) { t := h.getTree() t.span = make([]byte, 8) copy(t.span, b) } -// Implements param.SectionWriter +// Branches implements param.SectionWriter func (h *Hasher) Branches() int { return h.pool.SegmentCount } -// Implements param.SectionWriter +// Init implements param.SectionWriter func (h *Hasher) Init(ctx context.Context, errFunc func(error)) { h.errFunc = errFunc h.ctx = ctx } -// Size returns the digest size -// Implements hash.Hash in param.SectionWriter +// Size implements hash.Hash and param.SectionWriter func (h *Hasher) Size() int { return h.pool.SegmentSize } -// Seek sets the section that will be written to on the next Write() +// SeekSection implements param.SectionWriter func (h *Hasher) SeekSection(offset int) { h.cursor = offset } -// BlockSize returns the block size -// Implements hash.Hash in param.SectionWriter +// BlockSize implements hash.Hash and param.SectionWriter func (h *Hasher) BlockSize() int { return 2 * h.pool.SegmentSize } // Sum returns the BMT root hash of the buffer // using Sum presupposes sequential synchronous writes (io.Writer interface) -// hash.Hash interface Sum method appends the byte slice to the underlying -// data before it calculates and returns the hash of the chunk -// caller must make sure Sum is not called concurrently with Write, writeSection // Implements hash.Hash in param.SectionWriter func (h *Hasher) Sum(b []byte) (s []byte) { t := h.getTree() @@ -373,7 +368,7 @@ func (h *Hasher) Sum(b []byte) (s []byte) { // Write calls sequentially add to the buffer to be hashed, // with every full segment calls writeSection in a go routine -// Implements hash.Hash (io.Writer) in param.SectionWriter +// Implements hash.Hash and param.SectionWriter func (h *Hasher) Write(b []byte) (int, error) { l := len(b) if l == 0 || l > h.pool.Size { @@ -420,8 +415,7 @@ func (h *Hasher) Write(b []byte) (int, error) { return l, nil } -// Reset needs to be called before writing to the hasher -// Implements hash.Hash in param.SectionWriter +// Reset implements hash.Hash and param.SectionWriter func (h *Hasher) Reset() { h.cursor = 0 h.size = 0 @@ -474,8 +468,9 @@ func (h *Hasher) NewAsyncWriter(double bool) *AsyncHasher { } // AsyncHasher extends BMT Hasher with an asynchronous segment/section writer interface -// AsyncHasher is unsafe and does not check indexes and section data lengths -// it must be used with the right indexes and length and the right number of sections +// AsyncHasher cannot be used as with a hash.Hash interface: It must be used with the +// right indexes and length and the right number of sections +// It is unsafe and does not check indexes and section data lengths // // behaviour is undefined if // * non-final sections are shorter or longer than secsize @@ -499,37 +494,35 @@ type AsyncHasher struct { jobSize int } -// Implements param.SectionWriter +// Reset implements param.SectionWriter func (sw *AsyncHasher) Reset() { sw.jobSize = 0 sw.all = false sw.Hasher.Reset() } +// SetLength implements param.SectionWriter func (sw *AsyncHasher) SetLength(length int) { sw.jobSize = length } -// Implements param.SectionWriter +// SetWriter implements 
param.SectionWriter func (sw *AsyncHasher) SetWriter(_ param.SectionWriterFunc) param.SectionWriter { sw.errFunc(errors.New("Asynchasher does not currently support SectionWriter chaining")) return sw } -// SectionSize returns the size of async section unit to use -// Implements param.SectionWriter +// SectionSize implements param.SectionWriter func (sw *AsyncHasher) SectionSize() int { return sw.secsize } -// DigestSize returns the branching factor, which is equivalent to the size of the BMT input -// Implements param.SectionWriter +// Branches implements param.SectionWriter func (sw *AsyncHasher) Branches() int { return sw.seccount } -// SeekSection sets the cursor where the next Write() will write -// It locks the cursor until Write() is called; if no Write() is called, it will hang. +// SeekSection locks the cursor until Write() is called; if no Write() is called, it will hang. // Implements param.SectionWriter func (sw *AsyncHasher) SeekSection(offset int) { sw.mtx.Lock() @@ -539,17 +532,17 @@ func (sw *AsyncHasher) SeekSection(offset int) { // Write writes to the current position cursor of the Hasher // The cursor must first be manually set with SeekSection() // The method will NOT advance the cursor. -// Implements hash.hash in param.SectionWriter +// Implements param.SectionWriter func (sw *AsyncHasher) Write(section []byte) (int, error) { defer sw.mtx.Unlock() sw.Hasher.size += len(section) - return sw.writeSection(sw.Hasher.cursor, section) + return sw.WriteSection(sw.Hasher.cursor, section) } -// Write writes the i-th section of the BMT base +// WriteSection writes the i-th section of the BMT base // this function can and is meant to be called concurrently // it sets max segment threadsafely -func (sw *AsyncHasher) writeSection(i int, section []byte) (int, error) { +func (sw *AsyncHasher) WriteSection(i int, section []byte) (int, error) { // TODO: Temporary workaround for chunkwise write if i < 0 { sw.Hasher.cursor = 0 @@ -601,7 +594,7 @@ func (sw *AsyncHasher) writeSection(i int, section []byte) (int, error) { // meta: metadata to hash together with BMT root for the final digest // e.g., span for protection against existential forgery // -// Implements hash.hash in param.SectionWriter +// Implements param.SectionWriter func (sw *AsyncHasher) Sum(b []byte) (s []byte) { if sw.all { return sw.Hasher.Sum(nil) @@ -803,8 +796,8 @@ func calculateDepthFor(n int) (d int) { return d + 1 } -// creates a binary span size representation -// to pass to bmt.SectionWriter +// LengthToSpan creates a binary data span size representation +// It is required for calculating the BMT hash func LengthToSpan(length int) []byte { spanBytes := make([]byte, 8) binary.LittleEndian.PutUint64(spanBytes, uint64(length)) diff --git a/param/io.go b/param/io.go index 3eb14bce67..93917c31aa 100644 --- a/param/io.go +++ b/param/io.go @@ -8,7 +8,7 @@ import ( type SectionWriterFunc func(ctx context.Context) SectionWriter type SectionWriter interface { - hash.Hash + hash.Hash // Write,Sum,Reset,Size,BlockSize Init(ctx context.Context, errFunc func(error)) // errFunc is used for asynchronous components to signal error and termination SetWriter(hashFunc SectionWriterFunc) SectionWriter // chain another SectionWriter the current instance SeekSection(section int) // sets cursor that next Write() will write to From 851cab08223af95f12af26abdaced516eb3f5d49 Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 15 Dec 2019 18:24:09 +0100 Subject: [PATCH 04/16] Move writer interface til file --- bmt/bmt.go | 6 ------ param/io.go 
=> file/types.go | 1 - param/hash.go | 7 ------- 3 files changed, 14 deletions(-) rename param/io.go => file/types.go (86%) delete mode 100644 param/hash.go diff --git a/bmt/bmt.go b/bmt/bmt.go index 01f604b143..40dfee7459 100644 --- a/bmt/bmt.go +++ b/bmt/bmt.go @@ -323,12 +323,6 @@ func (h *Hasher) Branches() int { return h.pool.SegmentCount } -// Init implements param.SectionWriter -func (h *Hasher) Init(ctx context.Context, errFunc func(error)) { - h.errFunc = errFunc - h.ctx = ctx -} - // Size implements hash.Hash and param.SectionWriter func (h *Hasher) Size() int { return h.pool.SegmentSize diff --git a/param/io.go b/file/types.go similarity index 86% rename from param/io.go rename to file/types.go index 93917c31aa..37ac6aca80 100644 --- a/param/io.go +++ b/file/types.go @@ -9,7 +9,6 @@ type SectionWriterFunc func(ctx context.Context) SectionWriter type SectionWriter interface { hash.Hash // Write,Sum,Reset,Size,BlockSize - Init(ctx context.Context, errFunc func(error)) // errFunc is used for asynchronous components to signal error and termination SetWriter(hashFunc SectionWriterFunc) SectionWriter // chain another SectionWriter the current instance SeekSection(section int) // sets cursor that next Write() will write to SetLength(length int) // set total number of bytes that will be written to SectionWriter diff --git a/param/hash.go b/param/hash.go deleted file mode 100644 index 114fa0ad6c..0000000000 --- a/param/hash.go +++ /dev/null @@ -1,7 +0,0 @@ -package param - -import "golang.org/x/crypto/sha3" - -var ( - HashFunc = sha3.NewLegacyKeccak256 -) From 9f0f874d11ce95c349907319a314d3afb446b27b Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 15 Dec 2019 18:28:20 +0100 Subject: [PATCH 05/16] file: Move interface from param to file --- bmt/bmt.go | 44 ++++++++++++++++++++++---------------------- bmt/bmt_test.go | 8 ++++---- file/types.go | 2 +- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/bmt/bmt.go b/bmt/bmt.go index 40dfee7459..1b148f2645 100644 --- a/bmt/bmt.go +++ b/bmt/bmt.go @@ -27,8 +27,8 @@ import ( "sync" "sync/atomic" + "github.com/ethersphere/swarm/file" "github.com/ethersphere/swarm/log" - "github.com/ethersphere/swarm/param" ) /* @@ -290,22 +290,22 @@ func newTree(segmentSize, depth int, hashfunc func() hash.Hash) *tree { } } -// Implements param.SectionWriter -func (h *Hasher) SetWriter(_ param.SectionWriterFunc) param.SectionWriter { +// Implements file.SectionWriter +func (h *Hasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter { log.Warn("Synchasher does not currently support SectionWriter chaining") return h } -// SectionSize implements param.SectionWriter +// SectionSize implements file.SectionWriter func (h *Hasher) SectionSize() int { return h.pool.SegmentSize } -// SetLength implements param.SectionWriter +// SetLength implements file.SectionWriter func (h *Hasher) SetLength(length int) { } -// SetSpan implements param.SectionWriter +// SetSpan implements file.SectionWriter func (h *Hasher) SetSpan(length int) { span := LengthToSpan(length) h.getTree().span = span @@ -318,29 +318,29 @@ func (h *Hasher) SetSpanBytes(b []byte) { copy(t.span, b) } -// Branches implements param.SectionWriter +// Branches implements file.SectionWriter func (h *Hasher) Branches() int { return h.pool.SegmentCount } -// Size implements hash.Hash and param.SectionWriter +// Size implements hash.Hash and file.SectionWriter func (h *Hasher) Size() int { return h.pool.SegmentSize } -// SeekSection implements param.SectionWriter +// SeekSection implements 
file.SectionWriter func (h *Hasher) SeekSection(offset int) { h.cursor = offset } -// BlockSize implements hash.Hash and param.SectionWriter +// BlockSize implements hash.Hash and file.SectionWriter func (h *Hasher) BlockSize() int { return 2 * h.pool.SegmentSize } // Sum returns the BMT root hash of the buffer // using Sum presupposes sequential synchronous writes (io.Writer interface) -// Implements hash.Hash in param.SectionWriter +// Implements hash.Hash in file.SectionWriter func (h *Hasher) Sum(b []byte) (s []byte) { t := h.getTree() if h.size == 0 && t.offset == 0 { @@ -362,7 +362,7 @@ func (h *Hasher) Sum(b []byte) (s []byte) { // Write calls sequentially add to the buffer to be hashed, // with every full segment calls writeSection in a go routine -// Implements hash.Hash and param.SectionWriter +// Implements hash.Hash and file.SectionWriter func (h *Hasher) Write(b []byte) (int, error) { l := len(b) if l == 0 || l > h.pool.Size { @@ -409,7 +409,7 @@ func (h *Hasher) Write(b []byte) (int, error) { return l, nil } -// Reset implements hash.Hash and param.SectionWriter +// Reset implements hash.Hash and file.SectionWriter func (h *Hasher) Reset() { h.cursor = 0 h.size = 0 @@ -488,36 +488,36 @@ type AsyncHasher struct { jobSize int } -// Reset implements param.SectionWriter +// Reset implements file.SectionWriter func (sw *AsyncHasher) Reset() { sw.jobSize = 0 sw.all = false sw.Hasher.Reset() } -// SetLength implements param.SectionWriter +// SetLength implements file.SectionWriter func (sw *AsyncHasher) SetLength(length int) { sw.jobSize = length } -// SetWriter implements param.SectionWriter -func (sw *AsyncHasher) SetWriter(_ param.SectionWriterFunc) param.SectionWriter { +// SetWriter implements file.SectionWriter +func (sw *AsyncHasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter { sw.errFunc(errors.New("Asynchasher does not currently support SectionWriter chaining")) return sw } -// SectionSize implements param.SectionWriter +// SectionSize implements file.SectionWriter func (sw *AsyncHasher) SectionSize() int { return sw.secsize } -// Branches implements param.SectionWriter +// Branches implements file.SectionWriter func (sw *AsyncHasher) Branches() int { return sw.seccount } // SeekSection locks the cursor until Write() is called; if no Write() is called, it will hang. -// Implements param.SectionWriter +// Implements file.SectionWriter func (sw *AsyncHasher) SeekSection(offset int) { sw.mtx.Lock() sw.Hasher.SeekSection(offset) @@ -526,7 +526,7 @@ func (sw *AsyncHasher) SeekSection(offset int) { // Write writes to the current position cursor of the Hasher // The cursor must first be manually set with SeekSection() // The method will NOT advance the cursor. 
-// Implements param.SectionWriter +// Implements file.SectionWriter func (sw *AsyncHasher) Write(section []byte) (int, error) { defer sw.mtx.Unlock() sw.Hasher.size += len(section) @@ -588,7 +588,7 @@ func (sw *AsyncHasher) WriteSection(i int, section []byte) (int, error) { // meta: metadata to hash together with BMT root for the final digest // e.g., span for protection against existential forgery // -// Implements param.SectionWriter +// Implements file.SectionWriter func (sw *AsyncHasher) Sum(b []byte) (s []byte) { if sw.all { return sw.Hasher.Sum(nil) diff --git a/bmt/bmt_test.go b/bmt/bmt_test.go index 1cfd611a22..be10b230a9 100644 --- a/bmt/bmt_test.go +++ b/bmt/bmt_test.go @@ -26,7 +26,7 @@ import ( "testing" "time" - "github.com/ethersphere/swarm/param" + "github.com/ethersphere/swarm/file" "github.com/ethersphere/swarm/testutil" "golang.org/x/crypto/sha3" ) @@ -567,16 +567,16 @@ func splitAndShuffle(secsize int, data []byte) (idxs []int, segments [][]byte) { } // splits the input data performs a random shuffle to mock async section writes -func asyncHashRandom(bmt param.SectionWriter, spanLength int, data []byte, wh whenHash) (s []byte) { +func asyncHashRandom(bmt file.SectionWriter, spanLength int, data []byte, wh whenHash) (s []byte) { idxs, segments := splitAndShuffle(bmt.SectionSize(), data) return asyncHash(bmt, spanLength, len(data), wh, idxs, segments) } -// mock for async section writes for param.SectionWriter +// mock for async section writes for file.SectionWriter // requires a permutation (a random shuffle) of list of all indexes of segments // and writes them in order to the appropriate section // the Sum function is called according to the wh parameter (first, last, random [relative to segment writes]) -func asyncHash(bmt param.SectionWriter, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) { +func asyncHash(bmt file.SectionWriter, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) { bmt.Reset() if l == 0 { bmt.SetLength(l) diff --git a/file/types.go b/file/types.go index 37ac6aca80..19db8bd326 100644 --- a/file/types.go +++ b/file/types.go @@ -1,4 +1,4 @@ -package param +package file import ( "context" From 7a6e8b27c3dda8296e93a5eae15920ce3462c3f5 Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 15 Dec 2019 18:31:51 +0100 Subject: [PATCH 06/16] bmt, file: Move asynchasher to file/hasher --- bmt/bmt.go | 187 ------------------------------ bmt/bmt_test.go | 117 ------------------- file/hasher/asynchasher.go | 194 ++++++++++++++++++++++++++++++++ file/hasher/asynchasher_test.go | 128 +++++++++++++++++++++ 4 files changed, 322 insertions(+), 304 deletions(-) create mode 100644 file/hasher/asynchasher.go create mode 100644 file/hasher/asynchasher_test.go diff --git a/bmt/bmt.go b/bmt/bmt.go index 1b148f2645..85d5477471 100644 --- a/bmt/bmt.go +++ b/bmt/bmt.go @@ -20,7 +20,6 @@ package bmt import ( "context" "encoding/binary" - "errors" "fmt" "hash" "strings" @@ -437,192 +436,6 @@ func (h *Hasher) releaseTree() { }() } -// NewAsyncWriter extends Hasher with an interface for concurrent segment/section writes -// TODO: Instead of explicitly setting double size of segment should be dynamic and chunked internally. 
If not, we have to keep different bmt hashers generation functions for different purposes in the same instance, or cope with added complexity of bmt hasher generation functions having to receive parameters -func (h *Hasher) NewAsyncWriter(double bool) *AsyncHasher { - secsize := h.pool.SegmentSize - if double { - secsize *= 2 - } - seccount := h.pool.SegmentCount - if double { - seccount /= 2 - } - write := func(i int, section []byte, final bool) { - h.writeSection(i, section, double, final) - } - return &AsyncHasher{ - Hasher: h, - double: double, - secsize: secsize, - seccount: seccount, - write: write, - jobSize: 0, - } -} - -// AsyncHasher extends BMT Hasher with an asynchronous segment/section writer interface -// AsyncHasher cannot be used as with a hash.Hash interface: It must be used with the -// right indexes and length and the right number of sections -// It is unsafe and does not check indexes and section data lengths -// -// behaviour is undefined if -// * non-final sections are shorter or longer than secsize -// * if final section does not match length -// * write a section with index that is higher than length/secsize -// * set length in Sum call when length/secsize < maxsec -// -// * if Sum() is not called on a Hasher that is fully written -// a process will block, can be terminated with Reset -// * it will not leak processes if not all sections are written but it blocks -// and keeps the resource which can be released calling Reset() -type AsyncHasher struct { - *Hasher // extends the Hasher - mtx sync.Mutex // to lock the cursor access - double bool // whether to use double segments (call Hasher.writeSection) - secsize int // size of base section (size of hash or double) - seccount int // base section count - write func(i int, section []byte, final bool) - errFunc func(error) - all bool // if all written in one go, temporary workaround - jobSize int -} - -// Reset implements file.SectionWriter -func (sw *AsyncHasher) Reset() { - sw.jobSize = 0 - sw.all = false - sw.Hasher.Reset() -} - -// SetLength implements file.SectionWriter -func (sw *AsyncHasher) SetLength(length int) { - sw.jobSize = length -} - -// SetWriter implements file.SectionWriter -func (sw *AsyncHasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter { - sw.errFunc(errors.New("Asynchasher does not currently support SectionWriter chaining")) - return sw -} - -// SectionSize implements file.SectionWriter -func (sw *AsyncHasher) SectionSize() int { - return sw.secsize -} - -// Branches implements file.SectionWriter -func (sw *AsyncHasher) Branches() int { - return sw.seccount -} - -// SeekSection locks the cursor until Write() is called; if no Write() is called, it will hang. -// Implements file.SectionWriter -func (sw *AsyncHasher) SeekSection(offset int) { - sw.mtx.Lock() - sw.Hasher.SeekSection(offset) -} - -// Write writes to the current position cursor of the Hasher -// The cursor must first be manually set with SeekSection() -// The method will NOT advance the cursor. 
-// Implements file.SectionWriter -func (sw *AsyncHasher) Write(section []byte) (int, error) { - defer sw.mtx.Unlock() - sw.Hasher.size += len(section) - return sw.WriteSection(sw.Hasher.cursor, section) -} - -// WriteSection writes the i-th section of the BMT base -// this function can and is meant to be called concurrently -// it sets max segment threadsafely -func (sw *AsyncHasher) WriteSection(i int, section []byte) (int, error) { - // TODO: Temporary workaround for chunkwise write - if i < 0 { - sw.Hasher.cursor = 0 - sw.Hasher.Reset() - sw.Hasher.SetLength(len(section)) - sw.Hasher.Write(section) - sw.all = true - return len(section), nil - } - //sw.mtx.Lock() // this lock is now set in SeekSection - // defer sw.mtk.Unlock() // this unlock is still left in Write() - t := sw.getTree() - // cursor keeps track of the rightmost section written so far - // if index is lower than cursor then just write non-final section as is - if i < t.cursor { - // if index is not the rightmost, safe to write section - go sw.write(i, section, false) - return len(section), nil - } - // if there is a previous rightmost section safe to write section - if t.offset > 0 { - if i == t.cursor { - // i==cursor implies cursor was set by Hash call so we can write section as final one - // since it can be shorter, first we copy it to the padded buffer - t.section = make([]byte, sw.secsize) - copy(t.section, section) - go sw.write(i, t.section, true) - return len(section), nil - } - // the rightmost section just changed, so we write the previous one as non-final - go sw.write(t.cursor, t.section, false) - } - // set i as the index of the righmost section written so far - // set t.offset to cursor*secsize+1 - t.cursor = i - t.offset = i*sw.secsize + 1 - t.section = make([]byte, sw.secsize) - copy(t.section, section) - return len(section), nil -} - -// Sum can be called any time once the length and the span is known -// potentially even before all segments have been written -// in such cases Sum will block until all segments are present and -// the hash for the length can be calculated. 
-// -// b: digest is appended to b -// length: known length of the input (unsafe; undefined if out of range) -// meta: metadata to hash together with BMT root for the final digest -// e.g., span for protection against existential forgery -// -// Implements file.SectionWriter -func (sw *AsyncHasher) Sum(b []byte) (s []byte) { - if sw.all { - return sw.Hasher.Sum(nil) - } - sw.mtx.Lock() - t := sw.getTree() - length := sw.jobSize - if length == 0 { - sw.releaseTree() - sw.mtx.Unlock() - s = sw.pool.zerohashes[sw.pool.Depth] - return - } else { - // for non-zero input the rightmost section is written to the tree asynchronously - // if the actual last section has been written (t.cursor == length/t.secsize) - maxsec := (length - 1) / sw.secsize - if t.offset > 0 { - go sw.write(t.cursor, t.section, maxsec == t.cursor) - } - // set cursor to maxsec so final section is written when it arrives - t.cursor = maxsec - t.offset = length - result := t.result - sw.mtx.Unlock() - // wait for the result or reset - s = <-result - } - // relesase the tree back to the pool - sw.releaseTree() - meta := t.span - // hash together meta and BMT root hash using the pools - return doSum(sw.pool.hasher(), b, meta, s) -} - // writeSection writes the hash of i-th section into level 1 node of the BMT tree // TODO: h.size increases even on multiple writes to the same section of a section func (h *Hasher) writeSection(i int, section []byte, double bool, final bool) { diff --git a/bmt/bmt_test.go b/bmt/bmt_test.go index be10b230a9..b853874d38 100644 --- a/bmt/bmt_test.go +++ b/bmt/bmt_test.go @@ -26,7 +26,6 @@ import ( "testing" "time" - "github.com/ethersphere/swarm/file" "github.com/ethersphere/swarm/testutil" "golang.org/x/crypto/sha3" ) @@ -181,48 +180,6 @@ func TestSyncHasherCorrectness(t *testing.T) { } } -// tests order-neutral concurrent writes with entire max size written in one go -func TestAsyncCorrectness(t *testing.T) { - data := testutil.RandomBytes(1, BufferSize) - hasher := sha3.NewLegacyKeccak256 - size := hasher().Size() - whs := []whenHash{first, last, random} - - for _, double := range []bool{false, true} { - for _, wh := range whs { - for _, count := range counts { - t.Run(fmt.Sprintf("double_%v_hash_when_%v_segments_%v", double, wh, count), func(t *testing.T) { - max := count * size - var incr int - capacity := 1 - pool := NewTreePool(hasher, count, capacity) - defer pool.Drain(0) - for n := 1; n <= max; n += incr { - incr = 1 + rand.Intn(5) - bmt := New(pool) - d := data[:n] - rbmt := NewRefHasher(hasher, count) - expNoMeta := rbmt.Hash(d) - h := hasher() - h.Write(zeroSpan) - h.Write(expNoMeta) - exp := h.Sum(nil) - got := syncHash(bmt, 0, d) - if !bytes.Equal(got, exp) { - t.Fatalf("wrong sync hash (syncpart) for datalength %v: expected %x (ref), got %x", n, exp, got) - } - sw := bmt.NewAsyncWriter(double) - got = asyncHashRandom(sw, 0, d, wh) - if !bytes.Equal(got, exp) { - t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got) - } - } - }) - } - } - } -} - // Tests that the BMT hasher can be synchronously reused with poolsizes 1 and PoolSize func TestHasherReuse(t *testing.T) { t.Run(fmt.Sprintf("poolsize_%d", 1), func(t *testing.T) { @@ -402,19 +359,6 @@ const ( random ) -func BenchmarkBMTAsync(t *testing.B) { - whs := []whenHash{first, last, random} - for size := 4096; size >= 128; size /= 2 { - for _, wh := range whs { - for _, double := range []bool{false, true} { - t.Run(fmt.Sprintf("double_%v_hash_when_%v_size_%v", double, wh, size), func(t *testing.B) 
{ - benchmarkBMTAsync(t, size, wh, double) - }) - } - } - } -} - func BenchmarkPool(t *testing.B) { caps := []int{1, PoolSize} for size := 4096; size >= 128; size /= 2 { @@ -483,24 +427,6 @@ func benchmarkBMT(t *testing.B, n int) { } } -// benchmarks BMT hasher with asynchronous concurrent segment/section writes -func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) { - data := testutil.RandomBytes(1, n) - hasher := sha3.NewLegacyKeccak256 - pool := NewTreePool(hasher, segmentCount, PoolSize) - bmt := New(pool).NewAsyncWriter(double) - idxs, segments := splitAndShuffle(bmt.SectionSize(), data) - rand.Shuffle(len(idxs), func(i int, j int) { - idxs[i], idxs[j] = idxs[j], idxs[i] - }) - - t.ReportAllocs() - t.ResetTimer() - for i := 0; i < t.N; i++ { - asyncHash(bmt, 0, n, wh, idxs, segments) - } -} - // benchmarks 100 concurrent bmt hashes with pool capacity func benchmarkPool(t *testing.B, poolsize, n int) { data := testutil.RandomBytes(1, n) @@ -566,49 +492,6 @@ func splitAndShuffle(secsize int, data []byte) (idxs []int, segments [][]byte) { return idxs, segments } -// splits the input data performs a random shuffle to mock async section writes -func asyncHashRandom(bmt file.SectionWriter, spanLength int, data []byte, wh whenHash) (s []byte) { - idxs, segments := splitAndShuffle(bmt.SectionSize(), data) - return asyncHash(bmt, spanLength, len(data), wh, idxs, segments) -} - -// mock for async section writes for file.SectionWriter -// requires a permutation (a random shuffle) of list of all indexes of segments -// and writes them in order to the appropriate section -// the Sum function is called according to the wh parameter (first, last, random [relative to segment writes]) -func asyncHash(bmt file.SectionWriter, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) { - bmt.Reset() - if l == 0 { - bmt.SetLength(l) - bmt.SetSpan(spanLength) - return bmt.Sum(nil) - } - c := make(chan []byte, 1) - hashf := func() { - bmt.SetLength(l) - bmt.SetSpan(spanLength) - c <- bmt.Sum(nil) - } - maxsize := len(idxs) - var r int - if wh == random { - r = rand.Intn(maxsize) - } - for i, idx := range idxs { - bmt.SeekSection(idx) - bmt.Write(segments[idx]) - if (wh == first || wh == random) && i == r { - go hashf() - } - } - if wh == last { - bmt.SetLength(l) - bmt.SetSpan(spanLength) - return bmt.Sum(nil) - } - return <-c -} - // TestUseSyncAsOrdinaryHasher verifies that the bmt.Hasher can be used with the hash.Hash interface func TestUseSyncAsOrdinaryHasher(t *testing.T) { hasher := sha3.NewLegacyKeccak256 diff --git a/file/hasher/asynchasher.go b/file/hasher/asynchasher.go new file mode 100644 index 0000000000..cce1a9613d --- /dev/null +++ b/file/hasher/asynchasher.go @@ -0,0 +1,194 @@ +package hasher + +import ( + "errors" + "sync" + + "github.com/ethersphere/swarm/file" +) + +// NewAsyncWriter extends Hasher with an interface for concurrent segment/section writes +// TODO: Instead of explicitly setting double size of segment should be dynamic and chunked internally. 
If not, we have to keep different bmt hasher generation functions for different purposes in the same instance, or cope with the added complexity of bmt hasher generation functions having to receive parameters
+func (h *Hasher) NewAsyncWriter(double bool) *AsyncHasher {
+	secsize := h.pool.SegmentSize
+	if double {
+		secsize *= 2
+	}
+	seccount := h.pool.SegmentCount
+	if double {
+		seccount /= 2
+	}
+	write := func(i int, section []byte, final bool) {
+		h.writeSection(i, section, double, final)
+	}
+	return &AsyncHasher{
+		Hasher:   h,
+		double:   double,
+		secsize:  secsize,
+		seccount: seccount,
+		write:    write,
+		jobSize:  0,
+	}
+}
+
+// AsyncHasher extends BMT Hasher with an asynchronous segment/section writer interface
+// AsyncHasher cannot be used as a plain hash.Hash: it must be used with the
+// right indexes and length and the right number of sections
+// It is unsafe and does not check indexes and section data lengths
+//
+// behaviour is undefined if
+// * non-final sections are shorter or longer than secsize
+// * if final section does not match length
+// * write a section with index that is higher than length/secsize
+// * set length in Sum call when length/secsize < maxsec
+//
+// * if Sum() is not called on a Hasher that is fully written
+//   a process will block, can be terminated with Reset
+// * it will not leak processes if not all sections are written but it blocks
+//   and keeps the resource which can be released calling Reset()
+type AsyncHasher struct {
+	*Hasher             // extends the Hasher
+	mtx      sync.Mutex // to lock the cursor access
+	double   bool       // whether to use double segments (call Hasher.writeSection)
+	secsize  int        // size of base section (size of hash or double)
+	seccount int        // base section count
+	write    func(i int, section []byte, final bool)
+	errFunc  func(error)
+	all      bool // if all written in one go, temporary workaround
+	jobSize  int
+}
+
+// Reset implements file.SectionWriter
+func (sw *AsyncHasher) Reset() {
+	sw.jobSize = 0
+	sw.all = false
+	sw.Hasher.Reset()
+}
+
+// SetLength implements file.SectionWriter
+func (sw *AsyncHasher) SetLength(length int) {
+	sw.jobSize = length
+}
+
+// SetWriter implements file.SectionWriter
+func (sw *AsyncHasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter {
+	sw.errFunc(errors.New("Asynchasher does not currently support SectionWriter chaining"))
+	return sw
+}
+
+// SectionSize implements file.SectionWriter
+func (sw *AsyncHasher) SectionSize() int {
+	return sw.secsize
+}
+
+// Branches implements file.SectionWriter
+func (sw *AsyncHasher) Branches() int {
+	return sw.seccount
+}
+
+// SeekSection locks the cursor until Write() is called; if no Write() is called, it will hang.
+// Implements file.SectionWriter
+func (sw *AsyncHasher) SeekSection(offset int) {
+	sw.mtx.Lock()
+	sw.Hasher.SeekSection(offset)
+}
+
+// Write writes to the current position cursor of the Hasher
+// The cursor must first be manually set with SeekSection()
+// The method will NOT advance the cursor.
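+//
+// A hedged usage sketch (the "sections" slice and its preparation are
+// assumptions, not part of this change); SeekSection() and Write() are
+// paired per goroutine and serialized by the cursor mutex:
+//
+//   for i := range sections {
+//       go func(i int) {
+//           sw.SeekSection(i)
+//           sw.Write(sections[i])
+//       }(i)
+//   }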
+// Implements file.SectionWriter +func (sw *AsyncHasher) Write(section []byte) (int, error) { + defer sw.mtx.Unlock() + sw.Hasher.size += len(section) + return sw.WriteSection(sw.Hasher.cursor, section) +} + +// WriteSection writes the i-th section of the BMT base +// this function can and is meant to be called concurrently +// it sets max segment threadsafely +func (sw *AsyncHasher) WriteSection(i int, section []byte) (int, error) { + // TODO: Temporary workaround for chunkwise write + if i < 0 { + sw.Hasher.cursor = 0 + sw.Hasher.Reset() + sw.Hasher.SetLength(len(section)) + sw.Hasher.Write(section) + sw.all = true + return len(section), nil + } + //sw.mtx.Lock() // this lock is now set in SeekSection + // defer sw.mtk.Unlock() // this unlock is still left in Write() + t := sw.getTree() + // cursor keeps track of the rightmost section written so far + // if index is lower than cursor then just write non-final section as is + if i < t.cursor { + // if index is not the rightmost, safe to write section + go sw.write(i, section, false) + return len(section), nil + } + // if there is a previous rightmost section safe to write section + if t.offset > 0 { + if i == t.cursor { + // i==cursor implies cursor was set by Hash call so we can write section as final one + // since it can be shorter, first we copy it to the padded buffer + t.section = make([]byte, sw.secsize) + copy(t.section, section) + go sw.write(i, t.section, true) + return len(section), nil + } + // the rightmost section just changed, so we write the previous one as non-final + go sw.write(t.cursor, t.section, false) + } + // set i as the index of the righmost section written so far + // set t.offset to cursor*secsize+1 + t.cursor = i + t.offset = i*sw.secsize + 1 + t.section = make([]byte, sw.secsize) + copy(t.section, section) + return len(section), nil +} + +// Sum can be called any time once the length and the span is known +// potentially even before all segments have been written +// in such cases Sum will block until all segments are present and +// the hash for the length can be calculated. 
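+//
+// As a sketch (mirroring the hashf helper in the tests; sw is assumed to
+// be an *AsyncHasher with l input bytes and a known spanLength):
+//
+//	c := make(chan []byte, 1)
+//	go func() {
+//		sw.SetLength(l)
+//		sw.SetSpan(spanLength)
+//		c <- sw.Sum(nil) // blocks until the final section has arrived
+//	}()
+//	// ... remaining SeekSection/Write calls ...
+//	digest := <-c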
+// +// b: digest is appended to b +// length: known length of the input (unsafe; undefined if out of range) +// meta: metadata to hash together with BMT root for the final digest +// e.g., span for protection against existential forgery +// +// Implements file.SectionWriter +func (sw *AsyncHasher) Sum(b []byte) (s []byte) { + if sw.all { + return sw.Hasher.Sum(nil) + } + sw.mtx.Lock() + t := sw.getTree() + length := sw.jobSize + if length == 0 { + sw.releaseTree() + sw.mtx.Unlock() + s = sw.pool.zerohashes[sw.pool.Depth] + return + } else { + // for non-zero input the rightmost section is written to the tree asynchronously + // if the actual last section has been written (t.cursor == length/t.secsize) + maxsec := (length - 1) / sw.secsize + if t.offset > 0 { + go sw.write(t.cursor, t.section, maxsec == t.cursor) + } + // set cursor to maxsec so final section is written when it arrives + t.cursor = maxsec + t.offset = length + result := t.result + sw.mtx.Unlock() + // wait for the result or reset + s = <-result + } + // relesase the tree back to the pool + sw.releaseTree() + meta := t.span + // hash together meta and BMT root hash using the pools + return doSum(sw.pool.hasher(), b, meta, s) +} diff --git a/file/hasher/asynchasher_test.go b/file/hasher/asynchasher_test.go new file mode 100644 index 0000000000..410e713037 --- /dev/null +++ b/file/hasher/asynchasher_test.go @@ -0,0 +1,128 @@ +package hasher + +import ( + "bytes" + "fmt" + "math/rand" + "testing" + + "github.com/ethersphere/swarm/file" + "github.com/ethersphere/swarm/testutil" + "golang.org/x/crypto/sha3" +) + +// tests order-neutral concurrent writes with entire max size written in one go +func TestAsyncCorrectness(t *testing.T) { + data := testutil.RandomBytes(1, BufferSize) + hasher := sha3.NewLegacyKeccak256 + size := hasher().Size() + whs := []whenHash{first, last, random} + + for _, double := range []bool{false, true} { + for _, wh := range whs { + for _, count := range counts { + t.Run(fmt.Sprintf("double_%v_hash_when_%v_segments_%v", double, wh, count), func(t *testing.T) { + max := count * size + var incr int + capacity := 1 + pool := NewTreePool(hasher, count, capacity) + defer pool.Drain(0) + for n := 1; n <= max; n += incr { + incr = 1 + rand.Intn(5) + bmt := New(pool) + d := data[:n] + rbmt := NewRefHasher(hasher, count) + expNoMeta := rbmt.Hash(d) + h := hasher() + h.Write(zeroSpan) + h.Write(expNoMeta) + exp := h.Sum(nil) + got := syncHash(bmt, 0, d) + if !bytes.Equal(got, exp) { + t.Fatalf("wrong sync hash (syncpart) for datalength %v: expected %x (ref), got %x", n, exp, got) + } + sw := bmt.NewAsyncWriter(double) + got = asyncHashRandom(sw, 0, d, wh) + if !bytes.Equal(got, exp) { + t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got) + } + } + }) + } + } + } +} + +func BenchmarkBMTAsync(t *testing.B) { + whs := []whenHash{first, last, random} + for size := 4096; size >= 128; size /= 2 { + for _, wh := range whs { + for _, double := range []bool{false, true} { + t.Run(fmt.Sprintf("double_%v_hash_when_%v_size_%v", double, wh, size), func(t *testing.B) { + benchmarkBMTAsync(t, size, wh, double) + }) + } + } + } +} + +// benchmarks BMT hasher with asynchronous concurrent segment/section writes +func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) { + data := testutil.RandomBytes(1, n) + hasher := sha3.NewLegacyKeccak256 + pool := NewTreePool(hasher, segmentCount, PoolSize) + bmt := New(pool).NewAsyncWriter(double) + idxs, segments := 
splitAndShuffle(bmt.SectionSize(), data) + rand.Shuffle(len(idxs), func(i int, j int) { + idxs[i], idxs[j] = idxs[j], idxs[i] + }) + + t.ReportAllocs() + t.ResetTimer() + for i := 0; i < t.N; i++ { + asyncHash(bmt, 0, n, wh, idxs, segments) + } +} + +// splits the input data performs a random shuffle to mock async section writes +func asyncHashRandom(bmt file.SectionWriter, spanLength int, data []byte, wh whenHash) (s []byte) { + idxs, segments := splitAndShuffle(bmt.SectionSize(), data) + return asyncHash(bmt, spanLength, len(data), wh, idxs, segments) +} + +// mock for async section writes for file.SectionWriter +// requires a permutation (a random shuffle) of list of all indexes of segments +// and writes them in order to the appropriate section +// the Sum function is called according to the wh parameter (first, last, random [relative to segment writes]) +func asyncHash(bmt file.SectionWriter, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) { + bmt.Reset() + if l == 0 { + bmt.SetLength(l) + bmt.SetSpan(spanLength) + return bmt.Sum(nil) + } + c := make(chan []byte, 1) + hashf := func() { + bmt.SetLength(l) + bmt.SetSpan(spanLength) + c <- bmt.Sum(nil) + } + maxsize := len(idxs) + var r int + if wh == random { + r = rand.Intn(maxsize) + } + for i, idx := range idxs { + bmt.SeekSection(idx) + bmt.Write(segments[idx]) + if (wh == first || wh == random) && i == r { + go hashf() + } + } + if wh == last { + bmt.SetLength(l) + bmt.SetSpan(spanLength) + return bmt.Sum(nil) + } + return <-c +} From 57cf86dbeec5a0703cf8be43d1d7b11363d4c49c Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 15 Dec 2019 19:41:58 +0100 Subject: [PATCH 07/16] bmt, file: Make AsyncHasher compile --- bmt/bmt.go | 64 ++++++++++++++- bmt/bmt_test.go | 10 +-- file/hasher/asynchasher.go | 134 +++++++++++++++----------------- file/hasher/asynchasher_test.go | 110 +++++++++++++++++++------- 4 files changed, 208 insertions(+), 110 deletions(-) diff --git a/bmt/bmt.go b/bmt/bmt.go index 85d5477471..fceb6b0838 100644 --- a/bmt/bmt.go +++ b/bmt/bmt.go @@ -66,7 +66,7 @@ const ( ) var ( - zeroSpan = make([]byte, 8) + ZeroSpan = make([]byte, 8) ) // BaseHasherFunc is a hash.Hash constructor function used for the base hash of the BMT. 
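ZeroSpan above is the exported 8-byte little-endian zero span. Throughout the tests the reference digest is formed by hashing the span in front of the BMT root; a minimal sketch of that construction (hasher is the Keccak256 constructor used in this package, root stands for a BMT root over the chunk data):

	h := hasher()
	h.Write(ZeroSpan) // 8-byte little-endian span; all zero here
	h.Write(root)     // BMT root over the chunk data
	digest := h.Sum(nil)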
@@ -347,7 +347,7 @@ func (h *Hasher) Sum(b []byte) (s []byte) { return h.pool.zerohashes[h.pool.Depth] } // write the last section with final flag set to true - go h.writeSection(t.cursor, t.section, true, true) + go h.WriteSection(t.cursor, t.section, true, true) // wait for the result s = <-t.result if t.span == nil { @@ -360,7 +360,7 @@ func (h *Hasher) Sum(b []byte) (s []byte) { } // Write calls sequentially add to the buffer to be hashed, -// with every full segment calls writeSection in a go routine +// with every full segment calls WriteSection in a go routine // Implements hash.Hash and file.SectionWriter func (h *Hasher) Write(b []byte) (int, error) { l := len(b) @@ -394,7 +394,7 @@ func (h *Hasher) Write(b []byte) (int, error) { // read full sections and the last possibly partial section from the input buffer for smax < l { // section complete; push to tree asynchronously - go h.writeSection(t.cursor, t.section, true, false) + go h.WriteSection(t.cursor, t.section, true, false) // reset section t.section = make([]byte, secsize) // copy from input buffer at smax to right half of section @@ -436,6 +436,11 @@ func (h *Hasher) releaseTree() { }() } +func (h *Hasher) WriteSection(i int, section []byte, double bool, final bool) { + h.size += len(section) + h.writeSection(i, section, double, final) +} + // writeSection writes the hash of i-th section into level 1 node of the BMT tree // TODO: h.size increases even on multiple writes to the same section of a section func (h *Hasher) writeSection(i int, section []byte, double bool, final bool) { @@ -610,3 +615,54 @@ func LengthToSpan(length int) []byte { binary.LittleEndian.PutUint64(spanBytes, uint64(length)) return spanBytes } + +// ASYNCHASHER ACCESSORS + +// GetHasher returns a new instance of the underlying hasher +func (h *Hasher) GetHasher() hash.Hash { + return h.pool.hasher() +} + +func (h *Hasher) GetZeroHash() []byte { + return h.pool.zerohashes[h.pool.Depth] +} + +func (h *Hasher) GetTree() *tree { + return h.getTree() +} + +func (h *Hasher) ReleaseTree() { + h.releaseTree() +} + +func (h *Hasher) GetCursor() int { + return h.cursor +} + +func (h *Hasher) SetCursor(c int) { + h.cursor = c +} + +func (t *tree) GetOffset() int { + return t.offset +} + +func (t *tree) SetOffset(offset int) { + t.offset = offset +} + +func (t *tree) GetSection() []byte { + return t.section +} + +func (t *tree) SetSection(b []byte) { + t.section = b +} + +func (t *tree) GetResult() <-chan []byte { + return t.result +} + +func (t *tree) GetSpan() []byte { + return t.span +} diff --git a/bmt/bmt_test.go b/bmt/bmt_test.go index b853874d38..4e7a4797b7 100644 --- a/bmt/bmt_test.go +++ b/bmt/bmt_test.go @@ -35,7 +35,7 @@ func init() { } // the actual data length generated (could be longer than max datalength of the BMT) -const BufferSize = 4128 +const bufferSize = 4128 const ( // segmentCount is the maximum number of segments of the underlying chunk @@ -156,7 +156,7 @@ func TestHasherEmptyData(t *testing.T) { // tests sequential write with entire max size written in one go func TestSyncHasherCorrectness(t *testing.T) { - data := testutil.RandomBytes(1, BufferSize) + data := testutil.RandomBytes(1, bufferSize) hasher := sha3.NewLegacyKeccak256 size := hasher().Size() @@ -198,7 +198,7 @@ func testHasherReuse(poolsize int, t *testing.T) { bmt := New(pool) for i := 0; i < 100; i++ { - data := testutil.RandomBytes(1, BufferSize) + data := testutil.RandomBytes(1, bufferSize) n := rand.Intn(bmt.Size()) err := testHasherCorrectness(bmt, hasher, data, n, 
segmentCount) if err != nil { @@ -218,7 +218,7 @@ func TestBMTConcurrentUse(t *testing.T) { for i := 0; i < cycles; i++ { go func() { bmt := New(pool) - data := testutil.RandomBytes(1, BufferSize) + data := testutil.RandomBytes(1, bufferSize) n := rand.Intn(bmt.Size()) errc <- testHasherCorrectness(bmt, hasher, data, n, 128) }() @@ -256,7 +256,7 @@ func TestBMTWriterBuffers(t *testing.T) { rbmt := NewRefHasher(hasher, count) refNoMetaHash := rbmt.Hash(data) h := hasher() - h.Write(zeroSpan) + h.Write(ZeroSpan) h.Write(refNoMetaHash) refHash := h.Sum(nil) expHash := syncHash(bmt, 0, data) diff --git a/file/hasher/asynchasher.go b/file/hasher/asynchasher.go index cce1a9613d..766ae61655 100644 --- a/file/hasher/asynchasher.go +++ b/file/hasher/asynchasher.go @@ -4,34 +4,31 @@ import ( "errors" "sync" + "github.com/ethersphere/swarm/bmt" "github.com/ethersphere/swarm/file" ) -// NewAsyncWriter extends Hasher with an interface for concurrent segment/section writes +// NewAsyncWriter extends Hasher with an interface for concurrent segment.GetSection() writes // TODO: Instead of explicitly setting double size of segment should be dynamic and chunked internally. If not, we have to keep different bmt hashers generation functions for different purposes in the same instance, or cope with added complexity of bmt hasher generation functions having to receive parameters -func (h *Hasher) NewAsyncWriter(double bool) *AsyncHasher { - secsize := h.pool.SegmentSize +func NewAsyncWriter(h *bmt.Hasher, double bool) *AsyncHasher { + secsize := h.SectionSize() if double { secsize *= 2 } - seccount := h.pool.SegmentCount + seccount := h.Branches() if double { seccount /= 2 } - write := func(i int, section []byte, final bool) { - h.writeSection(i, section, double, final) - } return &AsyncHasher{ Hasher: h, double: double, secsize: secsize, seccount: seccount, - write: write, jobSize: 0, } } -// AsyncHasher extends BMT Hasher with an asynchronous segment/section writer interface +// AsyncHasher extends BMT Hasher with an asynchronous segment.GetSection() writer interface // AsyncHasher cannot be used as with a hash.Hash interface: It must be used with the // right indexes and length and the right number of sections // It is unsafe and does not check indexes and section data lengths @@ -47,15 +44,15 @@ func (h *Hasher) NewAsyncWriter(double bool) *AsyncHasher { // * it will not leak processes if not all sections are written but it blocks // and keeps the resource which can be released calling Reset() type AsyncHasher struct { - *Hasher // extends the Hasher - mtx sync.Mutex // to lock the cursor access - double bool // whether to use double segments (call Hasher.writeSection) - secsize int // size of base section (size of hash or double) - seccount int // base section count - write func(i int, section []byte, final bool) - errFunc func(error) - all bool // if all written in one go, temporary workaround - jobSize int + *bmt.Hasher // extends the Hasher + mtx sync.Mutex // to lock the cursor access + double bool // whether to use double segments (call Hasher.writeSection) + secsize int // size of base section (size of hash or double) + seccount int // base section count + write func(i int, section []byte, final bool) + errFunc func(error) + all bool // if all written in one go, temporary workaround + jobSize int } // Reset implements file.SectionWriter @@ -86,65 +83,55 @@ func (sw *AsyncHasher) Branches() int { return sw.seccount } -// SeekSection locks the cursor until Write() is called; if no Write() is called, it 
will hang. -// Implements file.SectionWriter -func (sw *AsyncHasher) SeekSection(offset int) { - sw.mtx.Lock() - sw.Hasher.SeekSection(offset) -} - -// Write writes to the current position cursor of the Hasher -// The cursor must first be manually set with SeekSection() -// The method will NOT advance the cursor. -// Implements file.SectionWriter -func (sw *AsyncHasher) Write(section []byte) (int, error) { - defer sw.mtx.Unlock() - sw.Hasher.size += len(section) - return sw.WriteSection(sw.Hasher.cursor, section) -} - // WriteSection writes the i-th section of the BMT base // this function can and is meant to be called concurrently // it sets max segment threadsafely -func (sw *AsyncHasher) WriteSection(i int, section []byte) (int, error) { +func (sw *AsyncHasher) WriteToIndex(i int, section []byte) (int, error) { // TODO: Temporary workaround for chunkwise write if i < 0 { - sw.Hasher.cursor = 0 sw.Hasher.Reset() sw.Hasher.SetLength(len(section)) sw.Hasher.Write(section) sw.all = true return len(section), nil } - //sw.mtx.Lock() // this lock is now set in SeekSection - // defer sw.mtk.Unlock() // this unlock is still left in Write() - t := sw.getTree() - // cursor keeps track of the rightmost section written so far + sw.mtx.Lock() + defer sw.mtx.Unlock() + t := sw.GetTree() + // cursor keeps track of the rightmost.GetSection() written so far // if index is lower than cursor then just write non-final section as is - if i < t.cursor { + if i < sw.Hasher.GetCursor() { // if index is not the rightmost, safe to write section go sw.write(i, section, false) return len(section), nil } - // if there is a previous rightmost section safe to write section - if t.offset > 0 { - if i == t.cursor { + // if there is a previous rightmost.GetSection() safe to write section + if t.GetOffset() > 0 { + if i == sw.Hasher.GetCursor() { // i==cursor implies cursor was set by Hash call so we can write section as final one // since it can be shorter, first we copy it to the padded buffer - t.section = make([]byte, sw.secsize) - copy(t.section, section) - go sw.write(i, t.section, true) + //t.GetSection() = make([]byte, sw.secsize) + //copy(t.GetSection(), section) + // TODO: Consider whether the section here needs to be copied, maybe we can enforce not change the original slice + copySection := make([]byte, sw.secsize) + copy(copySection, section) + t.SetSection(copySection) + go sw.Hasher.WriteSection(i, t.GetSection(), sw.double, true) return len(section), nil } - // the rightmost section just changed, so we write the previous one as non-final - go sw.write(t.cursor, t.section, false) + // the rightmost.GetSection() just changed, so we write the previous one as non-final + go sw.write(sw.Hasher.GetCursor(), t.GetSection(), false) } - // set i as the index of the righmost section written so far - // set t.offset to cursor*secsize+1 - t.cursor = i - t.offset = i*sw.secsize + 1 - t.section = make([]byte, sw.secsize) - copy(t.section, section) + // set i as the index of the righmost.GetSection() written so far + // set t.GetOffset() to cursor*secsize+1 + sw.Hasher.SetCursor(i) + //t.GetOffset() = i*sw.secsize + 1 + t.SetOffset(i*sw.secsize + 1) + //t.GetSection() = make([]byte, sw.secsize) + //copy(t.GetSection(), section) + copySection := make([]byte, sw.secsize) + copy(copySection, section) + t.SetSection(copySection) return len(section), nil } @@ -157,38 +144,41 @@ func (sw *AsyncHasher) WriteSection(i int, section []byte) (int, error) { // length: known length of the input (unsafe; undefined if out of range) // 
meta: metadata to hash together with BMT root for the final digest // e.g., span for protection against existential forgery -// -// Implements file.SectionWriter func (sw *AsyncHasher) Sum(b []byte) (s []byte) { if sw.all { return sw.Hasher.Sum(nil) } sw.mtx.Lock() - t := sw.getTree() + t := sw.GetTree() length := sw.jobSize if length == 0 { - sw.releaseTree() + sw.ReleaseTree() sw.mtx.Unlock() - s = sw.pool.zerohashes[sw.pool.Depth] + s = sw.Hasher.GetZeroHash() return } else { - // for non-zero input the rightmost section is written to the tree asynchronously - // if the actual last section has been written (t.cursor == length/t.secsize) + // for non-zero input the rightmost.GetSection() is written to the tree asynchronously + // if the actual last.GetSection() has been written (sw.Hasher.GetCursor() == length/t.secsize) maxsec := (length - 1) / sw.secsize - if t.offset > 0 { - go sw.write(t.cursor, t.section, maxsec == t.cursor) + if t.GetOffset() > 0 { + go sw.Hasher.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, maxsec == sw.Hasher.GetCursor()) } - // set cursor to maxsec so final section is written when it arrives - t.cursor = maxsec - t.offset = length - result := t.result + // sesw.Hasher.GetCursor() to maxsec so final section is written when it arrives + sw.Hasher.SetCursor(maxsec) + t.SetOffset(length) + // TODO: must this t.result channel be within lock? + result := t.GetResult() sw.mtx.Unlock() // wait for the result or reset s = <-result } // relesase the tree back to the pool - sw.releaseTree() - meta := t.span + sw.ReleaseTree() + meta := t.GetSpan() // hash together meta and BMT root hash using the pools - return doSum(sw.pool.hasher(), b, meta, s) + hsh := sw.Hasher.GetHasher() + hsh.Reset() + hsh.Write(meta) + hsh.Write(s) + return hsh.Sum(b) } diff --git a/file/hasher/asynchasher_test.go b/file/hasher/asynchasher_test.go index 410e713037..1cc86e5a0f 100644 --- a/file/hasher/asynchasher_test.go +++ b/file/hasher/asynchasher_test.go @@ -6,14 +6,14 @@ import ( "math/rand" "testing" - "github.com/ethersphere/swarm/file" + "github.com/ethersphere/swarm/bmt" "github.com/ethersphere/swarm/testutil" "golang.org/x/crypto/sha3" ) // tests order-neutral concurrent writes with entire max size written in one go func TestAsyncCorrectness(t *testing.T) { - data := testutil.RandomBytes(1, BufferSize) + data := testutil.RandomBytes(1, bufferSize) hasher := sha3.NewLegacyKeccak256 size := hasher().Size() whs := []whenHash{first, last, random} @@ -25,23 +25,23 @@ func TestAsyncCorrectness(t *testing.T) { max := count * size var incr int capacity := 1 - pool := NewTreePool(hasher, count, capacity) + pool := bmt.NewTreePool(hasher, count, capacity) defer pool.Drain(0) for n := 1; n <= max; n += incr { incr = 1 + rand.Intn(5) - bmt := New(pool) + bmtobj := bmt.New(pool) d := data[:n] - rbmt := NewRefHasher(hasher, count) - expNoMeta := rbmt.Hash(d) + rbmtobj := bmt.NewRefHasher(hasher, count) + expNoMeta := rbmtobj.Hash(d) h := hasher() - h.Write(zeroSpan) + h.Write(bmt.ZeroSpan) h.Write(expNoMeta) exp := h.Sum(nil) - got := syncHash(bmt, 0, d) + got := syncHash(bmtobj, 0, d) if !bytes.Equal(got, exp) { t.Fatalf("wrong sync hash (syncpart) for datalength %v: expected %x (ref), got %x", n, exp, got) } - sw := bmt.NewAsyncWriter(double) + sw := NewAsyncWriter(bmtobj, double) got = asyncHashRandom(sw, 0, d, wh) if !bytes.Equal(got, exp) { t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got) @@ -70,9 +70,10 @@ func BenchmarkBMTAsync(t 
*testing.B) { func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) { data := testutil.RandomBytes(1, n) hasher := sha3.NewLegacyKeccak256 - pool := NewTreePool(hasher, segmentCount, PoolSize) - bmt := New(pool).NewAsyncWriter(double) - idxs, segments := splitAndShuffle(bmt.SectionSize(), data) + pool := bmt.NewTreePool(hasher, segmentCount, bmt.PoolSize) + bmth := bmt.New(pool) + bmtobj := NewAsyncWriter(bmth, double) + idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data) rand.Shuffle(len(idxs), func(i int, j int) { idxs[i], idxs[j] = idxs[j], idxs[i] }) @@ -80,32 +81,32 @@ func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) { t.ReportAllocs() t.ResetTimer() for i := 0; i < t.N; i++ { - asyncHash(bmt, 0, n, wh, idxs, segments) + asyncHash(bmtobj, 0, n, wh, idxs, segments) } } // splits the input data performs a random shuffle to mock async section writes -func asyncHashRandom(bmt file.SectionWriter, spanLength int, data []byte, wh whenHash) (s []byte) { - idxs, segments := splitAndShuffle(bmt.SectionSize(), data) - return asyncHash(bmt, spanLength, len(data), wh, idxs, segments) +func asyncHashRandom(bmtobj *AsyncHasher, spanLength int, data []byte, wh whenHash) (s []byte) { + idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data) + return asyncHash(bmtobj, spanLength, len(data), wh, idxs, segments) } // mock for async section writes for file.SectionWriter // requires a permutation (a random shuffle) of list of all indexes of segments // and writes them in order to the appropriate section // the Sum function is called according to the wh parameter (first, last, random [relative to segment writes]) -func asyncHash(bmt file.SectionWriter, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) { - bmt.Reset() +func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) { + bmtobj.Reset() if l == 0 { - bmt.SetLength(l) - bmt.SetSpan(spanLength) - return bmt.Sum(nil) + bmtobj.SetLength(l) + bmtobj.SetSpan(spanLength) + return bmtobj.Sum(nil) } c := make(chan []byte, 1) hashf := func() { - bmt.SetLength(l) - bmt.SetSpan(spanLength) - c <- bmt.Sum(nil) + bmtobj.SetLength(l) + bmtobj.SetSpan(spanLength) + c <- bmtobj.Sum(nil) } maxsize := len(idxs) var r int @@ -113,16 +114,67 @@ func asyncHash(bmt file.SectionWriter, spanLength int, l int, wh whenHash, idxs r = rand.Intn(maxsize) } for i, idx := range idxs { - bmt.SeekSection(idx) - bmt.Write(segments[idx]) + bmtobj.SeekSection(idx) + bmtobj.Write(segments[idx]) if (wh == first || wh == random) && i == r { go hashf() } } if wh == last { - bmt.SetLength(l) - bmt.SetSpan(spanLength) - return bmt.Sum(nil) + bmtobj.SetLength(l) + bmtobj.SetSpan(spanLength) + return bmtobj.Sum(nil) } return <-c } + +// COPIED FROM bmt test package +// MERGE LATER + +// Hash hashes the data and the span using the bmt hasher +func syncHash(h *bmt.Hasher, spanLength int, data []byte) []byte { + h.Reset() + h.SetSpan(spanLength) + h.Write(data) + return h.Sum(nil) +} + +func splitAndShuffle(secsize int, data []byte) (idxs []int, segments [][]byte) { + l := len(data) + n := l / secsize + if l%secsize > 0 { + n++ + } + for i := 0; i < n; i++ { + idxs = append(idxs, i) + end := (i + 1) * secsize + if end > l { + end = l + } + section := data[i*secsize : end] + segments = append(segments, section) + } + rand.Shuffle(n, func(i int, j int) { + idxs[i], idxs[j] = idxs[j], idxs[i] + }) + return idxs, segments +} + +const ( + // segmentCount is the 
maximum number of segments of the underlying chunk + // Should be equal to max-chunk-data-size / hash-size + // Currently set to 128 == 4096 (default chunk size) / 32 (sha3.keccak256 size) + segmentCount = 128 +) + +const bufferSize = 4128 + +type whenHash = int + +const ( + first whenHash = iota + last + random +) + +var counts = []int{1, 2, 3, 4, 5, 8, 9, 15, 16, 17, 32, 37, 42, 53, 63, 64, 65, 111, 127, 128} From 859a48e59b04f91dedcdac6d7fdcfc425d966131 Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 15 Dec 2019 20:42:53 +0100 Subject: [PATCH 08/16] file, bmt: Add naive exports in bmt to provide for asynchasher move --- bmt/bmt.go | 5 ----- bmt/bmt_test.go | 1 + file/hasher/asynchasher.go | 4 ++-- file/hasher/asynchasher_test.go | 3 +-- file/types.go | 1 - 5 files changed, 4 insertions(+), 10 deletions(-) diff --git a/bmt/bmt.go b/bmt/bmt.go index fceb6b0838..824eac3233 100644 --- a/bmt/bmt.go +++ b/bmt/bmt.go @@ -327,11 +327,6 @@ func (h *Hasher) Size() int { return h.pool.SegmentSize } -// SeekSection implements file.SectionWriter -func (h *Hasher) SeekSection(offset int) { - h.cursor = offset -} - // BlockSize implements hash.Hash and file.SectionWriter func (h *Hasher) BlockSize() int { return 2 * h.pool.SegmentSize diff --git a/bmt/bmt_test.go b/bmt/bmt_test.go index 4e7a4797b7..1a3686b08d 100644 --- a/bmt/bmt_test.go +++ b/bmt/bmt_test.go @@ -497,6 +497,7 @@ func TestUseSyncAsOrdinaryHasher(t *testing.T) { hasher := sha3.NewLegacyKeccak256 pool := NewTreePool(hasher, segmentCount, PoolSize) bmt := New(pool) + bmt.SetSpan(3) bmt.Write([]byte("foo")) res := bmt.Sum(nil) refh := NewRefHasher(hasher, 128) diff --git a/file/hasher/asynchasher.go b/file/hasher/asynchasher.go index 766ae61655..1374061a88 100644 --- a/file/hasher/asynchasher.go +++ b/file/hasher/asynchasher.go @@ -102,7 +102,7 @@ func (sw *AsyncHasher) WriteToIndex(i int, section []byte) (int, error) { // if index is lower than cursor then just write non-final section as is if i < sw.Hasher.GetCursor() { // if index is not the rightmost, safe to write section - go sw.write(i, section, false) + go sw.WriteSection(i, section, sw.double, false) return len(section), nil } // if there is a previous rightmost.GetSection() safe to write section @@ -120,7 +120,7 @@ func (sw *AsyncHasher) WriteToIndex(i int, section []byte) (int, error) { return len(section), nil } // the rightmost.GetSection() just changed, so we write the previous one as non-final - go sw.write(sw.Hasher.GetCursor(), t.GetSection(), false) + go sw.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, false) } // set i as the index of the righmost.GetSection() written so far // set t.GetOffset() to cursor*secsize+1 diff --git a/file/hasher/asynchasher_test.go b/file/hasher/asynchasher_test.go index 1cc86e5a0f..d8e4b9547b 100644 --- a/file/hasher/asynchasher_test.go +++ b/file/hasher/asynchasher_test.go @@ -114,8 +114,7 @@ func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh whenHash, idxs []i r = rand.Intn(maxsize) } for i, idx := range idxs { - bmtobj.SeekSection(idx) - bmtobj.Write(segments[idx]) + bmtobj.WriteToIndex(idx, segments[idx]) if (wh == first || wh == random) && i == r { go hashf() } diff --git a/file/types.go b/file/types.go index 19db8bd326..22ea775436 100644 --- a/file/types.go +++ b/file/types.go @@ -10,7 +10,6 @@ type SectionWriterFunc func(ctx context.Context) SectionWriter type SectionWriter interface { hash.Hash // Write,Sum,Reset,Size,BlockSize SetWriter(hashFunc SectionWriterFunc) SectionWriter // chain another 
SectionWriter the current instance - SeekSection(section int) // sets cursor that next Write() will write to SetLength(length int) // set total number of bytes that will be written to SectionWriter SetSpan(length int) // set data span of chunk SectionSize() int // section size of this SectionWriter From 8b595313cf545f9ac33ba93961164deccb853788 Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 15 Dec 2019 20:52:01 +0100 Subject: [PATCH 09/16] file: Add ctx and errFunc to async hasher constructor --- file/hasher/asynchasher.go | 14 ++++++++++++-- file/hasher/asynchasher_test.go | 9 +++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/file/hasher/asynchasher.go b/file/hasher/asynchasher.go index 1374061a88..817c113e08 100644 --- a/file/hasher/asynchasher.go +++ b/file/hasher/asynchasher.go @@ -1,6 +1,7 @@ package hasher import ( + "context" "errors" "sync" @@ -10,7 +11,7 @@ import ( // NewAsyncWriter extends Hasher with an interface for concurrent segment.GetSection() writes // TODO: Instead of explicitly setting double size of segment should be dynamic and chunked internally. If not, we have to keep different bmt hashers generation functions for different purposes in the same instance, or cope with added complexity of bmt hasher generation functions having to receive parameters -func NewAsyncWriter(h *bmt.Hasher, double bool) *AsyncHasher { +func NewAsyncWriter(ctx context.Context, h *bmt.Hasher, double bool, errFunc func(error)) *AsyncHasher { secsize := h.SectionSize() if double { secsize *= 2 @@ -25,6 +26,8 @@ func NewAsyncWriter(h *bmt.Hasher, double bool) *AsyncHasher { secsize: secsize, seccount: seccount, jobSize: 0, + ctx: ctx, + errFunc: errFunc, } } @@ -51,10 +54,17 @@ type AsyncHasher struct { seccount int // base section count write func(i int, section []byte, final bool) errFunc func(error) + ctx context.Context all bool // if all written in one go, temporary workaround jobSize int } +func (sw *AsyncHasher) raiseError(err string) { + if sw.errFunc != nil { + sw.errFunc(errors.New(err)) + } +} + // Reset implements file.SectionWriter func (sw *AsyncHasher) Reset() { sw.jobSize = 0 @@ -69,7 +79,7 @@ func (sw *AsyncHasher) SetLength(length int) { // SetWriter implements file.SectionWriter func (sw *AsyncHasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter { - sw.errFunc(errors.New("Asynchasher does not currently support SectionWriter chaining")) + sw.raiseError("Asynchasher does not currently support SectionWriter chaining") return sw } diff --git a/file/hasher/asynchasher_test.go b/file/hasher/asynchasher_test.go index d8e4b9547b..9fdbd925c3 100644 --- a/file/hasher/asynchasher_test.go +++ b/file/hasher/asynchasher_test.go @@ -2,6 +2,7 @@ package hasher import ( "bytes" + "context" "fmt" "math/rand" "testing" @@ -41,7 +42,9 @@ func TestAsyncCorrectness(t *testing.T) { if !bytes.Equal(got, exp) { t.Fatalf("wrong sync hash (syncpart) for datalength %v: expected %x (ref), got %x", n, exp, got) } - sw := NewAsyncWriter(bmtobj, double) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sw := NewAsyncWriter(ctx, bmtobj, double, nil) got = asyncHashRandom(sw, 0, d, wh) if !bytes.Equal(got, exp) { t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got) @@ -72,7 +75,9 @@ func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) { hasher := sha3.NewLegacyKeccak256 pool := bmt.NewTreePool(hasher, segmentCount, bmt.PoolSize) bmth := bmt.New(pool) - bmtobj := NewAsyncWriter(bmth, double) + 
ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + bmtobj := NewAsyncWriter(ctx, bmth, double, nil) idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data) rand.Shuffle(len(idxs), func(i int, j int) { idxs[i], idxs[j] = idxs[j], idxs[i] From 1ada8d3d3e7ada2ee93a1949c3f208caa53dbdd1 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 17 Dec 2019 17:47:18 +0100 Subject: [PATCH 10/16] file: Enable sync hash.Hash usage through async hasher --- file/hasher/asynchasher.go | 6 +++--- file/hasher/asynchasher_test.go | 35 +++++++++++++++++++++++++++------ 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/file/hasher/asynchasher.go b/file/hasher/asynchasher.go index 817c113e08..21f9f1b0a7 100644 --- a/file/hasher/asynchasher.go +++ b/file/hasher/asynchasher.go @@ -11,7 +11,7 @@ import ( // NewAsyncWriter extends Hasher with an interface for concurrent segment.GetSection() writes // TODO: Instead of explicitly setting double size of segment should be dynamic and chunked internally. If not, we have to keep different bmt hashers generation functions for different purposes in the same instance, or cope with added complexity of bmt hasher generation functions having to receive parameters -func NewAsyncWriter(ctx context.Context, h *bmt.Hasher, double bool, errFunc func(error)) *AsyncHasher { +func NewAsyncHasher(ctx context.Context, h *bmt.Hasher, double bool, errFunc func(error)) *AsyncHasher { secsize := h.SectionSize() if double { secsize *= 2 @@ -96,7 +96,7 @@ func (sw *AsyncHasher) Branches() int { // WriteSection writes the i-th section of the BMT base // this function can and is meant to be called concurrently // it sets max segment threadsafely -func (sw *AsyncHasher) WriteToIndex(i int, section []byte) (int, error) { +func (sw *AsyncHasher) WriteIndexed(i int, section []byte) (int, error) { // TODO: Temporary workaround for chunkwise write if i < 0 { sw.Hasher.Reset() @@ -154,7 +154,7 @@ func (sw *AsyncHasher) WriteToIndex(i int, section []byte) (int, error) { // length: known length of the input (unsafe; undefined if out of range) // meta: metadata to hash together with BMT root for the final digest // e.g., span for protection against existential forgery -func (sw *AsyncHasher) Sum(b []byte) (s []byte) { +func (sw *AsyncHasher) SumIndexed(b []byte) (s []byte) { if sw.all { return sw.Hasher.Sum(nil) } diff --git a/file/hasher/asynchasher_test.go b/file/hasher/asynchasher_test.go index 9fdbd925c3..64934d9312 100644 --- a/file/hasher/asynchasher_test.go +++ b/file/hasher/asynchasher_test.go @@ -44,7 +44,7 @@ func TestAsyncCorrectness(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - sw := NewAsyncWriter(ctx, bmtobj, double, nil) + sw := NewAsyncHasher(ctx, bmtobj, double, nil) got = asyncHashRandom(sw, 0, d, wh) if !bytes.Equal(got, exp) { t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got) @@ -77,7 +77,7 @@ func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) { bmth := bmt.New(pool) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - bmtobj := NewAsyncWriter(ctx, bmth, double, nil) + bmtobj := NewAsyncHasher(ctx, bmth, double, nil) idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data) rand.Shuffle(len(idxs), func(i int, j int) { idxs[i], idxs[j] = idxs[j], idxs[i] @@ -105,13 +105,13 @@ func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh whenHash, idxs []i if l == 0 { bmtobj.SetLength(l) bmtobj.SetSpan(spanLength) - 
return bmtobj.Sum(nil)
+		return bmtobj.SumIndexed(nil)
 	}
 	c := make(chan []byte, 1)
 	hashf := func() {
 		bmtobj.SetLength(l)
 		bmtobj.SetSpan(spanLength)
-		c <- bmtobj.Sum(nil)
+		c <- bmtobj.SumIndexed(nil)
 	}
 	maxsize := len(idxs)
 	var r int
@@ -119,7 +119,7 @@ func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh whenHash, idxs []i
 		r = rand.Intn(maxsize)
 	}
 	for i, idx := range idxs {
-		bmtobj.WriteToIndex(idx, segments[idx])
+		bmtobj.WriteIndexed(idx, segments[idx])
 		if (wh == first || wh == random) && i == r {
 			go hashf()
 		}
@@ -127,11 +127,34 @@ func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh whenHash, idxs []i
 	if wh == last {
 		bmtobj.SetLength(l)
 		bmtobj.SetSpan(spanLength)
-		return bmtobj.Sum(nil)
+		return bmtobj.SumIndexed(nil)
 	}
 	return <-c
 }
+
+// TestUseAsyncAsOrdinaryHasher verifies that the AsyncHasher can be used with the hash.Hash interface
+func TestUseAsyncAsOrdinaryHasher(t *testing.T) {
+	hasher := sha3.NewLegacyKeccak256
+	pool := bmt.NewTreePool(hasher, segmentCount, bmt.PoolSize)
+	sbmt := bmt.New(pool)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	abmt := NewAsyncHasher(ctx, sbmt, false, nil)
+	abmt.SetSpan(3)
+	abmt.Write([]byte("foo"))
+	res := abmt.Sum(nil)
+	refh := bmt.NewRefHasher(hasher, 128)
+	resh := refh.Hash([]byte("foo"))
+	hsub := hasher()
+	span := bmt.LengthToSpan(3)
+	hsub.Write(span)
+	hsub.Write(resh)
+	refRes := hsub.Sum(nil)
+	if !bytes.Equal(res, refRes) {
+		t.Fatalf("normalhash; expected %x, got %x", refRes, res)
+	}
+}
+
 // COPIED FROM bmt test package
 // MERGE LATER

From 0a3ec03611614214108f408f87723c10941b49c6 Mon Sep 17 00:00:00 2001
From: nolash
Date: Tue, 17 Dec 2019 18:00:15 +0100
Subject: [PATCH 11/16] bmt: Merge Standalone asynchasher to bmt package for
 easier diff

---
 bmt/bmt.go                      | 168 ++++++++++++++++++++++++++
 bmt/bmt_test.go                 | 144 ++++++++++++++++++++++
 file/hasher/asynchasher.go      | 194 ------------------------------
 file/hasher/asynchasher_test.go | 207 --------------------------------
 4 files changed, 312 insertions(+), 401 deletions(-)
 delete mode 100644 file/hasher/asynchasher.go
 delete mode 100644 file/hasher/asynchasher_test.go

diff --git a/bmt/bmt.go b/bmt/bmt.go
index 824eac3233..44a469193b 100644
--- a/bmt/bmt.go
+++ b/bmt/bmt.go
@@ -20,6 +20,7 @@ package bmt
 import (
 	"context"
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"hash"
 	"strings"
@@ -431,6 +432,173 @@ func (h *Hasher) releaseTree() {
 	}()
 }
+
+// NewAsyncHasher extends Hasher with an interface for concurrent segment/section writes
+// TODO: Instead of explicitly setting a double-sized segment, sizing should be dynamic and chunked internally. 
Otherwise we have to keep different bmt hasher generation functions for different purposes in the same instance, or cope with the added complexity of generation functions having to receive parameters
+func NewAsyncHasher(ctx context.Context, h *Hasher, double bool, errFunc func(error)) *AsyncHasher {
+	secsize := h.SectionSize()
+	if double {
+		secsize *= 2
+	}
+	seccount := h.Branches()
+	if double {
+		seccount /= 2
+	}
+	return &AsyncHasher{
+		Hasher:   h,
+		double:   double,
+		secsize:  secsize,
+		seccount: seccount,
+		jobSize:  0,
+		ctx:      ctx,
+		errFunc:  errFunc,
+	}
+}
+
+// AsyncHasher extends BMT Hasher with an asynchronous segment/section writer interface
+// AsyncHasher cannot be used as a hash.Hash: it must be driven with the right
+// section indexes, the right length, and the right number of sections
+// It is unsafe: it does not check indexes or section data lengths
+//
+// behaviour is undefined if
+// * a non-final section is shorter or longer than secsize
+// * the final section does not match length
+// * a section is written with an index higher than length/secsize
+// * length is set in the Sum call while length/secsize < maxsec
+//
+// * if Sum() is not called on a Hasher that is fully written,
+//   the call blocks; it can be terminated with Reset
+// * it will not leak goroutines if not all sections are written, but it
+//   blocks and keeps the resource, which can be released by calling Reset()
+type AsyncHasher struct {
+	*Hasher  // extends the Hasher
+	mtx      sync.Mutex // to lock the cursor access
+	double   bool       // whether to use double segments (call Hasher.WriteSection)
+	secsize  int        // size of base section (size of hash or double)
+	seccount int        // base section count
+	write    func(i int, section []byte, final bool)
+	errFunc  func(error)
+	ctx      context.Context
+	all      bool // if all written in one go, temporary workaround
+	jobSize  int
+}
+
+func (sw *AsyncHasher) raiseError(err string) {
+	if sw.errFunc != nil {
+		sw.errFunc(errors.New(err))
+	}
+}
+
+// Reset implements file.SectionWriter
+func (sw *AsyncHasher) Reset() {
+	sw.jobSize = 0
+	sw.all = false
+	sw.Hasher.Reset()
+}
+
+// SetLength implements file.SectionWriter
+func (sw *AsyncHasher) SetLength(length int) {
+	sw.jobSize = length
+}
+
+// SectionSize implements file.SectionWriter
+func (sw *AsyncHasher) SectionSize() int {
+	return sw.secsize
+}
+
+// Branches implements file.SectionWriter
+func (sw *AsyncHasher) Branches() int {
+	return sw.seccount
+}
+
+// WriteIndexed writes the i-th section of the BMT base
+// this function can and is meant to be called concurrently
+// it updates the rightmost section index threadsafely
+func (sw *AsyncHasher) WriteIndexed(i int, section []byte) (int, error) {
+	sw.mtx.Lock()
+	defer sw.mtx.Unlock()
+	t := sw.GetTree()
+	// cursor keeps track of the rightmost section written so far
+	// if index is lower than cursor then just write non-final section as is
+	if i < sw.Hasher.GetCursor() {
+		// if index is not the rightmost, safe to write section
+		go sw.WriteSection(i, section, sw.double, false)
+		return len(section), nil
+	}
+	// if there is a previous rightmost section, it is safe to write a section
+	if t.GetOffset() > 0 {
+		if i == sw.Hasher.GetCursor() {
+			// i==cursor implies cursor was set by Hash call so we can write section as final one
+			// since it can be shorter, first we copy it to the padded buffer
+			// TODO: Consider whether the section here needs to be copied, maybe we can enforce not changing the original slice
+			copySection := make([]byte, sw.secsize)
+			copy(copySection, section)
+			t.SetSection(copySection)
+			go sw.Hasher.WriteSection(i, t.GetSection(), sw.double, true)
+			return len(section), nil
+		}
+		// the rightmost section just changed, so we write the previous one as non-final
+		go sw.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, false)
+	}
+	// set i as the index of the rightmost section written so far
+	// set the tree offset to cursor*secsize+1
+	sw.Hasher.SetCursor(i)
+	t.SetOffset(i*sw.secsize + 1)
+	copySection := make([]byte, sw.secsize)
+	copy(copySection, section)
+	t.SetSection(copySection)
+	return len(section), nil
+}
+
+// SumIndexed can be called any time once the length and the span are known
+// potentially even before all segments have been written
+// in such cases it will block until all segments are present and
+// the hash for the length can be calculated.
+//
+// b: digest is appended to b
+// length: known length of the input (unsafe; undefined if out of range)
+// meta: metadata to hash together with BMT root for the final digest
+//   e.g., span for protection against existential forgery
+func (sw *AsyncHasher) SumIndexed(b []byte) (s []byte) {
+	if sw.all {
+		return sw.Hasher.Sum(nil)
+	}
+	sw.mtx.Lock()
+	t := sw.GetTree()
+	length := sw.jobSize
+	if length == 0 {
+		sw.ReleaseTree()
+		sw.mtx.Unlock()
+		s = sw.Hasher.GetZeroHash()
+		return
+	} else {
+		// for non-zero input the rightmost section is written to the tree asynchronously
+		// if the actual last section has been written (cursor == length/secsize)
+		maxsec := (length - 1) / sw.secsize
+		if t.GetOffset() > 0 {
+			go sw.Hasher.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, maxsec == sw.Hasher.GetCursor())
+		}
+		// set cursor to maxsec so the final section is written when it arrives
+		sw.Hasher.SetCursor(maxsec)
+		t.SetOffset(length)
+		// TODO: must this t.result channel be within lock? 
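+		// The result channel is fetched under the lock, but the receive
+		// below happens after Unlock(): outstanding WriteIndexed calls take
+		// the same mutex to deliver the remaining sections, so blocking on
+		// the channel while holding it could deadlock.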
+		result := t.GetResult()
+		sw.mtx.Unlock()
+		// wait for the result or reset
+		s = <-result
+	}
+	// release the tree back to the pool
+	sw.ReleaseTree()
+	meta := t.GetSpan()
+	// hash together meta and BMT root hash using the pool's base hasher
+	hsh := sw.Hasher.GetHasher()
+	hsh.Reset()
+	hsh.Write(meta)
+	hsh.Write(s)
+	return hsh.Sum(b)
+}
+
 func (h *Hasher) WriteSection(i int, section []byte, double bool, final bool) {
 	h.size += len(section)
 	h.writeSection(i, section, double, final)
diff --git a/bmt/bmt_test.go b/bmt/bmt_test.go
index 1a3686b08d..00709088e5 100644
--- a/bmt/bmt_test.go
+++ b/bmt/bmt_test.go
@@ -18,6 +18,7 @@ package bmt
 import (
 	"bytes"
+	"context"
 	"encoding/binary"
 	"fmt"
 	"math/rand"
@@ -180,6 +181,50 @@ func TestSyncHasherCorrectness(t *testing.T) {
 	}
 }
 
+// tests order-neutral concurrent writes with entire max size written in one go
+func TestAsyncCorrectness(t *testing.T) {
+	data := testutil.RandomBytes(1, bufferSize)
+	hasher := sha3.NewLegacyKeccak256
+	size := hasher().Size()
+	whs := []whenHash{first, last, random}
+
+	for _, double := range []bool{false, true} {
+		for _, wh := range whs {
+			for _, count := range counts {
+				t.Run(fmt.Sprintf("double_%v_hash_when_%v_segments_%v", double, wh, count), func(t *testing.T) {
+					max := count * size
+					var incr int
+					capacity := 1
+					pool := NewTreePool(hasher, count, capacity)
+					defer pool.Drain(0)
+					for n := 1; n <= max; n += incr {
+						incr = 1 + rand.Intn(5)
+						bmtobj := New(pool)
+						d := data[:n]
+						rbmtobj := NewRefHasher(hasher, count)
+						expNoMeta := rbmtobj.Hash(d)
+						h := hasher()
+						h.Write(ZeroSpan)
+						h.Write(expNoMeta)
+						exp := h.Sum(nil)
+						got := syncHash(bmtobj, 0, d)
+						if !bytes.Equal(got, exp) {
+							t.Fatalf("wrong sync hash (syncpart) for datalength %v: expected %x (ref), got %x", n, exp, got)
+						}
+						ctx, cancel := context.WithCancel(context.Background())
+						defer cancel()
+						sw := NewAsyncHasher(ctx, bmtobj, double, nil)
+						got = asyncHashRandom(sw, 0, d, wh)
+						if !bytes.Equal(got, exp) {
+							t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got)
+						}
+					}
+				})
+			}
+		}
+	}
+}
+
 // Tests that the BMT hasher can be synchronously reused with poolsizes 1 and PoolSize
 func TestHasherReuse(t *testing.T) {
 	t.Run(fmt.Sprintf("poolsize_%d", 1), func(t *testing.T) {
@@ -359,6 +404,19 @@ const (
 	random
 )
 
+func BenchmarkBMTAsync(t *testing.B) {
+	whs := []whenHash{first, last, random}
+	for size := 4096; size >= 128; size /= 2 {
+		for _, wh := range whs {
+			for _, double := range []bool{false, true} {
+				t.Run(fmt.Sprintf("double_%v_hash_when_%v_size_%v", double, wh, size), func(t *testing.B) {
+					benchmarkBMTAsync(t, size, wh, double)
+				})
+			}
+		}
+	}
+}
+
 func BenchmarkPool(t *testing.B) {
 	caps := []int{1, PoolSize}
 	for size := 4096; size >= 128; size /= 2 {
@@ -427,6 +485,27 @@ func benchmarkBMT(t *testing.B, n int) {
 	}
 }
 
+// benchmarks BMT hasher with asynchronous concurrent segment/section writes
+func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) {
+	data := testutil.RandomBytes(1, n)
+	hasher := sha3.NewLegacyKeccak256
+	pool := NewTreePool(hasher, segmentCount, PoolSize)
+	bmth := New(pool)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	bmtobj := NewAsyncHasher(ctx, bmth, double, nil)
+	idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data)
+	rand.Shuffle(len(idxs), func(i int, j int) {
+		idxs[i], idxs[j] = idxs[j], idxs[i]
+	})
+
+	t.ReportAllocs()
+	t.ResetTimer()
+	for i := 0; i < t.N; i++ {
+		asyncHash(bmtobj, 0, n, wh, idxs, segments)
+	}
+} + // benchmarks 100 concurrent bmt hashes with pool capacity func benchmarkPool(t *testing.B, poolsize, n int) { data := testutil.RandomBytes(1, n) @@ -492,6 +571,48 @@ func splitAndShuffle(secsize int, data []byte) (idxs []int, segments [][]byte) { return idxs, segments } +// splits the input data performs a random shuffle to mock async section writes +func asyncHashRandom(bmtobj *AsyncHasher, spanLength int, data []byte, wh whenHash) (s []byte) { + idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data) + return asyncHash(bmtobj, spanLength, len(data), wh, idxs, segments) +} + +// mock for async section writes for file.SectionWriter +// requires a permutation (a random shuffle) of list of all indexes of segments +// and writes them in order to the appropriate section +// the Sum function is called according to the wh parameter (first, last, random [relative to segment writes]) +func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) { + bmtobj.Reset() + if l == 0 { + bmtobj.SetLength(l) + bmtobj.SetSpan(spanLength) + return bmtobj.SumIndexed(nil) + } + c := make(chan []byte, 1) + hashf := func() { + bmtobj.SetLength(l) + bmtobj.SetSpan(spanLength) + c <- bmtobj.SumIndexed(nil) + } + maxsize := len(idxs) + var r int + if wh == random { + r = rand.Intn(maxsize) + } + for i, idx := range idxs { + bmtobj.WriteIndexed(idx, segments[idx]) + if (wh == first || wh == random) && i == r { + go hashf() + } + } + if wh == last { + bmtobj.SetLength(l) + bmtobj.SetSpan(spanLength) + return bmtobj.SumIndexed(nil) + } + return <-c +} + // TestUseSyncAsOrdinaryHasher verifies that the bmt.Hasher can be used with the hash.Hash interface func TestUseSyncAsOrdinaryHasher(t *testing.T) { hasher := sha3.NewLegacyKeccak256 @@ -511,3 +632,26 @@ func TestUseSyncAsOrdinaryHasher(t *testing.T) { t.Fatalf("normalhash; expected %x, got %x", refRes, res) } } + +// TestUseAsyncAsOrdinaryHasher verifies that the bmt.Hasher can be used with the hash.Hash interface +func TestUseAsyncAsOrdinaryHasher(t *testing.T) { + hasher := sha3.NewLegacyKeccak256 + pool := NewTreePool(hasher, segmentCount, PoolSize) + sbmt := New(pool) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + abmt := NewAsyncHasher(ctx, sbmt, false, nil) + abmt.SetSpan(3) + abmt.Write([]byte("foo")) + res := abmt.Sum(nil) + refh := NewRefHasher(hasher, 128) + resh := refh.Hash([]byte("foo")) + hsub := hasher() + span := LengthToSpan(3) + hsub.Write(span) + hsub.Write(resh) + refRes := hsub.Sum(nil) + if !bytes.Equal(res, refRes) { + t.Fatalf("normalhash; expected %x, got %x", refRes, res) + } +} diff --git a/file/hasher/asynchasher.go b/file/hasher/asynchasher.go deleted file mode 100644 index 21f9f1b0a7..0000000000 --- a/file/hasher/asynchasher.go +++ /dev/null @@ -1,194 +0,0 @@ -package hasher - -import ( - "context" - "errors" - "sync" - - "github.com/ethersphere/swarm/bmt" - "github.com/ethersphere/swarm/file" -) - -// NewAsyncWriter extends Hasher with an interface for concurrent segment.GetSection() writes -// TODO: Instead of explicitly setting double size of segment should be dynamic and chunked internally. 
If not, we have to keep different bmt hashers generation functions for different purposes in the same instance, or cope with added complexity of bmt hasher generation functions having to receive parameters -func NewAsyncHasher(ctx context.Context, h *bmt.Hasher, double bool, errFunc func(error)) *AsyncHasher { - secsize := h.SectionSize() - if double { - secsize *= 2 - } - seccount := h.Branches() - if double { - seccount /= 2 - } - return &AsyncHasher{ - Hasher: h, - double: double, - secsize: secsize, - seccount: seccount, - jobSize: 0, - ctx: ctx, - errFunc: errFunc, - } -} - -// AsyncHasher extends BMT Hasher with an asynchronous segment.GetSection() writer interface -// AsyncHasher cannot be used as with a hash.Hash interface: It must be used with the -// right indexes and length and the right number of sections -// It is unsafe and does not check indexes and section data lengths -// -// behaviour is undefined if -// * non-final sections are shorter or longer than secsize -// * if final section does not match length -// * write a section with index that is higher than length/secsize -// * set length in Sum call when length/secsize < maxsec -// -// * if Sum() is not called on a Hasher that is fully written -// a process will block, can be terminated with Reset -// * it will not leak processes if not all sections are written but it blocks -// and keeps the resource which can be released calling Reset() -type AsyncHasher struct { - *bmt.Hasher // extends the Hasher - mtx sync.Mutex // to lock the cursor access - double bool // whether to use double segments (call Hasher.writeSection) - secsize int // size of base section (size of hash or double) - seccount int // base section count - write func(i int, section []byte, final bool) - errFunc func(error) - ctx context.Context - all bool // if all written in one go, temporary workaround - jobSize int -} - -func (sw *AsyncHasher) raiseError(err string) { - if sw.errFunc != nil { - sw.errFunc(errors.New(err)) - } -} - -// Reset implements file.SectionWriter -func (sw *AsyncHasher) Reset() { - sw.jobSize = 0 - sw.all = false - sw.Hasher.Reset() -} - -// SetLength implements file.SectionWriter -func (sw *AsyncHasher) SetLength(length int) { - sw.jobSize = length -} - -// SetWriter implements file.SectionWriter -func (sw *AsyncHasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter { - sw.raiseError("Asynchasher does not currently support SectionWriter chaining") - return sw -} - -// SectionSize implements file.SectionWriter -func (sw *AsyncHasher) SectionSize() int { - return sw.secsize -} - -// Branches implements file.SectionWriter -func (sw *AsyncHasher) Branches() int { - return sw.seccount -} - -// WriteSection writes the i-th section of the BMT base -// this function can and is meant to be called concurrently -// it sets max segment threadsafely -func (sw *AsyncHasher) WriteIndexed(i int, section []byte) (int, error) { - // TODO: Temporary workaround for chunkwise write - if i < 0 { - sw.Hasher.Reset() - sw.Hasher.SetLength(len(section)) - sw.Hasher.Write(section) - sw.all = true - return len(section), nil - } - sw.mtx.Lock() - defer sw.mtx.Unlock() - t := sw.GetTree() - // cursor keeps track of the rightmost.GetSection() written so far - // if index is lower than cursor then just write non-final section as is - if i < sw.Hasher.GetCursor() { - // if index is not the rightmost, safe to write section - go sw.WriteSection(i, section, sw.double, false) - return len(section), nil - } - // if there is a previous rightmost.GetSection() 
safe to write section - if t.GetOffset() > 0 { - if i == sw.Hasher.GetCursor() { - // i==cursor implies cursor was set by Hash call so we can write section as final one - // since it can be shorter, first we copy it to the padded buffer - //t.GetSection() = make([]byte, sw.secsize) - //copy(t.GetSection(), section) - // TODO: Consider whether the section here needs to be copied, maybe we can enforce not change the original slice - copySection := make([]byte, sw.secsize) - copy(copySection, section) - t.SetSection(copySection) - go sw.Hasher.WriteSection(i, t.GetSection(), sw.double, true) - return len(section), nil - } - // the rightmost.GetSection() just changed, so we write the previous one as non-final - go sw.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, false) - } - // set i as the index of the righmost.GetSection() written so far - // set t.GetOffset() to cursor*secsize+1 - sw.Hasher.SetCursor(i) - //t.GetOffset() = i*sw.secsize + 1 - t.SetOffset(i*sw.secsize + 1) - //t.GetSection() = make([]byte, sw.secsize) - //copy(t.GetSection(), section) - copySection := make([]byte, sw.secsize) - copy(copySection, section) - t.SetSection(copySection) - return len(section), nil -} - -// Sum can be called any time once the length and the span is known -// potentially even before all segments have been written -// in such cases Sum will block until all segments are present and -// the hash for the length can be calculated. -// -// b: digest is appended to b -// length: known length of the input (unsafe; undefined if out of range) -// meta: metadata to hash together with BMT root for the final digest -// e.g., span for protection against existential forgery -func (sw *AsyncHasher) SumIndexed(b []byte) (s []byte) { - if sw.all { - return sw.Hasher.Sum(nil) - } - sw.mtx.Lock() - t := sw.GetTree() - length := sw.jobSize - if length == 0 { - sw.ReleaseTree() - sw.mtx.Unlock() - s = sw.Hasher.GetZeroHash() - return - } else { - // for non-zero input the rightmost.GetSection() is written to the tree asynchronously - // if the actual last.GetSection() has been written (sw.Hasher.GetCursor() == length/t.secsize) - maxsec := (length - 1) / sw.secsize - if t.GetOffset() > 0 { - go sw.Hasher.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, maxsec == sw.Hasher.GetCursor()) - } - // sesw.Hasher.GetCursor() to maxsec so final section is written when it arrives - sw.Hasher.SetCursor(maxsec) - t.SetOffset(length) - // TODO: must this t.result channel be within lock? 
-		result := t.GetResult()
-		sw.mtx.Unlock()
-		// wait for the result or reset
-		s = <-result
-	}
-	// release the tree back to the pool
-	sw.ReleaseTree()
-	meta := t.GetSpan()
-	// hash together meta and BMT root hash using the pool's hasher
-	hsh := sw.Hasher.GetHasher()
-	hsh.Reset()
-	hsh.Write(meta)
-	hsh.Write(s)
-	return hsh.Sum(b)
-}
diff --git a/file/hasher/asynchasher_test.go b/file/hasher/asynchasher_test.go
deleted file mode 100644
index 64934d9312..0000000000
--- a/file/hasher/asynchasher_test.go
+++ /dev/null
@@ -1,207 +0,0 @@
-package hasher
-
-import (
-	"bytes"
-	"context"
-	"fmt"
-	"math/rand"
-	"testing"
-
-	"github.com/ethersphere/swarm/bmt"
-	"github.com/ethersphere/swarm/testutil"
-	"golang.org/x/crypto/sha3"
-)
-
-// tests order-neutral concurrent writes with entire max size written in one go
-func TestAsyncCorrectness(t *testing.T) {
-	data := testutil.RandomBytes(1, bufferSize)
-	hasher := sha3.NewLegacyKeccak256
-	size := hasher().Size()
-	whs := []whenHash{first, last, random}
-
-	for _, double := range []bool{false, true} {
-		for _, wh := range whs {
-			for _, count := range counts {
-				t.Run(fmt.Sprintf("double_%v_hash_when_%v_segments_%v", double, wh, count), func(t *testing.T) {
-					max := count * size
-					var incr int
-					capacity := 1
-					pool := bmt.NewTreePool(hasher, count, capacity)
-					defer pool.Drain(0)
-					for n := 1; n <= max; n += incr {
-						incr = 1 + rand.Intn(5)
-						bmtobj := bmt.New(pool)
-						d := data[:n]
-						rbmtobj := bmt.NewRefHasher(hasher, count)
-						expNoMeta := rbmtobj.Hash(d)
-						h := hasher()
-						h.Write(bmt.ZeroSpan)
-						h.Write(expNoMeta)
-						exp := h.Sum(nil)
-						got := syncHash(bmtobj, 0, d)
-						if !bytes.Equal(got, exp) {
-							t.Fatalf("wrong sync hash (syncpart) for datalength %v: expected %x (ref), got %x", n, exp, got)
-						}
-						ctx, cancel := context.WithCancel(context.Background())
-						defer cancel()
-						sw := NewAsyncHasher(ctx, bmtobj, double, nil)
-						got = asyncHashRandom(sw, 0, d, wh)
-						if !bytes.Equal(got, exp) {
-							t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got)
-						}
-					}
-				})
-			}
-		}
-	}
-}
-
-func BenchmarkBMTAsync(t *testing.B) {
-	whs := []whenHash{first, last, random}
-	for size := 4096; size >= 128; size /= 2 {
-		for _, wh := range whs {
-			for _, double := range []bool{false, true} {
-				t.Run(fmt.Sprintf("double_%v_hash_when_%v_size_%v", double, wh, size), func(t *testing.B) {
-					benchmarkBMTAsync(t, size, wh, double)
-				})
-			}
-		}
-	}
-}
-
-// benchmarks BMT hasher with asynchronous concurrent segment/section writes
-func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) {
-	data := testutil.RandomBytes(1, n)
-	hasher := sha3.NewLegacyKeccak256
-	pool := bmt.NewTreePool(hasher, segmentCount, bmt.PoolSize)
-	bmth := bmt.New(pool)
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	bmtobj := NewAsyncHasher(ctx, bmth, double, nil)
-	idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data)
-	rand.Shuffle(len(idxs), func(i int, j int) {
-		idxs[i], idxs[j] = idxs[j], idxs[i]
-	})
-
-	t.ReportAllocs()
-	t.ResetTimer()
-	for i := 0; i < t.N; i++ {
-		asyncHash(bmtobj, 0, n, wh, idxs, segments)
-	}
-}
-
-// splits the input data and performs a random shuffle to mock async section writes
-func asyncHashRandom(bmtobj *AsyncHasher, spanLength int, data []byte, wh whenHash) (s []byte) {
-	idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data)
-	return asyncHash(bmtobj, spanLength, len(data), wh, idxs, segments)
-}
-
-// mock for async section writes for file.SectionWriter
-// requires a permutation (a random shuffle) of list of all indexes of segments
-// and writes them in order to the appropriate section
-// the Sum function is called according to the wh parameter (first, last, random [relative to segment writes])
-func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) {
-	bmtobj.Reset()
-	if l == 0 {
-		bmtobj.SetLength(l)
-		bmtobj.SetSpan(spanLength)
-		return bmtobj.SumIndexed(nil)
-	}
-	c := make(chan []byte, 1)
-	hashf := func() {
-		bmtobj.SetLength(l)
-		bmtobj.SetSpan(spanLength)
-		c <- bmtobj.SumIndexed(nil)
-	}
-	maxsize := len(idxs)
-	var r int
-	if wh == random {
-		r = rand.Intn(maxsize)
-	}
-	for i, idx := range idxs {
-		bmtobj.WriteIndexed(idx, segments[idx])
-		if (wh == first || wh == random) && i == r {
-			go hashf()
-		}
-	}
-	if wh == last {
-		bmtobj.SetLength(l)
-		bmtobj.SetSpan(spanLength)
-		return bmtobj.SumIndexed(nil)
-	}
-	return <-c
-}
-
-// TestUseAsyncAsOrdinaryHasher verifies that the bmt.Hasher can be used with the hash.Hash interface
-func TestUseAsyncAsOrdinaryHasher(t *testing.T) {
-	hasher := sha3.NewLegacyKeccak256
-	pool := bmt.NewTreePool(hasher, segmentCount, bmt.PoolSize)
-	sbmt := bmt.New(pool)
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	abmt := NewAsyncHasher(ctx, sbmt, false, nil)
-	abmt.SetSpan(3)
-	abmt.Write([]byte("foo"))
-	res := abmt.Sum(nil)
-	refh := bmt.NewRefHasher(hasher, 128)
-	resh := refh.Hash([]byte("foo"))
-	hsub := hasher()
-	span := bmt.LengthToSpan(3)
-	hsub.Write(span)
-	hsub.Write(resh)
-	refRes := hsub.Sum(nil)
-	if !bytes.Equal(res, refRes) {
-		t.Fatalf("normalhash; expected %x, got %x", refRes, res)
-	}
-}
-
-// COPIED FROM bmt test package
-// MERGE LATER
-
-// Hash hashes the data and the span using the bmt hasher
-func syncHash(h *bmt.Hasher, spanLength int, data []byte) []byte {
-	h.Reset()
-	h.SetSpan(spanLength)
-	h.Write(data)
-	return h.Sum(nil)
-}
-
-func splitAndShuffle(secsize int, data []byte) (idxs []int, segments [][]byte) {
-	l := len(data)
-	n := l / secsize
-	if l%secsize > 0 {
-		n++
-	}
-	for i := 0; i < n; i++ {
-		idxs = append(idxs, i)
-		end := (i + 1) * secsize
-		if end > l {
-			end = l
-		}
-		section := data[i*secsize : end]
-		segments = append(segments, section)
-	}
-	rand.Shuffle(n, func(i int, j int) {
-		idxs[i], idxs[j] = idxs[j], idxs[i]
-	})
-	return idxs, segments
-}
-
-const (
-	// segmentCount is the maximum number of segments of the underlying chunk
-	// Should be equal to max-chunk-data-size / hash-size
-	// Currently set to 128 == 4096 (default chunk size) / 32 (sha3.keccak256 size)
-	segmentCount = 128
-)
-
-const bufferSize = 4128
-
-type whenHash = int
-
-const (
-	first whenHash = iota
-	last
-	random
-)
-
-var counts = []int{1, 2, 3, 4, 5, 8, 9, 15, 16, 17, 32, 37, 42, 53, 63, 64, 65, 111, 127, 128}

From af6f1c9eb51b7683104085c9c394f584237a99a8 Mon Sep 17 00:00:00 2001
From: nolash
Date: Wed, 18 Dec 2019 09:00:14 +0100
Subject: [PATCH 12/16] bmt, file: Remove redundant SetLength method from interface

---
 bmt/bmt.go      | 18 +-----------------
 bmt/bmt_test.go |  9 +++------
 file/types.go   |  1 -
 3 files changed, 4 insertions(+), 24 deletions(-)

diff --git a/bmt/bmt.go b/bmt/bmt.go
index 44a469193b..e3b364b472 100644
--- a/bmt/bmt.go
+++ b/bmt/bmt.go
@@ -301,10 +301,6 @@ func (h *Hasher) SectionSize() int {
 	return h.pool.SegmentSize
 }
 
-// SetLength implements file.SectionWriter
-func (h *Hasher) SetLength(length int) {
-}
-
 // SetSpan implements file.SectionWriter
 func (h *Hasher) SetSpan(length int) {
 	span := LengthToSpan(length)
@@ -448,7 +444,6 @@ func NewAsyncHasher(ctx context.Context, h *Hasher, double bool, errFunc func(er
 		double:   double,
 		secsize:  secsize,
 		seccount: seccount,
-		jobSize:  0,
 		ctx:      ctx,
 		errFunc:  errFunc,
 	}
@@ -479,7 +474,6 @@ type AsyncHasher struct {
 	errFunc  func(error)
 	ctx      context.Context
 	all      bool // if all written in one go, temporary workaround
-	jobSize  int
 }
@@ -490,16 +484,10 @@ func (sw *AsyncHasher) raiseError(err string) {
 // Reset implements file.SectionWriter
 func (sw *AsyncHasher) Reset() {
-	sw.jobSize = 0
 	sw.all = false
 	sw.Hasher.Reset()
 }
 
-// SetLength implements file.SectionWriter
-func (sw *AsyncHasher) SetLength(length int) {
-	sw.jobSize = length
-}
-
 // SectionSize implements file.SectionWriter
 func (sw *AsyncHasher) SectionSize() int {
 	return sw.secsize
@@ -560,13 +548,9 @@
 // length: known length of the input (unsafe; undefined if out of range)
 // meta: metadata to hash together with BMT root for the final digest
 // e.g., span for protection against existential forgery
-func (sw *AsyncHasher) SumIndexed(b []byte) (s []byte) {
-	if sw.all {
-		return sw.Hasher.Sum(nil)
-	}
+func (sw *AsyncHasher) SumIndexed(b []byte, length int) (s []byte) {
 	sw.mtx.Lock()
 	t := sw.GetTree()
-	length := sw.jobSize
 	if length == 0 {
 		sw.ReleaseTree()
 		sw.mtx.Unlock()
diff --git a/bmt/bmt_test.go b/bmt/bmt_test.go
index 00709088e5..ec912101e2 100644
--- a/bmt/bmt_test.go
+++ b/bmt/bmt_test.go
@@ -584,15 +584,13 @@ func asyncHashRandom(bmtobj *AsyncHasher, spanLength int, data []byte, wh whenHa
 func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) {
 	bmtobj.Reset()
 	if l == 0 {
-		bmtobj.SetLength(l)
 		bmtobj.SetSpan(spanLength)
-		return bmtobj.SumIndexed(nil)
+		return bmtobj.SumIndexed(nil, l)
 	}
 	c := make(chan []byte, 1)
 	hashf := func() {
-		bmtobj.SetLength(l)
 		bmtobj.SetSpan(spanLength)
-		c <- bmtobj.SumIndexed(nil)
+		c <- bmtobj.SumIndexed(nil, l)
 	}
 	maxsize := len(idxs)
 	var r int
@@ -606,9 +604,8 @@ func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh whenHash, idxs []i
 		}
 	}
 	if wh == last {
-		bmtobj.SetLength(l)
 		bmtobj.SetSpan(spanLength)
-		return bmtobj.SumIndexed(nil)
+		return bmtobj.SumIndexed(nil, l)
 	}
 	return <-c
 }
diff --git a/file/types.go b/file/types.go
index 22ea775436..5ad84fce90 100644
--- a/file/types.go
+++ b/file/types.go
@@ -10,7 +10,6 @@ type SectionWriterFunc func(ctx context.Context) SectionWriter
 
 type SectionWriter interface {
 	hash.Hash                                           // Write,Sum,Reset,Size,BlockSize
 	SetWriter(hashFunc SectionWriterFunc) SectionWriter // chain another SectionWriter to the current instance
-	SetLength(length int)                               // set total number of bytes that will be written to SectionWriter
 	SetSpan(length int)                                 // set data span of chunk
 	SectionSize() int                                   // section size of this SectionWriter
 	Branches() int                                      // branch factor of this SectionWriter

From 47b9667600547e272624eb3676bd05618d74b378 Mon Sep 17 00:00:00 2001
From: nolash
Date: Wed, 18 Dec 2019 09:09:09 +0100
Subject: [PATCH 13/16] bmt: Remove redundant return values in WriteIndexed

---
 bmt/bmt.go | 22 +++++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)

diff --git a/bmt/bmt.go b/bmt/bmt.go
index e3b364b472..3eb1a3e641 100644
--- a/bmt/bmt.go
+++ b/bmt/bmt.go
@@ -501,7 +501,7 @@ func (sw *AsyncHasher) Branches() int {
 // WriteIndexed writes the i-th section of the BMT base
 // this function can and is meant to be called concurrently
 // it sets max segment threadsafely
-func (sw *AsyncHasher) WriteIndexed(i int, section []byte) (int, error) {
+func (sw *AsyncHasher) WriteIndexed(i int, section []byte) {
 	sw.mtx.Lock()
 	defer sw.mtx.Unlock()
 	t := sw.GetTree()
@@ -510,7 +510,7 @@ func (sw *AsyncHasher) WriteIndexed(i int, section []byte) (int, error) {
 	if i < sw.Hasher.GetCursor() {
 		// if index is not the rightmost, safe to write section
 		go sw.WriteSection(i, section, sw.double, false)
-		return len(section), nil
+		return
 	}
 	// if there is a previous rightmost section safe to write section
 	if t.GetOffset() > 0 {
@@ -524,7 +524,7 @@ func (sw *AsyncHasher) WriteIndexed(i int, section []byte) (int, error) {
 			copy(copySection, section)
 			t.SetSection(copySection)
 			go sw.Hasher.WriteSection(i, t.GetSection(), sw.double, true)
-			return len(section), nil
+			return
 		}
 		// the rightmost.GetSection() just changed, so we write the previous one as non-final
 		go sw.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, false)
@@ -536,7 +536,7 @@ func (sw *AsyncHasher) WriteIndexed(i int, section []byte) (int, error) {
 	copySection := make([]byte, sw.secsize)
 	copy(copySection, section)
 	t.SetSection(copySection)
-	return len(section), nil
+	return
 }
 
 // Sum can be called any time once the length and the span are known
@@ -589,7 +589,6 @@
 }
 
 // writeSection writes the hash of i-th section into level 1 node of the BMT tree
-// TODO: h.size increases even on multiple writes to the same section of a section
 func (h *Hasher) writeSection(i int, section []byte, double bool, final bool) {
 	// select the leaf node for the section
 	var n *node
@@ -764,52 +763,65 @@ func LengthToSpan(length int) []byte {
 }
 
 // ASYNCHASHER ACCESSORS
+// All methods below here are exported to enable access for AsyncHasher
+//
 // GetHasher returns a new instance of the underlying hasher
 func (h *Hasher) GetHasher() hash.Hash {
 	return h.pool.hasher()
 }
 
+// GetZeroHash returns the zero hash of the full depth of the Hasher instance
 func (h *Hasher) GetZeroHash() []byte {
 	return h.pool.zerohashes[h.pool.Depth]
 }
 
+// GetTree gets the underlying tree in use by the Hasher
 func (h *Hasher) GetTree() *tree {
 	return h.getTree()
 }
 
+// ReleaseTree releases the underlying tree in use by the Hasher
 func (h *Hasher) ReleaseTree() {
 	h.releaseTree()
 }
 
+// GetCursor returns the current write cursor for the Hasher
 func (h *Hasher) GetCursor() int {
 	return h.cursor
 }
 
+// SetCursor assigns the value of the current write cursor for the Hasher
 func (h *Hasher) SetCursor(c int) {
 	h.cursor = c
 }
 
+// GetOffset returns the write offset within the current section of the Hasher
 func (t *tree) GetOffset() int {
 	return t.offset
 }
 
+// SetOffset assigns the value of the write offset within the current section of the Hasher
 func (t *tree) SetOffset(offset int) {
 	t.offset = offset
 }
 
+// GetSection returns the current section Hasher is operating on
 func (t *tree) GetSection() []byte {
 	return t.section
 }
 
+// SetSection assigns the current section Hasher is operating on
 func (t *tree) SetSection(b []byte) {
 	t.section = b
 }
 
+// GetResult returns the result channel of the Hasher
 func (t *tree) GetResult() <-chan []byte {
 	return t.result
 }
 
+// GetSpan returns the span set by SetSpan
 func (t *tree) GetSpan() []byte {
 	return t.span
 }

From adc45db57ef6d82cb5cfa44629b27375615de5d4 Mon Sep 17 00:00:00 2001
From: nolash
Date: Mon, 3 Feb 2020 10:19:04 +0100
Subject: [PATCH 14/16] bmt: Add comments, use GetZeroHash in hasher.Sum
---
 bmt/bmt.go | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/bmt/bmt.go b/bmt/bmt.go
index 3eb1a3e641..6770b1bc54 100644
--- a/bmt/bmt.go
+++ b/bmt/bmt.go
@@ -336,7 +336,8 @@ func (h *Hasher) Sum(b []byte) (s []byte) {
 	t := h.getTree()
 	if h.size == 0 && t.offset == 0 {
 		h.releaseTree()
-		return h.pool.zerohashes[h.pool.Depth]
+		//return h.pool.zerohashes[h.pool.Depth]
+		return h.GetZeroHash()
 	}
 	// write the last section with final flag set to true
 	go h.WriteSection(t.cursor, t.section, true, true)
@@ -526,7 +527,7 @@ func (sw *AsyncHasher) WriteIndexed(i int, section []byte) {
 			go sw.Hasher.WriteSection(i, t.GetSection(), sw.double, true)
 			return
 		}
-		// the rightmost.GetSection() just changed, so we write the previous one as non-final
+		// the rightmost section just changed, so we write the previous one as non-final
 		go sw.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, false)
 	}
 	// set i as the index of the rightmost section written so far
@@ -536,7 +537,6 @@ func (sw *AsyncHasher) WriteIndexed(i int, section []byte) {
 	copySection := make([]byte, sw.secsize)
 	copy(copySection, section)
 	t.SetSection(copySection)
-	return
 }
 
 // Sum can be called any time once the length and the span are known
@@ -583,6 +583,9 @@ func (sw *AsyncHasher) SumIndexed(b []byte, length int) (s []byte) {
 	return hsh.Sum(b)
 }
 
+// WriteSection writes data to the data level in the section at index i.
+// Setting final to true tells the hasher no further data will be written and prepares the data for h.Sum()
+// TODO remove double as argument, push responsibility for handling data context to caller
 func (h *Hasher) WriteSection(i int, section []byte, double bool, final bool) {
 	h.size += len(section)
 	h.writeSection(i, section, double, final)
 }

From 8221d1b62040bfba41fa17c2fa04958e71b89fcc Mon Sep 17 00:00:00 2001
From: nolash
Date: Tue, 4 Feb 2020 13:33:41 +0100
Subject: [PATCH 15/16] bmt: Fix races, remove sync hash run in async correctness test

---
 bmt/bmt.go      | 15 +++++++++------
 bmt/bmt_test.go |  6 +-----
 2 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/bmt/bmt.go b/bmt/bmt.go
index 6770b1bc54..1013fee0fd 100644
--- a/bmt/bmt.go
+++ b/bmt/bmt.go
@@ -85,10 +85,11 @@ type BaseHasherFunc func() hash.Hash
 // the tree and itself in a state reusable for hashing a new chunk
 // - generates and verifies segment inclusion proofs (TODO:)
 type Hasher struct {
-	pool   *TreePool // BMT resource pool
-	bmt    *tree     // prebuilt BMT resource for flowcontrol and proofs
-	size   int       // bytes written to Hasher since last Reset()
-	cursor int       // cursor to write to on next Write() call
+	mtx    sync.Mutex // protects Hasher.size increments (temporary solution)
+	pool   *TreePool  // BMT resource pool
+	bmt    *tree      // prebuilt BMT resource for flowcontrol and proofs
+	size   int        // bytes written to Hasher since last Reset()
+	cursor int        // cursor to write to on next Write() call
 	errFunc func(error)
 	ctx     context.Context
 }
@@ -290,7 +291,7 @@ func newTree(segmentSize, depth int, hashfunc func() hash.Hash) *tree {
 	}
 }
 
-// Implements file.SectionWriter
+// SetWriter implements file.SectionWriter
 func (h *Hasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter {
 	log.Warn("Synchasher does not currently support SectionWriter chaining")
 	return h
 }
@@ -573,8 +574,8 @@
 		s = <-result
 	}
 	// release the tree back to the pool
-	sw.ReleaseTree()
 	meta := t.GetSpan()
+	sw.ReleaseTree()
 	// hash together meta and BMT root hash using the pool's hasher
 	hsh := sw.Hasher.GetHasher()
 	hsh.Reset()
 	hsh.Write(meta)
 	hsh.Write(s)
@@ -587,7 +588,9 @@ func (sw *AsyncHasher) SumIndexed(b []byte, length int) (s []byte) {
 	return hsh.Sum(b)
 }
 
 func (h *Hasher) WriteSection(i int, section []byte, double bool, final bool) {
+	h.mtx.Lock()
 	h.size += len(section)
+	h.mtx.Unlock()
 	h.writeSection(i, section, double, final)
 }
diff --git a/bmt/bmt_test.go b/bmt/bmt_test.go
index ec912101e2..8c1adb6038 100644
--- a/bmt/bmt_test.go
+++ b/bmt/bmt_test.go
@@ -207,14 +207,10 @@ func TestAsyncCorrectness(t *testing.T) {
 					h.Write(ZeroSpan)
 					h.Write(expNoMeta)
 					exp := h.Sum(nil)
-					got := syncHash(bmtobj, 0, d)
-					if !bytes.Equal(got, exp) {
-						t.Fatalf("wrong sync hash (syncpart) for datalength %v: expected %x (ref), got %x", n, exp, got)
-					}
 					ctx, cancel := context.WithCancel(context.Background())
 					defer cancel()
 					sw := NewAsyncHasher(ctx, bmtobj, double, nil)
-					got = asyncHashRandom(sw, 0, d, wh)
+					got := asyncHashRandom(sw, 0, d, wh)
 					if !bytes.Equal(got, exp) {
 						t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got)
 					}

From fda2831f3a795e9a920b42f5fa095b6cc6455506 Mon Sep 17 00:00:00 2001
From: nolash
Date: Fri, 7 Feb 2020 14:57:31 +0100
Subject: [PATCH 16/16] bmt: Assign bmt hash result in test for profiling w/o optimizations

---
 bmt/bmt_bench_test.go | 13 +++++++++++++
 bmt/bmt_test.go       |  6 +++++-
 2 files changed, 18 insertions(+), 1 deletion(-)
 create mode 100644 bmt/bmt_bench_test.go

diff --git a/bmt/bmt_bench_test.go b/bmt/bmt_bench_test.go
new file mode 100644
index 0000000000..05b8308344
--- /dev/null
+++ b/bmt/bmt_bench_test.go
@@ -0,0 +1,13 @@
+package bmt
+
+import (
+	"fmt"
+	"testing"
+)
+
+func BenchmarkBMTUsed(t *testing.B) {
+	size := 4096
+	t.Run(fmt.Sprintf("%v_size_%v", "BMT", size), func(t *testing.B) {
+		benchmarkBMT(t, size)
+	})
+}
diff --git a/bmt/bmt_test.go b/bmt/bmt_test.go
index 8c1adb6038..2662359ba4 100644
--- a/bmt/bmt_test.go
+++ b/bmt/bmt_test.go
@@ -47,6 +47,8 @@ const (
 
 var counts = []int{1, 2, 3, 4, 5, 8, 9, 15, 16, 17, 32, 37, 42, 53, 63, 64, 65, 111, 127, 128}
 
+var benchmarkBMTResult []byte
+
 // calculates the Keccak256 SHA3 hash of the data
 func sha3hash(data ...[]byte) []byte {
 	h := sha3.NewLegacyKeccak256()
@@ -473,12 +475,14 @@ func benchmarkBMT(t *testing.B, n int) {
 	hasher := sha3.NewLegacyKeccak256
 	pool := NewTreePool(hasher, segmentCount, PoolSize)
 	bmt := New(pool)
+	var r []byte
 	t.ReportAllocs()
 	t.ResetTimer()
 	for i := 0; i < t.N; i++ {
-		r = syncHash(bmt, 0, data)
+		r = syncHash(bmt, 0, data)
 	}
+	benchmarkBMTResult = r
 }
 
 // benchmarks BMT hasher with asynchronous concurrent segment/section writes
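
The invariant this whole series preserves is the digest layout: a chunk digest is keccak256 over the 8-byte little-endian span followed by the BMT root, and the BMT root is a plain binary Merkle tree over 32-byte segments (cf. LengthToSpan, ZeroSpan and the RefHasher comparison in the tests). The following is a minimal, self-contained reference sketch of that computation for a segment-aligned, power-of-two-sized chunk; it illustrates the layout and is not a substitute for the pooled Hasher.

package main

import (
	"encoding/binary"
	"fmt"

	"golang.org/x/crypto/sha3"
)

// lengthToSpan mirrors bmt.LengthToSpan: the data length as 8 bytes little-endian.
func lengthToSpan(length int) []byte {
	span := make([]byte, 8)
	binary.LittleEndian.PutUint64(span, uint64(length))
	return span
}

// bmtRoot reduces 32-byte segments pairwise with keccak256 until one node remains.
// Assumes len(chunk) is a power-of-two multiple of 32 (e.g. a full 4096-byte chunk).
func bmtRoot(chunk []byte) []byte {
	level := make([][]byte, 0, len(chunk)/32)
	for i := 0; i < len(chunk); i += 32 {
		level = append(level, chunk[i:i+32])
	}
	for len(level) > 1 {
		next := make([][]byte, 0, len(level)/2)
		for i := 0; i < len(level); i += 2 {
			h := sha3.NewLegacyKeccak256()
			h.Write(level[i])
			h.Write(level[i+1])
			next = append(next, h.Sum(nil))
		}
		level = next
	}
	return level[0]
}

func main() {
	chunk := make([]byte, 4096)
	h := sha3.NewLegacyKeccak256()
	h.Write(lengthToSpan(len(chunk))) // span protects against existential forgery
	h.Write(bmtRoot(chunk))
	fmt.Printf("chunk digest: %x\n", h.Sum(nil))
}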
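Day-to-day use of the synchronous Hasher after patch 12 is the plain hash.Hash flow plus SetSpan, which is exactly what the syncHash test helper wraps. A usage sketch, assuming the bmt package API at this point in the series (New, NewTreePool, SetSpan; segment count 128 and PoolSize as in the tests):

package main

import (
	"fmt"

	"github.com/ethersphere/swarm/bmt"
	"golang.org/x/crypto/sha3"
)

func main() {
	pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, 128, bmt.PoolSize)
	h := bmt.New(pool)
	data := []byte("foo")

	h.Reset()            // must be called before writing
	h.SetSpan(len(data)) // the 8-byte little-endian span is derived internally
	h.Write(data)
	fmt.Printf("%x\n", h.Sum(nil))
}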
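The AsyncHasher contract after patches 12 and 13: WriteIndexed accepts sections in arbitrary order and returns nothing, and SumIndexed takes the total byte length explicitly, blocking until the rightmost section has landed. A driver sketch modeled on the asyncHash helper, under the same API assumptions as above; the backwards write order is just one arbitrary permutation:

package main

import (
	"context"
	"fmt"

	"github.com/ethersphere/swarm/bmt"
	"golang.org/x/crypto/sha3"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, 128, bmt.PoolSize)
	sbmt := bmt.New(pool)
	abmt := bmt.NewAsyncHasher(ctx, sbmt, false, nil)

	data := make([]byte, 4096)
	secsize := abmt.SectionSize()

	abmt.Reset()
	abmt.SetSpan(len(data))
	// sections may be written in any order; here the indexes run backwards
	for i := len(data)/secsize - 1; i >= 0; i-- {
		abmt.WriteIndexed(i, data[i*secsize:(i+1)*secsize])
	}
	// SumIndexed blocks until the last (rightmost) section has been written
	fmt.Printf("%x\n", abmt.SumIndexed(nil, len(data)))
}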
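Patch 15's race fix is the standard pattern of serializing a counter that many goroutines bump: each WriteSection goroutine increments h.size, so the increment is wrapped in a mutex. Reduced to its core (the type and field names here are illustrative, not the bmt types):

package main

import (
	"fmt"
	"sync"
)

// counter mimics the patch-15 fix: size is incremented from concurrent
// writers, so the increment itself must be serialized.
type counter struct {
	mtx  sync.Mutex
	size int
}

func (c *counter) add(n int) {
	c.mtx.Lock()
	c.size += n
	c.mtx.Unlock()
}

func main() {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < 128; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.add(32) // e.g. one 32-byte section per concurrent write
		}()
	}
	wg.Wait()
	fmt.Println(c.size) // 4096, and clean under the race detector
}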
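Patch 16's benchmarkBMTResult is the usual Go benchmarking sink: without an observable use of the result, the compiler may elide the hashing call and the benchmark measures an empty loop. The same pattern in isolation, with a stdlib hash standing in for syncHash:

package bench

import (
	"crypto/sha256"
	"testing"
)

// A package-level sink keeps the result observable, preventing the
// compiler from eliminating the hashing call inside the loop.
var sink [32]byte

func BenchmarkHashWithSink(b *testing.B) {
	data := make([]byte, 4096)
	var r [32]byte
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		r = sha256.Sum256(data)
	}
	sink = r // publish once after the loop, as patch 16 does
}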