diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f58f20..b9ab22e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,5 +26,6 @@ The minor version will be incremented upon a breaking change and the patch versi - frontend: init ([#8](https://github.com/solana-stream-solutions/solfees/pull/8)) - geyser: use process_compute_budget_instructions ([#15](https://github.com/solana-stream-solutions/solfees/pull/15)) - api: improve parallelism ([#16](https://github.com/solana-stream-solutions/solfees/pull/16)) +- geyser: do not stream outdated data ([#17](https://github.com/solana-stream-solutions/solfees/pull/17)) ### Breaking diff --git a/solfees-be/config-grpc2redis.yml b/solfees-be/config-grpc2redis.yml index 798134c..e5bbf17 100644 --- a/solfees-be/config-grpc2redis.yml +++ b/solfees-be/config-grpc2redis.yml @@ -10,6 +10,7 @@ grpc: redis: endpoint: redis://127.0.0.1:6379/ + slot_finalized: solfees:finalized stream_key: solfees:events # increate for production # 6_000 / 2 / 4 / 2.5 / 60 = 5min (2 producers, 4 events per slot, 2.5 slots per sec) diff --git a/solfees-be/src/bin/solfees-grpc2redis.rs b/solfees-be/src/bin/solfees-grpc2redis.rs index 3aa0510..9aad8b9 100644 --- a/solfees-be/src/bin/solfees-grpc2redis.rs +++ b/solfees-be/src/bin/solfees-grpc2redis.rs @@ -6,7 +6,7 @@ use { solfees_be::{ cli, config::ConfigGrpc2Redis as Config, - grpc_geyser::{self, GeyserMessage}, + grpc_geyser::{self, CommitmentLevel, GeyserMessage}, metrics::grpc2redis as metrics, rpc_server, schedule::LeaderScheduleRpc, @@ -67,9 +67,8 @@ async fn main2(config: Config) -> anyhow::Result<()> { let sigint = SignalKind::interrupt(); let sigterm = SignalKind::terminate(); + let mut redis_finalized_slot = 0u64; loop { - let mut pipe = redis::pipe(); - let mut messages = tokio::select! { signal = shutdown_rx.recv() => { match signal { @@ -98,7 +97,7 @@ async fn main2(config: Config) -> anyhow::Result<()> { break; }; - let _: () = pipe.cmd("HSET") + let _: () = redis::pipe().cmd("HSET") .arg(&config.redis.epochs_key) .arg(epoch) .arg(bincode::serialize(&schedule).context("failed to serialize leader schedule")?) @@ -117,6 +116,52 @@ async fn main2(config: Config) -> anyhow::Result<()> { messages.push(maybe_message?); } + if let Some(finalized_slot) = messages + .iter() + .filter_map(|message| { + if let GeyserMessage::Status { + slot, + commitment: CommitmentLevel::Finalized, + } = message + { + Some(slot) + } else { + None + } + }) + .max() + { + redis_finalized_slot = redis::cmd("EVAL") + .arg( + r#" +-- redis.log(redis.LOG_WARNING, "hi"); +local new = tonumber(ARGV[1]) +local current = tonumber(redis.call("GET", KEYS[1])); +if current == nil or current < new then + redis.call("SET", KEYS[1], ARGV[1]); + return new; +else + return current; +end +"#, + ) + .arg(1) + .arg(&config.redis.slot_finalized) + .arg(finalized_slot) + .query_async(&mut connection) + .await + .context("failed to get finalized slot from Redis")?; + } + + let messages = messages + .into_iter() + .filter(|msg| msg.slot() >= redis_finalized_slot) + .collect::>(); + if messages.is_empty() { + continue; + } + + let mut pipe = redis::pipe(); for message in messages.iter() { pipe.cmd("XADD") .arg(&config.redis.stream_key) diff --git a/solfees-be/src/config.rs b/solfees-be/src/config.rs index 11be15c..4edff6c 100644 --- a/solfees-be/src/config.rs +++ b/solfees-be/src/config.rs @@ -81,6 +81,7 @@ impl Default for ConfigGrpc { #[serde(deny_unknown_fields, default)] pub struct ConfigRedisPublisher { pub endpoint: String, + pub slot_finalized: String, pub stream_key: String, pub stream_maxlen: u64, pub stream_field_key: String, @@ -91,6 +92,7 @@ impl Default for ConfigRedisPublisher { fn default() -> Self { Self { endpoint: "redis://127.0.0.1:6379/".to_owned(), + slot_finalized: "solfees:finalized".to_owned(), stream_key: "solfees:events".to_owned(), stream_maxlen: 15 * 60 * 3 * 4, // ~15min (2.5 slots per sec, 4 events per slot) stream_field_key: "message".to_owned(), diff --git a/solfees-be/src/grpc_geyser.rs b/solfees-be/src/grpc_geyser.rs index 362012d..5d1dc23 100644 --- a/solfees-be/src/grpc_geyser.rs +++ b/solfees-be/src/grpc_geyser.rs @@ -267,6 +267,13 @@ pub enum GeyserMessage { } impl GeyserMessage { + pub const fn slot(&self) -> Slot { + *match self { + Self::Status { slot, .. } => slot, + Self::Slot { slot, .. } => slot, + } + } + fn build_block(leader: Option, block_info: BlockInfo) -> anyhow::Result { let Some(meta) = block_info.meta else { anyhow::bail!("failed to get block meta");