diff --git a/src/bin/main.rs b/src/bin/main.rs index a85c0e1..fbf3465 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -240,8 +240,16 @@ async fn latest_sink_timestamp(consumer: &StreamConsumer) -> anyhow::Result anyhow::Result { - let msg = IndexerFeesHourlyProtobuf::decode(msg.payload().context("missing payload")?)?; - Ok(msg.timestamp) + let payload = msg.payload().context("missing_payload")?; + match msg.topic() { + "gateway_client_fees_hourly" => { + Ok(ClientFeesHourlyProtobuf::decode(payload)?.timestamp) + } + "gateway_indexer_fees_hourly" => { + Ok(IndexerFeesHourlyProtobuf::decode(payload)?.timestamp) + } + topic => anyhow::bail!("unhandled topic: {topic}"), + } }) .collect::>>()? .into_iter()