Skip to content

Commit

Permalink
geyser: do not stream outdated data (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Nov 16, 2024
1 parent 02f68b9 commit f0ef8df
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ The minor version will be incremented upon a breaking change and the patch versi
- frontend: init ([#8](https://github.com/solana-stream-solutions/solfees/pull/8))
- geyser: use process_compute_budget_instructions ([#15](https://github.com/solana-stream-solutions/solfees/pull/15))
- api: improve parallelism ([#16](https://github.com/solana-stream-solutions/solfees/pull/16))
- geyser: do not stream outdated data ([#17](https://github.com/solana-stream-solutions/solfees/pull/17))

### Breaking
1 change: 1 addition & 0 deletions solfees-be/config-grpc2redis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ grpc:

redis:
endpoint: redis://127.0.0.1:6379/
slot_finalized: solfees:finalized
stream_key: solfees:events
# increate for production
# 6_000 / 2 / 4 / 2.5 / 60 = 5min (2 producers, 4 events per slot, 2.5 slots per sec)
Expand Down
53 changes: 49 additions & 4 deletions solfees-be/src/bin/solfees-grpc2redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
solfees_be::{
cli,
config::ConfigGrpc2Redis as Config,
grpc_geyser::{self, GeyserMessage},
grpc_geyser::{self, CommitmentLevel, GeyserMessage},
metrics::grpc2redis as metrics,
rpc_server,
schedule::LeaderScheduleRpc,
Expand Down Expand Up @@ -67,9 +67,8 @@ async fn main2(config: Config) -> anyhow::Result<()> {
let sigint = SignalKind::interrupt();
let sigterm = SignalKind::terminate();

let mut redis_finalized_slot = 0u64;
loop {
let mut pipe = redis::pipe();

let mut messages = tokio::select! {
signal = shutdown_rx.recv() => {
match signal {
Expand Down Expand Up @@ -98,7 +97,7 @@ async fn main2(config: Config) -> anyhow::Result<()> {
break;
};

let _: () = pipe.cmd("HSET")
let _: () = redis::pipe().cmd("HSET")
.arg(&config.redis.epochs_key)
.arg(epoch)
.arg(bincode::serialize(&schedule).context("failed to serialize leader schedule")?)
Expand All @@ -117,6 +116,52 @@ async fn main2(config: Config) -> anyhow::Result<()> {
messages.push(maybe_message?);
}

if let Some(finalized_slot) = messages
.iter()
.filter_map(|message| {
if let GeyserMessage::Status {
slot,
commitment: CommitmentLevel::Finalized,
} = message
{
Some(slot)
} else {
None
}
})
.max()
{
redis_finalized_slot = redis::cmd("EVAL")
.arg(
r#"
-- redis.log(redis.LOG_WARNING, "hi");
local new = tonumber(ARGV[1])
local current = tonumber(redis.call("GET", KEYS[1]));
if current == nil or current < new then
redis.call("SET", KEYS[1], ARGV[1]);
return new;
else
return current;
end
"#,
)
.arg(1)
.arg(&config.redis.slot_finalized)
.arg(finalized_slot)
.query_async(&mut connection)
.await
.context("failed to get finalized slot from Redis")?;
}

let messages = messages
.into_iter()
.filter(|msg| msg.slot() >= redis_finalized_slot)
.collect::<Vec<_>>();
if messages.is_empty() {
continue;
}

let mut pipe = redis::pipe();
for message in messages.iter() {
pipe.cmd("XADD")
.arg(&config.redis.stream_key)
Expand Down
2 changes: 2 additions & 0 deletions solfees-be/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl Default for ConfigGrpc {
#[serde(deny_unknown_fields, default)]
pub struct ConfigRedisPublisher {
pub endpoint: String,
pub slot_finalized: String,
pub stream_key: String,
pub stream_maxlen: u64,
pub stream_field_key: String,
Expand All @@ -91,6 +92,7 @@ impl Default for ConfigRedisPublisher {
fn default() -> Self {
Self {
endpoint: "redis://127.0.0.1:6379/".to_owned(),
slot_finalized: "solfees:finalized".to_owned(),
stream_key: "solfees:events".to_owned(),
stream_maxlen: 15 * 60 * 3 * 4, // ~15min (2.5 slots per sec, 4 events per slot)
stream_field_key: "message".to_owned(),
Expand Down
7 changes: 7 additions & 0 deletions solfees-be/src/grpc_geyser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,13 @@ pub enum GeyserMessage {
}

impl GeyserMessage {
pub const fn slot(&self) -> Slot {
*match self {
Self::Status { slot, .. } => slot,
Self::Slot { slot, .. } => slot,
}
}

fn build_block(leader: Option<Pubkey>, block_info: BlockInfo) -> anyhow::Result<GeyserMessage> {
let Some(meta) = block_info.meta else {
anyhow::bail!("failed to get block meta");
Expand Down

0 comments on commit f0ef8df

Please sign in to comment.