Skip to content

Commit

Permalink
Implement a stream timeout to reset the app after 30 seconds of inactivity.
Browse files Browse the repository at this point in the history
  • Loading branch information
korewaChino committed Dec 5, 2024
1 parent 5eea206 commit 6a26b6c
Showing 1 changed file with 150 additions and 134 deletions.
284 changes: 150 additions & 134 deletions skystreamer-prometheus-exporter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,143 +157,159 @@ async fn main() -> Result<()> {
let stream = post_stream.stream().await?;

futures::pin_mut!(stream);
// let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
// interval.tick().await;

// let mut last_tick = tokio::time::Instant::now();

while let Some(post) = stream.next().await {
// Total number of posts
primary_counter.inc();
{
// Count the number of posts by label
for label in post.labels.iter() {
label_counter.with_label_values(&[label]).inc();
}

// Count the number of posts by tag
for tag in post.tags.iter() {
tags_counter.with_label_values(&[tag]).inc();
}
}

// Quote Posts
{
let is_quote = if let Some(embed) = post.embed.as_ref() {
matches!(
embed,
skystreamer::types::Embed::Record(_)
| skystreamer::types::Embed::RecordWithMedia(_, _)
)
} else {
false
};

if is_quote {
quote_counter.with_label_values(&["true"]).inc();
}
}
// End Quote Posts

// Posts with media that have alt text
{
let has_alt_text = get_post_media(&post).iter().any(|media| match media {
skystreamer::types::Media::Image(i) => !i.alt.is_empty(),
skystreamer::types::Media::Video(v) => v.alt.is_some(),
});

if has_alt_text {
alt_text_counter.with_label_values(&["true"]).inc();
};
}
{
// Posts that are replies to other posts
let is_reply = post.reply.is_some();

if is_reply {
reply_counter.with_label_values(&["reply"]).inc();
}
}

{
// Posts with media by type
let post_media = get_post_media(&post);
if !post_media.is_empty() {
// get the first media type, because images and videos are mutually exclusive
let media_type = match post_media.first().unwrap() {
skystreamer::types::Media::Image(_) => "image",
skystreamer::types::Media::Video(_) => "video",
};
media_posts_counter.with_label_values(&[media_type]).inc();
}
}

// Posts by external media's domain name
{
let external_link = post.embed.and_then(|embed| match embed {
skystreamer::types::Embed::External(e) => Some(Url::parse(&e.uri)),
_ => None,
});

// get domain
let domain_name = external_link.and_then(|link| link.ok()).and_then(|url| {
url.domain().map(|domain| {
domain
.strip_prefix("www.")
.unwrap_or(domain)
.to_string()
.to_lowercase()
})
});

if let Some(domain) = domain_name {
posts_external_links.with_label_values(&[&domain]).inc();
}
}
// set a timer of 30 seconds
// if the stream doesn't return anything in 30 seconds, we'll just exit with an error
let timer = tokio::time::sleep(std::time::Duration::from_secs(30));
tokio::pin!(timer);

loop {
tokio::select! {
_ = &mut timer => {
tracing::error!("Stream timed out");
return Err(color_eyre::eyre::eyre!("Stream timed out"));
},
post = stream.next() => {
if let Some(post) = post {
// Reset the timer on receiving a post
timer.as_mut().reset(tokio::time::Instant::now() + std::time::Duration::from_secs(30));

// Total number of posts
primary_counter.inc();
{
// Count the number of posts by label
for label in post.labels.iter() {
label_counter.with_label_values(&[label]).inc();
}

{
// Languages of the posts
let langs = post
.language
.iter()
.map(|lang| {
// Let's normalize all the languages to its main language

let processed_language = if lang.is_empty() {
"null".to_string()
} else if normalize_langs.unwrap_or(true) {
let l = handle_language(lang);
if l.is_none() {
tracing::warn!("Failed to normalize language: {}", lang);
// Count the number of posts by tag
for tag in post.tags.iter() {
tags_counter.with_label_values(&[tag]).inc();
}
}

// Quote Posts
{
let is_quote = if let Some(embed) = post.embed.as_ref() {
matches!(
embed,
skystreamer::types::Embed::Record(_)
| skystreamer::types::Embed::RecordWithMedia(_, _)
)
} else {
false
};

if is_quote {
quote_counter.with_label_values(&["true"]).inc();
}
}
// End Quote Posts

// Posts with media that have alt text
{
let has_alt_text = get_post_media(&post).iter().any(|media| match media {
skystreamer::types::Media::Image(i) => !i.alt.is_empty(),
skystreamer::types::Media::Video(v) => v.alt.is_some(),
});

if has_alt_text {
alt_text_counter.with_label_values(&["true"]).inc();
};
}
{
// Posts that are replies to other posts
let is_reply = post.reply.is_some();

if is_reply {
reply_counter.with_label_values(&["reply"]).inc();
}
}

{
// Posts with media by type
let post_media = get_post_media(&post);
if (!post_media.is_empty()) {
// get the first media type, because images and videos are mutually exclusive
let media_type = match post_media.first().unwrap() {
skystreamer::types::Media::Image(_) => "image",
skystreamer::types::Media::Video(_) => "video",
};
media_posts_counter.with_label_values(&[media_type]).inc();
}
}

// Posts by external media's domain name
{
let external_link = post.embed.and_then(|embed| match embed {
skystreamer::types::Embed::External(e) => Some(Url::parse(&e.uri)),
_ => None,
});

// get domain
let domain_name = external_link.and_then(|link| link.ok()).and_then(|url| {
url.domain().map(|domain| {
domain
.strip_prefix("www.")
.unwrap_or(domain)
.to_string()
.to_lowercase()
})
});

if let Some(domain) = domain_name {
posts_external_links.with_label_values(&[&domain]).inc();
}
}

{
// Languages of the posts
let langs = post
.language
.iter()
.map(|lang| {
// Let's normalize all the languages to its main language

let processed_language = if lang.is_empty() {
"null".to_string()
} else if normalize_langs.unwrap_or(true) {
let l = handle_language(lang);
if l.is_none() {
tracing::warn!("Failed to normalize language: {}", lang);
}
l.unwrap_or_else(|| lang.to_lowercase())
} else {
lang.to_string()
};

language_counter_individual
.with_label_values(&[&processed_language])
.inc();

processed_language
})
.collect::<Vec<_>>();

langs.iter().for_each(|lang| {
language_counter_individual.with_label_values(&[lang]).inc();
});

let langs_joined = if langs.is_empty() {
"null".to_string()
} else {
langs.join(",")
};
language_counter.with_label_values(&[&langs_joined]).inc();
// handle for grouped languages

if let Some(max_size) = max_sample_size {
if primary_counter.get() > max_size as u64 {
primary_counter.reset();
}
}
l.unwrap_or_else(|| lang.to_lowercase())
} else {
lang.to_string()
};

language_counter_individual
.with_label_values(&[&processed_language])
.inc();

processed_language
})
.collect::<Vec<_>>();

langs.iter().for_each(|lang| {
language_counter_individual.with_label_values(&[lang]).inc();
});

let langs_joined = if langs.is_empty() {
"null".to_string()
} else {
langs.join(",")
};
language_counter.with_label_values(&[&langs_joined]).inc();
// handle for grouped languages

if let Some(max_size) = max_sample_size {
if primary_counter.get() > max_size as u64 {
primary_counter.reset();
}
} else {
break;
}
}
}
Expand Down

0 comments on commit 6a26b6c

Please sign in to comment.