Skip to content

Commit

Permalink
speed up key computation
Browse files Browse the repository at this point in the history
* batch database key writes during `computeKey` calls
* log progress when there are many keys to update
* avoid evicting the vertex cache when traversing the trie for key
computation purposes
* avoid storing trivial leaf hashes that can be loaded directly from the
vertex
  • Loading branch information
arnetheduck committed Sep 19, 2024
1 parent a9ad10c commit 7884bcd
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 52 deletions.
4 changes: 2 additions & 2 deletions nimbus/db/aristo/aristo_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1036,9 +1036,9 @@ func init*(

else:
beDup.getVtxFn =
proc(a: RootedVertexID): auto =
proc(a: RootedVertexID, flags: set[GetVtxFlag]): auto =
AristoApiProfBeGetVtxFn.profileRunner:
result = be.getVtxFn(a)
result = be.getVtxFn(a, flags)
data.list[AristoApiProfBeGetVtxFn.ord].masked = true

beDup.getKeyFn =
Expand Down
118 changes: 91 additions & 27 deletions nimbus/db/aristo/aristo_compute.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,71 @@
{.push raises: [].}

import
std/strformat,
chronicles,
eth/common,
results,
"."/[aristo_desc, aristo_get, aristo_serialise]
"."/[aristo_desc, aristo_get, aristo_serialise],
./aristo_desc/desc_backend

type WriteBatch = tuple[writer: PutHdlRef, count: int, depth: int, prefix: uint64]

# Keep the write batch size around 1mb, give or take some overhead - this is
# a tradeoff between efficiency and memory usage, with diminishing returns
# the larger the batch gets.
const batchSize = 1024 * 1024 div (sizeof(RootedVertexID) + sizeof(HashKey))

func progress(batch: WriteBatch): string =
  ## Rough estimate of how much of the keyspace has been covered, derived
  ## from the path prefix currently being processed.
  let covered = float(batch.prefix) / float(uint64.high)
  &"{covered * 100:02.2f}%"

func enter(batch: var WriteBatch, nibble: int) =
  ## Descend one trie level, folding `nibble` into the progress prefix.
  ## Only the first 16 levels fit into the 64-bit prefix; deeper levels are
  ## tracked by `depth` alone.
  inc batch.depth
  if batch.depth <= 16:
    let shift = (16 - batch.depth) * 4
    batch.prefix += uint64(nibble) shl shift

func leave(batch: var WriteBatch, nibble: int) =
  ## Undo the matching `enter`, removing `nibble` from the progress prefix
  ## before stepping back up one trie level.
  if batch.depth <= 16:
    let shift = (16 - batch.depth) * 4
    batch.prefix -= uint64(nibble) shl shift
  dec batch.depth

proc putKeyAtLevel(
db: AristoDbRef, rvid: RootedVertexID, key: HashKey, level: int
db: AristoDbRef,
rvid: RootedVertexID,
key: HashKey,
level: int,
batch: var WriteBatch,
): Result[void, AristoError] =
## Store a hash key in the given layer or directly to the underlying database
## which helps ensure that memory usage is proportional to the pending change
## set (vertex data may have been committed to disk without computing the
## corresponding hash!)

# Only put computed keys in the database which keeps churn down by focusing on
# the ones that do not change - the ones that don't require hashing might as
# well be loaded from the vertex!
if level == -2:
let be = db.backend
doAssert be != nil, "source data is from the backend"
# TODO long-running batch here?
let writeBatch = ?be.putBegFn()
be.putKeyFn(writeBatch, rvid, key)
?be.putEndFn writeBatch
if key.len == 32:
let be = db.backend
if batch.writer == nil:
doAssert be != nil, "source data is from the backend"
# TODO long-running batch here?
batch.writer = ?be.putBegFn()

be.putKeyFn(batch.writer, rvid, key)
batch.count += 1

if batch.count mod batchSize == 0:
if batch.count mod (batchSize * 100) == 0:
info "Writing computeKey cache",
count = batch.count, accounts = batch.progress
else:
debug "Writing computeKey cache",
count = batch.count, accounts = batch.progress
?be.putEndFn batch.writer
batch.writer = nil

ok()
else:
db.deltaAtLevel(level).kMap[rvid] = key
Expand All @@ -45,9 +92,10 @@ func maxLevel(cur, other: int): int =
min(cur, other) # Here the order is reversed and 0 is the top layer

proc computeKeyImpl(
db: AristoDbRef; # Database, top layer
rvid: RootedVertexID; # Vertex to convert
): Result[(HashKey, int), AristoError] =
db: AristoDbRef, # Database, top layer
rvid: RootedVertexID, # Vertex to convert
batch: var WriteBatch,
): Result[(HashKey, int), AristoError] =
## Compute the key for an arbitrary vertex ID. If successful, the length of
## the resulting key might be smaller than 32. If it is used as a root vertex
## state/hash, it must be converted to a `Hash256` (using (`.to(Hash256)`) as
Expand All @@ -57,15 +105,16 @@ proc computeKeyImpl(
db.getKeyRc(rvid).isErrOr:
# Value cached either in layers or database
return ok value
let (vtx, vl) = ? db.getVtxRc rvid

let (vtx, vl) = ?db.getVtxRc(rvid, {GetVtxFlag.PeekCache})

# Top-most level of all the vertices this hash computation depends on
var level = vl

# TODO this is the same code as when serializing NodeRef, without the NodeRef
var writer = initRlpWriter()

case vtx.vType:
case vtx.vType
of Leaf:
writer.startList(2)
writer.append(vtx.pfx.toHexPrefix(isLeaf = true).data())
Expand All @@ -76,36 +125,41 @@ proc computeKeyImpl(
stoID = vtx.lData.stoID
skey =
if stoID.isValid:
let (skey, sl) = ?db.computeKeyImpl((stoID.vid, stoID.vid))
let (skey, sl) = ?db.computeKeyImpl((stoID.vid, stoID.vid), batch)
level = maxLevel(level, sl)
skey
else:
VOID_HASH_KEY

writer.append(encode Account(
nonce: vtx.lData.account.nonce,
balance: vtx.lData.account.balance,
storageRoot: skey.to(Hash256),
codeHash: vtx.lData.account.codeHash)
writer.append(
encode Account(
nonce: vtx.lData.account.nonce,
balance: vtx.lData.account.balance,
storageRoot: skey.to(Hash256),
codeHash: vtx.lData.account.codeHash,
)
)
of RawData:
writer.append(vtx.lData.rawBlob)
of StoData:
# TODO avoid memory allocation when encoding storage data
writer.append(rlp.encode(vtx.lData.stoData))

of Branch:
template writeBranch(w: var RlpWriter) =
w.startList(17)
for n in 0..15:
for n in 0 .. 15:
let vid = vtx.bVid[n]
if vid.isValid:
let (bkey, bl) = ?db.computeKeyImpl((rvid.root, vid))
batch.enter(n)
let (bkey, bl) = ?db.computeKeyImpl((rvid.root, vid), batch)
batch.leave(n)

level = maxLevel(level, bl)
w.append(bkey)
else:
w.append(VOID_HASH_KEY)
w.append EmptyBlob

if vtx.pfx.len > 0: # Extension node
var bwriter = initRlpWriter()
writeBranch(bwriter)
Expand All @@ -124,15 +178,25 @@ proc computeKeyImpl(
# likely to live in an in-memory layer since any leaf change will lead to the
# root key also changing while leaves that have never been hashed will see
# their hash being saved directly to the backend.
? db.putKeyAtLevel(rvid, h, level)
?db.putKeyAtLevel(rvid, h, level, batch)

ok (h, level)

proc computeKey*(
    db: AristoDbRef, # Database, top layer
    rvid: RootedVertexID, # Vertex to convert
): Result[HashKey, AristoError] =
  ## Compute the hash key for the given vertex, batching any backend key
  ## writes performed along the way and flushing the final, partially filled
  ## batch before returning.
  var batch: WriteBatch
  let res = computeKeyImpl(db, rvid, batch)
  if res.isOk:
    if batch.writer != nil:
      # Log at the same cadence as the in-flight batches so large cache
      # rebuilds remain visible in the logs.
      if batch.count >= batchSize * 100:
        info "Writing computeKey cache", count = batch.count, progress = "100.00%"
      else:
        debug "Writing computeKey cache", count = batch.count, progress = "100.00%"
      ?db.backend.putEndFn batch.writer
      batch.writer = nil
  # NOTE(review): if `computeKeyImpl` fails while `batch.writer` is open, the
  # backend session is left dangling here - confirm whether the error path
  # should also close/abort the write batch.
  ok (?res)[0]

# ------------------------------------------------------------------------------
# End
Expand Down
4 changes: 2 additions & 2 deletions nimbus/db/aristo/aristo_delete.nim
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ proc deleteImpl(

# Unlink child vertex from structural table
br.vtx.bVid[hike.legs[^2].nibble] = VertexID(0)
db.layersPutVtx((hike.root, br.vid), br.vtx)
db.layersUpdateVtx((hike.root, br.vid), br.vtx)

# Clear all Merkle hash keys up to the root key
for n in 0 .. hike.legs.len - 2:
Expand Down Expand Up @@ -111,7 +111,7 @@ proc deleteImpl(
bVid: nxt.bVid)

# Put the new vertex at the id of the obsolete branch
db.layersPutVtx((hike.root, br.vid), vtx)
db.layersUpdateVtx((hike.root, br.vid), vtx)

if vtx.vType == Leaf:
ok(vtx)
Expand Down
2 changes: 1 addition & 1 deletion nimbus/db/aristo/aristo_desc/desc_backend.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import

type
GetVtxFn* =
  proc(rvid: RootedVertexID, flags: set[GetVtxFlag]): Result[VertexRef,AristoError] {.gcsafe, raises: [].}
    ## Generic backend database retrieval function for a single structural
    ## `Aristo DB` data record.

Expand Down
5 changes: 5 additions & 0 deletions nimbus/db/aristo/aristo_desc/desc_structural.nim
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ type

txUid*: uint ## Transaction identifier if positive

GetVtxFlag* = enum
  PeekCache
    ## Peek into, but don't update, the cache - useful on workloads that are
    ## unfriendly to caches

# ------------------------------------------------------------------------------
# Public helpers (misc)
# ------------------------------------------------------------------------------
Expand Down
14 changes: 8 additions & 6 deletions nimbus/db/aristo/aristo_get.nim
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ proc getLstUbe*(
proc getVtxUbe*(
    db: AristoDbRef;
    rvid: RootedVertexID;
    flags: set[GetVtxFlag] = {};
): Result[VertexRef,AristoError] =
  ## Get the vertex from the unfiltered backend if available.
  ##
  ## `flags` is passed through to the backend lookup, e.g. to request a
  ## cache-friendly (`PeekCache`) read.
  let be = db.backend
  if not be.isNil:
    return be.getVtxFn(rvid, flags)
  err GetVtxNotFound

proc getKeyUbe*(
Expand All @@ -73,14 +74,15 @@ proc getTuvBE*(
proc getVtxBE*(
    db: AristoDbRef;
    rvid: RootedVertexID;
    flags: set[GetVtxFlag] = {};
): Result[(VertexRef, int),AristoError] =
  ## Get the vertex from the (filtered) backend if available.
  ##
  ## Returns the vertex together with the level it was found at: -1 for the
  ## balancer layer, -2 for the unfiltered backend.
  if not db.balancer.isNil:
    db.balancer.sTab.withValue(rvid, w):
      if w[].isValid:
        return ok (w[], -1)
      return err(GetVtxNotFound)
  ok (? db.getVtxUbe(rvid, flags), -2)

proc getKeyBE*(
db: AristoDbRef;
Expand All @@ -98,7 +100,8 @@ proc getKeyBE*(

proc getVtxRc*(
db: AristoDbRef;
rvid: RootedVertexID
rvid: RootedVertexID;
flags: set[GetVtxFlag] = {};
): Result[(VertexRef, int),AristoError] =
## Cascaded attempt to fetch a vertex from the cache layers or the backend.
##
Expand All @@ -113,15 +116,14 @@ proc getVtxRc*(
else:
return err(GetVtxNotFound)

db.getVtxBE rvid
db.getVtxBE(rvid, flags)

proc getVtx*(db: AristoDbRef; rvid: RootedVertexID, flags: set[GetVtxFlag] = {}): VertexRef =
  ## Cascaded attempt to fetch a vertex from the cache layers or the backend.
  ## The function returns `nil` on error or failure.
  ##
  # Forward `flags` so that e.g. `PeekCache` actually reaches the backend
  # lookup - previously the parameter was accepted but silently ignored.
  db.getVtxRc(rvid, flags).valueOr((VertexRef(nil), 0))[0]


proc getKeyRc*(db: AristoDbRef; rvid: RootedVertexID): Result[(HashKey, int),AristoError] =
## Cascaded attempt to fetch a Merkle hash from the cache layers or the
## backend. This function will never return a `VOID_HASH_KEY` but rather
Expand Down
2 changes: 1 addition & 1 deletion nimbus/db/aristo/aristo_init/memory_db.nim
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ proc endSession(hdl: PutHdlRef; db: MemBackendRef): MemPutHdlRef =

proc getVtxFn(db: MemBackendRef): GetVtxFn =
result =
proc(rvid: RootedVertexID): Result[VertexRef,AristoError] =
proc(rvid: RootedVertexID, flags: set[GetVtxFlag]): Result[VertexRef,AristoError] =
# Fetch serialised data record
let data = db.mdb.sTab.getOrDefault(rvid, EmptyBlob)
if 0 < data.len:
Expand Down
4 changes: 2 additions & 2 deletions nimbus/db/aristo/aristo_init/rocks_db.nim
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ proc endSession(hdl: PutHdlRef; db: RdbBackendRef): RdbPutHdlRef =

proc getVtxFn(db: RdbBackendRef): GetVtxFn =
result =
proc(rvid: RootedVertexID): Result[VertexRef,AristoError] =
proc(rvid: RootedVertexID, flags: set[GetVtxFlag]): Result[VertexRef,AristoError] =

# Fetch serialised data record
let vtx = db.rdb.getVtx(rvid).valueOr:
let vtx = db.rdb.getVtx(rvid, flags).valueOr:
when extraTraceMessages:
trace logTxt "getVtxFn() failed", rvid, error=error[0], info=error[1]
return err(error[0])
Expand Down
13 changes: 10 additions & 3 deletions nimbus/db/aristo/aristo_init/rocks_db/rdb_get.nim
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,15 @@ proc getKey*(
proc getVtx*(
rdb: var RdbInst;
rvid: RootedVertexID;
flags: set[GetVtxFlag];
): Result[VertexRef,(AristoError,string)] =
# Try LRU cache first
var rc = rdb.rdVtxLru.get(rvid.vid)
var rc =
if GetVtxFlag.PeekCache in flags:
rdb.rdVtxLru.peek(rvid.vid)
else:
rdb.rdVtxLru.get(rvid.vid)

if rc.isOK:
rdbVtxLruStats[rvid.to(RdbStateType)][rc.value().vType].inc(true)
return ok(move(rc.value))
Expand All @@ -164,8 +170,9 @@ proc getVtx*(

rdbVtxLruStats[rvid.to(RdbStateType)][res.value().vType].inc(false)

# Update cache and return
rdb.rdVtxLru.put(rvid.vid, res.value())
# Update cache and return - in peek mode, avoid evicting cache items
if GetVtxFlag.PeekCache notin flags or rdb.rdVtxLru.len < rdb.rdVtxLru.capacity:
rdb.rdVtxLru.put(rvid.vid, res.value())

ok res.value()

Expand Down
8 changes: 7 additions & 1 deletion nimbus/db/aristo/aristo_init/rocks_db/rdb_put.nim
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ proc putKey*(
rvid: RootedVertexID, key: HashKey;
): Result[void,(VertexID,AristoError,string)] =
let dsc = rdb.session
if key.isValid:
# We only write keys whose value has to be hashed - the trivial ones can be
# loaded from the corresponding vertex directly!
# TODO move this logic to a higher layer
# TODO skip the delete for trivial keys - it's here to support databases that
# were written at a time when trivial keys were also cached - it should
# be cleaned up when revising the key cache in general.
if key.isValid and key.len == 32:
dsc.put(rvid.blobify().data(), key.data, rdb.keyCol.handle()).isOkOr:
# Caller must `rollback()` which will flush the `rdKeyLru` cache
const errSym = RdbBeDriverPutKeyError
Expand Down
Loading

0 comments on commit 7884bcd

Please sign in to comment.