Skip to content

Commit

Permalink
Solving issue of finalized meta after processed block (#365)
Browse files Browse the repository at this point in the history
* Solving issue of finalized meta after processed block

* Fixing the broken test
  • Loading branch information
godmodegalactus authored Mar 22, 2024
1 parent c687bf5 commit c09700f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
4 changes: 2 additions & 2 deletions bench/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ fn transaction_size_small() {
let rand_string = random_strings.first().unwrap();
let tx = BenchHelper::create_memo_tx_small(rand_string, &payer_keypair, blockhash);

assert_eq!(bincode::serialized_size(&tx).unwrap(), 179);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 231);
}

#[test]
Expand All @@ -172,5 +172,5 @@ fn transaction_size_large() {
let rand_string = random_strings.first().unwrap();
let tx = BenchHelper::create_memo_tx_large(rand_string, &payer_keypair, blockhash);

assert_eq!(bincode::serialized_size(&tx).unwrap(), 1186);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 1230);
}
8 changes: 8 additions & 0 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
let mut cleanup_without_confirmed_recv_blocks_meta: u32 = 0;
let mut cleanup_without_finalized_recv_blocks_meta: u32 = 0;
let mut confirmed_block_not_yet_processed = HashSet::<String>::new();
let mut finalized_block_not_yet_processed = HashSet::<String>::new();

// start logging errors when we recieve first finalized block
let mut startup_completed = false;
Expand All @@ -182,6 +183,11 @@ pub fn create_grpc_multiplex_blocks_subscription(
warn!("produced block channel has no receivers {e:?}");
}
}
if finalized_block_not_yet_processed.remove(&processed_block.blockhash) {
if let Err(e) = producedblock_sender.send(processed_block.to_finalized_block()) {
warn!("produced block channel has no receivers {e:?}");
}
}
recent_processed_blocks.insert(processed_block.blockhash.clone(), processed_block);
},
meta_confirmed = confirmed_blockmeta_stream.next() => {
Expand All @@ -206,6 +212,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
let finalized_block = cached_processed_block.to_finalized_block();
last_finalized_slot = finalized_block.slot;
startup_completed = true;
log::info!("sending finalized block");
debug!("got finalized blockmeta {} with blockhash {}",
finalized_block.slot, finalized_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(finalized_block) {
Expand All @@ -214,6 +221,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
} else if startup_completed {
// this warning is ok for first few blocks when we start lrpc
log::warn!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
finalized_block_not_yet_processed.insert(blockhash);
}
},
_ = cleanup_tick.tick() => {
Expand Down

0 comments on commit c09700f

Please sign in to comment.