Skip to content

Commit

Permalink
feat(archival store): Prep refactorings as initial steps (#12549)
Browse files Browse the repository at this point in the history
- Move `cold_storage.rs` to a new directory named `archive`.
- Remove `cold_store` argument and only use the `ColdDB` for reading the
head from the cold storage.
- Add extra logging to measure the number of keys and data size written.
  • Loading branch information
tayfunelmas authored Dec 6, 2024
1 parent 823dc6e commit 80d8e2c
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use near_primitives::types::BlockHeight;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use std::collections::HashMap;
use std::io;
use std::sync::Arc;
use strum::IntoEnumIterator;

type StoreKey = Vec<u8>;
Expand Down Expand Up @@ -47,7 +48,7 @@ pub trait ColdMigrationStore {
/// [`write`] is called every time `transaction_size` exceeds `threshold_transaction_size`.
/// [`write`] should also be called manually before dropping BatchTransaction to write any leftovers.
struct BatchTransaction {
cold_db: std::sync::Arc<ColdDB>,
cold_db: Arc<ColdDB>,
transaction: DBTransaction,
/// Size of all keys and values in `transaction` in bytes.
transaction_size: usize,
Expand Down Expand Up @@ -172,6 +173,8 @@ fn copy_state_from_store(
let _span = tracing::debug_span!(target: "cold_store", "copy_state_from_store", %col);
let instant = std::time::Instant::now();

let mut total_keys = 0;
let mut total_size = 0;
let mut transaction = DBTransaction::new();
for shard_uid in shard_layout.shard_uids() {
debug_assert_eq!(
Expand All @@ -185,11 +188,13 @@ fn copy_state_from_store(
hot_store.get_ser::<TrieChanges>(DBCol::TrieChanges, &key)?;

let Some(trie_changes) = trie_changes else { continue };
total_keys += trie_changes.insertions().len();
for op in trie_changes.insertions() {
// TODO(reshardingV3) Handle shard_uid not mapped there
let key = join_two_keys(&shard_uid_key, op.hash().as_bytes());
let value = op.payload().to_vec();

total_size += value.len();
tracing::trace!(target: "cold_store", pretty_key=?near_fmt::StorageKey(&key), "copying state node to colddb");
rc_aware_set(&mut transaction, DBCol::State, key, value);
}
Expand All @@ -201,7 +206,7 @@ fn copy_state_from_store(
cold_db.write(transaction)?;
let write_duration = instant.elapsed();

tracing::trace!(target: "cold_store", ?read_duration, ?write_duration, "finished");
tracing::trace!(target: "cold_store", ?total_keys, ?total_size, ?read_duration, ?write_duration, "copy_state_from_store finished");

Ok(())
}
Expand All @@ -225,6 +230,7 @@ fn copy_from_store(

let mut transaction = DBTransaction::new();
let mut good_keys = 0;
let mut total_size = 0;
let total_keys = keys.len();
for key in keys {
// TODO: Look into using RocksDB’s multi_key function. It
Expand All @@ -241,6 +247,7 @@ fn copy_from_store(
// re-adding the reference count.

good_keys += 1;
total_size += value.len();
rc_aware_set(&mut transaction, col, key, value);
}
}
Expand All @@ -251,7 +258,7 @@ fn copy_from_store(
cold_db.write(transaction)?;
let write_duration = instant.elapsed();

tracing::trace!(target: "cold_store", ?col, ?good_keys, ?total_keys, ?read_duration, ?write_duration, "finished");
tracing::trace!(target: "cold_store", ?col, ?good_keys, ?total_keys, ?total_size, ?read_duration, ?write_duration, "copy_from_store finished");

return Ok(());
}
Expand All @@ -264,6 +271,7 @@ fn copy_from_store(
/// This method relies on the fact that BlockHeight and BlockHeader are not garbage collectable.
/// (to construct the Tip we query hot_store for block hash and block header)
/// If this is to change, caller should be careful about `height` not being garbage collected in hot storage yet.
// TODO: Remove this and use `ArchivalStore::update_head` instead, once the archival storage logic is updated to use `ArchivalStore`.
pub fn update_cold_head(
cold_db: &ColdDB,
hot_store: &Store,
Expand Down Expand Up @@ -304,6 +312,15 @@ pub fn update_cold_head(
return Ok(());
}

/// Fetches the current head [`Tip`] recorded in the cold database.
///
/// Returns `Ok(None)` when no head has been written to the cold storage yet
/// (i.e. the `HEAD_KEY` entry is absent from `DBCol::BlockMisc`).
pub fn get_cold_head(cold_db: &ColdDB) -> io::Result<Option<Tip>> {
    let raw_head = cold_db.get_raw_bytes(DBCol::BlockMisc, HEAD_KEY)?;
    match raw_head.as_deref() {
        // Deserialization errors propagate as `io::Error` via `?`.
        Some(bytes) => Ok(Some(Tip::try_from_slice(bytes)?)),
        None => Ok(None),
    }
}

pub enum CopyAllDataToColdStatus {
EverythingCopied,
Interrupted,
Expand All @@ -312,10 +329,10 @@ pub enum CopyAllDataToColdStatus {
/// Copies all contents of all cold columns from `hot_store` to `cold_db`.
/// Does it column by column, and because columns can be huge, writes in batches of ~`batch_size`.
pub fn copy_all_data_to_cold(
cold_db: std::sync::Arc<ColdDB>,
cold_db: Arc<ColdDB>,
hot_store: &Store,
batch_size: usize,
keep_going: &std::sync::Arc<std::sync::atomic::AtomicBool>,
keep_going: &Arc<std::sync::atomic::AtomicBool>,
) -> io::Result<CopyAllDataToColdStatus> {
for col in DBCol::iter() {
if col.is_cold() {
Expand Down Expand Up @@ -580,7 +597,7 @@ impl ColdMigrationStore for Store {
}

impl BatchTransaction {
pub fn new(cold_db: std::sync::Arc<ColdDB>, batch_size: usize) -> Self {
pub fn new(cold_db: Arc<ColdDB>, batch_size: usize) -> Self {
Self {
cold_db,
transaction: DBTransaction::new(),
Expand Down
1 change: 1 addition & 0 deletions core/store/src/archive/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod cold_storage;
2 changes: 1 addition & 1 deletion core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use std::{fmt, io};
use strum;

pub mod adapter;
pub mod cold_storage;
pub mod archive;
mod columns;
pub mod config;
pub mod contract;
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/src/tests/client/cold_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use near_primitives::transaction::{
};
use near_primitives_core::hash::CryptoHash;
use near_primitives_core::types::AccountId;
use near_store::cold_storage::{
use near_store::archive::cold_storage::{
copy_all_data_to_cold, test_cold_genesis_update, test_get_store_initial_writes,
test_get_store_reads, update_cold_db, update_cold_head,
};
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/src/tests/client/process_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use near_primitives::views::{
};
use near_primitives_core::num_rational::{Ratio, Rational32};
use near_store::adapter::StoreUpdateAdapter;
use near_store::cold_storage::{update_cold_db, update_cold_head};
use near_store::archive::cold_storage::{update_cold_db, update_cold_head};
use near_store::metadata::DbKind;
use near_store::metadata::DB_VERSION;
use near_store::test_utils::create_test_node_storage_with_cold;
Expand Down
43 changes: 15 additions & 28 deletions nearcore/src/cold_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use near_chain::types::Tip;
use near_epoch_manager::{EpochManagerAdapter, EpochManagerHandle};
use near_primitives::errors::EpochError;
use near_primitives::{hash::CryptoHash, types::BlockHeight};
use near_store::cold_storage::{copy_all_data_to_cold, CopyAllDataToColdStatus};
use near_store::config::SplitStorageConfig;
use near_store::{
cold_storage::{update_cold_db, update_cold_head},
archive::cold_storage::{
copy_all_data_to_cold, get_cold_head, update_cold_db, update_cold_head,
CopyAllDataToColdStatus,
},
db::ColdDB,
DBCol, NodeStorage, Store, FINAL_HEAD_KEY, HEAD_KEY, TAIL_KEY,
DBCol, NodeStorage, Store, FINAL_HEAD_KEY, TAIL_KEY,
};

use crate::{metrics, NearConfig};
Expand Down Expand Up @@ -84,14 +86,13 @@ impl From<EpochError> for ColdStoreError {
/// Updates cold store head after.
fn cold_store_copy(
hot_store: &Store,
cold_store: &Store,
cold_db: &Arc<ColdDB>,
cold_db: &ColdDB,
genesis_height: BlockHeight,
epoch_manager: &EpochManagerHandle,
num_threads: usize,
) -> anyhow::Result<ColdStoreCopyResult, ColdStoreError> {
// If HEAD is not set for cold storage we default it to genesis_height.
let cold_head = cold_store.get_ser::<Tip>(DBCol::BlockMisc, HEAD_KEY)?;
let cold_head = get_cold_head(cold_db)?;
let cold_head_height = cold_head.map_or(genesis_height, |tip| tip.height);

// If FINAL_HEAD is not set for hot storage we default it to genesis_height.
Expand Down Expand Up @@ -152,10 +153,10 @@ fn cold_store_copy(
// * cold head >= hot tail
fn sanity_check(
hot_store: &Store,
cold_store: &Store,
cold_db: &ColdDB,
genesis_height: BlockHeight,
) -> anyhow::Result<()> {
let cold_head = cold_store.get_ser::<Tip>(DBCol::BlockMisc, HEAD_KEY)?;
let cold_head = get_cold_head(cold_db)?;
let cold_head_height = cold_head.map_or(genesis_height, |tip| tip.height);

let hot_final_head = hot_store.get_ser::<Tip>(DBCol::BlockMisc, FINAL_HEAD_KEY)?;
Expand Down Expand Up @@ -253,12 +254,11 @@ fn cold_store_migration(
keep_going: &Arc<AtomicBool>,
genesis_height: BlockHeight,
hot_store: &Store,
cold_store: &Store,
cold_db: &Arc<ColdDB>,
cold_db: Arc<ColdDB>,
) -> anyhow::Result<ColdStoreMigrationResult> {
// Migration is only needed if cold storage is not properly initialised,
// i.e. if cold head is not set.
if cold_store.get(DBCol::BlockMisc, HEAD_KEY)?.is_some() {
if get_cold_head(cold_db.as_ref())?.is_some() {
return Ok(ColdStoreMigrationResult::NoNeedForMigration);
}

Expand Down Expand Up @@ -291,7 +291,7 @@ fn cold_store_migration(
match copy_all_data_to_cold(cold_db.clone(), hot_store, batch_size, keep_going)? {
CopyAllDataToColdStatus::EverythingCopied => {
tracing::info!(target: "cold_store", new_cold_height, "Cold storage population was successful, writing cold head.");
update_cold_head(cold_db, hot_store, &new_cold_height)?;
update_cold_head(cold_db.as_ref(), hot_store, &new_cold_height)?;
Ok(ColdStoreMigrationResult::SuccessfulMigration)
}
CopyAllDataToColdStatus::Interrupted => {
Expand All @@ -309,7 +309,6 @@ fn cold_store_migration_loop(
keep_going: &Arc<AtomicBool>,
genesis_height: BlockHeight,
hot_store: &Store,
cold_store: &Store,
cold_db: Arc<ColdDB>,
) {
tracing::info!(target: "cold_store", "starting initial migration loop");
Expand All @@ -323,8 +322,7 @@ fn cold_store_migration_loop(
keep_going,
genesis_height,
hot_store,
cold_store,
&cold_db,
cold_db.clone(),
) {
// We can either stop the cold store thread or hope that next time migration will not fail.
// Here we pick the second option.
Expand Down Expand Up @@ -357,7 +355,6 @@ fn cold_store_loop(
split_storage_config: &SplitStorageConfig,
keep_going: &Arc<AtomicBool>,
hot_store: Store,
cold_store: Store,
cold_db: Arc<ColdDB>,
genesis_height: BlockHeight,
epoch_manager: &EpochManagerHandle,
Expand All @@ -373,8 +370,7 @@ fn cold_store_loop(
let instant = std::time::Instant::now();
let result = cold_store_copy(
&hot_store,
&cold_store,
&cold_db,
cold_db.as_ref(),
genesis_height,
epoch_manager,
split_storage_config.num_cold_store_read_threads,
Expand Down Expand Up @@ -430,13 +426,6 @@ pub fn spawn_cold_store_loop(
}

let hot_store = storage.get_hot_store();
let cold_store = match storage.get_cold_store() {
Some(cold_store) => cold_store,
None => {
tracing::debug!(target : "cold_store", "Not spawning the cold store loop because cold store is not configured");
return Ok(None);
}
};
let cold_db = match storage.cold_db() {
Some(cold_db) => cold_db.clone(),
None => {
Expand All @@ -452,7 +441,7 @@ pub fn spawn_cold_store_loop(
// Perform the sanity check before spawning the thread.
// If the check fails when the node is starting it's better to just fail
// fast and crash the node immediately.
sanity_check(&hot_store, &cold_store, genesis_height)?;
sanity_check(&hot_store, cold_db.as_ref(), genesis_height)?;

let split_storage_config = config.config.split_storage.clone().unwrap_or_default();

Expand All @@ -464,14 +453,12 @@ pub fn spawn_cold_store_loop(
&keep_going_clone,
genesis_height,
&hot_store,
&cold_store,
cold_db.clone(),
);
cold_store_loop(
&split_storage_config,
&keep_going_clone,
hot_store,
cold_store,
cold_db,
genesis_height,
epoch_manager.as_ref(),
Expand Down
2 changes: 1 addition & 1 deletion tools/cold-store/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use near_epoch_manager::{EpochManager, EpochManagerAdapter, EpochManagerHandle};
use near_primitives::block::Tip;
use near_primitives::epoch_block_info::BlockInfo;
use near_primitives::hash::CryptoHash;
use near_store::cold_storage::{copy_all_data_to_cold, update_cold_db, update_cold_head};
use near_store::archive::cold_storage::{copy_all_data_to_cold, update_cold_db, update_cold_head};
use near_store::metadata::DbKind;
use near_store::{DBCol, NodeStorage, Store, StoreOpener};
use near_store::{COLD_HEAD_KEY, FINAL_HEAD_KEY, HEAD_KEY, TAIL_KEY};
Expand Down

0 comments on commit 80d8e2c

Please sign in to comment.