Skip to content

Commit

Permalink
refactor(main,stream,fetch,stream): fix corner case 2 - handle epoch …
Browse files Browse the repository at this point in the history
…switch
  • Loading branch information
mjzk committed Jul 17, 2024
1 parent 47c1d0c commit ba44190
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 33 deletions.
10 changes: 7 additions & 3 deletions src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ pub struct SolFetcher {
pub(crate) struct CurrentEpoch(EpochInfo);

impl CurrentEpoch {
pub(crate) fn current_epoch(&self) -> u64 {
self.0.epoch
}
// pub(crate) fn current_epoch(&self) -> u64 {
// self.0.epoch
// }

pub(crate) fn start_slot(&self) -> u64 {
self.0.absolute_slot - self.0.slot_index
Expand All @@ -32,6 +32,10 @@ impl CurrentEpoch {
pub(crate) fn current_slot(&self) -> u64 {
self.0.absolute_slot
}

pub(crate) fn start_slot_next_epoch(&self) -> u64 {
self.start_slot() + self.0.slots_in_epoch
}
}

impl SolFetcher {
Expand Down
5 changes: 3 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ async fn main() -> eyre::Result<()> {
let s = streamer.clone();
info!("To start SolAgg streamer...");
let fut_streamer = tokio::spawn(async move { start_streamer(s).await });
let fut_api_srv = tokio::spawn(async move { api_server(streamer).await });
let s = streamer.clone();
let fut_api_srv = tokio::spawn(async move { api_server(s).await });
let res = tokio::try_join!(fut_streamer, fut_api_srv);
match res {
Ok((res0, res1)) => {
log::trace!("solagg normal exit; res0 = {:?}, res1 = {:?}", res0, res1);
log::trace!("solagg normal exit; res0 = {:?}, res1 = {:?}", res0, res1,);
}
Err(err) => {
log::error!("solagg failed; error = {}", err);
Expand Down
28 changes: 20 additions & 8 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,24 @@ const MOCKED_EPOCH_INIT_LEN: u64 = 25; //NOTE: 30 rps for ankr
#[derive(Debug)]
pub(crate) struct TransactionStore {
pub(crate) tx_batches: Vec<RecordBatch>,
pub(crate) current_epoch: u64,
pub(crate) init_slot: u64,
pub(crate) current_slot: u64,
pub(crate) start_slot_next_epoch: u64,
mocked: bool,
}

impl TransactionStore {
pub(crate) fn new(mocked: bool) -> eyre::Result<Self> {
let sol_fetcher = SolFetcher::new(SOL_RPC_URL);
let cur_epoch = sol_fetcher.get_current_epoch()?;
let current_epoch = cur_epoch.current_epoch();
let current_slot = cur_epoch.current_slot();
let init_slot = cur_epoch.current_slot();
let start_slot = if mocked {
current_slot - MOCKED_EPOCH_INIT_LEN
init_slot - MOCKED_EPOCH_INIT_LEN
} else {
cur_epoch.start_slot()
};

let range: Vec<u64> = (start_slot..=current_slot).collect();
let range: Vec<u64> = (start_slot..=init_slot).collect();
let mut tx_batches = Vec::with_capacity(range.len());
let windows = range.chunks(RPS_LIMIT);
trace!("slot windows: {:#?}", windows);
Expand All @@ -52,15 +52,27 @@ impl TransactionStore {
}
timer = Instant::now();
}
let start_slot_next_epoch = cur_epoch.start_slot_next_epoch();
Ok(Self {
tx_batches,
current_epoch,
current_slot,
start_slot_next_epoch,
init_slot,
current_slot: init_slot,
mocked,
})
}

pub(crate) fn append_batch(&mut self, batch: RecordBatch) {
pub(crate) fn append_batch(&mut self, batch: RecordBatch, slot: u64) {
//NOTE: corner case#2 handle epoch switch, onece per epoch
if slot == self.start_slot_next_epoch {
self.tx_batches.clear();
}
if slot >= self.start_slot_next_epoch {
let sol_fetcher = SolFetcher::new(SOL_RPC_URL);
if let Ok(cur_epoch) = sol_fetcher.get_current_epoch() {
self.start_slot_next_epoch = cur_epoch.start_slot_next_epoch();
}
}
self.tx_batches.push(batch);
}

Expand Down
26 changes: 6 additions & 20 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use datafusion::arrow::{array::RecordBatch, json::ReaderBuilder};
use futures_util::{SinkExt, StreamExt};
use log::trace;
use serde_json::json;
Expand All @@ -11,7 +10,6 @@ use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};

use crate::{
fetch::{SolFetcher, SOL_RPC_URL},
parse::Transaction,
store::TransactionStore,
};

Expand Down Expand Up @@ -48,19 +46,6 @@ pub async fn start_streamer(streamer: TheadSafeStreamer) -> eyre::Result<()> {
Ok(())
}

// pub(crate) async fn query(
// streamer: TheadSafeStreamer,
// sql: &str,
// table_name: &str,
// ) -> {
// Ok(streamer
// .read()
// .await
// .tx_store
// .query(sql, table_name)
// .await?)
// }

pub(crate) async fn query_to_json(
streamer: TheadSafeStreamer,
sql: &str,
Expand Down Expand Up @@ -96,13 +81,13 @@ async fn process_sol_notifications(
slot
);
let batch = sol_fetcher.fetch_transactions_as_batch(slot).await?;
tx_store.append_batch(batch);
tx_store.append_batch(batch, slot);
}
}
}
tx_store.current_slot = slot;

tx_store.append_batch(batch);
tx_store.append_batch(batch, slot);
}
trace!("??? process_sol_notifications exited.");
Ok(())
Expand All @@ -112,13 +97,13 @@ async fn watch_sol_rpc_ws(tx: UnboundedSender<u64>) -> eyre::Result<()> {
let (ws_stream, _) = connect_async(SOL_RPC_WS).await?;
let (mut write, mut read) = ws_stream.split();

let block_sub_msg = json!({
let slot_sub_msg = json!({
"jsonrpc": "2.0",
"id": "1",
"method": "slotSubscribe",
});
trace!("send block_sub_msg");
write.send(Message::Text(block_sub_msg.to_string())).await?;
trace!("send slot_sub_msg");
write.send(Message::Text(slot_sub_msg.to_string())).await?;

trace!("Subscribed to transaction and account change notifications.");
trace!("Watching for changes...");
Expand All @@ -143,6 +128,7 @@ async fn watch_sol_rpc_ws(tx: UnboundedSender<u64>) -> eyre::Result<()> {
}
}
"accountNotification" => {
//TODO
trace!("Account changed: {}", json["params"]["result"]["value"]);
}
_ => trace!("Received other notification: {}", text),
Expand Down

0 comments on commit ba44190

Please sign in to comment.