Skip to content

Commit

Permalink
Quick and dirty PoC for syncing from Portal history network
Browse files Browse the repository at this point in the history
This PR is not intended to get merged. It is a very quick and
dirty implementation with the intention of testing the Portal
network and Fluffy code and to verify how long downloads of blocks
take in comparison with the execution of them.
It kind of abuses the current import from era code to do this.

I think (?) in an improved version the block downloads should
probably lead the implementation and trigger execution (it is a
bit the reverse right now, which makes sense for era files).
Perhaps that way the execution could even be offloaded to another
thread?

It is also coded without using the JSON-RPC API, as I found that
easier for a quick version. But the getBlock call could be
changed to use the json-rpc alternative.
  • Loading branch information
kdeme committed Jan 10, 2025
1 parent 419e019 commit 5fbe874
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 10 deletions.
7 changes: 7 additions & 0 deletions nimbus/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -550,12 +550,19 @@ type
defaultValue: false
name: "debug-store-slot-hashes".}: bool

usePortal* {.
hidden
desc: "Use portal network instead of era files"
defaultValue: false
name: "debug-use-portal".}: bool

of `import-rlp`:
blocksFile* {.
argument
desc: "One or more RLP encoded block(s) files"
name: "blocks-file" }: seq[InputFile]


func parseCmdArg(T: type NetworkId, p: string): T
{.gcsafe, raises: [ValueError].} =
parseBiggestUInt(p).T
Expand Down
2 changes: 1 addition & 1 deletion nimbus/nimbus_execution_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =

case conf.cmd
of NimbusCmd.`import`:
importBlocks(conf, com)
importBlocksPortal(conf, com)
of NimbusCmd.`import-rlp`:
importRlpBlocks(conf, com)
else:
Expand Down
212 changes: 203 additions & 9 deletions nimbus/nimbus_import.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,25 @@ import
chronicles,
metrics,
chronos/timer,
std/[strformat, strutils],
chronos,
std/[strformat, strutils, os],
stew/io2,
beacon_chain/era_db,
beacon_chain/networking/network_metadata,
./config,
./common/common,
./core/chain,
./db/era1_db,
./utils/era_helpers
./utils/era_helpers,
eth/common/keys, # rng
eth/net/nat, # setupAddress
eth/p2p/discoveryv5/protocol as discv5_protocol,
eth/p2p/discoveryv5/routing_table,
eth/p2p/discoveryv5/enr,
../fluffy/portal_node,
../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr
../fluffy/network_metadata,
../fluffy/version

declareGauge nec_import_block_number, "Latest imported block number"

Expand Down Expand Up @@ -87,7 +97,164 @@ template boolFlag(flags, b): PersistBlockFlags =
else:
{}

proc importBlocks*(conf: NimbusConf, com: CommonRef) =
proc run(config: NimbusConf): PortalNode {.
raises: [CatchableError]
.} =
let rng = newRng()

## Network configuration
let
bindIp = config.listenAddress
udpPort = Port(config.udpPort)
# TODO: allow for no TCP port mapping!
(extIp, _, extUdpPort) =
try:
setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal")
except CatchableError as exc:
raiseAssert exc.msg
# raise exc # TODO: Ideally we don't have the Exception here
except Exception as exc:
raiseAssert exc.msg
(netkey, newNetKey) =
# if config.netKey.isSome():
# (config.netKey.get(), true)
# else:
getPersistentNetKey(rng[], config.dataDir / "netkey")

enrFilePath = config.dataDir / "nimbus_portal_node.enr"
previousEnr =
if not newNetKey:
getPersistentEnr(enrFilePath)
else:
Opt.none(enr.Record)

var bootstrapRecords: seq[Record]
# loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
# bootstrapRecords.add(config.bootstrapNodes)

# case config.network
# of PortalNetwork.none:
# discard # don't connect to any network bootstrap nodes
# of PortalNetwork.mainnet:
# for enrURI in mainnetBootstrapNodes:
# let res = enr.Record.fromURI(enrURI)
# if res.isOk():
# bootstrapRecords.add(res.value)
# of PortalNetwork.angelfood:
# for enrURI in angelfoodBootstrapNodes:
# let res = enr.Record.fromURI(enrURI)
# if res.isOk():
# bootstrapRecords.add(res.value)

# Only mainnet
for enrURI in mainnetBootstrapNodes:
let res = enr.Record.fromURI(enrURI)
if res.isOk():
bootstrapRecords.add(res.value)

