Skip to content

Commit

Permalink
load snapshot to hotstore and skip warmup
Browse files Browse the repository at this point in the history
  • Loading branch information
ZenGround0 committed Dec 25, 2024
1 parent f11a49a commit f593bb1
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 125 deletions.
191 changes: 105 additions & 86 deletions blockstore/splitstore/splitstore_warmup.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
package splitstore

import (
"sync"
"sync/atomic"
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
)
Expand All @@ -35,7 +29,7 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
log.Info("warming up hotstore")
start := time.Now()

err := s.doWarmup(curTs)
err := s.doWarmup2(curTs)
if err != nil {
log.Errorf("error warming up hotstore: %s", err)
return
Expand All @@ -47,86 +41,14 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
return nil
}

// the actual warmup procedure; it walks the chain loading all state roots at the boundary
// and headers all the way up to genesis.
// objects are written in batches so as to minimize overhead.
func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
var boundaryEpoch abi.ChainEpoch
func (s *SplitStore) doWarmup2(curTs *types.TipSet) error {
log.Infow("warmup starting")

epoch := curTs.Height()
if WarmupBoundary < epoch {
boundaryEpoch = epoch - WarmupBoundary
}
var mx sync.Mutex
batchHot := make([]blocks.Block, 0, batchSize)
count := new(int64)
xcount := new(int64)
missing := new(int64)

visitor, err := s.markSetEnv.New("warmup", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}
defer visitor.Close() //nolint

err = s.walkChain(curTs, boundaryEpoch, epoch+1, // we don't load messages/receipts in warmup
visitor,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}

atomic.AddInt64(count, 1)

has, err := s.hot.Has(s.ctx, c)
if err != nil {
return err
}

if has {
return nil
}

blk, err := s.cold.Get(s.ctx, c)
if err != nil {
if ipld.IsNotFound(err) {
atomic.AddInt64(missing, 1)
return errStopWalk
}
return err
}

atomic.AddInt64(xcount, 1)

mx.Lock()
batchHot = append(batchHot, blk)
if len(batchHot) == batchSize {
err = s.hot.PutMany(s.ctx, batchHot)
if err != nil {
mx.Unlock()
return err
}
batchHot = batchHot[:0]
}
mx.Unlock()

return nil
}, func(cid.Cid) error { return nil })

if err != nil {
return err
}

if len(batchHot) > 0 {
err = s.hot.PutMany(s.ctx, batchHot)
if err != nil {
return err
}
}

log.Infow("warmup stats", "visited", *count, "warm", *xcount, "missing", *missing)

*count = 10_000_000_000
s.markSetSize = *count + *count>>2 // overestimate a bit
err = s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize))
err := s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize))
if err != nil {
log.Warnf("error saving mark set size: %s", err)
}
Expand All @@ -136,14 +58,111 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
if err != nil {
return xerrors.Errorf("error saving warm up epoch: %w", err)
}

s.warmupEpoch.Store(int64(epoch))

// also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
err = s.ds.Put(s.ctx, compactionIndexKey, int64ToBytes(s.compactionIndex))
if err != nil {
return xerrors.Errorf("error saving compaction index: %w", err)
}

return nil
}

// the legacy warmup procedure before we wrote snapshots directly to the hotstore
// func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
// var boundaryEpoch abi.ChainEpoch
// epoch := curTs.Height()
// if WarmupBoundary < epoch {
// boundaryEpoch = epoch - WarmupBoundary
// }
// var mx sync.Mutex
// batchHot := make([]blocks.Block, 0, batchSize)
// count := new(int64)
// xcount := new(int64)
// missing := new(int64)

// visitor, err := s.markSetEnv.New("warmup", 0)
// if err != nil {
// return xerrors.Errorf("error creating visitor: %w", err)
// }
// defer visitor.Close() //nolint

// err = s.walkChain(curTs, boundaryEpoch, epoch+1, // we don't load messages/receipts in warmup
// visitor,
// func(c cid.Cid) error {
// if isUnitaryObject(c) {
// return errStopWalk
// }

// atomic.AddInt64(count, 1)

// has, err := s.hot.Has(s.ctx, c)
// if err != nil {
// return err
// }

// if has {
// return nil
// }

