Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
Fix download counts (#746)
Browse files Browse the repository at this point in the history
* Fix download counts

* remove unsafe send

* update indexing time
  • Loading branch information
Geometrically authored Nov 6, 2023
1 parent 4482aac commit c50bdda
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 183 deletions.
20 changes: 7 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ thiserror = "1.0.41"

sqlx = { version = "0.6.3", features = ["offline", "runtime-tokio-rustls", "postgres", "chrono", "macros", "migrate", "decimal", "json"] }
rust_decimal = { version = "1.30.0", features = ["serde-with-float", "serde-with-str"] }
redis = { version = "0.23.0", features = ["tokio-comp", "ahash", "r2d2"]}
deadpool-redis = "0.12.0"
redis = { version = "0.23.3", features = ["tokio-comp", "ahash", "r2d2"]}
deadpool-redis = "0.13.0"
clickhouse = { version = "0.11.2", features = ["uuid", "time"] }
uuid = { version = "1.2.2", features = ["v4", "fast-rng", "serde"] }

Expand Down
106 changes: 50 additions & 56 deletions sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,32 @@
},
"query": "\n UPDATE notifications\n SET read = TRUE\n WHERE id = ANY($1)\n "
},
"155361716f9d697c0d961b7bbad30e70698a8e5c9ceaa03b2091e058b58fb938": {
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "mod_id",
"ordinal": 1,
"type_info": "Int8"
}
],
"nullable": [
false,
false
],
"parameters": {
"Left": [
"Text"
]
}
},
"query": "\n SELECT v.id id, v.mod_id mod_id FROM files f\n INNER JOIN versions v ON v.id = f.version_id\n WHERE f.url = $1\n "
},
"15fac93c76e72348b50f526e1acb183521d94be335ad8b9dfeb0398d4a8a2fc4": {
"describe": {
"columns": [],
Expand Down Expand Up @@ -2095,18 +2121,6 @@
},
"query": "\n UPDATE threads\n SET show_in_mod_inbox = $1\n WHERE id = $2\n "
},
"49a5d21a1454afc6383b78e468fd0decc75b9163e7286f34ceab22d563a0d3f7": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "UPDATE mods\n SET downloads = downloads + 1\n WHERE (id = $1)"
},
"4a54d350b4695c32a802675506e85b0506fc62a63ca0ee5f38890824301d6515": {
"describe": {
"columns": [],
Expand Down Expand Up @@ -2160,18 +2174,6 @@
},
"query": "SELECT EXISTS(SELECT 1 FROM mods WHERE id = $1)"
},
"4d54032b02c860f4facec39eacb4548a0701d4505e7a80b4834650696df69c2b": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "UPDATE versions\n SET downloads = downloads + 1\n WHERE (id = $1)"
},
"4d752ee3f43a1bf34d71c4391c9232537e0941294951f383ea8fa61e9d83fc96": {
"describe": {
"columns": [],
Expand Down Expand Up @@ -4883,6 +4885,18 @@
},
"query": "\n UPDATE mods\n SET status = requested_status\n WHERE status = $1 AND approved < CURRENT_DATE AND requested_status IS NOT NULL\n "
},
"b993ec7579f06603a2a308dccd1ea1fbffd94286db48bc0e36a30f4f6a9d39af": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Int8Array"
]
}
},
"query": "UPDATE versions\n SET downloads = downloads + 1\n WHERE id = ANY($1)"
},
"b99e906aa6ca18b9f3f111eae7bf0d360f42385ca99228a844387bf9456a6a31": {
"describe": {
"columns": [],
Expand Down Expand Up @@ -5627,6 +5641,18 @@
},
"query": "\n UPDATE users\n SET gitlab_id = $2\n WHERE (id = $1)\n "
},
"d08c9ef6a8829ce1d23d66f27c58f4b9b64f4ce985e60ded871d1f31eb0c818b": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Int8Array"
]
}
},
"query": "UPDATE mods\n SET downloads = downloads + 1\n WHERE id = ANY($1)"
},
"d1566672369ea22cb1f638f073f8e3fb467b354351ae71c67941323749ec9bcd": {
"describe": {
"columns": [
Expand Down Expand Up @@ -5997,38 +6023,6 @@
},
"query": "\n UPDATE dependencies\n SET dependency_id = NULL, mod_dependency_id = $2\n WHERE dependency_id = $1\n "
},
"dd57a6dd89fefedbde796ef02b308ce7dba17ca0c65ffd5f9e35e296a72d4c1c": {
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "mod_id",
"ordinal": 1,
"type_info": "Int8"
},
{
"name": "file_type",
"ordinal": 2,
"type_info": "Varchar"
}
],
"nullable": [
false,
false,
true
],
"parameters": {
"Left": [
"Text"
]
}
},
"query": "\n SELECT v.id id, v.mod_id mod_id, file_type FROM files f\n INNER JOIN versions v ON v.id = f.version_id\n WHERE f.url = $1\n "
},
"de1bf7e33a99a10154cefdbe3b8322e4c6a19448b6ee3c6087b1b8163bc52cb1": {
"describe": {
"columns": [],
Expand Down
30 changes: 8 additions & 22 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::file_hosting::S3Host;
use crate::queue::analytics::AnalyticsQueue;
use crate::queue::download::DownloadQueue;
use crate::queue::payouts::{process_payout, PayoutsQueue};
use crate::queue::session::AuthQueue;
use crate::queue::socket::ActiveSockets;
Expand Down Expand Up @@ -182,24 +181,6 @@ async fn main() -> std::io::Result<()> {

scheduler::schedule_versions(&mut scheduler, pool.clone());

let download_queue = web::Data::new(DownloadQueue::new());

let pool_ref = pool.clone();
let download_queue_ref = download_queue.clone();
scheduler.run(std::time::Duration::from_secs(60 * 5), move || {
let pool_ref = pool_ref.clone();
let download_queue_ref = download_queue_ref.clone();

async move {
info!("Indexing download queue");
let result = download_queue_ref.index(&pool_ref).await;
if let Err(e) = result {
warn!("Indexing download queue failed: {:?}", e);
}
info!("Done indexing download queue");
}
});

let session_queue = web::Data::new(AuthQueue::new());

let pool_ref = pool.clone();
Expand Down Expand Up @@ -248,13 +229,19 @@ async fn main() -> std::io::Result<()> {
{
let client_ref = clickhouse.clone();
let analytics_queue_ref = analytics_queue.clone();
scheduler.run(std::time::Duration::from_secs(60 * 5), move || {
let pool_ref = pool.clone();
let redis_ref = redis_pool.clone();
scheduler.run(std::time::Duration::from_secs(15), move || {
let client_ref = client_ref.clone();
let analytics_queue_ref = analytics_queue_ref.clone();
let pool_ref = pool_ref.clone();
let redis_ref = redis_ref.clone();

async move {
info!("Indexing analytics queue");
let result = analytics_queue_ref.index(client_ref).await;
let result = analytics_queue_ref
.index(client_ref, &redis_ref, &pool_ref)
.await;
if let Err(e) = result {
warn!("Indexing analytics queue failed: {:?}", e);
}
Expand Down Expand Up @@ -344,7 +331,6 @@ async fn main() -> std::io::Result<()> {
.app_data(web::Data::new(pool.clone()))
.app_data(web::Data::new(file_host.clone()))
.app_data(web::Data::new(search_config.clone()))
.app_data(download_queue.clone())
.app_data(session_queue.clone())
.app_data(payouts_queue.clone())
.app_data(web::Data::new(ip_salt.clone()))
Expand Down
Loading

0 comments on commit c50bdda

Please sign in to comment.