Skip to content

Commit

Permalink
fix: handle all aggregation sink timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Nov 11, 2024
1 parent 72aa27d commit 6278089
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,16 @@ async fn latest_sink_timestamp(consumer: &StreamConsumer) -> anyhow::Result<Opti
let timestamp = latest_messages
.into_iter()
.map(|msg| -> anyhow::Result<i64> {
let msg = IndexerFeesHourlyProtobuf::decode(msg.payload().context("missing payload")?)?;
Ok(msg.timestamp)
let payload = msg.payload().context("missing_payload")?;
match msg.topic() {
"gateway_client_fees_hourly" => {
Ok(ClientFeesHourlyProtobuf::decode(payload)?.timestamp)
}
"gateway_indexer_fees_hourly" => {
Ok(IndexerFeesHourlyProtobuf::decode(payload)?.timestamp)
}
topic => anyhow::bail!("unhandled topic: {topic}"),
}
})
.collect::<anyhow::Result<Vec<i64>>>()?
.into_iter()
Expand Down

0 comments on commit 6278089

Please sign in to comment.