Skip to content

Commit

Permalink
Deneb support
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Jan 18, 2024
1 parent f164b02 commit bfbf34c
Show file tree
Hide file tree
Showing 8 changed files with 2,138 additions and 2,887 deletions.
4,793 changes: 1,975 additions & 2,818 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 4 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ version = "0.1.0"
edition = "2021"

[dependencies]
execution_layer = { git = "https://github.com/michaelsproul/lighthouse", rev = "c144ae391edc938dbc5e185a99df78b8c8cb1c76" }
execution_layer = { git = "https://github.com/sigp/lighthouse", tag = "v4.6.0-rc.0" }
tree_hash = "0.5.1"
tree_hash_derive = "0.5.1"
task_executor = { git = "https://github.com/michaelsproul/lighthouse", rev = "c144ae391edc938dbc5e185a99df78b8c8cb1c76" }
eth2_network_config = { git = "https://github.com/michaelsproul/lighthouse", rev = "c144ae391edc938dbc5e185a99df78b8c8cb1c76" }
eth2 = { git = "https://github.com/michaelsproul/lighthouse", rev = "c144ae391edc938dbc5e185a99df78b8c8cb1c76" }
task_executor = { git = "https://github.com/sigp/lighthouse", tag = "v4.6.0-rc.0" }
eth2_network_config = { git = "https://github.com/sigp/lighthouse", tag = "v4.6.0-rc.0" }
eth2 = { git = "https://github.com/sigp/lighthouse", tag = "v4.6.0-rc.0" }
ethereum_serde_utils = "0.5.1"
tokio = { version = "1.0.0", features = ["rt-multi-thread"] }
axum = { version = "0.6.10", features = ["headers"] }
Expand All @@ -32,14 +32,3 @@ hmac = "0.12.1"
sha2 = "0.10.7"
toml = "0.8.0"
hex = "0.4.3"

[patch]
[patch.crates-io]
arbitrary = { git = "https://github.com/michaelsproul/arbitrary", rev="f002b99989b561ddce62e4cf2887b0f8860ae991" }

