Skip to content

Commit

Permalink
move .save out of trait
Browse files Browse the repository at this point in the history
grooviegermanikus committed Nov 15, 2023

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent 9d77b46 commit a012a8c
Showing 7 changed files with 62 additions and 44 deletions.
3 changes: 0 additions & 3 deletions core/src/traits/block_storage_interface.rs
Original file line number Diff line number Diff line change
@@ -7,9 +7,6 @@ use std::sync::Arc;

#[async_trait]
pub trait BlockStorageInterface: Send + Sync {
// will save a block
// TODO: slot might change for a sig if the block gets confirmed/finalized
async fn save(&self, block: &ProducedBlock) -> Result<()>;
// will get a block
async fn get(&self, slot: Slot) -> Result<ProducedBlock>;
// will get range of slots that are stored in the storage
13 changes: 7 additions & 6 deletions history/src/block_stores/inmemory_block_store.rs
Original file line number Diff line number Diff line change
@@ -23,7 +23,13 @@ impl InmemoryBlockStore {
}
}

pub async fn store(&self, block: &ProducedBlock) {
pub async fn save(&self, block: ProducedBlock) -> anyhow::Result<()> {
trace!("Saving block {} to memory storage...", block.slot);
self.store(block).await;
Ok(())
}

pub async fn store(&self, block: ProducedBlock) {
let slot = block.slot;
let mut block_storage = self.block_storage.write().await;
let min_slot = match block_storage.first_key_value() {
@@ -54,11 +60,6 @@ impl InmemoryBlockStore {

#[async_trait]
impl BlockStorageInterface for InmemoryBlockStore {
async fn save(&self, block: &ProducedBlock) -> anyhow::Result<()> {
trace!("Saving block {} to memory storage...", block.slot);
self.store(block).await;
Ok(())
}

async fn get(&self, slot: Slot) -> anyhow::Result<ProducedBlock> {
self.block_storage
16 changes: 9 additions & 7 deletions history/src/block_stores/multiple_strategy_block_store.rs
Original file line number Diff line number Diff line change
@@ -20,18 +20,19 @@ use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use crate::block_stores::postgres_block_store::PostgresBlockStore;

pub struct MultipleStrategyBlockStorage {
inmemory_for_storage: InmemoryBlockStore, // for confirmed blocks
persistent_block_storage: BlockStorageImpl, // for persistent block storage
persistent_block_storage: PostgresBlockStore, // for persistent block storage
// note supported ATM
faithful_block_storage: Option<FaithfulBlockStore>, // to fetch legacy blocks from faithful
last_confirmed_slot: Arc<AtomicU64>,
}

impl MultipleStrategyBlockStorage {
pub fn new(
persistent_block_storage: BlockStorageImpl,
persistent_block_storage: PostgresBlockStore,
_faithful_rpc_client: Option<Arc<RpcClient>>,
number_of_slots_in_memory: usize,
) -> Self {
@@ -48,11 +49,8 @@ impl MultipleStrategyBlockStorage {
pub async fn get_in_memory_block(&self, slot: Slot) -> anyhow::Result<ProducedBlock> {
self.inmemory_for_storage.get(slot).await
}
}

#[async_trait]
impl BlockStorageInterface for MultipleStrategyBlockStorage {
async fn save(&self, block: &ProducedBlock) -> Result<()> {
pub async fn save(&self, block: ProducedBlock) -> Result<()> {
trace!(
"Saving block {} using multiple-strategy facade...",
block.slot
@@ -81,7 +79,7 @@ impl BlockStorageInterface for MultipleStrategyBlockStorage {
}
Commitment::Finalized => {
// always store it
self.persistent_block_storage.save(block).await?;
self.persistent_block_storage.save(&block).await?;

let block_in_mem = self.get_in_memory_block(slot).await;
match block_in_mem {
@@ -105,6 +103,10 @@ impl BlockStorageInterface for MultipleStrategyBlockStorage {

Ok(())
}
}

#[async_trait]
impl BlockStorageInterface for MultipleStrategyBlockStorage {

async fn get(&self, slot: solana_sdk::slot_history::Slot) -> Result<ProducedBlock> {
let last_confirmed_slot = self.last_confirmed_slot.load(Ordering::Relaxed);
36 changes: 19 additions & 17 deletions history/src/block_stores/postgres_block_store.rs
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ pub struct PostgresData {
// current_epoch: Epoch,
}

#[derive(Clone)]
pub struct PostgresBlockStore {
session_cache: PostgresSessionCache,
epoch_cache: EpochCache,
@@ -133,24 +134,8 @@ impl PostgresBlockStore {
.await
.expect("should get new postgres session")
}
}

fn build_assign_permissions_statements(epoch: EpochRef) -> String {
let role = LITERPC_ROLE;
let schema = PostgresEpoch::build_schema_name(epoch);

format!(
r#"
GRANT USAGE ON SCHEMA {schema} TO {role};
GRANT ALL ON ALL TABLES IN SCHEMA {schema} TO {role};
ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON TABLES TO {role};
"#
)
}

#[async_trait]
impl BlockStorageInterface for PostgresBlockStore {
async fn save(&self, block: &ProducedBlock) -> Result<()> {
pub async fn save(&self, block: &ProducedBlock) -> Result<()> {
let started = Instant::now();
trace!("Saving block {} to postgres storage...", block.slot);

@@ -188,6 +173,23 @@ impl BlockStorageInterface for PostgresBlockStore {
);
Ok(())
}
}

fn build_assign_permissions_statements(epoch: EpochRef) -> String {
let role = LITERPC_ROLE;
let schema = PostgresEpoch::build_schema_name(epoch);

format!(
r#"
GRANT USAGE ON SCHEMA {schema} TO {role};
GRANT ALL ON ALL TABLES IN SCHEMA {schema} TO {role};
ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON TABLES TO {role};
"#
)
}

#[async_trait]
impl BlockStorageInterface for PostgresBlockStore {

async fn get(&self, slot: Slot) -> Result<ProducedBlock> {
let range = self.get_slot_range().await;
8 changes: 4 additions & 4 deletions history/tests/inmemory_block_store_tests.rs
Original file line number Diff line number Diff line change
@@ -24,12 +24,12 @@ pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> Prod
#[tokio::test]
async fn inmemory_block_store_tests() {
// will store only 10 blocks
let store: Arc<dyn BlockStorageInterface> = Arc::new(InmemoryBlockStore::new(10));
let store: Arc<InmemoryBlockStore> = Arc::new(InmemoryBlockStore::new(10));

// add 10 blocks
for i in 1..11 {
store
.save(&create_test_block(i, CommitmentConfig::finalized()))
.save(create_test_block(i, CommitmentConfig::finalized()))
.await
.unwrap();
}
@@ -40,7 +40,7 @@ async fn inmemory_block_store_tests() {
}
// add 11th block
store
.save(&create_test_block(11, CommitmentConfig::finalized()))
.save(create_test_block(11, CommitmentConfig::finalized()))
.await
.unwrap();

@@ -51,7 +51,7 @@ async fn inmemory_block_store_tests() {

// cannot add old blocks
store
.save(&create_test_block(1, CommitmentConfig::finalized()))
.save(create_test_block(1, CommitmentConfig::finalized()))
.await
.unwrap();
assert!(store.get(1).await.ok().is_none());
15 changes: 9 additions & 6 deletions history/tests/multiple_strategy_block_store_tests.rs
Original file line number Diff line number Diff line change
@@ -8,6 +8,8 @@ use solana_lite_rpc_history::{
};
use solana_sdk::{commitment_config::CommitmentConfig, hash::Hash};
use std::sync::Arc;
use solana_lite_rpc_core::structures::epoch::EpochCache;
use solana_lite_rpc_history::block_stores::postgres_block_store::PostgresBlockStore;

pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> ProducedBlock {
ProducedBlock {
@@ -26,7 +28,8 @@ pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> Prod

#[tokio::test]
async fn test_in_multiple_stategy_block_store() {
let persistent_store: Arc<dyn BlockStorageInterface> = Arc::new(InmemoryBlockStore::new(10));
let epoch_cache = EpochCache::new_for_tests();
let persistent_store = PostgresBlockStore::new(epoch_cache.clone()).await;
let number_of_slots_in_memory = 3;
let block_storage = MultipleStrategyBlockStorage::new(
persistent_store.clone(),
@@ -35,11 +38,11 @@ async fn test_in_multiple_stategy_block_store() {
);

block_storage
.save(&create_test_block(1235, CommitmentConfig::confirmed()))
.save(create_test_block(1235, CommitmentConfig::confirmed()))
.await
.unwrap();
block_storage
.save(&create_test_block(1236, CommitmentConfig::confirmed()))
.save(create_test_block(1236, CommitmentConfig::confirmed()))
.await
.unwrap();

@@ -49,15 +52,15 @@ async fn test_in_multiple_stategy_block_store() {
assert!(persistent_store.get(1236).await.ok().is_none());

block_storage
.save(&create_test_block(1235, CommitmentConfig::finalized()))
.save(create_test_block(1235, CommitmentConfig::finalized()))
.await
.unwrap();
block_storage
.save(&create_test_block(1236, CommitmentConfig::finalized()))
.save(create_test_block(1236, CommitmentConfig::finalized()))
.await
.unwrap();
block_storage
.save(&create_test_block(1237, CommitmentConfig::finalized()))
.save(create_test_block(1237, CommitmentConfig::finalized()))
.await
.unwrap();

15 changes: 14 additions & 1 deletion lite-rpc/tests/storage_integration_tests.rs
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@ async fn storage_test() {
// note: the consumer lags far behind the ingress of blocks and transactions
fn storage_listen(
block_notifier: BlockStream,
block_storage: Arc<dyn BlockStorageInterface>,
block_storage: Arc<PostgresBlockStore>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut block_notifier = block_notifier;
@@ -105,6 +105,7 @@ fn storage_listen(

fn block_debug_listen(block_notifier: BlockStream) -> JoinHandle<()> {
tokio::spawn(async move {
let mut last_highest_slot_number = 0;
let mut block_notifier = block_notifier;
loop {
match block_notifier.recv().await {
@@ -114,6 +115,18 @@ fn block_debug_listen(block_notifier: BlockStream) -> JoinHandle<()> {
block.slot,
block.transactions.len()
);

// check monotony
if block.slot > last_highest_slot_number {
last_highest_slot_number = block.slot;
} else {
// note: ATM this failes very often (using the RPC poller)
warn!(
"Monotonic check failed - block {} is out of order, last highest was {}",
block.slot, last_highest_slot_number
);
}

} // -- Ok
Err(RecvError::Lagged(missed_blocks)) => {
warn!(

0 comments on commit a012a8c

Please sign in to comment.