diff --git a/.gitignore b/.gitignore
index 1b4710c292..2d8496b544 100644
--- a/.gitignore
+++ b/.gitignore
@@ -51,3 +51,5 @@ tests/fixtures/eest_static
 tests/fixtures/eest_stable
 tests/fixtures/eest_develop
 tests/fixtures/eest_devnet
+
+execution_chain/nimbus
diff --git a/Makefile b/Makefile
index 578291fa82..76a7e01f3b 100644
--- a/Makefile
+++ b/Makefile
@@ -218,8 +218,9 @@ nimbus_execution_client: | build deps rocksdb
 check_revision: nimbus_execution_client
 	scripts/check_revision.sh
 
-nimbus: nimbus_execution_client
-	echo "The nimbus target is deprecated and will soon change meaning, use 'nimbus_execution_client' instead"
+nimbus: | build deps rocksdb
+	echo -e $(BUILD_MSG) "build/nimbus" && \
+		$(ENV_SCRIPT) nim c $(NIM_PARAMS) -d:chronicles_log_level=TRACE -o:build/nimbus "execution_chain/nimbus.nim"
 
 # symlink
 nimbus.nims:
diff --git a/execution_chain/beacon/api_handler/api_newpayload.nim b/execution_chain/beacon/api_handler/api_newpayload.nim
index 5abd0b11f7..6668ca6d49 100644
--- a/execution_chain/beacon/api_handler/api_newpayload.nim
+++ b/execution_chain/beacon/api_handler/api_newpayload.nim
@@ -178,7 +178,7 @@ proc newPayload*(ben: BeaconEngineRef,
   # If we already have the block locally, ignore the entire execution and just
   # return a fake success.
   if chain.haveBlockAndState(blockHash):
-    notice "Ignoring already known beacon payload",
+    debug "Ignoring already known beacon payload",
       number = header.number, hash = blockHash.short
     return validStatus(blockHash)
diff --git a/execution_chain/config.nim b/execution_chain/config.nim
index 76a941c53f..64f75d515f 100644
--- a/execution_chain/config.nim
+++ b/execution_chain/config.nim
@@ -51,7 +51,7 @@ func getLogLevels(): string =
   join(logLevels, ", ")
 
 const
-  defaultPort = 30303
+  defaultExecutionPort* = 30303
   defaultMetricsServerPort = 9093
   defaultHttpPort = 8545
   # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.4/src/engine/authentication.md#jwt-specifications
@@ -242,8 +242,8 @@ type
     tcpPort* {.
      desc: "Ethereum P2P network listening TCP port"
-     defaultValue: defaultPort
-     defaultValueDesc: $defaultPort
+     defaultValue: defaultExecutionPort
+     defaultValueDesc: $defaultExecutionPort
      name: "tcp-port" }: Port
 
    udpPort* {.
@@ -474,6 +474,12 @@ type
       defaultValueDesc: "\"jwt.hex\" in the data directory (see --data-dir)"
       name: "jwt-secret" .}: Option[InputFile]
+    jwtSecretValue* {.
+      hidden
+      desc: "Hex string with jwt secret"
+      defaultValueDesc: "\"jwt.hex\" in the data directory (see --data-dir)"
+      name: "debug-jwt-secret-value" .}: Option[string]
+
     beaconSyncTarget* {.
       hidden
       desc: "Manually set the initial sync target specified by its 32 byte" &
diff --git a/execution_chain/el_sync.nim b/execution_chain/el_sync.nim
new file mode 100644
index 0000000000..66f33b9910
--- /dev/null
+++ b/execution_chain/el_sync.nim
@@ -0,0 +1,208 @@
+# Nimbus
+# Copyright (c) 2024-2025 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT))
+# at your option.
+# This file may not be copied, modified, or distributed except according to
+# those terms.
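+
+## Consensus to execution syncer prototype based on nrpc
+
+{.push raises: [].}
+
+import
+  chronos,
+  chronicles,
+  web3,
+  web3/[engine_api, primitives, conversions],
+  beacon_chain/consensus_object_pools/blockchain_dag,
+  beacon_chain/el/[el_manager, engine_api_conversions],
+  beacon_chain/spec/[forks, presets, state_transition_block]
+
+logScope:
+  topics = "elsync"
+
+proc getForkedBlock(dag: ChainDAGRef, slot: Slot): Opt[ForkedTrustedSignedBeaconBlock] =
+  let bsi = ?dag.getBlockIdAtSlot(slot)
+  if bsi.isProposed():
+    dag.getForkedBlock(bsi.bid)
+  else:
+    Opt.none(ForkedTrustedSignedBeaconBlock)
+
+proc blockNumber(blck: ForkedTrustedSignedBeaconBlock): uint64 =
+  withBlck(blck):
+    when consensusFork >= ConsensusFork.Bellatrix and
+        consensusFork < ConsensusFork.Gloas:
+      forkyBlck.message.body.execution_payload.block_number
+    else:
+      0'u64
+
+# Load the network configuration based on the network name
+proc loadNetworkConfig(cfg: RuntimeConfig): (uint64, uint64) =
+  case cfg.CONFIG_NAME
+  of "mainnet":
+    (15537393'u64, 4700013'u64)
+  of "sepolia":
+    (1450408'u64, 115193'u64)
+  of "holesky", "hoodi":
+    (0'u64, 0'u64)
+  else:
+    notice "Loading custom network, assuming post-merge"
+    (0'u64, 0'u64)
+
+# The tuple is (lastEra1Block, firstSlotAfterMerge) - for mainnet, block
+# 15537393 is the last pre-merge execution block and slot 4700013 the first
+# slot carrying an execution payload; networks that launched post-merge
+# use (0, 0)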
+
+# Slot-finding mechanism:
+# The initial lower bound is `firstSlotAfterMerge` plus the number of
+# execution blocks after Era1. From there, we iterate over slots, shrinking
+# the search space on each step by adding the difference between
+# `elBlockNumber` and the `block_number` from the slot's execution payload to
+# `importedSlot` - this keeps pushing the lower bound up and makes the search
+# much smaller
+proc findSlot(
+    dag: ChainDAGRef,
+    elBlockNumber: uint64,
+    lastEra1Block: uint64,
+    firstSlotAfterMerge: uint64,
+): Opt[uint64] =
+  var importedSlot = (elBlockNumber - lastEra1Block) + firstSlotAfterMerge + 1
+  debug "Finding slot number corresponding to block", elBlockNumber, importedSlot
+
+  var clNum = 0'u64
+  while clNum < elBlockNumber:
+    # Check if we can get the block id - if not, this part of the chain is not
+    # available from the CL
+    let bsi = ?dag.getBlockIdAtSlot(Slot(importedSlot))
+
+    if not bsi.isProposed:
+      importedSlot += 1
+      continue # Empty slot
+
+    let blck = dag.getForkedBlock(bsi.bid).valueOr:
+      return # Block unavailable
+
+    clNum = blck.blockNumber
+    # On the first iteration, the arithmetic helps skip the gap that has built
+    # up due to empty slots - subsequent iterations take progressively smaller
+    # jumps, but never less than one slot, so that we don't stop one slot early
+    importedSlot += max(elBlockNumber - clNum, 1)
+
+  Opt.some importedSlot
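+
+# Worked example with illustrative numbers: on mainnet, looking for
+# elBlockNumber = 17_000_000 the initial guess is
+# (17_000_000 - 15_537_393) + 4_700_013 + 1 = 6_162_621; if the payload found
+# there carries block_number 16_990_000, the next probe jumps
+# 17_000_000 - 16_990_000 = 10_000 slots ahead, so the search converges in a
+# few probes instead of scanning every slot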
+
+proc syncToEngineApi*(dag: ChainDAGRef, url: EngineApiUrl) {.async.} =
+  # Takes blocks from the CL and sends them to the EL - the attempt is made
+  # optimistically until something unexpected happens (a reorg etc), at which
+  # point the process ends
+
+  let
+    # Create the client for the engine API and exchange capabilities as a
+    # test communication
+    web3 = await url.newWeb3()
+    rpcClient = web3.provider
+    (lastEra1Block, firstSlotAfterMerge) = dag.cfg.loadNetworkConfig()
+
+  defer:
+    try:
+      await web3.close()
+    except CatchableError:
+      discard
+
+  # Load the EL state details, i.e. the current head block number
+  var elBlockNumber = uint64(await rpcClient.eth_blockNumber())
+
+  # Check for pre-merge situation
+  if elBlockNumber <= lastEra1Block:
+    debug "EL still pre-merge, no EL sync",
+      blocknumber = elBlockNumber, lastPoWBlock = lastEra1Block
+    return
+
+  # Load the latest state from the CL
+  var clBlockNumber = dag.getForkedBlock(dag.head.slot).expect("head block").blockNumber
+
+  # Check if the EL is already in sync or about to become so (i.e. most
+  # likely already processing a payload)
+  if clBlockNumber in [elBlockNumber, elBlockNumber + 1]:
+    debug "EL in sync (or almost)", clBlockNumber, elBlockNumber
+    return
+
+  if clBlockNumber < elBlockNumber:
+    # This happens often during initial sync when the light client information
+    # allows the EL to sync ahead of the CL head - it can also happen during
+    # reorgs
+    debug "CL is behind EL, not activating", clBlockNumber, elBlockNumber
+    return
+
+  var importedSlot = findSlot(dag, elBlockNumber, lastEra1Block, firstSlotAfterMerge).valueOr:
+    debug "Missing slot information for sync", elBlockNumber
+    return
+
+  notice "Found initial slot for EL sync", importedSlot, elBlockNumber, clBlockNumber
+
+  while elBlockNumber < clBlockNumber:
+    let curBlck = dag.getForkedBlock(Slot(importedSlot)).valueOr:
+      importedSlot += 1
+      continue
+    importedSlot += 1
+    let payloadResponse = withBlck(curBlck):
+      # Blocks before Bellatrix don't have an execution payload to send, and
+      # Gloas and later are not handled yet
+      when consensusFork >= ConsensusFork.Gloas:
+        break
+      elif consensusFork >= ConsensusFork.Bellatrix:
+        # Load the execution payload for all blocks after the Bellatrix upgrade
+        let payload =
+          forkyBlck.message.body.execution_payload.asEngineExecutionPayload()
+
+        debug "Sending payload", payload
+
+        when consensusFork >= ConsensusFork.Electra:
+          let
+            # Calculate the versioned hashes from the KZG commitments
+            versioned_hashes =
+              forkyBlck.message.body.blob_kzg_commitments.asEngineVersionedHashes()
+            # Execution requests for Electra
+            execution_requests =
+              forkyBlck.message.body.execution_requests.asEngineExecutionRequests()
+
+          await rpcClient.engine_newPayloadV4(
+            payload,
+            versioned_hashes,
+            forkyBlck.message.parent_root.to(Hash32),
+            execution_requests,
+          )
+        elif consensusFork >= ConsensusFork.Deneb:
+          # Calculate the versioned hashes from the KZG commitments
+          let versioned_hashes =
+            forkyBlck.message.body.blob_kzg_commitments.asEngineVersionedHashes()
+          await rpcClient.engine_newPayloadV3(
+            payload, versioned_hashes, forkyBlck.message.parent_root.to(Hash32)
+          )
+        elif consensusFork >= ConsensusFork.Capella:
+          await rpcClient.engine_newPayloadV2(payload)
+        else:
+          await rpcClient.engine_newPayloadV1(payload)
+      else:
+        return
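+
+    # To recap the dispatch above: Electra and later use engine_newPayloadV4
+    # (versioned hashes plus execution requests), Deneb uses V3 (versioned
+    # hashes only), Capella V2 and Bellatrix V1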
+ warn "Payload invalid", + elBlockNumber, status = payloadResponse.status, curBlck = shortLog(curBlck) + return + + debug "newPayload accepted", elBlockNumber, response = payloadResponse.status + + elBlockNumber += 1 + + if elBlockNumber mod 1024 == 0: + let curElBlock = uint64(await rpcClient.eth_blockNumber()) + if curElBlock != elBlockNumber: + # If the EL starts syncing on its own, faster than we can feed it blocks + # from here, it'll run ahead and we can stop this remote-drive attempt + # TODO this happens because el-sync competes with the regular devp2p sync + # when in fact it could be collaborating such that we don't do + # redundant work + debug "EL out of sync with EL syncer", curElBlock, elBlockNumber + return diff --git a/execution_chain/nimbus.nim b/execution_chain/nimbus.nim new file mode 100644 index 0000000000..bb2110e818 --- /dev/null +++ b/execution_chain/nimbus.nim @@ -0,0 +1,390 @@ +# nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms + +import ../execution_chain/compile_info + +import web3/primitives, chronos + +proc workaround*(): int {.exportc.} = + # TODO https://github.com/nim-lang/Nim/issues/24844 + return int(Future[Quantity]().internalValue) + +import + std/[os, net, options, typetraits], + chronos/threadsync, + chronicles, + metrics, + nimcrypto/sysrand, + eth/enr/enr, + eth/net/nat, + ./constants, + ./nimbus_desc, + ./rpc/jwt_auth, + std/terminal, + metrics/chronos_httpserver, + stew/io2, + eth/p2p/discoveryv5/random2, + beacon_chain/spec/[engine_authentication], + beacon_chain/validators/keystore_management, + beacon_chain/[buildinfo, conf, beacon_node, nimbus_binary_common, process_state], + beacon_chain/nimbus_beacon_node, + ./[el_sync, nimbus_execution_client] + +const + defaultMetricsServerPort = 8008 + copyright = "Copyright (c) " & compileYear & " Status Research & Development GmbH" + +type NStartUpCmd* {.pure.} = enum + noCommand + beaconNode + executionClient + +#!fmt: off +type + XNimbusConf = object + configFile* {. + desc: "Loads the configuration from a TOML file", + name: "config-file" + .}: Option[InputFile] + + logLevel* {. + desc: + "Sets the log level for process and topics (e.g. \"DEBUG; TRACE:discv5,libp2p; REQUIRED:none; DISABLED:none\")", + defaultValue: "INFO", + name: "log-level" + .}: string + + logStdout* {. + hidden, + desc: + "Specifies what kind of logs should be written to stdout (auto, colors, nocolors, json)", + defaultValueDesc: "auto", + defaultValue: StdoutLogKind.Auto, + name: "log-format" + .}: StdoutLogKind + + eth2Network* {. + desc: "The Eth2 network to join", defaultValueDesc: "mainnet", name: "network" + .}: Option[string] + + dataDirFlag* {. + desc: "The directory where nimbus will store all blockchain data", + defaultValueDesc: defaultDataDir("", ""), + abbr: "d", + name: "data-dir" + .}: Option[OutDir] + + metricsEnabled* {. + desc: "Enable the built-in metrics HTTP server", + defaultValue: false, + name: "metrics" + .}: bool + + metricsPort* {. + desc: "Listening port of the built-in metrics HTTP server", + defaultValue: defaultMetricsServerPort, + defaultValueDesc: $defaultMetricsServerPort, + name: "metrics-port" + .}: Port + + metricsAddress* {. 
+ desc: "Listening IP address of the built-in metrics HTTP server", + defaultValue: defaultAdminListenAddress, + defaultValueDesc: $defaultAdminListenAddressDesc, + name: "metrics-address" + .}: IpAddress + + numThreads* {. + defaultValue: 0, + desc: "Number of worker threads (\"0\" = use as many threads as there are CPU cores available)" + name: "num-threads" .}: int + + # TODO beacon and execution engine must run on different ports - in order + # to keep compatibility with `--tcp-port` that is used in both, use + # consecutive ports unless specific ports are set - to be evaluated + executionTcpPort* {. + desc: "Listening TCP port for Ethereum DevP2P traffic" + name: "execution-tcp-port" .}: Option[Port] + + executionUdpPort* {. + desc: "Listening UDP port for execution node discovery" + name: "execution-udp-port" .}: Option[Port] + + beaconTcpPort* {. + desc: "Listening TCP port for Ethereum DevP2P traffic" + name: "beacon-tcp-port" .}: Option[Port] + + beaconUdpPort* {. + desc: "Listening UDP port for execution node discovery" + name: "beacon-udp-port" .}: Option[Port] + + tcpPort* {. + desc: "Listening TCP port for Ethereum traffic - tcp-port and tcp-port+1 will be used if set" + name: "tcp-port" .}: Option[Port] + + udpPort* {. + desc: "Listening UDP port for node discovery - udp-port and udp-port+1 will be used if set" + name: "udp-port" .}: Option[Port] + + elSync* {. + desc: "Turn on CL-driven sync of the EL, for syncing execution blocks from the consensus network" + defaultValue: true + name: "el-sync" .}: bool + + case cmd* {.command, defaultValue: NStartUpCmd.noCommand.}: NStartUpCmd + of noCommand: + discard + of beaconNode: + discard + of executionClient: + discard + +#!fmt: on + +type + BeaconThreadConfig = object + tsp: ThreadSignalPtr + tcpPort: Port + udpPort: Port + elSync: bool + + ExecutionThreadConfig = object + tsp: ThreadSignalPtr + tcpPort: Port + udpPort: Port + +var jwtKey: JwtSharedKey + +proc dataDir*(config: XNimbusConf): string = + string config.dataDirFlag.get( + OutDir defaultDataDir("", config.eth2Network.loadEth2Network().cfg.name) + ) + +proc justWait(tsp: ThreadSignalPtr) {.async: (raises: [CancelledError]).} = + try: + await tsp.wait() + except AsyncError as exc: + notice "Waiting failed", err = exc.msg + +proc elSyncLoop( + dag: ChainDAGRef, url: EngineApiUrl +) {.async: (raises: [CancelledError]).} = + while true: + await sleepAsync(12.seconds) + + # TODO trigger only when the EL needs syncing + try: + await syncToEngineApi(dag, url) + except CatchableError as exc: + notice "oops", err = exc.msg + +proc runBeaconNode(p: BeaconThreadConfig) {.thread.} = + var config = BeaconNodeConf.loadWithBanners(clientId, copyright, [specBanner], true).valueOr: + stderr.writeLine error # Logging not yet set up + quit QuitFailure + + let rng = HmacDrbgContext.new() + + let engineUrl = + EngineApiUrl.init("http://127.0.0.1:8551/", Opt.some(@(distinctBase(jwtKey)))) + + config.metricsEnabled = false + config.elUrls = + @[ + EngineApiUrlConfigValue( + url: engineUrl.url, jwtSecret: some toHex(distinctBase(jwtKey)) + ) + ] + config.statusBarEnabled = false # Multi-threading issues due to logging + config.tcpPort = p.tcpPort + config.udpPort = p.udpPort + + # TODO https://github.com/status-im/nim-taskpools/issues/6 + # share taskpool between bn and ec + let taskpool = setupTaskpool(config.numThreads) + + info "Launching beacon node", + version = fullVersionStr, + bls_backend = $BLS_BACKEND, + const_preset, + cmdParams = commandLineParams(), + config, + numThreads = 
+
+    elSync* {.
+      desc: "Turn on CL-driven sync of the EL, for syncing execution blocks from the consensus network"
+      defaultValue: true
+      name: "el-sync" .}: bool
+
+    case cmd* {.command, defaultValue: NStartUpCmd.noCommand.}: NStartUpCmd
+    of noCommand:
+      discard
+    of beaconNode:
+      discard
+    of executionClient:
+      discard
+
+#!fmt: on
+
+type
+  BeaconThreadConfig = object
+    tsp: ThreadSignalPtr
+    tcpPort: Port
+    udpPort: Port
+    elSync: bool
+
+  ExecutionThreadConfig = object
+    tsp: ThreadSignalPtr
+    tcpPort: Port
+    udpPort: Port
+
+var jwtKey: JwtSharedKey
+
+proc dataDir*(config: XNimbusConf): string =
+  string config.dataDirFlag.get(
+    OutDir defaultDataDir("", config.eth2Network.loadEth2Network().cfg.name)
+  )
+
+proc justWait(tsp: ThreadSignalPtr) {.async: (raises: [CancelledError]).} =
+  try:
+    await tsp.wait()
+  except AsyncError as exc:
+    notice "Waiting failed", err = exc.msg
+
+proc elSyncLoop(
+    dag: ChainDAGRef, url: EngineApiUrl
+) {.async: (raises: [CancelledError]).} =
+  while true:
+    await sleepAsync(12.seconds)
+
+    # TODO trigger only when the EL needs syncing
+    try:
+      await syncToEngineApi(dag, url)
+    except CatchableError as exc:
+      notice "EL sync attempt failed", err = exc.msg
+
+proc runBeaconNode(p: BeaconThreadConfig) {.thread.} =
+  var config = BeaconNodeConf.loadWithBanners(clientId, copyright, [specBanner], true).valueOr:
+    stderr.writeLine error # Logging not yet set up
+    quit QuitFailure
+
+  let rng = HmacDrbgContext.new()
+
+  let engineUrl =
+    EngineApiUrl.init("http://127.0.0.1:8551/", Opt.some(@(distinctBase(jwtKey))))
+
+  config.metricsEnabled = false
+  config.elUrls =
+    @[
+      EngineApiUrlConfigValue(
+        url: engineUrl.url, jwtSecret: some toHex(distinctBase(jwtKey))
+      )
+    ]
+  config.statusBarEnabled = false # Multi-threading issues due to logging
+  config.tcpPort = p.tcpPort
+  config.udpPort = p.udpPort
+
+  # TODO https://github.com/status-im/nim-taskpools/issues/6
+  #      share taskpool between bn and ec
+  let taskpool = setupTaskpool(config.numThreads)
+
+  info "Launching beacon node",
+    version = fullVersionStr,
+    bls_backend = $BLS_BACKEND,
+    const_preset,
+    cmdParams = commandLineParams(),
+    config,
+    numThreads = taskpool.numThreads
+
+  config.createDumpDirs()
+
+  let metadata = config.loadEth2Network()
+
+  # Updating the config based on the metadata certainly is not beautiful but it
+  # works
+  for node in metadata.bootstrapNodes:
+    config.bootstrapNodes.add node
+
+  block:
+    let res =
+      if config.trustedSetupFile.isNone:
+        conf.loadKzgTrustedSetup()
+      else:
+        conf.loadKzgTrustedSetup(config.trustedSetupFile.get)
+    if res.isErr():
+      raiseAssert res.error()
+
+  let stopper = p.tsp.justWait()
+
+  if stopper.finished():
+    return
+
+  let node = waitFor BeaconNode.init(rng, config, metadata, taskpool)
+
+  if stopper.finished():
+    return
+
+  if p.elSync:
+    discard elSyncLoop(node.dag, engineUrl)
+
+  dynamicLogScope(comp = "bn"):
+    if node.nickname != "":
+      dynamicLogScope(node = node.nickname):
+        node.run(stopper)
+    else:
+      node.run(stopper)
+
+proc runExecutionClient(p: ExecutionThreadConfig) {.thread.} =
+  var config = makeConfig(ignoreUnknown = true)
+  config.metricsEnabled = false
+  config.engineApiEnabled = true
+  config.jwtSecretValue = some toHex(distinctBase(jwtKey))
+  config.agentString = "nimbus"
+  config.tcpPort = p.tcpPort
+  config.udpPort = p.udpPort
+
+  # TODO https://github.com/status-im/nim-taskpools/issues/6
+  #      share taskpool between bn and ec
+  let
+    taskpool = setupTaskpool(int config.numThreads)
+    com = setupCommonRef(config, taskpool)
+
+  {.gcsafe.}:
+    dynamicLogScope(comp = "ec"):
+      nimbus_execution_client.runExeClient(config, com, p.tsp.justWait())
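+
+# How the two halves talk to each other: main() below generates a random JWT
+# secret and hands it to both threads - the beacon node dials the execution
+# client's engine API at http://127.0.0.1:8551/ with that secret, while the
+# execution client picks it up through the hidden debug-jwt-secret-value
+# option (see jwt_auth.nim)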
+
+# noinline to keep it in stack traces
+proc main() {.noinline, raises: [CatchableError].} =
+  var
+    params = commandLineParams()
+    isEC = false
+    isBN = false
+  for i in 0 ..< params.len:
+    try:
+      discard NimbusCmd.parseCmdArg(params[i])
+      isEC = true
+      params.delete(i)
+      break
+    except ValueError:
+      discard
+    try:
+      discard BNStartUpCmd.parseCmdArg(params[i])
+      isBN = true
+      params.delete(i)
+      break
+    except ValueError:
+      discard
+
+    try:
+      let cmd = NStartUpCmd.parseCmdArg(params[i])
+
+      if cmd == NStartUpCmd.beaconNode:
+        isBN = true
+        params.delete(i)
+        break
+
+      if cmd == NStartUpCmd.executionClient:
+        isEC = true
+        params.delete(i)
+        break
+    except ValueError:
+      discard
+
+  if isBN:
+    nimbus_beacon_node.main()
+  elif isEC:
+    nimbus_execution_client.main()
+  else:
+    # Make sure the default nim handlers don't run in any thread
+    ProcessState.setupStopHandlers()
+
+    # Make it harder to connect to the (internal) engine - this will of course
+    # go away
+    discard randomBytes(distinctBase(jwtKey))
+
+    const banner = "Nimbus v0.0.1"
+
+    var config = XNimbusConf.loadWithBanners(banner, copyright, [specBanner], true).valueOr:
+      writePanicLine error # Logging not yet set up
+      quit QuitFailure
+
+    setupLogging(config.logLevel, config.logStdout, none OutFile)
+    setupFileLimits()
+
+    if not (checkAndCreateDataDir(string(config.dataDir))):
+      # We are unable to access/create data folder or data folder's
+      # permissions are insecure.
+      quit QuitFailure
+
+    let metricsServer = (waitFor config.initMetricsServer()).valueOr:
+      quit 1
+
+    # Nim GC metrics (for the main thread) will be collected in onSecond(), but
+    # we disable piggy-backing on other metrics here
+    setSystemMetricsAutomaticUpdate(false)
+
+    var bnThread: Thread[BeaconThreadConfig]
+    let bnStop = ThreadSignalPtr.new().expect("working ThreadSignalPtr")
+    createThread(
+      bnThread,
+      runBeaconNode,
+      BeaconThreadConfig(
+        tsp: bnStop,
+        tcpPort: config.beaconTcpPort.get(config.tcpPort.get(Port defaultEth2TcpPort)),
+        udpPort: config.beaconUdpPort.get(config.udpPort.get(Port defaultEth2TcpPort)),
+        elSync: config.elSync,
+      ),
+    )
+
+    var ecThread: Thread[ExecutionThreadConfig]
+    let ecStop = ThreadSignalPtr.new().expect("working ThreadSignalPtr")
+    createThread(
+      ecThread,
+      runExecutionClient,
+      ExecutionThreadConfig(
+        tsp: ecStop,
+        tcpPort:
+          # -1/+1 so that the global default (defaultExecutionPort) is
+          # respected when no flag is given, while an explicit --tcp-port
+          # gets +1 applied (the beacon node takes the base port)
+          config.executionTcpPort.get(
+            Port(uint16(config.tcpPort.get(Port(defaultExecutionPort - 1))) + 1)
+          ),
+        udpPort: config.executionUdpPort.get(
+          Port(uint16(config.udpPort.get(Port(defaultExecutionPort - 1))) + 1)
+        ),
+      ),
+    )
+
+    while not ProcessState.stopIt(notice("Shutting down", reason = it)):
+      os.sleep(100)
+
+    waitFor bnStop.fire()
+    waitFor ecStop.fire()
+
+    joinThread(bnThread)
+    joinThread(ecThread)
+
+    waitFor metricsServer.stopMetricsServer()
+
+when isMainModule:
+  main()
diff --git a/execution_chain/nimbus.nim.cfg b/execution_chain/nimbus.nim.cfg
new file mode 100644
index 0000000000..29daec5fd7
--- /dev/null
+++ b/execution_chain/nimbus.nim.cfg
@@ -0,0 +1,8 @@
+-d:"chronicles_sinks=textlines[dynamic],json[dynamic]"
+-d:"chronicles_runtime_filtering=on"
+-d:"chronicles_thread_ids=no"
+-d:"libp2p_pki_schemes=secp256k1"
+
+@if release:
+  -d:"chronicles_line_numbers:0"
+@end
diff --git a/execution_chain/rpc/jwt_auth.nim b/execution_chain/rpc/jwt_auth.nim
index 1ba66cd9d7..e882f688ea 100644
--- a/execution_chain/rpc/jwt_auth.nim
+++ b/execution_chain/rpc/jwt_auth.nim
@@ -209,6 +209,13 @@ proc jwtSharedSecret*(
   #   startup, or show error and continue without exposing the authenticated
   #   port.
   #
+  if config.jwtSecretValue.isSome():
+    var key: JwtSharedKey
+    let rc = key.fromHex(config.jwtSecretValue.get())
+    if rc.isErr:
+      return err(rc.error)
+    return ok(key)
+
   var jwtSecretPath = config.dataDir / jwtSecretFile # default path
   let jwtDoesNotExist = not fileExists(jwtSecretPath)
   if config.jwtSecret.isNone and jwtDoesNotExist: