Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[skip changelog] chore: blockstore_gc: minor cleanups #12312

Merged
merged 3 commits into from
Jul 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 66 additions & 28 deletions blockstore/badger/blockstore.go
Original file line number Diff line number Diff line change
@@ -12,13 +12,14 @@ import (

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/pb"
badgerstruct "github.com/dgraph-io/badger/v2/pb"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
logger "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
"github.com/multiformats/go-base32"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/xerrors"

@@ -242,7 +243,7 @@ func (b *Blockstore) unlockMove(state bsMoveState) {
// are persisted to the new blockstore; if a failure occurs aboring the move,
// then they must be peristed to the old blockstore.
// In short, the blockstore must not lose data from new writes during the move.
func (b *Blockstore) movingGC() error {
func (b *Blockstore) movingGC(ctx context.Context) error {
// this inlines moveLock/moveUnlock for the initial state check to prevent a second move
// while one is in progress without clobbering state
b.moveMx.Lock()
@@ -327,7 +328,7 @@ func (b *Blockstore) movingGC() error {
b.unlockMove(moveStateMoving)

log.Info("copying blockstore")
err = b.doCopy(b.db, b.dbNext)
err = b.doCopy(ctx, b.db, b.dbNext)
if err != nil {
return fmt.Errorf("error moving badger blockstore to %s: %w", newPath, err)
}
@@ -389,37 +390,66 @@ func symlink(path, linkTo string) error {
return os.Symlink(path, linkTo)
}

// doCopy copies a badger blockstore to another, with an optional filter; if the filter
// is not nil, then only cids that satisfy the filter will be copied.
func (b *Blockstore) doCopy(from, to *badger.DB) error {
workers := runtime.NumCPU() / 2
if workers < 2 {
workers = 2
}
if workers > 8 {
workers = 8
}

stream := from.NewStream()
stream.NumGo = workers
stream.LogPrefix = "doCopy"
stream.Send = func(list *pb.KVList) error {
batch := to.NewWriteBatch()
defer batch.Cancel()
// doCopy copies a badger blockstore to another
func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB) (defErr error) {
batch := to.NewWriteBatch()
defer func() {
if defErr == nil {
defErr = batch.Flush()
}
if defErr != nil {
ZenGround0 marked this conversation as resolved.
Show resolved Hide resolved
batch.Cancel()
}
}()

for _, kv := range list.Kv {
if kv.Key == nil || kv.Value == nil {
ZenGround0 marked this conversation as resolved.
Show resolved Hide resolved
continue
}
return iterateBadger(ctx, from, func(kvs []*badgerstruct.KV) error {
// check whether context is closed on every kv group
if err := ctx.Err(); err != nil {
return err
}
for _, kv := range kvs {
if err := batch.Set(kv.Key, kv.Value); err != nil {
return err
}
}
return nil
})
}

var IterateLSMWorkers int // defaults to between( 2, 8, runtime.NumCPU/2 )

return batch.Flush()
func iterateBadger(ctx context.Context, db *badger.DB, iter func([]*badgerstruct.KV) error) error {
workers := IterateLSMWorkers
if workers == 0 {
workers = between(2, 8, runtime.NumCPU()/2)
}

return stream.Orchestrate(context.Background())
stream := db.NewStream()
stream.NumGo = workers
stream.LogPrefix = "iterateBadgerKVs"
ZenGround0 marked this conversation as resolved.
Show resolved Hide resolved
stream.Send = func(kvl *badgerstruct.KVList) error {
kvs := make([]*badgerstruct.KV, 0, len(kvl.Kv))
for _, kv := range kvl.Kv {
if kv.Key != nil && kv.Value != nil {
kvs = append(kvs, kv)
}
}
if len(kvs) == 0 {
return nil
}
return iter(kvs)
}
return stream.Orchestrate(ctx)
}

func between(min, max, val int) int {
if val > max {
val = max
}
if val < min {
val = min
}
return val
}

func (b *Blockstore) deleteDB(path string) {
@@ -500,7 +530,7 @@ func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.Bloc
}

if options.FullGC {
return b.movingGC()
return b.movingGC(ctx)
}
threshold := options.Threshold
if threshold == 0 {
@@ -627,7 +657,15 @@ func (b *Blockstore) Flush(context.Context) error {
b.lockDB()
defer b.unlockDB()

return b.db.Sync()
var nextErr error
if b.dbNext != nil {
nextErr = b.dbNext.Sync()
}

return multierr.Combine(
nextErr,
b.db.Sync(),
)
}

// Has implements Blockstore.Has.