

Fix download counts (#746)
* Fix download counts

* remove unsafe send

* update indexing time
Geometrically committed Nov 6, 2023
1 parent 40f28be commit 9658d1e
Showing 9 changed files with 425 additions and 502 deletions.
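In short, the commit drops the standalone DownloadQueue and moves download counting into AnalyticsQueue::index: each download is keyed by a truncated IP plus the project id, and a Redis counter with a six-hour expiry limits how many downloads per key are counted. As a minimal sketch of the key construction (assuming the download's ip field is an Ipv6Addr as in the analytics model, treating project_id as a plain u64, and using a hypothetical helper name that does not appear in the diff):

use std::net::Ipv6Addr;

// Hypothetical standalone restatement of the key built in add_download:
// only the first 8 octets of the IPv6 address (its /64 prefix) are kept,
// so downloads from the same prefix and project collapse onto one key.
fn download_dedup_key(ip: Ipv6Addr, project_id: u64) -> String {
    let octets = ip.octets();
    let ip_stripped = u64::from_be_bytes([
        octets[0], octets[1], octets[2], octets[3],
        octets[4], octets[5], octets[6], octets[7],
    ]);
    format!("{}-{}", ip_stripped, project_id)
}

fn main() {
    // Two addresses in the same /64 for the same project map to one key.
    let a = download_dedup_key("2001:db8::1".parse().unwrap(), 42);
    let b = download_dedup_key("2001:db8::2".parse().unwrap(), 42);
    assert_eq!(a, b);
}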
515 changes: 282 additions & 233 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -86,8 +86,8 @@ rust_decimal = { version = "1.30.0", features = [
"serde-with-float",
"serde-with-str",
] }
redis = { version = "0.23.0", features = ["tokio-comp", "ahash", "r2d2"] }
deadpool-redis = "0.12.0"
redis = { version = "0.23.3", features = ["tokio-comp", "ahash", "r2d2"]}
deadpool-redis = "0.13.0"
clickhouse = { version = "0.11.2", features = ["uuid", "time"] }
uuid = { version = "1.2.2", features = ["v4", "fast-rng", "serde"] }

2 changes: 1 addition & 1 deletion src/database/redis.rs
@@ -7,7 +7,7 @@ const DEFAULT_EXPIRY: i64 = 1800; // 30 minutes

#[derive(Clone)]
pub struct RedisPool {
pool: deadpool_redis::Pool,
pub pool: deadpool_redis::Pool,
meta_namespace: String,
}

33 changes: 9 additions & 24 deletions src/lib.rs
@@ -4,7 +4,7 @@ use actix_web::web;
use database::redis::RedisPool;
use log::{info, warn};
use queue::{
analytics::AnalyticsQueue, download::DownloadQueue, payouts::PayoutsQueue, session::AuthQueue,
analytics::AnalyticsQueue, payouts::PayoutsQueue, session::AuthQueue,
socket::ActiveSockets,
};
use scheduler::Scheduler;
@@ -49,7 +49,6 @@ pub struct LabrinthConfig {
pub scheduler: Arc<Scheduler>,
pub ip_salt: Pepper,
pub search_config: search::SearchConfig,
pub download_queue: web::Data<DownloadQueue>,
pub session_queue: web::Data<AuthQueue>,
pub payouts_queue: web::Data<Mutex<PayoutsQueue>>,
pub analytics_queue: Arc<AnalyticsQueue>,
@@ -139,24 +138,6 @@ pub fn app_setup(

scheduler::schedule_versions(&mut scheduler, pool.clone());

let download_queue = web::Data::new(DownloadQueue::new());

let pool_ref = pool.clone();
let download_queue_ref = download_queue.clone();
scheduler.run(std::time::Duration::from_secs(60 * 5), move || {
let pool_ref = pool_ref.clone();
let download_queue_ref = download_queue_ref.clone();

async move {
info!("Indexing download queue");
let result = download_queue_ref.index(&pool_ref).await;
if let Err(e) = result {
warn!("Indexing download queue failed: {:?}", e);
}
info!("Done indexing download queue");
}
});

let session_queue = web::Data::new(AuthQueue::new());

let pool_ref = pool.clone();
@@ -202,13 +183,19 @@
{
let client_ref = clickhouse.clone();
let analytics_queue_ref = analytics_queue.clone();
scheduler.run(std::time::Duration::from_secs(60 * 5), move || {
let pool_ref = pool.clone();
let redis_ref = redis_pool.clone();
scheduler.run(std::time::Duration::from_secs(15), move || {
let client_ref = client_ref.clone();
let analytics_queue_ref = analytics_queue_ref.clone();
let pool_ref = pool_ref.clone();
let redis_ref = redis_ref.clone();

async move {
info!("Indexing analytics queue");
let result = analytics_queue_ref.index(client_ref).await;
let result = analytics_queue_ref
.index(client_ref, &redis_ref, &pool_ref)
.await;
if let Err(e) = result {
warn!("Indexing analytics queue failed: {:?}", e);
}
@@ -252,7 +239,6 @@ pub fn app_setup(
maxmind,
scheduler: Arc::new(scheduler),
ip_salt,
download_queue,
search_config,
session_queue,
payouts_queue,
@@ -282,7 +268,6 @@ pub fn app_config(cfg: &mut web::ServiceConfig, labrinth_config: LabrinthConfig)
.app_data(web::Data::new(labrinth_config.pool.clone()))
.app_data(web::Data::new(labrinth_config.file_host.clone()))
.app_data(web::Data::new(labrinth_config.search_config.clone()))
.app_data(labrinth_config.download_queue.clone())
.app_data(labrinth_config.session_queue.clone())
.app_data(labrinth_config.payouts_queue.clone())
.app_data(web::Data::new(labrinth_config.ip_salt.clone()))
161 changes: 127 additions & 34 deletions src/queue/analytics.rs
@@ -1,16 +1,16 @@
use crate::database::models::DatabaseError;
use crate::models::analytics::{Download, PageView, Playtime};
use dashmap::DashSet;
use crate::routes::ApiError;
use dashmap::{DashMap, DashSet};
use redis::cmd;
use sqlx::PgPool;
use crate::database::redis::RedisPool;

#[cfg(test)]
mod tests;

const VIEWS_TABLENAME: &str = "views";
const DOWNLOADS_TABLENAME: &str = "downloads";
const PLAYTIME_TABLENAME: &str = "playtime";
const DOWNLOADS_NAMESPACE: &str = "downloads";

pub struct AnalyticsQueue {
views_queue: DashSet<PageView>,
downloads_queue: DashSet<Download>,
downloads_queue: DashMap<String, Download>,
playtime_queue: DashSet<Playtime>,
}

@@ -25,7 +25,7 @@ impl AnalyticsQueue {
pub fn new() -> Self {
AnalyticsQueue {
views_queue: DashSet::with_capacity(1000),
downloads_queue: DashSet::with_capacity(1000),
downloads_queue: DashMap::with_capacity(1000),
playtime_queue: DashSet::with_capacity(1000),
}
}
@@ -35,45 +35,138 @@
}

pub fn add_download(&self, download: Download) {
self.downloads_queue.insert(download);
let octets = download.ip.octets();
let ip_stripped = u64::from_be_bytes([
octets[0], octets[1], octets[2], octets[3], octets[4], octets[5], octets[6], octets[7],
]);
self.downloads_queue
.insert(format!("{}-{}", ip_stripped, download.project_id), download);
}

pub fn add_playtime(&self, playtime: Playtime) {
self.playtime_queue.insert(playtime);
}

pub async fn index(&self, client: clickhouse::Client) -> Result<(), clickhouse::error::Error> {
Self::index_queue(&client, &self.views_queue, VIEWS_TABLENAME).await?;
Self::index_queue(&client, &self.downloads_queue, DOWNLOADS_TABLENAME).await?;
Self::index_queue(&client, &self.playtime_queue, PLAYTIME_TABLENAME).await?;
pub async fn index(
&self,
client: clickhouse::Client,
redis: &RedisPool,
pool: &PgPool,
) -> Result<(), ApiError> {
let views_queue = self.views_queue.clone();
self.views_queue.clear();

Ok(())
}
let downloads_queue = self.downloads_queue.clone();
self.downloads_queue.clear();

let playtime_queue = self.playtime_queue.clone();
self.playtime_queue.clear();

async fn index_queue<T>(
client: &clickhouse::Client,
queue: &DashSet<T>,
table_name: &str,
) -> Result<(), clickhouse::error::Error>
where
T: serde::Serialize + Eq + std::hash::Hash + Clone + clickhouse::Row,
{
if queue.is_empty() {
return Ok(());
if !views_queue.is_empty() {
let mut views = client.insert("views")?;

for view in views_queue {
views.write(&view).await?;
}

views.end().await?;
}

let current_queue = queue.clone();
queue.clear();
if !playtime_queue.is_empty() {
let mut playtimes = client.insert("playtime")?;

let mut inserter = client.inserter(table_name)?;
for playtime in playtime_queue {
playtimes.write(&playtime).await?;
}

for row in current_queue {
inserter.write(&row).await?;
inserter.commit().await?;
playtimes.end().await?;
}

inserter.end().await?;
if !downloads_queue.is_empty() {
let mut downloads_keys = Vec::new();
let raw_downloads = DashMap::new();

for (index, (key, download)) in downloads_queue.into_iter().enumerate() {
downloads_keys.push(key);
raw_downloads.insert(index, download);
}

let mut redis = redis.pool.get().await.map_err(DatabaseError::RedisPool)?;

let results = cmd("MGET")
.arg(
downloads_keys
.iter()
.map(|x| format!("{}:{}", DOWNLOADS_NAMESPACE, x))
.collect::<Vec<_>>(),
)
.query_async::<_, Vec<Option<u32>>>(&mut redis)
.await
.map_err(DatabaseError::CacheError)?;

let mut pipe = redis::pipe();
for (idx, count) in results.into_iter().enumerate() {
let key = &downloads_keys[idx];

let new_count = if let Some(count) = count {
if count > 5 {
raw_downloads.remove(&idx);
continue;
}

count + 1
} else {
1
};

pipe.atomic().set_ex(
format!("{}:{}", DOWNLOADS_NAMESPACE, key),
new_count,
6 * 60 * 60,
);
}
pipe.query_async(&mut *redis)
.await
.map_err(DatabaseError::CacheError)?;

let version_ids = raw_downloads
.iter()
.map(|x| x.version_id as i64)
.collect::<Vec<_>>();
let project_ids = raw_downloads
.iter()
.map(|x| x.project_id as i64)
.collect::<Vec<_>>();

let mut transaction = pool.begin().await?;
let mut downloads = client.insert("downloads")?;

for (_, download) in raw_downloads {
downloads.write(&download).await?;
}

sqlx::query!(
"UPDATE versions
SET downloads = downloads + 1
WHERE id = ANY($1)",
&version_ids
)
.execute(&mut *transaction)
.await?;

sqlx::query!(
"UPDATE mods
SET downloads = downloads + 1
WHERE id = ANY($1)",
&project_ids
)
.execute(&mut *transaction)
.await?;

transaction.commit().await?;
downloads.end().await?;
}

Ok(())
}
}
}
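
The cap applied when the queue is flushed can be restated as a small pure function. This is only a sketch of the decision made in the MGET / SET EX block above, using a hypothetical name (next_count) that does not appear in the diff:

// prev is the counter read back from Redis for one "ip-project" key
// (None if the key has expired or was never set). Returns the value to
// store back, or None when the download is discarded because the key has
// already produced more than 5 counted downloads in its window.
fn next_count(prev: Option<u32>) -> Option<u32> {
    match prev {
        Some(count) if count > 5 => None, // over the cap: drop this download
        Some(count) => Some(count + 1),   // counted: bump the counter
        None => Some(1),                  // first download seen for this key
    }
}

fn main() {
    assert_eq!(next_count(None), Some(1));
    assert_eq!(next_count(Some(5)), Some(6));
    assert_eq!(next_count(Some(6)), None);
}

Each counted download writes the updated counter back with a 6 * 60 * 60 second expiry; once a key goes over the cap it is no longer refreshed, so it ages out of Redis roughly six hours after its last counted download.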
(The remaining changed files were not rendered.)
