Skip to content

Commit

Permalink
Add ingest rate graph for admin panel.
Browse files Browse the repository at this point in the history
  • Loading branch information
Syfaro committed Feb 8, 2024
1 parent f5cec77 commit 5b3bd36
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 24 deletions.
96 changes: 96 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ actix-session = { version = "0.9", features = ["cookie-session"] }
actix-web = { version = "4.2.1", default-features = false, features = ["macros", "cookies"] }
actix-web-actors = "4"
actix-web-httpauth = "0.8"
actix-web-lab = "0.20.2"
anyhow = "1"
argonautica = "0.2"
askama = { version = "0.12.1" , features = ["serde-json", "humansize", "markdown"] }
Expand Down
76 changes: 75 additions & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use futures::TryStreamExt;
use rusoto_s3::S3;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tracing::Instrument;
use uuid::Uuid;

use crate::{
Expand Down Expand Up @@ -287,6 +288,79 @@ async fn events(
}
}

#[get("/ingest/stats")]
async fn ingest_stats(
user: models::User,
nats: web::Data<async_nats::Client>,
) -> impl actix_web::Responder {
if !user.is_admin {
return Err(Error::Unauthorized);
}

let (tx, rx) = tokio::sync::mpsc::channel::<Result<_, Error>>(10);

tokio::task::spawn_local(
async move {
let mut counts = std::collections::HashMap::<_, usize>::new();
let mut ticker = tokio::time::interval(std::time::Duration::from_secs(1));

loop {
let mut sub = match nats.subscribe("fuzzysearch.ingest.>").await {
Ok(sub) => sub,
Err(err) => {
if let Err(err) = tx.send(Err(Error::from_displayable(err))).await {
tracing::error!("could not send error: {err}");
}
return;
}
};

tokio::select! {
msg = sub.next() => {
let msg = match msg {
Some(msg) => msg,
None => {
tracing::info!("subscriber ended");
return;
}
};

let subject = msg.subject.strip_prefix("fuzzysearch.ingest.").expect("got wrong subject").to_string();
*counts.entry(subject).or_default() += 1;
}
_ = ticker.tick() => {
match actix_web_lab::sse::Data::new_json(serde_json::json!({
"timestamp": chrono::Utc::now().timestamp(),
"counts": counts,
})) {
Ok(data) => {
if let Err(err) = tx.send(Ok(actix_web_lab::sse::Event::Data(data))).await {
tracing::error!("could not send stats: {err}");
return;
}
},
Err(err) => {
tracing::error!("could not serialize data: {err}");
if let Err(err) = tx.send(Err(err.into())).await {
tracing::error!("could not send error: {err}");
return;
}
}
}
}
_ = tx.closed() => {
tracing::info!("channel closed");
return;
}
}
}
}
.in_current_span(),
);

Ok(actix_web_lab::sse::Sse::from_receiver(rx))
}

#[post("/upload")]
async fn upload(
pool: web::Data<PgPool>,
Expand Down Expand Up @@ -417,7 +491,7 @@ async fn flist_lookup(

pub fn service() -> Scope {
web::scope("/api")
.service(services![events, upload, flist_lookup])
.service(services![events, ingest_stats, upload, flist_lookup])
.service(web::scope("/service").service(services![fuzzysearch]))
.service(web::scope("/chunk").service(services![chunk_add]))
}
47 changes: 24 additions & 23 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,6 @@ pub struct WorkerConfig {
/// Queues to fetch jobs from.
#[clap(long, env("FAKTORY_QUEUES"), value_enum, use_value_delimiter = true)]
pub faktory_queues: Vec<jobs::Queue>,

/// NATS hosts.
#[clap(
long,
env("NATS_HOST"),
use_value_delimiter = true,
value_delimiter = ','
)]
pub nats_host: Vec<ServerAddr>,
/// NATS NKEY, may be omitted if authentication is not needed.
#[clap(long, env("NATS_NKEY"))]
pub nats_nkey: Option<String>,
}

