diff --git a/pkg/api/metrics.go b/pkg/api/metrics.go
index 9146d1079a4..06e11b31024 100644
--- a/pkg/api/metrics.go
+++ b/pkg/api/metrics.go
@@ -108,9 +108,6 @@ type UpgradedResponseWriter interface {
 	http.Pusher
 	http.Hijacker
 	http.Flusher
-	// staticcheck SA1019 CloseNotifier interface is required by gorilla compress handler
-	// nolint:staticcheck
-	http.CloseNotifier
 }
 
 type responseWriter struct {
diff --git a/pkg/file/redundancy/getter/getter.go b/pkg/file/redundancy/getter/getter.go
index 230ffbb63eb..f533b67db8e 100644
--- a/pkg/file/redundancy/getter/getter.go
+++ b/pkg/file/redundancy/getter/getter.go
@@ -7,7 +7,6 @@ package getter
 import (
 	"context"
 	"errors"
-	"io"
 	"sync"
 	"sync/atomic"
 
@@ -26,42 +25,33 @@ var (
 // if retrieves children of an intermediate chunk potentially using erasure decoding
 // it caches sibling chunks if erasure decoding started already
 type decoder struct {
-	ctx          context.Context
-	fetcher      storage.Getter  // network retrieval interface to fetch chunks
-	putter       storage.Putter  // interface to local storage to save reconstructed chunks
-	addrs        []swarm.Address // all addresses of the intermediate chunk
-	inflight     []atomic.Bool   // locks to protect wait channels and RS buffer
-	cache        map[string]int  // map from chunk address shard position index
-	waits        []chan error    // wait channels for each chunk
-	rsbuf        [][]byte        // RS buffer of data + parity shards for erasure decoding
-	goodRecovery chan struct{}   // signal channel for successful retrieval of shardCnt chunks
-	badRecovery  chan struct{}   // signals that either the recovery has failed or not allowed to run
-	lastLen      int             // length of the last data chunk in the RS buffer
-	shardCnt     int             // number of data shards
-	parityCnt    int             // number of parity shards
-	wg           sync.WaitGroup  // wait group to wait for all goroutines to finish
-	mu           sync.Mutex      // mutex to protect buffer
-	fetchedCnt   atomic.Int32    // count successful retrievals
-	failedCnt    atomic.Int32    // count successful retrievals
-	cancel       func()          // cancel function for RS decoding
-	remove       func(error)     // callback to remove decoder from decoders cache
-	config       Config          // configuration
-	logger       log.Logger
-}
-
-type Getter interface {
-	storage.Getter
-	io.Closer
+	fetcher           storage.Getter  // network retrieval interface to fetch chunks
+	putter            storage.Putter  // interface to local storage to save reconstructed chunks
+	addrs             []swarm.Address // all addresses of the intermediate chunk
+	inflight          []atomic.Bool   // locks to protect wait channels and RS buffer
+	cache             map[string]int  // map from chunk address to shard position index
+	waits             []chan error    // wait channels for each chunk
+	rsbuf             [][]byte        // RS buffer of data + parity shards for erasure decoding
+	goodRecovery      chan struct{}   // signal channel for successful retrieval of shardCnt chunks
+	badRecovery       chan struct{}   // signals that the recovery has failed or was not allowed to run
+	lastLen           int             // length of the last data chunk in the RS buffer
+	shardCnt          int             // number of data shards
+	parityCnt         int             // number of parity shards
+	wg                sync.WaitGroup  // wait group to wait for all goroutines to finish
+	mu                sync.Mutex      // mutex to protect buffer
+	fetchedCnt        atomic.Int32    // count successful retrievals
+	failedCnt         atomic.Int32    // count failed retrievals
+	remove            func(error)     // callback to remove decoder from decoders cache
+	config            Config          // configuration
+	prefetchTriggered bool            // indicates that prefetch has been triggered
+	logger            log.Logger
 }
 
 // New returns a decoder object used to retrieve children of an intermediate chunk
-func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) Getter {
-	// global context is canceled when the Close is called or when the prefetch terminates
-	ctx, cancel := context.WithCancel(context.Background())
+func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) storage.Getter {
 	size := len(addrs)
 
 	d := &decoder{
-		ctx:          ctx,
 		fetcher:      g,
 		putter:       p,
 		addrs:        addrs,
@@ -71,7 +61,6 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
 		rsbuf:        make([][]byte, size),
 		goodRecovery: make(chan struct{}),
 		badRecovery:  make(chan struct{}),
-		cancel:       cancel,
 		remove:       remove,
 		shardCnt:     shardCnt,
 		parityCnt:    size - shardCnt,
@@ -91,13 +80,12 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
 
 	// prefetch chunks according to strategy
 	if !conf.Strict || conf.Strategy != NONE {
+		d.prefetchTriggered = true
 		d.wg.Add(1)
 		go func() {
 			defer d.wg.Done()
-			_ = d.prefetch(ctx)
+			d.prefetch()
 		}()
-	} else { // recovery not allowed
-		close(d.badRecovery)
 	}
 
 	return d
@@ -110,7 +98,7 @@ func (g *decoder) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, err
 	if !ok {
 		return nil, storage.ErrNotFound
 	}
-	err := g.fetch(ctx, i, true)
+	err := g.fetch(ctx, i, g.prefetchTriggered)
 	if err != nil {
 		return nil, err
 	}
@@ -144,15 +132,16 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
 	fctx, cancel := context.WithTimeout(ctx, g.config.FetchTimeout)
 	defer cancel()
 
-	g.wg.Add(1)
-	go func() {
-		select {
-		case <-fctx.Done(): // local context
-		case <-g.ctx.Done(): // global context
-		}
-		cancel()
-		g.wg.Done()
-	}()
+	// when the already running recovery process terminates, any inflight requests can be canceled.
+	// we do the extra bool check to avoid firing an unnecessary goroutine
+	if waitForRecovery {
+		g.wg.Add(1)
+		go func() {
+			defer g.wg.Done()
+			defer cancel()
+			_ = waitRecovery(nil)
+		}()
+	}
 
 	// retrieval
 	ch, err := g.fetcher.Get(fctx, g.addrs[i])
@@ -181,9 +170,18 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
 	return waitRecovery(storage.ErrNotFound)
 }
 
-func (g *decoder) prefetch(ctx context.Context) (err error) {
-	defer g.remove(err)
-	defer g.cancel()
+func (g *decoder) prefetch() {
+
+	var err error
+	defer func() {
+		if err != nil {
+			close(g.badRecovery)
+		}
+		g.remove(err)
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 
 	run := func(s Strategy) error {
 		if err := g.runStrategy(ctx, s); err != nil {
@@ -194,7 +192,6 @@ func (g *decoder) prefetch(ctx context.Context) (err error) {
 	}
 
 	for s := g.config.Strategy; s < strategyCnt; s++ {
-
 		err = run(s)
 		if err != nil {
 			if s == DATA || s == RACE {
@@ -206,19 +203,12 @@ func (g *decoder) prefetch(ctx context.Context) (err error) {
 				g.logger.Debug("successful recovery", "strategy", s)
 			}
 			close(g.goodRecovery)
-			break
+			return
 		}
 		if g.config.Strict { // only run one strategy
-			break
+			return
 		}
 	}
-
-	if err != nil {
-		close(g.badRecovery)
-		return err
-	}
-
-	return err
 }
 
 func (g *decoder) runStrategy(ctx context.Context, s Strategy) error {
@@ -379,13 +369,3 @@ func (g *decoder) save(ctx context.Context, missing []int) error {
 	}
 	return nil
 }
-
-// Close terminates the prefetch loop, waits for all goroutines to finish and
-// removes the decoder from the cache
-// it implements the io.Closer interface
-func (g *decoder) Close() error {
-	g.cancel()
-	g.wg.Wait()
-	g.remove(nil)
-	return nil
-}
diff --git a/pkg/file/redundancy/getter/getter_test.go b/pkg/file/redundancy/getter/getter_test.go
index f517c19c4c5..73679659ff3 100644
--- a/pkg/file/redundancy/getter/getter_test.go
+++ b/pkg/file/redundancy/getter/getter_test.go
@@ -23,7 +23,6 @@ import (
 	inmem "github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
 	mockstorer "github.com/ethersphere/bee/pkg/storer/mock"
 	"github.com/ethersphere/bee/pkg/swarm"
-	"github.com/ethersphere/bee/pkg/util/testutil"
 	"github.com/klauspost/reedsolomon"
 	"golang.org/x/sync/errgroup"
 )
@@ -112,7 +111,6 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
 	}
 
 	g := getter.New(addrs, shardCnt, store, store, func(error) {}, getter.DefaultConfig)
-	testutil.CleanupCloser(t, g)
 
 	parityCnt := len(buf) - shardCnt
 	_, err := g.Get(context.Background(), addr)
@@ -176,7 +174,6 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) {
 		FetchTimeout: strategyTimeout / 2,
 	}
 	g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
-	defer g.Close()
 
 	// launch delayed and erased chunk retrieval
 	wg := sync.WaitGroup{}
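
The diff drops the package-level `Getter` interface (and with it `io.Closer`), so `getter.New` now returns a plain `storage.Getter` and callers no longer manage the decoder's lifetime. A minimal caller sketch under that assumption; the `fetchShard` helper and its wiring are illustrative, not part of the change:

```go
package example

import (
	"context"

	"github.com/ethersphere/bee/pkg/file/redundancy/getter"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
)

// fetchShard builds a decoder for one intermediate chunk and retrieves a single
// child. There is no Close/defer any more: the decoder removes itself from the
// cache via the remove callback once its prefetch terminates.
func fetchShard(ctx context.Context, addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, addr swarm.Address) (swarm.Chunk, error) {
	d := getter.New(addrs, shardCnt, g, p, func(error) {}, getter.DefaultConfig)
	return d.Get(ctx, addr)
}
```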
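For the prefetch/fetch interplay, the shared decoder context is replaced by the two signal channels: `prefetch` now owns its own context and, via the deferred error check, closes `goodRecovery` on success or `badRecovery` on failure, while `fetch` only spawns a watcher goroutine when `waitForRecovery` is set. The body of `waitRecovery` is outside the hunk, so the sketch below is an assumed illustration of that pattern rather than the actual implementation: cancel an in-flight request's context as soon as either recovery channel closes.

```go
package example

import "context"

// cancelOnRecovery ties an in-flight fetch context to the recovery outcome, so
// the request is abandoned once the decoder has either recovered the data or
// given up (the channels here stand in for goodRecovery/badRecovery).
func cancelOnRecovery(parent context.Context, goodRecovery, badRecovery <-chan struct{}) (context.Context, context.CancelFunc) {
	fctx, cancel := context.WithCancel(parent)
	go func() {
		defer cancel()
		select {
		case <-goodRecovery: // recovery succeeded; pending retrievals are moot
		case <-badRecovery: // recovery failed or was not allowed to run
		case <-fctx.Done(): // request finished or timed out on its own
		}
	}()
	return fctx, cancel
}
```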