## Discovery v5 protocol setup
let
discoveryConfig =
DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop)
d = newProtocol(
netkey,
extIp,
Opt.none(Port),
extUdpPort,
# Note: The addition of default clientInfo to the ENR is a temporary
# measure to easily identify & debug the clients used in the testnet.
# Might make this into a, default off, cli option.
localEnrFields = {"c": enrClientInfoShort},
bootstrapRecords = bootstrapRecords,
previousRecord = previousEnr,
bindIp = bindIp,
bindPort = udpPort,
enrAutoUpdate = true,
config = discoveryConfig,
rng = rng,
)

d.open()

## Portal node setup
let
portalProtocolConfig = PortalProtocolConfig.init(
DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha, RadiusConfig(kind: Static, logRadius: 249),
defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize,
defaultDisableContentCache, defaultMaxConcurrentOffers
)

portalNodeConfig = PortalNodeConfig(
accumulatorFile: Opt.none(string),
disableStateRootValidation: true,
trustedBlockRoot: Opt.none(Digest),
portalConfig: portalProtocolConfig,
dataDir: string config.dataDir,
storageCapacity: 0,
contentRequestRetries: 1
)

node = PortalNode.new(
PortalNetwork.mainnet,
portalNodeConfig,
d,
{PortalSubnetwork.history},
bootstrapRecords = bootstrapRecords,
rng = rng,
)

let enrFile = config.dataDir / "nimbus_portal_node.enr"
if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
fatal "Failed to write the enr file", file = enrFile
quit 1

## Start the Portal node.
node.start()

node

proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]], startBlock: uint64): Future[void] {.async.} =
let historyNetwork = node.historyNetwork.value()
var blockNumber = startBlock

let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048)
var blocks: seq[EthBlock] = newSeq[EthBlock](8192)
var count = 0

proc blockWorker(node: PortalNode): Future[void] {.async.} =
while true:
let (blockNumber, i) = await blockNumberQueue.popFirst()
while true:
let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr:
error "Failed to get block", blockNumber = blockNumber + i
# Note: loop will get stuck here if a block is not available
continue

blocks[i] = init(EthBlock, header, body)
count.inc()

break

var workers: seq[Future[void]] = @[]
for i in 0 ..< 512:
workers.add node.blockWorker()

while true:
blocks = newSeq[EthBlock](8192)
count = 0
info "Downloading 8192 blocks", startBlock = blockNumber
for i in 0..8191'u64:
await blockNumberQueue.addLast((blockNumber, i))

# Not great :/
while count != 8192:
await sleepAsync(10.milliseconds)
info "Finished downloading 8192 blocks", startBlock = blockNumber
await blockQueue.addLast(blocks)

blockNumber += 8192

proc importBlocks*(conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]]) {.async.} =
proc controlCHandler() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
Expand Down Expand Up @@ -119,7 +286,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
boolFlag(NoPersistBodies, not conf.storeBodies) +
boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
blk: Block
blk: blocks.Block
persister = Persister.init(chain, flags)
cstats: PersistStats # stats at start of chunk

Expand Down Expand Up @@ -293,11 +460,19 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =

while running and persister.stats.blocks.uint64 < conf.maxBlocks and
blockNumber <= lastEra1Block:
if not loadEraBlock(blockNumber):
notice "No more `era1` blocks to import", blockNumber, slot
break
persistBlock()
checkpoint()
if not conf.usePortal:
if not loadEraBlock(blockNumber):
notice "No more `era1` blocks to import", blockNumber, slot
break
persistBlock()
checkpoint()
else:
let blockSeq = await blockQueue.popFirst()
for blck in blockSeq:
blk = blck
persistBlock()
checkpoint()
# debugEcho "blck:" & $blck.header.number

block era1Import:
if blockNumber > lastEra1Block:
Expand Down Expand Up @@ -366,3 +541,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
blocks = persister.stats.blocks,
txs = persister.stats.txs,
mgas = f(persister.stats.gas.float / 1000000)

proc importBlocksPortal*(conf: NimbusConf, com: CommonRef) {.
raises: [CatchableError]
.} =
let
portalNode = run(conf)
blockQueue = newAsyncQueue[seq[EthBlock]](4)
start = com.db.getSavedStateBlockNumber() + 1

if conf.usePortal:
asyncSpawn portalNode.getBlockLoop(blockQueue, start)

asyncSpawn importBlocks(conf, com, portalNode, blockQueue)

while running:
try:
poll()
except CatchableError as e:
warn "Exception in poll()", exc = e.name, err = e.msg

0 comments on commit 5fbe874

Please sign in to comment.