#[derive(Clone, clap::Subcommand)]
Expand Down Expand Up @@ -137,6 +125,18 @@ pub struct Config {
#[clap(long, env("REDIS_DSN"))]
pub redis_dsn: String,

/// NATS hosts.
#[clap(
long,
env("NATS_HOST"),
use_value_delimiter = true,
value_delimiter = ','
)]
pub nats_host: Vec<ServerAddr>,
/// NATS NKEY, may be omitted if authentication is not needed.
#[clap(long, env("NATS_NKEY"))]
pub nats_nkey: Option<String>,

#[clap(long, env("UNLEASH_API_URL"))]
pub unleash_api_url: String,
#[clap(long, env("UNLEASH_SECRET"))]
Expand Down Expand Up @@ -623,6 +623,17 @@ async fn main() {
.await
.expect("could not connect to faktory");

let nats_opts = if let Some(nats_nkey) = &config.nats_nkey {
async_nats::ConnectOptions::with_nkey(nats_nkey.clone())
} else {
async_nats::ConnectOptions::default()
};

let nats = nats_opts
.connect(&config.nats_host)
.await
.expect("could not connect to nats");

let unleash = foxlib::flags::client::<Features>(
env!("CARGO_PKG_NAME"),
&config.unleash_api_url,
Expand Down Expand Up @@ -658,17 +669,6 @@ async fn main() {

let telegram = Arc::new(tgbotapi::Telegram::new(config.telegram_bot_token.clone()));

let nats_opts = if let Some(nats_nkey) = &worker_config.nats_nkey {
async_nats::ConnectOptions::with_nkey(nats_nkey.clone())
} else {
async_nats::ConnectOptions::default()
};

let nats = nats_opts
.connect(&worker_config.nats_host)
.await
.expect("could not connect to nats");

let ctx = jobs::JobContext {
producer,
conn: pool,
Expand Down Expand Up @@ -735,6 +735,7 @@ async fn main() {
.app_data(web::Data::new(telegram_login.clone()))
.app_data(web::Data::new(producer.clone()))
.app_data(web::Data::new(unleash.clone()))
.app_data(web::Data::new(nats.clone()))
.service(auth::service())
.service(user::service())
.service(api::service())
Expand Down
74 changes: 74 additions & 0 deletions templates/admin/_ingest_stats.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<div id="ingest-stats" style="height: 400px;"></div>

<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/echarts.min.js"
integrity="sha256-EVZCmhajjLhgTcxlGMGUBtQiYULZCPjt0uNTFEPFTRk=" crossorigin="anonymous"></script>

<script>
let pointDates = [];
let series = {};
let previousValues = {};
let addedPoints = -1;

const chart = echarts.init(document.getElementById('ingest-stats'));

const opts = {
title: {
left: 'center',
text: 'Ingested Media',
},
tooltip: {
trigger: 'axis',
},
toolbox: {
feature: {
dataZoom: {
yAxisIndex: 'none',
},
restore: {},
saveAsImage: {},
},
},
xAxis: {
type: 'category',
boundaryGap: false,
data: pointDates,
},
yAxis: {
name: 'Additions',
type: 'value',
boundaryGap: [0, '100%'],
},
series: [],
};

chart.setOption(opts);

const evtSource = new EventSource('/api/ingest/stats');

evtSource.onmessage = (e) => {
const data = JSON.parse(e.data);

for (let [key, value] of Object.entries(data['counts'])) {
if (series[key] === undefined) {
series[key] = {
name: key,
type: 'line',
data: Array(addedPoints + 1).fill(0),
};
}

series[key]['data'].push(value - (previousValues[key] || 0));
previousValues[key] = value;
}

pointDates.push(new Date(data['timestamp'] * 1000));
addedPoints += 1;

chart.setOption({
xAxis: {
data: pointDates,
},
series: Object.values(series),
});
};
</script>
Loading

0 comments on commit 5b3bd36

Please sign in to comment.