-
Notifications
You must be signed in to change notification settings - Fork 338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf(getter): cancel inflight requests if enough chunks are fetched for recovery #4608
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ var ( | |
// if retrieves children of an intermediate chunk potentially using erasure decoding | ||
// it caches sibling chunks if erasure decoding started already | ||
type decoder struct { | ||
ctx context.Context | ||
fetcher storage.Getter // network retrieval interface to fetch chunks | ||
putter storage.Putter // interface to local storage to save reconstructed chunks | ||
addrs []swarm.Address // all addresses of the intermediate chunk | ||
|
@@ -40,11 +41,10 @@ type decoder struct { | |
parityCnt int // number of parity shards | ||
wg sync.WaitGroup // wait group to wait for all goroutines to finish | ||
mu sync.Mutex // mutex to protect buffer | ||
err error // error of the last erasure decoding | ||
fetchedCnt atomic.Int32 // count successful retrievals | ||
failedCnt atomic.Int32 // count successful retrievals | ||
cancel func() // cancel function for RS decoding | ||
remove func() // callback to remove decoder from decoders cache | ||
remove func(error) // callback to remove decoder from decoders cache | ||
config Config // configuration | ||
logger log.Logger | ||
} | ||
|
@@ -55,11 +55,13 @@ type Getter interface { | |
} | ||
|
||
// New returns a decoder object used to retrieve children of an intermediate chunk | ||
func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(), conf Config) Getter { | ||
func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) Getter { | ||
// global context is canceled when the Close is called or when the prefetch terminates | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
size := len(addrs) | ||
|
||
d := &decoder{ | ||
ctx: ctx, | ||
fetcher: g, | ||
putter: p, | ||
addrs: addrs, | ||
|
@@ -92,7 +94,7 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter | |
d.wg.Add(1) | ||
go func() { | ||
defer d.wg.Done() | ||
d.err = d.prefetch(ctx) | ||
_ = d.prefetch(ctx) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we need to call |
||
}() | ||
} else { // recovery not allowed | ||
close(d.badRecovery) | ||
|
@@ -143,7 +145,14 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e | |
defer cancel() | ||
|
||
g.wg.Add(1) | ||
defer g.wg.Done() | ||
go func() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is this for? |
||
select { | ||
case <-fctx.Done(): // local context | ||
case <-g.ctx.Done(): // global context | ||
} | ||
cancel() | ||
g.wg.Done() | ||
}() | ||
|
||
// retrieval | ||
ch, err := g.fetcher.Get(fctx, g.addrs[i]) | ||
|
@@ -172,8 +181,9 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e | |
return waitRecovery(storage.ErrNotFound) | ||
} | ||
|
||
func (g *decoder) prefetch(ctx context.Context) error { | ||
defer g.remove() | ||
func (g *decoder) prefetch(ctx context.Context) (err error) { | ||
defer g.remove(err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this will never give you non-nil.
no? |
||
defer g.cancel() | ||
|
||
run := func(s Strategy) error { | ||
if err := g.runStrategy(ctx, s); err != nil { | ||
|
@@ -183,7 +193,6 @@ func (g *decoder) prefetch(ctx context.Context) error { | |
return g.recover(ctx) | ||
} | ||
|
||
var err error | ||
for s := g.config.Strategy; s < strategyCnt; s++ { | ||
|
||
err = run(s) | ||
|
@@ -377,6 +386,6 @@ func (g *decoder) save(ctx context.Context, missing []int) error { | |
func (g *decoder) Close() error { | ||
g.cancel() | ||
g.wg.Wait() | ||
g.remove() | ||
g.remove(nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When we force close the decoder, the joiner entry should be completely removed from the cache so that when the same scope is called another time, the decoder is to rebuilt |
||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is known anti-pattern. And clearly not needed here