Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to solana 1.18 #397

Merged
merged 6 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 172 additions & 121 deletions Cargo.lock

Large diffs are not rendered by default.

34 changes: 17 additions & 17 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,25 @@ members = [

[workspace.package]
version = "0.2.4"
authors = ["gmgalactus <[email protected]>", "Aniket Prajapati <[email protected]>"]
authors = ["gmgalactus <[email protected]>", "GroovieGermanikus <[email protected]>"]
repository = "https://github.com/blockworks-foundation/lite-rpc"
license = "AGPL"
edition = "2021"

[workspace.dependencies]
solana-sdk = "~1.17.28"
solana-rpc-client = "~1.17.28"
solana-rpc-client-api = "~1.17.28"
solana-transaction-status = "~1.17.28"
solana-version = "~1.17.28"
solana-client = "~1.17.28"
solana-net-utils = "~1.17.28"
solana-pubsub-client = "~1.17.28"
solana-streamer = "~1.17.28"
solana-account-decoder = "~1.17.28"
solana-ledger = "~1.17.28"
solana-program = "~1.17.28"
solana-address-lookup-table-program = "~1.17.28"
solana-sdk = "~1.18.15"
solana-rpc-client = "~1.18.15"
solana-rpc-client-api = "~1.18.15"
solana-transaction-status = "~1.18.15"
solana-version = "~1.18.15"
solana-client = "~1.18.15"
solana-net-utils = "~1.18.15"
solana-pubsub-client = "~1.18.15"
solana-streamer = "~1.18.15"
solana-account-decoder = "~1.18.15"
solana-ledger = "~1.18.15"
solana-program = "~1.18.15"
solana-address-lookup-table-program = "~1.18.15"
itertools = "0.10.5"
rangetools = "0.1.4"
serde = { version = "1.0.160", features = ["derive"] }
Expand Down Expand Up @@ -87,9 +87,9 @@ solana-lite-rpc-accounts = {path = "accounts", version = "0.2.4"}
solana-lite-rpc-accounts-on-demand = {path = "accounts-on-demand", version = "0.2.4"}
bench = { path = "bench", version="0.2.4" }

yellowstone-grpc-proto = "1.13.0"
#geyser-grpc-connector = { path = "../../geyser-grpc-connector" }
geyser-grpc-connector = { tag = "v0.10.6+yellowstone.1.13+solana.1.17.28", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.15.0+solana.1.18" }
#geyser-grpc-connector = { path = "../geyser-grpc-connector" }
geyser-grpc-connector = { tag = "v0.10.6+yellowstone.1.15+solana.1.18", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }


async-trait = "0.1.68"
Expand Down
28 changes: 10 additions & 18 deletions cluster-endpoints/src/grpc/grpc_accounts_streaming.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::anyhow;
use futures::StreamExt;
use merge_streams::MergeStreams;
use std::{
Expand All @@ -9,7 +10,7 @@ use std::{
use geyser_grpc_connector::yellowstone_grpc_util::{
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
};
use geyser_grpc_connector::{GeyserGrpcClient, GeyserGrpcClientResult, GrpcSourceConfig};
use geyser_grpc_connector::{GeyserGrpcClient, GrpcSourceConfig};
use itertools::Itertools;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
Expand Down Expand Up @@ -103,19 +104,14 @@ pub fn start_account_streaming_tasks(

let program_subscription = SubscribeRequest {
accounts: subscribe_programs,
slots: Default::default(),
transactions: Default::default(),
blocks: Default::default(),
blocks_meta: Default::default(),
entry: Default::default(),
commitment: Some(processed_commitment.into()),
accounts_data_slice: Default::default(),
ping: None,
commitment: Some(processed_commitment.into()),
..Default::default()
};

let mut client = create_connection(&grpc_config).await?;

let account_stream = client.subscribe_once2(program_subscription).await.unwrap();
let account_stream = client.subscribe_once(program_subscription).await.unwrap();

// each account subscription batch will require individual stream
let mut subscriptions = vec![account_stream];
Expand All @@ -139,17 +135,12 @@ pub fn start_account_streaming_tasks(

let account_request = SubscribeRequest {
accounts: accounts_subscription,
slots: Default::default(),
transactions: Default::default(),
blocks: Default::default(),
blocks_meta: Default::default(),
entry: Default::default(),
commitment: Some(processed_commitment.into()),
accounts_data_slice: Default::default(),
ping: None,
commitment: Some(processed_commitment.into()),
..Default::default()
};

let account_stream = client.subscribe_once2(account_request).await.unwrap();
let account_stream = client.subscribe_once(account_request).await.unwrap();
subscriptions.push(account_stream);
}
let mut merged_stream = subscriptions.merge();
Expand Down Expand Up @@ -216,7 +207,7 @@ pub fn start_account_streaming_tasks(

async fn create_connection(
grpc_config: &GrpcSourceConfig,
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor + Sized>> {
) -> anyhow::Result<GeyserGrpcClient<impl Interceptor + Sized>> {
connect_with_timeout_with_buffers(
grpc_config.grpc_addr.clone(),
grpc_config.grpc_x_token.clone(),
Expand All @@ -230,6 +221,7 @@ async fn create_connection(
},
)
.await
.map_err(|e| anyhow!("Failed to connect to grpc source: {e:?}"))
}

pub fn create_grpc_account_streaming(
Expand Down
139 changes: 6 additions & 133 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@ use crate::grpc::grpc_accounts_streaming::create_grpc_account_streaming;
use crate::grpc_multiplex::{
create_grpc_multiplex_blocks_subscription, create_grpc_multiplex_processed_slots_subscription,
};
use anyhow::Context;
use futures::StreamExt;
use geyser_grpc_connector::yellowstone_grpc_util::{
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
};
use geyser_grpc_connector::GrpcSourceConfig;
use itertools::Itertools;
use log::trace;
Expand All @@ -21,7 +16,6 @@ use solana_lite_rpc_core::{
use solana_sdk::program_utils::limited_deserialize;
use solana_sdk::vote::instruction::VoteInstruction;
use solana_sdk::{
borsh0_10::try_from_slice_unchecked,
commitment_config::CommitmentConfig,
compute_budget::{self, ComputeBudgetInstruction},
hash::Hash,
Expand All @@ -36,13 +30,9 @@ use solana_sdk::{
};
use solana_transaction_status::{Reward, RewardType};
use std::cell::OnceCell;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, Notify};
use tokio::sync::Notify;
use tracing::trace_span;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks};

use crate::rpc_polling::vote_accounts_and_cluster_info_polling::{
poll_cluster_info, poll_vote_accounts,
Expand Down Expand Up @@ -216,18 +206,16 @@ pub fn from_grpc_block_update(

fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option<u32>, Option<u64>) {
let cu_requested_cell: OnceCell<u32> = OnceCell::new();
let legacy_cu_requested_cell: OnceCell<u32> = OnceCell::new();

let prioritization_fees_cell: OnceCell<u64> = OnceCell::new();
let legacy_prio_fees_cell: OnceCell<u64> = OnceCell::new();

for compute_budget_ins in message.instructions().iter().filter(|instruction| {
instruction
.program_id(message.static_account_keys())
.eq(&compute_budget::id())
}) {
if let Ok(budget_ins) =
try_from_slice_unchecked::<ComputeBudgetInstruction>(compute_budget_ins.data.as_slice())
if let Ok(budget_ins) = solana_sdk::borsh1::try_from_slice_unchecked::<
ComputeBudgetInstruction,
>(compute_budget_ins.data.as_slice())
{
match budget_ins {
// aka cu requested
Expand All @@ -242,133 +230,18 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option<u32>,
.set(price)
.expect("prioritization_fees must be set only once");
}
// legacy
ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
} => {
let _ = legacy_cu_requested_cell.set(units);
if additional_fee > 0 {
let _ = legacy_prio_fees_cell.set(((units * 1000) / additional_fee) as u64);
};
}
_ => {
trace!("skip compute budget instruction");
}
}
}
}

let cu_requested = cu_requested_cell
.get()
.or(legacy_cu_requested_cell.get())
.cloned();
let prioritization_fees = prioritization_fees_cell
.get()
.or(legacy_prio_fees_cell.get())
.cloned();
let cu_requested = cu_requested_cell.get().cloned();
let prioritization_fees = prioritization_fees_cell.get().cloned();
(cu_requested, prioritization_fees)
}

pub fn create_block_processing_task(
grpc_addr: String,
grpc_x_token: Option<String>,
block_sx: tokio::sync::mpsc::Sender<SubscribeUpdateBlock>,
commitment_level: CommitmentLevel,
mut exit_notify: broadcast::Receiver<()>,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
'main_loop: loop {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"block_client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);

// connect to grpc
let mut client = connect_with_timeout_with_buffers(
grpc_addr.clone(),
grpc_x_token.clone(),
None,
Some(Duration::from_secs(10)),
Some(Duration::from_secs(10)),
GeyserGrpcClientBufferConfig {
buffer_size: Some(65536),
conn_window: Some(5242880),
stream_window: Some(4194304),
},
)
.await?;
let mut stream = tokio::select! {
res = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
Some(commitment_level),
Default::default(),
None,
) => {
res?
},
_ = exit_notify.recv() => {
break;
}
};

loop {
tokio::select! {
message = stream.next() => {
let Some(Ok(message)) = message else {
break;
};

let Some(update) = message.update_oneof else {
continue;
};

match update {
UpdateOneof::Block(block) => {
log::trace!(
"received block, hash: {} slot: {}",
block.blockhash,
block.slot
);
block_sx
.send(block)
.await
.context("Problem sending on block channel")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
_ => {
log::trace!("unknown GRPC notification");
}
};
},
_ = exit_notify.recv() => {
break 'main_loop;
}
}
}
drop(stream);
drop(client);
log::error!("Grpc block subscription broken (resubscribing)");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Ok(())
})
}

pub fn create_grpc_subscription(
rpc_client: Arc<RpcClient>,
grpc_sources: Vec<GrpcSourceConfig>,
Expand Down
43 changes: 3 additions & 40 deletions cluster-endpoints/src/rpc_polling/poll_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use solana_lite_rpc_core::{
AnyhowJoinHandle,
};
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::borsh0_10::try_from_slice_unchecked;
use solana_sdk::borsh1::try_from_slice_unchecked;
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::program_utils::limited_deserialize;
use solana_sdk::reward_type::RewardType;
Expand Down Expand Up @@ -222,22 +222,7 @@ pub fn from_ui_block(
_ => None,
};

let legacy_compute_budget = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
return Some((units, additional_fee));
}
}
None
});

let mut cu_requested = tx.message.instructions().iter().find_map(|i| {
let cu_requested = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
Expand All @@ -250,7 +235,7 @@ pub fn from_ui_block(
None
});

let mut prioritization_fees = tx.message.instructions().iter().find_map(|i| {
let prioritization_fees = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
Expand All @@ -264,13 +249,6 @@ pub fn from_ui_block(
None
});

if let Some((units, additional_fee)) = legacy_compute_budget {
cu_requested = Some(units);
if additional_fee > 0 {
prioritization_fees = Some(calc_prioritization_fees(units, additional_fee))
}
};

let blockhash = tx.message.recent_blockhash();

let is_vote_transaction = tx.message.instructions().iter().any(|i| {
Expand Down Expand Up @@ -348,18 +326,3 @@ fn map_block_info(produced_block: &ProducedBlock) -> BlockInfo {
block_time: produced_block.block_time,
}
}

#[inline]
fn calc_prioritization_fees(units: u32, additional_fee: u32) -> u64 {
(units as u64 * 1000) / additional_fee as u64
}

#[test]
fn overflow_u32() {
// value high enough to overflow u32 if multiplied by 1000
let units: u32 = 4_000_000_000;
let additional_fee: u32 = 100;
let prioritization_fees: u64 = calc_prioritization_fees(units, additional_fee);

assert_eq!(40_000_000_000, prioritization_fees);
}
Loading
Loading