Skip to content

Commit

Permalink
fix(l1): fix hive LargeTxRequest test (#1480)
Browse files Browse the repository at this point in the history
The fix consists in removing the transaction limit in the Transactions
rlpx decode. While there might be a limit to transaction responses, we
should not limit incoming transactions and specially not in the decoding
stage, as this is an eth wire protocol limit, not an rlpx limit.

Additional changes: added error messages and more detailed disconnect
messages. For this PR this only helped to see that there were "early
eofs", which can roughly be transalted as abrupt disconnections from the
peer.
  • Loading branch information
Arkenan authored Jan 8, 2025
1 parent b289c0f commit 3a3d97e
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci_l1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ jobs:
test_pattern: /AccountRange|StorageRanges|ByteCodes|TrieNodes
- name: "Devp2p eth tests"
simulation: devp2p
test_pattern: eth/Status|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|InvalidTxs|NewPooledTxs|GetBlockReceipts|BlobViolations
test_pattern: eth/Status|GetBlockHeaders|SimultaneousRequests|SameRequestID|ZeroRequestID|GetBlockBodies|MaliciousHandshake|MaliciousStatus|Transaction|InvalidTxs|NewPooledTxs|GetBlockReceipts|BlobViolations|LargeTxRequest
- name: "Engine Auth and EC tests"
simulation: ethereum/engine
test_pattern: engine-(auth|exchange-capabilities)/
Expand Down
2 changes: 2 additions & 0 deletions crates/common/types/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl RLPEncode for P2PTransaction {
impl RLPDecode for P2PTransaction {
fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
if is_encoded_as_bytes(rlp)? {
// Adjust the encoding to get the payload
let payload = get_rlp_bytes_item_payload(rlp)?;
let tx_type = payload.first().ok_or(RLPDecodeError::InvalidLength)?;
let tx_encoding = &payload.get(1..).ok_or(RLPDecodeError::InvalidLength)?;
Expand All @@ -93,6 +94,7 @@ impl RLPDecode for P2PTransaction {
// EIP4844
0x3 => WrappedEIP4844Transaction::decode_unfinished(tx_encoding)
.map(|(tx, rem)| (P2PTransaction::EIP4844TransactionWithBlobs(tx), rem)),

// PriviligedL2
0x7e => PrivilegedL2Transaction::decode_unfinished(tx_encoding)
.map(|(tx, rem)| (P2PTransaction::PrivilegedL2Transaction(tx), rem)),
Expand Down
25 changes: 15 additions & 10 deletions crates/networking/p2p/rlpx/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
reason: self.match_disconnect_reason(&error),
}))
.await
.unwrap_or_else(|e| debug!("Could not send Disconnect message: ({e})"));
.unwrap_or_else(|e| error!("Could not send Disconnect message: ({e})."));
if let Ok(node_id) = self.get_remote_node_id() {
// Discard peer from kademlia table
debug!("{error_text}: ({error}), discarding peer {node_id}");
error!("{error_text}: ({error}), discarding peer {node_id}");
table.lock().await.replace_peer(node_id);
} else {
debug!("{error_text}: ({error}), unknown peer")
error!("{error_text}: ({error}), unknown peer")
}
}

Expand Down Expand Up @@ -345,15 +345,13 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
return Err(RLPxError::Disconnect());
}
Message::Ping(_) => {
debug!("Received Ping");
self.send(Message::Pong(PongMessage {})).await?;
debug!("Pong sent");
}
Message::Pong(_) => {
// We ignore received Pong messages
}
Message::Status(msg_data) if !peer_supports_eth => {
debug!("Received Status");
backend::validate_status(msg_data, &self.storage)?
}
Message::GetAccountRange(req) => {
Expand Down Expand Up @@ -608,18 +606,25 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
let mut buf = vec![0; MAX_DISC_PACKET_SIZE];

// Read the message's size
self.stream
.read_exact(&mut buf[..2])
.await
.map_err(|_| RLPxError::ConnectionError("Connection dropped".to_string()))?;
self.stream.read_exact(&mut buf[..2]).await.map_err(|e| {
RLPxError::ConnectionError(format!(
"Connection dropped. Failed to read handshake message size: {}",
e
))
})?;
let ack_data = [buf[0], buf[1]];
let msg_size = u16::from_be_bytes(ack_data) as usize;

// Read the rest of the message
self.stream
.read_exact(&mut buf[2..msg_size + 2])
.await
.map_err(|_| RLPxError::ConnectionError("Connection dropped".to_string()))?;
.map_err(|e| {
RLPxError::ConnectionError(format!(
"Connection dropped. Failed to read the rest of the handshake message: {}.",
e
))
})?;
let ack_bytes = &buf[..msg_size + 2];
Ok(ack_bytes.to_vec())
}
Expand Down
12 changes: 3 additions & 9 deletions crates/networking/p2p/rlpx/eth/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ use crate::rlpx::{
pub(crate) struct Transactions {
pub(crate) transactions: Vec<Transaction>,
}
// TODO(#1132): Also limit transactions by message byte-size.
// Limit taken from here: https://github.com/ethereum/go-ethereum/blob/df182a742cec68adcc034d4747afa5182fc75ca3/eth/fetcher/tx_fetcher.go#L49
pub const TRANSACTION_LIMIT: usize = 256;

impl Transactions {
pub fn new(transactions: Vec<Transaction>) -> Self {
Expand Down Expand Up @@ -55,12 +52,8 @@ impl RLPxMessage for Transactions {
// or so it seems.
while let Ok((tx, updated_decoder)) = decoder.decode_field::<Transaction>("p2p transaction")
{
if transactions.len() > TRANSACTION_LIMIT {
break;
} else {
decoder = updated_decoder;
transactions.push(tx);
}
decoder = updated_decoder;
transactions.push(tx);
}
Ok(Self::new(transactions))
}
Expand Down Expand Up @@ -256,6 +249,7 @@ impl PooledTransactions {
}

/// Saves every incoming pooled transaction to the mempool.
pub fn handle(self, store: &Store) -> Result<(), MempoolError> {
for tx in self.pooled_transactions {
if let P2PTransaction::EIP4844TransactionWithBlobs(itx) = tx {
Expand Down
20 changes: 12 additions & 8 deletions crates/networking/p2p/rlpx/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ pub(crate) async fn read<S: AsyncRead + std::marker::Unpin>(

// Receive the message's frame header
let mut frame_header = [0; 32];
stream
.read_exact(&mut frame_header)
.await
.map_err(|_| RLPxError::ConnectionError("Connection dropped".to_string()))?;
stream.read_exact(&mut frame_header).await.map_err(|e| {
RLPxError::ConnectionError(format!(
"Connection dropped. Failed to read message frame header: {}",
e
))
})?;
// Both are padded to the block's size (16 bytes)
let (header_ciphertext, header_mac) = frame_header.split_at_mut(16);

Expand Down Expand Up @@ -138,10 +140,12 @@ pub(crate) async fn read<S: AsyncRead + std::marker::Unpin>(
// Receive the hello message
let padded_size = frame_size.next_multiple_of(16);
let mut frame_data = vec![0; padded_size + 16];
stream
.read_exact(&mut frame_data)
.await
.map_err(|_| RLPxError::ConnectionError("Connection dropped".to_string()))?;
stream.read_exact(&mut frame_data).await.map_err(|e| {
RLPxError::ConnectionError(format!(
"Connection dropped. Failed to read the hello message: {}",
e
))
})?;
let (frame_ciphertext, frame_mac) = frame_data.split_at_mut(padded_size);

// check MAC
Expand Down

0 comments on commit 3a3d97e

Please sign in to comment.