diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index 79ce1079f93..b83d8624977 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -10,21 +10,25 @@ use crate::missing_chunks::{BlockLike, MissingChunksPool};
 use crate::state_request_tracker::StateRequestTracker;
 use crate::state_snapshot_actor::SnapshotCallbacks;
 use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode};
+
 use crate::types::{
     AcceptedBlock, ApplySplitStateResultOrStateChanges, ApplyTransactionResult,
-    ApplyTransactionsBlockContext, ApplyTransactionsChunkContext, Block, BlockEconomicsConfig,
-    BlockHeader, BlockStatus, ChainConfig, ChainGenesis, Provenance, RuntimeAdapter,
-    RuntimeStorageConfig,
+    ApplyTransactionsBlockContext, ApplyTransactionsChunkContext, BlockEconomicsConfig,
+    ChainConfig, RuntimeAdapter, RuntimeStorageConfig, StorageDataSource,
 };
 use crate::update_shard::{
-    process_shard_update, ApplyChunkResult, NewChunkData, NewChunkResult, OldChunkData,
-    OldChunkResult, ShardContext, ShardUpdateReason, StateSplitData, StateSplitResult,
+    process_shard_update, NewChunkData, NewChunkResult, OldChunkData, OldChunkResult,
+    ShardBlockUpdateResult, ShardContext, ShardUpdateReason, ShardUpdateResult, StateSplitData,
+    StateSplitResult, StorageContext,
 };
 use crate::validate::{
     validate_challenge, validate_chunk_proofs, validate_chunk_with_chunk_extra,
     validate_transactions_order,
 };
-use crate::{byzantine_assert, create_light_client_block_view, Doomslug};
+use crate::{
+    byzantine_assert, create_light_client_block_view, BlockStatus, ChainGenesis, Doomslug,
+    Provenance,
+};
 use crate::{metrics, DoomslugThresholdMode};
 use borsh::BorshDeserialize;
 use chrono::Duration;
@@ -39,7 +43,8 @@ use near_epoch_manager::shard_tracker::ShardTracker;
 use near_epoch_manager::types::BlockHeaderInfo;
 use near_epoch_manager::EpochManagerAdapter;
 use near_o11y::log_assert;
-use near_primitives::block::{genesis_chunks, BlockValidityError, Tip};
+use near_primitives::block::{genesis_chunks, Block, BlockValidityError, Tip};
+use near_primitives::block_header::BlockHeader;
 use near_primitives::challenge::{
     BlockDoubleSign, Challenge, ChallengeBody, ChallengesResult, ChunkProofs, ChunkState,
     MaybeEncodedShardChunk, PartialState, SlashedValidator,
@@ -452,7 +457,7 @@ pub fn check_known(
     check_known_store(chain, block_hash)
 }
 
-type BlockApplyChunksResult = (CryptoHash, Vec<Result<ApplyChunkResult, Error>>);
+type BlockApplyChunksResult = (CryptoHash, Vec<(ShardId, Result<ShardUpdateResult, Error>)>);
 
 /// Facade to the blockchain block processing and storage.
 /// Provides current view on the state according to the chain state.
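A note on the type change above: pairing every result with its `ShardId` is what lets postprocessing attribute a bad result to the right chunk, instead of `zip`ping results with `block.chunks()` and hoping the order matches (the `TODO(#8055)` in this file). A minimal standalone sketch of the pattern, with simplified stand-in types rather than the real nearcore definitions:

```rust
// Standalone sketch (simplified stand-in types, not the nearcore definitions).
type ShardId = u64;

#[derive(Debug)]
enum ShardUpdateResult {
    Stateful(String),       // stand-in for ShardBlockUpdateResult
    Stateless(Vec<String>), // stand-in for the replayed chunk extras
}

#[derive(Debug)]
struct Error {
    is_bad_data: bool,
}

/// Returns the shards whose updates failed with bad data, without assuming
/// anything about the order of `results`.
fn invalid_shards(results: &[(ShardId, Result<ShardUpdateResult, Error>)]) -> Vec<ShardId> {
    results
        .iter()
        .filter_map(|(shard_id, res)| match res {
            Err(e) if e.is_bad_data => Some(*shard_id),
            _ => None,
        })
        .collect()
}

fn main() {
    let results = vec![
        (0, Ok(ShardUpdateResult::Stateful("chunk extra".into()))),
        (1, Err(Error { is_bad_data: true })),
    ];
    assert_eq!(invalid_shards(&results), vec![1]);
}
```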
@@ -517,7 +522,8 @@ impl Drop for Chain {
 
 /// UpdateShardJob is a closure that is responsible for updating a shard for a single block.
 /// Execution context (latest blocks/chunks details) is already captured within.
-type UpdateShardJob = Box<dyn FnOnce(&Span) -> Result<ApplyChunkResult, Error> + Send + 'static>;
+type UpdateShardJob =
+    (ShardId, Box<dyn FnOnce(&Span) -> Result<ShardUpdateResult, Error> + Send + 'static>);
 
 /// PreprocessBlockResult is a tuple where the first element is a vector of jobs
 /// to update shards, the second element is BlockPreprocessInfo
@@ -2262,7 +2268,7 @@ impl Chain {
         &self,
         block_hash: CryptoHash,
         block_height: BlockHeight,
-        work: Vec<Box<dyn FnOnce(&Span) -> Result<ApplyChunkResult, Error> + Send>>,
+        work: Vec<UpdateShardJob>,
         apply_chunks_done_marker: Arc<OnceCell<()>>,
         apply_chunks_done_callback: DoneApplyChunkCallback,
     ) {
@@ -2293,7 +2299,7 @@ impl Chain {
         me: &Option<AccountId>,
         block: &Block,
         block_preprocess_info: BlockPreprocessInfo,
-        apply_results: Vec<Result<ApplyChunkResult, Error>>,
+        apply_results: Vec<(ShardId, Result<ShardUpdateResult, Error>)>,
     ) -> Result<Option<Tip>, Error> {
         let mut chain_update = self.chain_update();
         let new_head =
@@ -2309,7 +2315,7 @@ impl Chain {
         &mut self,
         me: &Option<AccountId>,
         block_hash: CryptoHash,
-        apply_results: Vec<Result<ApplyChunkResult, Error>>,
+        apply_results: Vec<(ShardId, Result<ShardUpdateResult, Error>)>,
         block_processing_artifacts: &mut BlockProcessingArtifact,
         apply_chunks_done_callback: DoneApplyChunkCallback,
     ) -> Result<AcceptedBlock, Error> {
@@ -2332,10 +2338,11 @@ impl Chain {
         let provenance = block_preprocess_info.provenance.clone();
         let block_start_processing_time = block_preprocess_info.block_start_processing_time;
         // TODO(#8055): this zip relies on the ordering of the apply_results.
-        for (apply_result, chunk) in apply_results.iter().zip(block.chunks().iter()) {
+        for (shard_id, apply_result) in apply_results.iter() {
             if let Err(err) = apply_result {
                 if err.is_bad_data() {
-                    block_processing_artifacts.invalid_chunks.push(chunk.clone());
+                    let chunk = block.chunks()[*shard_id as usize].clone();
+                    block_processing_artifacts.invalid_chunks.push(chunk);
                 }
             }
         }
@@ -3604,7 +3611,7 @@ impl Chain {
         &mut self,
         me: &Option<AccountId>,
         block_hash: &CryptoHash,
-        results: Vec<Result<ApplyChunkResult, Error>>,
+        results: Vec<Result<ShardUpdateResult, Error>>,
     ) -> Result<(), Error> {
         let block = self.store.get_block(block_hash)?;
         let mut chain_update = self.chain_update();
@@ -3879,6 +3886,38 @@ impl Chain {
             .collect()
     }
 
+    /// Returns the sequence of blocks in the chain from `last_block_hash` (inclusive)
+    /// down to the block with height `first_block_height` (inclusive only if
+    /// `include_with_height` is true). For each block hash in the resulting `Vec`,
+    /// the next entry contains the hash of its parent on chain.
+    /// TODO(logunov): consider uniting with `get_incoming_receipts_for_shard` because it
+    /// has the same purpose.
+    fn get_blocks_until_height(
+        &self,
+        mut last_block_hash: CryptoHash,
+        first_block_height: BlockHeight,
+        include_with_height: bool,
+    ) -> Result<Vec<CryptoHash>, Error> {
+        let mut blocks = vec![];
+        loop {
+            let header = self.get_block_header(&last_block_hash)?;
+            if header.height() < first_block_height {
+                return Err(Error::InvalidBlockHeight(first_block_height));
+            }
+
+            if header.height() == first_block_height {
+                break;
+            }
+
+            blocks.push(last_block_hash);
+            last_block_hash = *header.prev_hash();
+        }
+        if include_with_height {
+            blocks.push(last_block_hash);
+        }
+        Ok(blocks)
+    }
+
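`get_blocks_until_height` is a plain walk backwards over `prev_hash` links. A self-contained model of the same loop, assuming toy hash and header types; the real method reads headers from the chain store and returns `Error::InvalidBlockHeight` when the target height is skipped:

```rust
use std::collections::HashMap;

// Toy stand-ins; the real code uses CryptoHash and headers from the store.
type Hash = u64;

struct Header {
    height: u64,
    prev: Hash,
}

/// Walks `prev` links from `last` down to `first_height`.
/// Returns hashes newest first; each entry is followed by its parent.
fn blocks_until_height(
    headers: &HashMap<Hash, Header>,
    mut last: Hash,
    first_height: u64,
    inclusive: bool,
) -> Option<Vec<Hash>> {
    let mut out = vec![];
    loop {
        let header = headers.get(&last)?;
        if header.height < first_height {
            return None; // target height was skipped: invalid input
        }
        if header.height == first_height {
            break;
        }
        out.push(last);
        last = header.prev;
    }
    if inclusive {
        out.push(last);
    }
    Some(out)
}

fn main() {
    let headers = HashMap::from([
        (3, Header { height: 10, prev: 2 }),
        (2, Header { height: 9, prev: 1 }),
        (1, Header { height: 8, prev: 0 }),
    ]);
    assert_eq!(blocks_until_height(&headers, 3, 8, true), Some(vec![3, 2, 1]));
    assert_eq!(blocks_until_height(&headers, 3, 9, false), Some(vec![3]));
}
```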
     /// Creates jobs which will update shards for the given block and incoming
     /// receipts aggregated for it.
     fn apply_chunks_preprocessing(
@@ -3894,16 +3933,32 @@
         let _span = tracing::debug_span!(target: "chain", "apply_chunks_preprocessing").entered();
         let prev_chunk_headers =
             Chain::get_prev_chunk_headers(self.epoch_manager.as_ref(), prev_block)?;
-        block
-            .chunks()
-            .iter()
-            .zip(prev_chunk_headers.iter())
-            .enumerate()
-            .filter_map(|(shard_id, (chunk_header, prev_chunk_header))| {
-                // XXX: This is a bit questionable -- sandbox state patching works
-                // only for a single shard. This so far has been enough.
-                let state_patch = state_patch.take();
-                let update_shard_job = self.get_update_shard_job(
+
+        let mut maybe_jobs = vec![];
+        for (shard_id, (chunk_header, prev_chunk_header)) in
+            block.chunks().iter().zip(prev_chunk_headers.iter()).enumerate()
+        {
+            // XXX: This is a bit questionable -- sandbox state patching works
+            // only for a single shard. This so far has been enough.
+            let state_patch = state_patch.take();
+
+            let stateful_job = self.get_update_shard_job(
+                me,
+                block,
+                prev_block,
+                chunk_header,
+                prev_chunk_header,
+                shard_id as ShardId,
+                mode,
+                incoming_receipts,
+                state_patch,
+            );
+            maybe_jobs.push((shard_id, stateful_job));
+
+            let protocol_version =
+                self.epoch_manager.get_epoch_protocol_version(block.header().epoch_id())?;
+            if checked_feature!("stable", ChunkValidation, protocol_version) {
+                let stateless_job = self.get_stateless_validation_job(
                     me,
                     block,
                     prev_block,
@@ -3911,37 +3966,38 @@
                     prev_chunk_header,
                     shard_id as ShardId,
                     mode,
-                    incoming_receipts,
-                    state_patch,
                 );
-                match update_shard_job {
-                    Ok(Some(processor)) => Some(Ok(processor)),
-                    Ok(None) => None,
-                    Err(err) => {
-                        if err.is_bad_data() {
-                            invalid_chunks.push(chunk_header.clone());
-                        }
-                        Some(Err(err))
+                maybe_jobs.push((shard_id, stateless_job));
+            }
+        }
+
+        let mut jobs = vec![];
+        for (shard_id, maybe_job) in maybe_jobs {
+            match maybe_job {
+                Ok(Some(processor)) => jobs.push(processor),
+                Ok(None) => {}
+                Err(err) => {
+                    if err.is_bad_data() {
+                        let chunk_header = block.chunks()[shard_id].clone();
+                        invalid_chunks.push(chunk_header);
                     }
+                    return Err(err);
                 }
-            })
-            .collect()
+            }
+        }
+
+        Ok(jobs)
     }
 
-    /// This method returns the closure that is responsible for updating a shard.
-    fn get_update_shard_job(
+    fn get_shard_context(
         &self,
         me: &Option<AccountId>,
-        block: &Block,
-        prev_block: &Block,
-        chunk_header: &ShardChunkHeader,
-        prev_chunk_header: &ShardChunkHeader,
+        block_header: &BlockHeader,
         shard_id: ShardId,
         mode: ApplyChunksMode,
-        incoming_receipts: &HashMap<ShardId, Vec<ReceiptProof>>,
-        state_patch: SandboxStatePatch,
-    ) -> Result<Option<UpdateShardJob>, Error> {
-        let prev_hash = block.header().prev_hash();
+    ) -> Result<ShardContext, Error> {
+        let prev_hash = block_header.prev_hash();
+        let epoch_id = block_header.epoch_id();
         let cares_about_shard_this_epoch =
             self.shard_tracker.care_about_shard(me.as_ref(), prev_hash, shard_id, true);
         let cares_about_shard_next_epoch =
@@ -3953,6 +4009,33 @@ impl Chain {
             cares_about_shard_next_epoch,
         );
         let need_to_split_states = will_shard_layout_change && cares_about_shard_next_epoch;
+        let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?;
+        Ok(ShardContext {
+            shard_uid,
+            cares_about_shard_this_epoch,
+            will_shard_layout_change,
+            should_apply_transactions,
+            need_to_split_states,
+        })
+    }
+
+    /// This method returns the closure that is responsible for updating a shard.
+    fn get_update_shard_job(
+        &self,
+        me: &Option<AccountId>,
+        block: &Block,
+        prev_block: &Block,
+        chunk_header: &ShardChunkHeader,
+        prev_chunk_header: &ShardChunkHeader,
+        shard_id: ShardId,
+        mode: ApplyChunksMode,
+        incoming_receipts: &HashMap<ShardId, Vec<ReceiptProof>>,
+        state_patch: SandboxStatePatch,
+    ) -> Result<Option<UpdateShardJob>, Error> {
+        let _span = tracing::debug_span!(target: "chain", "get_update_shard_job").entered();
+        let prev_hash = block.header().prev_hash();
+        let shard_context = self.get_shard_context(me, block.header(), shard_id, mode)?;
+
         // We can only split states when states are ready, i.e., mode != ApplyChunksMode::NotCaughtUp
         // 1) if should_apply_transactions == true && split_state_roots.is_some(),
         //    that means split states are ready.
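`get_shard_context` pulls the per-shard flags that `get_update_shard_job` used to compute inline into one place, so the stateful and the stateless job are guaranteed to see the same decisions. A trimmed-down sketch of the struct and its derivation; the exact flag rules here are illustrative assumptions, the real ones live in `ShardTracker::care_about_shard` and `get_should_apply_transactions`:

```rust
// Trimmed-down sketch; flag rules here are illustrative assumptions, not the
// exact nearcore logic.
#[derive(Debug, Clone, Copy)]
struct ShardContext {
    cares_about_shard_this_epoch: bool,
    will_shard_layout_change: bool,
    should_apply_transactions: bool,
    need_to_split_states: bool,
}

fn shard_context(
    cares_this_epoch: bool,
    cares_next_epoch: bool,
    layout_changes: bool,
    catching_up: bool,
) -> ShardContext {
    // Assumption for this sketch: during catch-up we apply transactions only
    // for shards picked up next epoch; the real rules live in
    // `get_should_apply_transactions`.
    let should_apply_transactions =
        if catching_up { cares_next_epoch && !cares_this_epoch } else { cares_this_epoch };
    ShardContext {
        cares_about_shard_this_epoch: cares_this_epoch,
        will_shard_layout_change: layout_changes,
        should_apply_transactions,
        // Split states only for shards we keep tracking across the layout change.
        need_to_split_states: layout_changes && cares_next_epoch,
    }
}

fn main() {
    let ctx = shard_context(false, true, true, true);
    assert!(ctx.should_apply_transactions && ctx.need_to_split_states);
    println!("{ctx:?}");
}
```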
@@ -3968,24 +4051,26 @@
         //    called with mode NotCaughtUp, therefore `state_changes_for_split_states` have been
         //    stored in the database. Then we can safely read that and apply that to the split
         //    states
-        let split_state_roots = if need_to_split_states && mode != ApplyChunksMode::NotCaughtUp {
-            Some(self.get_split_state_roots(block, shard_id)?)
-        } else {
-            None
-        };
+        let split_state_roots =
+            if shard_context.need_to_split_states && mode != ApplyChunksMode::NotCaughtUp {
+                Some(self.get_split_state_roots(block, shard_id)?)
+            } else {
+                None
+            };
 
-        let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, block.header().epoch_id())?;
         let is_new_chunk = chunk_header.height_included() == block.header().height();
-        let shard_update_reason = if should_apply_transactions {
+        let shard_update_reason = if shard_context.should_apply_transactions {
             let block_context = self.get_apply_transactions_block_context(
                 block.header(),
                 prev_block.header(),
                 is_new_chunk,
             )?;
+            let storage_context =
+                StorageContext { storage_data_source: StorageDataSource::Db, state_patch };
             if is_new_chunk {
                 // Validate new chunk and collect incoming receipts for it.
-                let prev_chunk_extra = self.get_chunk_extra(prev_hash, &shard_uid)?;
+                let prev_chunk_extra = self.get_chunk_extra(prev_hash, &shard_context.shard_uid)?;
                 let chunk = self.get_chunk_clone_from_header(&chunk_header)?;
                 let prev_chunk_height_included = prev_chunk_header.height_included();
@@ -4051,18 +4136,22 @@
                     chunk,
                     receipts,
                     split_state_roots,
+                    storage_context,
                 })
             } else {
                 ShardUpdateReason::OldChunk(OldChunkData {
                     block: block_context,
                     prev_chunk_extra: ChunkExtra::clone(
-                        self.get_chunk_extra(prev_hash, &shard_uid)?.as_ref(),
+                        self.get_chunk_extra(prev_hash, &shard_context.shard_uid)?.as_ref(),
                     ),
                     split_state_roots,
+                    storage_context,
                 })
             }
         } else if let Some(split_state_roots) = split_state_roots {
-            assert!(mode == ApplyChunksMode::CatchingUp && cares_about_shard_this_epoch);
+            assert!(
+                mode == ApplyChunksMode::CatchingUp && shard_context.cares_about_shard_this_epoch
+            );
             let state_changes =
                 self.store().get_state_changes_for_split_states(block.hash(), shard_id)?;
             ShardUpdateReason::StateSplit(StateSplitData {
@@ -4077,16 +4166,240 @@
 
         let runtime = self.runtime_adapter.clone();
         let epoch_manager = self.epoch_manager.clone();
-        Ok(Some(Box::new(move |parent_span| -> Result<ApplyChunkResult, Error> {
-            Ok(process_shard_update(
-                parent_span,
-                runtime.as_ref(),
-                epoch_manager.as_ref(),
-                shard_update_reason,
-                ShardContext { shard_uid, will_shard_layout_change },
-                state_patch,
-            )?)
-        })))
+        Ok(Some((
+            shard_id,
+            Box::new(move |parent_span| -> Result<ShardUpdateResult, Error> {
+                Ok(ShardUpdateResult::Stateful(process_shard_update(
+                    parent_span,
+                    runtime.as_ref(),
+                    epoch_manager.as_ref(),
+                    shard_update_reason,
+                    shard_context,
+                )?))
+            }),
+        )))
+    }
+
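The returned job is a boxed `FnOnce` that captures everything computed during preprocessing and runs later on the apply-chunks workers, now tagged with its `ShardId` and wrapping its output in `ShardUpdateResult::Stateful`. A simplified sketch of this deferred-execution pattern (stand-in types, no nearcore dependencies):

```rust
// Simplified sketch of the deferred-execution pattern (stand-in types).
type ShardId = u64;

#[derive(Debug)]
enum ShardUpdateResult {
    Stateful(u64),              // stand-in for a ShardBlockUpdateResult
    Stateless(Vec<(u64, u64)>), // stand-in for (block, chunk extra) pairs
}

type UpdateShardJob =
    (ShardId, Box<dyn FnOnce() -> Result<ShardUpdateResult, String> + Send + 'static>);

fn make_stateful_job(shard_id: ShardId, captured_state_root: u64) -> UpdateShardJob {
    // `move` captures everything computed during preprocessing; the closure
    // can then run on a worker thread.
    (shard_id, Box::new(move || Ok(ShardUpdateResult::Stateful(captured_state_root + 1))))
}

fn main() {
    let (shard_id, job) = make_stateful_job(0, 41);
    println!("shard {shard_id}: {:?}", job());
}
```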
+    /// Returns closure which should validate a chunk without state, if the chunk is present.
+    /// TODO(logunov):
+    /// 1. Currently the result of this job is not applied and is used only for validation
+    ///    against stateful chunk execution. Later, the current execution must be deprecated
+    ///    in favour of this one.
+    /// 2. Use state witness and make this method truly stateless.
+    /// 3. Use the range of blocks between the chunk and the previous existing chunk to
+    ///    collect incoming receipts.
+    fn get_stateless_validation_job(
+        &self,
+        me: &Option<AccountId>,
+        block: &Block,
+        prev_block: &Block,
+        chunk_header: &ShardChunkHeader,
+        prev_chunk_header: &ShardChunkHeader,
+        shard_id: ShardId,
+        mode: ApplyChunksMode,
+    ) -> Result<Option<UpdateShardJob>, Error> {
+        let _span =
+            tracing::debug_span!(target: "chain", "get_stateless_validation_job").entered();
+        let last_shard_context = self.get_shard_context(me, block.header(), shard_id, mode)?;
+        let is_new_chunk = chunk_header.height_included() == block.header().height();
+
+        // If we don't track the shard or there is no chunk, there is nothing to validate.
+        if !last_shard_context.should_apply_transactions || !is_new_chunk {
+            return Ok(None);
+        }
+
+        // Validate transactions in the chunk.
+        let chunk = self.get_chunk_clone_from_header(&chunk_header)?;
+        self.validate_chunk_transactions(&block, prev_block.header(), &chunk)?;
+
+        let runtime = self.runtime_adapter.clone();
+        let epoch_manager = self.epoch_manager.clone();
+        let shard_id = prev_chunk_header.shard_id();
+        let prev_chunk_height_included = prev_chunk_header.height_included();
+        let prev_chunk_prev_hash = *prev_chunk_header.prev_block_hash();
+
+        // If the previous chunk is the genesis chunk, its execution is trivial.
+        // TODO(logunov): check that the state witness is empty.
+        if prev_chunk_prev_hash == CryptoHash::default() {
+            return Ok(None);
+        }
+
+        // Find the block at whose height the previous chunk was created.
+        // We will apply the previous chunk to validate the state witness in the current chunk.
+        let mut last_blocks =
+            self.get_blocks_until_height(*prev_block.hash(), prev_chunk_height_included, true)?;
+        let prev_chunk_block_hash = last_blocks.pop().unwrap();
+        let prev_chunk_block_header = self.get_block_header(&prev_chunk_block_hash)?;
+        assert_eq!(prev_chunk_block_header.prev_hash(), &prev_chunk_prev_hash);
+
+        // Check that the current and previous chunks have the same shard layout, because
+        // resharding is not supported yet - otherwise we would need to determine the
+        // parent shard id.
+        let prev_chunk_prev_block = match self.get_block(&prev_chunk_prev_hash) {
+            Ok(b) => b,
+            Err(e) => {
+                let block_height = block.header().height();
+                let block_hash = block.hash();
+                // TODO(logunov): this is probably happening just after state sync; needs to
+                // be fixed before the stateless validation release.
+                debug!(target: "client", block_height, ?block_hash, shard_id, ?prev_chunk_prev_hash, "Previous block for previous chunk is missing: {e}");
+                return Ok(None);
+            }
+        };
+        let shard_layout =
+            epoch_manager.get_shard_layout_from_prev_block(prev_block.header().prev_hash())?;
+        let prev_shard_layout = epoch_manager
+            .get_shard_layout_from_prev_block(prev_chunk_prev_block.header().prev_hash())?;
+        if shard_layout != prev_shard_layout {
+            return Ok(None);
+        }
+
+        // Do the same check for the previous and previous-previous chunks, because we
+        // iterate over the blocks between them to check for shard updates, and we will
+        // use the same shard id again for simplicity.
+        // TODO(logunov): does `prev_prev_chunk` always exist in DB?
+        let prev_prev_chunk = &prev_chunk_prev_block.chunks()[shard_id as usize];
+        let prev_prev_chunk_prev_hash = prev_prev_chunk.prev_block_hash();
+        let prev_prev_chunk_height_included = prev_prev_chunk.height_included();
+        let prev_prev_shard_layout =
+            epoch_manager.get_shard_layout_from_prev_block(prev_prev_chunk_prev_hash)?;
+        if prev_shard_layout != prev_prev_shard_layout {
+            return Ok(None);
+        }
+
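A worked example of the look-back above, with hypothetical heights and hashes: if the previous chunk was included at height 7 and the current block is at height 10, then `get_blocks_until_height(prev_block, 7, true)` returns the blocks newest first, and popping the last element yields the block in which the previous chunk appeared:

```rust
fn main() {
    // Hypothetical chain for this shard:
    //   height 7: previous chunk included, heights 8..=9: missing chunks,
    //   height 10: current block with a new chunk.
    // get_blocks_until_height(b10, 7, /*include_with_height=*/ true):
    let mut last_blocks = vec!["b10", "b9", "b8", "b7"];
    let prev_chunk_block_hash = last_blocks.pop().unwrap();
    assert_eq!(prev_chunk_block_hash, "b7");
    // The remaining blocks are the ones after the previous chunk.
    assert_eq!(last_blocks, ["b10", "b9", "b8"]);
}
```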
+        // Collect receipts that appeared on chain between the previous and
+        // previous-previous chunks.
+        // Ideally we should collect receipts from the current chunk (excluded) to the
+        // previous chunk (included), but for now we verify results against the current
+        // stateful logic.
+        let receipts_response = &self.store().get_incoming_receipts_for_shard(
+            self.epoch_manager.as_ref(),
+            shard_id,
+            prev_chunk_block_hash,
+            prev_prev_chunk_height_included,
+        )?;
+        let receipts = collect_receipts_from_response(receipts_response);
+
+        // Get the sequence of blocks for which we will process the shard update.
+        let blocks_to_execute = self.get_blocks_until_height(
+            prev_chunk_block_hash,
+            prev_prev_chunk_height_included,
+            false,
+        )?;
+        let mut execution_contexts: Vec<(ApplyTransactionsBlockContext, ShardContext)> = vec![];
+        for block_hash in blocks_to_execute {
+            let block_header = self.get_block_header(&block_hash)?;
+            let prev_block_header = self.get_previous_header(&block_header)?;
+            let shard_context = self.get_shard_context(me, &block_header, shard_id, mode)?;
+            if shard_context.need_to_split_states {
+                return Ok(None);
+            }
+            execution_contexts.push((
+                self.get_apply_transactions_block_context(
+                    &block_header,
+                    &prev_block_header,
+                    block_hash == prev_chunk_block_hash,
+                )?,
+                shard_context,
+            ));
+        }
+        execution_contexts.reverse();
+
+        // Create the stateless validation job.
+        // Its initial state corresponds to the block at which `prev_prev_chunk` was created.
+        // Then we process updates for missing chunks until we find the block at which
+        // `prev_chunk` was created, and finally we process the update for `prev_chunk`
+        // itself.
+        let mut current_chunk_extra = match self.get_chunk_extra(
+            &execution_contexts[0].0.prev_block_hash,
+            &execution_contexts[0].1.shard_uid,
+        ) {
+            Ok(c) => ChunkExtra::clone(c.as_ref()),
+            Err(e) => {
+                let block_height = block.header().height();
+                let block_hash = block.hash();
+                let requested_block_hash = execution_contexts[0].0.prev_block_hash;
+                let requested_shard_uid = execution_contexts[0].1.shard_uid;
+                debug!(target: "client", block_height, ?block_hash, ?requested_block_hash, ?requested_shard_uid, "Chunk extra is missing: {e}");
+                return Ok(None);
+            }
+        };
+        let (last_block_context, last_shard_context) = execution_contexts.pop().unwrap();
+        let prev_chunk = self.get_chunk_clone_from_header(&prev_chunk_header.clone())?;
+        Ok(Some((
+            shard_id,
+            Box::new(move |parent_span| -> Result<ShardUpdateResult, Error> {
+                let mut result = vec![];
+                for (block_context, shard_context) in execution_contexts {
+                    let block_result = process_shard_update(
+                        parent_span,
+                        runtime.as_ref(),
+                        epoch_manager.as_ref(),
+                        ShardUpdateReason::OldChunk(OldChunkData {
+                            block: block_context.clone(),
+                            split_state_roots: None,
+                            prev_chunk_extra: current_chunk_extra.clone(),
+                            storage_context: StorageContext {
+                                storage_data_source: StorageDataSource::DbTrieOnly,
+                                state_patch: Default::default(),
+                            },
+                        }),
+                        shard_context,
+                    )?;
+                    if let ShardBlockUpdateResult::OldChunk(OldChunkResult {
+                        shard_uid,
+                        apply_result,
+                        apply_split_result_or_state_changes: _,
+                    }) = block_result
+                    {
+                        *current_chunk_extra.state_root_mut() = apply_result.new_root;
+                        result.push((
+                            block_context.block_hash,
+                            shard_uid,
+                            current_chunk_extra.clone(),
+                        ));
+                    }
+                }
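The replay loop above threads a single `ChunkExtra` through every block with a missing chunk: each `OldChunk` update rewrites the state root, and each intermediate `(block, shard, chunk extra)` triple is recorded for the later comparison against stored chunk extras. A reduced model of that chaining, using toy state roots instead of trie hashes:

```rust
// Reduced model of the chaining: toy state roots instead of trie hashes.
type StateRoot = u64;

/// Stand-in for applying an "old chunk" update (only implicit work, no txs).
fn apply_old_chunk(root: StateRoot) -> StateRoot {
    root + 1
}

fn main() {
    let mut root: StateRoot = 100; // chunk extra left by the prev-prev chunk
    let blocks = ["b8", "b9"]; // blocks with missing chunks, oldest first
    let mut recorded = vec![];
    for block in blocks {
        root = apply_old_chunk(root);
        recorded.push((block, root));
    }
    // The final root must equal the pre-state root declared by the previous
    // chunk, mirroring the assert_eq! before the NewChunk update.
    assert_eq!(root, 102);
    println!("{recorded:?}");
}
```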
+                // TODO(logunov): use `validate_chunk_with_chunk_extra`
+                assert_eq!(current_chunk_extra.state_root(), &prev_chunk.prev_state_root());
+                let block_result = process_shard_update(
+                    parent_span,
+                    runtime.as_ref(),
+                    epoch_manager.as_ref(),
+                    ShardUpdateReason::NewChunk(NewChunkData {
+                        chunk: prev_chunk,
+                        receipts,
+                        split_state_roots: None,
+                        block: last_block_context.clone(),
+                        is_first_block_with_chunk_of_version: false,
+                        storage_context: StorageContext {
+                            storage_data_source: StorageDataSource::DbTrieOnly,
+                            state_patch: Default::default(),
+                        },
+                    }),
+                    last_shard_context,
+                )?;
+                if let ShardBlockUpdateResult::NewChunk(NewChunkResult {
+                    gas_limit,
+                    shard_uid,
+                    apply_result,
+                    apply_split_result_or_state_changes: _,
+                }) = block_result
+                {
+                    let (outcome_root, _) =
+                        ApplyTransactionResult::compute_outcomes_proof(&apply_result.outcomes);
+                    result.push((
+                        last_block_context.block_hash,
+                        shard_uid,
+                        ChunkExtra::new(
+                            &apply_result.new_root,
+                            outcome_root,
+                            apply_result.validator_proposals,
+                            apply_result.total_gas_burnt,
+                            gas_limit,
+                            apply_result.total_balance_burnt,
+                        ),
+                    ));
+                }
+                Ok(ShardUpdateResult::Stateless(result))
+            }),
+        )))
     }
 
     /// Function to create a new snapshot if needed
@@ -5026,11 +5339,28 @@ impl<'a> ChainUpdate<'a> {
     fn apply_chunk_postprocessing(
         &mut self,
         block: &Block,
-        apply_results: Vec<ApplyChunkResult>,
+        apply_results: Vec<ShardUpdateResult>,
     ) -> Result<(), Error> {
         let _span = tracing::debug_span!(target: "chain", "apply_chunk_postprocessing").entered();
         for result in apply_results {
-            self.process_apply_chunk_result(block, result)?
+            match result {
+                ShardUpdateResult::Stateful(result) => {
+                    self.process_apply_chunk_result(block, result)?
+                }
+                ShardUpdateResult::Stateless(results) => {
+                    for (block_hash, shard_uid, chunk_extra) in results {
+                        let expected_chunk_extra =
+                            self.chain_store_update.get_chunk_extra(&block_hash, &shard_uid)?;
+                        assert_eq!(
+                            &chunk_extra,
+                            expected_chunk_extra.as_ref(),
+                            "For stateless validation, chunk extras for block {} and shard {} do not match",
+                            block_hash,
+                            shard_uid
+                        );
+                    }
+                }
+            }
         }
         Ok(())
     }
@@ -5179,13 +5509,13 @@ impl<'a> ChainUpdate<'a> {
     fn process_apply_chunk_result(
         &mut self,
         block: &Block,
-        result: ApplyChunkResult,
+        result: ShardBlockUpdateResult,
     ) -> Result<(), Error> {
         let block_hash = block.hash();
         let prev_hash = block.header().prev_hash();
         let height = block.header().height();
         match result {
-            ApplyChunkResult::NewChunk(NewChunkResult {
+            ShardBlockUpdateResult::NewChunk(NewChunkResult {
                 gas_limit,
                 shard_uid,
                 apply_result,
@@ -5236,7 +5566,7 @@
                     self.process_split_state(block, &shard_uid, apply_results_or_state_changes)?;
                 }
             }
-            ApplyChunkResult::OldChunk(OldChunkResult {
+            ShardBlockUpdateResult::OldChunk(OldChunkResult {
                 shard_uid,
                 apply_result,
                 apply_split_result_or_state_changes,
@@ -5263,7 +5593,7 @@
                     self.process_split_state(block, &shard_uid, apply_results_or_state_changes)?;
                 }
             }
-            ApplyChunkResult::StateSplit(StateSplitResult { shard_uid, results }) => {
+            ShardBlockUpdateResult::StateSplit(StateSplitResult { shard_uid, results }) => {
                 self.chain_store_update
                     .remove_state_changes_for_split_states(*block.hash(), shard_uid.shard_id());
                 self.process_split_state(
@@ -5283,12 +5613,12 @@
         me: &Option<AccountId>,
         block: &Block,
         block_preprocess_info: BlockPreprocessInfo,
-        apply_chunks_results: Vec<Result<ApplyChunkResult, Error>>,
+        apply_chunks_results: Vec<(ShardId, Result<ShardUpdateResult, Error>)>,
     ) -> Result<Option<Tip>, Error> {
         let prev_hash = block.header().prev_hash();
-        let results = apply_chunks_results.into_iter().map(|x| {
+        let results = apply_chunks_results.into_iter().map(|(shard_id, x)| {
             if let Err(err) = &x {
-                warn!(target: "chain", hash = %block.hash(), %err, "Error in applying chunks for block");
+                warn!(target: "chain", shard_id, hash = %block.hash(), %err, "Error in applying chunk for block");
             }
             x
         }).collect::<Result<Vec<ShardUpdateResult>, Error>>()?;
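The `Stateless` arm added to `apply_chunk_postprocessing` is a pure cross-check: nothing is written, and each replayed chunk extra must equal the one that stateful execution already persisted. The shape of that check, reduced to plain data with a hypothetical one-field `ChunkExtra`:

```rust
use std::collections::HashMap;

// Hypothetical stand-ins: a one-field ChunkExtra and string block hashes.
type BlockHash = &'static str;
type ShardUid = u64;

#[derive(Debug, Clone, PartialEq, Eq)]
struct ChunkExtra {
    state_root: u64,
}

/// Every triple produced by the stateless replay must match what stateful
/// execution already persisted; nothing is written here.
fn check_stateless(
    persisted: &HashMap<(BlockHash, ShardUid), ChunkExtra>,
    stateless: &[(BlockHash, ShardUid, ChunkExtra)],
) {
    for (block, shard, extra) in stateless {
        let expected = &persisted[&(*block, *shard)];
        assert_eq!(extra, expected, "chunk extras for block {block} and shard {shard} do not match");
    }
}

fn main() {
    let persisted = HashMap::from([(("b9", 0), ChunkExtra { state_root: 7 })]);
    check_stateless(&persisted, &[("b9", 0, ChunkExtra { state_root: 7 })]);
}
```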
@@ -5919,16 +6249,16 @@ impl<'a> ChainUpdate<'a> {
 pub fn do_apply_chunks(
     block_hash: CryptoHash,
     block_height: BlockHeight,
-    work: Vec<Box<dyn FnOnce(&Span) -> Result<ApplyChunkResult, Error> + Send>>,
-) -> Vec<Result<ApplyChunkResult, Error>> {
+    work: Vec<UpdateShardJob>,
+) -> Vec<(ShardId, Result<ShardUpdateResult, Error>)> {
     let parent_span =
         tracing::debug_span!(target: "chain", "do_apply_chunks", block_height, %block_hash)
             .entered();
     work.into_par_iter()
-        .map(|task| {
+        .map(|(shard_id, task)| {
             // As chunks can be processed in parallel, make sure they are all tracked as children of
             // a single span.
-            task(&parent_span)
+            (shard_id, task(&parent_span))
         })
         .collect()
 }
@@ -5988,7 +6318,7 @@ pub struct BlockCatchUpRequest {
     pub sync_hash: CryptoHash,
     pub block_hash: CryptoHash,
     pub block_height: BlockHeight,
-    pub work: Vec<Box<dyn FnOnce(&Span) -> Result<ApplyChunkResult, Error> + Send>>,
+    pub work: Vec<UpdateShardJob>,
 }
 
 // Skip `work`, because displaying functions is not possible.
@@ -6008,7 +6338,7 @@ impl Debug for BlockCatchUpRequest {
 pub struct BlockCatchUpResponse {
     pub sync_hash: CryptoHash,
     pub block_hash: CryptoHash,
-    pub results: Vec<Result<ApplyChunkResult, Error>>,
+    pub results: Vec<(ShardId, Result<ShardUpdateResult, Error>)>,
 }
 
 /// Helper to track blocks catch up
@@ -6033,7 +6363,7 @@ pub struct BlocksCatchUpState {
     /// preprocessing
     pub scheduled_blocks: HashSet<CryptoHash>,
     /// Map from block hashes that were processed to (saved store update, process results)
-    pub processed_blocks: HashMap<CryptoHash, Vec<Result<ApplyChunkResult, Error>>>,
+    pub processed_blocks: HashMap<CryptoHash, Vec<Result<ShardUpdateResult, Error>>>,
     /// Collection of block hashes that are fully processed
     pub done_blocks: Vec<CryptoHash>,
 }
diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs
index 6d0c3c2d480..81d2cde513f 100644
--- a/chain/chain/src/types.rs
+++ b/chain/chain/src/types.rs
@@ -248,7 +248,14 @@ impl ChainGenesis {
 }
 
 pub enum StorageDataSource {
+    /// Full state data is present in DB.
     Db,
+    /// Trie is present in DB and flat storage is not.
+    /// Used for testing stateless validation jobs; should be removed after
+    /// the stateless validation release.
+    DbTrieOnly,
+    /// State data is supplied from the state witness; there is no state data
+    /// stored on disk.
     Recorded(PartialStorage),
 }
diff --git a/chain/chain/src/update_shard.rs b/chain/chain/src/update_shard.rs
index eca53bfd798..d0a93409c4b 100644
--- a/chain/chain/src/update_shard.rs
+++ b/chain/chain/src/update_shard.rs
@@ -2,7 +2,7 @@ use crate::crypto_hash_timer::CryptoHashTimer;
 use crate::types::{
     ApplySplitStateResult, ApplySplitStateResultOrStateChanges, ApplyTransactionResult,
     ApplyTransactionsBlockContext, ApplyTransactionsChunkContext, RuntimeAdapter,
-    RuntimeStorageConfig,
+    RuntimeStorageConfig, StorageDataSource,
 };
 use near_chain_primitives::Error;
 use near_epoch_manager::EpochManagerAdapter;
@@ -45,8 +45,20 @@ pub struct StateSplitResult {
     pub(crate) results: Vec<ApplySplitStateResult>,
 }
 
+/// Result of processing shard update, covering both stateful and stateless scenarios.
 #[derive(Debug)]
-pub enum ApplyChunkResult {
+pub enum ShardUpdateResult {
+    /// Stateful scenario - processed update for a single block.
+    Stateful(ShardBlockUpdateResult),
+    /// Stateless scenario - processed update based on the state witness in a chunk.
+    /// Contains `ChunkExtra`s - results of processing the updates corresponding
+    /// to the state witness.
+    Stateless(Vec<(CryptoHash, ShardUId, ChunkExtra)>),
+}
+
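With the `StorageDataSource` additions in `types.rs` above, a chunk application can be backed in three different ways. A compact sketch of the idea with stand-in types; the real selection happens inside `NightshadeRuntime::apply_transactions` via `RuntimeStorageConfig` (see the `nearcore/src/runtime/mod.rs` hunk at the end of this diff):

```rust
// Stand-in types; the real dispatch lives in NightshadeRuntime::apply_transactions.
enum StorageDataSource {
    Db,                // trie plus flat storage, read from disk
    DbTrieOnly,        // trie from disk, flat-storage costs simulated (testing only)
    Recorded(Vec<u8>), // partial state carried by a state witness
}

fn describe(source: &StorageDataSource) -> &'static str {
    match source {
        StorageDataSource::Db => "read state from DB with flat storage",
        StorageDataSource::DbTrieOnly => "read trie from DB, charge gas as if flat storage were on",
        StorageDataSource::Recorded(_) => "replay reads from recorded partial state",
    }
}

fn main() {
    println!("{}", describe(&StorageDataSource::DbTrieOnly));
}
```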
+/// Result for a shard update for a single block.
+#[derive(Debug)]
+pub enum ShardBlockUpdateResult {
     NewChunk(NewChunkResult),
     OldChunk(OldChunkResult),
     StateSplit(StateSplitResult),
 }
@@ -61,12 +73,14 @@ pub(crate) struct NewChunkData {
     pub split_state_roots: Option<SplitStateRoots>,
     pub block: ApplyTransactionsBlockContext,
     pub is_first_block_with_chunk_of_version: bool,
+    pub storage_context: StorageContext,
 }
 
 pub(crate) struct OldChunkData {
     pub prev_chunk_extra: ChunkExtra,
     pub split_state_roots: Option<SplitStateRoots>,
     pub block: ApplyTransactionsBlockContext,
+    pub storage_context: StorageContext,
 }
 
 pub(crate) struct StateSplitData {
@@ -94,8 +108,21 @@ pub(crate) enum ShardUpdateReason {
 /// Information about shard to update.
 pub(crate) struct ShardContext {
     pub shard_uid: ShardUId,
+    /// Whether the node cares about the shard in this epoch.
+    pub cares_about_shard_this_epoch: bool,
     /// Whether shard layout changes in the next epoch.
     pub will_shard_layout_change: bool,
+    /// Whether transactions should be applied.
+    pub should_apply_transactions: bool,
+    /// See comment in `get_update_shard_job`.
+    pub need_to_split_states: bool,
+}
+
+/// Information about storage used for applying txs and receipts.
+pub(crate) struct StorageContext {
+    /// Data source used for processing the shard update.
+    pub storage_data_source: StorageDataSource,
+    pub state_patch: SandboxStatePatch,
 }
 
 /// Processes shard update with given block and shard.
@@ -106,14 +133,13 @@ pub(crate) fn process_shard_update(
     epoch_manager: &dyn EpochManagerAdapter,
     shard_update_reason: ShardUpdateReason,
     shard_context: ShardContext,
-    state_patch: SandboxStatePatch,
-) -> Result<ApplyChunkResult, Error> {
+) -> Result<ShardBlockUpdateResult, Error> {
     match shard_update_reason {
         ShardUpdateReason::NewChunk(data) => {
-            apply_new_chunk(parent_span, data, shard_context, state_patch, runtime, epoch_manager)
+            apply_new_chunk(parent_span, data, shard_context, runtime, epoch_manager)
         }
         ShardUpdateReason::OldChunk(data) => {
-            apply_old_chunk(parent_span, data, shard_context, state_patch, runtime, epoch_manager)
+            apply_old_chunk(parent_span, data, shard_context, runtime, epoch_manager)
         }
         ShardUpdateReason::StateSplit(data) => {
             apply_state_split(parent_span, data, shard_context.shard_uid, runtime, epoch_manager)
@@ -126,19 +152,19 @@ pub(crate) fn process_shard_update(
 fn apply_new_chunk(
     parent_span: &tracing::Span,
     data: NewChunkData,
-    shard_info: ShardContext,
-    state_patch: SandboxStatePatch,
+    shard_context: ShardContext,
     runtime: &dyn RuntimeAdapter,
     epoch_manager: &dyn EpochManagerAdapter,
-) -> Result<ApplyChunkResult, Error> {
+) -> Result<ShardBlockUpdateResult, Error> {
     let NewChunkData {
         block,
         chunk,
         receipts,
         split_state_roots,
         is_first_block_with_chunk_of_version,
+        storage_context,
     } = data;
-    let shard_id = shard_info.shard_uid.shard_id();
+    let shard_id = shard_context.shard_uid.shard_id();
     let _span = tracing::debug_span!(
         target: "chain",
         parent: parent_span,
@@ -152,8 +178,8 @@ fn apply_new_chunk(
     let storage_config = RuntimeStorageConfig {
         state_root: *chunk_inner.prev_state_root(),
         use_flat_storage: true,
-        source: crate::types::StorageDataSource::Db,
-        state_patch,
+        source: storage_context.storage_data_source,
+        state_patch: storage_context.state_patch,
         record_storage: false,
     };
     match runtime.apply_transactions(
@@ -170,7 +196,7 @@ fn apply_new_chunk(
         chunk.transactions(),
     ) {
         Ok(apply_result) => {
-            let apply_split_result_or_state_changes = if shard_info.will_shard_layout_change {
+            let apply_split_result_or_state_changes = if shard_context.will_shard_layout_change {
                 Some(apply_split_state_changes(
                     epoch_manager,
                     runtime,
@@ -181,9 +207,9 @@ fn apply_new_chunk(
             } else {
                 None
             };
-            Ok(ApplyChunkResult::NewChunk(NewChunkResult {
+            Ok(ShardBlockUpdateResult::NewChunk(NewChunkResult {
                 gas_limit,
-                shard_uid: shard_info.shard_uid,
+                shard_uid: shard_context.shard_uid,
                 apply_result,
                 apply_split_result_or_state_changes,
             }))
@@ -198,13 +224,12 @@ fn apply_new_chunk(
 fn apply_old_chunk(
     parent_span: &tracing::Span,
     data: OldChunkData,
-    shard_info: ShardContext,
-    state_patch: SandboxStatePatch,
+    shard_context: ShardContext,
     runtime: &dyn RuntimeAdapter,
     epoch_manager: &dyn EpochManagerAdapter,
-) -> Result<ApplyChunkResult, Error> {
-    let OldChunkData { prev_chunk_extra, split_state_roots, block } = data;
-    let shard_id = shard_info.shard_uid.shard_id();
+) -> Result<ShardBlockUpdateResult, Error> {
+    let OldChunkData { prev_chunk_extra, split_state_roots, block, storage_context } = data;
+    let shard_id = shard_context.shard_uid.shard_id();
     let _span = tracing::debug_span!(
         target: "chain",
         parent: parent_span,
@@ -215,8 +240,8 @@ fn apply_old_chunk(
     let storage_config = RuntimeStorageConfig {
         state_root: *prev_chunk_extra.state_root(),
         use_flat_storage: true,
-        source: crate::types::StorageDataSource::Db,
-        state_patch,
+        source: storage_context.storage_data_source,
+        state_patch: storage_context.state_patch,
         record_storage: false,
     };
     match runtime.apply_transactions(
@@ -233,7 +258,7 @@ fn apply_old_chunk(
         &[],
     ) {
         Ok(apply_result) => {
-            let apply_split_result_or_state_changes = if shard_info.will_shard_layout_change {
+            let apply_split_result_or_state_changes = if shard_context.will_shard_layout_change {
                 Some(apply_split_state_changes(
                     epoch_manager,
                     runtime,
@@ -244,8 +269,8 @@ fn apply_old_chunk(
             } else {
                 None
             };
-            Ok(ApplyChunkResult::OldChunk(OldChunkResult {
-                shard_uid: shard_info.shard_uid,
+            Ok(ShardBlockUpdateResult::OldChunk(OldChunkResult {
+                shard_uid: shard_context.shard_uid,
                 apply_result,
                 apply_split_result_or_state_changes,
             }))
@@ -261,7 +286,7 @@ fn apply_state_split(
     shard_uid: ShardUId,
     runtime: &dyn RuntimeAdapter,
     epoch_manager: &dyn EpochManagerAdapter,
-) -> Result<ApplyChunkResult, Error> {
+) -> Result<ShardBlockUpdateResult, Error> {
     let StateSplitData { split_state_roots, state_changes, block_height: height, block_hash } =
         data;
     let shard_id = shard_uid.shard_id();
@@ -281,7 +306,7 @@ fn apply_state_split(
         &next_epoch_shard_layout,
         state_changes,
     )?;
-    Ok(ApplyChunkResult::StateSplit(StateSplitResult { shard_uid, results }))
+    Ok(ShardBlockUpdateResult::StateSplit(StateSplitResult { shard_uid, results }))
 }
 
 /// Process ApplyTransactionResult to apply changes to split states
diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs
index b0220c3e931..f96578da20d 100644
--- a/chain/client/src/client_actor.rs
+++ b/chain/client/src/client_actor.rs
@@ -22,6 +22,7 @@ use crate::{metrics, StatusResponse, SyncAdapter};
 use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler};
 use actix_rt::ArbiterHandle;
 use chrono::{DateTime, Utc};
+use itertools::Itertools;
 use near_async::messaging::{CanSend, Sender};
 use near_chain::chain::{
     ApplyStatePartsRequest, ApplyStatePartsResponse, BlockCatchUpRequest, BlockCatchUpResponse,
@@ -1889,7 +1890,9 @@ impl Handler<WithSpanContext<BlockCatchUpResponse>> for ClientActor {
         if let Some((_, _, blocks_catch_up_state)) =
             self.client.catchup_state_syncs.get_mut(&msg.sync_hash)
         {
             assert!(blocks_catch_up_state.scheduled_blocks.remove(&msg.block_hash));
-            blocks_catch_up_state.processed_blocks.insert(msg.block_hash, msg.results);
+            blocks_catch_up_state
+                .processed_blocks
+                .insert(msg.block_hash, msg.results.into_iter().map(|res| res.1).collect_vec());
         } else {
             panic!("block catch up processing result from unknown sync hash");
         }
diff --git a/chain/client/src/test_utils/client.rs b/chain/client/src/test_utils/client.rs
index fc047e1d9b2..6ae3a89f1bb 100644
--- a/chain/client/src/test_utils/client.rs
+++ b/chain/client/src/test_utils/client.rs
@@ -7,6 +7,7 @@ use std::sync::{Arc, RwLock};
 
 use crate::Client;
 use actix_rt::{Arbiter, System};
+use itertools::Itertools;
 use near_chain::chain::{do_apply_chunks, BlockCatchUpRequest};
 use near_chain::resharding::StateSplitRequest;
 use near_chain::test_utils::{wait_for_all_blocks_in_processing, wait_for_block_in_processing};
@@ -242,7 +243,10 @@ pub fn run_catchup(
     )?;
     let mut catchup_done = true;
     for msg in block_messages.write().unwrap().drain(..) {
-        let results = do_apply_chunks(msg.block_hash, msg.block_height, msg.work);
+        let results = do_apply_chunks(msg.block_hash, msg.block_height, msg.work)
+            .into_iter()
+            .map(|res| res.1)
+            .collect_vec();
         if let Some((_, _, blocks_catch_up_state)) =
             client.catchup_state_syncs.get_mut(&msg.sync_hash)
         {
diff --git a/core/primitives/src/challenge.rs b/core/primitives/src/challenge.rs
index 75dcfbc9226..83528717504 100644
--- a/core/primitives/src/challenge.rs
+++ b/core/primitives/src/challenge.rs
@@ -17,6 +17,12 @@ pub enum PartialState {
     TrieValues(Vec<TrieValue>),
 }
 
+impl Default for PartialState {
+    fn default() -> Self {
+        PartialState::TrieValues(vec![])
+    }
+}
+
 impl PartialState {
     pub fn len(&self) -> usize {
         let Self::TrieValues(values) = self;
diff --git a/core/store/src/trie/mod.rs b/core/store/src/trie/mod.rs
index f08deeac356..f5e38cc3af0 100644
--- a/core/store/src/trie/mod.rs
+++ b/core/store/src/trie/mod.rs
@@ -60,7 +60,7 @@ pub mod update;
 const POISONED_LOCK_ERR: &str = "The lock was poisoned.";
 
 /// For fraud proofs
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default, PartialEq, Eq)]
 pub struct PartialStorage {
     pub nodes: PartialState,
 }
@@ -649,6 +649,11 @@ impl Trie {
         }
     }
 
+    /// Temporary helper; must be removed after the stateless validation release.
+    pub fn dont_charge_gas_for_trie_node_access(&mut self) {
+        self.charge_gas_for_trie_node_access = false;
+    }
+
     /// Makes a new trie that has everything the same except that access
     /// through that trie accumulates a state proof for all nodes accessed.
     pub fn recording_reads(&self) -> Self {
diff --git a/integration-tests/src/tests/client/process_blocks.rs b/integration-tests/src/tests/client/process_blocks.rs
index 0b3ef875dcc..bca04270297 100644
--- a/integration-tests/src/tests/client/process_blocks.rs
+++ b/integration-tests/src/tests/client/process_blocks.rs
@@ -11,9 +11,12 @@ use near_async::messaging::IntoSender;
 use near_chain::chain::ApplyStatePartsRequest;
 use near_chain::test_utils::ValidatorSchedule;
 use near_chain::types::{LatestKnown, RuntimeAdapter};
+#[cfg(not(feature = "nightly"))]
 use near_chain::validate::validate_chunk_with_chunk_extra;
+#[cfg(not(feature = "nightly"))]
+use near_chain::ChainStore;
 use near_chain::{
-    Block, BlockProcessingArtifact, ChainGenesis, ChainStore, ChainStoreAccess, Error, Provenance,
+    Block, BlockProcessingArtifact, ChainGenesis, ChainStoreAccess, Error, Provenance,
 };
 use near_chain_configs::{Genesis, DEFAULT_GC_NUM_EPOCHS_TO_KEEP};
 use near_chunks::test_utils::MockClientAdapterForShardsManager;
@@ -2252,7 +2255,10 @@ fn test_block_height_processed_orphan() {
     assert!(env.clients[0].chain.mut_store().is_height_processed(block_height).unwrap());
 }
 
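The `dont_charge_gas_for_trie_node_access` helper added to `Trie` above flips the trie into flat-storage-like cost accounting while still reading nodes from disk, which is what `StorageDataSource::DbTrieOnly` relies on. A toy illustration of such a switch, with an assumed per-node cost model:

```rust
// Toy cost model with assumed numbers; the real flag lives on near-store's Trie.
struct Trie {
    charge_gas_for_trie_node_access: bool,
}

impl Trie {
    fn dont_charge_gas_for_trie_node_access(&mut self) {
        self.charge_gas_for_trie_node_access = false;
    }

    fn node_access_gas(&self, touched_nodes: u64, cost_per_node: u64) -> u64 {
        // With the flag off, node visits are free, mimicking flat storage
        // while the data is still fetched through the trie.
        if self.charge_gas_for_trie_node_access {
            touched_nodes * cost_per_node
        } else {
            0
        }
    }
}

fn main() {
    let mut trie = Trie { charge_gas_for_trie_node_access: true };
    assert_eq!(trie.node_access_gas(10, 50), 500);
    trie.dont_charge_gas_for_trie_node_access();
    assert_eq!(trie.node_access_gas(10, 50), 0);
}
```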
+// Disabled until the stateless validation release, because the test relies on
+// logging which is impacted by the release process.
 #[test]
+#[cfg(not(feature = "nightly"))]
 fn test_validate_chunk_extra() {
     let mut capture = near_o11y::testonly::TracingCapture::enable();
diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs
index 245aa261a3a..f42c2ab8cbf 100644
--- a/nearcore/src/runtime/mod.rs
+++ b/nearcore/src/runtime/mod.rs
@@ -856,6 +856,18 @@ impl RuntimeAdapter for NightshadeRuntime {
                 storage_config.state_root,
                 storage_config.use_flat_storage,
             )?,
+            StorageDataSource::DbTrieOnly => {
+                // If there is no flat storage on disk, use the trie, but simulate the
+                // costs of enabled flat storage by not charging gas for trie nodes.
+                let mut trie = self.get_trie_for_shard(
+                    shard_id,
+                    &block.prev_block_hash,
+                    storage_config.state_root,
+                    false,
+                )?;
+                trie.dont_charge_gas_for_trie_node_access();
+                trie
+            }
             StorageDataSource::Recorded(storage) => Trie::from_recorded_storage(
                 storage,
                 storage_config.state_root,