From ec435ed57386a7a938e15aba3a3458de4d4cbe8c Mon Sep 17 00:00:00 2001
From: istae <14264581+istae@users.noreply.github.com>
Date: Tue, 5 Mar 2024 20:04:33 +0300
Subject: [PATCH 1/4] perf(getter): cancel requests if enough chunks are fetched for recovery

---
 pkg/file/joiner/joiner.go                 | 10 ++++++++--
 pkg/file/redundancy/getter/getter.go      | 23 ++++++++++++++++-------
 pkg/file/redundancy/getter/getter_test.go |  4 ++--
 3 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/pkg/file/joiner/joiner.go b/pkg/file/joiner/joiner.go
index 69ae16065bd..1d64644b553 100644
--- a/pkg/file/joiner/joiner.go
+++ b/pkg/file/joiner/joiner.go
@@ -80,10 +80,16 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.
 		}
 		return d
 	}
-	remove := func() {
+	remove := func(err error) {
 		g.mu.Lock()
 		defer g.mu.Unlock()
-		g.cache[key] = nil
+		if err != nil {
+			// signals that a new getter is needed to reattempt to recover the data
+			delete(g.cache, key)
+		} else {
+			// signals that the chunks were fetched/recovered/cached so a future getter is not needed
+			g.cache[key] = nil
+		}
 	}
 	d = getter.New(addrs, shardCnt, g.fetcher, g.putter, remove, g.config)
 	g.cache[key] = d
diff --git a/pkg/file/redundancy/getter/getter.go b/pkg/file/redundancy/getter/getter.go
index d0082212d8a..2296d3f4a06 100644
--- a/pkg/file/redundancy/getter/getter.go
+++ b/pkg/file/redundancy/getter/getter.go
@@ -26,6 +26,7 @@ var (
 // if retrieves children of an intermediate chunk potentially using erasure decoding
 // it caches sibling chunks if erasure decoding started already
 type decoder struct {
+	ctx     context.Context
 	fetcher storage.Getter  // network retrieval interface to fetch chunks
 	putter  storage.Putter  // interface to local storage to save reconstructed chunks
 	addrs   []swarm.Address // all addresses of the intermediate chunk
@@ -44,7 +45,7 @@ type decoder struct {
 	fetchedCnt atomic.Int32 // count successful retrievals
 	failedCnt  atomic.Int32 // count failed retrievals
 	cancel     func()       // cancel function for RS decoding
-	remove     func()       // callback to remove decoder from decoders cache
+	remove     func(error)  // callback to remove decoder from decoders cache
 	config     Config       // configuration
 	logger     log.Logger
 }
@@ -55,11 +56,12 @@ type Getter interface {
 }
 
 // New returns a decoder object used to retrieve children of an intermediate chunk
-func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(), conf Config) Getter {
+func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) Getter {
 	ctx, cancel := context.WithCancel(context.Background())
 	size := len(addrs)
 
 	d := &decoder{
+		ctx:     ctx,
 		fetcher: g,
 		putter:  p,
 		addrs:   addrs,
@@ -143,7 +145,14 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
 	defer cancel()
 
 	g.wg.Add(1)
-	defer g.wg.Done()
+	go func() {
+		select {
+		case <-fctx.Done(): // local context
+		case <-g.ctx.Done(): // global context
+		}
+		cancel()
+		g.wg.Done()
+	}()
 
 	// retrieval
 	ch, err := g.fetcher.Get(fctx, g.addrs[i])
@@ -172,8 +181,9 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
 	return waitRecovery(storage.ErrNotFound)
 }
 
-func (g *decoder) prefetch(ctx context.Context) error {
-	defer g.remove()
+func (g *decoder) prefetch(ctx context.Context) (err error) {
+	defer func() { g.remove(err) }()
+	defer g.cancel()
 
 	run := func(s Strategy) error {
 		if err := g.runStrategy(ctx, s); err != nil {
@@ -183,7 +193,6 @@ func (g *decoder) prefetch(ctx context.Context) error {
 		return g.recover(ctx)
 	}
 
-	var err error
 	for s := g.config.Strategy; s < strategyCnt; s++ {
 		err = run(s)
@@ -377,6 +386,6 @@ func (g *decoder) save(ctx context.Context, missing []int) error {
 func (g *decoder) Close() error {
 	g.cancel()
 	g.wg.Wait()
-	g.remove()
+	g.remove(nil)
 	return nil
 }
diff --git a/pkg/file/redundancy/getter/getter_test.go b/pkg/file/redundancy/getter/getter_test.go
index 95609fc4c1e..ee1132277cb 100644
--- a/pkg/file/redundancy/getter/getter_test.go
+++ b/pkg/file/redundancy/getter/getter_test.go
@@ -123,7 +123,7 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
 		StrategyTimeout: strategyTimeout,
 		Logger:          log.Noop,
 	}
-	g := getter.New(addrs, shardCnt, store, store, func() {}, conf)
+	g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
 	defer g.Close()
 	parityCnt := len(buf) - shardCnt
 	q := make(chan error, 1)
@@ -199,7 +199,7 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) {
 		FetchTimeout:    strategyTimeout / 2,
 		StrategyTimeout: strategyTimeout,
 	}
-	g := getter.New(addrs, shardCnt, store, store, func() {}, conf)
+	g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
 	defer g.Close()
 
 	// launch delayed and erased chunk retrieval

From d7843f525219b8d14a7aa70cc30ccd1aeec893fc Mon Sep 17 00:00:00 2001
From: istae <14264581+istae@users.noreply.github.com>
Date: Wed, 6 Mar 2024 02:00:08 +0300
Subject: [PATCH 2/4] fix: remove strategy timeout config option

---
 pkg/api/bzz.go                            |  8 +---
 pkg/file/joiner/joiner_test.go            | 32 ++++++-------
 pkg/file/redundancy/getter/getter.go      |  4 +-
 pkg/file/redundancy/getter/getter_test.go | 20 ++++----
 pkg/file/redundancy/getter/strategies.go  | 58 +++++++----------------
 5 files changed, 44 insertions(+), 78 deletions(-)

diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go
index 07956fe5dc6..f65ce0365e0 100644
--- a/pkg/api/bzz.go
+++ b/pkg/api/bzz.go
@@ -343,10 +343,8 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV
 	ls := loadsave.NewReadonly(s.storer.Download(cache))
 	feedDereferenced := false
 
-	strategyTimeout := getter.DefaultStrategyTimeout.String()
-
 	ctx := r.Context()
-	ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
+	ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger)
 	if err != nil {
 		logger.Error(err, err.Error())
 		jsonhttp.BadRequest(w, "could not parse headers")
@@ -537,10 +535,8 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
 		cache = *headers.Cache
 	}
 
-	strategyTimeout := getter.DefaultStrategyTimeout.String()
-
 	ctx := r.Context()
-	ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
+	ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger)
 	if err != nil {
 		logger.Error(err, err.Error())
 		jsonhttp.BadRequest(w, "could not parse headers")
diff --git a/pkg/file/joiner/joiner_test.go b/pkg/file/joiner/joiner_test.go
index bfb6e4d2cd6..996229b1bcd 100644
--- a/pkg/file/joiner/joiner_test.go
+++ b/pkg/file/joiner/joiner_test.go
@@ -1109,17 +1109,15 @@ func TestJoinerRedundancy(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	strategyTimeout := 100 * time.Millisecond
 
 	// all data can be read back
 	readCheck := func(t *testing.T, expErr error) {
 		ctx := context.Background()
 
-		strategyTimeoutStr := strategyTimeout.String()
-		decodeTimeoutStr := (10 * strategyTimeout).String()
+		decodeTimeoutStr := time.Second.String()
 		fallback := true
 		s := getter.RACE
 
-		ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, &strategyTimeoutStr, log.Noop)
+		ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, log.Noop)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -1229,7 +1227,7 @@ func TestJoinerRedundancy(t *testing.T) {
 // nolint:thelper
 func TestJoinerRedundancyMultilevel(t *testing.T) {
 	t.Parallel()
-	test := func(t *testing.T, rLevel redundancy.Level, encrypt bool, levels, size int) {
+	test := func(t *testing.T, rLevel redundancy.Level, encrypt bool, size int) {
 		t.Helper()
 		store := mockstorer.NewForgettingStore(inmemchunkstore.New())
 		testutil.CleanupCloser(t, store)
@@ -1249,14 +1247,12 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
 		expRead := swarm.ChunkSize
 		buf := make([]byte, expRead)
 		offset := mrand.Intn(size) * expRead
-		canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, levels int, canRead bool) {
+		canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, canRead bool) {
 			ctx := context.Background()
 
-			strategyTimeout := 100 * time.Millisecond
-			strategyTimeoutStr := strategyTimeout.String()
-			decodingTimeoutStr := (2 * strategyTimeout).String()
+			decodingTimeoutStr := (200 * time.Millisecond).String()
 
-			ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, &strategyTimeoutStr, log.Noop)
+			ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, log.Noop)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -1295,35 +1291,35 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
 		t.Run("NONE w/o fallback CAN retrieve", func(t *testing.T) {
 			store.Record()
 			defer store.Unrecord()
-			canReadRange(t, getter.NONE, false, levels, true)
+			canReadRange(t, getter.NONE, false, true)
 		})
 
 		// do not forget the root chunk
 		store.Unmiss(swarm.NewAddress(addr.Bytes()[:swarm.HashSize]))
 		// after we forget the chunks on the way to the range, we should not be able to retrieve
 		t.Run("NONE w/o fallback CANNOT retrieve", func(t *testing.T) {
-			canReadRange(t, getter.NONE, false, levels, false)
+			canReadRange(t, getter.NONE, false, false)
 		})
 
 		// we lost a data chunk, we cannot recover using DATA only strategy with no fallback
 		t.Run("DATA w/o fallback CANNOT retrieve", func(t *testing.T) {
-			canReadRange(t, getter.DATA, false, levels, false)
+			canReadRange(t, getter.DATA, false, false)
 		})
 
 		if rLevel == 0 {
 			// allowing fallback mode will not help if no redundancy used for upload
 			t.Run("DATA w fallback CANNOT retrieve", func(t *testing.T) {
-				canReadRange(t, getter.DATA, true, levels, false)
+				canReadRange(t, getter.DATA, true, false)
 			})
 			return
 		}
 		// allowing fallback mode will make the range retrievable using erasure decoding
 		t.Run("DATA w fallback CAN retrieve", func(t *testing.T) {
-			canReadRange(t, getter.DATA, true, levels, true)
+			canReadRange(t, getter.DATA, true, true)
 		})
 		// after the reconstructed data is stored, we can retrieve the range using DATA only mode
 		t.Run("after recovery, NONE w/o fallback CAN retrieve", func(t *testing.T) {
-			canReadRange(t, getter.NONE, false, levels, true)
+			canReadRange(t, getter.NONE, false, true)
 		})
 	}
 	r2level := []int{2, 1, 2, 3, 2}
@@ -1353,7 +1349,7 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
 			if r2level[rLevel] != levels || encrypt != encryptChunk[rLevel] {
 				t.Skip("skipping to save time")
 			}
-			test(t, rLevel, encrypt, levels, chunkCnt)
+			test(t, rLevel, encrypt, chunkCnt)
 		})
 		switch levels {
 		case 1:
@@ -1364,7 +1360,7 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
 				continue
 			}
 			t.Run(fmt.Sprintf("encrypt=%v levels=%d chunks=%d full", encrypt, levels, chunkCnt), func(t *testing.T) {
-				test(t, rLevel, encrypt, levels, chunkCnt)
+				test(t, rLevel, encrypt, chunkCnt)
 			})
 		}
 	}
diff --git a/pkg/file/redundancy/getter/getter.go b/pkg/file/redundancy/getter/getter.go
index 2296d3f4a06..230ffbb63eb 100644
--- a/pkg/file/redundancy/getter/getter.go
+++ b/pkg/file/redundancy/getter/getter.go
@@ -41,7 +41,6 @@ type decoder struct {
 	parityCnt  int            // number of parity shards
 	wg         sync.WaitGroup // wait group to wait for all goroutines to finish
 	mu         sync.Mutex     // mutex to protect buffer
-	err        error          // error of the last erasure decoding
 	fetchedCnt atomic.Int32   // count successful retrievals
 	failedCnt  atomic.Int32   // count failed retrievals
 	cancel     func()         // cancel function for RS decoding
@@ -57,6 +56,7 @@ type Getter interface {
 
 // New returns a decoder object used to retrieve children of an intermediate chunk
 func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) Getter {
+	// the global context is canceled when Close is called or when prefetch terminates
 	ctx, cancel := context.WithCancel(context.Background())
 	size := len(addrs)
 
@@ -94,7 +94,7 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
 		d.wg.Add(1)
 		go func() {
 			defer d.wg.Done()
-			d.err = d.prefetch(ctx)
+			_ = d.prefetch(ctx)
 		}()
 	} else { // recovery not allowed
 		close(d.badRecovery)
diff --git a/pkg/file/redundancy/getter/getter_test.go b/pkg/file/redundancy/getter/getter_test.go
index ee1132277cb..0dfe26a85bb 100644
--- a/pkg/file/redundancy/getter/getter_test.go
+++ b/pkg/file/redundancy/getter/getter_test.go
@@ -96,9 +96,9 @@ func TestGetterFallback(t *testing.T) {
 
 func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
 	t.Helper()
-	strategyTimeout := 100 * time.Millisecond
+	timeout := 100 * time.Millisecond
 	if racedetection.On {
-		strategyTimeout *= 2
+		timeout *= 2
 	}
 	store := inmem.New()
 	buf := make([][]byte, bufSize)
@@ -118,10 +118,9 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
 	ctx, cancel := context.WithCancel(context.TODO())
 	defer cancel()
 	conf := getter.Config{
-		Strategy:        getter.RACE,
-		FetchTimeout:    2 * strategyTimeout,
-		StrategyTimeout: strategyTimeout,
-		Logger:          log.Noop,
+		Strategy:     getter.RACE,
+		FetchTimeout: 2 * timeout,
+		Logger:       log.Noop,
 	}
 	g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
 	defer g.Close()
@@ -132,7 +131,7 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
 		q <- err
 	}()
 	err := context.DeadlineExceeded
-	wait := strategyTimeout * 2
+	wait := timeout * 2
 	if racedetection.On {
 		wait *= 2
 	}
@@ -194,10 +193,9 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) {
 	// create getter
 	start := time.Now()
 	conf := getter.Config{
-		Strategy:        s,
-		Strict:          strict,
-		FetchTimeout:    strategyTimeout / 2,
-		StrategyTimeout: strategyTimeout,
+		Strategy:     s,
+		Strict:       strict,
+		FetchTimeout: strategyTimeout / 2,
 	}
 	g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
 	defer g.Close()
diff --git a/pkg/file/redundancy/getter/strategies.go b/pkg/file/redundancy/getter/strategies.go
index 7387096c4c6..f61632a5370 100644
--- a/pkg/file/redundancy/getter/strategies.go
+++ b/pkg/file/redundancy/getter/strategies.go
@@ -14,28 +14,25 @@ import (
 )
 
 const (
-	DefaultStrategy        = DATA                           // default prefetching strategy
-	DefaultStrict          = false                          // default fallback modes
-	DefaultFetchTimeout    = retrieval.RetrieveChunkTimeout // timeout for each chunk retrieval
-	DefaultStrategyTimeout = 300 * time.Millisecond         // timeout for each strategy
+	DefaultStrategy     = DATA                           // default prefetching strategy
+	DefaultStrict       = false                          // default fallback mode
+	DefaultFetchTimeout = retrieval.RetrieveChunkTimeout // timeout for each chunk retrieval
 )
 
 type (
-	strategyKey        struct{}
-	modeKey            struct{}
-	fetchTimeoutKey    struct{}
-	strategyTimeoutKey struct{}
-	loggerKey          struct{}
-	Strategy           = int
+	strategyKey     struct{}
+	modeKey         struct{}
+	fetchTimeoutKey struct{}
+	loggerKey       struct{}
+	Strategy        = int
 )
 
 // Config is the configuration for the getter - public
 type Config struct {
-	Strategy        Strategy
-	Strict          bool
-	FetchTimeout    time.Duration
-	StrategyTimeout time.Duration
-	Logger          log.Logger
+	Strategy     Strategy
+	Strict       bool
+	FetchTimeout time.Duration
+	Logger       log.Logger
 }
 
 const (
@@ -48,11 +45,10 @@ const (
 
 // DefaultConfig is the default configuration for the getter
 var DefaultConfig = Config{
-	Strategy:        DefaultStrategy,
-	Strict:          DefaultStrict,
-	FetchTimeout:    DefaultFetchTimeout,
-	StrategyTimeout: DefaultStrategyTimeout,
-	Logger:          log.Noop,
+	Strategy:     DefaultStrategy,
+	Strict:       DefaultStrict,
+	FetchTimeout: DefaultFetchTimeout,
+	Logger:       log.Noop,
 }
 
 // NewConfigFromContext returns a new Config based on the context
@@ -80,12 +76,6 @@ func NewConfigFromContext(ctx context.Context, def Config) (conf Config, err err
 			return conf, e("fetcher timeout")
 		}
 	}
-	if val := ctx.Value(strategyTimeoutKey{}); val != nil {
-		conf.StrategyTimeout, ok = val.(time.Duration)
-		if !ok {
-			return conf, e("strategy timeout")
-		}
-	}
 	if val := ctx.Value(loggerKey{}); val != nil {
 		conf.Logger, ok = val.(log.Logger)
 		if !ok {
@@ -111,18 +101,12 @@ func SetFetchTimeout(ctx context.Context, timeout time.Duration) context.Context
 	return context.WithValue(ctx, fetchTimeoutKey{}, timeout)
 }
 
-// SetStrategyTimeout sets the timeout for each strategy
-func SetStrategyTimeout(ctx context.Context, timeout time.Duration) context.Context {
-	return context.WithValue(ctx, strategyTimeoutKey{}, timeout)
-}
-
-// SetStrategyTimeout sets the timeout for each strategy
 func SetLogger(ctx context.Context, l log.Logger) context.Context {
 	return context.WithValue(ctx, loggerKey{}, l)
 }
 
 // SetConfigInContext sets the config params in the context
-func SetConfigInContext(ctx context.Context, s *Strategy, fallbackmode *bool, fetchTimeout, strategyTimeout *string, logger log.Logger) (context.Context, error) {
+func SetConfigInContext(ctx context.Context, s *Strategy, fallbackmode *bool, fetchTimeout *string, logger log.Logger) (context.Context, error) {
 	if s != nil {
 		ctx = SetStrategy(ctx, *s)
 	}
@@ -139,14 +123,6 @@ func SetConfigInContext(ctx context.Context, s *Strategy, fallbackmode *bool, fe
 		ctx = SetFetchTimeout(ctx, dur)
 	}
 
-	if strategyTimeout != nil {
-		dur, err := time.ParseDuration(*strategyTimeout)
-		if err != nil {
-			return nil, err
-		}
-		ctx = SetStrategyTimeout(ctx, dur)
-	}
-
 	if logger != nil {
 		ctx = SetLogger(ctx, logger)
 	}

From be79fa55547b450f57316066e8a14ef5f4a36cde Mon Sep 17 00:00:00 2001
From: istae <14264581+istae@users.noreply.github.com>
Date: Wed, 6 Mar 2024 15:46:21 +0300
Subject: [PATCH 3/4] fix: unit test

---
 pkg/file/redundancy/getter/getter_test.go | 36 +++++------------------
 1 file changed, 7 insertions(+), 29 deletions(-)

diff --git a/pkg/file/redundancy/getter/getter_test.go b/pkg/file/redundancy/getter/getter_test.go
index 0dfe26a85bb..f517c19c4c5 100644
--- a/pkg/file/redundancy/getter/getter_test.go
+++ b/pkg/file/redundancy/getter/getter_test.go
@@ -19,12 +19,11 @@ import (
 
 	"github.com/ethersphere/bee/pkg/cac"
 	"github.com/ethersphere/bee/pkg/file/redundancy/getter"
-	"github.com/ethersphere/bee/pkg/log"
 	"github.com/ethersphere/bee/pkg/storage"
 	inmem "github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
 	mockstorer "github.com/ethersphere/bee/pkg/storer/mock"
 	"github.com/ethersphere/bee/pkg/swarm"
-	"github.com/ethersphere/bee/pkg/util/testutil/racedetection"
+	"github.com/ethersphere/bee/pkg/util/testutil"
 	"github.com/klauspost/reedsolomon"
 	"golang.org/x/sync/errgroup"
 )
@@ -96,10 +95,6 @@ func TestGetterFallback(t *testing.T) {
 
 func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
 	t.Helper()
-	timeout := 100 * time.Millisecond
-	if racedetection.On {
-		timeout *= 2
-	}
 	store := inmem.New()
 	buf := make([][]byte, bufSize)
 	addrs := initData(t, buf, shardCnt, store)
@@ -115,30 +110,13 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
 	if len(addr.Bytes()) == 0 {
 		t.Skip("no data shard erased")
 	}
-	ctx, cancel := context.WithCancel(context.TODO())
-	defer cancel()
-	conf := getter.Config{
-		Strategy:     getter.RACE,
-		FetchTimeout: 2 * timeout,
-		Logger:       log.Noop,
-	}
-	g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
-	defer g.Close()
+
+	g := getter.New(addrs, shardCnt, store, store, func(error) {}, getter.DefaultConfig)
+	testutil.CleanupCloser(t, g)
+
 	parityCnt := len(buf) - shardCnt
-	q := make(chan error, 1)
-	go func() {
-		_, err := g.Get(ctx, addr)
-		q <- err
-	}()
-	err := context.DeadlineExceeded
-	wait := timeout * 2
-	if racedetection.On {
-		wait *= 2
-	}
-	select {
-	case err = <-q:
-	case <-time.After(wait):
-	}
+	_, err := g.Get(context.Background(), addr)
+
 	switch {
 	case erasureCnt > parityCnt:
 		t.Run("unable to recover", func(t *testing.T) {

From ff4ecc6969fc2b8525a86fa01856e3d669f96e1b Mon Sep 17 00:00:00 2001
From: istae <14264581+istae@users.noreply.github.com>
Date: Wed, 6 Mar 2024 16:32:34 +0300
Subject: [PATCH 4/4] fix: flaky

---
 pkg/api/bzz_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/api/bzz_test.go b/pkg/api/bzz_test.go
index 34955fe580f..a589ff4304e 100644
--- a/pkg/api/bzz_test.go
+++ b/pkg/api/bzz_test.go
@@ -62,7 +62,7 @@ import (
 // using redundancy to reconstruct the file and find the file recoverable.
 //
 // nolint:thelper
-func TestBzzUploadDownloadWithRedundancy(t *testing.T) {
+func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) {
 	t.Parallel()
 	fileUploadResource := "/bzz"
 	fileDownloadResource := func(addr string) string { return "/bzz/" + addr + "/" }
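
Note on the technique in [PATCH 1/4]: every sibling chunk is fetched in its own goroutine, successes are tallied in an atomic counter, and the decoder-wide context is canceled the moment shardCnt chunks have arrived, since that is already enough for Reed-Solomon recovery. The sketch below is a minimal, self-contained illustration of the pattern under assumed names (fetchAll, fetchShard, and plain string addresses stand in for the decoder, storage.Getter, and swarm.Address); it is not the bee implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// fetchAll launches one goroutine per shard and cancels the remaining
// fetches as soon as shardCnt of them have succeeded — enough for
// Reed-Solomon recovery of the rest.
func fetchAll(ctx context.Context, addrs []string, shardCnt int, fetchShard func(context.Context, string) error) int {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var fetched atomic.Int32
	var wg sync.WaitGroup
	for _, a := range addrs {
		wg.Add(1)
		go func(addr string) {
			defer wg.Done()
			if err := fetchShard(ctx, addr); err != nil {
				return // canceled or failed; treated as missing
			}
			if fetched.Add(1) >= int32(shardCnt) {
				cancel() // enough shards for recovery: abort the rest
			}
		}(a)
	}
	wg.Wait()
	return int(fetched.Load())
}

func main() {
	// stub fetcher: random latency, honors cancellation
	fetchShard := func(ctx context.Context, addr string) error {
		select {
		case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
			return nil
		case <-ctx.Done():
			return errors.New("canceled: " + addr)
		}
	}
	addrs := []string{"a", "b", "c", "d", "e", "f"}
	got := fetchAll(context.Background(), addrs, 4, fetchShard)
	fmt.Printf("fetched %d shards (needed 4 of %d)\n", got, len(addrs))
}
```

The same shape appears in fetch(): the per-request goroutine waits on both the local fctx and the global g.ctx, so a recovery completed elsewhere tears down in-flight retrievals instead of letting each run to its own timeout.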
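A subtlety in prefetch() is that the remove callback must observe the function's final error — joiner.go deletes the cache entry on failure (so a later reader retries recovery) but keeps the nil sentinel on success. With a named return value this only works when the callback is invoked from a deferred closure: a plain `defer g.remove(err)` evaluates its argument immediately, while err is still nil. A tiny runnable demonstration of the difference:

```go
package main

import "fmt"

func broken() (err error) {
	defer fmt.Println("broken sees:", err) // err evaluated now: <nil>
	return fmt.Errorf("boom")
}

func fixed() (err error) {
	defer func() { fmt.Println("fixed sees:", err) }() // err read at return time
	return fmt.Errorf("boom")
}

func main() {
	_ = broken() // prints: broken sees: <nil>
	_ = fixed()  // prints: fixed sees: boom
}
```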
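[PATCH 2/4] leaves the config plumbing to context values: SetConfigInContext chains the individual setters, and NewConfigFromContext reads them back with type assertions, falling back to the defaults when a key is absent. The general pattern looks roughly like the sketch below; SetFetchTimeout mirrors the real function's signature, while fetchTimeoutFrom is an illustrative reader, not part of the package.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// unexported struct key types avoid collisions with other packages' context keys
type fetchTimeoutKey struct{}

// SetFetchTimeout stores the per-chunk fetch timeout in the context.
func SetFetchTimeout(ctx context.Context, timeout time.Duration) context.Context {
	return context.WithValue(ctx, fetchTimeoutKey{}, timeout)
}

// fetchTimeoutFrom reads the timeout back, falling back to def when unset
// (hypothetical helper for illustration).
func fetchTimeoutFrom(ctx context.Context, def time.Duration) time.Duration {
	if v, ok := ctx.Value(fetchTimeoutKey{}).(time.Duration); ok {
		return v
	}
	return def
}

func main() {
	ctx := SetFetchTimeout(context.Background(), 5*time.Second)
	fmt.Println(fetchTimeoutFrom(ctx, 30*time.Second))                  // 5s
	fmt.Println(fetchTimeoutFrom(context.Background(), 30*time.Second)) // 30s
}
```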