Skip to content

Commit

Permalink
feat: implement history sync
Browse files Browse the repository at this point in the history
  • Loading branch information
0xbrayo committed Nov 7, 2024
1 parent 8026bcd commit c44b313
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ reqwest = { version = "0.12.7", features = ["json"] }
serde_json = "1.0.128"
serde_yaml = "0.9.34"
tokio = { version = "1.40.0", features = ["full"] }
regex = "1.10.3"
142 changes: 128 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,94 @@
use aw_client_rust::AwClient;
use aw_models::{Bucket, Event};
use chrono::{TimeDelta, Utc};
use dirs::config_dir;
use chrono::{DateTime, Duration as ChronoDuration, NaiveDateTime, TimeDelta, Utc};
use regex::Regex;
use std::path::PathBuf;
use env_logger::Env;
use log::{info, warn};
use reqwest;
use serde_json::{Map, Value};
use serde_yaml;
use std::env;
use dirs::config_dir;
use std::fs::{DirBuilder, File};
use std::io::prelude::*;
use std::thread::sleep;
use tokio::time::{interval, Duration};

fn parse_time_string(time_str: &str) -> Option<ChronoDuration> {
let re = Regex::new(r"^(\d+)([dhm])$").unwrap();
if let Some(caps) = re.captures(time_str) {
let amount: i64 = caps.get(1)?.as_str().parse().ok()?;
let unit = caps.get(2)?.as_str();

match unit {
"d" => Some(ChronoDuration::days(amount)),
"h" => Some(ChronoDuration::hours(amount)),
"m" => Some(ChronoDuration::minutes(amount)),
_ => None,
}
} else {
None
}
}

async fn sync_historical_data(
client: &reqwest::Client,
aw_client: &AwClient,
username: &str,
apikey: &str,
from_time: ChronoDuration,
) -> Result<(), Box<dyn std::error::Error>> {
let from_timestamp = (Utc::now() - from_time).timestamp();
let url = format!(
"http://ws.audioscrobbler.com/2.0/?method=user.getrecenttracks&user={}&api_key={}&format=json&limit=200&from={}",
username, apikey, from_timestamp
);

let response = client.get(&url).send().await?;
let v: Value = response.json().await?;

if let Some(tracks) = v["recenttracks"]["track"].as_array() {
info!("Syncing {} historical tracks...", tracks.len());
for track in tracks.iter().rev() {
let mut event_data: Map<String, Value> = Map::new();

event_data.insert("title".to_string(), track["name"].to_owned());
event_data.insert(
"artist".to_string(),
track["artist"]["#text"].to_owned(),
);
event_data.insert(
"album".to_string(),
track["album"]["#text"].to_owned(),
);

// Get timestamp from the track
if let Some(date) = track["date"]["uts"].as_str() {
if let Ok(timestamp) = date.parse::<i64>() {
// TODO: remove the deprecated from_utc and from_timestamp
let event = Event {
id: None,
timestamp: DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(timestamp, 0), Utc),
duration: TimeDelta::seconds(30),
data: event_data,
};

aw_client
.insert_event("aw-watcher-lastfm", &event)
.await
.unwrap_or_else(|e| {
warn!("Error inserting historical event: {:?}", e);
});
}
}
}
info!("Historical sync completed!");
}

Ok(())
}

fn get_config_path() -> Option<std::path::PathBuf> {
config_dir().map(|mut path| {
path.push("activitywatch");
Expand Down Expand Up @@ -52,17 +128,43 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let args: Vec<String> = env::args().collect();
let mut port: u16 = 5600;
if args.len() > 1 {
for idx in 1..args.len() {
if args[idx] == "--port" {
port = args[idx + 1].parse().expect("Invalid port number");
break;
let mut sync_duration: Option<ChronoDuration> = None;

let mut idx = 1;
while idx < args.len() {
match args[idx].as_str() {
"--port" => {
if idx + 1 < args.len() {
port = args[idx + 1].parse().expect("Invalid port number");
idx += 2;
} else {
panic!("--port requires a value");
}
}
if args[idx] == "--testing" {
"--testing" => {
port = 5699;
idx += 1;
}
"--sync" => {
if idx + 1 < args.len() {
sync_duration = Some(parse_time_string(&args[idx + 1])
.expect("Invalid sync duration format. Use format: 7d, 24h, or 30m"));
idx += 2;
} else {
panic!("--sync requires a duration value (e.g., 7d, 24h, 30m)");
}
}
if args[idx] == "--help" {
println!("Usage: aw-watcher-lastfm-rust [--testing] [--port PORT] [--help]");
"--help" => {
println!("Usage: aw-watcher-lastfm-rust [--testing] [--port PORT] [--sync DURATION] [--help]");
println!("\nOptions:");
println!(" --testing Use testing port (5699)");
println!(" --port PORT Specify custom port");
println!(" --sync DURATION Sync historical data (format: 7d, 24h, 30m)");
println!(" --help Show this help message");
return Ok(());
}
_ => {
println!("Unknown argument: {}", args[idx]);
return Ok(());
}
}
Expand All @@ -75,10 +177,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init_from_env(env);

if !config_path.exists() {
DirBuilder::new()
.recursive(true)
.create(config_dir)
.expect("Unable to create directory");
if !config_dir.exists() {
DirBuilder::new()
.recursive(true)
.create(&config_dir)
.expect("Unable to create directory");
}
let mut file = File::create(&config_path).expect("Unable to create file");
file.write_all(b"apikey: your-api-key\nusername: your_username\npolling_interval: 10")
.expect("Unable to write to file");
Expand Down Expand Up @@ -138,6 +242,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.build()
.unwrap();

// Handle historical sync if requested
if let Some(duration) = sync_duration {
info!("Starting historical sync...");
match sync_historical_data(&client, &aw_client, &username, &apikey, duration).await {
Ok(_) => info!("Historical sync completed successfully"),
Err(e) => warn!("Error during historical sync: {:?}", e),
}
info!("Starting real-time tracking...");
}

loop {
interval.tick().await;

Expand Down

0 comments on commit c44b313

Please sign in to comment.