From 6cf095f3f8bf752d69d4e8f33094ab57693b6f66 Mon Sep 17 00:00:00 2001 From: Cappy Ishihara Date: Sat, 7 Dec 2024 02:09:53 +0700 Subject: [PATCH] add event type counter for AT Firehose events --- skystreamer-prometheus-exporter/src/main.rs | 38 +++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/skystreamer-prometheus-exporter/src/main.rs b/skystreamer-prometheus-exporter/src/main.rs index b060d97..abb3493 100644 --- a/skystreamer-prometheus-exporter/src/main.rs +++ b/skystreamer-prometheus-exporter/src/main.rs @@ -3,7 +3,10 @@ mod util; use color_eyre::Result; use futures::StreamExt; use posts::PostsRegistry; -use prometheus_exporter::{self, prometheus::register_int_counter}; +use prometheus_exporter::{ + self, + prometheus::{register_int_counter, register_int_counter_vec}, +}; use skystreamer::{stream::EventStream, types::commit::Record, RepoSubscription}; use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; @@ -52,6 +55,12 @@ async fn main() -> Result<()> { "Total number of events from the AT Firehose" )?; + let type_counter = register_int_counter_vec!( + "skystreamer_atproto_event_typed", + "Total number of events from the AT Firehose", + &["type"] + )?; + // const MAX_SAMPLE_SIZE: usize = 10000; let subscription = RepoSubscription::new("bsky.network") @@ -82,16 +91,41 @@ async fn main() -> Result<()> { primary_counter.inc(); let posts_registry = posts.clone(); + let type_counter = type_counter.clone(); tokio::spawn(async move { match record { Record::Post(post) => { - let post = post.clone(); + type_counter.with_label_values(&["post"]).inc(); tokio::spawn(async move { let mut posts_registry = posts_registry.lock().unwrap(); posts_registry.handle_post(&post)?; Ok::<(), color_eyre::eyre::Report>(()) }); } + Record::Block(_) => { + type_counter.with_label_values(&["block"]).inc(); + // todo + } + Record::Like(_) => { + type_counter.with_label_values(&["like"]).inc(); + // todo + } + Record::Follow(_) => { + type_counter.with_label_values(&["follow"]).inc(); + // todo + } + Record::Repost(_) => { + type_counter.with_label_values(&["repost"]).inc(); + // todo + } + Record::ListItem(_) => { + type_counter.with_label_values(&["list_item"]).inc(); + // todo + } + Record::Profile(_) => { + type_counter.with_label_values(&["profile"]).inc(); + // todo + } _ => { // todo }