perf(getter): cancel inflight requests if enough chunks are fetched for recovery #4608

Merged: 4 commits, Mar 6, 2024
Changes from all commits
8 changes: 2 additions & 6 deletions pkg/api/bzz.go
@@ -343,10 +343,8 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV
ls := loadsave.NewReadonly(s.storer.Download(cache))
feedDereferenced := false

strategyTimeout := getter.DefaultStrategyTimeout.String()

ctx := r.Context()
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger)
if err != nil {
logger.Error(err, err.Error())
jsonhttp.BadRequest(w, "could not parse headers")
@@ -537,10 +535,8 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
cache = *headers.Cache
}

strategyTimeout := getter.DefaultStrategyTimeout.String()

ctx := r.Context()
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger)
if err != nil {
logger.Error(err, err.Error())
jsonhttp.BadRequest(w, "could not parse headers")
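Both handlers now build the retrieval configuration without a strategy-timeout value. For reference, here is a minimal sketch of the updated SetConfigInContext call, mirroring the call sites in this diff; the concrete parameter types, the 30-second timeout and the helper name configureRedundancy are assumptions made for illustration, not code from this PR.

```go
package main

import (
	"context"
	"time"

	"github.com/ethersphere/bee/pkg/file/redundancy/getter"
	"github.com/ethersphere/bee/pkg/log"
)

// configureRedundancy mirrors the updated call sites above: the former
// strategy-timeout argument is gone, leaving strategy, fallback mode and the
// per-chunk retrieval timeout (passed as a duration string).
func configureRedundancy(ctx context.Context) (context.Context, error) {
	strategy := getter.RACE                     // strategy also used by the RACE tests below
	fallback := true                            // allow falling back to other strategies
	fetchTimeout := (30 * time.Second).String() // assumed value, "30s"

	return getter.SetConfigInContext(ctx, &strategy, &fallback, &fetchTimeout, log.Noop)
}

func main() {
	ctx, err := configureRedundancy(context.Background())
	if err != nil {
		panic(err) // the HTTP handlers answer with 400 Bad Request instead
	}
	_ = ctx // hand the configured context to the download / joiner path
}
```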
2 changes: 1 addition & 1 deletion pkg/api/bzz_test.go
@@ -62,7 +62,7 @@ import (
// using redundancy to reconstruct the file and find the file recoverable.
//
// nolint:thelper
func TestBzzUploadDownloadWithRedundancy(t *testing.T) {
func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) {
t.Parallel()
fileUploadResource := "/bzz"
fileDownloadResource := func(addr string) string { return "/bzz/" + addr + "/" }
10 changes: 8 additions & 2 deletions pkg/file/joiner/joiner.go
@@ -80,10 +80,16 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.
}
return d
}
remove := func() {
remove := func(err error) {
g.mu.Lock()
defer g.mu.Unlock()
g.cache[key] = nil
if err != nil {
// signals that a new getter is needed to reattempt to recover the data
delete(g.cache, key)
} else {
// signals that the chunks were fetched/recovered/cached so a future getter is not needed
g.cache[key] = nil
}
}
d = getter.New(addrs, shardCnt, g.fetcher, g.putter, remove, g.config)
g.cache[key] = d
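The error-aware callback distinguishes two terminal states for a cache entry: deleted (recovery failed, so a fresh decoder should be built on the next request) and present-but-nil (the chunks were fetched, recovered or cached, so no decoder is needed anymore). The lookup side that consumes these states is not part of this hunk; the sketch below reconstructs it under that assumption, with simplified names.

```go
// Package joinersketch shows, in isolation, how a decoder cache can consume
// the two states written by the error-aware remove callback above. Only the
// callback semantics come from this PR; the lookup method is an assumed,
// simplified reconstruction.
package joinersketch

import (
	"sync"

	"github.com/ethersphere/bee/pkg/storage"
)

type decoderCache struct {
	mu      sync.Mutex
	fetcher storage.Getter            // plain network fetcher
	cache   map[string]storage.Getter // scope key -> active decoder
}

// lookup reports which getter to use for a scope, or false when a new decoder
// has to be built (first request, or remove(err) deleted a failed entry).
func (g *decoderCache) lookup(key string) (storage.Getter, bool) {
	g.mu.Lock()
	defer g.mu.Unlock()

	d, ok := g.cache[key]
	if !ok {
		return nil, false // build a new decoder and reattempt recovery
	}
	if d == nil {
		// remove(nil) ran: data was fetched/recovered/cached already,
		// so the plain fetcher is sufficient and no decoder is needed
		return g.fetcher, true
	}
	return d, true // an active decoder exists; reuse it
}
```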
32 changes: 14 additions & 18 deletions pkg/file/joiner/joiner_test.go
@@ -1109,17 +1109,15 @@ func TestJoinerRedundancy(t *testing.T) {
if err != nil {
t.Fatal(err)
}
strategyTimeout := 100 * time.Millisecond
// all data can be read back
readCheck := func(t *testing.T, expErr error) {
ctx := context.Background()

strategyTimeoutStr := strategyTimeout.String()
decodeTimeoutStr := (10 * strategyTimeout).String()
decodeTimeoutStr := time.Second.String()
fallback := true
s := getter.RACE

ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, &strategyTimeoutStr, log.Noop)
ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, log.Noop)
if err != nil {
t.Fatal(err)
}
@@ -1229,7 +1227,7 @@ func TestJoinerRedundancy(t *testing.T) {
// nolint:thelper
func TestJoinerRedundancyMultilevel(t *testing.T) {
t.Parallel()
test := func(t *testing.T, rLevel redundancy.Level, encrypt bool, levels, size int) {
test := func(t *testing.T, rLevel redundancy.Level, encrypt bool, size int) {
t.Helper()
store := mockstorer.NewForgettingStore(inmemchunkstore.New())
testutil.CleanupCloser(t, store)
@@ -1249,14 +1247,12 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
expRead := swarm.ChunkSize
buf := make([]byte, expRead)
offset := mrand.Intn(size) * expRead
canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, levels int, canRead bool) {
canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, canRead bool) {
ctx := context.Background()
strategyTimeout := 100 * time.Millisecond

strategyTimeoutStr := strategyTimeout.String()
decodingTimeoutStr := (2 * strategyTimeout).String()
decodingTimeoutStr := (200 * time.Millisecond).String()

ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, &strategyTimeoutStr, log.Noop)
ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, log.Noop)
if err != nil {
t.Fatal(err)
}
@@ -1295,35 +1291,35 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
t.Run("NONE w/o fallback CAN retrieve", func(t *testing.T) {
store.Record()
defer store.Unrecord()
canReadRange(t, getter.NONE, false, levels, true)
canReadRange(t, getter.NONE, false, true)
})

// do not forget the root chunk
store.Unmiss(swarm.NewAddress(addr.Bytes()[:swarm.HashSize]))
// after we forget the chunks on the way to the range, we should not be able to retrieve
t.Run("NONE w/o fallback CANNOT retrieve", func(t *testing.T) {
canReadRange(t, getter.NONE, false, levels, false)
canReadRange(t, getter.NONE, false, false)
})

// we lost a data chunk, we cannot recover using DATA only strategy with no fallback
t.Run("DATA w/o fallback CANNOT retrieve", func(t *testing.T) {
canReadRange(t, getter.DATA, false, levels, false)
canReadRange(t, getter.DATA, false, false)
})

if rLevel == 0 {
// allowing fallback mode will not help if no redundancy used for upload
t.Run("DATA w fallback CANNOT retrieve", func(t *testing.T) {
canReadRange(t, getter.DATA, true, levels, false)
canReadRange(t, getter.DATA, true, false)
})
return
}
// allowing fallback mode will make the range retrievable using erasure decoding
t.Run("DATA w fallback CAN retrieve", func(t *testing.T) {
canReadRange(t, getter.DATA, true, levels, true)
canReadRange(t, getter.DATA, true, true)
})
// after the reconstructed data is stored, we can retrieve the range using DATA only mode
t.Run("after recovery, NONE w/o fallback CAN retrieve", func(t *testing.T) {
canReadRange(t, getter.NONE, false, levels, true)
canReadRange(t, getter.NONE, false, true)
})
}
r2level := []int{2, 1, 2, 3, 2}
@@ -1353,7 +1349,7 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
if r2level[rLevel] != levels || encrypt != encryptChunk[rLevel] {
t.Skip("skipping to save time")
}
test(t, rLevel, encrypt, levels, chunkCnt)
test(t, rLevel, encrypt, chunkCnt)
})
switch levels {
case 1:
@@ -1364,7 +1360,7 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
continue
}
t.Run(fmt.Sprintf("encrypt=%v levels=%d chunks=%d full", encrypt, levels, chunkCnt), func(t *testing.T) {
test(t, rLevel, encrypt, levels, chunkCnt)
test(t, rLevel, encrypt, chunkCnt)
})
}
}
27 changes: 18 additions & 9 deletions pkg/file/redundancy/getter/getter.go
@@ -26,6 +26,7 @@ var (
// if retrieves children of an intermediate chunk potentially using erasure decoding
// it caches sibling chunks if erasure decoding started already
type decoder struct {
ctx context.Context
fetcher storage.Getter // network retrieval interface to fetch chunks
putter storage.Putter // interface to local storage to save reconstructed chunks
addrs []swarm.Address // all addresses of the intermediate chunk
@@ -40,11 +41,10 @@ type decoder struct {
parityCnt int // number of parity shards
wg sync.WaitGroup // wait group to wait for all goroutines to finish
mu sync.Mutex // mutex to protect buffer
err error // error of the last erasure decoding
fetchedCnt atomic.Int32 // count successful retrievals
failedCnt atomic.Int32 // count successful retrievals
cancel func() // cancel function for RS decoding
remove func() // callback to remove decoder from decoders cache
remove func(error) // callback to remove decoder from decoders cache
config Config // configuration
logger log.Logger
}
@@ -55,11 +55,13 @@ type Getter interface {
}

// New returns a decoder object used to retrieve children of an intermediate chunk
func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(), conf Config) Getter {
func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) Getter {
// global context is canceled when the Close is called or when the prefetch terminates
ctx, cancel := context.WithCancel(context.Background())
size := len(addrs)

d := &decoder{
ctx: ctx,
Review comment (repo member): this is a known anti-pattern, and clearly not needed here.
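(Context: the Go context package documentation advises against storing a Context inside a struct type; the usual recommendation is to pass a Context explicitly to each function that needs it.)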

fetcher: g,
putter: p,
addrs: addrs,
@@ -92,7 +94,7 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.err = d.prefetch(ctx)
_ = d.prefetch(ctx)
Review comment (repo member): we need to call remove(err) also for a decoder without a prefetch process.
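(Context: in the recovery-not-allowed branch just below, badRecovery is closed and no prefetch goroutine is started, so nothing would ever invoke the remove callback for such a decoder; that appears to be the gap this comment points at.)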

}()
} else { // recovery not allowed
close(d.badRecovery)
@@ -143,7 +145,14 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
defer cancel()

g.wg.Add(1)
defer g.wg.Done()
go func() {
Review comment (repo member): what is this for?

select {
case <-fctx.Done(): // local context
case <-g.ctx.Done(): // global context
}
cancel()
g.wg.Done()
}()

// retrieval
ch, err := g.fetcher.Get(fctx, g.addrs[i])
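The goroutine added here ties each chunk retrieval to two cancellation sources: its own timeout context and the decoder-wide context created in New. Cancelling the latter, whether from Close or because enough shards have already arrived for recovery, aborts every inflight request, which is the behaviour named in the PR title. Below is a self-contained sketch of that bridging pattern; fetchOne, the 30-second timeout and the callback are illustrative stand-ins, not the getter's real API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// fetchOne binds a single retrieval to its own timeout *and* to a shared,
// decoder-wide context: whichever fires first releases the request.
func fetchOne(callCtx, globalCtx context.Context, wg *sync.WaitGroup,
	get func(context.Context) error) error {

	fctx, cancel := context.WithTimeout(callCtx, 30*time.Second)
	defer cancel()

	wg.Add(1)
	go func() {
		defer wg.Done()
		select {
		case <-fctx.Done(): // local timeout, completion or caller cancellation
		case <-globalCtx.Done(): // decoder closed, or enough chunks already fetched
		}
		cancel() // unblocks get(fctx) below in every case
	}()

	return get(fctx)
}

func main() {
	globalCtx, stop := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	// pretend recovery succeeded elsewhere: cancel all remaining fetches
	go func() { time.Sleep(50 * time.Millisecond); stop() }()

	err := fetchOne(context.Background(), globalCtx, &wg, func(ctx context.Context) error {
		<-ctx.Done() // stand-in for a slow fetcher.Get call
		return ctx.Err()
	})
	wg.Wait()
	fmt.Println("inflight fetch aborted:", errors.Is(err, context.Canceled)) // true
}
```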
@@ -172,8 +181,9 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
return waitRecovery(storage.ErrNotFound)
}

func (g *decoder) prefetch(ctx context.Context) error {
defer g.remove()
func (g *decoder) prefetch(ctx context.Context) (err error) {
defer g.remove(err)
Review comment (repo member): this will never give you a non-nil err. You need

defer func() { g.remove(err) }()

no?

defer g.cancel()

run := func(s Strategy) error {
if err := g.runStrategy(ctx, s); err != nil {
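The review comment above flags a classic Go pitfall in `defer g.remove(err)`: the arguments of a deferred call are evaluated when the defer statement executes, so err is captured while it is still nil and the later assignments inside the strategy loop are never seen. A minimal, generic illustration (the names here are not the getter's):

```go
package main

import (
	"errors"
	"fmt"
)

func remove(err error) { fmt.Println("remove called with:", err) }

// argEvaluatedEarly mirrors `defer g.remove(err)`: err is captured as nil at
// the moment the defer statement runs.
func argEvaluatedEarly() (err error) {
	defer remove(err) // prints: remove called with: <nil>
	err = errors.New("decoding failed")
	return err
}

// closureReadsFinalValue mirrors the suggested `defer func() { g.remove(err) }()`:
// the named return value is read only when the function exits.
func closureReadsFinalValue() (err error) {
	defer func() { remove(err) }() // prints: remove called with: decoding failed
	err = errors.New("decoding failed")
	return err
}

func main() {
	_ = argEvaluatedEarly()
	_ = closureReadsFinalValue()
}
```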
@@ -183,7 +193,6 @@ func (g *decoder) prefetch(ctx context.Context) error {
return g.recover(ctx)
}

var err error
for s := g.config.Strategy; s < strategyCnt; s++ {

err = run(s)
@@ -377,6 +386,6 @@ func (g *decoder) save(ctx context.Context, missing []int) error {
func (g *decoder) Close() error {
g.cancel()
g.wg.Wait()
g.remove()
g.remove(nil)
Review comment (repo member): when we force-close the decoder, the joiner entry should be completely removed from the cache so that when the same scope is called another time, the decoder is rebuilt.

return nil
}
46 changes: 11 additions & 35 deletions pkg/file/redundancy/getter/getter_test.go
@@ -19,12 +19,11 @@ import (

"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/file/redundancy/getter"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/storage"
inmem "github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
mockstorer "github.com/ethersphere/bee/pkg/storer/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/util/testutil/racedetection"
"github.com/ethersphere/bee/pkg/util/testutil"
"github.com/klauspost/reedsolomon"
"golang.org/x/sync/errgroup"
)
@@ -96,10 +95,6 @@ func TestGetterFallback(t *testing.T) {

func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
t.Helper()
strategyTimeout := 100 * time.Millisecond
if racedetection.On {
strategyTimeout *= 2
}
store := inmem.New()
buf := make([][]byte, bufSize)
addrs := initData(t, buf, shardCnt, store)
@@ -115,31 +110,13 @@
if len(addr.Bytes()) == 0 {
t.Skip("no data shard erased")
}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
conf := getter.Config{
Strategy: getter.RACE,
FetchTimeout: 2 * strategyTimeout,
StrategyTimeout: strategyTimeout,
Logger: log.Noop,
}
g := getter.New(addrs, shardCnt, store, store, func() {}, conf)
defer g.Close()

g := getter.New(addrs, shardCnt, store, store, func(error) {}, getter.DefaultConfig)
testutil.CleanupCloser(t, g)

parityCnt := len(buf) - shardCnt
q := make(chan error, 1)
go func() {
_, err := g.Get(ctx, addr)
q <- err
}()
err := context.DeadlineExceeded
wait := strategyTimeout * 2
if racedetection.On {
wait *= 2
}
select {
case err = <-q:
case <-time.After(wait):
}
_, err := g.Get(context.Background(), addr)

switch {
case erasureCnt > parityCnt:
t.Run("unable to recover", func(t *testing.T) {
@@ -194,12 +171,11 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) {
// create getter
start := time.Now()
conf := getter.Config{
Strategy: s,
Strict: strict,
FetchTimeout: strategyTimeout / 2,
StrategyTimeout: strategyTimeout,
Strategy: s,
Strict: strict,
FetchTimeout: strategyTimeout / 2,
}
g := getter.New(addrs, shardCnt, store, store, func() {}, conf)
g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
defer g.Close()

// launch delayed and erased chunk retrieval