Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add missing indexes #138

Merged
merged 2 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 90 additions & 79 deletions src/endpoints/leaderboard/get_ranking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,96 +39,102 @@ use axum::{
Json,
};

use futures::TryStreamExt;
use mongodb::bson::{doc, Document};
use mongodb::Collection;
use reqwest::StatusCode;
use std::sync::Arc;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use std::sync::Arc;


pub async fn get_user_rank(collection: &Collection<Document>, address: &String, start_timestamp: &i64, end_timestamp: &i64) -> Document {
pub async fn get_user_rank(
collection: &Collection<Document>,
address: &String,
start_timestamp: &i64,
end_timestamp: &i64,
) -> Document {
let user_rank_pipeline = vec![
doc! {
"$match": doc! {
"timestamp": doc! {
"$gte": start_timestamp,
"$lte": end_timestamp
"$match": doc! {
"timestamp": doc! {
"$gte": start_timestamp,
"$lte": end_timestamp
}
}
}
},
doc! {
"$sort": doc! {
"experience": -1,
"timestamp": 1,
"_id": 1
}
},
},
doc! {
"$addFields": doc! {
"tempSortField": 1
}
},
"$sort": doc! {
"experience": -1,
"timestamp": 1,
"_id": 1
}
},
doc! {
"$setWindowFields": doc! {
"sortBy": doc! {
"tempSortField": -1
},
"output": doc! {
"rank": doc! {
"$documentNumber": doc! {}
}
"$addFields": doc! {
"tempSortField": 1
}
}
},
},
doc! {
"$facet": doc! {
"total_users": [
doc! {
"$count": "total"
}
],
"user_rank": [
doc! {
"$match": doc! {
"_id": address
}
"$setWindowFields": doc! {
"sortBy": doc! {
"tempSortField": -1
},
doc! {
"$project": doc! {
"_id": 0,
"rank": "$rank"
"output": doc! {
"rank": doc! {
"$documentNumber": doc! {}
}
}
]
}
},
}
},
doc! {
"$project": doc! {
"total_users": doc! {
"$arrayElemAt": [
"$total_users.total",
0
]
},
"rank": doc! {
"$arrayElemAt": [
"$user_rank",
0
"$facet": doc! {
"total_users": [
doc! {
"$count": "total"
}
],
"user_rank": [
doc! {
"$match": doc! {
"_id": address
}
},
doc! {
"$project": doc! {
"_id": 0,
"rank": "$rank"
}
}
]
}
}
},
},
doc! {
"$project": doc! {
"total_users": 1,
"rank": "$rank.rank"
}
},
"$project": doc! {
"total_users": doc! {
"$arrayElemAt": [
"$total_users.total",
0
]
},
"rank": doc! {
"$arrayElemAt": [
"$user_rank",
0
]
}
}
},
doc! {
"$project": doc! {
"total_users": 1,
"rank": "$rank.rank"
}
},
];

// add allow disk use to view options
let view_options = mongodb::options::AggregateOptions::builder().allow_disk_use(true).build();
let view_options = mongodb::options::AggregateOptions::builder()
.allow_disk_use(true)
.build();

return match collection.aggregate(user_rank_pipeline, view_options).await {
Ok(mut cursor) => {
Expand Down Expand Up @@ -171,12 +177,10 @@ pub fn get_default_range(rank: i64, page_size: i64, total_users: i64) -> i64 {
if rank <= page_size / 2 {
lower_range = 1;
}

// if rank is in bottom half of the last page then return default range
else if rank >= (total_users - page_size / 2) {
lower_range = total_users - page_size;
}

// if rank is in middle then return modified range where rank will be placed in middle of page
else {
lower_range = rank - (page_size / 2 - 1);
Expand Down Expand Up @@ -209,7 +213,6 @@ mod tests {
}
}


#[derive(Debug, Serialize, Deserialize)]
pub struct GetCompletedQuestsQuery {
/*
Expand Down Expand Up @@ -260,7 +263,13 @@ pub async fn handler(
let shift = query.shift;

// get user rank and total users
let stats = get_user_rank(&users_collection, &address, &start_timestamp, &end_timestamp).await;
let stats = get_user_rank(
&users_collection,
&address,
&start_timestamp,
&end_timestamp,
)
.await;
let total_users = stats.get("total_users").unwrap().as_i32().unwrap() as i64;
let user_rank = stats.get("user_rank").unwrap().as_i32().unwrap() as i64;

Expand All @@ -274,7 +283,6 @@ pub async fn handler(
if shift == 0 {
lower_range = get_default_range(user_rank, page_size, total_users);
}

// get user position and set range if shift
else {
let default_lower_range = get_default_range(user_rank, page_size, total_users);
Expand All @@ -287,7 +295,6 @@ pub async fn handler(
*/
let shift_in_elements = shift * page_size;


/*
-> if lower range becomes negative then set it to 0
-> if lower range becomes greater than total users then set it to total users - page_size to show last page.
Expand Down Expand Up @@ -344,19 +351,23 @@ pub async fn handler(
},
];

match users_collection.aggregate(paginated_leaderboard_pipeline, None).await {
match users_collection
.aggregate(paginated_leaderboard_pipeline, None)
.await
{
Ok(mut cursor) => {
let mut res = Document::new();
let mut ranking = Vec::new();
while let Some(result) = cursor.try_next().await.unwrap() {
ranking.push(result);
}
res.insert("ranking".to_string(), ranking);
res.insert("first_elt_position".to_string(), if lower_range == 0 { 1 } else { lower_range });
res.insert(
"first_elt_position".to_string(),
if lower_range == 0 { 1 } else { lower_range },
);
(StatusCode::OK, Json(res)).into_response()
}
Err(_err) => {
get_error("Error querying ranks".to_string())
}
Err(_err) => get_error("Error querying ranks".to_string()),
}
}
22 changes: 17 additions & 5 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::fmt::Write;
use std::result::Result;
use std::str::FromStr;
use tokio::time::{sleep, Duration};

#[macro_export]
macro_rules! pub_struct {
($($derive:path),*; $name:ident {$($field:ident: $t:ty),* $(,)?}) => {
Expand Down Expand Up @@ -385,7 +386,6 @@ pub async fn update_leaderboard(
experience: i64,
timestamp: f64,
) {

// get current experience and new experience to it
let mut old_experience = 0;
let filter = doc! { "_id": &*address };
Expand Down Expand Up @@ -430,12 +430,24 @@ pub async fn add_leaderboard_table(db: &Database) {
// create materialised view
source_collection.aggregate(pipeline, None).await.unwrap();

let index = IndexModel::builder()
//create multiple indexes to speed it up
let timestamp_only = IndexModel::builder().keys(doc! { "timestamp":1}).build();
view_collection
.create_index(timestamp_only, None)
.await
.unwrap();
let addrs_only = IndexModel::builder().keys(doc! { "_id":1}).build();
view_collection
.create_index(addrs_only, None)
.await
.unwrap();
let compound_index = IndexModel::builder()
.keys(doc! { "experience": -1,"timestamp":1,"_id":1})
.build();

//add indexing to materialised view
view_collection.create_index(index, None).await.unwrap();
view_collection
.create_index(compound_index, None)
.await
.unwrap();
}

pub async fn fetch_and_update_boosts_winner(
Expand Down