// blk, err := s.cold.Get(s.ctx, c)
// if err != nil {
// if ipld.IsNotFound(err) {
// atomic.AddInt64(missing, 1)
// return errStopWalk
// }
// return err
// }

// atomic.AddInt64(xcount, 1)

// mx.Lock()
// batchHot = append(batchHot, blk)
// if len(batchHot) == batchSize {
// err = s.hot.PutMany(s.ctx, batchHot)
// if err != nil {
// mx.Unlock()
// return err
// }
// batchHot = batchHot[:0]
// }
// mx.Unlock()

// return nil
// }, func(cid.Cid) error { return nil })

// if err != nil {
// return err
// }

// if len(batchHot) > 0 {
// err = s.hot.PutMany(s.ctx, batchHot)
// if err != nil {
// return err
// }
// }

// log.Infow("warmup stats", "visited", *count, "warm", *xcount, "missing", *missing)

// s.markSetSize = *count + *count>>2 // overestimate a bit
// err = s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize))
// if err != nil {
// log.Warnf("error saving mark set size: %s", err)
// }

// // save the warmup epoch
// err = s.ds.Put(s.ctx, warmupEpochKey, epochToBytes(epoch))
// if err != nil {
// return xerrors.Errorf("error saving warm up epoch: %w", err)
// }

// s.warmupEpoch.Store(int64(epoch))

// // also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
// err = s.ds.Put(s.ctx, compactionIndexKey, int64ToBytes(s.compactionIndex))
// if err != nil {
// return xerrors.Errorf("error saving compaction index: %w", err)
// }

// return nil
// }
2 changes: 1 addition & 1 deletion cmd/lotus/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
}
defer lr.Close() //nolint:errcheck

bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, err := lr.Blockstore(ctx, repo.HotBlockstore)
if err != nil {
return xerrors.Errorf("failed to open blockstore: %w", err)
}
Expand Down
27 changes: 21 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
github.com/elastic/go-sysinfo v1.7.0
github.com/elastic/gosigar v0.14.3
github.com/etclabscore/go-openrpc-reflect v0.0.36
github.com/fatih/color v1.15.0
github.com/fatih/color v1.16.0
github.com/filecoin-project/filecoin-ffi v1.31.0
github.com/filecoin-project/go-address v1.2.0
github.com/filecoin-project/go-amt-ipld/v4 v4.4.0
Expand Down Expand Up @@ -83,7 +83,7 @@ require (
github.com/hashicorp/golang-lru/arc/v2 v2.0.7
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/icza/backscanner v0.0.0-20210726202459-ac2ffc679f94
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c
github.com/invopop/jsonschema v0.12.0
github.com/ipfs/bbloom v0.0.4
github.com/ipfs/boxo v0.20.0
Expand Down Expand Up @@ -136,7 +136,7 @@ require (
github.com/stretchr/testify v1.10.0
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // dependency-check-ignore: unknown
github.com/triplewz/poseidon v0.0.2
github.com/urfave/cli/v2 v2.25.5
github.com/urfave/cli/v2 v2.25.7
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba
github.com/whyrusleeping/cbor-gen v0.2.0
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
Expand Down Expand Up @@ -211,7 +211,7 @@ require (
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.5 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.19.3 // indirect
github.com/go-openapi/jsonreference v0.19.4 // indirect
github.com/go-openapi/swag v0.19.11 // indirect
Expand All @@ -221,7 +221,7 @@ require (
github.com/golang/glog v1.2.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20241017200806-017d972448fc // indirect
Expand Down Expand Up @@ -318,7 +318,7 @@ require (
github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 // indirect; dependency-check-ignore: unknown
github.com/rivo/uniseg v0.4.7 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v2.18.12+incompatible // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/smartystreets/assertions v1.13.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
Expand Down Expand Up @@ -356,3 +356,18 @@ require (
lukechampine.com/blake3 v1.3.0 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)

require github.com/ethereum/go-ethereum v1.14.12

require (
github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c // indirect
github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/ethereum/c-kzg-4844 v1.0.0 // indirect
github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/holiman/uint256 v1.3.1 // indirect
github.com/supranational/blst v0.3.13 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
)
Loading

0 comments on commit f593bb1

Please sign in to comment.