Fix execute parallel final result
MarcosNicolau committed Sep 2, 2024
1 parent 672abd7 commit d6ba396
Showing 4 changed files with 113 additions and 79 deletions.
core/lib/multivm/src/versions/era_vm/bootloader_state/l2_block.rs

@@ -12,7 +12,7 @@ use crate::{
 const EMPTY_TXS_ROLLING_HASH: H256 = H256::zero();

 #[derive(Debug, Clone)]
-pub(crate) struct BootloaderL2Block {
+pub struct BootloaderL2Block {
     pub(crate) number: u32,
     pub(crate) timestamp: u64,
     pub(crate) txs_rolling_hash: H256, // The rolling hash of all the transactions in the miniblock
@@ -57,6 +57,7 @@ impl BootloaderL2Block {
     pub(crate) fn interim_version(&self) -> BootloaderL2Block {
         let mut interim = self.clone();
         interim.max_virtual_blocks_to_create = 0;
+        println!("IN INTERIM VERSION");
         interim
     }
core/lib/multivm/src/versions/era_vm/bootloader_state/mod.rs

@@ -1,4 +1,4 @@
-mod l2_block;
+pub mod l2_block;
 mod snapshot;
 mod state;
 mod tx;
core/lib/multivm/src/versions/era_vm/bootloader_state/state.rs

@@ -32,11 +32,11 @@ pub struct BootloaderState {
     /// See the structure doc-comment for a better explanation of purpose.
     tx_to_execute: usize,
     /// Stored txs in bootloader memory
-    l2_blocks: Vec<BootloaderL2Block>,
+    pub l2_blocks: Vec<BootloaderL2Block>,
     /// The number of 32-byte words spent on the already included compressed bytecodes.
     compressed_bytecodes_encoding: usize,
     /// Initial memory of bootloader
-    initial_memory: BootloaderMemory,
+    pub initial_memory: BootloaderMemory,
     /// Mode of txs for execution, it can be changed once per vm lunch
     execution_mode: TxExecutionMode,
     /// Current offset of the free space in the bootloader memory.
@@ -88,7 +88,7 @@ impl BootloaderState {
     }

     /// This method bypass sanity checks and should be used carefully.
-    pub(crate) fn push_l2_block(&mut self, l2_block: L2BlockEnv) {
+    pub fn push_l2_block(&mut self, l2_block: L2BlockEnv) {
         self.l2_blocks
             .push(BootloaderL2Block::new(l2_block, self.free_tx_index()))
     }
@@ -167,7 +167,7 @@ impl BootloaderState {
         memory
     }

-    pub(crate) fn last_l2_block(&self) -> &BootloaderL2Block {
+    pub fn last_l2_block(&self) -> &BootloaderL2Block {
         self.l2_blocks.last().unwrap()
     }
179 changes: 106 additions & 73 deletions core/lib/multivm/src/versions/era_vm/vm.rs
@@ -26,7 +26,7 @@ use zksync_utils::{
 };

 use super::{
-    bootloader_state::{utils::apply_l2_block, BootloaderState},
+    bootloader_state::{l2_block::BootloaderL2Block, utils::apply_l2_block, BootloaderState},
     event::merge_events,
     hook::Hook,
     initial_bootloader_memory::bootloader_initial_memory,
@@ -73,7 +73,6 @@ pub struct Vm<S: ReadStorage> {
     pub snapshot: Option<VmSnapshot>,

     pub transaction_to_execute: Vec<ParallelTransaction>,
-
     pub current_tx: usize,
     pub is_parallel: bool,
 }
@@ -248,12 +247,10 @@ impl<S: ReadStorage + 'static> Vm<S> {
                 } else {
                     U256::zero()
                 };
-                println!("IS PARALLEL {}", self.is_parallel);
                 memory.push((100, is_parallel));
                 self.write_to_bootloader_heap(memory);
             }
             Hook::TxIndex => {
-                println!("CURRENT TX {}", self.current_tx);
                 let mut memory: Vec<(usize, U256)> = vec![];
                 memory.push((102, (self.current_tx).into()));
                 self.write_to_bootloader_heap(memory);
@@ -282,8 +279,7 @@ impl<S: ReadStorage + 'static> Vm<S> {
                         output: crate::interface::VmRevertReason::General {
                             msg: "Transaction reverted with empty reason. Possibly out of gas".to_string(),
                             data: vec![],
-                        },
-                    };
+                    }};
                 } else {
                     break ExecutionResult::Success { output: vec![] };
                 }
@@ -366,6 +362,8 @@ impl<S: ReadStorage + 'static> Vm<S> {
         refund: u64,
         with_compression: bool,
     ) {
+        // push the transactions to the main vm, which holds the actual L2 block
+        self.write_block_to_mem(tx.clone(), refund, with_compression);
         self.transaction_to_execute
             .push(ParallelTransaction::new(tx, refund, with_compression));
     }
@@ -407,7 +405,7 @@ impl<S: ReadStorage + 'static> Vm<S> {
         self.write_to_bootloader_heap(memory);
     }

-    pub fn push_transaction_inner_parallel(
+    pub fn push_transaction_inner_no_bytecode(
         &mut self,
         tx: Transaction,
         refund: u64,
@@ -416,7 +414,45 @@ impl<S: ReadStorage + 'static> Vm<S> {
         let tx: TransactionData = tx.into();
         let overhead = tx.overhead_gas();

-        self.insert_bytecodes(tx.factory_deps.iter().map(|dep| &dep[..]));
+        let compressed_bytecodes = if is_l1_tx_type(tx.tx_type) || !with_compression {
+            // L1 transactions do not need compression
+            vec![]
+        } else {
+            compress_bytecodes(&tx.factory_deps, |hash| {
+                self.inner
+                    .state
+                    .storage_changes()
+                    .get(&EraStorageKey::new(
+                        KNOWN_CODES_STORAGE_ADDRESS,
+                        h256_to_u256(hash),
+                    ))
+                    .map(|x| !x.is_zero())
+                    .unwrap_or_else(|| self.storage.is_bytecode_known(&hash))
+            })
+        };
+
+        let trusted_ergs_limit = tx.trusted_ergs_limit();
+
+        let memory = self.bootloader_state.push_tx(
+            tx,
+            overhead,
+            refund,
+            compressed_bytecodes,
+            trusted_ergs_limit,
+            self.system_env.chain_id,
+        );
+
+        self.write_to_bootloader_heap(memory);
+    }
+
+    pub fn push_transaction_inner_parallel(
+        &mut self,
+        tx: Transaction,
+        refund: u64,
+        with_compression: bool,
+    ) {
+        let tx: TransactionData = tx.into();
+        let overhead = tx.overhead_gas();
+
         let compressed_bytecodes = if is_l1_tx_type(tx.tx_type) || !with_compression {
             // L1 transactions do not need compression
@@ -620,63 +656,78 @@ impl<S: ReadStorage + 'static> Vm<S> {
         }
     }

-    // Very basic parallel execution model, basically, we are spawning a new vm per transactions and then merging the results
-    // and creating the batch from the final merged state. We are not accounting for gas sharing, transactions that depend upon each other, etc
-    // in the future, this would become a new mode of execution (i.e VmExecutionMode::Parallel)
-    pub fn execute_parallel(&mut self) -> VmExecutionResultAndLogs {
-        let transactions: Vec<ParallelTransaction> =
-            self.transaction_to_execute.drain(..).collect();
-        let last_tx_number = transactions.len();
-        self.current_tx = last_tx_number;
-        self.is_parallel = true;
+    // Very basic parallel execution model, basically, we are spawning a new vm per transaction and merging the state results
+    // and sealing the batch from the final merged state.
+    pub fn execute_parallel_inner(&mut self, one_tx: bool) -> VmExecutionResultAndLogs {
+        let txs_to_process = if one_tx {
+            1
+        } else {
+            self.transaction_to_execute.len()
+        };
+        let transactions: Vec<ParallelTransaction> = self
+            .transaction_to_execute
+            .drain(..txs_to_process)
+            .collect();

         // we only care about the final VMState, since that is where the pubdata and L2 changes reside
-        // we will merge this results later
-        let mut final_states: Vec<era_vm::state::VMState> = vec![self.inner.state.clone()];
-
-        // push the transactions to the main vm, which holds the actual L2 block
-        for tx in transactions.iter() {
-            self.write_block_to_mem(tx.tx.clone(), tx.refund, tx.with_compression);
-        }
+        let mut state: era_vm::state::VMState = self.inner.state.clone();

         // to run in parallel, we spin up new vms to process and run the transaction in their own bootloader
-        for (idx, tx) in transactions.iter().enumerate() {
-            // the idea now is to spawn an era_vm and pass the transaction bytecode and let the rest be handled by the parallel executor
-            // let mut vm = EraVM::new();
+        for tx in transactions.iter() {
             let mut vm = Vm::new(
                 self.batch_env.clone(),
                 self.system_env.clone(),
                 self.storage.clone(),
             );

-            // storage writes are necessary
-            for (key, value) in final_states.last().unwrap().storage_changes().iter() {
-                vm.inner.state.storage_write(key.clone(), value.clone());
-            }
-
+            // set the last block to what the main vm has
+            vm.bootloader_state.l2_blocks.push(BootloaderL2Block {
+                number: self.bootloader_state.last_l2_block().number,
+                timestamp: self.bootloader_state.last_l2_block().timestamp,
+                txs_rolling_hash: self.bootloader_state.last_l2_block().txs_rolling_hash,
+                prev_block_hash: self.bootloader_state.last_l2_block().prev_block_hash,
+                first_tx_index: 0,
+                max_virtual_blocks_to_create: self
+                    .bootloader_state
+                    .last_l2_block()
+                    .max_virtual_blocks_to_create,
+                txs: vec![],
+            });
+            vm.inner.state = state;
             vm.is_parallel = true;
-            vm.current_tx = idx;
-            vm.inner.execution.tx_number = idx as u64;
-
-            if idx == 0 {
+            vm.current_tx = self.current_tx;
+            vm.inner.execution.tx_number = vm.current_tx as u64;
+            vm.inner
+                .execution
+                .set_gas_left(self.inner.execution.gas_left().unwrap())
+                .unwrap();
+
+            if vm.current_tx == 0 {
                 // since the first transaction starts the batch, we want to push it normally so that
-                // it create the virtual block at the beginnig
-                vm.push_transaction_inner(tx.tx.clone(), tx.refund, tx.with_compression);
+                // it create the virtual block at the beginning
+                vm.push_transaction_inner_no_bytecode(
+                    tx.tx.clone(),
+                    tx.refund,
+                    tx.with_compression,
+                );
             } else {
                 vm.push_transaction_inner_parallel(tx.tx.clone(), tx.refund, tx.with_compression);
             }

-            // in the future we don't want to call the bootloader for this, and instead crate a new era_vm and pass the
-            // transaction bytecode directly, that would require to build all the validation logic for processing the transaction
-            // in rust
             let result: VmExecutionResultAndLogs =
                 vm.inspect_inner(TracerDispatcher::default(), None, VmExecutionMode::OneTx);

-            // if one transaction fails, the whole batch fails
-            if result.result.is_failed() {
+            state = vm.inner.state;
+            self.inner
+                .execution
+                .set_gas_left(vm.inner.execution.gas_left().unwrap())
+                .unwrap();
+            self.current_tx += 1;
+
+            if one_tx {
+                self.inner.state = state.clone();
                 return result;
             }
-            final_states.push(vm.inner.state);
         }

         // since no transactions have been pushed onto the bootloader, here it will only call the SetFictiveBlock and request the pubdata
Expand All @@ -690,11 +741,10 @@ impl<S: ReadStorage + 'static> Vm<S> {
let monotonic_counter_before = self.inner.statistics.monotonic_counter;

let snapshot = self.inner.state.snapshot();
self.inner.state = state;
self.is_parallel = true;

// finally, we need to merge the results to the current vm
self.inner.state = self.merge_vm_states(final_states);
self.inner.execution.tx_number = last_tx_number as u64;
let result = self.run(VmExecutionMode::Batch, &mut tracer);
let result: ExecutionResult = self.run(VmExecutionMode::Batch, &mut tracer);
let ergs_after = self.inner.execution.gas_left().unwrap();

VmExecutionResultAndLogs {
@@ -715,33 +765,16 @@ impl<S: ReadStorage + 'static> Vm<S> {
         }
     }

-    pub fn merge_vm_states(
-        &self,
-        vm_states: Vec<era_vm::state::VMState>,
-    ) -> era_vm::state::VMState {
-        let mut final_state = era_vm::state::VMState::new(self.inner.state.storage.clone());
-
-        for state in vm_states {
-            for log in state.l2_to_l1_logs() {
-                final_state.record_l2_to_l1_log(log.clone());
-            }
-            for event in state.events() {
-                final_state.record_event(event.clone());
-            }
-            for (key, value) in state.storage_changes().iter() {
-                final_state.storage_write(key.clone(), value.clone());
-            }
-            final_state.add_pubdata(state.pubdata());
-            for hash in state.decommitted_hashes().iter() {
-                // this is to add the hash to the decommited hashes
-                final_state.decommit(hash.clone());
-            }
-        }
-
-        final_state
-    }
+    pub fn execute_parallel(
+        &mut self,
+        execution_mode: VmExecutionMode,
+    ) -> VmExecutionResultAndLogs {
+        if let VmExecutionMode::OneTx = execution_mode {
+            self.execute_parallel_inner(true)
+        } else {
+            self.execute_parallel_inner(false)
+        }
+    }

     pub fn merge_statistics(&self) {}
 }

 impl<S: ReadStorage + 'static> VmInterface for Vm<S> {
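To make the new control flow easier to follow: execute_parallel now dispatches on the execution mode, execute_parallel_inner drains either one or all pending transactions, and each transaction runs in a fresh Vm seeded with the state produced by the previous one; the separate merge_vm_states pass over a list of final states is gone. The sketch below models that loop in isolation. It is a minimal standalone illustration: the types and names (VmState, Tx, run_in_fresh_vm, execute_parallel_inner) are stand-ins, not the era_vm or zksync-era API.

use std::collections::HashMap;

// Stand-in for era_vm's VMState: storage writes plus a pubdata counter.
#[derive(Clone, Default, Debug)]
struct VmState {
    storage: HashMap<u32, u32>,
    pubdata: i64,
}

// Stand-in for ParallelTransaction.
#[derive(Clone)]
struct Tx {
    writes: Vec<(u32, u32)>,
    pubdata_cost: i64,
}

// Runs one transaction in a throwaway VM seeded with the caller's state,
// mirroring the per-transaction loop body (vm.inner.state = state; ...; state = vm.inner.state).
fn run_in_fresh_vm(seed: &VmState, tx: &Tx) -> VmState {
    let mut vm_state = seed.clone(); // the fresh VM starts from the threaded state
    for (key, value) in &tx.writes {
        vm_state.storage.insert(*key, *value);
    }
    vm_state.pubdata += tx.pubdata_cost;
    vm_state // the spawned VM's final state
}

// Mirrors execute_parallel_inner(one_tx): drain one or all pending
// transactions and thread the state from each run into the next.
fn execute_parallel_inner(pending: &mut Vec<Tx>, state: &mut VmState, one_tx: bool) {
    let txs_to_process = if one_tx { pending.len().min(1) } else { pending.len() };
    for tx in pending.drain(..txs_to_process) {
        // each transaction sees the state left behind by the previous one,
        // so no separate merge step is needed after the loop
        *state = run_in_fresh_vm(state, &tx);
    }
}

fn main() {
    let mut state = VmState::default();
    let mut pending = vec![
        Tx { writes: vec![(1, 10)], pubdata_cost: 4 },
        Tx { writes: vec![(2, 20)], pubdata_cost: 2 },
    ];
    execute_parallel_inner(&mut pending, &mut state, false);
    // both transactions' effects are present in the single threaded state
    println!("{state:?}");
    assert_eq!(state.storage.get(&1), Some(&10));
    assert_eq!(state.storage.get(&2), Some(&20));
    assert_eq!(state.pubdata, 6);
}

Compared with the pre-commit code, which collected a Vec of final states and merged logs, events, storage, and pubdata afterwards, threading one state through the loop means the batch-sealing run sees exactly the storage, events, and pubdata the last transaction produced, which is what the commit title refers to as fixing the final result.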
