From 5c90feb11691adc443d628dfe5d2624bbbcacaf0 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 17 Nov 2023 16:33:28 +0100 Subject: [PATCH] use epoch schedule for slot range assignment --- .../multiple_strategy_block_store.rs | 11 ++-- .../src/block_stores/postgres_block_store.rs | 58 +++++-------------- .../multiple_strategy_block_store_tests.rs | 19 +++++- 3 files changed, 39 insertions(+), 49 deletions(-) diff --git a/history/src/block_stores/multiple_strategy_block_store.rs b/history/src/block_stores/multiple_strategy_block_store.rs index 0b43ced2..2e4856fd 100644 --- a/history/src/block_stores/multiple_strategy_block_store.rs +++ b/history/src/block_stores/multiple_strategy_block_store.rs @@ -79,15 +79,14 @@ impl MultipleStrategyBlockStorage { // 2.2. if not: try to fetch from faithful - let persistent_block_range = self.persistent_block_storage.get_slot_range().await; - match persistent_block_range.contains(&slot) { + match self.persistent_block_storage.is_block_in_range(slot).await { true => { debug!( - "Assume block {} to be available in persistent block-storage (min-max slot range {:?})", - slot, persistent_block_range + "Assume block {} to be available in persistent block-storage", + slot, ); let lookup = self.persistent_block_storage.query(slot).await - .context(format!("block not found although it was in range {:?}", persistent_block_range)); + .context(format!("block not found although it was in range")); return lookup.map(|b| BlockStorageData { block: b, @@ -149,7 +148,7 @@ impl BlockStorageInterface for MultipleStrategyBlockStorage { } let merged = RangeInclusive::new(lower, *persistent_storage_range.end()); - trace!("Merged range from databse + faithful: {:?}", merged); + trace!("Merged range from database + faithful: {:?}", merged); return merged; } diff --git a/history/src/block_stores/postgres_block_store.rs b/history/src/block_stores/postgres_block_store.rs index 2440131b..25b83bff 100644 --- a/history/src/block_stores/postgres_block_store.rs +++ b/history/src/block_stores/postgres_block_store.rs @@ -37,12 +37,12 @@ pub struct PostgresData { #[derive(Clone)] pub struct PostgresBlockStore { session_cache: PostgresSessionCache, - epoch_cache: EpochCache, + epoch_schedule: EpochCache, // postgres_data: Arc>, } impl PostgresBlockStore { - pub async fn new(epoch_cache: EpochCache) -> Self { + pub async fn new(epoch_schedule: EpochCache) -> Self { let session_cache = PostgresSessionCache::new().await.unwrap(); // let postgres_data = Arc::new(RwLock::new(PostgresData::default())); @@ -50,7 +50,7 @@ impl PostgresBlockStore { Self { session_cache, - epoch_cache, + epoch_schedule, // postgres_data, } } @@ -140,46 +140,26 @@ impl PostgresBlockStore { .expect("should get new postgres session") } + pub async fn is_block_in_range(&self, slot: Slot) -> bool { + let epoch = self.epoch_schedule.get_epoch_at_slot(slot); + let ranges = self.get_slot_range_by_epoch().await; + let matching_range: Option<&RangeInclusive> = ranges.get(&epoch.into()); + + matching_range.map(|slot_range| slot_range.contains(&slot)).is_some() + } + pub async fn query(&self, slot: Slot) -> Result { let started = Instant::now(); - let slot_ranges_by_epoch = self.get_slot_range_by_epoch().await; - - let matching_epochs: Vec = - slot_ranges_by_epoch.iter().filter_map(|(epoch, range)| { - if range.contains(&slot) { - Some(epoch) - } else { - None - } - }).cloned() - .collect_vec(); - - debug!("Postgres epoch schema matching slot {}: {:?}", slot, matching_epochs); - - - let matching_epoch = - match Uniqueness::inspect_len(matching_epochs.len()) { - Uniqueness::ExactlyOne => { - matching_epochs.iter().exactly_one().unwrap().clone() - } - Uniqueness::Multiple(_) => { - error!("Found multiple epoch schemata serving block {}: {:?}", slot, matching_epochs); - // workaround: use the latest epoch - matching_epochs.iter().max().unwrap().clone() - } - Uniqueness::Empty => { - bail!("No epoch schema found for slot {}", slot); - } - }; - - let query = PostgresBlock::build_query_statement(matching_epoch, slot); + let epoch: EpochRef = self.epoch_schedule.get_epoch_at_slot(slot).into(); + + let query = PostgresBlock::build_query_statement(epoch, slot); let block_row = self.get_session().await.query_opt( &query, &[]) .await.unwrap(); if block_row.is_none() { - bail!("Block {} not found in postgres", slot); + bail!("Block {} in epoch {} not found in postgres", slot, epoch); } let row = block_row.unwrap(); @@ -213,12 +193,6 @@ impl PostgresBlockStore { CommitmentConfig::confirmed(), ); - - println!("REWARDS::"); - println!(">> {:?}", produced_block.rewards); - println!("REWARDS::"); - - debug!("Querying produced block {} from postgres in epoch schema {} took {:.2}ms: {}/{}", produced_block.slot, epoch_schema, started.elapsed().as_secs_f64() * 1000.0, @@ -242,7 +216,7 @@ impl PostgresBlockStore { .collect_vec(); let postgres_block = PostgresBlock::from(block); - let epoch = self.epoch_cache.get_epoch_at_slot(slot); + let epoch = self.epoch_schedule.get_epoch_at_slot(slot); self.start_new_epoch_if_necessary(epoch.into()).await?; let session = self.get_session().await; diff --git a/history/tests/multiple_strategy_block_store_tests.rs b/history/tests/multiple_strategy_block_store_tests.rs index 3a53fe12..5a38056c 100644 --- a/history/tests/multiple_strategy_block_store_tests.rs +++ b/history/tests/multiple_strategy_block_store_tests.rs @@ -10,7 +10,12 @@ use solana_lite_rpc_history::{ use solana_sdk::{commitment_config::CommitmentConfig, hash::Hash}; use std::sync::Arc; use anyhow::anyhow; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::reward_type::RewardType; +use solana_transaction_status::Reward; use solana_lite_rpc_core::structures::epoch::EpochCache; +use solana_lite_rpc_history::block_stores::multiple_strategy_block_store::BlockStorageData; use solana_lite_rpc_history::block_stores::postgres_block_store::PostgresBlockStore; pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> ProducedBlock { @@ -24,7 +29,13 @@ pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> Prod commitment_config, leader_id: None, slot, - rewards: None, + rewards: Some(vec![Reward { + pubkey: Pubkey::new_unique().to_string(), + lamports: 5000, + post_balance: 1000000, + reward_type: Some(RewardType::Voting), + commission: None, + }]), } } @@ -60,4 +71,10 @@ async fn test_in_multiple_stategy_block_store() { // not in range assert!(multi_store.query_block(9999).await.is_err()); + + let block_1200: BlockStorageData = multi_store.query_block(1200).await.unwrap(); + assert_eq!(1, block_1200.rewards.as_ref().unwrap().len()); + assert_eq!(5000, block_1200.rewards.as_ref().unwrap().get(0).unwrap().lamports); + } +