From 6a26b6c5739fb6eff928a86fe1a4418111a286bf Mon Sep 17 00:00:00 2001 From: Cappy Ishihara Date: Fri, 6 Dec 2024 00:16:55 +0700 Subject: [PATCH] Implement a stream timeout to reset the app after 30 seconds of inactivity. --- skystreamer-prometheus-exporter/src/main.rs | 284 +++++++++++--------- 1 file changed, 150 insertions(+), 134 deletions(-) diff --git a/skystreamer-prometheus-exporter/src/main.rs b/skystreamer-prometheus-exporter/src/main.rs index 517f2c9..84091da 100644 --- a/skystreamer-prometheus-exporter/src/main.rs +++ b/skystreamer-prometheus-exporter/src/main.rs @@ -157,143 +157,159 @@ async fn main() -> Result<()> { let stream = post_stream.stream().await?; futures::pin_mut!(stream); - // let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); - // interval.tick().await; - - // let mut last_tick = tokio::time::Instant::now(); - - while let Some(post) = stream.next().await { - // Total number of posts - primary_counter.inc(); - { - // Count the number of posts by label - for label in post.labels.iter() { - label_counter.with_label_values(&[label]).inc(); - } - - // Count the number of posts by tag - for tag in post.tags.iter() { - tags_counter.with_label_values(&[tag]).inc(); - } - } - - // Quote Posts - { - let is_quote = if let Some(embed) = post.embed.as_ref() { - matches!( - embed, - skystreamer::types::Embed::Record(_) - | skystreamer::types::Embed::RecordWithMedia(_, _) - ) - } else { - false - }; - - if is_quote { - quote_counter.with_label_values(&["true"]).inc(); - } - } - // End Quote Posts - - // Posts with media that have alt text - { - let has_alt_text = get_post_media(&post).iter().any(|media| match media { - skystreamer::types::Media::Image(i) => !i.alt.is_empty(), - skystreamer::types::Media::Video(v) => v.alt.is_some(), - }); - - if has_alt_text { - alt_text_counter.with_label_values(&["true"]).inc(); - }; - } - { - // Posts that are replies to other posts - let is_reply = post.reply.is_some(); - - if is_reply { - reply_counter.with_label_values(&["reply"]).inc(); - } - } - { - // Posts with media by type - let post_media = get_post_media(&post); - if !post_media.is_empty() { - // get the first media type, because images and videos are mutually exclusive - let media_type = match post_media.first().unwrap() { - skystreamer::types::Media::Image(_) => "image", - skystreamer::types::Media::Video(_) => "video", - }; - media_posts_counter.with_label_values(&[media_type]).inc(); - } - } - - // Posts by external media's domain name - { - let external_link = post.embed.and_then(|embed| match embed { - skystreamer::types::Embed::External(e) => Some(Url::parse(&e.uri)), - _ => None, - }); - - // get domain - let domain_name = external_link.and_then(|link| link.ok()).and_then(|url| { - url.domain().map(|domain| { - domain - .strip_prefix("www.") - .unwrap_or(domain) - .to_string() - .to_lowercase() - }) - }); - - if let Some(domain) = domain_name { - posts_external_links.with_label_values(&[&domain]).inc(); - } - } + // set a timer of 30 seconds + // if the stream doesn't return anything in 30 seconds, we'll just exit with an error + let timer = tokio::time::sleep(std::time::Duration::from_secs(30)); + tokio::pin!(timer); + + loop { + tokio::select! { + _ = &mut timer => { + tracing::error!("Stream timed out"); + return Err(color_eyre::eyre::eyre!("Stream timed out")); + }, + post = stream.next() => { + if let Some(post) = post { + // Reset the timer on receiving a post + timer.as_mut().reset(tokio::time::Instant::now() + std::time::Duration::from_secs(30)); + + // Total number of posts + primary_counter.inc(); + { + // Count the number of posts by label + for label in post.labels.iter() { + label_counter.with_label_values(&[label]).inc(); + } - { - // Languages of the posts - let langs = post - .language - .iter() - .map(|lang| { - // Let's normalize all the languages to its main language - - let processed_language = if lang.is_empty() { - "null".to_string() - } else if normalize_langs.unwrap_or(true) { - let l = handle_language(lang); - if l.is_none() { - tracing::warn!("Failed to normalize language: {}", lang); + // Count the number of posts by tag + for tag in post.tags.iter() { + tags_counter.with_label_values(&[tag]).inc(); + } + } + + // Quote Posts + { + let is_quote = if let Some(embed) = post.embed.as_ref() { + matches!( + embed, + skystreamer::types::Embed::Record(_) + | skystreamer::types::Embed::RecordWithMedia(_, _) + ) + } else { + false + }; + + if is_quote { + quote_counter.with_label_values(&["true"]).inc(); + } + } + // End Quote Posts + + // Posts with media that have alt text + { + let has_alt_text = get_post_media(&post).iter().any(|media| match media { + skystreamer::types::Media::Image(i) => !i.alt.is_empty(), + skystreamer::types::Media::Video(v) => v.alt.is_some(), + }); + + if has_alt_text { + alt_text_counter.with_label_values(&["true"]).inc(); + }; + } + { + // Posts that are replies to other posts + let is_reply = post.reply.is_some(); + + if is_reply { + reply_counter.with_label_values(&["reply"]).inc(); + } + } + + { + // Posts with media by type + let post_media = get_post_media(&post); + if (!post_media.is_empty()) { + // get the first media type, because images and videos are mutually exclusive + let media_type = match post_media.first().unwrap() { + skystreamer::types::Media::Image(_) => "image", + skystreamer::types::Media::Video(_) => "video", + }; + media_posts_counter.with_label_values(&[media_type]).inc(); + } + } + + // Posts by external media's domain name + { + let external_link = post.embed.and_then(|embed| match embed { + skystreamer::types::Embed::External(e) => Some(Url::parse(&e.uri)), + _ => None, + }); + + // get domain + let domain_name = external_link.and_then(|link| link.ok()).and_then(|url| { + url.domain().map(|domain| { + domain + .strip_prefix("www.") + .unwrap_or(domain) + .to_string() + .to_lowercase() + }) + }); + + if let Some(domain) = domain_name { + posts_external_links.with_label_values(&[&domain]).inc(); + } + } + + { + // Languages of the posts + let langs = post + .language + .iter() + .map(|lang| { + // Let's normalize all the languages to its main language + + let processed_language = if lang.is_empty() { + "null".to_string() + } else if normalize_langs.unwrap_or(true) { + let l = handle_language(lang); + if l.is_none() { + tracing::warn!("Failed to normalize language: {}", lang); + } + l.unwrap_or_else(|| lang.to_lowercase()) + } else { + lang.to_string() + }; + + language_counter_individual + .with_label_values(&[&processed_language]) + .inc(); + + processed_language + }) + .collect::>(); + + langs.iter().for_each(|lang| { + language_counter_individual.with_label_values(&[lang]).inc(); + }); + + let langs_joined = if langs.is_empty() { + "null".to_string() + } else { + langs.join(",") + }; + language_counter.with_label_values(&[&langs_joined]).inc(); + // handle for grouped languages + + if let Some(max_size) = max_sample_size { + if primary_counter.get() > max_size as u64 { + primary_counter.reset(); + } } - l.unwrap_or_else(|| lang.to_lowercase()) - } else { - lang.to_string() - }; - - language_counter_individual - .with_label_values(&[&processed_language]) - .inc(); - - processed_language - }) - .collect::>(); - - langs.iter().for_each(|lang| { - language_counter_individual.with_label_values(&[lang]).inc(); - }); - - let langs_joined = if langs.is_empty() { - "null".to_string() - } else { - langs.join(",") - }; - language_counter.with_label_values(&[&langs_joined]).inc(); - // handle for grouped languages - - if let Some(max_size) = max_sample_size { - if primary_counter.get() > max_size as u64 { - primary_counter.reset(); + } + } else { + break; } } }