Skip to content

Commit

Permalink
Add force prune for statically set radius (#1896)
Browse files Browse the repository at this point in the history
  • Loading branch information
kdeme authored Nov 21, 2023
1 parent 610e2d3 commit a7bb52e
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 75 deletions.
8 changes: 8 additions & 0 deletions fluffy/conf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,14 @@ type
defaultValue: none(TrustedDigest)
name: "trusted-block-root" .}: Option[TrustedDigest]

forcePrune* {.
hidden
desc: "Force the pruning of the database. This should be used when the " &
"database is decreased in size, e.g. when a lower static radius " &
"is set. Only supported for statically set radius."
defaultValue: false
name: "force-prune" .}: bool

disablePoke* {.
hidden
desc: "Disable POKE functionality for gossip mechanisms testing"
Expand Down
113 changes: 76 additions & 37 deletions fluffy/content_db.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Nimbus
# Fluffy
# Copyright (c) 2021-2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
Expand All @@ -10,12 +10,13 @@
import
chronicles,
metrics,
eth/db/kvstore,
eth/db/kvstore_sqlite3,
stint,
stew/results,
eth/db/kvstore,
eth/db/kvstore_sqlite3,
./network/state/state_content,
"."/network/wire/[portal_protocol, portal_protocol_config]
"."/network/wire/[portal_protocol, portal_protocol_config],
./content_db_custom_sql_functions

export kvstore_sqlite3

Expand Down Expand Up @@ -51,14 +52,17 @@ type
distance: array[32, byte]

ContentDB* = ref object
backend: SqStoreRef
kv: KvStoreRef
manualCheckpoint: bool
storageCapacity*: uint64
sizeStmt: SqliteStmt[NoParams, int64]
unusedSizeStmt: SqliteStmt[NoParams, int64]
vacuumStmt: SqliteStmt[NoParams, void]
contentCountStmt: SqliteStmt[NoParams, int64]
contentSizeStmt: SqliteStmt[NoParams, int64]
getAllOrderedByDistanceStmt: SqliteStmt[array[32, byte], RowInfo]
deleteOutOfRadiusStmt: SqliteStmt[(array[32, byte], array[32, byte]), void]

PutResultType* = enum
ContentStored, DbPruned
Expand All @@ -72,41 +76,28 @@ type
deletedFraction*: float64
deletedElements*: int64

func xorDistance(
a: openArray[byte],
b: openArray[byte]
): Result[seq[byte], cstring] {.cdecl.} =
var s: seq[byte] = newSeq[byte](32)

if len(a) != 32 or len(b) != 32:
return err("Blobs should have 32 byte length")

var i = 0
while i < 32:
s[i] = a[i] xor b[i]
inc i

return ok(s)

template expectDb(x: auto): untyped =
# There's no meaningful error handling implemented for a corrupt database or
# full disk - this requires manual intervention, so we'll panic for now
x.expect("working database (disk broken/full?)")

proc new*(
T: type ContentDB, path: string, storageCapacity: uint64, inMemory = false):
ContentDB =
T: type ContentDB, path: string, storageCapacity: uint64,
inMemory = false, manualCheckpoint = false): ContentDB =
doAssert(storageCapacity <= uint64(int64.high))

let db =
if inMemory:
SqStoreRef.init("", "fluffy-test", inMemory = true).expect(
"working database (out of memory?)")
else:
SqStoreRef.init(path, "fluffy").expectDb()
SqStoreRef.init(path, "fluffy", manualCheckpoint = false).expectDb()

db.createCustomFunction("xorDistance", 2, xorDistance).expect(
"Custom function xorDistance creation OK")

db.registerCustomScalarFunction("xorDistance", xorDistance)
.expect("Couldn't register custom xor function")
db.createCustomFunction("isInRadius", 3, isInRadius).expect(
"Custom function isInRadius creation OK")

