From 5fbe8747a30411042b62f56588500ec12e90ed40 Mon Sep 17 00:00:00 2001
From: kdeme <7857583+kdeme@users.noreply.github.com>
Date: Fri, 10 Jan 2025 11:46:22 +0100
Subject: [PATCH] Quick and dirty PoC for syncing from Portal history network

This PR is not intended to get merged. It is a very quick and dirty
implementation whose purpose is to test the Portal network and Fluffy
code, and to measure how long block downloads take compared with the
execution of those blocks.

It somewhat abuses the current import-from-era code to do this. In an
improved version the block downloads should probably lead the
implementation and trigger execution (it is currently the reverse, which
makes sense for era files). Perhaps that way the execution could even be
offloaded to another thread; a rough sketch of that structure is included
after the diff.

It is also coded without using the JSON-RPC API, as I found that easier
for a quick version. But the getBlock call could be changed to use the
JSON-RPC alternative (see the sketch at the end).
---
 nimbus/config.nim                  |   7 +
 nimbus/nimbus_execution_client.nim |   2 +-
 nimbus/nimbus_import.nim           | 212 +++++++++++++++++++++++++++--
 3 files changed, 211 insertions(+), 10 deletions(-)

diff --git a/nimbus/config.nim b/nimbus/config.nim
index a8548574aa..a9fb092989 100644
--- a/nimbus/config.nim
+++ b/nimbus/config.nim
@@ -550,12 +550,19 @@ type
         defaultValue: false
         name: "debug-store-slot-hashes".}: bool
 
+      usePortal* {.
+        hidden
+        desc: "Use portal network instead of era files"
+        defaultValue: false
+        name: "debug-use-portal".}: bool
+
     of `import-rlp`:
       blocksFile* {.
         argument
         desc: "One or more RLP encoded block(s) files"
         name: "blocks-file" }: seq[InputFile]
 
+
 func parseCmdArg(T: type NetworkId, p: string): T {.gcsafe, raises: [ValueError].} =
   parseBiggestUInt(p).T
 
diff --git a/nimbus/nimbus_execution_client.nim b/nimbus/nimbus_execution_client.nim
index 5a9ab94430..e1f94d08f6 100644
--- a/nimbus/nimbus_execution_client.nim
+++ b/nimbus/nimbus_execution_client.nim
@@ -257,7 +257,7 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =
 
   case conf.cmd
   of NimbusCmd.`import`:
-    importBlocks(conf, com)
+    importBlocksPortal(conf, com)
   of NimbusCmd.`import-rlp`:
     importRlpBlocks(conf, com)
   else:
diff --git a/nimbus/nimbus_import.nim b/nimbus/nimbus_import.nim
index 4de9fa377d..54ddee0587 100644
--- a/nimbus/nimbus_import.nim
+++ b/nimbus/nimbus_import.nim
@@ -13,7 +13,8 @@ import
   chronicles,
   metrics,
   chronos/timer,
-  std/[strformat, strutils],
+  chronos,
+  std/[strformat, strutils, os],
   stew/io2,
   beacon_chain/era_db,
   beacon_chain/networking/network_metadata,
@@ -21,7 +22,16 @@ import
   ./common/common,
   ./core/chain,
   ./db/era1_db,
-  ./utils/era_helpers
+  ./utils/era_helpers,
+  eth/common/keys, # rng
+  eth/net/nat, # setupAddress
+  eth/p2p/discoveryv5/protocol as discv5_protocol,
+  eth/p2p/discoveryv5/routing_table,
+  eth/p2p/discoveryv5/enr,
+  ../fluffy/portal_node,
+  ../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr
+  ../fluffy/network_metadata,
+  ../fluffy/version
 
 declareGauge nec_import_block_number, "Latest imported block number"
 
@@ -87,7 +97,164 @@ template boolFlag(flags, b): PersistBlockFlags =
   else:
     {}
 
-proc importBlocks*(conf: NimbusConf, com: CommonRef) =
+proc run(config: NimbusConf): PortalNode {.
+    raises: [CatchableError]
+.} =
+  let rng = newRng()
+
+  ## Network configuration
+  let
+    bindIp = config.listenAddress
+    udpPort = Port(config.udpPort)
+    # TODO: allow for no TCP port mapping!
+    (extIp, _, extUdpPort) =
+      try:
+        setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal")
+      except CatchableError as exc:
+        raiseAssert exc.msg
+        # raise exc # TODO: Ideally we don't have the Exception here
+      except Exception as exc:
+        raiseAssert exc.msg
+    (netkey, newNetKey) =
+      # if config.netKey.isSome():
+      #   (config.netKey.get(), true)
+      # else:
+      getPersistentNetKey(rng[], config.dataDir / "netkey")
+
+    enrFilePath = config.dataDir / "nimbus_portal_node.enr"
+    previousEnr =
+      if not newNetKey:
+        getPersistentEnr(enrFilePath)
+      else:
+        Opt.none(enr.Record)
+
+  var bootstrapRecords: seq[Record]
+  # loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
+  # bootstrapRecords.add(config.bootstrapNodes)
+
+  # case config.network
+  # of PortalNetwork.none:
+  #   discard # don't connect to any network bootstrap nodes
+  # of PortalNetwork.mainnet:
+  #   for enrURI in mainnetBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+  # of PortalNetwork.angelfood:
+  #   for enrURI in angelfoodBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+
+  # Only mainnet
+  for enrURI in mainnetBootstrapNodes:
+    let res = enr.Record.fromURI(enrURI)
+    if res.isOk():
+      bootstrapRecords.add(res.value)
+
+  ## Discovery v5 protocol setup
+  let
+    discoveryConfig =
+      DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop)
+    d = newProtocol(
+      netkey,
+      extIp,
+      Opt.none(Port),
+      extUdpPort,
+      # Note: The addition of default clientInfo to the ENR is a temporary
+      # measure to easily identify & debug the clients used in the testnet.
+      # Might make this a default-off CLI option.
+      localEnrFields = {"c": enrClientInfoShort},
+      bootstrapRecords = bootstrapRecords,
+      previousRecord = previousEnr,
+      bindIp = bindIp,
+      bindPort = udpPort,
+      enrAutoUpdate = true,
+      config = discoveryConfig,
+      rng = rng,
+    )
+
+  d.open()
+
+  ## Portal node setup
+  let
+    portalProtocolConfig = PortalProtocolConfig.init(
+      DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha,
+      RadiusConfig(kind: Static, logRadius: 249),
+      defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize,
+      defaultDisableContentCache, defaultMaxConcurrentOffers
+    )
+
+    portalNodeConfig = PortalNodeConfig(
+      accumulatorFile: Opt.none(string),
+      disableStateRootValidation: true,
+      trustedBlockRoot: Opt.none(Digest),
+      portalConfig: portalProtocolConfig,
+      dataDir: string config.dataDir,
+      storageCapacity: 0,
+      contentRequestRetries: 1
+    )
+
+    node = PortalNode.new(
+      PortalNetwork.mainnet,
+      portalNodeConfig,
+      d,
+      {PortalSubnetwork.history},
+      bootstrapRecords = bootstrapRecords,
+      rng = rng,
+    )
+
+  let enrFile = config.dataDir / "nimbus_portal_node.enr"
+  if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
+    fatal "Failed to write the enr file", file = enrFile
+    quit 1
+
+  ## Start the Portal node.
+  node.start()
+
+  node
+
+proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]], startBlock: uint64): Future[void] {.async.} =
+  let historyNetwork = node.historyNetwork.value()
+  var blockNumber = startBlock
+
+  let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048)
+  var blocks: seq[EthBlock] = newSeq[EthBlock](8192)
+  var count = 0
+
+  proc blockWorker(node: PortalNode): Future[void] {.async.} =
+    while true:
+      let (blockNumber, i) = await blockNumberQueue.popFirst()
+      while true:
+        let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr:
+          error "Failed to get block", blockNumber = blockNumber + i
+          # Note: loop will get stuck here if a block is not available
+          continue
+
+        blocks[i] = init(EthBlock, header, body)
+        count.inc()
+
+        break
+
+  var workers: seq[Future[void]] = @[]
+  for i in 0 ..< 512:
+    workers.add node.blockWorker()
+
+  while true:
+    blocks = newSeq[EthBlock](8192)
+    count = 0
+    info "Downloading 8192 blocks", startBlock = blockNumber
+    for i in 0..8191'u64:
+      await blockNumberQueue.addLast((blockNumber, i))
+
+    # Not great :/ (an event-based alternative is sketched after the diff)
+    while count != 8192:
+      await sleepAsync(10.milliseconds)
+    info "Finished downloading 8192 blocks", startBlock = blockNumber
+    await blockQueue.addLast(blocks)
+
+    blockNumber += 8192
+
+proc importBlocks*(conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]]) {.async.} =
   proc controlCHandler() {.noconv.} =
     when defined(windows):
       # workaround for https://github.com/nim-lang/Nim/issues/4057
@@ -119,7 +286,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
       boolFlag(NoPersistBodies, not conf.storeBodies) +
       boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
       boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
-    blk: Block
+    blk: blocks.Block
     persister = Persister.init(chain, flags)
     cstats: PersistStats # stats at start of chunk
 
@@ -293,11 +460,19 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
     while running and persister.stats.blocks.uint64 < conf.maxBlocks and
         blockNumber <= lastEra1Block:
-      if not loadEraBlock(blockNumber):
-        notice "No more `era1` blocks to import", blockNumber, slot
-        break
-      persistBlock()
-      checkpoint()
+      if not conf.usePortal:
+        if not loadEraBlock(blockNumber):
+          notice "No more `era1` blocks to import", blockNumber, slot
+          break
+        persistBlock()
+        checkpoint()
+      else:
+        let blockSeq = await blockQueue.popFirst()
+        for blck in blockSeq:
+          blk = blck
+          persistBlock()
+          checkpoint()
+          # debugEcho "blck:" & $blck.header.number
 
     block era1Import:
       if blockNumber > lastEra1Block:
@@ -366,3 +541,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
     blocks = persister.stats.blocks,
     txs = persister.stats.txs,
     mgas = f(persister.stats.gas.float / 1000000)
+
+proc importBlocksPortal*(conf: NimbusConf, com: CommonRef) {.
+    raises: [CatchableError]
+.} =
+  let
+    portalNode = run(conf)
+    blockQueue = newAsyncQueue[seq[EthBlock]](4)
+    start = com.db.getSavedStateBlockNumber() + 1
+
+  if conf.usePortal:
+    asyncSpawn portalNode.getBlockLoop(blockQueue, start)
+
+  asyncSpawn importBlocks(conf, com, portalNode, blockQueue)
+
+  while running:
+    try:
+      poll()
+    except CatchableError as e:
+      warn "Exception in poll()", exc = e.name, err = e.msg
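
For testing, the Portal path can be enabled on the existing import command
via the new hidden flag added in config.nim, e.g. (binary name and data-dir
options depending on the local build):

  nimbus_execution_client import --debug-use-portal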
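
A rough sketch of the "downloads lead, execution follows" structure
mentioned in the description, using the same chronos AsyncQueue as the
diff. downloadBatch and executeAndPersist are hypothetical placeholders
for the Portal fetch and the existing persister logic:

  import chronos
  import eth/common/blocks # EthBlock as used in the diff; module path may
                           # differ per nim-eth version

  const batchSize = 8192'u64

  proc downloadBatch(startBlock: uint64): Future[seq[EthBlock]] {.async.} =
    # Hypothetical: fetch batchSize blocks from the Portal history network.
    discard

  proc executeAndPersist(batch: seq[EthBlock]) =
    # Hypothetical: run the existing persistBlock()/checkpoint() logic.
    discard

  proc downloadLoop(queue: AsyncQueue[seq[EthBlock]], startBlock: uint64) {.async.} =
    var blockNumber = startBlock
    while true:
      # The bounded queue applies backpressure when execution falls behind.
      await queue.addLast(await downloadBatch(blockNumber))
      blockNumber += batchSize

  proc executeLoop(queue: AsyncQueue[seq[EthBlock]]) {.async.} =
    while true:
      # Execution is driven by arriving downloads; this consumer is the
      # part that could later be moved to another thread.
      executeAndPersist(await queue.popFirst())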
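
The polling wait marked "Not great :/" in getBlockLoop could instead be
signalled; a minimal sketch assuming chronos' AsyncEvent, where
onBlockStored is a hypothetical hook called by the workers:

  import chronos

  var
    count = 0
    batchDone = newAsyncEvent()

  proc onBlockStored() =
    # Called by a worker after it has written its block into the batch.
    count.inc()
    if count == 8192:
      batchDone.fire()

  proc waitForBatch() {.async.} =
    # Replaces: while count != 8192: await sleepAsync(10.milliseconds)
    await batchDone.wait()
    batchDone.clear()
    count = 0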
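
The JSON-RPC alternative for the getBlock call could look roughly like the
following, assuming nim-json-rpc's RpcHttpClient pointed at a node that
serves eth_getBlockByNumber; the exact call/return types differ between
versions of the library, so treat this as a sketch only:

  import std/[json, strformat]
  import json_rpc/rpcclient

  proc getBlockViaRpc(url: string, blockNumber: uint64) {.async.} =
    let client = newRpcHttpClient()
    await client.connect(url)
    # eth_getBlockByNumber takes an 0x-prefixed hex quantity and a flag
    # requesting full transaction bodies.
    let res = await client.call(
      "eth_getBlockByNumber", %*[&"0x{blockNumber:x}", true])
    # Decode the header and body for the import from `res` here.
    discard res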