Introduce flushing strategy to the database.
We can estimate pretty accurately which transactions need a compaction
and which don't. The issue at hand is that very small transactions may
not flush quickly enough, leading to reads of stale data.

This may need some additional tinkering, but try this for the time being.
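As a rough illustration of the idea (not the committed code itself), the sketch below shows how a size-based strategy can decide per transaction whether to force a compaction. The cacheStrategy type, the csAlwaysFlush/csNeverFlush constants, and the 4 MiB csFlushSize threshold mirror the ones added in level.go further down; the flushStrategy helper and the batch sizes in main are hypothetical.

package main

import "fmt"

// cacheStrategy mirrors the type introduced in level.go; it tells the
// transaction helper whether to compact the range after commit.
type cacheStrategy int

const (
	csAlwaysFlush cacheStrategy = 0 // compact after commit so reads see fresh data
	csNeverFlush  cacheStrategy = 1 // rely on leveldb's write buffer instead
)

// csFlushSize follows the leveldb default WriteBuffer of 4 MiB.
const csFlushSize = 4 * 1024 * 1024

// flushStrategy is a hypothetical helper: a batch smaller than the write
// buffer is flushed eagerly (it could otherwise linger in the buffer and
// be read back stale), while a larger batch is left to leveldb to compact.
func flushStrategy(encodedBatchSize int) cacheStrategy {
	if encodedBatchSize < csFlushSize {
		return csAlwaysFlush
	}
	return csNeverFlush
}

func main() {
	fmt.Println(flushStrategy(10 * 120))         // small header batch -> 0 (csAlwaysFlush)
	fmt.Println(flushStrategy(64 * 1024 * 1024)) // large batch -> 1 (csNeverFlush)
}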
marcopeereboom committed Feb 12, 2025
1 parent c44028d commit b4643e0
Showing 1 changed file with 102 additions and 25 deletions.
127 changes: 102 additions & 25 deletions database/tbcd/level/level.go
@@ -50,6 +50,10 @@ const (
verbose = false

bhsCanonicalTipKey = "canonicaltip"

heighthashSize = 8 + 1 + chainhash.HashSize
blockheaderSize = 120
keystoneSize = chainhash.HashSize + hemi.L2KeystoneAbrevSize
)

type IteratorError error
@@ -197,9 +201,60 @@ func New(ctx context.Context, cfg *Config) (*ldb, error) {
type (
discardFunc func()
commitFunc func() error

cacheStrategy int
)

func (l *ldb) startTransaction(db string) (*leveldb.Transaction, commitFunc, discardFunc, error) {
const (
csAlwaysFlush cacheStrategy = 0
csNeverFlush cacheStrategy = 1
)

// https://pkg.go.dev/github.com/syndtr/goleveldb/leveldb/opt#Options
// WriteBuffer = 4 MiB
// It would be cute if we could read this value from the underlying DB, but no
// such luck. Since we use defaults we can use this for now.
var csFlushSize = 4 * 1024 * 1024 // default leveldb size

// blockheaderStrategy calculates if we should flush data to the underlying
// database.
// Commented out for now because this will be wildly expensive with millions
// of blocks.
//func blockheaderStrategy(bhs *wire.MsgHeaders) cacheStrategy {
// size := len(bhs.Headers) * blockheaderSize // encoded blockheader size
// if size < csFlushSize {
// return csAlwaysFlush
// }
// return csNeverFlush
//}

// heighthashStrategy calculates if we should flush data to the underlying
// database.
// Commented out for now because this will be wildly expensive with millions
// of blocks.
//func heighthashStrategy(bhs *wire.MsgHeaders) cacheStrategy {
// size := len(bhs.Headers) * heighthashSize // encoded heighthash size
// if size < csFlushSize {
// return csAlwaysFlush
// }
// return csNeverFlush
//}

func keystoneStrategy(direction int, bhs *wire.MsgHeaders) cacheStrategy {
switch direction {
case 1:
case -1:
default:
panic("invalid direction")
}
size := len(bhs.Headers) * heighthashSize // encoded heighthash size
if size < csFlushSize {
return csAlwaysFlush
}
return csNeverFlush
}

func (l *ldb) startTransaction(db string, strategy cacheStrategy) (*leveldb.Transaction, commitFunc, discardFunc, error) {
bhsDB := l.pool[db]
tx, err := bhsDB.OpenTransaction()
if err != nil {
@@ -217,9 +272,14 @@ func (l *ldb) startTransaction(db string) (*leveldb.Transaction, commitFunc, dis
if err := tx.Commit(); err != nil {
return fmt.Errorf("%v commit: %w", db, err)
}
// Always flush transaction to disk
if err := bhsDB.CompactRange(util.Range{}); err != nil {
return fmt.Errorf("%v compact: %w", db, err)

switch strategy {
case csNeverFlush:
case csAlwaysFlush:
// Always flush transaction to disk
if err := bhsDB.CompactRange(util.Range{}); err != nil {
return fmt.Errorf("%v compact: %w", db, err)
}
}
*discard = false
return nil
@@ -257,7 +317,7 @@ func (l *ldb) MetadataGet(ctx context.Context, key []byte) ([]byte, error) {

// Metadata transaction, we do this to simply lock the table.
// XXX reason if we can/want to remove the read lock.
mdDB, _, mdDiscard, err := l.startTransaction(level.MetadataDB)
mdDB, _, mdDiscard, err := l.startTransaction(level.MetadataDB, csNeverFlush)
if err != nil {
return nil, fmt.Errorf("metadata open db transaction: %w", err)
}
@@ -278,7 +338,7 @@ func (l *ldb) MetadataBatchGet(ctx context.Context, allOrNone bool, keys [][]byt
defer log.Tracef("MetadataGet exit")

// Metadata transaction, we do this to simply lock the table.
mdDB, _, mdDiscard, err := l.startTransaction(level.MetadataDB)
mdDB, _, mdDiscard, err := l.startTransaction(level.MetadataDB, csNeverFlush)
if err != nil {
return nil, fmt.Errorf("metadata open db transaction: %w", err)
}
@@ -319,7 +379,8 @@ func (l *ldb) MetadataBatchPut(ctx context.Context, rows []tbcd.Row) error {
defer log.Tracef("MetadataBatchPut exit")

// Metadata transaction
mdDB, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB)
mdDB, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB,
csAlwaysFlush)
if err != nil {
return fmt.Errorf("metadata open db transaction: %w", err)
}
@@ -346,7 +407,8 @@ func (l *ldb) MetadataPut(ctx context.Context, key, value []byte) error {
defer log.Tracef("MetadataPut exit")

// Metadata transaction
mdDB, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB)
mdDB, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB,
csAlwaysFlush)
if err != nil {
return fmt.Errorf("metadata open db transaction: %w", err)
}
@@ -459,7 +521,7 @@ func heightHashToKey(height uint64, hash []byte) []byte {
if len(hash) != chainhash.HashSize {
panic(fmt.Sprintf("invalid hash size: %v", len(hash)))
}
key := make([]byte, 8+1+chainhash.HashSize)
key := make([]byte, heighthashSize)
binary.BigEndian.PutUint64(key[0:8], height)
copy(key[9:], hash)
return key
@@ -477,10 +539,10 @@ func keyToHeightHash(key []byte) (uint64, *chainhash.Hash) {
// encodeBlockHeader encodes a database block header as
// [height,header,difficulty] or [8+80+32] bytes. The hash is the leveldb table
// key.
func encodeBlockHeader(height uint64, header [80]byte, difficulty *big.Int) (ebhr [120]byte) {
func encodeBlockHeader(height uint64, header [80]byte, difficulty *big.Int) (ebhr [blockheaderSize]byte) {
binary.BigEndian.PutUint64(ebhr[0:8], height)
copy(ebhr[8:88], header[:])
difficulty.FillBytes(ebhr[88:120])
difficulty.FillBytes(ebhr[88:blockheaderSize])
return
}

@@ -502,7 +564,8 @@ func (l *ldb) BlockHeaderGenesisInsert(ctx context.Context, wbh *wire.BlockHeade
defer log.Tracef("BlockHeaderGenesisInsert exit")

// block headers
bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB)
bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB,
csAlwaysFlush)
if err != nil {
return fmt.Errorf("block header open transaction: %w", err)
}
@@ -519,14 +582,16 @@ func (l *ldb) BlockHeaderGenesisInsert(ctx context.Context, wbh *wire.BlockHeade
}

// blocks missing
bmTx, bmCommit, bmDiscard, err := l.startTransaction(level.BlocksMissingDB)
bmTx, bmCommit, bmDiscard, err := l.startTransaction(level.BlocksMissingDB,
csAlwaysFlush)
if err != nil {
return fmt.Errorf("blocks missing open transaction: %w", err)
}
defer bmDiscard()

// height hash
hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB)
hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB,
csAlwaysFlush)
if err != nil {
return fmt.Errorf("height hash open transaction: %w", err)
}
@@ -712,23 +777,26 @@ func (l *ldb) BlockHeadersRemove(ctx context.Context, bhs *wire.MsgHeaders, tipA
// happen inside the database transaction.

// Metadata
mdTx, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB)
mdTx, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB,
csAlwaysFlush)
if err != nil {
return tbcd.RTInvalid, nil,
fmt.Errorf("metadata open transaction: %w", err)
}
defer mdDiscard()

// Block headers
bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB)
bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB,
csNeverFlush)
if err != nil {
return tbcd.RTInvalid, nil,
fmt.Errorf("block headers remove: unable to start block headers leveldb transaction, err: %w", err)
}
defer bhsDiscard()

// height hash
hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB)
hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB,
csNeverFlush)
if err != nil {
return tbcd.RTInvalid, nil,
fmt.Errorf("block headers remove: unable to start height hash leveldb transaction, err: %w", err)
@@ -1020,15 +1088,17 @@ func (l *ldb) BlockHeadersInsert(ctx context.Context, bhs *wire.MsgHeaders, batc
}

// Metadata
mdTx, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB)
mdTx, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB,
csAlwaysFlush)
if err != nil {
return tbcd.ITInvalid, nil, nil, 0,
fmt.Errorf("metadata open transaction: %w", err)
}
defer mdDiscard()

// block headers
bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB)
bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB,
csNeverFlush)
if err != nil {
return tbcd.ITInvalid, nil, nil, 0,
fmt.Errorf("block headers open transaction: %w", err)
@@ -1068,15 +1138,17 @@ func (l *ldb) BlockHeadersInsert(ctx context.Context, bhs *wire.MsgHeaders, batc
}

// blocks missing
bmTx, bmCommit, bmDiscard, err := l.startTransaction(level.BlocksMissingDB)
bmTx, bmCommit, bmDiscard, err := l.startTransaction(level.BlocksMissingDB,
csNeverFlush)
if err != nil {
return tbcd.ITInvalid, nil, nil, 0,
fmt.Errorf("blocks missing open transaction: %w", err)
}
defer bmDiscard()

// height hash
hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB)
hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB,
csNeverFlush)
if err != nil {
return tbcd.ITInvalid, nil, nil, 0,
fmt.Errorf("height hash open transaction: %w", err)
@@ -1603,7 +1675,8 @@ func (l *ldb) BlockUtxoUpdate(ctx context.Context, direction int, utxos map[tbcd
}

// outputs
outsTx, outsCommit, outsDiscard, err := l.startTransaction(level.OutputsDB)
outsTx, outsCommit, outsDiscard, err := l.startTransaction(level.OutputsDB,
csNeverFlush)
if err != nil {
return fmt.Errorf("outputs open db transaction: %w", err)
}
@@ -1659,7 +1732,8 @@ func (l *ldb) BlockTxUpdate(ctx context.Context, direction int, txs map[tbcd.TxK
}

// transactions
txsTx, txsCommit, txsDiscard, err := l.startTransaction(level.TransactionsDB)
txsTx, txsCommit, txsDiscard, err := l.startTransaction(level.TransactionsDB,
csNeverFlush)
if err != nil {
return fmt.Errorf("transactions open db transaction: %w", err)
}
@@ -1732,7 +1806,7 @@ func (l *ldb) BlockTxUpdate(ctx context.Context, direction int, txs map[tbcd.TxK
// encodeKeystone encodes a database keystone as
// [blockhash,abbreviated keystone] or [32+76] bytes. The abbreviated keystone
// hash is the leveldb table key.
func encodeKeystone(ks tbcd.Keystone) (eks [chainhash.HashSize + hemi.L2KeystoneAbrevSize]byte) {
func encodeKeystone(ks tbcd.Keystone) (eks [keystoneSize]byte) {
copy(eks[0:32], ks.BlockHash[:])
copy(eks[32:], ks.AbbreviatedKeystone[:])
return
@@ -1764,7 +1838,10 @@ func (l *ldb) BlockKeystoneUpdate(ctx context.Context, direction int, keystones
}

// keystones
kssTx, kssCommit, kssDiscard, err := l.startTransaction(level.KeystonesDB)
// There are never going to be thousands of keystones on mainnet
// so always flush.
kssTx, kssCommit, kssDiscard, err := l.startTransaction(level.KeystonesDB,
csAlwaysFlush)
if err != nil {
return fmt.Errorf("keystones open db transaction: %w", err)
}