let sizeStmt = db.prepareStmt(
"SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size();",
Expand Down Expand Up @@ -134,17 +125,39 @@ proc new*(
"SELECT key, length(value), xorDistance(?, key) as distance FROM kvstore ORDER BY distance DESC",
array[32, byte], RowInfo).get()

let deleteOutOfRadiusStmt = db.prepareStmt(
"DELETE FROM kvstore WHERE isInRadius(?, key, ?) == 0",
(array[32, byte], array[32, byte]), void).get()

ContentDB(
kv: kvStore,
backend: db,
manualCheckpoint: manualCheckpoint,
storageCapacity: storageCapacity,
sizeStmt: sizeStmt,
unusedSizeStmt: unusedSizeStmt,
vacuumStmt: vacuumStmt,
contentSizeStmt: contentSizeStmt,
contentCountStmt: contentCountStmt,
getAllOrderedByDistanceStmt: getAllOrderedByDistanceStmt
getAllOrderedByDistanceStmt: getAllOrderedByDistanceStmt,
deleteOutOfRadiusStmt: deleteOutOfRadiusStmt
)

template disposeSafe(s: untyped): untyped =
if distinctBase(s) != nil:
s.dispose()
s = typeof(s)(nil)

proc close*(db: ContentDB) =
db.sizeStmt.disposeSafe()
db.unusedSizeStmt.disposeSafe()
db.vacuumStmt.disposeSafe()
db.contentCountStmt.disposeSafe()
db.contentSizeStmt.disposeSafe()
db.getAllOrderedByDistanceStmt.disposeSafe()
db.deleteOutOfRadiusStmt.disposeSafe()
discard db.kv.close()

## Private KvStoreRef Calls

proc get(kv: KvStoreRef, key: openArray[byte]): Opt[seq[byte]] =
Expand Down Expand Up @@ -210,15 +223,7 @@ proc del*(db: ContentDB, key: ContentId) =
proc getSszDecoded*(db: ContentDB, key: ContentId, T: type auto): Opt[T] =
db.getSszDecoded(key.toBytesBE(), T)

## Public database size, content and pruning related calls

proc reclaimSpace*(db: ContentDB): void =
## Runs sqlite VACUUM commands which rebuilds the db, repacking it into a
## minimal amount of disk space.
## Ideal mode of operation, is to run it after several deletes.
## Another option would be to run 'PRAGMA auto_vacuum = FULL;' statement at
## the start of db to leave it up to sqlite to clean up
db.vacuumStmt.exec().expectDb()
## Public calls to get database size, content size and similar.

proc size*(db: ContentDB): int64 =
## Return current size of DB as product of sqlite page_count and page_size:
Expand Down Expand Up @@ -260,12 +265,14 @@ proc contentCount*(db: ContentDB): int64 =
count = res).expectDb()
return count

## Pruning related calls

proc deleteContentFraction*(
db: ContentDB,
target: UInt256,
fraction: float64): (UInt256, int64, int64, int64) =
## Deletes at most `fraction` percent of content form database.
## Content furthest from provided `target` is deleted first.
## Deletes at most `fraction` percent of content from the database.
## The content furthest from the provided `target` is deleted first.
# TODO: The usage of `db.contentSize()` for the deletion calculation versus
# `db.usedSize()` for the pruning threshold leads sometimes to some unexpected
# results of how much content gets up deleted.
Expand Down Expand Up @@ -294,6 +301,38 @@ proc deleteContentFraction*(
deletedElements
)

proc reclaimSpace*(db: ContentDB): void =
## Runs sqlite VACUUM commands which rebuilds the db, repacking it into a
## minimal amount of disk space.
## Ideal mode of operation is to run it after several deletes.
## Another option would be to run 'PRAGMA auto_vacuum = FULL;' statement at
## the start of db to leave it up to sqlite to clean up.
db.vacuumStmt.exec().expectDb()

proc deleteContentOutOfRadius*(
db: ContentDB, localId: UInt256, radius: UInt256) =
## Deletes all content that falls outside of the given radius range.
db.deleteOutOfRadiusStmt.exec(
(localId.toBytesBE(), radius.toBytesBE())).expect("SQL query OK")

proc forcePrune*(db: ContentDB, localId: UInt256, radius: UInt256) =
## Force prune the database to a statically set radius. This will also run
## the reclaimSpace (vacuum) to free unused pages. As side effect this will
## cause the pruned database size to double in size on disk (wal file will be
## approximately the same size as the db). A truncate checkpoint is done to
## clean that up. In order to be able do the truncate checkpoint, the db needs
## to be initialized in with `manualCheckpoint` on, else this step will be
## skipped.
notice "Starting the pruning of content"
db.deleteContentOutOfRadius(localId, radius)
notice "Reclaiming unused pages"
db.reclaimSpace()
if db.manualCheckpoint:
notice "Truncating WAL file"
db.backend.checkpoint(SqStoreCheckpointKind.truncate)
db.close()
notice "Finished database pruning"

