diff --git a/clotributor-registrar/src/registrar.rs b/clotributor-registrar/src/registrar.rs index 5e7d3fe..ac8358d 100644 --- a/clotributor-registrar/src/registrar.rs +++ b/clotributor-registrar/src/registrar.rs @@ -1,5 +1,5 @@ use crate::db::DynDB; -use anyhow::{format_err, Context, Error, Result}; +use anyhow::{bail, format_err, Context, Error, Result}; use config::Config; use futures::stream::{self, StreamExt}; use reqwest::StatusCode; @@ -158,10 +158,10 @@ async fn process_foundation( // Fetch foundation data file let resp = http_client.get(foundation.data_url).send().await?; if resp.status() != StatusCode::OK { - return Err(format_err!( + bail!( "unexpected status code getting data file: {}", resp.status() - )); + ); } let data = resp.text().await?; diff --git a/clotributor-tracker/src/db.rs b/clotributor-tracker/src/db.rs index c97f224..9aa9a64 100644 --- a/clotributor-tracker/src/db.rs +++ b/clotributor-tracker/src/db.rs @@ -62,7 +62,8 @@ impl DB for PgDB { r.languages, r.stars, r.digest, - p.name as project_name + p.name as project_name, + p.foundation_id from repository r join project p using (project_id) where r.tracked_at is null @@ -84,6 +85,7 @@ impl DB for PgDB { stars: row.get("stars"), digest: row.get("digest"), project_name: row.get("project_name"), + foundation_id: row.get("foundation_id"), }) .collect(); Ok(repositories) diff --git a/clotributor-tracker/src/github.rs b/clotributor-tracker/src/github.rs index dc9717c..14a565b 100644 --- a/clotributor-tracker/src/github.rs +++ b/clotributor-tracker/src/github.rs @@ -1,5 +1,5 @@ use crate::tracker::Issue; -use anyhow::{format_err, Context, Result}; +use anyhow::{bail, format_err, Context, Result}; use async_trait::async_trait; use graphql_client::{GraphQLQuery, Response}; use lazy_static::lazy_static; @@ -147,11 +147,11 @@ impl GH for GHGraphQL { .await .context("error querying graphql api")?; if resp.status() != StatusCode::OK { - return Err(format_err!( + bail!( "unexpected status code querying graphql api: {} - {}", resp.status(), resp.text().await?, - )); + ); } // Parse response body and extract repository data diff --git a/clotributor-tracker/src/tracker.rs b/clotributor-tracker/src/tracker.rs index 5dee6ef..25ee859 100644 --- a/clotributor-tracker/src/tracker.rs +++ b/clotributor-tracker/src/tracker.rs @@ -4,7 +4,7 @@ use crate::{ db::DynDB, github::{repo_view, DynGH}, }; -use anyhow::{format_err, Context, Error, Result}; +use anyhow::{bail, format_err, Context, Error, Result}; use config::Config; use deadpool::unmanaged::{Object, Pool}; use futures::stream::{self, StreamExt}; @@ -22,6 +22,140 @@ use uuid::Uuid; /// Maximum time that can take tracking a single repository. const REPOSITORY_TRACK_TIMEOUT: u64 = 300; +/// Track repositories that need to be tracked. +#[instrument(skip_all, err)] +pub(crate) async fn run(cfg: &Config, db: DynDB, gh: DynGH) -> Result<()> { + // Setup GitHub tokens pool + let gh_tokens = cfg.get::>("creds.githubTokens")?; + if gh_tokens.is_empty() { + bail!("GitHub tokens not found in config file (creds.githubTokens)"); + } + let gh_tokens_pool = Pool::from(gh_tokens.clone()); + + // Get repositories to track + debug!("getting repositories to track"); + let repositories_to_track = db.get_repositories_to_track().await?; + if repositories_to_track.is_empty() { + info!("no repositories to track, finished"); + return Ok(()); + } + + // Track repositories + info!("tracking repositories"); + #[allow(clippy::manual_try_fold)] + let result = stream::iter(repositories_to_track) + .map(|repository| async { + let db = db.clone(); + let gh = gh.clone(); + let gh_token = gh_tokens_pool.get().await.expect("token -when available-"); + let repo_url = repository.url.clone(); + + match timeout( + Duration::from_secs(REPOSITORY_TRACK_TIMEOUT), + track_repository(db, gh, gh_token, repository), + ) + .await + { + Ok(result) => result, + Err(err) => Err(format_err!("{}", err)), + } + .context(format!("error tracking repository {repo_url}")) + }) + .buffer_unordered(cfg.get("tracker.concurrency")?) + .collect::>>() + .await + .into_iter() + .fold( + Ok::<(), Error>(()), + |final_result, task_result| match task_result { + Ok(()) => final_result, + Err(task_err) => match final_result { + Ok(()) => Err(Into::into(task_err)), + Err(final_err) => Err(format_err!("{:#}\n{:#}", final_err, task_err)), + }, + }, + ); + + // Check Github API rate limit status for each token + #[cfg(not(test))] + for (i, gh_token) in gh_tokens.into_iter().enumerate() { + let gh_client = github::setup_http_client(&gh_token)?; + let response: Value = gh_client + .get("https://api.github.com/rate_limit") + .send() + .await? + .json() + .await?; + debug!( + token = i, + rate = %response["rate"], + graphql = %response["resources"]["graphql"], + "token github rate limit info" + ); + } + + info!("finished"); + result +} + +/// Track repository provided. +#[instrument(fields(url = %repo.url), skip_all, err)] +async fn track_repository( + db: DynDB, + gh: DynGH, + gh_token: Object, + mut repo: Repository, +) -> Result<()> { + let start = Instant::now(); + debug!("started"); + + // Fetch repository data from GitHub + let gh_repo = gh.repository(&gh_token, &repo.url).await?; + + // Update repository's GitHub data in db if needed + let changed = repo.update_gh_data(&gh_repo)?; + if changed { + db.update_repository_gh_data(&repo).await?; + debug!("github data updated in database"); + } + + // Sync issues in GitHub with database + let mut issues_in_gh = gh_repo.issues(); + let issues_in_db = db.get_repository_issues(repo.repository_id).await?; + + // Register/update new or outdated issues + for issue in &mut issues_in_gh { + let digest_in_db = find_issue(issue.issue_id, &issues_in_db); + if issue.digest != digest_in_db { + db.register_issue(&repo, issue).await?; + debug!(issue.number, "registering issue"); + } + } + + // Unregister issues no longer available in GitHub + for issue in &issues_in_db { + if find_issue(issue.issue_id, &issues_in_gh).is_none() { + db.unregister_issue(issue.issue_id).await?; + debug!(issue.number, "unregistering issue"); + } + } + + // Update repository's last track timestamp in db + db.update_repository_last_track_ts(repo.repository_id) + .await?; + + debug!(duration_ms = start.elapsed().as_millis(), "completed"); + Ok(()) +} + +/// Find an issue in the provided collection, returning its digest if found. +fn find_issue(issue_id: i64, issues: &[Issue]) -> Option { + issues + .iter() + .find(|i| i.issue_id == issue_id) + .map(|i| i.digest.clone().expect("to be present")) +} + /// Repository information. #[derive(Debug, Clone, PartialEq, Default)] #[allow(clippy::struct_field_names)] @@ -36,6 +170,7 @@ pub(crate) struct Repository { pub stars: Option, pub digest: Option, pub project_name: String, + pub foundation_id: String, } impl Repository { @@ -163,7 +298,8 @@ impl Issue { // Weight B let weight_b = format!( - "{} {} {} {}", + "{} {} {} {} {}", + &repo.foundation_id, &repo.name, &repo.description.clone().unwrap_or_default(), &repo @@ -259,142 +395,6 @@ pub(crate) struct IssueTsTexts { pub weight_c: String, } -/// Track repositories that need to be tracked. -#[instrument(skip_all, err)] -pub(crate) async fn run(cfg: &Config, db: DynDB, gh: DynGH) -> Result<()> { - // Setup GitHub tokens pool - let gh_tokens = cfg.get::>("creds.githubTokens")?; - if gh_tokens.is_empty() { - return Err(format_err!( - "GitHub tokens not found in config file (creds.githubTokens)" - )); - } - let gh_tokens_pool = Pool::from(gh_tokens.clone()); - - // Get repositories to track - debug!("getting repositories to track"); - let repositories_to_track = db.get_repositories_to_track().await?; - if repositories_to_track.is_empty() { - info!("no repositories to track, finished"); - return Ok(()); - } - - // Track repositories - info!("tracking repositories"); - #[allow(clippy::manual_try_fold)] - let result = stream::iter(repositories_to_track) - .map(|repository| async { - let db = db.clone(); - let gh = gh.clone(); - let gh_token = gh_tokens_pool.get().await.expect("token -when available-"); - let repo_url = repository.url.clone(); - - match timeout( - Duration::from_secs(REPOSITORY_TRACK_TIMEOUT), - track_repository(db, gh, gh_token, repository), - ) - .await - { - Ok(result) => result, - Err(err) => Err(format_err!("{}", err)), - } - .context(format!("error tracking repository {repo_url}")) - }) - .buffer_unordered(cfg.get("tracker.concurrency")?) - .collect::>>() - .await - .into_iter() - .fold( - Ok::<(), Error>(()), - |final_result, task_result| match task_result { - Ok(()) => final_result, - Err(task_err) => match final_result { - Ok(()) => Err(Into::into(task_err)), - Err(final_err) => Err(format_err!("{:#}\n{:#}", final_err, task_err)), - }, - }, - ); - - // Check Github API rate limit status for each token - #[cfg(not(test))] - for (i, gh_token) in gh_tokens.into_iter().enumerate() { - let gh_client = github::setup_http_client(&gh_token)?; - let response: Value = gh_client - .get("https://api.github.com/rate_limit") - .send() - .await? - .json() - .await?; - debug!( - token = i, - rate = %response["rate"], - graphql = %response["resources"]["graphql"], - "token github rate limit info" - ); - } - - info!("finished"); - result -} - -/// Track repository provided. -#[instrument(fields(url = %repo.url), skip_all, err)] -async fn track_repository( - db: DynDB, - gh: DynGH, - gh_token: Object, - mut repo: Repository, -) -> Result<()> { - let start = Instant::now(); - debug!("started"); - - // Fetch repository data from GitHub - let gh_repo = gh.repository(&gh_token, &repo.url).await?; - - // Update repository's GitHub data in db if needed - let changed = repo.update_gh_data(&gh_repo)?; - if changed { - db.update_repository_gh_data(&repo).await?; - debug!("github data updated in database"); - } - - // Sync issues in GitHub with database - let mut issues_in_gh = gh_repo.issues(); - let issues_in_db = db.get_repository_issues(repo.repository_id).await?; - - // Register/update new or outdated issues - for issue in &mut issues_in_gh { - let digest_in_db = find_issue(issue.issue_id, &issues_in_db); - if issue.digest != digest_in_db { - db.register_issue(&repo, issue).await?; - debug!(issue.number, "registering issue"); - } - } - - // Unregister issues no longer available in GitHub - for issue in &issues_in_db { - if find_issue(issue.issue_id, &issues_in_gh).is_none() { - db.unregister_issue(issue.issue_id).await?; - debug!(issue.number, "unregistering issue"); - } - } - - // Update repository's last track timestamp in db - db.update_repository_last_track_ts(repo.repository_id) - .await?; - - debug!(duration_ms = start.elapsed().as_millis(), "completed"); - Ok(()) -} - -/// Find an issue in the provided collection, returning its digest if found. -fn find_issue(issue_id: i64, issues: &[Issue]) -> Option { - issues - .iter() - .find(|i| i.issue_id == issue_id) - .map(|i| i.digest.clone().expect("to be present")) -} - #[cfg(test)] mod tests { use super::*; @@ -518,6 +518,7 @@ mod tests { topics: Some(vec!["topic1".to_string(), "topic2".to_string()]), languages: Some(vec!["language1".to_string()]), project_name: "project".to_string(), + foundation_id: "foundation".to_string(), ..Default::default() }; let issue = Issue { @@ -540,7 +541,7 @@ mod tests { issue.prepare_ts_texts(&repo), IssueTsTexts { weight_a: "project".to_string(), - weight_b: "repo description topic1 topic2 language1".to_string(), + weight_b: "foundation repo description topic1 topic2 language1".to_string(), weight_c: "issue1 label1 label2".to_string(), }, );