feat(l1): snap sync overhaul (#1763)
**Motivation**
This PR introduces the following upgrades for snap-sync:
- Use DB-persisted checkpoints so that sync progress is preserved across
restarts & cycles
- Stop ForkChoices & NewPayloads from being applied while syncing
- Improved handling of stale pivot during sub-processes
- Improved handling of pending requests when aborting due to stale pivot
- Fetching of large storage tries (that don't fit in a single range
request)
- Safer (but a bit slower) healing that can be restarted
- Faster storage fetching (multiple parallel fetches)

It also simplifies snap sync by removing the following logic:
- No longer downloads bodies and receipts for blocks before the pivot
during snap sync (WARNING: this goes against the spec but shouldn't be a
problem for the time being)
- Removes the restart from the latest block when latest - 64 becomes stale (by
that point it is more effective to wait for the next fork choice update)
- Periodically shows state sync progress

**Description**
- Stores the last downloaded block's hash in the DB during snap sync to
serve as a checkpoint if the sync is aborted halfway (a common case when
syncing from genesis). This checkpoint is cleared upon successful snap
sync.
- No longer fetches receipts or block bodies past the pivot block during
snap sync
- Adds a `sync_status` method which returns an enum with the current sync
status (Inactive, Active or Pending) and uses it in the
ForkChoiceUpdate & NewPayload engine RPC endpoints so that their logic is
not applied during an active or pending sync (see the first sketch after this list).
- Fetcher processes now identify stale pivots and remain passive until
they receive the end signal
- Fetcher processes now return their current queue when they exit so that
it can be persisted into the next cycle
- Stores the latest state root during state sync and healing as a
checkpoint
- Stores the last fetched key during state sync as a checkpoint
- Healing no longer stores the nodes received via p2p; instead, it
inserts the leaf values and rebuilds the trie to avoid corruption between
restarts.
- The current progress percentage and estimated time to finish are
periodically reported during state sync (see the second sketch after this list)
- Disables the following Paris & Cancun engine hive tests that
previously yielded false positives due to new payloads being accepted on
top of a syncing chain:

   * Invalid NewPayload (family)
   * Re-Org Back to Canonical Chain From Syncing Chain
   * Unknown HeadBlockHash
   * In-Order Consecutive Payload Execution (Flaky)
   * Valid NewPayload->ForkchoiceUpdated on Syncing Client
   * Invalid Missing Ancestor ReOrg
   * Payload Build after New Invalid Payload (only Cancun)

- Also disables the following tests, which fail with the flag
Syncing=true for the same reason:

   * Bad Hash on NewPayload
   * ParentHash equals BlockHash on NewPayload (only for Paris)
   * Invalid PayloadAttributes (family)
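
A minimal sketch of the sync-status gate described above (only the Inactive/Active/Pending variants come from this PR; the handler shape and names below are illustrative):

```rust
// Sketch only: the variant names match the PR, the rest is illustrative.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStatus {
    Inactive,
    Active,
    Pending,
}

fn apply_fork_choice_update(sync_status: SyncStatus) -> Result<(), String> {
    // While a snap sync is active or pending, skip applying the fork choice:
    // otherwise the node would act on state that is still being downloaded.
    if matches!(sync_status, SyncStatus::Active | SyncStatus::Pending) {
        // In the engine endpoint this corresponds to answering with SYNCING.
        return Ok(());
    }
    // ... regular fork choice / new payload handling goes here ...
    Ok(())
}
```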
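
The periodic progress report can be estimated from the last fetched key, since state sync walks the 2^256 account-hash space in ascending order. A rough sketch (the exact formula and output format used by the PR may differ):

```rust
use std::time::{Duration, Instant};

// Sketch: approximate completion as the high-order bytes of the last fetched
// key over the full key space, then extrapolate the remaining time.
fn report_state_sync_progress(start_time: Instant, last_fetched_key: [u8; 32]) {
    let prefix = u64::from_be_bytes(last_fetched_key[..8].try_into().unwrap());
    let completed = prefix as f64 / u64::MAX as f64;
    if completed == 0.0 {
        return; // nothing fetched yet, no meaningful estimate
    }
    let elapsed = start_time.elapsed().as_secs_f64();
    let remaining = Duration::from_secs_f64(elapsed * (1.0 - completed) / completed);
    println!(
        "State sync progress: {:.2}%, estimated time to finish: {:?}",
        completed * 100.0,
        remaining
    );
}
```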

Misc:
- Replaces some noisy unwraps in the networking module with errors
- Applies annotated hacky fixes for the problems reported in #1684, #1685 &
#1686

Closes None

fmoletta authored Jan 29, 2025
1 parent 6f6dbb9 commit ea3ae65
Showing 16 changed files with 972 additions and 199 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci_l1.yaml
@@ -172,10 +172,10 @@ jobs:
test_pattern: engine-(auth|exchange-capabilities)/
- name: "Cancun Engine tests"
simulation: ethereum/engine
test_pattern: "engine-cancun/Blob Transactions On Block 1|Blob Transaction Ordering, Single|Blob Transaction Ordering, Multiple Accounts|Replace Blob Transactions|Parallel Blob Transactions|ForkchoiceUpdated|GetPayload|NewPayloadV3 After Cancun|NewPayloadV3 Before Cancun|NewPayloadV3 Versioned Hashes|Incorrect BlobGasUsed|Bad Hash|ParentHash equals BlockHash|RPC:|in ForkchoiceState|Unknown|Invalid PayloadAttributes|Unique|Re-Execute Payload|In-Order Consecutive Payload|Multiple New Payloads|Valid NewPayload->|NewPayload with|Payload Build after|Build Payload with|Invalid Missing Ancestor ReOrg, StateRoot|Re-Org Back to|Re-org to Previously|Safe Re-Org to Side Chain|Transaction Re-Org|Re-Org Back into Canonical Chain, Depth=5|Suggested Fee Recipient Test|PrevRandao Opcode|Invalid NewPayload|Fork ID: Genesis=0|Fork ID: Genesis=1, Cancun=0|Fork ID: Genesis=1, Cancun=2 |Fork ID: Genesis=1, Cancun=2, BlocksBeforePeering=1|Fork ID: Genesis=1, Cancun=2, Shanghai=[^1]|Pre-Merge"
test_pattern: "engine-cancun/Blob Transactions On Block 1|Blob Transaction Ordering, Single|Blob Transaction Ordering, Multiple Accounts|Replace Blob Transactions|Parallel Blob Transactions|ForkchoiceUpdatedV3|ForkchoiceUpdatedV2|ForkchoiceUpdated Version|GetPayload|NewPayloadV3 After Cancun|NewPayloadV3 Before Cancun|NewPayloadV3 Versioned Hashes|Incorrect BlobGasUsed|ParentHash equals BlockHash|RPC:|in ForkchoiceState|Unknown SafeBlockHash|Unknown FinalizedBlockHash|Unique|Re-Execute Payload|Multiple New Payloads|NewPayload with|Build Payload with|Re-org to Previously|Safe Re-Org to Side Chain|Transaction Re-Org|Re-Org Back into Canonical Chain, Depth=5|Suggested Fee Recipient Test|PrevRandao Opcode|Fork ID: Genesis=0|Fork ID: Genesis=1, Cancun=0|Fork ID: Genesis=1, Cancun=2 |Fork ID: Genesis=1, Cancun=2, BlocksBeforePeering=1|Fork ID: Genesis=1, Cancun=2, Shanghai=[^1]|Pre-Merge"
- name: "Paris Engine tests"
simulation: ethereum/engine
test_pattern: "engine-api/RPC|Re-Org Back to Canonical Chain From Syncing Chain|Re-org to Previously Validated Sidechain Payload|Re-Org Back into Canonical Chain, Depth=5|Safe Re-Org|Transaction Re-Org|Inconsistent|Suggested Fee|PrevRandao|Fork ID|Unknown|Invalid PayloadAttributes|Bad Hash|Unique Payload ID|Re-Execute Payload|In-Order|Multiple New Payloads|Valid NewPayload|NewPayload with|Invalid NewPayload|Payload Build|Invalid NewPayload, Transaction|ParentHash equals|Build Payload|Invalid Missing Ancestor ReOrg"
test_pattern: "engine-api/RPC|Re-org to Previously Validated Sidechain Payload|Re-Org Back into Canonical Chain, Depth=5|Safe Re-Org|Transaction Re-Org|Inconsistent|Suggested Fee|PrevRandao Opcode Transactions|Fork ID|Unknown SafeBlockHash|Unknown FinalizedBlockHash|Unique Payload ID|Re-Execute Payload|Multiple New Payloads|NewPayload with|Payload Build|Build Payload"
- name: "Engine withdrawal tests"
simulation: ethereum/engine
test_pattern: "engine-withdrawals/engine-withdrawals test loader|GetPayloadV2 Block Value|Sync after 2 blocks - Withdrawals on Genesis|Max Initcode Size|Pre-Merge Fork Number > 0|Empty Withdrawals|Corrupted Block Hash Payload|Withdrawals Fork on Block 2|Withdrawals Fork on Block 3|GetPayloadBodies"
2 changes: 1 addition & 1 deletion crates/networking/p2p/net.rs
@@ -183,7 +183,7 @@ pub fn node_id_from_signing_key(signer: &SigningKey) -> H512 {

/// Shows the amount of connected peers, active peers, and peers suitable for snap sync on a set interval
pub async fn periodically_show_peer_stats(peer_table: Arc<Mutex<KademliaTable>>) {
const INTERVAL_DURATION: tokio::time::Duration = tokio::time::Duration::from_secs(60);
const INTERVAL_DURATION: tokio::time::Duration = tokio::time::Duration::from_secs(120);
let mut interval = tokio::time::interval(INTERVAL_DURATION);
loop {
peer_table.lock().await.show_peer_stats();
66 changes: 65 additions & 1 deletion crates/networking/p2p/peer_channels.rs
@@ -260,7 +260,7 @@ impl PeerChannels {
/// Requests storage ranges for accounts given their hashed address and storage roots, and the root of their state trie
/// account_hashes & storage_roots must have the same length
/// storage_roots must not contain empty trie hashes, we will treat empty ranges as invalid responses
/// Returns true if the last accoun't storage was not completely fetched by the request
/// Returns true if the last account's storage was not completely fetched by the request
/// Returns the list of hashed storage keys and values for each account's storage or None if:
/// - There are no available peers (the node just started up or was rejected by all other nodes)
/// - The response timed out
@@ -447,4 +447,68 @@ impl PeerChannels {
})
.flatten()
}

/// Requests a single storage range for an account given its hashed address and storage root, and the root of its state trie
/// This is a simplified version of `request_storage_range` meant to be used for large tries that require their own single requests
/// account_hashes & storage_roots must have the same length
/// storage_root must not be an empty trie hash, we will treat empty ranges as invalid responses
/// Returns true if the account's storage was not completely fetched by the request
/// Returns the list of hashed storage keys and values for the account's storage or None if:
/// - There are no available peers (the node just started up or was rejected by all other nodes)
/// - The response timed out
/// - The response was empty or not valid
pub async fn request_storage_range(
&self,
state_root: H256,
storage_root: H256,
account_hash: H256,
start: H256,
) -> Option<(Vec<H256>, Vec<U256>, bool)> {
let request_id = rand::random();
let request = RLPxMessage::GetStorageRanges(GetStorageRanges {
id: request_id,
root_hash: state_root,
account_hashes: vec![account_hash],
starting_hash: start,
limit_hash: HASH_MAX,
response_bytes: MAX_RESPONSE_BYTES,
});
let mut receiver = self.receiver.lock().await;
self.sender.send(request).await.ok()?;
let (mut slots, proof) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::StorageRanges(StorageRanges { id, slots, proof }))
if id == request_id =>
{
return Some((slots, proof))
}
// Ignore replies that don't match the expected id (such as late responses)
Some(_) => continue,
None => return None,
}
}
})
.await
.ok()??;
// Check we got a reasonable amount of storage ranges
if slots.len() != 1 {
return None;
}
// Unzip & validate response
let proof = encodable_to_proof(&proof);
let (storage_keys, storage_values): (Vec<H256>, Vec<U256>) = slots
.remove(0)
.into_iter()
.map(|slot| (slot.hash, slot.data))
.unzip();
let encoded_values = storage_values
.iter()
.map(|val| val.encode_to_vec())
.collect::<Vec<_>>();
// Verify storage range
let should_continue =
verify_range(storage_root, &start, &storage_keys, &encoded_values, &proof).ok()?;
Some((storage_keys, storage_values, should_continue))
}
}
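
A hedged sketch of how a fetcher might drive the new `request_storage_range` to download a storage trie that exceeds a single response, resuming from just past the last received key until the peer proves the range is complete (the loop and `next_key` helper below are illustrative, not the PR's actual fetcher; `PeerChannels`, `H256` and `U256` are assumed in scope as in the file above):

```rust
async fn fetch_large_storage_trie(
    peer: &PeerChannels,
    state_root: H256,
    storage_root: H256,
    account_hash: H256,
) -> Option<(Vec<H256>, Vec<U256>)> {
    let mut start = H256::zero();
    let (mut all_keys, mut all_values) = (Vec::new(), Vec::new());
    loop {
        let (keys, values, incomplete) = peer
            .request_storage_range(state_root, storage_root, account_hash, start)
            .await?;
        if let Some(last) = keys.last() {
            // Resume from the key immediately after the last one received
            start = next_key(*last);
        }
        all_keys.extend(keys);
        all_values.extend(values);
        if !incomplete {
            // The proof showed the response reaches the end of the trie
            return Some((all_keys, all_values));
        }
    }
}

// Big-endian increment by one over the 32-byte storage-key space
fn next_key(key: H256) -> H256 {
    let mut bytes = key.0;
    for byte in bytes.iter_mut().rev() {
        if *byte == u8::MAX {
            *byte = 0;
        } else {
            *byte += 1;
            break;
        }
    }
    H256(bytes)
}
```

In practice the fetcher would also re-check pivot staleness and retry failed requests with other peers rather than bail out on the first missing reply.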
4 changes: 4 additions & 0 deletions crates/networking/p2p/rlpx/connection.rs
@@ -609,6 +609,10 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
buf.resize(msg_size + 2, 0);

// Read the rest of the message
// Guard unwrap
if buf.len() < msg_size + 2 {
return Err(RLPxError::CryptographyError(String::from("bad buf size")));
}
self.framed
.get_mut()
.read_exact(&mut buf[2..msg_size + 2])
4 changes: 3 additions & 1 deletion crates/networking/p2p/rlpx/handshake.rs
@@ -116,7 +116,9 @@ fn decrypt_message(

// Verify the MAC.
let expected_d = sha256_hmac(&mac_key, &[iv, c], size_data);
assert_eq!(d, expected_d);
if d != expected_d {
return Err(RLPxError::HandshakeError(String::from("Invalid MAC")));
}

// Decrypt the message with the AES key.
let mut stream_cipher = Aes128Ctr64BE::new_from_slices(aes_key, iv)?;
