From cc0acca2290fadd3195a7df814972dcca450cd78 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Thu, 26 Dec 2024 00:50:34 +0530 Subject: [PATCH] Close hotstore after writing snapshot --- chain/gen/gen.go | 2 +- chain/store/store_test.go | 2 +- cmd/lotus-shed/balances.go | 4 +-- cmd/lotus-shed/deal-label.go | 2 +- cmd/lotus-shed/diff.go | 2 +- cmd/lotus-shed/export-car.go | 2 +- cmd/lotus-shed/export.go | 2 +- cmd/lotus-shed/gas-estimation.go | 4 +-- cmd/lotus-shed/import-car.go | 4 +-- cmd/lotus-shed/invariants.go | 2 +- cmd/lotus-shed/miner-peerid.go | 2 +- cmd/lotus-shed/miner-types.go | 2 +- cmd/lotus-shed/msig.go | 2 +- cmd/lotus-shed/pruning.go | 2 +- cmd/lotus-shed/state-stats.go | 2 +- cmd/lotus-shed/terminations.go | 2 +- cmd/lotus/daemon.go | 3 +- node/modules/blockstore.go | 13 +++++---- node/repo/fsrepo.go | 50 ++++++++++++++------------------ node/repo/interface.go | 2 +- node/repo/memrepo.go | 6 ++-- 21 files changed, 54 insertions(+), 58 deletions(-) diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 6747ff4098a..4ee83b7e4fc 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -134,7 +134,7 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS return nil, xerrors.Errorf("failed to get metadata datastore: %w", err) } - bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore) + bs, _, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore) if err != nil { return nil, err } diff --git a/chain/store/store_test.go b/chain/store/store_test.go index 5329371ae33..5925ca4a906 100644 --- a/chain/store/store_test.go +++ b/chain/store/store_test.go @@ -56,7 +56,7 @@ func BenchmarkGetRandomness(b *testing.B) { b.Fatal(err) } - bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore) + bs, _, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore) if err != nil { b.Fatal(err) } diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index 48030c58ccd..684b940d1a7 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -497,7 +497,7 @@ var chainBalanceStateCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } @@ -721,7 +721,7 @@ var chainPledgeCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return xerrors.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/deal-label.go b/cmd/lotus-shed/deal-label.go index 417d1370193..36bb9874f14 100644 --- a/cmd/lotus-shed/deal-label.go +++ b/cmd/lotus-shed/deal-label.go @@ -52,7 +52,7 @@ var dealLabelCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/diff.go b/cmd/lotus-shed/diff.go index bdd2126b6d0..d7237ac522a 100644 --- a/cmd/lotus-shed/diff.go +++ b/cmd/lotus-shed/diff.go @@ -81,7 +81,7 @@ var diffMinerStates = &cli.Command{ _ = lkrepo.Close() }(lkrepo) - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/export-car.go b/cmd/lotus-shed/export-car.go index 7659d981376..81b6bc11e6d 100644 --- a/cmd/lotus-shed/export-car.go +++ b/cmd/lotus-shed/export-car.go @@ -70,7 +70,7 @@ var exportCarCmd = &cli.Command{ lr, err := r.Lock(repo.FullNode) if err == nil { - bs, err = lr.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err = lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/export.go b/cmd/lotus-shed/export.go index e6d0c4e056f..f4b42358bbb 100644 --- a/cmd/lotus-shed/export.go +++ b/cmd/lotus-shed/export.go @@ -95,7 +95,7 @@ var exportChainCmd = &cli.Command{ defer fi.Close() //nolint:errcheck - bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/gas-estimation.go b/cmd/lotus-shed/gas-estimation.go index b1c61b62f2c..a26bf60ca1f 100644 --- a/cmd/lotus-shed/gas-estimation.go +++ b/cmd/lotus-shed/gas-estimation.go @@ -80,7 +80,7 @@ var gasTraceCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } @@ -177,7 +177,7 @@ var replayOfflineCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/import-car.go b/cmd/lotus-shed/import-car.go index 973e7b31b84..b621177157e 100644 --- a/cmd/lotus-shed/import-car.go +++ b/cmd/lotus-shed/import-car.go @@ -47,7 +47,7 @@ var importCarCmd = &cli.Command{ return xerrors.Errorf("opening the car file: %w", err) } - bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return err } @@ -118,7 +118,7 @@ var importObjectCmd = &cli.Command{ } defer lr.Close() //nolint:errcheck - bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/invariants.go b/cmd/lotus-shed/invariants.go index bc994bbe633..9ba6292c3e0 100644 --- a/cmd/lotus-shed/invariants.go +++ b/cmd/lotus-shed/invariants.go @@ -79,7 +79,7 @@ var invariantsCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - cold, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + cold, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open universal blockstore %w", err) } diff --git a/cmd/lotus-shed/miner-peerid.go b/cmd/lotus-shed/miner-peerid.go index e430637976c..3d620fdab2b 100644 --- a/cmd/lotus-shed/miner-peerid.go +++ b/cmd/lotus-shed/miner-peerid.go @@ -61,7 +61,7 @@ var minerPeeridCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/miner-types.go b/cmd/lotus-shed/miner-types.go index 822d037aa26..b53d5d58965 100644 --- a/cmd/lotus-shed/miner-types.go +++ b/cmd/lotus-shed/miner-types.go @@ -58,7 +58,7 @@ var minerTypesCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/msig.go b/cmd/lotus-shed/msig.go index ccc932c93ff..918d02f0d41 100644 --- a/cmd/lotus-shed/msig.go +++ b/cmd/lotus-shed/msig.go @@ -70,7 +70,7 @@ var multisigGetAllCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go index c0bd453b145..b67b875d810 100644 --- a/cmd/lotus-shed/pruning.go +++ b/cmd/lotus-shed/pruning.go @@ -131,7 +131,7 @@ var stateTreePruneCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index 88b21f40767..399bae570ed 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -234,7 +234,7 @@ func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error) return nil, err } - cold, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + cold, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return nil, xerrors.Errorf("failed to open universal blockstore %w", err) } diff --git a/cmd/lotus-shed/terminations.go b/cmd/lotus-shed/terminations.go index 563c1ba3a77..e67d3831042 100644 --- a/cmd/lotus-shed/terminations.go +++ b/cmd/lotus-shed/terminations.go @@ -61,7 +61,7 @@ var terminationsCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 3d050af9fb0..d9901b29b00 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -546,10 +546,11 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) } defer lr.Close() //nolint:errcheck - bs, err := lr.Blockstore(ctx, repo.HotBlockstore) + bs, closer, err := lr.Blockstore(ctx, repo.HotBlockstore) if err != nil { return xerrors.Errorf("failed to open blockstore: %w", err) } + defer closer() mds, err := lr.Datastore(ctx, "/metadata") if err != nil { diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go index 9c54d51e60f..ec0d6fa5c03 100644 --- a/node/modules/blockstore.go +++ b/node/modules/blockstore.go @@ -23,7 +23,7 @@ import ( // chain data and state data. It can be backed by a blockstore directly // (e.g. Badger), or by a Splitstore. func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) { - bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore) + bs, _, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore) if err != nil { return nil, err } @@ -74,11 +74,12 @@ func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlocksto return bs, nil } -func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { - return func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { +func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, func() error, error) { + return func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, func() error, error) { + noop := func() error { return nil } path, err := r.SplitstorePath() if err != nil { - return nil, err + return nil, noop, err } cfg := &splitstore.Config{ @@ -93,7 +94,7 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked } ss, err := splitstore.Open(path, ds, hot, cold, cfg) if err != nil { - return nil, err + return nil, noop, err } lc.Append(fx.Hook{ OnStop: func(context.Context) error { @@ -101,7 +102,7 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked }, }) - return ss, err + return ss, noop, nil } } diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index 2737020b896..b769108d72c 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -426,9 +426,9 @@ func (fsr *fsLockedRepo) Close() error { } // Blockstore returns a blockstore for the provided data domain. -func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) { +func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, func() error, error) { if domain != UniversalBlockstore && domain != HotBlockstore { - return nil, ErrInvalidBlockstoreDomain + return nil, nil, ErrInvalidBlockstoreDomain } if domain == UniversalBlockstore { fsr.bsOnce.Do(func() { @@ -457,34 +457,28 @@ func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain } fsr.bs = blockstore.WrapIDStore(bs) }) - return fsr.bs, fsr.bsErr + return fsr.bs, nil, fsr.bsErr } // else domain == HotBlockstore { - fsr.bsHotOnce.Do(func() { - path, err := fsr.SplitstorePath() - if err != nil { - fsr.bsHotErr = err - return - } - path = filepath.Join(path, "hot.badger") - if err := os.MkdirAll(path, 0755); err != nil { - fsr.bsHotErr = err - return - } - readonly := fsr.readonly - opts, err := BadgerBlockstoreOptions(HotBlockstore, path, readonly) - if err != nil { - fsr.bsHotErr = err - return - } - bs, err := badgerbs.Open(opts) - if err != nil { - fsr.bsHotErr = err - return - } - fsr.bsHot = bs - }) - return fsr.bsHot, fsr.bsHotErr + path, err := fsr.SplitstorePath() + if err != nil { + return nil, nil, err + } + path = filepath.Join(path, "hot.badger") + if err := os.MkdirAll(path, 0755); err != nil { + return nil, nil, err + } + readonly := fsr.readonly + opts, err := BadgerBlockstoreOptions(HotBlockstore, path, readonly) + if err != nil { + return nil, nil, err + } + bs, err := badgerbs.Open(opts) + if err != nil { + return nil, nil, err + } + + return bs, bs.Close, nil } func (fsr *fsLockedRepo) SplitstorePath() (string, error) { diff --git a/node/repo/interface.go b/node/repo/interface.go index 100d0dc58d5..056b104c6c0 100644 --- a/node/repo/interface.go +++ b/node/repo/interface.go @@ -64,7 +64,7 @@ type LockedRepo interface { // The supplied context must only be used to initialize the blockstore. // The implementation should not retain the context for usage throughout // the lifecycle. - Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) + Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, func() error, error) // SplitstorePath returns the path for the SplitStore SplitstorePath() (string, error) diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go index cda00f985f2..d602ae8e45f 100644 --- a/node/repo/memrepo.go +++ b/node/repo/memrepo.go @@ -253,11 +253,11 @@ func (lmem *lockedMemRepo) Datastore(_ context.Context, ns string) (datastore.Ba return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil } -func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) { +func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, func() error, error) { if domain != UniversalBlockstore { - return nil, ErrInvalidBlockstoreDomain + return nil, nil, ErrInvalidBlockstoreDomain } - return lmem.mem.blockstore, nil + return lmem.mem.blockstore, func() error { return nil }, nil } func (lmem *lockedMemRepo) SplitstorePath() (string, error) {