From d7843f525219b8d14a7aa70cc30ccd1aeec893fc Mon Sep 17 00:00:00 2001
From: istae <14264581+istae@users.noreply.github.com>
Date: Wed, 6 Mar 2024 02:00:08 +0300
Subject: [PATCH] fix: remove strategy timeout from the redundancy getter

---
 pkg/api/bzz.go                            |  8 +---
 pkg/file/joiner/joiner_test.go            | 32 ++++++-------
 pkg/file/redundancy/getter/getter.go      |  4 +-
 pkg/file/redundancy/getter/getter_test.go | 20 ++++----
 pkg/file/redundancy/getter/strategies.go  | 58 +++++++----------
 5 files changed, 44 insertions(+), 78 deletions(-)

diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go
index 07956fe5dc6..f65ce0365e0 100644
--- a/pkg/api/bzz.go
+++ b/pkg/api/bzz.go
@@ -343,10 +343,8 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV
 	ls := loadsave.NewReadonly(s.storer.Download(cache))
 	feedDereferenced := false
 
-	strategyTimeout := getter.DefaultStrategyTimeout.String()
-
 	ctx := r.Context()
-	ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
+	ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger)
 	if err != nil {
 		logger.Error(err, err.Error())
 		jsonhttp.BadRequest(w, "could not parse headers")
@@ -537,10 +535,8 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
 		cache = *headers.Cache
 	}
 
-	strategyTimeout := getter.DefaultStrategyTimeout.String()
-
 	ctx := r.Context()
-	ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
+	ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger)
 	if err != nil {
 		logger.Error(err, err.Error())
 		jsonhttp.BadRequest(w, "could not parse headers")
diff --git a/pkg/file/joiner/joiner_test.go b/pkg/file/joiner/joiner_test.go
index bfb6e4d2cd6..996229b1bcd 100644
--- a/pkg/file/joiner/joiner_test.go
+++ b/pkg/file/joiner/joiner_test.go
@@ -1109,17 +1109,15 @@ func TestJoinerRedundancy(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		strategyTimeout := 100 * time.Millisecond
 		// all data can be read back
 		readCheck := func(t *testing.T, expErr error) {
 			ctx := context.Background()
 
-			strategyTimeoutStr := strategyTimeout.String()
-			decodeTimeoutStr := (10 * strategyTimeout).String()
+			decodeTimeoutStr := time.Second.String()
 			fallback := true
 			s := getter.RACE
 
-			ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, &strategyTimeoutStr, log.Noop)
+			ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, log.Noop)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -1229,7 +1227,7 @@ func TestJoinerRedundancy(t *testing.T) {
 // nolint:thelper
 func TestJoinerRedundancyMultilevel(t *testing.T) {
 	t.Parallel()
-	test := func(t *testing.T, rLevel redundancy.Level, encrypt bool, levels, size int) {
+	test := func(t *testing.T, rLevel redundancy.Level, encrypt bool, size int) {
 		t.Helper()
 		store := mockstorer.NewForgettingStore(inmemchunkstore.New())
 		testutil.CleanupCloser(t, store)
@@ -1249,14 +1247,12 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
 		expRead := swarm.ChunkSize
 		buf := make([]byte, expRead)
 		offset := mrand.Intn(size) * expRead
-		canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, levels int, canRead bool) {
+		canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, canRead bool) {
 			ctx := context.Background()
 
-			strategyTimeout := 100 * time.Millisecond
-			strategyTimeoutStr := strategyTimeout.String()
-			decodingTimeoutStr := (2 * strategyTimeout).String()
+			decodingTimeoutStr := (200 * time.Millisecond).String()
 
-			ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, &strategyTimeoutStr, log.Noop)
+			ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, log.Noop)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -1295,35 +1291,35 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
 		t.Run("NONE w/o fallback CAN retrieve", func(t *testing.T) {
 			store.Record()
 			defer store.Unrecord()
-			canReadRange(t, getter.NONE, false, levels, true)
+			canReadRange(t, getter.NONE, false, true)
 		})
 
 		// do not forget the root chunk
 		store.Unmiss(swarm.NewAddress(addr.Bytes()[:swarm.HashSize]))
 		// after we forget the chunks on the way to the range, we should not be able to retrieve
 		t.Run("NONE w/o fallback CANNOT retrieve", func(t *testing.T) {
-			canReadRange(t, getter.NONE, false, levels, false)
+			canReadRange(t, getter.NONE, false, false)
 		})
 
 		// we lost a data chunk, we cannot recover using DATA only strategy with no fallback
 		t.Run("DATA w/o fallback CANNOT retrieve", func(t *testing.T) {
-			canReadRange(t, getter.DATA, false, levels, false)
+			canReadRange(t, getter.DATA, false, false)
 		})
 
 		if rLevel == 0 {
 			// allowing fallback mode will not help if no redundancy used for upload
 			t.Run("DATA w fallback CANNOT retrieve", func(t *testing.T) {
-				canReadRange(t, getter.DATA, true, levels, false)
+				canReadRange(t, getter.DATA, true, false)
 			})
 			return
 		}
 		// allowing fallback mode will make the range retrievable using erasure decoding
 		t.Run("DATA w fallback CAN retrieve", func(t *testing.T) {
-			canReadRange(t, getter.DATA, true, levels, true)
+			canReadRange(t, getter.DATA, true, true)
 		})
 		// after the reconstructed data is stored, we can retrieve the range using DATA only mode
 		t.Run("after recovery, NONE w/o fallback CAN retrieve", func(t *testing.T) {
-			canReadRange(t, getter.NONE, false, levels, true)
+			canReadRange(t, getter.NONE, false, true)
 		})
 	}
 	r2level := []int{2, 1, 2, 3, 2}
@@ -1353,7 +1349,7 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
 			if r2level[rLevel] != levels || encrypt != encryptChunk[rLevel] {
 				t.Skip("skipping to save time")
 			}
-			test(t, rLevel, encrypt, levels, chunkCnt)
+			test(t, rLevel, encrypt, chunkCnt)
 		})
 		switch levels {
 		case 1:
@@ -1364,7 +1360,7 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
 			continue
 		}
 		t.Run(fmt.Sprintf("encrypt=%v levels=%d chunks=%d full", encrypt, levels, chunkCnt), func(t *testing.T) {
-			test(t, rLevel, encrypt, levels, chunkCnt)
+			test(t, rLevel, encrypt, chunkCnt)
 		})
 	}
 }
diff --git a/pkg/file/redundancy/getter/getter.go b/pkg/file/redundancy/getter/getter.go
index 2296d3f4a06..230ffbb63eb 100644
--- a/pkg/file/redundancy/getter/getter.go
+++ b/pkg/file/redundancy/getter/getter.go
@@ -41,7 +41,6 @@ type decoder struct {
 	parityCnt   int            // number of parity shards
 	wg          sync.WaitGroup // wait group to wait for all goroutines to finish
 	mu          sync.Mutex     // mutex to protect buffer
-	err         error          // error of the last erasure decoding
 	fetchedCnt  atomic.Int32   // count successful retrievals
 	failedCnt   atomic.Int32   // count successful retrievals
 	cancel      func()         // cancel function for RS decoding
@@ -57,6 +56,7 @@ type Getter interface {
 
 // New returns a decoder object used to retrieve children of an intermediate chunk
 func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) Getter {
+	// global context is canceled when the Close is called or when the prefetch terminates
 	ctx, cancel := context.WithCancel(context.Background())
 
 	size := len(addrs)
@@ -94,7 +94,7 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
 		d.wg.Add(1)
 		go func() {
 			defer d.wg.Done()
-			d.err = d.prefetch(ctx)
+			_ = d.prefetch(ctx)
 		}()
 	} else { // recovery not allowed
 		close(d.badRecovery)
diff --git a/pkg/file/redundancy/getter/getter_test.go b/pkg/file/redundancy/getter/getter_test.go
index ee1132277cb..0dfe26a85bb 100644
--- a/pkg/file/redundancy/getter/getter_test.go
+++ b/pkg/file/redundancy/getter/getter_test.go
@@ -96,9 +96,9 @@ func TestGetterFallback(t *testing.T) {
 
 func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
 	t.Helper()
-	strategyTimeout := 100 * time.Millisecond
+	timeout := 100 * time.Millisecond
 	if racedetection.On {
-		strategyTimeout *= 2
+		timeout *= 2
 	}
 	store := inmem.New()
 	buf := make([][]byte, bufSize)
@@ -118,10 +118,9 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
 	ctx, cancel := context.WithCancel(context.TODO())
 	defer cancel()
 	conf := getter.Config{
-		Strategy:        getter.RACE,
-		FetchTimeout:    2 * strategyTimeout,
-		StrategyTimeout: strategyTimeout,
-		Logger:          log.Noop,
+		Strategy:     getter.RACE,
+		FetchTimeout: 2 * timeout,
+		Logger:       log.Noop,
 	}
 	g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
 	defer g.Close()
@@ -132,7 +131,7 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
 		q <- err
 	}()
 	err := context.DeadlineExceeded
-	wait := strategyTimeout * 2
+	wait := timeout * 2
 	if racedetection.On {
 		wait *= 2
 	}
@@ -194,10 +193,9 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) {
 	// create getter
 	start := time.Now()
 	conf := getter.Config{
-		Strategy:        s,
-		Strict:          strict,
-		FetchTimeout:    strategyTimeout / 2,
-		StrategyTimeout: strategyTimeout,
+		Strategy:     s,
+		Strict:       strict,
+		FetchTimeout: strategyTimeout / 2,
 	}
 	g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
 	defer g.Close()
diff --git a/pkg/file/redundancy/getter/strategies.go b/pkg/file/redundancy/getter/strategies.go
index 7387096c4c6..f61632a5370 100644
--- a/pkg/file/redundancy/getter/strategies.go
+++ b/pkg/file/redundancy/getter/strategies.go
@@ -14,28 +14,25 @@ import (
 )
 
 const (
-	DefaultStrategy        = DATA                           // default prefetching strategy
-	DefaultStrict          = false                          // default fallback modes
-	DefaultFetchTimeout    = retrieval.RetrieveChunkTimeout // timeout for each chunk retrieval
-	DefaultStrategyTimeout = 300 * time.Millisecond         // timeout for each strategy
+	DefaultStrategy     = DATA                           // default prefetching strategy
+	DefaultStrict       = false                          // default fallback modes
+	DefaultFetchTimeout = retrieval.RetrieveChunkTimeout // timeout for each chunk retrieval
 )
 
 type (
-	strategyKey        struct{}
-	modeKey            struct{}
-	fetchTimeoutKey    struct{}
-	strategyTimeoutKey struct{}
-	loggerKey          struct{}
-	Strategy           = int
+	strategyKey     struct{}
+	modeKey         struct{}
+	fetchTimeoutKey struct{}
+	loggerKey       struct{}
+	Strategy        = int
 )
 
 // Config is the configuration for the getter - public
 type Config struct {
-	Strategy        Strategy
-	Strict          bool
-	FetchTimeout    time.Duration
-	StrategyTimeout time.Duration
-	Logger          log.Logger
+	Strategy     Strategy
+	Strict       bool
+	FetchTimeout time.Duration
+	Logger       log.Logger
 }
 
 const (
@@ -48,11 +45,10 @@ const (
 
 // DefaultConfig is the default configuration for the getter
 var DefaultConfig = Config{
-	Strategy:        DefaultStrategy,
-	Strict:          DefaultStrict,
-	FetchTimeout:    DefaultFetchTimeout,
-	StrategyTimeout: DefaultStrategyTimeout,
-	Logger:          log.Noop,
+	Strategy:     DefaultStrategy,
+	Strict:       DefaultStrict,
+	FetchTimeout: DefaultFetchTimeout,
+	Logger:       log.Noop,
 }
 
 // NewConfigFromContext returns a new Config based on the context
@@ -80,12 +76,6 @@ func NewConfigFromContext(ctx context.Context, def Config) (conf Config, err err
 			return conf, e("fetcher timeout")
 		}
 	}
-	if val := ctx.Value(strategyTimeoutKey{}); val != nil {
-		conf.StrategyTimeout, ok = val.(time.Duration)
-		if !ok {
-			return conf, e("strategy timeout")
-		}
-	}
 	if val := ctx.Value(loggerKey{}); val != nil {
 		conf.Logger, ok = val.(log.Logger)
 		if !ok {
@@ -111,18 +101,12 @@ func SetFetchTimeout(ctx context.Context, timeout time.Duration) context.Context
 	return context.WithValue(ctx, fetchTimeoutKey{}, timeout)
 }
 
-// SetStrategyTimeout sets the timeout for each strategy
-func SetStrategyTimeout(ctx context.Context, timeout time.Duration) context.Context {
-	return context.WithValue(ctx, strategyTimeoutKey{}, timeout)
-}
-
-// SetStrategyTimeout sets the timeout for each strategy
 func SetLogger(ctx context.Context, l log.Logger) context.Context {
 	return context.WithValue(ctx, loggerKey{}, l)
 }
 
 // SetConfigInContext sets the config params in the context
-func SetConfigInContext(ctx context.Context, s *Strategy, fallbackmode *bool, fetchTimeout, strategyTimeout *string, logger log.Logger) (context.Context, error) {
+func SetConfigInContext(ctx context.Context, s *Strategy, fallbackmode *bool, fetchTimeout *string, logger log.Logger) (context.Context, error) {
 	if s != nil {
 		ctx = SetStrategy(ctx, *s)
 	}
@@ -139,14 +123,6 @@ func SetConfigInContext(ctx context.Context, s *Strategy, fallbackmode *bool, fe
 		ctx = SetFetchTimeout(ctx, dur)
 	}
 
-	if strategyTimeout != nil {
-		dur, err := time.ParseDuration(*strategyTimeout)
-		if err != nil {
-			return nil, err
-		}
-		ctx = SetStrategyTimeout(ctx, dur)
-	}
-
	if logger != nil {
		ctx = SetLogger(ctx, logger)
	}
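
Illustrative usage sketch (not part of the patch): after this change a caller passes only the strategy, the fallback flag, the chunk-retrieval timeout and a logger to getter.SetConfigInContext; the strategy-timeout argument and the StrategyTimeout config field are gone. The snippet below is a hypothetical caller, assuming the bee import paths as of this commit; the values mirror those used in the tests above.

package main

import (
	"context"
	"fmt"

	"github.com/ethersphere/bee/pkg/file/redundancy/getter"
	"github.com/ethersphere/bee/pkg/log"
)

func main() {
	s := getter.RACE      // prefetching strategy, as in the joiner tests above
	fallback := true      // allow falling back to other strategies
	fetchTimeout := "30s" // per-chunk retrieval timeout, parsed with time.ParseDuration

	// Note: no strategy-timeout argument anymore.
	ctx, err := getter.SetConfigInContext(context.Background(), &s, &fallback, &fetchTimeout, log.Noop)
	if err != nil {
		fmt.Println("could not parse config:", err)
		return
	}
	_ = ctx // hand ctx to the joiner/downloader so it picks up the config
}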