diff --git a/src/stream.rs b/src/stream.rs index b19d001..a391e57 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -79,17 +79,29 @@ async fn process_sol_notifications( mut rx: UnboundedReceiver, ) -> eyre::Result<()> { let sol_fetcher = SolFetcher::new(SOL_RPC_URL); - let schema = Arc::new(Transaction::get_arrow_scheme()); while let Some(slot) = rx.recv().await { trace!("--- Received slot number: {}", slot); - let txs = sol_fetcher.fetch_transactions(slot).await?; - let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder()?; - decoder.serialize(&txs)?; - let batch = decoder - .flush()? - .unwrap_or(RecordBatch::new_empty(schema.clone())); + let batch = sol_fetcher.fetch_transactions_as_batch(slot).await?; //NOTE minizie the lock scope let tx_store = &mut streamer.write().await.tx_store; + //NOTE: corner case#1 + // may have init slot gap in a very small possibility, just check once + let cur_slot = tx_store.current_slot; + if tx_store.init_slot == cur_slot { + if slot > (cur_slot + 1) { + for slot in (cur_slot + 1)..slot { + log::debug!( + "--- init slot gap backfill, init_slot: {}, insert slot: {}", + cur_slot, + slot + ); + let batch = sol_fetcher.fetch_transactions_as_batch(slot).await?; + tx_store.append_batch(batch); + } + } + } + tx_store.current_slot = slot; + tx_store.append_batch(batch); } trace!("??? process_sol_notifications exited.");