diff --git a/database/tbcd/level/level.go b/database/tbcd/level/level.go index adb859b4..0b97520c 100644 --- a/database/tbcd/level/level.go +++ b/database/tbcd/level/level.go @@ -50,6 +50,10 @@ const ( verbose = false bhsCanonicalTipKey = "canonicaltip" + + heighthashSize = 8 + 1 + chainhash.HashSize + blockheaderSize = 120 + keystoneSize = chainhash.HashSize + hemi.L2KeystoneAbrevSize ) type IteratorError error @@ -197,9 +201,60 @@ func New(ctx context.Context, cfg *Config) (*ldb, error) { type ( discardFunc func() commitFunc func() error + + cacheStrategy int ) -func (l *ldb) startTransaction(db string) (*leveldb.Transaction, commitFunc, discardFunc, error) { +const ( + csAlwaysFlush cacheStrategy = 0 + csNeverFlush cacheStrategy = 1 +) + +// https://pkg.go.dev/github.com/syndtr/goleveldb@v1.0.0/leveldb/opt#Options +// WriteBuffer = 4 MiB +// It would be cute if we could read this value from the underlying DB but no +// such luck. Since we use defaults we can use this for now. +var csFlushSize = 4 * 1024 * 1024 // default leveldb size + +// blockheaderStrategy calculates if we should flush data to the underlying +// database. +// Commented out for now because this will be wildly expensive with millions +// of blocks. +//func blockheaderStrategy(bhs *wire.MsgHeaders) cacheStrategy { +// size := len(bhs.Headers) * blockheaderSize // encoded blockheader size +// if size < csFlushSize { +// return csAlwaysFlush +// } +// return csNeverFlush +//} + +// heighthashStrategy calculates if we should flush data to the underlying +// database. +// Commented out for now because this will be wildly expensive with millions +// of blocks. +//func heighthashStrategy(bhs *wire.MsgHeaders) cacheStrategy { +// size := len(bhs.Headers) * heighthashSize // encoded heighthash size +// if size < csFlushSize { +// return csAlwaysFlush +// } +// return csNeverFlush +//} + +func keystoneStrategy(direction int, bhs *wire.MsgHeaders) cacheStrategy { + switch direction { + case 1: + case -1: + default: + panic("invalid direction") + } + size := len(bhs.Headers) * heighthashSize // encoded heighthash size + if size < csFlushSize { + return csAlwaysFlush + } + return csNeverFlush +} + +func (l *ldb) startTransaction(db string, strategy cacheStrategy) (*leveldb.Transaction, commitFunc, discardFunc, error) { bhsDB := l.pool[db] tx, err := bhsDB.OpenTransaction() if err != nil { @@ -217,9 +272,14 @@ func (l *ldb) startTransaction(db string) (*leveldb.Transaction, commitFunc, dis if err := tx.Commit(); err != nil { return fmt.Errorf("%v commit: %w", db, err) } - // Always flush transaction to disk - if err := bhsDB.CompactRange(util.Range{}); err != nil { - return fmt.Errorf("%v compact: %w", db, err) + + switch strategy { + case csNeverFlush: + case csAlwaysFlush: + // Always flush transaction to disk + if err := bhsDB.CompactRange(util.Range{}); err != nil { + return fmt.Errorf("%v compact: %w", db, err) + } } *discard = false return nil @@ -257,7 +317,7 @@ func (l *ldb) MetadataGet(ctx context.Context, key []byte) ([]byte, error) { // Metadata transaction, we do this to simply lock the table. // XXX reason if we can/want to remove the read lock. - mdDB, _, mdDiscard, err := l.startTransaction(level.MetadataDB) + mdDB, _, mdDiscard, err := l.startTransaction(level.MetadataDB, csNeverFlush) if err != nil { return nil, fmt.Errorf("metadata open db transaction: %w", err) } @@ -278,7 +338,7 @@ func (l *ldb) MetadataBatchGet(ctx context.Context, allOrNone bool, keys [][]byt defer log.Tracef("MetadataGet exit") // Metadata transaction, we do this to simply lock the table. - mdDB, _, mdDiscard, err := l.startTransaction(level.MetadataDB) + mdDB, _, mdDiscard, err := l.startTransaction(level.MetadataDB, csNeverFlush) if err != nil { return nil, fmt.Errorf("metadata open db transaction: %w", err) } @@ -319,7 +379,8 @@ func (l *ldb) MetadataBatchPut(ctx context.Context, rows []tbcd.Row) error { defer log.Tracef("MetadataBatchPut exit") // Metadata transaction - mdDB, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB) + mdDB, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB, + csAlwaysFlush) if err != nil { return fmt.Errorf("metadata open db transaction: %w", err) } @@ -346,7 +407,8 @@ func (l *ldb) MetadataPut(ctx context.Context, key, value []byte) error { defer log.Tracef("MetadataPut exit") // Metadata transaction - mdDB, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB) + mdDB, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB, + csAlwaysFlush) if err != nil { return fmt.Errorf("metadata open db transaction: %w", err) } @@ -459,7 +521,7 @@ func heightHashToKey(height uint64, hash []byte) []byte { if len(hash) != chainhash.HashSize { panic(fmt.Sprintf("invalid hash size: %v", len(hash))) } - key := make([]byte, 8+1+chainhash.HashSize) + key := make([]byte, heighthashSize) binary.BigEndian.PutUint64(key[0:8], height) copy(key[9:], hash) return key @@ -477,10 +539,10 @@ func keyToHeightHash(key []byte) (uint64, *chainhash.Hash) { // encodeBlockHeader encodes a database block header as // [height,header,difficulty] or [8+80+32] bytes. The hash is the leveldb table // key. -func encodeBlockHeader(height uint64, header [80]byte, difficulty *big.Int) (ebhr [120]byte) { +func encodeBlockHeader(height uint64, header [80]byte, difficulty *big.Int) (ebhr [blockheaderSize]byte) { binary.BigEndian.PutUint64(ebhr[0:8], height) copy(ebhr[8:88], header[:]) - difficulty.FillBytes(ebhr[88:120]) + difficulty.FillBytes(ebhr[88:blockheaderSize]) return } @@ -502,7 +564,8 @@ func (l *ldb) BlockHeaderGenesisInsert(ctx context.Context, wbh *wire.BlockHeade defer log.Tracef("BlockHeaderGenesisInsert exit") // block headers - bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB) + bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB, + csAlwaysFlush) if err != nil { return fmt.Errorf("block header open transaction: %w", err) } @@ -519,14 +582,16 @@ func (l *ldb) BlockHeaderGenesisInsert(ctx context.Context, wbh *wire.BlockHeade } // blocks missing - bmTx, bmCommit, bmDiscard, err := l.startTransaction(level.BlocksMissingDB) + bmTx, bmCommit, bmDiscard, err := l.startTransaction(level.BlocksMissingDB, + csAlwaysFlush) if err != nil { return fmt.Errorf("blocks missing open transaction: %w", err) } defer bmDiscard() // height hash - hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB) + hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB, + csAlwaysFlush) if err != nil { return fmt.Errorf("height hash open transaction: %w", err) } @@ -712,7 +777,8 @@ func (l *ldb) BlockHeadersRemove(ctx context.Context, bhs *wire.MsgHeaders, tipA // happen inside the database transaction. // Metadata - mdTx, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB) + mdTx, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB, + csAlwaysFlush) if err != nil { return tbcd.RTInvalid, nil, fmt.Errorf("metadata open transaction: %w", err) @@ -720,7 +786,8 @@ func (l *ldb) BlockHeadersRemove(ctx context.Context, bhs *wire.MsgHeaders, tipA defer mdDiscard() // Block headers - bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB) + bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB, + csNeverFlush) if err != nil { return tbcd.RTInvalid, nil, fmt.Errorf("block headers remove: unable to start block headers leveldb transaction, err: %w", err) @@ -728,7 +795,8 @@ func (l *ldb) BlockHeadersRemove(ctx context.Context, bhs *wire.MsgHeaders, tipA defer bhsDiscard() // height hash - hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB) + hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB, + csNeverFlush) if err != nil { return tbcd.RTInvalid, nil, fmt.Errorf("block headers remove: unable to start height hash leveldb transaction, err: %w", err) @@ -1020,7 +1088,8 @@ func (l *ldb) BlockHeadersInsert(ctx context.Context, bhs *wire.MsgHeaders, batc } // Metadata - mdTx, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB) + mdTx, mdCommit, mdDiscard, err := l.startTransaction(level.MetadataDB, + csAlwaysFlush) if err != nil { return tbcd.ITInvalid, nil, nil, 0, fmt.Errorf("metadata open transaction: %w", err) @@ -1028,7 +1097,8 @@ func (l *ldb) BlockHeadersInsert(ctx context.Context, bhs *wire.MsgHeaders, batc defer mdDiscard() // block headers - bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB) + bhsTx, bhsCommit, bhsDiscard, err := l.startTransaction(level.BlockHeadersDB, + csNeverFlush) if err != nil { return tbcd.ITInvalid, nil, nil, 0, fmt.Errorf("block headers open transaction: %w", err) @@ -1068,7 +1138,8 @@ func (l *ldb) BlockHeadersInsert(ctx context.Context, bhs *wire.MsgHeaders, batc } // blocks missing - bmTx, bmCommit, bmDiscard, err := l.startTransaction(level.BlocksMissingDB) + bmTx, bmCommit, bmDiscard, err := l.startTransaction(level.BlocksMissingDB, + csNeverFlush) if err != nil { return tbcd.ITInvalid, nil, nil, 0, fmt.Errorf("blocks missing open transaction: %w", err) @@ -1076,7 +1147,8 @@ func (l *ldb) BlockHeadersInsert(ctx context.Context, bhs *wire.MsgHeaders, batc defer bmDiscard() // height hash - hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB) + hhTx, hhCommit, hhDiscard, err := l.startTransaction(level.HeightHashDB, + csNeverFlush) if err != nil { return tbcd.ITInvalid, nil, nil, 0, fmt.Errorf("height hash open transaction: %w", err) @@ -1603,7 +1675,8 @@ func (l *ldb) BlockUtxoUpdate(ctx context.Context, direction int, utxos map[tbcd } // outputs - outsTx, outsCommit, outsDiscard, err := l.startTransaction(level.OutputsDB) + outsTx, outsCommit, outsDiscard, err := l.startTransaction(level.OutputsDB, + csNeverFlush) if err != nil { return fmt.Errorf("outputs open db transaction: %w", err) } @@ -1659,7 +1732,8 @@ func (l *ldb) BlockTxUpdate(ctx context.Context, direction int, txs map[tbcd.TxK } // transactions - txsTx, txsCommit, txsDiscard, err := l.startTransaction(level.TransactionsDB) + txsTx, txsCommit, txsDiscard, err := l.startTransaction(level.TransactionsDB, + csNeverFlush) if err != nil { return fmt.Errorf("transactions open db transaction: %w", err) } @@ -1732,7 +1806,7 @@ func (l *ldb) BlockTxUpdate(ctx context.Context, direction int, txs map[tbcd.TxK // encodeKeystone encodes a database keystone as // [blockhash,abbreviated keystone] or [32+76] bytes. The abbreviated keystone // hash is the leveldb table key. -func encodeKeystone(ks tbcd.Keystone) (eks [chainhash.HashSize + hemi.L2KeystoneAbrevSize]byte) { +func encodeKeystone(ks tbcd.Keystone) (eks [keystoneSize]byte) { copy(eks[0:32], ks.BlockHash[:]) copy(eks[32:], ks.AbbreviatedKeystone[:]) return @@ -1764,7 +1838,10 @@ func (l *ldb) BlockKeystoneUpdate(ctx context.Context, direction int, keystones } // keystones - kssTx, kssCommit, kssDiscard, err := l.startTransaction(level.KeystonesDB) + // There are never going to be thousands of keystones on mainnet + // so always flush. + kssTx, kssCommit, kssDiscard, err := l.startTransaction(level.KeystonesDB, + csAlwaysFlush) if err != nil { return fmt.Errorf("keystones open db transaction: %w", err) }