Skip to content

Commit

Permalink
use epoch schedule for slot range assignment
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Nov 17, 2023
1 parent 9e45f88 commit 5c90feb
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 49 deletions.
11 changes: 5 additions & 6 deletions history/src/block_stores/multiple_strategy_block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,14 @@ impl MultipleStrategyBlockStorage {
// 2.2. if not: try to fetch from faithful


let persistent_block_range = self.persistent_block_storage.get_slot_range().await;
match persistent_block_range.contains(&slot) {
match self.persistent_block_storage.is_block_in_range(slot).await {
true => {
debug!(
"Assume block {} to be available in persistent block-storage (min-max slot range {:?})",
slot, persistent_block_range
"Assume block {} to be available in persistent block-storage",
slot,
);
let lookup = self.persistent_block_storage.query(slot).await
.context(format!("block not found although it was in range {:?}", persistent_block_range));
.context(format!("block not found although it was in range"));

return lookup.map(|b| BlockStorageData {
block: b,
Expand Down Expand Up @@ -149,7 +148,7 @@ impl BlockStorageInterface for MultipleStrategyBlockStorage {
}

let merged = RangeInclusive::new(lower, *persistent_storage_range.end());
trace!("Merged range from databse + faithful: {:?}", merged);
trace!("Merged range from database + faithful: {:?}", merged);

return merged;
}
Expand Down
58 changes: 16 additions & 42 deletions history/src/block_stores/postgres_block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@ pub struct PostgresData {
#[derive(Clone)]
pub struct PostgresBlockStore {
session_cache: PostgresSessionCache,
epoch_cache: EpochCache,
epoch_schedule: EpochCache,
// postgres_data: Arc<RwLock<PostgresData>>,
}

impl PostgresBlockStore {
pub async fn new(epoch_cache: EpochCache) -> Self {
pub async fn new(epoch_schedule: EpochCache) -> Self {
let session_cache = PostgresSessionCache::new().await.unwrap();
// let postgres_data = Arc::new(RwLock::new(PostgresData::default()));

Self::check_role(&session_cache).await;

Self {
session_cache,
epoch_cache,
epoch_schedule,
// postgres_data,
}
}
Expand Down Expand Up @@ -140,46 +140,26 @@ impl PostgresBlockStore {
.expect("should get new postgres session")
}

pub async fn is_block_in_range(&self, slot: Slot) -> bool {
let epoch = self.epoch_schedule.get_epoch_at_slot(slot);
let ranges = self.get_slot_range_by_epoch().await;
let matching_range: Option<&RangeInclusive<Slot>> = ranges.get(&epoch.into());

matching_range.map(|slot_range| slot_range.contains(&slot)).is_some()
}

pub async fn query(&self, slot: Slot) -> Result<ProducedBlock> {
let started = Instant::now();
let slot_ranges_by_epoch = self.get_slot_range_by_epoch().await;

let matching_epochs: Vec<EpochRef> =
slot_ranges_by_epoch.iter().filter_map(|(epoch, range)| {
if range.contains(&slot) {
Some(epoch)
} else {
None
}
}).cloned()
.collect_vec();

debug!("Postgres epoch schema matching slot {}: {:?}", slot, matching_epochs);


let matching_epoch =
match Uniqueness::inspect_len(matching_epochs.len()) {
Uniqueness::ExactlyOne => {
matching_epochs.iter().exactly_one().unwrap().clone()
}
Uniqueness::Multiple(_) => {
error!("Found multiple epoch schemata serving block {}: {:?}", slot, matching_epochs);
// workaround: use the latest epoch
matching_epochs.iter().max().unwrap().clone()
}
Uniqueness::Empty => {
bail!("No epoch schema found for slot {}", slot);
}
};

let query = PostgresBlock::build_query_statement(matching_epoch, slot);
let epoch: EpochRef = self.epoch_schedule.get_epoch_at_slot(slot).into();

let query = PostgresBlock::build_query_statement(epoch, slot);
let block_row = self.get_session().await.query_opt(
&query,
&[])
.await.unwrap();

if block_row.is_none() {
bail!("Block {} not found in postgres", slot);
bail!("Block {} in epoch {} not found in postgres", slot, epoch);
}

let row = block_row.unwrap();
Expand Down Expand Up @@ -213,12 +193,6 @@ impl PostgresBlockStore {
CommitmentConfig::confirmed(),
);


println!("REWARDS::");
println!(">> {:?}", produced_block.rewards);
println!("REWARDS::");


debug!("Querying produced block {} from postgres in epoch schema {} took {:.2}ms: {}/{}",
produced_block.slot, epoch_schema,
started.elapsed().as_secs_f64() * 1000.0,
Expand All @@ -242,7 +216,7 @@ impl PostgresBlockStore {
.collect_vec();
let postgres_block = PostgresBlock::from(block);

let epoch = self.epoch_cache.get_epoch_at_slot(slot);
let epoch = self.epoch_schedule.get_epoch_at_slot(slot);
self.start_new_epoch_if_necessary(epoch.into()).await?;

let session = self.get_session().await;
Expand Down
19 changes: 18 additions & 1 deletion history/tests/multiple_strategy_block_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ use solana_lite_rpc_history::{
use solana_sdk::{commitment_config::CommitmentConfig, hash::Hash};
use std::sync::Arc;
use anyhow::anyhow;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::reward_type::RewardType;
use solana_transaction_status::Reward;
use solana_lite_rpc_core::structures::epoch::EpochCache;
use solana_lite_rpc_history::block_stores::multiple_strategy_block_store::BlockStorageData;
use solana_lite_rpc_history::block_stores::postgres_block_store::PostgresBlockStore;

pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> ProducedBlock {
Expand All @@ -24,7 +29,13 @@ pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> Prod
commitment_config,
leader_id: None,
slot,
rewards: None,
rewards: Some(vec![Reward {
pubkey: Pubkey::new_unique().to_string(),
lamports: 5000,
post_balance: 1000000,
reward_type: Some(RewardType::Voting),
commission: None,
}]),
}
}

Expand Down Expand Up @@ -60,4 +71,10 @@ async fn test_in_multiple_stategy_block_store() {
// not in range
assert!(multi_store.query_block(9999).await.is_err());


let block_1200: BlockStorageData = multi_store.query_block(1200).await.unwrap();
assert_eq!(1, block_1200.rewards.as_ref().unwrap().len());
assert_eq!(5000, block_1200.rewards.as_ref().unwrap().get(0).unwrap().lamports);

}

0 comments on commit 5c90feb

Please sign in to comment.