[patch."https://github.com/ralexstokes/mev-rs"]
mev-rs = { git = "https://github.com/ralexstokes//mev-rs", rev = "7813d4a4a564e0754e9aaab2d95520ba437c3889" }
[patch."https://github.com/ralexstokes/ethereum-consensus"]
ethereum-consensus = { git = "https://github.com/ralexstokes//ethereum-consensus", rev = "9b0ee0a8a45b968c8df5e7e64ea1c094e16f053d" }
[patch."https://github.com/ralexstokes/ssz-rs"]
ssz-rs = { git = "https://github.com/ralexstokes//ssz-rs", rev = "adf1a0b14cef90b9536f28ef89da1fab316465e1" }
34 changes: 15 additions & 19 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::{
config::Config,
jwt::{jwt_secret_from_path, verify_single_token, KeyCollection, Secret},
multiplexer::Multiplexer,
transition_config::handle_transition_config,
types::{
ErrorResponse, MaybeErrorResponse, Request, Requests, Response, Responses, TaskExecutor,
},
Expand All @@ -18,10 +17,10 @@ use axum::{
use clap::Parser;
use eth2::types::MainnetEthSpec;
use execution_layer::http::{
ENGINE_EXCHANGE_CAPABILITIES, ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1,
ENGINE_FORKCHOICE_UPDATED_V1, ENGINE_FORKCHOICE_UPDATED_V2,
ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1, ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1,
ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2,
ENGINE_EXCHANGE_CAPABILITIES, ENGINE_FORKCHOICE_UPDATED_V1, ENGINE_FORKCHOICE_UPDATED_V2,
ENGINE_FORKCHOICE_UPDATED_V3, ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1,
ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2,
ENGINE_GET_PAYLOAD_V3, ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V3,
ETH_SYNCING,
};
use futures::channel::mpsc::channel;
Expand All @@ -39,7 +38,6 @@ mod meta;
mod multiplexer;
mod new_payload;
mod payload_builder;
mod transition_config;
mod types;

// TODO: allow other specs
Expand All @@ -61,7 +59,7 @@ async fn main() {
let listen_port = config.listen_port;
let controller_jwt_secret = jwt_secret_from_path(&config.controller_jwt_secret).unwrap();
let client_jwt_collection = KeyCollection::load(&config.client_jwt_secrets).unwrap();
let multiplexer = Multiplexer::<E>::new(config, executor, log).unwrap();
let multiplexer = Multiplexer::<E>::new(config, executor, log).await.unwrap();
let app_state = Arc::new(AppState {
controller_jwt_secret,
client_jwt_collection,
Expand Down Expand Up @@ -146,13 +144,12 @@ async fn process_client_request(
request: Request,
) -> Result<Response, ErrorResponse> {
match request.method.as_str() {
ENGINE_FORKCHOICE_UPDATED_V1 | ENGINE_FORKCHOICE_UPDATED_V2 => {
multiplexer.handle_fcu(request).await
}
ENGINE_NEW_PAYLOAD_V1 | ENGINE_NEW_PAYLOAD_V2 => {
ENGINE_FORKCHOICE_UPDATED_V1
| ENGINE_FORKCHOICE_UPDATED_V2
| ENGINE_FORKCHOICE_UPDATED_V3 => multiplexer.handle_fcu(request).await,
ENGINE_NEW_PAYLOAD_V1 | ENGINE_NEW_PAYLOAD_V2 | ENGINE_NEW_PAYLOAD_V3 => {
multiplexer.handle_new_payload(request).await
}
ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1 => handle_transition_config(request).await,
ETH_SYNCING => multiplexer.handle_syncing(request).await,
"eth_chainId" => multiplexer.handle_chain_id(request).await,
ENGINE_EXCHANGE_CAPABILITIES => multiplexer.handle_engine_capabilities(request).await,
Expand All @@ -163,7 +160,7 @@ async fn process_client_request(
| "eth_blockNumber"
| ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1
| ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1 => multiplexer.proxy_directly(request).await,
ENGINE_GET_PAYLOAD_V1 | ENGINE_GET_PAYLOAD_V2 => {
ENGINE_GET_PAYLOAD_V1 | ENGINE_GET_PAYLOAD_V2 | ENGINE_GET_PAYLOAD_V3 => {
multiplexer.handle_get_payload(request).await
}
method => Err(ErrorResponse::unsupported_method(request.id, method)),
Expand Down Expand Up @@ -194,13 +191,12 @@ async fn handle_controller_json_rpc(
.map_err(|e| ErrorResponse::parse_error_generic(serde_json::json!(0), e.body_text()))?;

match request.method.as_str() {
ENGINE_FORKCHOICE_UPDATED_V1 | ENGINE_FORKCHOICE_UPDATED_V2 => {
multiplexer.handle_controller_fcu(request).await
}
ENGINE_NEW_PAYLOAD_V1 | ENGINE_NEW_PAYLOAD_V2 => {
ENGINE_FORKCHOICE_UPDATED_V1
| ENGINE_FORKCHOICE_UPDATED_V2
| ENGINE_FORKCHOICE_UPDATED_V3 => multiplexer.handle_controller_fcu(request).await,
ENGINE_NEW_PAYLOAD_V1 | ENGINE_NEW_PAYLOAD_V2 | ENGINE_NEW_PAYLOAD_V3 => {
multiplexer.handle_controller_new_payload(request).await
}
ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1 => handle_transition_config(request).await,
ETH_SYNCING => multiplexer.handle_syncing(request).await,
"eth_chainId" => multiplexer.handle_chain_id(request).await,
ENGINE_EXCHANGE_CAPABILITIES => multiplexer.handle_engine_capabilities(request).await,
Expand All @@ -211,7 +207,7 @@ async fn handle_controller_json_rpc(
| "eth_blockNumber"
| ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1
| ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1 => multiplexer.proxy_directly(request).await,
ENGINE_GET_PAYLOAD_V1 | ENGINE_GET_PAYLOAD_V2 => {
ENGINE_GET_PAYLOAD_V1 | ENGINE_GET_PAYLOAD_V2 | ENGINE_GET_PAYLOAD_V3 => {
multiplexer.handle_get_payload(request).await
}
method => Err(ErrorResponse::unsupported_method(request.id, method)),
Expand Down
11 changes: 9 additions & 2 deletions src/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use tokio::sync::Mutex;

pub struct Multiplexer<E: EthSpec> {
Expand All @@ -36,7 +37,7 @@ pub struct NewPayloadCacheEntry {
}

impl<E: EthSpec> Multiplexer<E> {
pub fn new(config: Config, executor: TaskExecutor, log: Logger) -> Result<Self, String> {
pub async fn new(config: Config, executor: TaskExecutor, log: Logger) -> Result<Self, String> {
let engine: Engine = {
let jwt_secret_path = PathBuf::from(&config.ee_jwt_secret);
let jwt_id = Some("eleel".to_string());
Expand Down Expand Up @@ -74,7 +75,13 @@ impl<E: EthSpec> Multiplexer<E> {

// Derived values.
let spec = config.network.network.chain_spec::<E>()?;
let genesis_state = config.network.network.beacon_state::<E>()?;
let genesis_state_timeout = Duration::from_secs(180);
let genesis_state = config
.network
.network
.genesis_state::<E>(None, genesis_state_timeout, &log)
.await?
.ok_or("no genesis state")?;
let genesis_time = genesis_state.genesis_time();

Ok(Self {
Expand Down
92 changes: 77 additions & 15 deletions src/new_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ use crate::{
multiplexer::{Multiplexer, NewPayloadCacheEntry},
types::{
ErrorResponse, JsonExecutionPayload, JsonPayloadStatusV1, JsonPayloadStatusV1Status,
JsonValue, QuantityU64, Request, Response,
JsonValue, NewPayloadRequest, NewPayloadRequestCapella, NewPayloadRequestDeneb,
NewPayloadRequestMerge, QuantityU64, Request, Response,
},
};
use eth2::types::{EthSpec, ExecutionBlockHash, ExecutionPayload, ForkName, Slot};
use execution_layer::{http::ENGINE_NEW_PAYLOAD_V1, ExecutionLayer};
use eth2::types::{
EthSpec, ExecutionBlockHash, ExecutionPayload, ForkName, Hash256, Slot, VersionedHash,
};
use execution_layer::{
http::{ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V3},
ExecutionLayer,
};
use std::time::{Duration, Instant};

impl<E: EthSpec> Multiplexer<E> {
Expand All @@ -16,17 +22,35 @@ impl<E: EthSpec> Multiplexer<E> {
request: Request,
) -> Result<Response, ErrorResponse> {
tracing::info!("processing payload from controller");
let (id, json_execution_payload) = self.decode_execution_payload(request)?;
let (id, json_execution_payload, versioned_hashes, parent_beacon_block_root) =
self.decode_new_payload(request)?;

let block_hash = *json_execution_payload.block_hash();
let block_number = *json_execution_payload.block_number();

let execution_payload = ExecutionPayload::from(json_execution_payload);
let new_payload_request = match execution_payload.clone() {
ExecutionPayload::Merge(execution_payload) => {
NewPayloadRequest::Merge(NewPayloadRequestMerge { execution_payload })
}
ExecutionPayload::Capella(execution_payload) => {
NewPayloadRequest::Capella(NewPayloadRequestCapella { execution_payload })
}
ExecutionPayload::Deneb(execution_payload) => {
// TODO: error here if versioned hashes or parent root are None
NewPayloadRequest::Deneb(NewPayloadRequestDeneb {
execution_payload,
versioned_hashes: versioned_hashes.unwrap_or_default(),
parent_beacon_block_root: parent_beacon_block_root.unwrap_or_default(),
})
}
};

let status = if let Some(status) = self.get_cached_payload_status(&block_hash, true).await {
status
} else {
// Send payload to the real EL.
let execution_payload = ExecutionPayload::from(json_execution_payload);
match self.engine.api.new_payload(execution_payload.clone()).await {
match self.engine.api.new_payload(new_payload_request).await {
Ok(status) => {
let json_status = JsonPayloadStatusV1::from(status);

Expand Down Expand Up @@ -62,8 +86,11 @@ impl<E: EthSpec> Multiplexer<E> {

pub async fn handle_new_payload(&self, request: Request) -> Result<Response, ErrorResponse> {
tracing::info!("processing new payload from client");
let (id, execution_payload) = self.decode_execution_payload(request)?;
// TODO: verify versioned hashes
let (id, execution_payload, _versioned_hashes, parent_block_root) =
self.decode_new_payload(request)?;

// TODO: should check block hash validity before keying the cache on it
let block_hash = *execution_payload.block_hash();
let block_number = *execution_payload.block_number();

Expand All @@ -89,9 +116,13 @@ impl<E: EthSpec> Multiplexer<E> {
status
} else {
// Before sending a synthetic SYNCING response, check the block hash.
// Use a 0x0 hash if no parent block root was provided. The hash is only required
// for Deneb and later, and should be set (by decode_) whenever Deneb is activated.
let execution_payload = ExecutionPayload::from(execution_payload);
let (calculated_block_hash, _) =
ExecutionLayer::<E>::calculate_execution_block_hash(execution_payload.to_ref());
let (calculated_block_hash, _) = ExecutionLayer::<E>::calculate_execution_block_hash(
execution_payload.to_ref(),
parent_block_root.unwrap_or_default(),
);

if calculated_block_hash != block_hash {
tracing::warn!(
Expand Down Expand Up @@ -120,14 +151,44 @@ impl<E: EthSpec> Multiplexer<E> {
Response::new(id, status)
}

fn decode_execution_payload(
fn decode_new_payload(
&self,
request: Request,
) -> Result<(JsonValue, JsonExecutionPayload<E>), ErrorResponse> {
) -> Result<
(
JsonValue,
JsonExecutionPayload<E>,
Option<Vec<VersionedHash>>,
Option<Hash256>,
),
ErrorResponse,
> {
let method = request.method.clone();

let (id, (payload_json,)) = request.parse_as::<(JsonValue,)>()?;
let (id, params) = request.parse_as::<Vec<JsonValue>>()?;

let (versioned_hashes, parent_block_root) = if method == ENGINE_NEW_PAYLOAD_V3 {
if params.len() != 3 {
return Err(ErrorResponse::parse_error_generic(
id,
"wrong number of parameters for newPayloadV3".to_string(),
));
}
let versioned_hashes = serde_json::from_value(params[1].clone())
.map_err(|e| ErrorResponse::parse_error(id.clone(), e))?;
let parent_block_root = serde_json::from_value(params[2].clone())
.map_err(|e| ErrorResponse::parse_error(id.clone(), e))?;
(Some(versioned_hashes), Some(parent_block_root))
} else if params.len() == 1 {
(None, None)
} else {
return Err(ErrorResponse::parse_error_generic(
id,
format!("wrong number of parameters for {method}: {}", params.len()),
));
};

let payload_json = params[0].clone();
let QuantityU64 { value: timestamp } =
if let Some(timestamp_json) = payload_json.get("timestamp") {
serde_json::from_value(timestamp_json.clone())
Expand All @@ -148,15 +209,16 @@ impl<E: EthSpec> Multiplexer<E> {

let fork_name = self.spec.fork_name_at_slot::<E>(slot);

// TODO: this could be more generic
let payload = if method == ENGINE_NEW_PAYLOAD_V1 || fork_name == ForkName::Merge {
serde_json::from_value(payload_json).map(JsonExecutionPayload::V1)
} else {
} else if method == ENGINE_NEW_PAYLOAD_V2 || fork_name == ForkName::Capella {
serde_json::from_value(payload_json).map(JsonExecutionPayload::V2)
} else {
serde_json::from_value(payload_json).map(JsonExecutionPayload::V3)
}
.map_err(|e| ErrorResponse::parse_error(id.clone(), e))?;

Ok((id, payload))
Ok((id, payload, versioned_hashes, parent_block_root))
}

pub fn timestamp_to_slot(&self, timestamp: u64) -> Option<Slot> {
Expand Down
Loading

0 comments on commit bfbf34c

Please sign in to comment.