Skip to content
This repository was archived by the owner on Feb 24, 2025. It is now read-only.

Commit a2a235a

Browse files
committed
feat: the first demo for export
1 parent 256dea6 commit a2a235a

File tree

14 files changed

+381
-151
lines changed

14 files changed

+381
-151
lines changed

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ socketioxide = { version = "0.12.0", features = [
4141
"extensions",
4242
"tracing",
4343
] }
44+
tempfile = "3.10.1"
4445
tokio = { version = "1.37.0", features = ["full"] }
4546
tower = "0.4.13"
46-
tower-http = "0.5.2"
47+
tower-http = { version = "0.5.2", features = ["cors"] }
4748
tracing = { version = "0.1.40", features = ["log"] }
4849
tracing-subscriber = { version = "0.3.18", features = [
4950
"tracing",
@@ -53,7 +54,7 @@ tracing-subscriber = { version = "0.3.18", features = [
5354
"json",
5455
"regex",
5556
] }
56-
uuid = "1.8.0"
57+
uuid = { version = "1.8.0", features = ["v1", "v4", "serde"] }
5758
xlsxwriter = "0.6.0"
5859
zerocopy = "0.7.32"
5960

public/.DS_Store

6 KB
Binary file not shown.

public/exports/.DS_Store

6 KB
Binary file not shown.

src/calc/mod.rs

Lines changed: 0 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -1,121 +1 @@
1-
use crate::{
2-
models::{activities::Activity, users::User},
3-
routers::users::time::UserActivityTime,
4-
};
5-
use bson::{doc, from_document};
6-
use futures::stream::TryStreamExt;
7-
use mongodb::{Collection, Database};
8-
use polars::{df, frame::DataFrame, prelude::NamedFrom, series::Series};
9-
use std::sync::Arc;
10-
use tokio::sync::Mutex;
111

12-
async fn export(db: Arc<Mutex<Database>>) -> Result<DataFrame, String> {
13-
let db = db.lock().await;
14-
let mut df = df!(
15-
"_id" => &["".to_string()],
16-
"id" => &["0".to_string()],
17-
"name" => &["Example".to_string()],
18-
"class" => &["".to_string()],
19-
"on_campus" => &[0.0],
20-
"off_campus" => &[0.0],
21-
"social_practice" => &[0.0],
22-
"total" => &[0.0]
23-
)
24-
.unwrap();
25-
26-
let users_collection: Collection<User> = db.collection("users");
27-
let activities_collection: Collection<Activity> = db.collection("activities");
28-
29-
let mut users = users_collection.find(doc! {}, None).await.unwrap();
30-
31-
while let Some(doc) = users.try_next().await.unwrap() {
32-
let pipeline = vec![
33-
doc! {
34-
"$match": {
35-
"$or": [
36-
{ "members._id": doc._id.clone() },
37-
{ "members._id": doc._id.to_hex() }
38-
]
39-
}
40-
},
41-
doc! {
42-
"$unwind": "$members"
43-
},
44-
doc! {
45-
"$match": {
46-
"$or": [
47-
{ "members._id": doc._id.clone() },
48-
{ "members._id": doc._id.to_hex() }
49-
]
50-
}
51-
},
52-
doc! {
53-
"$group": {
54-
"_id": "$members.mode",
55-
"totalDuration": { "$sum": "$members.duration" }
56-
}
57-
},
58-
doc! {
59-
"$group": {
60-
"_id": null,
61-
"on_campus": {
62-
"$sum": {
63-
"$cond": [{ "$eq": ["$_id", "on-campus"] }, "$totalDuration", 0.0]
64-
}
65-
},
66-
"off_campus": {
67-
"$sum": {
68-
"$cond": [{ "$eq": ["$_id", "off-campus"] }, "$totalDuration", 0.0]
69-
}
70-
},
71-
"social_practice": {
72-
"$sum": {
73-
"$cond": [{ "$eq": ["$_id", "social-practice"] }, "$totalDuration", 0.0]
74-
}
75-
},
76-
"total": { "$sum": "$totalDuration" }
77-
}
78-
},
79-
doc! {
80-
"$project": {
81-
"_id": 0,
82-
"on_campus": 1,
83-
"off_campus": 1,
84-
"social_practice": 1,
85-
"total": 1
86-
}
87-
},
88-
];
89-
let cursor = activities_collection.aggregate(pipeline, None).await;
90-
if let Err(_) = cursor {
91-
return Err("Failed to get cursor".to_string());
92-
}
93-
let mut cursor = cursor.unwrap();
94-
let result = cursor.try_next().await;
95-
if let Err(_) = result {
96-
return Err("Failed to get result".to_string());
97-
}
98-
let result = result.unwrap();
99-
if let None = result {
100-
return Err("Failed to get result".to_string());
101-
}
102-
let result = result.unwrap();
103-
let result: UserActivityTime = from_document(result).unwrap();
104-
let extend = DataFrame::new(vec![
105-
Series::new("_id", vec![doc._id.clone().to_hex()]),
106-
Series::new("id", vec![doc.id.clone()]),
107-
Series::new("name", vec![doc.name.clone()]),
108-
Series::new("class", vec!["".to_string()]),
109-
Series::new("on_campus", vec![result.on_campus]),
110-
Series::new("off_campus", vec![result.off_campus]),
111-
Series::new("social_practice", vec![result.social_practice]),
112-
Series::new("total", vec![result.total]),
113-
]);
114-
if let Err(_) = extend {
115-
return Err("Failed to create DataFrame".to_string());
116-
}
117-
let extend = extend.unwrap();
118-
df.extend(&extend).unwrap();
119-
}
120-
Ok(df)
121-
}

src/main.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ mod models;
66
mod routers;
77
mod tests;
88
mod utils;
9+
use crate::models::exports::ExportState;
910
use axum::{
11+
http::Method,
1012
routing::{delete, get, post, put},
1113
Extension, Router,
1214
};
@@ -16,8 +18,9 @@ use socketioxide::{
1618
extract::{AckSender, Bin, Data, SocketRef},
1719
SocketIo,
1820
};
19-
use std::sync::Arc;
21+
use std::{collections::HashMap, sync::Arc};
2022
use tokio::sync::Mutex;
23+
use tower_http::cors::{Any, CorsLayer};
2124

2225
fn on_connect(socket: SocketRef, Data(data): Data<Value>) {
2326
socket.emit("auth", data).ok();
@@ -45,6 +48,8 @@ async fn main() {
4548
.await
4649
.expect("Failed to create client");
4750

51+
let shared_export_state = Arc::new(Mutex::new(HashMap::new()) as ExportState);
52+
4853
let shared_client = Arc::new(Mutex::new(client));
4954

5055
let (_, io) = SocketIo::new_layer();
@@ -102,7 +107,18 @@ async fn main() {
102107
"/user/:user_id/time",
103108
get(routers::users::time::calculate_user_activity_time),
104109
)
105-
.layer(Extension(shared_client.clone()));
110+
.route("/export", post(routers::exports::export_activity_times))
111+
.route(
112+
"/export/:task_id",
113+
get(routers::exports::query_export_status),
114+
)
115+
.layer(Extension(shared_client.clone()))
116+
.layer(Extension(shared_export_state.clone()))
117+
.layer(
118+
CorsLayer::new()
119+
.allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE])
120+
.allow_origin(Any),
121+
);
106122

107123
// Run the server
108124
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();

src/models/exports.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use bson::{doc, oid::ObjectId};
2+
use serde::{Deserialize, Serialize};
3+
use std::collections::HashMap;
4+
use tokio::sync::Mutex;
5+
use uuid::Uuid;
6+
7+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
8+
#[serde(rename_all = "kebab-case")]
9+
pub enum ExportFormat {
10+
CSV,
11+
JSON,
12+
Excel,
13+
}
14+
15+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
16+
pub struct ExportActivityTimesOptions {
17+
pub start: u64, // Unix timestamp
18+
pub end: u64, // Unix timestamp
19+
pub format: ExportFormat,
20+
}
21+
22+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
23+
#[serde(rename_all = "kebab-case")]
24+
pub enum TaskStatus {
25+
Pending,
26+
Processing,
27+
Done,
28+
Error,
29+
}
30+
31+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
32+
pub struct Task {
33+
pub time: u64, // Unix timestamp
34+
pub actioner: ObjectId,
35+
pub options: ExportActivityTimesOptions,
36+
pub status: TaskStatus,
37+
pub result: Option<String>,
38+
pub percent: Option<f64>,
39+
}
40+
41+
pub type ExportState = Mutex<HashMap<Uuid, Task>>;

src/models/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod activities;
2+
pub mod exports;
23
pub mod groups;
34
pub mod notifications;
45
pub mod response;

0 commit comments

Comments
 (0)