Skip to content

Commit

Permalink
fix: asd
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Mar 5, 2024
1 parent ec435ed commit d7843f5
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 78 deletions.
8 changes: 2 additions & 6 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,8 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV
ls := loadsave.NewReadonly(s.storer.Download(cache))
feedDereferenced := false

strategyTimeout := getter.DefaultStrategyTimeout.String()

ctx := r.Context()
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger)
if err != nil {
logger.Error(err, err.Error())
jsonhttp.BadRequest(w, "could not parse headers")
Expand Down Expand Up @@ -537,10 +535,8 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
cache = *headers.Cache
}

strategyTimeout := getter.DefaultStrategyTimeout.String()

ctx := r.Context()
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger)
if err != nil {
logger.Error(err, err.Error())
jsonhttp.BadRequest(w, "could not parse headers")
Expand Down
32 changes: 14 additions & 18 deletions pkg/file/joiner/joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,17 +1109,15 @@ func TestJoinerRedundancy(t *testing.T) {
if err != nil {
t.Fatal(err)
}
strategyTimeout := 100 * time.Millisecond
// all data can be read back
readCheck := func(t *testing.T, expErr error) {
ctx := context.Background()

strategyTimeoutStr := strategyTimeout.String()
decodeTimeoutStr := (10 * strategyTimeout).String()
decodeTimeoutStr := time.Second.String()
fallback := true
s := getter.RACE

ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, &strategyTimeoutStr, log.Noop)
ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, log.Noop)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1229,7 +1227,7 @@ func TestJoinerRedundancy(t *testing.T) {
// nolint:thelper
func TestJoinerRedundancyMultilevel(t *testing.T) {
t.Parallel()
test := func(t *testing.T, rLevel redundancy.Level, encrypt bool, levels, size int) {
test := func(t *testing.T, rLevel redundancy.Level, encrypt bool, size int) {
t.Helper()
store := mockstorer.NewForgettingStore(inmemchunkstore.New())
testutil.CleanupCloser(t, store)
Expand All @@ -1249,14 +1247,12 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
expRead := swarm.ChunkSize
buf := make([]byte, expRead)
offset := mrand.Intn(size) * expRead
canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, levels int, canRead bool) {
canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, canRead bool) {
ctx := context.Background()
strategyTimeout := 100 * time.Millisecond

strategyTimeoutStr := strategyTimeout.String()
decodingTimeoutStr := (2 * strategyTimeout).String()
decodingTimeoutStr := (200 * time.Millisecond).String()

ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, &strategyTimeoutStr, log.Noop)
ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, log.Noop)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1295,35 +1291,35 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
t.Run("NONE w/o fallback CAN retrieve", func(t *testing.T) {
store.Record()
defer store.Unrecord()
canReadRange(t, getter.NONE, false, levels, true)
canReadRange(t, getter.NONE, false, true)
})

// do not forget the root chunk
store.Unmiss(swarm.NewAddress(addr.Bytes()[:swarm.HashSize]))
// after we forget the chunks on the way to the range, we should not be able to retrieve
t.Run("NONE w/o fallback CANNOT retrieve", func(t *testing.T) {
canReadRange(t, getter.NONE, false, levels, false)
canReadRange(t, getter.NONE, false, false)
})

// we lost a data chunk, we cannot recover using DATA only strategy with no fallback
t.Run("DATA w/o fallback CANNOT retrieve", func(t *testing.T) {
canReadRange(t, getter.DATA, false, levels, false)
canReadRange(t, getter.DATA, false, false)
})

if rLevel == 0 {
// allowing fallback mode will not help if no redundancy used for upload
t.Run("DATA w fallback CANNOT retrieve", func(t *testing.T) {
canReadRange(t, getter.DATA, true, levels, false)
canReadRange(t, getter.DATA, true, false)
})
return
}
// allowing fallback mode will make the range retrievable using erasure decoding
t.Run("DATA w fallback CAN retrieve", func(t *testing.T) {
canReadRange(t, getter.DATA, true, levels, true)
canReadRange(t, getter.DATA, true, true)
})
// after the reconstructed data is stored, we can retrieve the range using DATA only mode
t.Run("after recovery, NONE w/o fallback CAN retrieve", func(t *testing.T) {
canReadRange(t, getter.NONE, false, levels, true)
canReadRange(t, getter.NONE, false, true)
})
}
r2level := []int{2, 1, 2, 3, 2}
Expand Down Expand Up @@ -1353,7 +1349,7 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
if r2level[rLevel] != levels || encrypt != encryptChunk[rLevel] {
t.Skip("skipping to save time")
}
test(t, rLevel, encrypt, levels, chunkCnt)
test(t, rLevel, encrypt, chunkCnt)
})
switch levels {
case 1:
Expand All @@ -1364,7 +1360,7 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
continue
}
t.Run(fmt.Sprintf("encrypt=%v levels=%d chunks=%d full", encrypt, levels, chunkCnt), func(t *testing.T) {
test(t, rLevel, encrypt, levels, chunkCnt)
test(t, rLevel, encrypt, chunkCnt)
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/file/redundancy/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type decoder struct {
parityCnt int // number of parity shards
wg sync.WaitGroup // wait group to wait for all goroutines to finish
mu sync.Mutex // mutex to protect buffer
err error // error of the last erasure decoding
fetchedCnt atomic.Int32 // count successful retrievals
failedCnt atomic.Int32 // count failed retrievals
cancel func() // cancel function for RS decoding
Expand All @@ -57,6 +56,7 @@ type Getter interface {

// New returns a decoder object used to retrieve children of an intermediate chunk
func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) Getter {
// global context is canceled when the Close is called or when the prefetch terminates
ctx, cancel := context.WithCancel(context.Background())
size := len(addrs)

Expand Down Expand Up @@ -94,7 +94,7 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.err = d.prefetch(ctx)
_ = d.prefetch(ctx)
}()
} else { // recovery not allowed
close(d.badRecovery)
Expand Down
20 changes: 9 additions & 11 deletions pkg/file/redundancy/getter/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ func TestGetterFallback(t *testing.T) {

func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
t.Helper()
strategyTimeout := 100 * time.Millisecond
timeout := 100 * time.Millisecond
if racedetection.On {
strategyTimeout *= 2
timeout *= 2
}
store := inmem.New()
buf := make([][]byte, bufSize)
Expand All @@ -118,10 +118,9 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
conf := getter.Config{
Strategy: getter.RACE,
FetchTimeout: 2 * strategyTimeout,
StrategyTimeout: strategyTimeout,
Logger: log.Noop,
Strategy: getter.RACE,
FetchTimeout: 2 * timeout,
Logger: log.Noop,
}
g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
defer g.Close()
Expand All @@ -132,7 +131,7 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
q <- err
}()
err := context.DeadlineExceeded
wait := strategyTimeout * 2
wait := timeout * 2
if racedetection.On {
wait *= 2
}
Expand Down Expand Up @@ -194,10 +193,9 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) {
// create getter
start := time.Now()
conf := getter.Config{
Strategy: s,
Strict: strict,
FetchTimeout: strategyTimeout / 2,
StrategyTimeout: strategyTimeout,
Strategy: s,
Strict: strict,
FetchTimeout: strategyTimeout / 2,
}
g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
defer g.Close()
Expand Down
58 changes: 17 additions & 41 deletions pkg/file/redundancy/getter/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,25 @@ import (
)

const (
DefaultStrategy = DATA // default prefetching strategy
DefaultStrict = false // default fallback modes
DefaultFetchTimeout = retrieval.RetrieveChunkTimeout // timeout for each chunk retrieval
DefaultStrategyTimeout = 300 * time.Millisecond // timeout for each strategy
DefaultStrategy = DATA // default prefetching strategy
DefaultStrict = false // default fallback modes
DefaultFetchTimeout = retrieval.RetrieveChunkTimeout // timeout for each chunk retrieval
)

type (
strategyKey struct{}
modeKey struct{}
fetchTimeoutKey struct{}
strategyTimeoutKey struct{}
loggerKey struct{}
Strategy = int
strategyKey struct{}
modeKey struct{}
fetchTimeoutKey struct{}
loggerKey struct{}
Strategy = int
)

// Config is the configuration for the getter - public
type Config struct {
Strategy Strategy
Strict bool
FetchTimeout time.Duration
StrategyTimeout time.Duration
Logger log.Logger
Strategy Strategy
Strict bool
FetchTimeout time.Duration
Logger log.Logger
}

const (
Expand All @@ -48,11 +45,10 @@ const (

// DefaultConfig is the default configuration for the getter
var DefaultConfig = Config{
Strategy: DefaultStrategy,
Strict: DefaultStrict,
FetchTimeout: DefaultFetchTimeout,
StrategyTimeout: DefaultStrategyTimeout,
Logger: log.Noop,
Strategy: DefaultStrategy,
Strict: DefaultStrict,
FetchTimeout: DefaultFetchTimeout,
Logger: log.Noop,
}

// NewConfigFromContext returns a new Config based on the context
Expand Down Expand Up @@ -80,12 +76,6 @@ func NewConfigFromContext(ctx context.Context, def Config) (conf Config, err err
return conf, e("fetcher timeout")
}
}
if val := ctx.Value(strategyTimeoutKey{}); val != nil {
conf.StrategyTimeout, ok = val.(time.Duration)
if !ok {
return conf, e("strategy timeout")
}
}
if val := ctx.Value(loggerKey{}); val != nil {
conf.Logger, ok = val.(log.Logger)
if !ok {
Expand All @@ -111,18 +101,12 @@ func SetFetchTimeout(ctx context.Context, timeout time.Duration) context.Context
return context.WithValue(ctx, fetchTimeoutKey{}, timeout)
}

// SetStrategyTimeout stores the per-strategy timeout in the context so a
// later NewConfigFromContext call can retrieve it.
func SetStrategyTimeout(ctx context.Context, timeout time.Duration) context.Context {
	valued := context.WithValue(ctx, strategyTimeoutKey{}, timeout)
	return valued
}

// SetLogger sets the logger in the context
func SetLogger(ctx context.Context, l log.Logger) context.Context {
	return context.WithValue(ctx, loggerKey{}, l)
}

// SetConfigInContext sets the config params in the context
func SetConfigInContext(ctx context.Context, s *Strategy, fallbackmode *bool, fetchTimeout, strategyTimeout *string, logger log.Logger) (context.Context, error) {
func SetConfigInContext(ctx context.Context, s *Strategy, fallbackmode *bool, fetchTimeout *string, logger log.Logger) (context.Context, error) {
if s != nil {
ctx = SetStrategy(ctx, *s)
}
Expand All @@ -139,14 +123,6 @@ func SetConfigInContext(ctx context.Context, s *Strategy, fallbackmode *bool, fe
ctx = SetFetchTimeout(ctx, dur)
}

if strategyTimeout != nil {
dur, err := time.ParseDuration(*strategyTimeout)
if err != nil {
return nil, err
}
ctx = SetStrategyTimeout(ctx, dur)
}

if logger != nil {
ctx = SetLogger(ctx, logger)
}
Expand Down

0 comments on commit d7843f5

Please sign in to comment.