Skip to content

Commit

Permalink
Close hotstore after writing snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
ZenGround0 committed Dec 25, 2024
1 parent f593bb1 commit cc0acca
Show file tree
Hide file tree
Showing 21 changed files with 54 additions and 58 deletions.
2 changes: 1 addition & 1 deletion chain/gen/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS
return nil, xerrors.Errorf("failed to get metadata datastore: %w", err)
}

bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion chain/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func BenchmarkGetRandomness(b *testing.B) {
b.Fatal(err)
}

bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
if err != nil {
b.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ var chainBalanceStateCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down Expand Up @@ -721,7 +721,7 @@ var chainPledgeCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return xerrors.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/deal-label.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var dealLabelCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var diffMinerStates = &cli.Command{
_ = lkrepo.Close()
}(lkrepo)

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/export-car.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var exportCarCmd = &cli.Command{

lr, err := r.Lock(repo.FullNode)
if err == nil {
bs, err = lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err = lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ var exportChainCmd = &cli.Command{

defer fi.Close() //nolint:errcheck

bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/gas-estimation.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ var gasTraceCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down Expand Up @@ -177,7 +177,7 @@ var replayOfflineCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/import-car.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var importCarCmd = &cli.Command{
return xerrors.Errorf("opening the car file: %w", err)
}

bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return err
}
Expand Down Expand Up @@ -118,7 +118,7 @@ var importObjectCmd = &cli.Command{
}
defer lr.Close() //nolint:errcheck

bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/invariants.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var invariantsCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

cold, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
cold, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open universal blockstore %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/miner-peerid.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var minerPeeridCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/miner-types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var minerTypesCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/msig.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var multisigGetAllCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ var stateTreePruneCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/state-stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error)
return nil, err
}

cold, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
cold, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return nil, xerrors.Errorf("failed to open universal blockstore %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/terminations.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var terminationsCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/lotus/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,11 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
}
defer lr.Close() //nolint:errcheck

bs, err := lr.Blockstore(ctx, repo.HotBlockstore)
bs, closer, err := lr.Blockstore(ctx, repo.HotBlockstore)
if err != nil {
return xerrors.Errorf("failed to open blockstore: %w", err)
}
defer closer()

mds, err := lr.Datastore(ctx, "/metadata")
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions node/modules/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// chain data and state data. It can be backed by a blockstore directly
// (e.g. Badger), or by a Splitstore.
func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) {
bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore)
bs, _, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -74,11 +74,12 @@ func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlocksto
return bs, nil
}

func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) {
return func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) {
func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, func() error, error) {
return func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, func() error, error) {
noop := func() error { return nil }
path, err := r.SplitstorePath()
if err != nil {
return nil, err
return nil, noop, err
}

cfg := &splitstore.Config{
Expand All @@ -93,15 +94,15 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
}
ss, err := splitstore.Open(path, ds, hot, cold, cfg)
if err != nil {
return nil, err
return nil, noop, err
}
lc.Append(fx.Hook{
OnStop: func(context.Context) error {
return ss.Close()
},
})

return ss, err
return ss, noop, nil
}
}

Expand Down
50 changes: 22 additions & 28 deletions node/repo/fsrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,9 @@ func (fsr *fsLockedRepo) Close() error {
}

// Blockstore returns a blockstore for the provided data domain.
func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) {
func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, func() error, error) {
if domain != UniversalBlockstore && domain != HotBlockstore {
return nil, ErrInvalidBlockstoreDomain
return nil, nil, ErrInvalidBlockstoreDomain
}
if domain == UniversalBlockstore {
fsr.bsOnce.Do(func() {
Expand Down Expand Up @@ -457,34 +457,28 @@ func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain
}
fsr.bs = blockstore.WrapIDStore(bs)
})
return fsr.bs, fsr.bsErr
return fsr.bs, nil, fsr.bsErr
}
// else domain == HotBlockstore {
fsr.bsHotOnce.Do(func() {
path, err := fsr.SplitstorePath()
if err != nil {
fsr.bsHotErr = err
return
}
path = filepath.Join(path, "hot.badger")
if err := os.MkdirAll(path, 0755); err != nil {
fsr.bsHotErr = err
return
}
readonly := fsr.readonly
opts, err := BadgerBlockstoreOptions(HotBlockstore, path, readonly)
if err != nil {
fsr.bsHotErr = err
return
}
bs, err := badgerbs.Open(opts)
if err != nil {
fsr.bsHotErr = err
return
}
fsr.bsHot = bs
})
return fsr.bsHot, fsr.bsHotErr
path, err := fsr.SplitstorePath()
if err != nil {
return nil, nil, err
}
path = filepath.Join(path, "hot.badger")
if err := os.MkdirAll(path, 0755); err != nil {
return nil, nil, err
}
readonly := fsr.readonly
opts, err := BadgerBlockstoreOptions(HotBlockstore, path, readonly)
if err != nil {
return nil, nil, err
}
bs, err := badgerbs.Open(opts)
if err != nil {
return nil, nil, err
}

return bs, bs.Close, nil
}

func (fsr *fsLockedRepo) SplitstorePath() (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion node/repo/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type LockedRepo interface {
// The supplied context must only be used to initialize the blockstore.
// The implementation should not retain the context for usage throughout
// the lifecycle.
Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error)
Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, func() error, error)

// SplitstorePath returns the path for the SplitStore
SplitstorePath() (string, error)
Expand Down
6 changes: 3 additions & 3 deletions node/repo/memrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,11 @@ func (lmem *lockedMemRepo) Datastore(_ context.Context, ns string) (datastore.Ba
return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil
}

func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) {
func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, func() error, error) {
if domain != UniversalBlockstore {
return nil, ErrInvalidBlockstoreDomain
return nil, nil, ErrInvalidBlockstoreDomain
}
return lmem.mem.blockstore, nil
return lmem.mem.blockstore, func() error { return nil }, nil
}

func (lmem *lockedMemRepo) SplitstorePath() (string, error) {
Expand Down

0 comments on commit cc0acca

Please sign in to comment.