Skip to content

Commit

Permalink
add event type counter for AT Firehose events
Browse files Browse the repository at this point in the history
  • Loading branch information
korewaChino committed Dec 6, 2024
1 parent b3a5da8 commit 6cf095f
Showing 1 changed file with 36 additions and 2 deletions.
38 changes: 36 additions & 2 deletions skystreamer-prometheus-exporter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ mod util;
use color_eyre::Result;
use futures::StreamExt;
use posts::PostsRegistry;
use prometheus_exporter::{self, prometheus::register_int_counter};
use prometheus_exporter::{
self,
prometheus::{register_int_counter, register_int_counter_vec},
};
use skystreamer::{stream::EventStream, types::commit::Record, RepoSubscription};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -52,6 +55,12 @@ async fn main() -> Result<()> {
"Total number of events from the AT Firehose"
)?;

let type_counter = register_int_counter_vec!(
"skystreamer_atproto_event_typed",
"Total number of events from the AT Firehose",
&["type"]
)?;

// const MAX_SAMPLE_SIZE: usize = 10000;

let subscription = RepoSubscription::new("bsky.network")
Expand Down Expand Up @@ -82,16 +91,41 @@ async fn main() -> Result<()> {
primary_counter.inc();

let posts_registry = posts.clone();
let type_counter = type_counter.clone();
tokio::spawn(async move {
match record {
Record::Post(post) => {
let post = post.clone();
type_counter.with_label_values(&["post"]).inc();
tokio::spawn(async move {
let mut posts_registry = posts_registry.lock().unwrap();
posts_registry.handle_post(&post)?;
Ok::<(), color_eyre::eyre::Report>(())
});
}
Record::Block(_) => {
type_counter.with_label_values(&["block"]).inc();
// todo
}
Record::Like(_) => {
type_counter.with_label_values(&["like"]).inc();
// todo
}
Record::Follow(_) => {
type_counter.with_label_values(&["follow"]).inc();
// todo
}
Record::Repost(_) => {
type_counter.with_label_values(&["repost"]).inc();
// todo
}
Record::ListItem(_) => {
type_counter.with_label_values(&["list_item"]).inc();
// todo
}
Record::Profile(_) => {
type_counter.with_label_values(&["profile"]).inc();
// todo
}
_ => {
// todo
}
Expand Down

0 comments on commit 6cf095f

Please sign in to comment.