-
Notifications
You must be signed in to change notification settings - Fork 239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add tpu-client-next to SendTransactionService
#3444
Changes from all commits
571c8da
d244560
b8b1df6
4aeb142
17f66fd
39cd11e
99c2ef0
caeae5f
02bbf65
7b672b0
c9c60cc
580312a
708a24b
1cac5c2
b1963e3
08ba2b1
f5c21d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -81,8 +81,10 @@ use { | |
poh_recorder::PohRecorder, | ||
poh_service::{self, PohService}, | ||
}, | ||
solana_quic_client::quic_client::RUNTIME, | ||
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, | ||
solana_rpc::{ | ||
cluster_tpu_info::ClusterTpuInfo, | ||
max_slots::MaxSlots, | ||
optimistically_confirmed_bank_tracker::{ | ||
BankNotificationSenderConfig, OptimisticallyConfirmedBank, | ||
|
@@ -124,7 +126,10 @@ use { | |
signature::{Keypair, Signer}, | ||
timing::timestamp, | ||
}, | ||
solana_send_transaction_service::send_transaction_service, | ||
solana_send_transaction_service::{ | ||
send_transaction_service, | ||
transaction_client::{spawn_tpu_client_send_txs, ConnectionCacheClient}, | ||
}, | ||
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes}, | ||
solana_turbine::{self, broadcast_stage::BroadcastStageType}, | ||
solana_unified_scheduler_pool::DefaultSchedulerPool, | ||
|
@@ -287,6 +292,7 @@ pub struct ValidatorConfig { | |
pub replay_transactions_threads: NonZeroUsize, | ||
pub tvu_shred_sigverify_threads: NonZeroUsize, | ||
pub delay_leader_block_for_pending_fork: bool, | ||
pub use_tpu_client_next: bool, | ||
} | ||
|
||
impl Default for ValidatorConfig { | ||
|
@@ -360,6 +366,7 @@ impl Default for ValidatorConfig { | |
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), | ||
tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"), | ||
delay_leader_block_for_pending_fork: false, | ||
use_tpu_client_next: false, | ||
} | ||
} | ||
} | ||
|
@@ -991,6 +998,8 @@ impl Validator { | |
|
||
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); | ||
|
||
// ConnectionCache might be used for JsonRpc and for Forwarding. Since the later is not migrated yet to the tpu-client-next, | ||
// create ConnectionCache regardless of config.use_tpu_client_next for now | ||
let connection_cache = match use_quic { | ||
true => { | ||
let connection_cache = ConnectionCache::new_with_client_options( | ||
|
@@ -1044,31 +1053,82 @@ impl Validator { | |
None | ||
}; | ||
|
||
let json_rpc_service = JsonRpcService::new( | ||
rpc_addr, | ||
config.rpc_config.clone(), | ||
Some(config.snapshot_config.clone()), | ||
bank_forks.clone(), | ||
block_commitment_cache.clone(), | ||
blockstore.clone(), | ||
cluster_info.clone(), | ||
Some(poh_recorder.clone()), | ||
genesis_config.hash(), | ||
ledger_path, | ||
config.validator_exit.clone(), | ||
exit.clone(), | ||
rpc_override_health_check.clone(), | ||
startup_verification_complete, | ||
optimistically_confirmed_bank.clone(), | ||
config.send_transaction_service_config.clone(), | ||
max_slots.clone(), | ||
leader_schedule_cache.clone(), | ||
connection_cache.clone(), | ||
max_complete_transaction_status_slot, | ||
max_complete_rewards_slot, | ||
prioritization_fee_cache.clone(), | ||
) | ||
.map_err(ValidatorError::Other)?; | ||
let leader_info = ClusterTpuInfo::new(cluster_info.clone(), poh_recorder.clone()); | ||
// TODO(klykov): consider using Box<dyn TransactionClient> to make this shorter? | ||
let json_rpc_service = if config.use_tpu_client_next { | ||
let my_tpu_address = cluster_info | ||
.my_contact_info() | ||
.tpu(Protocol::QUIC) | ||
.map_err(|err| ValidatorError::Other(format!("{err}")))?; | ||
let client = spawn_tpu_client_send_txs( | ||
&*RUNTIME, // use the same runtime as ConnectionCache | ||
my_tpu_address, | ||
config.send_transaction_service_config.tpu_peers.clone(), | ||
Some(leader_info), | ||
config.send_transaction_service_config.leader_forward_count, | ||
Some((*identity_keypair).insecure_clone()), | ||
); | ||
JsonRpcService::new( | ||
rpc_addr, | ||
config.rpc_config.clone(), | ||
Some(config.snapshot_config.clone()), | ||
bank_forks.clone(), | ||
block_commitment_cache.clone(), | ||
blockstore.clone(), | ||
cluster_info.clone(), | ||
genesis_config.hash(), | ||
ledger_path, | ||
config.validator_exit.clone(), | ||
exit.clone(), | ||
rpc_override_health_check.clone(), | ||
startup_verification_complete, | ||
optimistically_confirmed_bank.clone(), | ||
config.send_transaction_service_config.clone(), | ||
max_slots.clone(), | ||
leader_schedule_cache.clone(), | ||
client, | ||
max_complete_transaction_status_slot, | ||
max_complete_rewards_slot, | ||
prioritization_fee_cache.clone(), | ||
) | ||
.map_err(ValidatorError::Other)? | ||
} else { | ||
let my_tpu_address = cluster_info | ||
.my_contact_info() | ||
.tpu(connection_cache.protocol()) | ||
.map_err(|err| ValidatorError::Other(format!("{err}")))?; | ||
let client = ConnectionCacheClient::new( | ||
connection_cache.clone(), | ||
my_tpu_address, | ||
config.send_transaction_service_config.tpu_peers.clone(), | ||
Some(leader_info), | ||
config.send_transaction_service_config.leader_forward_count, | ||
); | ||
JsonRpcService::new( | ||
rpc_addr, | ||
config.rpc_config.clone(), | ||
Some(config.snapshot_config.clone()), | ||
bank_forks.clone(), | ||
block_commitment_cache.clone(), | ||
blockstore.clone(), | ||
cluster_info.clone(), | ||
genesis_config.hash(), | ||
ledger_path, | ||
config.validator_exit.clone(), | ||
exit.clone(), | ||
rpc_override_health_check.clone(), | ||
startup_verification_complete, | ||
optimistically_confirmed_bank.clone(), | ||
config.send_transaction_service_config.clone(), | ||
max_slots.clone(), | ||
leader_schedule_cache.clone(), | ||
client, | ||
max_complete_transaction_status_slot, | ||
max_complete_rewards_slot, | ||
prioritization_fee_cache.clone(), | ||
) | ||
.map_err(ValidatorError::Other)? | ||
}; | ||
|
||
let pubsub_service = if !config.rpc_config.full_api { | ||
None | ||
|
@@ -1416,7 +1476,7 @@ impl Validator { | |
config.wait_to_vote_slot, | ||
accounts_background_request_sender.clone(), | ||
config.runtime_config.log_messages_bytes_limit, | ||
json_rpc_service.is_some().then_some(&connection_cache), // for the cache warmer only used for STS for RPC service | ||
(json_rpc_service.is_some() && config.use_tpu_client_next).then_some(&connection_cache), // for the cache warmer only used for STS for RPC service | ||
&prioritization_fee_cache, | ||
banking_tracer.clone(), | ||
turbine_quic_endpoint_sender.clone(), | ||
|
@@ -1508,7 +1568,11 @@ impl Validator { | |
); | ||
|
||
*start_progress.write().unwrap() = ValidatorStartProgress::Running; | ||
key_notifies.push(connection_cache); | ||
if config.use_tpu_client_next { | ||
unimplemented!(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is something I need to implement still |
||
} else { | ||
key_notifies.push(connection_cache); | ||
} | ||
|
||
*admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit { | ||
bank_forks: bank_forks.clone(), | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,7 +68,7 @@ impl AsyncTaskSemaphore { | |
lazy_static! { | ||
static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore = | ||
AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK); | ||
static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread() | ||
pub static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea is to use the same runtime in |
||
.thread_name("solQuicClientRt") | ||
.enable_all() | ||
.build() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
runtime is passed here