diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 824e927726e..1947eac0a3c 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -1,6 +1,7 @@ package badgerbs import ( + "bytes" "context" "fmt" "io" @@ -35,26 +36,28 @@ type supportedMultihash struct { // hardcoded hash list for now // justification 🧵 https://filecoinproject.slack.com/archives/CRK2LKYHW/p1711381656211189?thread_ts=1711264671.316169&cid=CRK2LKYHW -var supportedMultihashes = map[string]supportedMultihash{ - "\xA0\xE4\x02\x20": { - cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Blake2b256)), - 1, - }, - "\x12\x20": { - cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Sha2_256)), - 0, - }, -} +var ( + supportedMultihashes = map[string]supportedMultihash{ + "\xA0\xE4\x02\x20": { + cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Blake2b256)), + 1, + }, + "\x12\x20": { + cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Sha2_256)), + 0, + }, + } + + tryOrder = []string{"\xA0\xE4\x02\x20", "\x12\x20"} +) const ( supportedHashLen = 256 / 8 mhJournalFilename = "MultiHashes.bin" mhJournalRecordLen = 1 + supportedHashLen // journalShortCode prefix + 256 bits hash -) -var ( - // KeyPool is the buffer pool we use to compute storage keys. - KeyPool *pool.BufferPool = pool.GlobalPool + binkeyBits = 128 // could I safely drop this to 96? 80? at 26bil blocks every byte counts... + binkeyLen = binkeyBits / 8 ) var ( @@ -83,6 +86,9 @@ type Options struct { // Prefix is an optional prefix to prepend to keys. Default: "". Prefix string + + // Whether to check for legacy base32-encoded keys on Get/Delete + QueryLegacyKeys bool } func DefaultOptions(path string) Options { @@ -488,6 +494,27 @@ func symlink(path, linkTo string) error { return os.Symlink(path, linkTo) } +func findCidForPartialKV(kv *badgerstruct.KV) (c cid.Cid, smh supportedMultihash, err error) { + // this is so so SO nasty... 🤮 + for _, pref := range tryOrder { + trySmh := supportedMultihashes[pref] + c, err = trySmh.cidMaker.Sum(kv.Value) + if err != nil { + return + } + h := c.Hash() + if bytes.Equal(kv.Key, h[len(h)-supportedHashLen:len(h)-supportedHashLen+binkeyLen]) { + smh = trySmh + break + } + c = cid.Undef + } + if !c.Defined() { + err = xerrors.Errorf("none of the available mutihashers produced a hash starting with 0x%X", kv.Key) + } + return +} + // doCopy copies a badger blockstore to another func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB, jrnlFh io.Writer) (defErr error) { batch := to.NewWriteBatch() @@ -510,21 +537,47 @@ func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB, jrnlFh io. defer pool.Put(jrnlSlab) jrnl := jrnlSlab[:0] - mhBuf := pool.Get(varint.MaxLenUvarint63 + supportedHashLen) - defer pool.Put(mhBuf) - for _, kv := range kvs { - n, err := base32.RawStdEncoding.Decode(mhBuf, kv.Key[b.prefixLen:]) + if len(kv.Key) == binkeyLen { + // nasty way to recreate the hash from the payload alone 🤮 + // worth it however for maintaining journal consistency + c, smh, err := findCidForPartialKV(kv) + if err != nil { + return err + } + + if err := batch.Set(kv.Key, kv.Value); err != nil { + return err + } + + // add a journal record + // NOTE: this could very well result in duplicates + // there isn't much we can do about this right now... + mh := c.Hash() + jrnl = append(jrnl, smh.journalShortCode) + jrnl = append(jrnl, mh[len(mh)-supportedHashLen:]...) + + continue + } + + // this is a legacy key: remake it regardless of b.opts.QueryLegacyKeys + // can not use a pooled decode buffer, as it gets held by the badger batch + mh := make([]byte, varint.MaxLenUvarint63+supportedHashLen) + n, err := base32.RawStdEncoding.Decode(mh, kv.Key[b.prefixLen:]) if err != nil { return xerrors.Errorf("undecodeable key 0x%X: %s", kv.Key[b.prefixLen:], err) } - smh, err := isMultihashSupported(mhBuf[:n]) + mh = mh[:n:n] + smh, err := isMultihashSupported(mh) if err != nil { return xerrors.Errorf("unsupported multihash for key 0x%X: %w", kv.Key[b.prefixLen:], err) } - if err := batch.Set(kv.Key, kv.Value); err != nil { + if err := batch.Set( + mh[n-supportedHashLen:n-supportedHashLen+binkeyLen:n-supportedHashLen+binkeyLen], // we checked the multihash digest is hashLen (256bits) long + kv.Value, + ); err != nil { return err } @@ -532,7 +585,7 @@ func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB, jrnlFh io. // NOTE: this could very well result in duplicates // there isn't much we can do about this right now... jrnl = append(jrnl, smh.journalShortCode) - jrnl = append(jrnl, mhBuf[n-supportedHashLen:n]...) + jrnl = append(jrnl, mh[len(mh)-supportedHashLen:]...) } if _, err := jrnlFh.Write(jrnl); err != nil { @@ -754,12 +807,23 @@ func isMultihashSupported(mh []byte) (supportedMultihash, error) { } // badgerGet is a basic tri-state: value+nil nil+nil nil+err -func badgerGet(t *badger.Txn, k []byte) (*valueItem, error) { - switch item, err := t.Get(k); err { +func (b *Blockstore) badgerGet(t *badger.Txn, mk badgerMultiKey) (*valueItem, error) { + switch item, err := t.Get(mk.binKey()); err { case nil: - return &valueItem{item}, nil + return &valueItem{item, mk.binKey()}, nil case badger.ErrKeyNotFound: - return nil, nil + if !b.opts.QueryLegacyKeys { + return nil, nil + } + // try again with legacy ... + switch item, err := t.Get(mk.legacyKey()); err { + case nil: + return &valueItem{item, mk.legacyKey()}, nil + case badger.ErrKeyNotFound: + return nil, nil + default: + return nil, err + } default: return nil, err } @@ -767,6 +831,40 @@ func badgerGet(t *badger.Txn, k []byte) (*valueItem, error) { type valueItem struct { badgerItem *badger.Item + currentKey []byte +} + +func (vi *valueItem) size() (int, error) { + return int(vi.badgerItem.ValueSize()), nil +} +func (vi *valueItem) block(c cid.Cid) (blocks.Block, error) { + payload, err := vi.badgerItem.ValueCopy(nil) + if err != nil { + return nil, err + } + if err := checkHash(c, payload, vi.currentKey); err != nil { + return nil, err + } + return blocks.NewBlockWithCid(payload, c) +} +func (vi *valueItem) view(c cid.Cid, f func(val []byte) error) error { + return vi.badgerItem.Value(func(payload []byte) error { + if err := checkHash(c, payload, vi.currentKey); err != nil { + return err + } + return f(payload) + }) +} + +func checkHash(c cid.Cid, b, k []byte) error { + rehash, err := c.Prefix().Sum(b) + if err != nil { + return err + } + if !rehash.Equals(c) { + return xerrors.Errorf("multihash mismatch for cid %s (badger key 0x%X): value hashes to 0x%X, but expected multihash 0x%X", c, k, []byte(rehash.Hash()), []byte(c.Hash())) + } + return nil } // View implements blockstore.Viewer, which leverages zero-copy read-only @@ -780,19 +878,20 @@ func (b *Blockstore) View(ctx context.Context, c cid.Cid, fn func([]byte) error) b.lockDB() defer b.unlockDB() - k, pooled := b.PooledStorageKey(c) - if pooled { - defer KeyPool.Put(k) + mk, err := b.pooledMultiKey(c) + if err != nil { + return err } + defer pool.Put(mk) return b.db.View(func(txn *badger.Txn) error { - val, err := badgerGet(txn, k) + val, err := b.badgerGet(txn, mk) if err != nil { return fmt.Errorf("failed to view block from badger blockstore: %w", err) } else if val == nil { return ipld.ErrNotFound{Cid: c} } - return val.badgerItem.Value(fn) + return val.view(c, fn) }) } @@ -837,14 +936,15 @@ func (b *Blockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { b.lockDB() defer b.unlockDB() - k, pooled := b.PooledStorageKey(c) - if pooled { - defer KeyPool.Put(k) + mk, err := b.pooledMultiKey(c) + if err != nil { + return false, err } + defer pool.Put(mk) var canHaz bool - err := b.db.View(func(txn *badger.Txn) error { - val, err := badgerGet(txn, k) + err = b.db.View(func(txn *badger.Txn) error { + val, err := b.badgerGet(txn, mk) if val != nil { canHaz = true } @@ -871,27 +971,24 @@ func (b *Blockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { b.lockDB() defer b.unlockDB() - k, pooled := b.PooledStorageKey(c) - if pooled { - defer KeyPool.Put(k) + mk, err := b.pooledMultiKey(c) + if err != nil { + return nil, err } + defer pool.Put(mk) - var buf []byte - - if err := b.db.View(func(txn *badger.Txn) error { - val, err := badgerGet(txn, k) + var blk blocks.Block + err = b.db.View(func(txn *badger.Txn) error { + val, err := b.badgerGet(txn, mk) if err != nil { return fmt.Errorf("failed to get block from badger blockstore: %w", err) } else if val == nil { return ipld.ErrNotFound{Cid: c} } - buf, err = val.badgerItem.ValueCopy(nil) + blk, err = val.block(c) return err - }); err != nil { - return nil, err - } - - return blocks.NewBlockWithCid(buf, c) + }) + return blk, err } // GetSize implements Blockstore.GetSize. @@ -904,14 +1001,15 @@ func (b *Blockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { b.lockDB() defer b.unlockDB() - k, pooled := b.PooledStorageKey(c) - if pooled { - defer KeyPool.Put(k) + mk, err := b.pooledMultiKey(c) + if err != nil { + return -1, err } + defer pool.Put(mk) size := -1 - err := b.db.View(func(txn *badger.Txn) error { - val, err := badgerGet(txn, k) + err = b.db.View(func(txn *badger.Txn) error { + val, err := b.badgerGet(txn, mk) if err != nil { return fmt.Errorf("failed to get block size from badger blockstore: %w", err) @@ -919,8 +1017,8 @@ func (b *Blockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { return ipld.ErrNotFound{Cid: c} } - size = int(val.badgerItem.ValueSize()) - return nil + size, err = val.size() + return err }) return size, err @@ -941,26 +1039,24 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { b.lockDB() defer b.unlockDB() - // toReturn tracks the byte slices to return to the pool, if we're using key - // prefixing. we can't return each slice to the pool after each Set, because - // badger holds on to the slice. - var toReturn [][]byte - if b.prefixing { - toReturn = make([][]byte, 0, len(blocks)) - defer func() { - for _, b := range toReturn { - KeyPool.Put(b) - } - }() + type kv struct { + mk badgerMultiKey + val []byte } - keys := make([][]byte, len(blocks)) - for i, block := range blocks { - k, pooled := b.PooledStorageKey(block.Cid()) - if pooled { - toReturn = append(toReturn, k) + // kvs/defer() must be declared before (thus happen after) the NewWriteBatch below + kvs := make([]kv, 0, len(blocks)) + defer func() { + for _, kv := range kvs { + pool.Put(kv.mk) + } + }() + for i := range blocks { + mk, err := b.pooledMultiKey(blocks[i].Cid()) + if err != nil { + return err } - keys[i] = k + kvs = append(kvs, kv{mk: mk}) } jrnlSlab := pool.Get(len(blocks) * mhJournalRecordLen) @@ -968,15 +1064,12 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { jrnl := jrnlSlab[:0] if err := b.db.View(func(txn *badger.Txn) error { - for i, k := range keys { - val, err := badgerGet(txn, k) + for i := range kvs { + val, err := b.badgerGet(txn, kvs[i].mk) if err != nil { // Something is actually wrong return err - } else if val != nil { - // Already have it - keys[i] = nil - } else { + } else if val == nil { // Got to insert that, check it is supported, write journal mh := blocks[i].Cid().Hash() smh, err := isMultihashSupported(mh) @@ -984,6 +1077,8 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { return xerrors.Errorf("unsupported multihash for cid %s: %w", blocks[i].Cid(), err) } + kvs[i].val = blocks[i].RawData() + // add a journal record jrnl = append(jrnl, smh.journalShortCode) jrnl = append(jrnl, mh[len(mh)-supportedHashLen:]...) @@ -998,14 +1093,11 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { batch := db.NewWriteBatch() defer batch.Cancel() - for i, block := range blocks { - k := keys[i] - if k == nil { - // skipped because we already have it. - continue - } - if err := batch.Set(k, block.RawData()); err != nil { - return err + for i := range kvs { + if kvs[i].val != nil { + if err := batch.Set(kvs[i].mk.binKey(), kvs[i].val); err != nil { + return err + } } } @@ -1047,88 +1139,90 @@ func (b *Blockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error { b.lockDB() defer b.unlockDB() - // toReturn tracks the byte slices to return to the pool, if we're using key - // prefixing. we can't return each slice to the pool after each Set, because - // badger holds on to the slice. - var toReturn [][]byte - if b.prefixing { - toReturn = make([][]byte, 0, len(cids)) - defer func() { - for _, b := range toReturn { - KeyPool.Put(b) + // keys/defer() must be declared before (thus happen after) the NewWriteBatch below + multikeys := make([]badgerMultiKey, len(cids)) + defer func() { + for _, mk := range multikeys { + if mk != nil { + pool.Put(mk) } - }() + } + }() + var err error + for i := range cids { + multikeys[i], err = b.pooledMultiKey(cids[i]) + if err != nil { + return err + } } batch := b.db.NewWriteBatch() defer batch.Cancel() - for _, cid := range cids { - k, pooled := b.PooledStorageKey(cid) - if pooled { - toReturn = append(toReturn, k) - } - if err := batch.Delete(k); err != nil { - return err + // only delete keys we are known to have + // blindly calling batch.Delete() clutters the logs with delete markers + if err := b.db.View(func(txn *badger.Txn) error { + for _, mk := range multikeys { + + variants := append( + make([][]byte, 0, 2), + mk.binKey(), + ) + if b.opts.QueryLegacyKeys { + variants = append(variants, mk.legacyKey()) + } + + for _, k := range variants { + _, err := txn.Get(k) + if err == badger.ErrKeyNotFound { + continue + } else if err != nil { + return err + } + // key does exist + if err := batch.Delete(k); err != nil { + return err + } + } } - } - err := batch.Flush() - if err != nil { - err = fmt.Errorf("failed to delete blocks from badger blockstore: %w", err) + return batch.Flush() + }); err != nil { + return fmt.Errorf("failed to delete blocks from badger blockstore: %w", err) } - return err + + return nil } // AllKeysChan implements Blockstore.AllKeysChan. +var EnableHorriblyInefficientAllKeysChanMethod bool + func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - if err := b.access(); err != nil { - return nil, err + + if !EnableHorriblyInefficientAllKeysChanMethod { + return nil, xerrors.New("method AllKeysChan() is not supported, use ForEachKey() instead") } - b.lockDB() - defer b.unlockDB() + // reimplement to use strictly as test of the below + // this is useless in production anyway 🫠 + cids := make([]cid.Cid, 0, 1<<20) - txn := b.db.NewTransaction(false) - opts := badger.IteratorOptions{PrefetchSize: 100} - if b.prefixing { - opts.Prefix = b.prefix + if err := b.ForEachKey(func(c cid.Cid) error { + cids = append(cids, c) + return nil + }); err != nil { + return nil, err } - iter := txn.NewIterator(opts) ch := make(chan cid.Cid) go func() { - defer b.viewers.Done() defer close(ch) - defer iter.Close() - - // NewCidV1 makes a copy of the multihash buffer, so we can reuse it to - // contain allocs. - var buf []byte - for iter.Rewind(); iter.Valid(); iter.Next() { - if ctx.Err() != nil { - return // context has fired. - } - if !b.isOpen() { - // open iterators will run even after the database is closed... - return // closing, yield. - } - k := iter.Item().Key() - if b.prefixing { - k = k[b.prefixLen:] - } - - if reqlen := base32.RawStdEncoding.DecodedLen(len(k)); len(buf) < reqlen { - buf = make([]byte, reqlen) - } - if n, err := base32.RawStdEncoding.Decode(buf, k); err == nil { - select { - case ch <- cid.NewCidV1(cid.Raw, buf[:n]): - case <-ctx.Done(): - return - } - } else { - log.Warnf("failed to decode key %s in badger AllKeysChan; err: %s", k, err) + for _, c := range cids { + select { + case <-ctx.Done(): + return + case ch <- c: + // keep looping } } }() @@ -1138,6 +1232,10 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { // Implementation of BlockstoreIterator interface func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error { + // unfortunately this is used in splitstore, so have to reimplement horribly 🫠 + mhBuf := pool.Get(varint.MaxLenUvarint63 + supportedHashLen) + defer pool.Put(mhBuf) + if err := b.access(); err != nil { return err } @@ -1146,100 +1244,84 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error { b.lockDB() defer b.unlockDB() - txn := b.db.NewTransaction(false) - defer txn.Discard() - - opts := badger.IteratorOptions{PrefetchSize: 100} - if b.prefixing { - opts.Prefix = b.prefix - } - - iter := txn.NewIterator(opts) - defer iter.Close() + var err error + var c cid.Cid - var buf []byte - for iter.Rewind(); iter.Valid(); iter.Next() { + return iterateBadger(context.Background(), b.db, func(kvs []*badgerstruct.KV) error { if !b.isOpen() { return ErrBlockstoreClosed } - k := iter.Item().Key() - if b.prefixing { - k = k[b.prefixLen:] - } - - klen := base32.RawStdEncoding.DecodedLen(len(k)) - if klen > len(buf) { - buf = make([]byte, klen) - } - - n, err := base32.RawStdEncoding.Decode(buf, k) - if err != nil { - return err - } - - c := cid.NewCidV1(cid.Raw, buf[:n]) + for _, kv := range kvs { + if len(kv.Key) != binkeyLen { + if !b.opts.QueryLegacyKeys { + continue + } + // this is a legacy key: just use it + n, err := base32.RawStdEncoding.Decode(mhBuf, kv.Key[b.prefixLen:]) + if err != nil { + return err + } + c = cid.NewCidV1(uint64(multicodec.Raw), mhBuf[:n]) + } else { + c, _, err = findCidForPartialKV(kv) + if err != nil { + return err + } + } - err = f(c) - if err != nil { - return err + if err := f(c); err != nil { + return err + } } - } - return nil + return nil + }) } // HashOnRead implements Blockstore.HashOnRead. It is not supported by this // blockstore. -func (b *Blockstore) HashOnRead(_ bool) { - log.Warnf("called HashOnRead on badger blockstore; function not supported; ignoring") +func (b *Blockstore) HashOnRead(t bool) { + if !t { + log.Warnf("attempt to disable HashOnRead on badger blockstore: this is always unconditionally enabled, ignoring") + } } -// PooledStorageKey returns the storage key under which this CID is stored. -// -// The key is: prefix + base32_no_padding(cid.Hash) -// -// This method may return pooled byte slice, which MUST be returned to the -// KeyPool if pooled=true, or a leak will occur. -func (b *Blockstore) PooledStorageKey(c cid.Cid) (key []byte, pooled bool) { - h := c.Hash() - size := base32.RawStdEncoding.EncodedLen(len(h)) - if !b.prefixing { // optimize for branch prediction. - k := pool.Get(size) - base32.RawStdEncoding.Encode(k, h) - return k, true // slicing upto length unnecessary; the pool has already done this. - } - - size += b.prefixLen - k := pool.Get(size) - copy(k, b.prefix) - base32.RawStdEncoding.Encode(k[b.prefixLen:], h) - return k, true // slicing upto length unnecessary; the pool has already done this. -} +type badgerMultiKey []byte -// Storage acts like PooledStorageKey, but attempts to write the storage key -// into the provided slice. If the slice capacity is insufficient, it allocates -// a new byte slice with enough capacity to accommodate the result. This method -// returns the resulting slice. -func (b *Blockstore) StorageKey(dst []byte, c cid.Cid) []byte { - h := c.Hash() - reqsize := base32.RawStdEncoding.EncodedLen(len(h)) + b.prefixLen - if reqsize > cap(dst) { - // passed slice is smaller than required size; create new. - dst = make([]byte, reqsize) - } else if reqsize > len(dst) { - // passed slice has enough capacity, but its length is - // restricted, expand. - dst = dst[:cap(dst)] - } - - if b.prefixing { // optimize for branch prediction. - copy(dst, b.prefix) - base32.RawStdEncoding.Encode(dst[b.prefixLen:], h) - } else { - base32.RawStdEncoding.Encode(dst, h) +func (bk badgerMultiKey) binKey() []byte { return bk[:binkeyLen] } +func (bk badgerMultiKey) legacyKey() []byte { return bk[binkeyLen:] } + +// returns an amalgam binary/legacy key to be used for the necessary lookups +func (b *Blockstore) pooledMultiKey(c cid.Cid) (badgerMultiKey, error) { + mh := c.Hash() + + if _, err := isMultihashSupported(mh); err != nil { + return nil, err + } + binkey := mh[len(mh)-supportedHashLen : len(mh)-supportedHashLen+binkeyLen] + + if !b.opts.QueryLegacyKeys { + mk := pool.Get(binkeyLen) + copy(mk, binkey) + return mk, nil } - return dst[:reqsize] + + legacyLen := base32.RawStdEncoding.EncodedLen(len(mh)) + + if !b.prefixing { + mk := pool.Get(binkeyLen + legacyLen) + copy(mk, binkey) + base32.RawStdEncoding.Encode(mk[binkeyLen:], mh) + return mk, nil + } + + mk := pool.Get(binkeyLen + b.prefixLen + legacyLen) + copy(mk, binkey) + copy(mk[binkeyLen:], b.prefix) + base32.RawStdEncoding.Encode(mk[binkeyLen+b.prefixLen:], mh) + + return mk, nil } // this method is added for lotus-shed needs diff --git a/blockstore/badger/blockstore_test.go b/blockstore/badger/blockstore_test.go index d253f37d95d..d96591f839a 100644 --- a/blockstore/badger/blockstore_test.go +++ b/blockstore/badger/blockstore_test.go @@ -12,7 +12,6 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "github.com/filecoin-project/lotus/blockstore" @@ -38,40 +37,6 @@ func TestBadgerBlockstore(t *testing.T) { }).RunTests(t, "prefixed") } -func TestStorageKey(t *testing.T) { - //stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001 - //stm: @SPLITSTORE_BADGER_STORAGE_KEY_001 - bs, _ := newBlockstore(DefaultOptions)(t) - bbs := bs.(*Blockstore) - defer bbs.Close() //nolint:errcheck - - cid1 := blocks.NewBlock([]byte("some data")).Cid() - cid2 := blocks.NewBlock([]byte("more data")).Cid() - cid3 := blocks.NewBlock([]byte("a little more data")).Cid() - require.NotEqual(t, cid1, cid2) // sanity check - require.NotEqual(t, cid2, cid3) // sanity check - - // nil slice; let StorageKey allocate for us. - k1 := bbs.StorageKey(nil, cid1) - require.Len(t, k1, 55) - require.True(t, cap(k1) == len(k1)) - - // k1's backing array is reused. - k2 := bbs.StorageKey(k1, cid2) - require.Len(t, k2, 55) - require.True(t, cap(k2) == len(k1)) - - // bring k2 to len=0, and verify that its backing array gets reused - // (i.e. k1 and k2 are overwritten) - k3 := bbs.StorageKey(k2[:0], cid3) - require.Len(t, k3, 55) - require.True(t, cap(k3) == len(k3)) - - // backing array of k1 and k2 has been modified, i.e. memory is shared. - require.Equal(t, k3, k1) - require.Equal(t, k3, k2) -} - func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) { return func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) { tb.Helper() diff --git a/blockstore/badger/blockstore_test_suite.go b/blockstore/badger/blockstore_test_suite.go index 480f5d793f3..a50e550ba63 100644 --- a/blockstore/badger/blockstore_test_suite.go +++ b/blockstore/badger/blockstore_test_suite.go @@ -18,6 +18,10 @@ import ( "github.com/filecoin-project/lotus/blockstore" ) +func init() { + EnableHorriblyInefficientAllKeysChanMethod = true +} + // TODO: move this to go-ipfs-blockstore. type Suite struct { NewBlockstore func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) @@ -214,7 +218,7 @@ func (s *Suite) TestAllKeysRespectsContext(t *testing.T) { cancel() // pull one value out to avoid race - _, _ = <-ch + <-ch v, ok = <-ch require.Equal(t, cid.Undef, v) diff --git a/node/modules/chain.go b/node/modules/chain.go index 70cb9b58541..520a6b8ccf5 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -29,6 +29,7 @@ import ( "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/system" ) // ChainBitswap uses a blockstore that bypasses all caches. @@ -87,7 +88,13 @@ func ChainStore(lc fx.Lifecycle, chain := store.NewChainStore(cbs, sbs, ds, weight, j) if err := chain.Load(helpers.LifecycleCtx(mctx, lc)); err != nil { - return nil, xerrors.Errorf("loading chain state from disk: %w", err) + if !system.BadgerQueryLegacyKeys { + err = xerrors.Errorf("loading chain state from disk ( !!! PERHAPS YOU NEED TO SET `LOTUS_CHAIN_BADGERSTORE_QUERY_LEGACY_KEYS` TO TRUE !!! ): %w", err) + + } else { + err = xerrors.Errorf("loading chain state from disk: %w", err) + } + return nil, err } var startHook func(context.Context) error diff --git a/node/repo/blockstore_opts.go b/node/repo/blockstore_opts.go index f0f8d707847..f0de388e68c 100644 --- a/node/repo/blockstore_opts.go +++ b/node/repo/blockstore_opts.go @@ -5,6 +5,7 @@ import ( "strconv" badgerbs "github.com/filecoin-project/lotus/blockstore/badger" + "github.com/filecoin-project/lotus/system" ) // BadgerBlockstoreOptions returns the badger options to apply for the provided @@ -61,6 +62,13 @@ func BadgerBlockstoreOptions(domain BlockstoreDomain, path string, readonly bool } } + if system.BadgerFsyncDisable { + opts.SyncWrites = false + } + if system.BadgerQueryLegacyKeys { + opts.QueryLegacyKeys = true + } + return opts, nil } diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index 9cc2e4de965..c0146a362f9 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -26,7 +26,6 @@ import ( "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/storage/sealer/fsutil" "github.com/filecoin-project/lotus/storage/sealer/storiface" - "github.com/filecoin-project/lotus/system" ) const ( @@ -503,10 +502,6 @@ func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain return } - if system.BadgerFsyncDisable { - opts.SyncWrites = false - } - bs, err := badgerbs.Open(opts) if err != nil { fsr.bsErr = err diff --git a/system/io.go b/system/io.go index 80e2e16efdc..82ba51df660 100644 --- a/system/io.go +++ b/system/io.go @@ -5,7 +5,10 @@ import ( "strings" ) -var BadgerFsyncDisable bool +var ( + BadgerFsyncDisable bool + BadgerQueryLegacyKeys bool +) func init() { // Do not fsync badgers, it does not add value at this stage @@ -25,4 +28,11 @@ func init() { BadgerFsyncDisable = true } } + + if legacy, isSet := os.LookupEnv("LOTUS_CHAIN_BADGERSTORE_QUERY_LEGACY_KEYS"); isSet { + legacy = strings.ToLower(legacy) + if legacy != "" && legacy != "0" && legacy != "false" && legacy != "no" { + BadgerQueryLegacyKeys = true + } + } }