From d6ba396e664b11c43d8373fefaca167545c33f3b Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Mon, 2 Sep 2024 16:44:40 -0300 Subject: [PATCH] Fix execute parallel final result --- .../era_vm/bootloader_state/l2_block.rs | 3 +- .../versions/era_vm/bootloader_state/mod.rs | 2 +- .../versions/era_vm/bootloader_state/state.rs | 8 +- core/lib/multivm/src/versions/era_vm/vm.rs | 179 +++++++++++------- 4 files changed, 113 insertions(+), 79 deletions(-) diff --git a/core/lib/multivm/src/versions/era_vm/bootloader_state/l2_block.rs b/core/lib/multivm/src/versions/era_vm/bootloader_state/l2_block.rs index 009f2040d20b..48dcb1d3cae4 100644 --- a/core/lib/multivm/src/versions/era_vm/bootloader_state/l2_block.rs +++ b/core/lib/multivm/src/versions/era_vm/bootloader_state/l2_block.rs @@ -12,7 +12,7 @@ use crate::{ const EMPTY_TXS_ROLLING_HASH: H256 = H256::zero(); #[derive(Debug, Clone)] -pub(crate) struct BootloaderL2Block { +pub struct BootloaderL2Block { pub(crate) number: u32, pub(crate) timestamp: u64, pub(crate) txs_rolling_hash: H256, // The rolling hash of all the transactions in the miniblock @@ -57,6 +57,7 @@ impl BootloaderL2Block { pub(crate) fn interim_version(&self) -> BootloaderL2Block { let mut interim = self.clone(); interim.max_virtual_blocks_to_create = 0; + println!("IN INTERIM VERSION"); interim } diff --git a/core/lib/multivm/src/versions/era_vm/bootloader_state/mod.rs b/core/lib/multivm/src/versions/era_vm/bootloader_state/mod.rs index 73830de2759b..f0be30c07ea1 100644 --- a/core/lib/multivm/src/versions/era_vm/bootloader_state/mod.rs +++ b/core/lib/multivm/src/versions/era_vm/bootloader_state/mod.rs @@ -1,4 +1,4 @@ -mod l2_block; +pub mod l2_block; mod snapshot; mod state; mod tx; diff --git a/core/lib/multivm/src/versions/era_vm/bootloader_state/state.rs b/core/lib/multivm/src/versions/era_vm/bootloader_state/state.rs index 7edd61a098f8..12fe3ba345bc 100644 --- a/core/lib/multivm/src/versions/era_vm/bootloader_state/state.rs +++ b/core/lib/multivm/src/versions/era_vm/bootloader_state/state.rs @@ -32,11 +32,11 @@ pub struct BootloaderState { /// See the structure doc-comment for a better explanation of purpose. tx_to_execute: usize, /// Stored txs in bootloader memory - l2_blocks: Vec, + pub l2_blocks: Vec, /// The number of 32-byte words spent on the already included compressed bytecodes. compressed_bytecodes_encoding: usize, /// Initial memory of bootloader - initial_memory: BootloaderMemory, + pub initial_memory: BootloaderMemory, /// Mode of txs for execution, it can be changed once per vm lunch execution_mode: TxExecutionMode, /// Current offset of the free space in the bootloader memory. @@ -88,7 +88,7 @@ impl BootloaderState { } /// This method bypass sanity checks and should be used carefully. - pub(crate) fn push_l2_block(&mut self, l2_block: L2BlockEnv) { + pub fn push_l2_block(&mut self, l2_block: L2BlockEnv) { self.l2_blocks .push(BootloaderL2Block::new(l2_block, self.free_tx_index())) } @@ -167,7 +167,7 @@ impl BootloaderState { memory } - pub(crate) fn last_l2_block(&self) -> &BootloaderL2Block { + pub fn last_l2_block(&self) -> &BootloaderL2Block { self.l2_blocks.last().unwrap() } diff --git a/core/lib/multivm/src/versions/era_vm/vm.rs b/core/lib/multivm/src/versions/era_vm/vm.rs index 84832b765ec7..5a769674b97b 100644 --- a/core/lib/multivm/src/versions/era_vm/vm.rs +++ b/core/lib/multivm/src/versions/era_vm/vm.rs @@ -26,7 +26,7 @@ use zksync_utils::{ }; use super::{ - bootloader_state::{utils::apply_l2_block, BootloaderState}, + bootloader_state::{l2_block::BootloaderL2Block, utils::apply_l2_block, BootloaderState}, event::merge_events, hook::Hook, initial_bootloader_memory::bootloader_initial_memory, @@ -73,7 +73,6 @@ pub struct Vm { pub snapshot: Option, pub transaction_to_execute: Vec, - pub current_tx: usize, pub is_parallel: bool, } @@ -248,12 +247,10 @@ impl Vm { } else { U256::zero() }; - println!("IS PARALLEL {}", self.is_parallel); memory.push((100, is_parallel)); self.write_to_bootloader_heap(memory); } Hook::TxIndex => { - println!("CURRENT TX {}", self.current_tx); let mut memory: Vec<(usize, U256)> = vec![]; memory.push((102, (self.current_tx).into())); self.write_to_bootloader_heap(memory); @@ -282,8 +279,7 @@ impl Vm { output: crate::interface::VmRevertReason::General { msg: "Transaction reverted with empty reason. Possibly out of gas".to_string(), data: vec![], - }, - }; + }}; } else { break ExecutionResult::Success { output: vec![] }; } @@ -366,6 +362,8 @@ impl Vm { refund: u64, with_compression: bool, ) { + // push the transactions to the main vm, which holds the actual L2 block + self.write_block_to_mem(tx.clone(), refund, with_compression); self.transaction_to_execute .push(ParallelTransaction::new(tx, refund, with_compression)); } @@ -407,7 +405,7 @@ impl Vm { self.write_to_bootloader_heap(memory); } - pub fn push_transaction_inner_parallel( + pub fn push_transaction_inner_no_bytecode( &mut self, tx: Transaction, refund: u64, @@ -416,7 +414,45 @@ impl Vm { let tx: TransactionData = tx.into(); let overhead = tx.overhead_gas(); - self.insert_bytecodes(tx.factory_deps.iter().map(|dep| &dep[..])); + let compressed_bytecodes = if is_l1_tx_type(tx.tx_type) || !with_compression { + // L1 transactions do not need compression + vec![] + } else { + compress_bytecodes(&tx.factory_deps, |hash| { + self.inner + .state + .storage_changes() + .get(&EraStorageKey::new( + KNOWN_CODES_STORAGE_ADDRESS, + h256_to_u256(hash), + )) + .map(|x| !x.is_zero()) + .unwrap_or_else(|| self.storage.is_bytecode_known(&hash)) + }) + }; + + let trusted_ergs_limit = tx.trusted_ergs_limit(); + + let memory = self.bootloader_state.push_tx( + tx, + overhead, + refund, + compressed_bytecodes, + trusted_ergs_limit, + self.system_env.chain_id, + ); + + self.write_to_bootloader_heap(memory); + } + + pub fn push_transaction_inner_parallel( + &mut self, + tx: Transaction, + refund: u64, + with_compression: bool, + ) { + let tx: TransactionData = tx.into(); + let overhead = tx.overhead_gas(); let compressed_bytecodes = if is_l1_tx_type(tx.tx_type) || !with_compression { // L1 transactions do not need compression @@ -620,63 +656,78 @@ impl Vm { } } - // Very basic parallel execution model, basically, we are spawning a new vm per transactions and then merging the results - // and creating the batch from the final merged state. We are not accounting for gas sharing, transactions that depend upon each other, etc - // in the future, this would become a new mode of execution (i.e VmExecutionMode::Parallel) - pub fn execute_parallel(&mut self) -> VmExecutionResultAndLogs { - let transactions: Vec = - self.transaction_to_execute.drain(..).collect(); - let last_tx_number = transactions.len(); - self.current_tx = last_tx_number; - self.is_parallel = true; + // Very basic parallel execution model, basically, we are spawning a new vm per transaction and merging the state results + // and sealing the batch from the final merged state. + pub fn execute_parallel_inner(&mut self, one_tx: bool) -> VmExecutionResultAndLogs { + let txs_to_process = if one_tx { + 1 + } else { + self.transaction_to_execute.len() + }; + let transactions: Vec = self + .transaction_to_execute + .drain(..txs_to_process) + .collect(); // we only care about the final VMState, since that is where the pubdata and L2 changes reside // we will merge this results later - let mut final_states: Vec = vec![self.inner.state.clone()]; - - // push the transactions to the main vm, which holds the actual L2 block - for tx in transactions.iter() { - self.write_block_to_mem(tx.tx.clone(), tx.refund, tx.with_compression); - } + let mut state: era_vm::state::VMState = self.inner.state.clone(); // to run in parallel, we spin up new vms to process and run the transaction in their own bootloader - for (idx, tx) in transactions.iter().enumerate() { - // the idea now is to spawn an era_vm and pass the transaction bytecode and let the rest be handled by the parallel executor - // let mut vm = EraVM::new(); + for tx in transactions.iter() { let mut vm = Vm::new( self.batch_env.clone(), self.system_env.clone(), self.storage.clone(), ); - - // storage writes are necessary - for (key, value) in final_states.last().unwrap().storage_changes().iter() { - vm.inner.state.storage_write(key.clone(), value.clone()); - } - + // set the last block to what the main vm has + vm.bootloader_state.l2_blocks.push(BootloaderL2Block { + number: self.bootloader_state.last_l2_block().number, + timestamp: self.bootloader_state.last_l2_block().timestamp, + txs_rolling_hash: self.bootloader_state.last_l2_block().txs_rolling_hash, + prev_block_hash: self.bootloader_state.last_l2_block().prev_block_hash, + first_tx_index: 0, + max_virtual_blocks_to_create: self + .bootloader_state + .last_l2_block() + .max_virtual_blocks_to_create, + txs: vec![], + }); + vm.inner.state = state; vm.is_parallel = true; - vm.current_tx = idx; - vm.inner.execution.tx_number = idx as u64; - - if idx == 0 { + vm.current_tx = self.current_tx; + vm.inner.execution.tx_number = vm.current_tx as u64; + vm.inner + .execution + .set_gas_left(self.inner.execution.gas_left().unwrap()) + .unwrap(); + + if vm.current_tx == 0 { // since the first transaction starts the batch, we want to push it normally so that - // it create the virtual block at the beginnig - vm.push_transaction_inner(tx.tx.clone(), tx.refund, tx.with_compression); + // it create the virtual block at the beginning + vm.push_transaction_inner_no_bytecode( + tx.tx.clone(), + tx.refund, + tx.with_compression, + ); } else { vm.push_transaction_inner_parallel(tx.tx.clone(), tx.refund, tx.with_compression); } - // in the future we don't want to call the bootloader for this, and instead crate a new era_vm and pass the - // transaction bytecode directly, that would require to build all the validation logic for processing the transaction - // in rust let result: VmExecutionResultAndLogs = vm.inspect_inner(TracerDispatcher::default(), None, VmExecutionMode::OneTx); - // if one transaction fails, the whole batch fails - if result.result.is_failed() { + state = vm.inner.state; + self.inner + .execution + .set_gas_left(vm.inner.execution.gas_left().unwrap()) + .unwrap(); + self.current_tx += 1; + + if one_tx { + self.inner.state = state.clone(); return result; } - final_states.push(vm.inner.state); } // since no transactions have been pushed onto the bootloader, here it will only call the SetFictiveBlock and request the pubdata @@ -690,11 +741,10 @@ impl Vm { let monotonic_counter_before = self.inner.statistics.monotonic_counter; let snapshot = self.inner.state.snapshot(); + self.inner.state = state; + self.is_parallel = true; - // finally, we need to merge the results to the current vm - self.inner.state = self.merge_vm_states(final_states); - self.inner.execution.tx_number = last_tx_number as u64; - let result = self.run(VmExecutionMode::Batch, &mut tracer); + let result: ExecutionResult = self.run(VmExecutionMode::Batch, &mut tracer); let ergs_after = self.inner.execution.gas_left().unwrap(); VmExecutionResultAndLogs { @@ -715,33 +765,16 @@ impl Vm { } } - pub fn merge_vm_states( - &self, - vm_states: Vec, - ) -> era_vm::state::VMState { - let mut final_state = era_vm::state::VMState::new(self.inner.state.storage.clone()); - - for state in vm_states { - for log in state.l2_to_l1_logs() { - final_state.record_l2_to_l1_log(log.clone()); - } - for event in state.events() { - final_state.record_event(event.clone()); - } - for (key, value) in state.storage_changes().iter() { - final_state.storage_write(key.clone(), value.clone()); - } - final_state.add_pubdata(state.pubdata()); - for hash in state.decommitted_hashes().iter() { - // this is to add the hash to the decommited hashes - final_state.decommit(hash.clone()); - } + pub fn execute_parallel( + &mut self, + execution_mode: VmExecutionMode, + ) -> VmExecutionResultAndLogs { + if let VmExecutionMode::OneTx = execution_mode { + self.execute_parallel_inner(true) + } else { + self.execute_parallel_inner(false) } - - final_state } - - pub fn merge_statistics(&self) {} } impl VmInterface for Vm {