Skip to content

Commit

Permalink
perf(getter): cancel requests if enough chunks are fetched for recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Mar 5, 2024
1 parent f7b3586 commit ec435ed
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 11 deletions.
10 changes: 8 additions & 2 deletions pkg/file/joiner/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,16 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.
}
return d
}
remove := func() {
remove := func(err error) {
g.mu.Lock()
defer g.mu.Unlock()
g.cache[key] = nil
if err != nil {
// signals that a new getter is needed to reattempt to recover the data
delete(g.cache, key)
} else {
// signals that the chunks were fetched/recovered/cached so a future getter is not needed
g.cache[key] = nil
}
}
d = getter.New(addrs, shardCnt, g.fetcher, g.putter, remove, g.config)
g.cache[key] = d
Expand Down
23 changes: 16 additions & 7 deletions pkg/file/redundancy/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
// it retrieves children of an intermediate chunk potentially using erasure decoding
// it caches sibling chunks if erasure decoding started already
type decoder struct {
ctx context.Context
fetcher storage.Getter // network retrieval interface to fetch chunks
putter storage.Putter // interface to local storage to save reconstructed chunks
addrs []swarm.Address // all addresses of the intermediate chunk
Expand All @@ -44,7 +45,7 @@ type decoder struct {
fetchedCnt atomic.Int32 // count successful retrievals
failedCnt atomic.Int32 // count failed retrievals
cancel func() // cancel function for RS decoding
remove func() // callback to remove decoder from decoders cache
remove func(error) // callback to remove decoder from decoders cache
config Config // configuration
logger log.Logger
}
Expand All @@ -55,11 +56,12 @@ type Getter interface {
}

// New returns a decoder object used to retrieve children of an intermediate chunk
func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(), conf Config) Getter {
func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) Getter {
ctx, cancel := context.WithCancel(context.Background())
size := len(addrs)

d := &decoder{
ctx: ctx,
fetcher: g,
putter: p,
addrs: addrs,
Expand Down Expand Up @@ -143,7 +145,14 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
defer cancel()

g.wg.Add(1)
defer g.wg.Done()
go func() {
select {
case <-fctx.Done(): // local context
case <-g.ctx.Done(): // global context
}
cancel()
g.wg.Done()
}()

// retrieval
ch, err := g.fetcher.Get(fctx, g.addrs[i])
Expand Down Expand Up @@ -172,8 +181,9 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
return waitRecovery(storage.ErrNotFound)
}

func (g *decoder) prefetch(ctx context.Context) error {
defer g.remove()
func (g *decoder) prefetch(ctx context.Context) (err error) {
defer g.remove(err)
defer g.cancel()

run := func(s Strategy) error {
if err := g.runStrategy(ctx, s); err != nil {
Expand All @@ -183,7 +193,6 @@ func (g *decoder) prefetch(ctx context.Context) error {
return g.recover(ctx)
}

var err error
for s := g.config.Strategy; s < strategyCnt; s++ {

err = run(s)
Expand Down Expand Up @@ -377,6 +386,6 @@ func (g *decoder) save(ctx context.Context, missing []int) error {
func (g *decoder) Close() error {
g.cancel()
g.wg.Wait()
g.remove()
g.remove(nil)
return nil
}
4 changes: 2 additions & 2 deletions pkg/file/redundancy/getter/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
StrategyTimeout: strategyTimeout,
Logger: log.Noop,
}
g := getter.New(addrs, shardCnt, store, store, func() {}, conf)
g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
defer g.Close()
parityCnt := len(buf) - shardCnt
q := make(chan error, 1)
Expand Down Expand Up @@ -199,7 +199,7 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) {
FetchTimeout: strategyTimeout / 2,
StrategyTimeout: strategyTimeout,
}
g := getter.New(addrs, shardCnt, store, store, func() {}, conf)
g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
defer g.Close()

// launch delayed and erased chunk retrieval
Expand Down

0 comments on commit ec435ed

Please sign in to comment.