diff --git a/pkg/file/joiner/joiner.go b/pkg/file/joiner/joiner.go index 69ae16065bd..1d64644b553 100644 --- a/pkg/file/joiner/joiner.go +++ b/pkg/file/joiner/joiner.go @@ -80,10 +80,16 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage. } return d } - remove := func() { + remove := func(err error) { g.mu.Lock() defer g.mu.Unlock() - g.cache[key] = nil + if err != nil { + // signals that a new getter is needed to reattempt to recover the data + delete(g.cache, key) + } else { + // signals that the chunks were fetched/recovered/cached so a future getter is not needed + g.cache[key] = nil + } } d = getter.New(addrs, shardCnt, g.fetcher, g.putter, remove, g.config) g.cache[key] = d diff --git a/pkg/file/redundancy/getter/getter.go b/pkg/file/redundancy/getter/getter.go index d0082212d8a..2296d3f4a06 100644 --- a/pkg/file/redundancy/getter/getter.go +++ b/pkg/file/redundancy/getter/getter.go @@ -26,6 +26,7 @@ var ( // if retrieves children of an intermediate chunk potentially using erasure decoding // it caches sibling chunks if erasure decoding started already type decoder struct { + ctx context.Context // decoder-wide context created in New; fetch goroutines watch it alongside their local context fetcher storage.Getter // network retrieval interface to fetch chunks putter storage.Putter // interface to local storage to save reconstructed chunks addrs []swarm.Address // all addresses of the intermediate chunk @@ -44,7 +45,7 @@ type decoder struct { fetchedCnt atomic.Int32 // count successful retrievals failedCnt atomic.Int32 // count successful retrievals cancel func() // cancel function for RS decoding - remove func() // callback to remove decoder from decoders cache + remove func(error) // callback to remove decoder from decoders cache; prefetch passes its error, Close passes nil config Config // configuration logger log.Logger } @@ -55,11 +56,12 @@ type Getter interface { } // New returns a decoder object used to retrieve children of an intermediate chunk -func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(), conf Config) Getter 
{ +func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) Getter { ctx, cancel := context.WithCancel(context.Background()) size := len(addrs) d := &decoder{ + ctx: ctx, fetcher: g, putter: p, addrs: addrs, @@ -143,7 +145,14 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e defer cancel() g.wg.Add(1) - defer g.wg.Done() + go func() { + select { + case <-fctx.Done(): // local context + case <-g.ctx.Done(): // global context + } + cancel() + g.wg.Done() + }() // retrieval ch, err := g.fetcher.Get(fctx, g.addrs[i]) @@ -172,8 +181,9 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e return waitRecovery(storage.ErrNotFound) } -func (g *decoder) prefetch(ctx context.Context) error { - defer g.remove() +func (g *decoder) prefetch(ctx context.Context) (err error) { + defer func() { g.remove(err) }() // closure reads the named result at return time; a plain defer g.remove(err) would capture the initial nil + defer g.cancel() run := func(s Strategy) error { if err := g.runStrategy(ctx, s); err != nil { @@ -183,7 +193,6 @@ func (g *decoder) prefetch(ctx context.Context) error { return g.recover(ctx) } - var err error for s := g.config.Strategy; s < strategyCnt; s++ { err = run(s) @@ -377,6 +386,6 @@ func (g *decoder) save(ctx context.Context, missing []int) error { func (g *decoder) Close() error { g.cancel() g.wg.Wait() - g.remove() + g.remove(nil) return nil } diff --git a/pkg/file/redundancy/getter/getter_test.go b/pkg/file/redundancy/getter/getter_test.go index 95609fc4c1e..ee1132277cb 100644 --- a/pkg/file/redundancy/getter/getter_test.go +++ b/pkg/file/redundancy/getter/getter_test.go @@ -123,7 +123,7 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) { StrategyTimeout: strategyTimeout, Logger: log.Noop, } - g := getter.New(addrs, shardCnt, store, store, func() {}, conf) + g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf) defer g.Close() parityCnt := len(buf) - shardCnt q := make(chan error, 1) @@ -199,7 +199,7 @@ func 
testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) { FetchTimeout: strategyTimeout / 2, StrategyTimeout: strategyTimeout, } - g := getter.New(addrs, shardCnt, store, store, func() {}, conf) + g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf) defer g.Close() // launch delayed and erased chunk retrieval