proc put*(
db: ContentDB,
key: ContentId,
Expand Down
72 changes: 72 additions & 0 deletions fluffy/content_db_custom_sql_functions.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Fluffy
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import
stew/ptrops,
stint,
sqlite3_abi,
eth/db/kvstore_sqlite3

func xorDistance(a: openArray[byte], b: openArray[byte]): seq[byte] =
doAssert(a.len == b.len)

let length = a.len
var distance: seq[byte] = newSeq[byte](length)
for i in 0..<length:
distance[i] = a[i] xor b[i]

return distance

proc xorDistance*(
ctx: SqliteContext, n: cint, v: SqliteValue)
{.cdecl, gcsafe, raises: [].} =
doAssert(n == 2)

let
ptrs = makeUncheckedArray(v)
blob1Len = sqlite3_value_bytes(ptrs[][0])
blob2Len = sqlite3_value_bytes(ptrs[][1])

bytes = xorDistance(
makeOpenArray(sqlite3_value_blob(ptrs[][0]), byte, blob1Len),
makeOpenArray(sqlite3_value_blob(ptrs[][1]), byte, blob2Len)
)

sqlite3_result_blob(ctx, baseAddr bytes, cint bytes.len, SQLITE_TRANSIENT)

func isInRadius(contentId: UInt256, localId: UInt256, radius: UInt256): bool =
let distance = contentId xor localId

radius > distance

func isInRadius*(
ctx: SqliteContext, n: cint, v: SqliteValue)
{.cdecl, gcsafe, raises: [].} =
doAssert(n == 3)

let
ptrs = makeUncheckedArray(v)
blob1Len = sqlite3_value_bytes(ptrs[][0])
blob2Len = sqlite3_value_bytes(ptrs[][1])
blob3Len = sqlite3_value_bytes(ptrs[][2])

doAssert(blob1Len == 32 and blob2Len == 32 and blob3Len == 32)

let
localId = UInt256.fromBytesBE(
makeOpenArray(sqlite3_value_blob(ptrs[][0]), byte, blob1Len))
contentId = UInt256.fromBytesBE(
makeOpenArray(sqlite3_value_blob(ptrs[][1]), byte, blob2Len))
radius = UInt256.fromBytesBE(
makeOpenArray(sqlite3_value_blob(ptrs[][2]), byte, blob3Len))

if isInRadius(contentId, localId, radius):
ctx.sqlite3_result_int(cint 1)
else:
ctx.sqlite3_result_int(cint 0)
9 changes: 9 additions & 0 deletions fluffy/fluffy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =

d.open()

# Force pruning
if config.forcePrune and config.radiusConfig.kind == Static:
let db = ContentDB.new(config.dataDir / "db" / "contentdb_" &
d.localNode.id.toBytesBE().toOpenArray(0, 8).toHex(),
storageCapacity = config.storageCapacityMB * 1_000_000,
manualCheckpoint = true)
db.forcePrune(d.localNode.id, UInt256.fromLogRadius(config.radiusConfig.logRadius))
db.close()

# Store the database at contentdb prefixed with the first 8 chars of node id.
# This is done because the content in the db is dependant on the `NodeId` and
# the selected `Radius`.
Expand Down
16 changes: 0 additions & 16 deletions fluffy/network/wire/portal_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -463,22 +463,6 @@ proc messageHandler(protocol: TalkProtocol, request: seq[byte],
debug "Packet decoding error", error = decoded.error, srcId, srcUdpAddress
@[]

proc fromLogRadius(T: type UInt256, logRadius: uint16): T =
# Get the max value of the logRadius range
pow((2).stuint(256), logRadius) - 1

proc getInitialRadius(rc: RadiusConfig): UInt256 =
case rc.kind
of Static:
return UInt256.fromLogRadius(rc.logRadius)
of Dynamic:
# In case of a dynamic radius we start from the maximum value to quickly
# gather as much data as possible, and also make sure each data piece in
# the database is in our range after a node restart.
# Alternative would be to store node the radius in database, and initialize
# it from database after a restart
return UInt256.high()

proc new*(T: type PortalProtocol,
baseProtocol: protocol.Protocol,
protocolId: PortalProtocolId,
Expand Down
22 changes: 20 additions & 2 deletions fluffy/network/wire/portal_protocol_config.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Nimbus
# Fluffy
# Copyright (c) 2021-2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
Expand All @@ -11,6 +11,7 @@ import
std/strutils,
confutils,
chronos,
stint,
eth/p2p/discoveryv5/routing_table

type
Expand All @@ -30,7 +31,6 @@ type
radiusConfig*: RadiusConfig
disablePoke*: bool


const
defaultRadiusConfig* = RadiusConfig(kind: Dynamic)
defaultRadiusConfigDesc* = $defaultRadiusConfig.kind
Expand Down Expand Up @@ -65,6 +65,24 @@ proc init*(
disablePoke: disablePoke
)

proc fromLogRadius*(T: type UInt256, logRadius: uint16): T =
# Get the max value of the logRadius range
pow((2).stuint(256), logRadius) - 1

proc getInitialRadius*(rc: RadiusConfig): UInt256 =
case rc.kind
of Static:
return UInt256.fromLogRadius(rc.logRadius)
of Dynamic:
# In case of a dynamic radius we start from the maximum value to quickly
# gather as much data as possible, and also make sure each data piece in
# the database is in our range after a node restart.
# Alternative would be to store node the radius in database, and initialize
# it from database after a restart
return UInt256.high()

## Confutils parsers

proc parseCmdArg*(T: type RadiusConfig, p: string): T
{.raises: [ValueError].} =
if p.startsWith("dynamic") and len(p) == 7:
Expand Down
Loading

0 comments on commit a7bb52e

Please sign in to comment.