Skip to content

Commit

Permalink
[skip changelog] chore: blockstore_gc: minor cleanups (#12312)
Browse files Browse the repository at this point in the history
* Flush() correctly in case we are in MovingGC

* Allow MovingGC to be interrupted by a context + slight refactor

* switch to using multierr as per review
  • Loading branch information
ribasushi committed Aug 17, 2024
1 parent 569687e commit f33b180
Showing 1 changed file with 66 additions and 28 deletions.
94 changes: 66 additions & 28 deletions blockstore/badger/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/pb"
badgerstruct "github.com/dgraph-io/badger/v2/pb"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
logger "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
"github.com/multiformats/go-base32"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -242,7 +243,7 @@ func (b *Blockstore) unlockMove(state bsMoveState) {
// are persisted to the new blockstore; if a failure occurs aboring the move,
// then they must be peristed to the old blockstore.
// In short, the blockstore must not lose data from new writes during the move.
func (b *Blockstore) movingGC() error {
func (b *Blockstore) movingGC(ctx context.Context) error {
// this inlines moveLock/moveUnlock for the initial state check to prevent a second move
// while one is in progress without clobbering state
b.moveMx.Lock()
Expand Down Expand Up @@ -327,7 +328,7 @@ func (b *Blockstore) movingGC() error {
b.unlockMove(moveStateMoving)

log.Info("copying blockstore")
err = b.doCopy(b.db, b.dbNext)
err = b.doCopy(ctx, b.db, b.dbNext)
if err != nil {
return fmt.Errorf("error moving badger blockstore to %s: %w", newPath, err)
}
Expand Down Expand Up @@ -389,37 +390,66 @@ func symlink(path, linkTo string) error {
return os.Symlink(path, linkTo)
}

// doCopy copies a badger blockstore to another, with an optional filter; if the filter
// is not nil, then only cids that satisfy the filter will be copied.
func (b *Blockstore) doCopy(from, to *badger.DB) error {
workers := runtime.NumCPU() / 2
if workers < 2 {
workers = 2
}
if workers > 8 {
workers = 8
}

stream := from.NewStream()
stream.NumGo = workers
stream.LogPrefix = "doCopy"
stream.Send = func(list *pb.KVList) error {
batch := to.NewWriteBatch()
defer batch.Cancel()
// doCopy copies a badger blockstore to another
func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB) (defErr error) {
batch := to.NewWriteBatch()
defer func() {
if defErr == nil {
defErr = batch.Flush()
}
if defErr != nil {
batch.Cancel()
}
}()

for _, kv := range list.Kv {
if kv.Key == nil || kv.Value == nil {
continue
}
return iterateBadger(ctx, from, func(kvs []*badgerstruct.KV) error {
// check whether context is closed on every kv group
if err := ctx.Err(); err != nil {
return err
}
for _, kv := range kvs {
if err := batch.Set(kv.Key, kv.Value); err != nil {
return err
}
}
return nil
})
}

var IterateLSMWorkers int // defaults to between( 2, 8, runtime.NumCPU/2 )

return batch.Flush()
func iterateBadger(ctx context.Context, db *badger.DB, iter func([]*badgerstruct.KV) error) error {
workers := IterateLSMWorkers
if workers == 0 {
workers = between(2, 8, runtime.NumCPU()/2)
}

return stream.Orchestrate(context.Background())
stream := db.NewStream()
stream.NumGo = workers
stream.LogPrefix = "iterateBadgerKVs"
stream.Send = func(kvl *badgerstruct.KVList) error {
kvs := make([]*badgerstruct.KV, 0, len(kvl.Kv))
for _, kv := range kvl.Kv {
if kv.Key != nil && kv.Value != nil {
kvs = append(kvs, kv)
}
}
if len(kvs) == 0 {
return nil
}
return iter(kvs)
}
return stream.Orchestrate(ctx)
}

func between(min, max, val int) int {
if val > max {
val = max
}
if val < min {
val = min
}
return val
}

func (b *Blockstore) deleteDB(path string) {
Expand Down Expand Up @@ -500,7 +530,7 @@ func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.Bloc
}

if options.FullGC {
return b.movingGC()
return b.movingGC(ctx)
}
threshold := options.Threshold
if threshold == 0 {
Expand Down Expand Up @@ -627,7 +657,15 @@ func (b *Blockstore) Flush(context.Context) error {
b.lockDB()
defer b.unlockDB()

return b.db.Sync()
var nextErr error
if b.dbNext != nil {
nextErr = b.dbNext.Sync()
}

return multierr.Combine(
nextErr,
b.db.Sync(),
)
}

// Has implements Blockstore.Has.
Expand Down

0 comments on commit f33b180

Please sign in to comment.