diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b096c616f..dee7c87c35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ # UNRELEASED +- Sync snapshots directly to hotstore. Save 120 G of lotus disk space. ([filecoin-project/lotus#12800](https://github.com/filecoin-project/lotus/pull/12803)) - Add json output of tipsets to `louts chain list`. ([filecoin-project/lotus#12691](https://github.com/filecoin-project/lotus/pull/12691)) - Remove IPNI advertisement relay over pubsub via Lotus node as it now has been deprecated. ([filecoin-project/lotus#12768](https://github.com/filecoin-project/lotus/pull/12768) - During a network upgrade, log migration progress every 2 seconds so they are more helpful and informative. The `LOTUS_MIGRATE_PROGRESS_LOG_SECONDS` environment variable can be used to change this if needed. ([filecoin-project/lotus#12732](https://github.com/filecoin-project/lotus/pull/12732)) diff --git a/blockstore/mem.go b/blockstore/mem.go index 8dbf4b719a..6babbf508e 100644 --- a/blockstore/mem.go +++ b/blockstore/mem.go @@ -107,3 +107,12 @@ func (m MemBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) func (m MemBlockstore) HashOnRead(enabled bool) { // no-op } + +func (m MemBlockstore) ForEachKey(f func(c cid.Cid) error) error { + for _, b := range m { + if err := f(b.Cid()); err != nil { + return err + } + } + return nil +} diff --git a/blockstore/splitstore/markset_map.go b/blockstore/splitstore/markset_map.go index 52efcb8a48..922800eafa 100644 --- a/blockstore/splitstore/markset_map.go +++ b/blockstore/splitstore/markset_map.go @@ -42,6 +42,11 @@ func NewMapMarkSetEnv(path string) (*MapMarkSetEnv, error) { func (e *MapMarkSetEnv) New(name string, sizeHint int64) (MarkSet, error) { path := filepath.Join(e.path, name) + // Limit map size to 1k if sizeHint exceeds this value + // This prevents accidental hanging when sizeHint is too large + if sizeHint > 1000 { + sizeHint = 1000 + } return &MapMarkSet{ set: make(map[string]struct{}, sizeHint), path: path, diff --git a/blockstore/splitstore/splitstore_test.go b/blockstore/splitstore/splitstore_test.go index 23eadec1fb..ad6639f77f 100644 --- a/blockstore/splitstore/splitstore_test.go +++ b/blockstore/splitstore/splitstore_test.go @@ -163,11 +163,11 @@ func testSplitStore(t *testing.T, cfg *Config) { hotCnt := countBlocks(hot) if coldCnt != 2 { - t.Errorf("expected %d blocks, but got %d", 2, coldCnt) + t.Fatalf("expected %d blocks, but got %d", 2, coldCnt) } if hotCnt != 12 { - t.Errorf("expected %d blocks, but got %d", 12, hotCnt) + t.Fatalf("expected %d blocks, but got %d", 12, hotCnt) } // trigger a compaction diff --git a/blockstore/splitstore/splitstore_warmup.go b/blockstore/splitstore/splitstore_warmup.go index 5a8dbc4d6c..1dd64236d3 100644 --- a/blockstore/splitstore/splitstore_warmup.go +++ b/blockstore/splitstore/splitstore_warmup.go @@ -36,6 +36,7 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error { start := time.Now() err := s.doWarmup(curTs) + if err != nil { log.Errorf("error warming up hotstore: %s", err) return @@ -47,9 +48,11 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error { return nil } -// the actual warmup procedure; it walks the chain loading all state roots at the boundary -// and headers all the way up to genesis. -// objects are written in batches so as to minimize overhead. +// the full warmup procedure +// Most of this is unnecessary in the common case where we are starting from a snapshot +// since snapshots are now loaded directly to the hotstore. However this is safe and robust, +// handling all cases where we are transition from a universal setup to a splitstore setup. +// And warmup costs are only paid on init so it is not func (s *SplitStore) doWarmup(curTs *types.TipSet) error { var boundaryEpoch abi.ChainEpoch epoch := curTs.Height() diff --git a/blockstore/sync.go b/blockstore/sync.go index 4f0cf830ee..37d5098300 100644 --- a/blockstore/sync.go +++ b/blockstore/sync.go @@ -81,3 +81,9 @@ func (m *SyncBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error func (m *SyncBlockstore) HashOnRead(enabled bool) { // noop } + +func (m *SyncBlockstore) ForEachKey(f func(c cid.Cid) error) error { + m.mu.RLock() + defer m.mu.RUnlock() + return m.bs.ForEachKey(f) +} diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 6747ff4098..4ee83b7e4f 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -134,7 +134,7 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS return nil, xerrors.Errorf("failed to get metadata datastore: %w", err) } - bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore) + bs, _, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore) if err != nil { return nil, err } diff --git a/chain/store/store_test.go b/chain/store/store_test.go index 5329371ae3..5925ca4a90 100644 --- a/chain/store/store_test.go +++ b/chain/store/store_test.go @@ -56,7 +56,7 @@ func BenchmarkGetRandomness(b *testing.B) { b.Fatal(err) } - bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore) + bs, _, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore) if err != nil { b.Fatal(err) } diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index 48030c58cc..684b940d1a 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -497,7 +497,7 @@ var chainBalanceStateCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } @@ -721,7 +721,7 @@ var chainPledgeCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return xerrors.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/deal-label.go b/cmd/lotus-shed/deal-label.go index 417d137019..36bb9874f1 100644 --- a/cmd/lotus-shed/deal-label.go +++ b/cmd/lotus-shed/deal-label.go @@ -52,7 +52,7 @@ var dealLabelCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/diff.go b/cmd/lotus-shed/diff.go index bdd2126b6d..d7237ac522 100644 --- a/cmd/lotus-shed/diff.go +++ b/cmd/lotus-shed/diff.go @@ -81,7 +81,7 @@ var diffMinerStates = &cli.Command{ _ = lkrepo.Close() }(lkrepo) - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/export-car.go b/cmd/lotus-shed/export-car.go index 7659d98137..81b6bc11e6 100644 --- a/cmd/lotus-shed/export-car.go +++ b/cmd/lotus-shed/export-car.go @@ -70,7 +70,7 @@ var exportCarCmd = &cli.Command{ lr, err := r.Lock(repo.FullNode) if err == nil { - bs, err = lr.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err = lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/export.go b/cmd/lotus-shed/export.go index e6d0c4e056..f4b42358bb 100644 --- a/cmd/lotus-shed/export.go +++ b/cmd/lotus-shed/export.go @@ -95,7 +95,7 @@ var exportChainCmd = &cli.Command{ defer fi.Close() //nolint:errcheck - bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/gas-estimation.go b/cmd/lotus-shed/gas-estimation.go index b1c61b62f2..a26bf60ca1 100644 --- a/cmd/lotus-shed/gas-estimation.go +++ b/cmd/lotus-shed/gas-estimation.go @@ -80,7 +80,7 @@ var gasTraceCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } @@ -177,7 +177,7 @@ var replayOfflineCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/import-car.go b/cmd/lotus-shed/import-car.go index 973e7b31b8..b621177157 100644 --- a/cmd/lotus-shed/import-car.go +++ b/cmd/lotus-shed/import-car.go @@ -47,7 +47,7 @@ var importCarCmd = &cli.Command{ return xerrors.Errorf("opening the car file: %w", err) } - bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return err } @@ -118,7 +118,7 @@ var importObjectCmd = &cli.Command{ } defer lr.Close() //nolint:errcheck - bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/invariants.go b/cmd/lotus-shed/invariants.go index bc994bbe63..9ba6292c3e 100644 --- a/cmd/lotus-shed/invariants.go +++ b/cmd/lotus-shed/invariants.go @@ -79,7 +79,7 @@ var invariantsCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - cold, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + cold, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open universal blockstore %w", err) } diff --git a/cmd/lotus-shed/migrations.go b/cmd/lotus-shed/migrations.go index be0ef834c4..c3cfa98a67 100644 --- a/cmd/lotus-shed/migrations.go +++ b/cmd/lotus-shed/migrations.go @@ -128,10 +128,15 @@ var migrationsCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - cold, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + cold, closer, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open universal blockstore %w", err) } + defer func() { + if err := closer(); err != nil { + log.Warnf("failed to close universal blockstore: %s", err) + } + }() path, err := lkrepo.SplitstorePath() if err != nil { diff --git a/cmd/lotus-shed/miner-peerid.go b/cmd/lotus-shed/miner-peerid.go index e430637976..3d620fdab2 100644 --- a/cmd/lotus-shed/miner-peerid.go +++ b/cmd/lotus-shed/miner-peerid.go @@ -61,7 +61,7 @@ var minerPeeridCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/miner-types.go b/cmd/lotus-shed/miner-types.go index 822d037aa2..b53d5d5896 100644 --- a/cmd/lotus-shed/miner-types.go +++ b/cmd/lotus-shed/miner-types.go @@ -58,7 +58,7 @@ var minerTypesCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/msig.go b/cmd/lotus-shed/msig.go index ccc932c93f..918d02f0d4 100644 --- a/cmd/lotus-shed/msig.go +++ b/cmd/lotus-shed/msig.go @@ -70,7 +70,7 @@ var multisigGetAllCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go index c0bd453b14..b67b875d81 100644 --- a/cmd/lotus-shed/pruning.go +++ b/cmd/lotus-shed/pruning.go @@ -131,7 +131,7 @@ var stateTreePruneCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/splitstore.go b/cmd/lotus-shed/splitstore.go index e8c45a0c5e..eaf218eefd 100644 --- a/cmd/lotus-shed/splitstore.go +++ b/cmd/lotus-shed/splitstore.go @@ -3,6 +3,7 @@ package main import ( "bufio" "context" + "encoding/binary" "fmt" "io" "os" @@ -31,6 +32,7 @@ var splitstoreCmd = &cli.Command{ splitstoreClearCmd, splitstoreCheckCmd, splitstoreInfoCmd, + splitstoreRepoInfoCmd, }, } @@ -353,6 +355,40 @@ func deleteSplitstoreKeys(lr repo.LockedRepo) error { return nil } +func printSplitstoreKeys(lr repo.LockedRepo) error { + ds, err := lr.Datastore(context.TODO(), "/metadata") + if err != nil { + return xerrors.Errorf("error opening datastore: %w", err) + } + if closer, ok := ds.(io.Closer); ok { + defer closer.Close() //nolint + } + + res, err := ds.Query(context.Background(), query.Query{Prefix: "/splitstore"}) + if err != nil { + return xerrors.Errorf("error querying datastore for splitstore keys: %w", err) + } + + fmt.Println("Splitstore keys and values:") + for r := range res.Next() { + if r.Error != nil { + return xerrors.Errorf("datastore query error: %w", r.Error) + } + + // Get the value for this key + value, err := ds.Get(context.Background(), datastore.NewKey(r.Key)) + if err != nil { + return xerrors.Errorf("error getting value for key %s: %w", r.Key, err) + } + + // Decode the value as a uvarint + decoded, _ := binary.Uvarint(value) + fmt.Printf(" %s: %d\n", r.Key, decoded) + } + + return nil +} + // badger logging through go-log type badgerLogger struct { *zap.SugaredLogger @@ -378,6 +414,39 @@ var splitstoreCheckCmd = &cli.Command{ }, } +var splitstoreRepoInfoCmd = &cli.Command{ + Name: "splitstore-info", + Usage: "Display splitstore metadata information", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "repo", + Value: "~/.lotus", + Usage: "lotus repo path", + }, + }, + Action: func(cctx *cli.Context) error { + r, err := repo.NewFS(cctx.String("repo")) + if err != nil { + return xerrors.Errorf("error opening fs repo: %w", err) + } + + exists, err := r.Exists() + if err != nil { + return err + } + if !exists { + return xerrors.Errorf("lotus repo doesn't exist") + } + + lr, err := r.Lock(repo.FullNode) + if err != nil { + return xerrors.Errorf("error locking repo: %w", err) + } + + return printSplitstoreKeys(lr) + }, +} + var splitstoreInfoCmd = &cli.Command{ Name: "info", Description: "prints some basic splitstore information", diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index 88b21f4076..399bae570e 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -234,7 +234,7 @@ func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error) return nil, err } - cold, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + cold, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return nil, xerrors.Errorf("failed to open universal blockstore %w", err) } diff --git a/cmd/lotus-shed/terminations.go b/cmd/lotus-shed/terminations.go index 563c1ba3a7..e67d383104 100644 --- a/cmd/lotus-shed/terminations.go +++ b/cmd/lotus-shed/terminations.go @@ -61,7 +61,7 @@ var terminationsCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-sim/simulation/node.go b/cmd/lotus-sim/simulation/node.go index cda3e69d83..f1efb2543c 100644 --- a/cmd/lotus-sim/simulation/node.go +++ b/cmd/lotus-sim/simulation/node.go @@ -51,7 +51,7 @@ func NewNode(ctx context.Context, r repo.Repo) (nd *Node, _err error) { } }() - bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) + bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return nil, err } diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 97d2294b79..707cd49dd4 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -31,6 +31,7 @@ import ( "github.com/filecoin-project/go-paramfetch" lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/beacon/drand" @@ -544,11 +545,27 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) if err != nil { return err } - defer lr.Close() //nolint:errcheck - bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) + c, err := lr.Config() if err != nil { - return xerrors.Errorf("failed to open blockstore: %w", err) + return err + } + cfg, ok := c.(*config.FullNode) + if !ok { + return xerrors.Errorf("invalid config for repo, got: %T", c) + } + + var bs blockstore.Blockstore + if cfg.Chainstore.EnableSplitstore { + bs, _, err = lr.Blockstore(ctx, repo.HotBlockstore) + if err != nil { + return xerrors.Errorf("failed to open hot blockstore: %w", err) + } + } else { + bs, _, err = lr.Blockstore(ctx, repo.UniversalBlockstore) + if err != nil { + return xerrors.Errorf("failed to open universalblockstore: %w", err) + } } mds, err := lr.Datastore(ctx, "/metadata") @@ -629,15 +646,6 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) return err } - c, err := lr.Config() - if err != nil { - return err - } - cfg, ok := c.(*config.FullNode) - if !ok { - return xerrors.Errorf("invalid config for repo, got: %T", c) - } - if !cfg.ChainIndexer.EnableIndexer { log.Info("chain indexer is disabled, not populating index from snapshot") return nil diff --git a/go.mod b/go.mod index f872e98412..f461d318fd 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/elastic/go-sysinfo v1.7.0 github.com/elastic/gosigar v0.14.3 github.com/etclabscore/go-openrpc-reflect v0.0.36 - github.com/fatih/color v1.15.0 + github.com/fatih/color v1.16.0 github.com/filecoin-project/filecoin-ffi v1.31.0 github.com/filecoin-project/go-address v1.2.0 github.com/filecoin-project/go-amt-ipld/v4 v4.4.0 @@ -83,7 +83,7 @@ require ( github.com/hashicorp/golang-lru/arc/v2 v2.0.7 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/icza/backscanner v0.0.0-20210726202459-ac2ffc679f94 - github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab + github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c github.com/invopop/jsonschema v0.12.0 github.com/ipfs/bbloom v0.0.4 github.com/ipfs/boxo v0.20.0 @@ -136,7 +136,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // dependency-check-ignore: unknown github.com/triplewz/poseidon v0.0.2 - github.com/urfave/cli/v2 v2.25.5 + github.com/urfave/cli/v2 v2.25.7 github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba github.com/whyrusleeping/cbor-gen v0.2.0 github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 @@ -211,7 +211,7 @@ require ( github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/go-ole/go-ole v1.2.5 // indirect + github.com/go-ole/go-ole v1.3.0 // indirect github.com/go-openapi/jsonpointer v0.19.3 // indirect github.com/go-openapi/jsonreference v0.19.4 // indirect github.com/go-openapi/swag v0.19.11 // indirect diff --git a/go.sum b/go.sum index 2b50a03dba..b6158f6645 100644 --- a/go.sum +++ b/go.sum @@ -250,8 +250,8 @@ github.com/etclabscore/go-jsonschema-walk v0.0.6/go.mod h1:VdfDY72AFAiUhy0ZXEaWS github.com/etclabscore/go-openrpc-reflect v0.0.36 h1:kSqNB2U8RVoW4si+4fsv13NGNkRAQ5j78zTUx1qiehk= github.com/etclabscore/go-openrpc-reflect v0.0.36/go.mod h1:0404Ky3igAasAOpyj1eESjstTyneBAIk5PgJFbK4s5E= github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGjnw8= -github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= -github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= @@ -382,8 +382,9 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= +github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg= github.com/go-openapi/jsonpointer v0.19.3 h1:gihV7YNZK1iK6Tgwwsxo2rJbD1GTbdm72325Bq8FI3w= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= @@ -570,8 +571,8 @@ github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6 h1:8UsGZ2rr2ksmEru6lTo github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6/go.mod h1:xQig96I1VNBDIWGCdTt54nHt6EeI639SmHycLYL7FkA= github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab h1:HqW4xhhynfjrtEiiSGcQUd6vrK23iMam1FO8rI7mwig= -github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= +github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSHzRbhzK8RdXOsAdfDgO49TtqC1oZ+acxPrkfTxcCs= +github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/invopop/jsonschema v0.12.0 h1:6ovsNSuvn9wEQVOyc72aycBMVQFKz7cPdMJn10CvzRI= github.com/invopop/jsonschema v0.12.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI= @@ -1294,8 +1295,8 @@ github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6 github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= -github.com/urfave/cli/v2 v2.25.5 h1:d0NIAyhh5shGscroL7ek/Ya9QYQE0KNabJgiUinIQkc= -github.com/urfave/cli/v2 v2.25.5/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc= +github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= +github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8= diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go index 9c54d51e60..ac16a0f7e7 100644 --- a/node/modules/blockstore.go +++ b/node/modules/blockstore.go @@ -2,16 +2,12 @@ package modules import ( "context" - "io" - "os" - "path/filepath" bstore "github.com/ipfs/boxo/blockstore" "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/blockstore" - badgerbs "github.com/filecoin-project/lotus/blockstore/badger" "github.com/filecoin-project/lotus/blockstore/splitstore" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -23,17 +19,16 @@ import ( // chain data and state data. It can be backed by a blockstore directly // (e.g. Badger), or by a Splitstore. func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) { - bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore) + bs, closer, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore) if err != nil { return nil, err } - if c, ok := bs.(io.Closer); ok { - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - return c.Close() - }, - }) - } + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return closer() + }, + }) + return bs, err } @@ -45,46 +40,32 @@ func DiscardColdBlockstore(lc fx.Lifecycle, bs dtypes.UniversalBlockstore) (dtyp return blockstore.NewDiscardStore(bs), nil } -func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) { - path, err := r.SplitstorePath() +func BadgerHotBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.HotBlockstore, error) { + bs, closer, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.HotBlockstore) if err != nil { return nil, err } - - path = filepath.Join(path, "hot.badger") - if err := os.MkdirAll(path, 0755); err != nil { - return nil, err - } - - opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, r.Readonly()) - if err != nil { - return nil, err - } - - bs, err := badgerbs.Open(opts) - if err != nil { - return nil, err - } - lc.Append(fx.Hook{ OnStop: func(_ context.Context) error { - return bs.Close() - }}) - + return closer() + }, + }) return bs, nil } -func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { - return func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { +func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, func() error, error) { + return func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, func() error, error) { + noop := func() error { return nil } path, err := r.SplitstorePath() if err != nil { - return nil, err + return nil, noop, err } cfg := &splitstore.Config{ - MarkSetType: cfg.Splitstore.MarkSetType, - DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard", - UniversalColdBlocks: cfg.Splitstore.ColdStoreType == "universal", + MarkSetType: cfg.Splitstore.MarkSetType, + DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard", + UniversalColdBlocks: cfg.Splitstore.ColdStoreType == "universal", + HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention, HotStoreFullGCFrequency: cfg.Splitstore.HotStoreFullGCFrequency, HotstoreMaxSpaceTarget: cfg.Splitstore.HotStoreMaxSpaceTarget, @@ -93,7 +74,7 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked } ss, err := splitstore.Open(path, ds, hot, cold, cfg) if err != nil { - return nil, err + return nil, noop, err } lc.Append(fx.Hook{ OnStop: func(context.Context) error { @@ -101,7 +82,7 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked }, }) - return ss, err + return ss, noop, nil } } diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index 1c2e9f738d..b5ff65b392 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -369,12 +369,15 @@ type fsLockedRepo struct { dsErr error dsOnce sync.Once - bs blockstore.Blockstore - bsErr error - bsOnce sync.Once - ssPath string - ssErr error - ssOnce sync.Once + bs blockstore.Blockstore + bsOnce sync.Once + bsErr error + bsHot blockstore.Blockstore + bsHotOnce sync.Once + bsHotErr error + ssPath string + ssErr error + ssOnce sync.Once chainIndexPath string chainIndexErr error @@ -416,6 +419,11 @@ func (fsr *fsLockedRepo) Close() error { return xerrors.Errorf("could not close blockstore: %w", err) } } + if c, ok := fsr.bsHot.(io.Closer); ok && c != nil { + if err := c.Close(); err != nil { + return xerrors.Errorf("could not close blockstore: %w", err) + } + } err = fsr.closer.Close() fsr.closer = nil @@ -423,39 +431,76 @@ func (fsr *fsLockedRepo) Close() error { } // Blockstore returns a blockstore for the provided data domain. -func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) { - if domain != UniversalBlockstore { - return nil, ErrInvalidBlockstoreDomain - } - - fsr.bsOnce.Do(func() { - path := fsr.join(filepath.Join(fsDatastore, "chain")) - readonly := fsr.readonly - - if err := os.MkdirAll(path, 0755); err != nil { - fsr.bsErr = err - return - } +func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, func() error, error) { + if domain != UniversalBlockstore && domain != HotBlockstore { + log.Error("invalid blockstore domain", domain) + return nil, nil, ErrInvalidBlockstoreDomain + } + var bs blockstore.Blockstore + var err error + var closer func() error + if domain == UniversalBlockstore { + fsr.bsOnce.Do(func() { + path := fsr.join(filepath.Join(fsDatastore, "chain")) + readonly := fsr.readonly + + if err = os.MkdirAll(path, 0755); err != nil { + fsr.bsErr = err + return + } - opts, err := BadgerBlockstoreOptions(domain, path, readonly) - if err != nil { - fsr.bsErr = err - return - } + opts, err := BadgerBlockstoreOptions(domain, path, readonly) + if err != nil { + fsr.bsErr = err + return + } - if system.BadgerFsyncDisable { - opts.SyncWrites = false - } + if system.BadgerFsyncDisable { + opts.SyncWrites = false + } - bs, err := badgerbs.Open(opts) - if err != nil { - fsr.bsErr = err - return - } - fsr.bs = blockstore.WrapIDStore(bs) - }) + bbs, err := badgerbs.Open(opts) + if err != nil { + fsr.bsErr = err + return + } + closer = bbs.Close + fsr.bs = blockstore.WrapIDStore(bbs) + bs = fsr.bs + }) + err = fsr.bsErr + } else { + // else domain == HotBlockstore { + fsr.bsHotOnce.Do(func() { + path, err := fsr.SplitstorePath() + if err != nil { + fsr.bsHotErr = err + return + } + path = filepath.Join(path, "hot.badger") + if err := os.MkdirAll(path, 0755); err != nil { + fsr.bsHotErr = err + return + } + readonly := fsr.readonly + opts, err := BadgerBlockstoreOptions(HotBlockstore, path, readonly) + if err != nil { + fsr.bsHotErr = err + return + } + bbs, err := badgerbs.Open(opts) + if err != nil { + fsr.bsHotErr = err + return + } + fsr.bsHot = bbs + closer = bbs.Close + bs = fsr.bsHot + }) + err = fsr.bsHotErr + } - return fsr.bs, fsr.bsErr + return bs, closer, err } func (fsr *fsLockedRepo) SplitstorePath() (string, error) { diff --git a/node/repo/interface.go b/node/repo/interface.go index 100d0dc58d..056b104c6c 100644 --- a/node/repo/interface.go +++ b/node/repo/interface.go @@ -64,7 +64,7 @@ type LockedRepo interface { // The supplied context must only be used to initialize the blockstore. // The implementation should not retain the context for usage throughout // the lifecycle. - Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) + Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, func() error, error) // SplitstorePath returns the path for the SplitStore SplitstorePath() (string, error) diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go index cda00f985f..5da2b1711b 100644 --- a/node/repo/memrepo.go +++ b/node/repo/memrepo.go @@ -253,11 +253,11 @@ func (lmem *lockedMemRepo) Datastore(_ context.Context, ns string) (datastore.Ba return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil } -func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) { - if domain != UniversalBlockstore { - return nil, ErrInvalidBlockstoreDomain +func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, func() error, error) { + if domain != UniversalBlockstore && domain != HotBlockstore { + return nil, nil, ErrInvalidBlockstoreDomain } - return lmem.mem.blockstore, nil + return lmem.mem.blockstore, func() error { return nil }, nil } func (lmem *lockedMemRepo) SplitstorePath() (string, error) {