diff --git a/src/fetch.rs b/src/fetch.rs index 6ca87ab..ed573da 100644 --- a/src/fetch.rs +++ b/src/fetch.rs @@ -21,9 +21,9 @@ pub struct SolFetcher { pub(crate) struct CurrentEpoch(EpochInfo); impl CurrentEpoch { - pub(crate) fn current_epoch(&self) -> u64 { - self.0.epoch - } + // pub(crate) fn current_epoch(&self) -> u64 { + // self.0.epoch + // } pub(crate) fn start_slot(&self) -> u64 { self.0.absolute_slot - self.0.slot_index @@ -32,6 +32,10 @@ impl CurrentEpoch { pub(crate) fn current_slot(&self) -> u64 { self.0.absolute_slot } + + pub(crate) fn start_slot_next_epoch(&self) -> u64 { + self.start_slot() + self.0.slots_in_epoch + } } impl SolFetcher { diff --git a/src/main.rs b/src/main.rs index 57ac2fd..273fe75 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,11 +43,12 @@ async fn main() -> eyre::Result<()> { let s = streamer.clone(); info!("To start SolAgg streamer..."); let fut_streamer = tokio::spawn(async move { start_streamer(s).await }); - let fut_api_srv = tokio::spawn(async move { api_server(streamer).await }); + let s = streamer.clone(); + let fut_api_srv = tokio::spawn(async move { api_server(s).await }); let res = tokio::try_join!(fut_streamer, fut_api_srv); match res { Ok((res0, res1)) => { - log::trace!("solagg normal exit; res0 = {:?}, res1 = {:?}", res0, res1); + log::trace!("solagg normal exit; res0 = {:?}, res1 = {:?}", res0, res1,); } Err(err) => { log::error!("solagg failed; error = {}", err); diff --git a/src/store.rs b/src/store.rs index 02cefeb..55efaca 100644 --- a/src/store.rs +++ b/src/store.rs @@ -12,8 +12,9 @@ const MOCKED_EPOCH_INIT_LEN: u64 = 25; //NOTE: 30 rps for ankr #[derive(Debug)] pub(crate) struct TransactionStore { pub(crate) tx_batches: Vec, - pub(crate) current_epoch: u64, + pub(crate) init_slot: u64, pub(crate) current_slot: u64, + pub(crate) start_slot_next_epoch: u64, mocked: bool, } @@ -21,15 +22,14 @@ impl TransactionStore { pub(crate) fn new(mocked: bool) -> eyre::Result { let sol_fetcher = SolFetcher::new(SOL_RPC_URL); let cur_epoch = sol_fetcher.get_current_epoch()?; - let current_epoch = cur_epoch.current_epoch(); - let current_slot = cur_epoch.current_slot(); + let init_slot = cur_epoch.current_slot(); let start_slot = if mocked { - current_slot - MOCKED_EPOCH_INIT_LEN + init_slot - MOCKED_EPOCH_INIT_LEN } else { cur_epoch.start_slot() }; - let range: Vec = (start_slot..=current_slot).collect(); + let range: Vec = (start_slot..=init_slot).collect(); let mut tx_batches = Vec::with_capacity(range.len()); let windows = range.chunks(RPS_LIMIT); trace!("slot windows: {:#?}", windows); @@ -52,15 +52,27 @@ impl TransactionStore { } timer = Instant::now(); } + let start_slot_next_epoch = cur_epoch.start_slot_next_epoch(); Ok(Self { tx_batches, - current_epoch, - current_slot, + start_slot_next_epoch, + init_slot, + current_slot: init_slot, mocked, }) } - pub(crate) fn append_batch(&mut self, batch: RecordBatch) { + pub(crate) fn append_batch(&mut self, batch: RecordBatch, slot: u64) { + //NOTE: corner case#2 handle epoch switch, onece per epoch + if slot == self.start_slot_next_epoch { + self.tx_batches.clear(); + } + if slot >= self.start_slot_next_epoch { + let sol_fetcher = SolFetcher::new(SOL_RPC_URL); + if let Ok(cur_epoch) = sol_fetcher.get_current_epoch() { + self.start_slot_next_epoch = cur_epoch.start_slot_next_epoch(); + } + } self.tx_batches.push(batch); } diff --git a/src/stream.rs b/src/stream.rs index a391e57..b3b1257 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,4 +1,3 @@ -use datafusion::arrow::{array::RecordBatch, json::ReaderBuilder}; use futures_util::{SinkExt, StreamExt}; use log::trace; use serde_json::json; @@ -11,7 +10,6 @@ use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; use crate::{ fetch::{SolFetcher, SOL_RPC_URL}, - parse::Transaction, store::TransactionStore, }; @@ -48,19 +46,6 @@ pub async fn start_streamer(streamer: TheadSafeStreamer) -> eyre::Result<()> { Ok(()) } -// pub(crate) async fn query( -// streamer: TheadSafeStreamer, -// sql: &str, -// table_name: &str, -// ) -> { -// Ok(streamer -// .read() -// .await -// .tx_store -// .query(sql, table_name) -// .await?) -// } - pub(crate) async fn query_to_json( streamer: TheadSafeStreamer, sql: &str, @@ -96,13 +81,13 @@ async fn process_sol_notifications( slot ); let batch = sol_fetcher.fetch_transactions_as_batch(slot).await?; - tx_store.append_batch(batch); + tx_store.append_batch(batch, slot); } } } tx_store.current_slot = slot; - tx_store.append_batch(batch); + tx_store.append_batch(batch, slot); } trace!("??? process_sol_notifications exited."); Ok(()) @@ -112,13 +97,13 @@ async fn watch_sol_rpc_ws(tx: UnboundedSender) -> eyre::Result<()> { let (ws_stream, _) = connect_async(SOL_RPC_WS).await?; let (mut write, mut read) = ws_stream.split(); - let block_sub_msg = json!({ + let slot_sub_msg = json!({ "jsonrpc": "2.0", "id": "1", "method": "slotSubscribe", }); - trace!("send block_sub_msg"); - write.send(Message::Text(block_sub_msg.to_string())).await?; + trace!("send slot_sub_msg"); + write.send(Message::Text(slot_sub_msg.to_string())).await?; trace!("Subscribed to transaction and account change notifications."); trace!("Watching for changes..."); @@ -143,6 +128,7 @@ async fn watch_sol_rpc_ws(tx: UnboundedSender) -> eyre::Result<()> { } } "accountNotification" => { + //TODO trace!("Account changed: {}", json["params"]["result"]["value"]); } _ => trace!("Received other notification: {}", text),