Process columns in parallel in cold loop (#10437)
posvyatokum committed Jan 31, 2024
1 parent 15b9567 commit d229b06
Showing 6 changed files with 125 additions and 103 deletions.
167 changes: 95 additions & 72 deletions core/store/src/cold_storage.rs
@@ -1,6 +1,5 @@
use crate::columns::DBKeyType;
use crate::db::{ColdDB, COLD_HEAD_KEY, HEAD_KEY};
use crate::trie::TrieRefcountChange;
use crate::{metrics, DBCol, DBTransaction, Database, Store, TrieChanges};

use borsh::{BorshDeserialize, BorshSerialize};
@@ -9,17 +8,39 @@ use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::sharding::ShardChunk;
use near_primitives::types::BlockHeight;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use std::collections::HashMap;
use std::io;
use strum::IntoEnumIterator;

type StoreKey = Vec<u8>;
type StoreValue = Option<Vec<u8>>;
type StoreCache = HashMap<(DBCol, StoreKey), StoreValue>;

struct StoreWithCache<'a> {
store: &'a Store,
cache: StoreCache,
/// This trait is used on top of Store to calculate cold loop specific metrics,
/// and implement conversion to errors for absent data.
pub trait ColdMigrationStore {
fn iter_prefix_with_callback_for_cold(
&self,
col: DBCol,
key_prefix: &[u8],
callback: impl FnMut(Box<[u8]>),
) -> io::Result<()>;

fn get_for_cold(&self, column: DBCol, key: &[u8]) -> io::Result<StoreValue>;

fn get_ser_for_cold<T: BorshDeserialize>(
&self,
column: DBCol,
key: &[u8],
) -> io::Result<Option<T>>;

fn get_or_err_for_cold(&self, column: DBCol, key: &[u8]) -> io::Result<Vec<u8>>;

fn get_ser_or_err_for_cold<T: BorshDeserialize>(
&self,
column: DBCol,
key: &[u8],
) -> io::Result<T>;
}

/// The BatchTransaction can be used to write multiple set operations to the cold db in batches.
@@ -37,7 +58,6 @@ struct BatchTransaction {
/// Updates the provided cold database from the provided hot store with information about the block at `height`.
/// Returns whether the block was copied (false only if the height is not present in `hot_store`).
/// The block at `height` has to be final.
/// Wraps hot store in `StoreWithCache` for optimizing reads.
///
/// First, we read from the hot store the information necessary
/// to determine all the keys that need to be updated in the cold db.
@@ -59,35 +79,53 @@ pub fn update_cold_db(
hot_store: &Store,
shard_layout: &ShardLayout,
height: &BlockHeight,
num_threads: usize,
) -> io::Result<bool> {
let _span = tracing::debug_span!(target: "cold_store", "update cold db", height = height);
let _timer = metrics::COLD_COPY_DURATION.start_timer();

let mut store_with_cache = StoreWithCache { store: hot_store, cache: StoreCache::new() };

if store_with_cache.get(DBCol::BlockHeight, &height.to_le_bytes())?.is_none() {
if hot_store.get_for_cold(DBCol::BlockHeight, &height.to_le_bytes())?.is_none() {
return Ok(false);
}

let height_key = height.to_le_bytes();
let block_hash_vec = store_with_cache.get_or_err(DBCol::BlockHeight, &height_key)?;
let block_hash_vec = hot_store.get_or_err_for_cold(DBCol::BlockHeight, &height_key)?;
let block_hash_key = block_hash_vec.as_slice();

let key_type_to_keys =
get_keys_from_store(&mut store_with_cache, shard_layout, &height_key, block_hash_key)?;
for col in DBCol::iter() {
if !col.is_cold() {
continue;
}

if col == DBCol::State {
copy_state_from_store(shard_layout, block_hash_key, cold_db, &mut store_with_cache)?;
continue;
}

let keys = combine_keys(&key_type_to_keys, &col.key_type());
copy_from_store(cold_db, &mut store_with_cache, col, keys)?;
}
get_keys_from_store(&hot_store, shard_layout, &height_key, block_hash_key)?;
let cold_columns = DBCol::iter().filter(|col| col.is_cold()).collect::<Vec<DBCol>>();

// Create new thread pool with `num_threads`.
rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.build()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to create rayon pool"))?
.install(|| {
cold_columns
.into_par_iter() // Process every cold column as a separate task in the thread pool, in parallel.
// Copy column to cold db.
.map(|col: DBCol| -> io::Result<()> {
if col == DBCol::State {
copy_state_from_store(shard_layout, block_hash_key, cold_db, &hot_store)
} else {
let keys = combine_keys(&key_type_to_keys, &col.key_type());
copy_from_store(cold_db, &hot_store, col, keys)
}
})
// Return first found error, or Ok(())
.reduce(
|| Ok(()), // Ok(()) by default
// First found Err, or Ok(())
|left, right| -> io::Result<()> {
vec![left, right]
.into_iter()
.filter(|res| res.is_err())
.next()
.unwrap_or(Ok(()))
},
)
})?;

Ok(true)
}
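
Aside: the parallel pattern above is compact enough to read in isolation. The following minimal sketch (hypothetical `copy_column` work, column count, and failing column; not nearcore code) builds a scoped rayon pool, runs one task per column on it, and folds the per-task `io::Result`s so that any `Err` wins over `Ok(())`. Under parallel reduction, which particular error surfaces when several tasks fail is unspecified.

use rayon::prelude::*;
use std::io;

// Hypothetical stand-in for the real per-column copy work.
fn copy_column(col: usize) -> io::Result<()> {
    if col == 3 {
        return Err(io::Error::new(io::ErrorKind::Other, "copy of column 3 failed"));
    }
    Ok(())
}

fn copy_all_columns(num_threads: usize) -> io::Result<()> {
    rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        .map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to create rayon pool"))?
        // `install` runs the closure inside the scoped pool, so the
        // par_iter below uses `num_threads` workers, not the global pool.
        .install(|| {
            (0..8usize)
                .into_par_iter()
                .map(copy_column)
                // Identity is Ok(()); any Err survives the fold.
                .reduce(|| Ok(()), |left, right| if left.is_err() { left } else { right })
        })
}

fn main() {
    assert!(copy_all_columns(4).is_err()); // column 3 fails, so the whole copy fails
}

An equivalent spelling of the error fold is `.collect::<io::Result<()>>()`, since rayon, like the standard library, supports collecting an iterator of `Result`s into a single `Result`.
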
@@ -128,7 +166,7 @@ fn copy_state_from_store(
shard_layout: &ShardLayout,
block_hash_key: &[u8],
cold_db: &ColdDB,
hot_store: &mut StoreWithCache,
hot_store: &Store,
) -> io::Result<()> {
let col = DBCol::State;
let _span = tracing::debug_span!(target: "cold_store", "copy_state_from_store", %col);
Expand Down Expand Up @@ -172,7 +210,7 @@ fn copy_state_from_store(
/// Writes that transaction to cold_db.
fn copy_from_store(
cold_db: &ColdDB,
hot_store: &mut StoreWithCache,
hot_store: &Store,
col: DBCol,
keys: Vec<StoreKey>,
) -> io::Result<()> {
@@ -192,7 +230,7 @@
// might speed things up. Currently our Database abstraction
// doesn’t offer interface for it so that would need to be
// added.
let data = hot_store.get(col, &key)?;
let data = hot_store.get_for_cold(col, &key)?;
if let Some(value) = data {
// TODO: As an optimisation, we might consider breaking the
// abstraction layer. Since we’re always writing to cold database,
@@ -232,11 +270,11 @@ pub fn update_cold_head(
) -> io::Result<()> {
tracing::debug!(target: "cold_store", "update HEAD of cold db to {}", height);

let mut store = StoreWithCache { store: hot_store, cache: StoreCache::new() };

let height_key = height.to_le_bytes();
let block_hash_key = store.get_or_err(DBCol::BlockHeight, &height_key)?.as_slice().to_vec();
let tip_header = &store.get_ser_or_err::<BlockHeader>(DBCol::BlockHeader, &block_hash_key)?;
let block_hash_key =
hot_store.get_or_err_for_cold(DBCol::BlockHeight, &height_key)?.as_slice().to_vec();
let tip_header =
&hot_store.get_ser_or_err_for_cold::<BlockHeader>(DBCol::BlockHeader, &block_hash_key)?;
let tip = Tip::from_header(tip_header);

// Write HEAD to the cold db.
@@ -302,7 +340,6 @@ pub fn copy_all_data_to_cold(
// can be used to copy the genesis records from hot to cold.
// TODO - How did copying from genesis work in the prod migration to split storage?
pub fn test_cold_genesis_update(cold_db: &ColdDB, hot_store: &Store) -> io::Result<()> {
let mut store_with_cache = StoreWithCache { store: hot_store, cache: StoreCache::new() };
for col in DBCol::iter() {
if !col.is_cold() {
continue;
@@ -313,7 +350,7 @@ pub fn test_cold_genesis_update(cold_db: &ColdDB, hot_store: &Store) -> io::Result<()> {
// specialized `copy_state_from_store`.
copy_from_store(
cold_db,
&mut store_with_cache,
&hot_store,
col,
hot_store.iter(col).map(|x| x.unwrap().0.to_vec()).collect(),
)?;
@@ -338,19 +375,19 @@ pub fn test_get_store_initial_writes(column: DBCol) -> u64 {
/// For BlockHash it is just one key -- block hash of that height.
/// But for TransactionHash, for example, it is all of the tx hashes in that block.
fn get_keys_from_store(
store: &mut StoreWithCache,
store: &Store,
shard_layout: &ShardLayout,
height_key: &[u8],
block_hash_key: &[u8],
) -> io::Result<HashMap<DBKeyType, Vec<StoreKey>>> {
let mut key_type_to_keys = HashMap::new();

let block: Block = store.get_ser_or_err(DBCol::Block, &block_hash_key)?;
let block: Block = store.get_ser_or_err_for_cold(DBCol::Block, &block_hash_key)?;
let chunks = block
.chunks()
.iter()
.map(|chunk_header| {
store.get_ser_or_err(DBCol::Chunks, chunk_header.chunk_hash().as_bytes())
store.get_ser_or_err_for_cold(DBCol::Chunks, chunk_header.chunk_hash().as_bytes())
})
.collect::<io::Result<Vec<ShardChunk>>>()?;

@@ -382,7 +419,7 @@ fn get_keys_from_store(
// TODO: write StateChanges values to colddb directly, not to cache.
DBKeyType::TrieKey => {
let mut keys = vec![];
store.iter_prefix_with_callback(
store.iter_prefix_with_callback_for_cold(
DBCol::StateChanges,
&block_hash_key,
|full_key| {
@@ -490,65 +527,51 @@
}
}

#[allow(dead_code)]
impl StoreWithCache<'_> {
pub fn iter_prefix_with_callback(
&mut self,

impl ColdMigrationStore for Store {
fn iter_prefix_with_callback_for_cold(
&self,
col: DBCol,
key_prefix: &[u8],
mut callback: impl FnMut(Box<[u8]>),
) -> io::Result<()> {
for iter_result in self.store.iter_prefix(col, key_prefix) {
let (key, value) = iter_result?;
self.cache.insert((col, key.to_vec()), Some(value.into()));
for iter_result in self.iter_prefix(col, key_prefix) {
crate::metrics::COLD_MIGRATION_READS.with_label_values(&[<&str>::from(col)]).inc();
let (key, _) = iter_result?;
callback(key);
}
Ok(())
}

pub fn get(&mut self, column: DBCol, key: &[u8]) -> io::Result<StoreValue> {
if !self.cache.contains_key(&(column, key.to_vec())) {
crate::metrics::COLD_MIGRATION_READS.with_label_values(&[<&str>::from(column)]).inc();
self.cache.insert(
(column, key.to_vec()),
self.store.get(column, key)?.map(|x| x.as_slice().to_vec()),
);
}
Ok(self.cache[&(column, key.to_vec())].clone())
fn get_for_cold(&self, column: DBCol, key: &[u8]) -> io::Result<StoreValue> {
crate::metrics::COLD_MIGRATION_READS.with_label_values(&[<&str>::from(column)]).inc();
Ok(self.get(column, key)?.map(|x| x.as_slice().to_vec()))
}

pub fn get_ser<T: BorshDeserialize>(
&mut self,
fn get_ser_for_cold<T: BorshDeserialize>(
&self,
column: DBCol,
key: &[u8],
) -> io::Result<Option<T>> {
match self.get(column, key)? {
match self.get_for_cold(column, key)? {
Some(bytes) => Ok(Some(T::try_from_slice(&bytes)?)),
None => Ok(None),
}
}

pub fn get_or_err(&mut self, column: DBCol, key: &[u8]) -> io::Result<Vec<u8>> {
option_to_not_found(self.get(column, key), format_args!("{:?}: {:?}", column, key))
fn get_or_err_for_cold(&self, column: DBCol, key: &[u8]) -> io::Result<Vec<u8>> {
option_to_not_found(self.get_for_cold(column, key), format_args!("{:?}: {:?}", column, key))
}

pub fn get_ser_or_err<T: BorshDeserialize>(
&mut self,
fn get_ser_or_err_for_cold<T: BorshDeserialize>(
&self,
column: DBCol,
key: &[u8],
) -> io::Result<T> {
option_to_not_found(self.get_ser(column, key), format_args!("{:?}: {:?}", column, key))
}

pub fn insert_state_to_cache_from_op(&mut self, op: &TrieRefcountChange, shard_uid_key: &[u8]) {
debug_assert_eq!(
DBCol::State.key_type(),
&[DBKeyType::ShardUId, DBKeyType::TrieNodeOrValueHash]
);
self.cache.insert(
(DBCol::State, join_two_keys(shard_uid_key, op.hash().as_bytes())),
Some(op.payload().to_vec()),
);
option_to_not_found(
self.get_ser_for_cold(column, key),
format_args!("{:?}: {:?}", column, key),
)
}
}

37 changes: 10 additions & 27 deletions integration-tests/src/tests/client/cold_storage.rs
@@ -95,7 +95,6 @@ fn test_storage_after_commit_of_cold_update() {
.unwrap();

let state_reads = test_get_store_reads(DBCol::State);
let state_changes_reads = test_get_store_reads(DBCol::StateChanges);

for h in 1..max_height {
let signer = InMemorySigner::from_seed("test0".parse().unwrap(), KeyType::ED25519, "test0");
@@ -148,26 +147,17 @@
let block = env.clients[0].produce_block(h).unwrap().unwrap();
env.process_block(0, block.clone(), Provenance::PRODUCED);

update_cold_db(
&*storage.cold_db().unwrap(),
&env.clients[0].runtime_adapter.store(),
&env.clients[0]
.epoch_manager
.get_shard_layout(
&env.clients[0].epoch_manager.get_epoch_id_from_prev_block(&last_hash).unwrap(),
)
.unwrap(),
&h,
)
.unwrap();
let client = &env.clients[0];
let client_store = client.runtime_adapter.store();
let epoch_id = client.epoch_manager.get_epoch_id_from_prev_block(&last_hash).unwrap();
let shard_layout = client.epoch_manager.get_shard_layout(&epoch_id).unwrap();
update_cold_db(cold_db, &client_store, &shard_layout, &height, 4).unwrap();

last_hash = *block.hash();
}

// assert that we don't read State from db, but from TrieChanges
assert_eq!(state_reads, test_get_store_reads(DBCol::State));
// assert that we don't read StateChanges from db again after iter_prefix
assert_eq!(state_changes_reads, test_get_store_reads(DBCol::StateChanges));

// We still need to filter out one chunk
let mut no_check_rules: Vec<Box<dyn Fn(DBCol, &Box<[u8]>, &Box<[u8]>) -> bool>> = vec![];
@@ -320,18 +310,11 @@ fn test_cold_db_copy_with_height_skips() {
}
};

update_cold_db(
&*storage.cold_db().unwrap(),
&env.clients[0].runtime_adapter.store(),
&env.clients[0]
.epoch_manager
.get_shard_layout(
&env.clients[0].epoch_manager.get_epoch_id_from_prev_block(&last_hash).unwrap(),
)
.unwrap(),
&h,
)
.unwrap();
let client = &env.clients[0];
let epoch_id = client.epoch_manager.get_epoch_id_from_prev_block(&last_hash).unwrap();
let shard_layout = client.epoch_manager.get_shard_layout(&epoch_id).unwrap();
update_cold_db(&cold_db, &client.runtime_adapter.store(), &shard_layout, &height, 1)
.unwrap();

if block.is_some() {
last_hash = *block.unwrap().hash();
2 changes: 1 addition & 1 deletion integration-tests/src/tests/client/process_blocks.rs
@@ -1603,7 +1603,7 @@ fn test_archival_gc_common(
blocks.push(block);

if i <= max_cold_head_height {
update_cold_db(storage.cold_db().unwrap(), hot_store, &shard_layout, &i).unwrap();
update_cold_db(storage.cold_db().unwrap(), hot_store, &shard_layout, &i, 1).unwrap();
update_cold_head(storage.cold_db().unwrap(), &hot_store, &i).unwrap();
}
}