-
Notifications
You must be signed in to change notification settings - Fork 677
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(resharding): Resharding state mapping integration #12269
Merged
Merged
Changes from 5 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
353daf5
Refactor process_memtrie_resharding_storage_update
staffik b04f0d6
reshard state
staffik 103279d
testloop
staffik 8d8cb0d
Merge branch 'master' into resharding-mapping-integration
staffik 2fc80bc
Address feedback
staffik 55e3a01
nit fixes
staffik File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
use std::io; | ||
use std::sync::Arc; | ||
|
||
use super::event_type::ReshardingEventType; | ||
use super::event_type::{ReshardingEventType, ReshardingSplitShardParams}; | ||
use super::types::ReshardingSender; | ||
use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageResharderController}; | ||
use crate::types::RuntimeAdapter; | ||
|
@@ -13,7 +14,7 @@ use near_primitives::challenge::PartialState; | |
use near_primitives::hash::CryptoHash; | ||
use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout}; | ||
use near_primitives::types::chunk_extra::ChunkExtra; | ||
use near_store::adapter::StoreUpdateAdapter; | ||
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter}; | ||
use near_store::trie::mem::resharding::RetainMode; | ||
use near_store::{DBCol, PartialStorage, ShardTries, ShardUId, Store}; | ||
|
||
|
@@ -49,21 +50,18 @@ impl ReshardingManager { | |
Self { store, epoch_manager, resharding_config, flat_storage_resharder, resharding_handle } | ||
} | ||
|
||
/// If shard layout changes after the given block, creates temporary | ||
/// memtries for new shards to be able to process them in the next epoch. | ||
/// Note this doesn't complete resharding, proper memtries are to be | ||
/// created later. | ||
pub fn process_memtrie_resharding_storage_update( | ||
/// Trigger resharding if shard layout changes after the given block. | ||
pub fn start_resharding( | ||
&mut self, | ||
mut chain_store_update: ChainStoreUpdate, | ||
chain_store_update: ChainStoreUpdate, | ||
block: &Block, | ||
shard_uid: ShardUId, | ||
tries: ShardTries, | ||
) -> Result<(), Error> { | ||
let block_hash = block.hash(); | ||
let block_height = block.header().height(); | ||
let _span = tracing::debug_span!( | ||
target: "resharding", "process_memtrie_resharding_storage_update", | ||
target: "resharding", "start_resharding", | ||
?block_hash, block_height, ?shard_uid) | ||
.entered(); | ||
|
||
|
@@ -84,34 +82,108 @@ impl ReshardingManager { | |
tracing::debug!(target: "resharding", ?next_shard_layout, "next shard layout is not v2, skipping"); | ||
return Ok(()); | ||
} | ||
|
||
let resharding_event_type = | ||
ReshardingEventType::from_shard_layout(&next_shard_layout, *block_hash, *prev_hash)?; | ||
let Some(ReshardingEventType::SplitShard(split_shard_event)) = resharding_event_type else { | ||
tracing::debug!(target: "resharding", ?resharding_event_type, "resharding event type is not split shard, skipping"); | ||
return Ok(()); | ||
match resharding_event_type { | ||
Some(ReshardingEventType::SplitShard(split_shard_event)) => { | ||
self.split_shard( | ||
chain_store_update, | ||
block, | ||
shard_uid, | ||
tries, | ||
split_shard_event, | ||
next_shard_layout, | ||
)?; | ||
} | ||
None => { | ||
tracing::debug!(target: "resharding", ?resharding_event_type, "unsupported resharding event type, skipping"); | ||
staffik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice |
||
}; | ||
Ok(()) | ||
} | ||
|
||
fn split_shard( | ||
&mut self, | ||
chain_store_update: ChainStoreUpdate, | ||
block: &Block, | ||
shard_uid: ShardUId, | ||
tries: ShardTries, | ||
split_shard_event: ReshardingSplitShardParams, | ||
next_shard_layout: ShardLayout, | ||
) -> Result<(), Error> { | ||
if split_shard_event.parent_shard != shard_uid { | ||
let parent_shard = split_shard_event.parent_shard; | ||
tracing::debug!(target: "resharding", ?parent_shard, "shard uid does not match event parent shard, skipping"); | ||
return Ok(()); | ||
} | ||
|
||
// TODO(resharding): what if node doesn't have memtrie? just pause | ||
// processing? | ||
// TODO(resharding): fork handling. if epoch is finalized on different | ||
// blocks, the second finalization will crash. | ||
tries.freeze_mem_tries( | ||
// Reshard state by setting shard UId mapping from children to parent. | ||
staffik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.set_state_shard_uid_mapping(&split_shard_event)?; | ||
|
||
// Create temporary children memtries by freezing parent memtrie and referencing it. | ||
self.process_memtrie_resharding_storage_update( | ||
chain_store_update, | ||
block, | ||
shard_uid, | ||
vec![split_shard_event.left_child_shard, split_shard_event.right_child_shard], | ||
tries, | ||
split_shard_event.clone(), | ||
)?; | ||
|
||
// Trigger resharding of flat storage. | ||
self.flat_storage_resharder.start_resharding( | ||
ReshardingEventType::SplitShard(split_shard_event.clone()), | ||
ReshardingEventType::SplitShard(split_shard_event), | ||
&next_shard_layout, | ||
)?; | ||
|
||
let chunk_extra = self.get_chunk_extra(block_hash, &shard_uid)?; | ||
Ok(()) | ||
} | ||
|
||
/// Store in the database the mapping of shard UId from children to the parent shard, | ||
/// so that subsequent accesses to the State will use the parent shard's UId as a prefix for the database key. | ||
fn set_state_shard_uid_mapping( | ||
&mut self, | ||
split_shard_event: &ReshardingSplitShardParams, | ||
) -> io::Result<()> { | ||
let mut store_update = self.store.trie_store().store_update(); | ||
staffik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let parent_shard_uid = split_shard_event.parent_shard; | ||
// TODO(reshardingV3) No need to set the mapping for children shards that we won't track just after resharding? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call making that a todo. Let's come back to this question when we figure out the full picture for post resharding cleanup. |
||
let children_shard_uids = | ||
[split_shard_event.left_child_shard, split_shard_event.right_child_shard]; | ||
staffik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
for child_shard_uid in children_shard_uids { | ||
store_update.set_shard_uid_mapping(child_shard_uid, parent_shard_uid); | ||
} | ||
store_update.commit() | ||
} | ||
|
||
/// Creates temporary memtries for new shards to be able to process them in the next epoch. | ||
/// Note this doesn't complete memtries resharding, proper memtries are to be created later. | ||
fn process_memtrie_resharding_storage_update( | ||
&mut self, | ||
mut chain_store_update: ChainStoreUpdate, | ||
block: &Block, | ||
parent_shard_uid: ShardUId, | ||
tries: ShardTries, | ||
split_shard_event: ReshardingSplitShardParams, | ||
) -> Result<(), Error> { | ||
let block_hash = block.hash(); | ||
let block_height = block.header().height(); | ||
let _span = tracing::debug_span!( | ||
target: "resharding", "process_memtrie_resharding_storage_update", | ||
?block_hash, block_height, ?parent_shard_uid) | ||
.entered(); | ||
|
||
// TODO(resharding): what if node doesn't have memtrie? just pause | ||
// processing? | ||
// TODO(resharding): fork handling. if epoch is finalized on different | ||
// blocks, the second finalization will crash. | ||
tries.freeze_mem_tries( | ||
parent_shard_uid, | ||
vec![split_shard_event.left_child_shard, split_shard_event.right_child_shard], | ||
)?; | ||
|
||
let chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?; | ||
let boundary_account = split_shard_event.boundary_account; | ||
|
||
let mut trie_store_update = self.store.store_update(); | ||
|
@@ -126,7 +198,7 @@ impl ReshardingManager { | |
"Memtrie not loaded. Cannot process memtrie resharding storage | ||
update for block {:?}, shard {:?}", | ||
block_hash, | ||
shard_uid | ||
parent_shard_uid, | ||
); | ||
return Err(Error::Other("Memtrie not loaded".to_string())); | ||
}; | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice... thanks for renaming!