Skip to content

Commit

Permalink
Update to solana 1.18 (#397)
Browse files Browse the repository at this point in the history
* update solana from 1.17.28 -> 1.18.15
* remove legacy cu
* use yellowstone 1.15
* use tagged geyser-grpc-connector version
  • Loading branch information
grooviegermanikus authored Jun 13, 2024
1 parent 5dd6515 commit 0d1d856
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 379 deletions.
293 changes: 172 additions & 121 deletions Cargo.lock

Large diffs are not rendered by default.

34 changes: 17 additions & 17 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,25 @@ members = [

[workspace.package]
version = "0.2.4"
authors = ["gmgalactus <[email protected]>", "Aniket Prajapati <[email protected]>"]
authors = ["gmgalactus <[email protected]>", "GroovieGermanikus <[email protected]>"]
repository = "https://github.com/blockworks-foundation/lite-rpc"
license = "AGPL"
edition = "2021"

[workspace.dependencies]
solana-sdk = "~1.17.28"
solana-rpc-client = "~1.17.28"
solana-rpc-client-api = "~1.17.28"
solana-transaction-status = "~1.17.28"
solana-version = "~1.17.28"
solana-client = "~1.17.28"
solana-net-utils = "~1.17.28"
solana-pubsub-client = "~1.17.28"
solana-streamer = "~1.17.28"
solana-account-decoder = "~1.17.28"
solana-ledger = "~1.17.28"
solana-program = "~1.17.28"
solana-address-lookup-table-program = "~1.17.28"
solana-sdk = "~1.18.15"
solana-rpc-client = "~1.18.15"
solana-rpc-client-api = "~1.18.15"
solana-transaction-status = "~1.18.15"
solana-version = "~1.18.15"
solana-client = "~1.18.15"
solana-net-utils = "~1.18.15"
solana-pubsub-client = "~1.18.15"
solana-streamer = "~1.18.15"
solana-account-decoder = "~1.18.15"
solana-ledger = "~1.18.15"
solana-program = "~1.18.15"
solana-address-lookup-table-program = "~1.18.15"
itertools = "0.10.5"
rangetools = "0.1.4"
serde = { version = "1.0.160", features = ["derive"] }
Expand Down Expand Up @@ -87,9 +87,9 @@ solana-lite-rpc-accounts = {path = "accounts", version = "0.2.4"}
solana-lite-rpc-accounts-on-demand = {path = "accounts-on-demand", version = "0.2.4"}
bench = { path = "bench", version="0.2.4" }

yellowstone-grpc-proto = "1.13.0"
#geyser-grpc-connector = { path = "../../geyser-grpc-connector" }
geyser-grpc-connector = { tag = "v0.10.6+yellowstone.1.13+solana.1.17.28", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.15.0+solana.1.18" }
#geyser-grpc-connector = { path = "../geyser-grpc-connector" }
geyser-grpc-connector = { tag = "v0.10.6+yellowstone.1.15+solana.1.18", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }


async-trait = "0.1.68"
Expand Down
28 changes: 10 additions & 18 deletions cluster-endpoints/src/grpc/grpc_accounts_streaming.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::anyhow;
use futures::StreamExt;
use merge_streams::MergeStreams;
use std::{
Expand All @@ -9,7 +10,7 @@ use std::{
use geyser_grpc_connector::yellowstone_grpc_util::{
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
};
use geyser_grpc_connector::{GeyserGrpcClient, GeyserGrpcClientResult, GrpcSourceConfig};
use geyser_grpc_connector::{GeyserGrpcClient, GrpcSourceConfig};
use itertools::Itertools;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
Expand Down Expand Up @@ -103,19 +104,14 @@ pub fn start_account_streaming_tasks(

let program_subscription = SubscribeRequest {
accounts: subscribe_programs,
slots: Default::default(),
transactions: Default::default(),
blocks: Default::default(),
blocks_meta: Default::default(),
entry: Default::default(),
commitment: Some(processed_commitment.into()),
accounts_data_slice: Default::default(),
ping: None,
commitment: Some(processed_commitment.into()),
..Default::default()
};

let mut client = create_connection(&grpc_config).await?;

let account_stream = client.subscribe_once2(program_subscription).await.unwrap();
let account_stream = client.subscribe_once(program_subscription).await.unwrap();

// each account subscription batch will require individual stream
let mut subscriptions = vec![account_stream];
Expand All @@ -139,17 +135,12 @@ pub fn start_account_streaming_tasks(

let account_request = SubscribeRequest {
accounts: accounts_subscription,
slots: Default::default(),
transactions: Default::default(),
blocks: Default::default(),
blocks_meta: Default::default(),
entry: Default::default(),
commitment: Some(processed_commitment.into()),
accounts_data_slice: Default::default(),
ping: None,
commitment: Some(processed_commitment.into()),
..Default::default()
};

let account_stream = client.subscribe_once2(account_request).await.unwrap();
let account_stream = client.subscribe_once(account_request).await.unwrap();
subscriptions.push(account_stream);
}
let mut merged_stream = subscriptions.merge();
Expand Down Expand Up @@ -216,7 +207,7 @@ pub fn start_account_streaming_tasks(

async fn create_connection(
grpc_config: &GrpcSourceConfig,
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor + Sized>> {
) -> anyhow::Result<GeyserGrpcClient<impl Interceptor + Sized>> {
connect_with_timeout_with_buffers(
grpc_config.grpc_addr.clone(),
grpc_config.grpc_x_token.clone(),
Expand All @@ -230,6 +221,7 @@ async fn create_connection(
},
)
.await
.map_err(|e| anyhow!("Failed to connect to grpc source: {e:?}"))
}

pub fn create_grpc_account_streaming(
Expand Down
139 changes: 6 additions & 133 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@ use crate::grpc::grpc_accounts_streaming::create_grpc_account_streaming;
use crate::grpc_multiplex::{
create_grpc_multiplex_blocks_subscription, create_grpc_multiplex_processed_slots_subscription,
};
use anyhow::Context;
use futures::StreamExt;
use geyser_grpc_connector::yellowstone_grpc_util::{
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
};
use geyser_grpc_connector::GrpcSourceConfig;
use itertools::Itertools;
use log::trace;
Expand All @@ -21,7 +16,6 @@ use solana_lite_rpc_core::{
use solana_sdk::program_utils::limited_deserialize;
use solana_sdk::vote::instruction::VoteInstruction;
use solana_sdk::{
borsh0_10::try_from_slice_unchecked,
commitment_config::CommitmentConfig,
compute_budget::{self, ComputeBudgetInstruction},
hash::Hash,
Expand All @@ -36,13 +30,9 @@ use solana_sdk::{
};
use solana_transaction_status::{Reward, RewardType};
use std::cell::OnceCell;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, Notify};
use tokio::sync::Notify;
use tracing::trace_span;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks};

use crate::rpc_polling::vote_accounts_and_cluster_info_polling::{
poll_cluster_info, poll_vote_accounts,
Expand Down Expand Up @@ -216,18 +206,16 @@ pub fn from_grpc_block_update(

fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option<u32>, Option<u64>) {
let cu_requested_cell: OnceCell<u32> = OnceCell::new();
let legacy_cu_requested_cell: OnceCell<u32> = OnceCell::new();

let prioritization_fees_cell: OnceCell<u64> = OnceCell::new();
let legacy_prio_fees_cell: OnceCell<u64> = OnceCell::new();

for compute_budget_ins in message.instructions().iter().filter(|instruction| {
instruction
.program_id(message.static_account_keys())
.eq(&compute_budget::id())
}) {
if let Ok(budget_ins) =
try_from_slice_unchecked::<ComputeBudgetInstruction>(compute_budget_ins.data.as_slice())
if let Ok(budget_ins) = solana_sdk::borsh1::try_from_slice_unchecked::<
ComputeBudgetInstruction,
>(compute_budget_ins.data.as_slice())
{
match budget_ins {
// aka cu requested
Expand All @@ -242,133 +230,18 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option<u32>,
.set(price)
.expect("prioritization_fees must be set only once");
}
// legacy
ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
} => {
let _ = legacy_cu_requested_cell.set(units);
if additional_fee > 0 {
let _ = legacy_prio_fees_cell.set(((units * 1000) / additional_fee) as u64);
};
}
_ => {
trace!("skip compute budget instruction");
}
}
}
}

let cu_requested = cu_requested_cell
.get()
.or(legacy_cu_requested_cell.get())
.cloned();
let prioritization_fees = prioritization_fees_cell
.get()
.or(legacy_prio_fees_cell.get())
.cloned();
let cu_requested = cu_requested_cell.get().cloned();
let prioritization_fees = prioritization_fees_cell.get().cloned();
(cu_requested, prioritization_fees)
}

pub fn create_block_processing_task(
grpc_addr: String,
grpc_x_token: Option<String>,
block_sx: tokio::sync::mpsc::Sender<SubscribeUpdateBlock>,
commitment_level: CommitmentLevel,
mut exit_notify: broadcast::Receiver<()>,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
'main_loop: loop {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"block_client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);

// connect to grpc
let mut client = connect_with_timeout_with_buffers(
grpc_addr.clone(),
grpc_x_token.clone(),
None,
Some(Duration::from_secs(10)),
Some(Duration::from_secs(10)),
GeyserGrpcClientBufferConfig {
buffer_size: Some(65536),
conn_window: Some(5242880),
stream_window: Some(4194304),
},
)
.await?;
let mut stream = tokio::select! {
res = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
Some(commitment_level),
Default::default(),
None,
) => {
res?
},
_ = exit_notify.recv() => {
break;
}
};

loop {
tokio::select! {
message = stream.next() => {
let Some(Ok(message)) = message else {
break;
};

let Some(update) = message.update_oneof else {
continue;
};

match update {
UpdateOneof::Block(block) => {
log::trace!(
"received block, hash: {} slot: {}",
block.blockhash,
block.slot
);
block_sx
.send(block)
.await
.context("Problem sending on block channel")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
_ => {
log::trace!("unknown GRPC notification");
}
};
},
_ = exit_notify.recv() => {
break 'main_loop;
}
}
}
drop(stream);
drop(client);
log::error!("Grpc block subscription broken (resubscribing)");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Ok(())
})
}

pub fn create_grpc_subscription(
rpc_client: Arc<RpcClient>,
grpc_sources: Vec<GrpcSourceConfig>,
Expand Down
43 changes: 3 additions & 40 deletions cluster-endpoints/src/rpc_polling/poll_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use solana_lite_rpc_core::{
AnyhowJoinHandle,
};
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::borsh0_10::try_from_slice_unchecked;
use solana_sdk::borsh1::try_from_slice_unchecked;
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::program_utils::limited_deserialize;
use solana_sdk::reward_type::RewardType;
Expand Down Expand Up @@ -222,22 +222,7 @@ pub fn from_ui_block(
_ => None,
};

let legacy_compute_budget = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
return Some((units, additional_fee));
}
}
None
});

let mut cu_requested = tx.message.instructions().iter().find_map(|i| {
let cu_requested = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
Expand All @@ -250,7 +235,7 @@ pub fn from_ui_block(
None
});

let mut prioritization_fees = tx.message.instructions().iter().find_map(|i| {
let prioritization_fees = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
Expand All @@ -264,13 +249,6 @@ pub fn from_ui_block(
None
});

if let Some((units, additional_fee)) = legacy_compute_budget {
cu_requested = Some(units);
if additional_fee > 0 {
prioritization_fees = Some(calc_prioritization_fees(units, additional_fee))
}
};

let blockhash = tx.message.recent_blockhash();

let is_vote_transaction = tx.message.instructions().iter().any(|i| {
Expand Down Expand Up @@ -348,18 +326,3 @@ fn map_block_info(produced_block: &ProducedBlock) -> BlockInfo {
block_time: produced_block.block_time,
}
}

#[inline]
fn calc_prioritization_fees(units: u32, additional_fee: u32) -> u64 {
(units as u64 * 1000) / additional_fee as u64
}

#[test]
fn overflow_u32() {
// value high enough to overflow u32 if multiplied by 1000
let units: u32 = 4_000_000_000;
let additional_fee: u32 = 100;
let prioritization_fees: u64 = calc_prioritization_fees(units, additional_fee);

assert_eq!(40_000_000_000, prioritization_fees);
}
Loading

0 comments on commit 0d1d856

Please sign in to comment.