Skip to content

Commit

Permalink
refactor(stream): fix one corner case: handle possible inital slot gap
Browse files Browse the repository at this point in the history
  • Loading branch information
mjzk committed Jul 17, 2024
1 parent cc922c6 commit 47c1d0c
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,29 @@ async fn process_sol_notifications(
mut rx: UnboundedReceiver<u64>,
) -> eyre::Result<()> {
let sol_fetcher = SolFetcher::new(SOL_RPC_URL);
let schema = Arc::new(Transaction::get_arrow_scheme());
while let Some(slot) = rx.recv().await {
trace!("--- Received slot number: {}", slot);
let txs = sol_fetcher.fetch_transactions(slot).await?;
let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder()?;
decoder.serialize(&txs)?;
let batch = decoder
.flush()?
.unwrap_or(RecordBatch::new_empty(schema.clone()));
let batch = sol_fetcher.fetch_transactions_as_batch(slot).await?;
//NOTE minizie the lock scope
let tx_store = &mut streamer.write().await.tx_store;
//NOTE: corner case#1
// may have init slot gap in a very small possibility, just check once
let cur_slot = tx_store.current_slot;
if tx_store.init_slot == cur_slot {
if slot > (cur_slot + 1) {
for slot in (cur_slot + 1)..slot {
log::debug!(
"--- init slot gap backfill, init_slot: {}, insert slot: {}",
cur_slot,
slot
);
let batch = sol_fetcher.fetch_transactions_as_batch(slot).await?;
tx_store.append_batch(batch);
}
}
}
tx_store.current_slot = slot;

tx_store.append_batch(batch);
}
trace!("??? process_sol_notifications exited.");
Expand Down

0 comments on commit 47c1d0c

Please sign in to comment.