-
Notifications
You must be signed in to change notification settings - Fork 666
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(resharding): resharding archival node #12578
Changes from all commits
f3f4c7e
fd1b7c1
4265ee7
d5da412
a728ca8
0394da3
6dea4a0
893ea1a
7e9e716
146b2aa
ed8f37d
029cd6c
fc62e55
f46714f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -173,6 +173,20 @@ impl<'a> TrieStoreUpdateAdapter<'a> { | |
} | ||
} | ||
|
||
/// Get the `ShardUId` mapping for child_shard_uid. If the mapping does not exist, map the shard to itself. | ||
/// Used by Resharding V3 for State mapping. | ||
/// | ||
/// It is kept out of `TrieStoreAdapter`, so that `TrieStoreUpdateAdapter` can use it without | ||
/// cloning `store` each time, see https://github.com/near/nearcore/pull/12232#discussion_r1804810508. | ||
pub fn get_shard_uid_mapping(store: &Store, child_shard_uid: ShardUId) -> ShardUId { | ||
store | ||
.get_ser::<ShardUId>(DBCol::StateShardUIdMapping, &child_shard_uid.to_bytes()) | ||
.unwrap_or_else(|_| { | ||
panic!("get_shard_uid_mapping() failed for child_shard_uid = {}", child_shard_uid) | ||
}) | ||
.unwrap_or(child_shard_uid) | ||
} | ||
|
||
/// Constructs db key to be used to access the State column. | ||
/// First, it consults the `StateShardUIdMapping` column to map the `shard_uid` prefix | ||
/// to its ancestor in the resharding tree (according to Resharding V3) | ||
|
@@ -186,10 +200,7 @@ fn get_key_from_shard_uid_and_hash( | |
shard_uid: ShardUId, | ||
hash: &CryptoHash, | ||
) -> [u8; 40] { | ||
let mapped_shard_uid = store | ||
.get_ser::<ShardUId>(DBCol::StateShardUIdMapping, &shard_uid.to_bytes()) | ||
.expect("get_key_from_shard_uid_and_hash() failed") | ||
.unwrap_or(shard_uid); | ||
let mapped_shard_uid = get_shard_uid_mapping(store, shard_uid); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice |
||
let mut key = [0; 40]; | ||
key[0..8].copy_from_slice(&mapped_shard_uid.to_bytes()); | ||
key[8..].copy_from_slice(hash.as_ref()); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
use crate::adapter::trie_store::get_shard_uid_mapping; | ||
use crate::columns::DBKeyType; | ||
use crate::db::{ColdDB, COLD_HEAD_KEY, HEAD_KEY}; | ||
use crate::{metrics, DBCol, DBTransaction, Database, Store, TrieChanges}; | ||
|
@@ -57,8 +58,7 @@ struct BatchTransaction { | |
} | ||
|
||
/// Updates provided cold database from provided hot store with information about block at `height`. | ||
/// Returns if the block was copied (false only if height is not present in `hot_store`). | ||
/// Block as `height` has to be final. | ||
/// Block at `height` has to be final and present in `hot_store`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it is not too hard can you assert that this is the case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it is not present, the function will return error.
|
||
/// | ||
/// First, we read from hot store information necessary | ||
/// to determine all the keys that need to be updated in cold db. | ||
|
@@ -80,31 +80,38 @@ pub fn update_cold_db( | |
hot_store: &Store, | ||
shard_layout: &ShardLayout, | ||
height: &BlockHeight, | ||
is_last_block_in_epoch: bool, | ||
num_threads: usize, | ||
) -> io::Result<bool> { | ||
) -> io::Result<()> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happened to the return value? Was is redundant? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is redundant after refactor. Before, it was used to return whether the height exist in hot store. Now we would return an error if it does not exist. |
||
let _span = tracing::debug_span!(target: "cold_store", "update cold db", height = height); | ||
let _timer = metrics::COLD_COPY_DURATION.start_timer(); | ||
|
||
if hot_store.get_for_cold(DBCol::BlockHeight, &height.to_le_bytes())?.is_none() { | ||
return Ok(false); | ||
} | ||
|
||
let height_key = height.to_le_bytes(); | ||
let block_hash_vec = hot_store.get_or_err_for_cold(DBCol::BlockHeight, &height_key)?; | ||
let block_hash_key = block_hash_vec.as_slice(); | ||
|
||
let key_type_to_keys = | ||
get_keys_from_store(&hot_store, shard_layout, &height_key, block_hash_key)?; | ||
let cold_columns = DBCol::iter().filter(|col| col.is_cold()).collect::<Vec<DBCol>>(); | ||
let columns_to_update = DBCol::iter() | ||
.filter(|col| { | ||
if !col.is_cold() { | ||
return false; | ||
} | ||
if col == &DBCol::StateShardUIdMapping && !is_last_block_in_epoch { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not a big fan of this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, my preference is to copy it at each height too, for simplicity. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, personally since you make the effort to implement the logic to make the copy only at the last block, I'm ok with keeping it as is. Not a strong preference. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh sorry, didn't know it was already discussed. We can keep the majority preference |
||
return false; | ||
} | ||
true | ||
}) | ||
.collect::<Vec<DBCol>>(); | ||
|
||
// Create new thread pool with `num_threads`. | ||
rayon::ThreadPoolBuilder::new() | ||
.num_threads(num_threads) | ||
.build() | ||
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to create rayon pool"))? | ||
.install(|| { | ||
cold_columns | ||
.into_par_iter() // Process every cold column as a separate task in thread pool in parallel. | ||
columns_to_update | ||
.into_par_iter() // Process cold columns to update as a separate task in thread pool in parallel. | ||
// Copy column to cold db. | ||
.map(|col: DBCol| -> io::Result<()> { | ||
if col == DBCol::State { | ||
|
@@ -127,8 +134,7 @@ pub fn update_cold_db( | |
}, | ||
) | ||
})?; | ||
|
||
Ok(true) | ||
Ok(()) | ||
} | ||
|
||
// Correctly set the key and value on DBTransaction, taking reference counting | ||
|
@@ -189,9 +195,10 @@ fn copy_state_from_store( | |
|
||
let Some(trie_changes) = trie_changes else { continue }; | ||
total_keys += trie_changes.insertions().len(); | ||
let mapped_shard_uid_key = get_shard_uid_mapping(hot_store, shard_uid).to_bytes(); | ||
for op in trie_changes.insertions() { | ||
// TODO(reshardingV3) Handle shard_uid not mapped there | ||
let key = join_two_keys(&shard_uid_key, op.hash().as_bytes()); | ||
// TODO(resharding) Test it properly. Currently this path is not triggered in testloop. | ||
let key = join_two_keys(&mapped_shard_uid_key, op.hash().as_bytes()); | ||
let value = op.payload().to_vec(); | ||
|
||
total_size += value.len(); | ||
|
@@ -343,7 +350,7 @@ pub fn copy_all_data_to_cold( | |
tracing::debug!(target: "cold_store", "stopping copy_all_data_to_cold"); | ||
return Ok(CopyAllDataToColdStatus::Interrupted); | ||
} | ||
// TODO(reshardingV3) Should do mapping here? | ||
// TODO(resharding) Should do mapping here? | ||
let (key, value) = result?; | ||
transaction.set_and_write_if_full(col, key.to_vec(), value.to_vec())?; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we push this pub function to
TrieStoreAdapter
? In cold_storage.rs we can callhot_store.trie_store().get_shard_uid_mapping(...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to but it is problematic, please see #12232 (comment).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aah okay! Yes, we don't want to be cloning store I guess. Could you add a quick comment just so that next time we have context for why it's where it is?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added here: f46714f