Add foundation id to issues search index #596

Merged · 1 commit · May 20, 2024
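This PR adds the project's foundation id to the data used to build the issues search index: the repositories query now also selects p.foundation_id, the Repository struct carries the new field, and the value is prepended to the weight_b text that prepare_ts_texts produces for each issue. The rest of the churn in tracker.rs is the run/track_repository/find_issue block moving above the type definitions, plus the same bail! cleanup applied elsewhere in this PR. A condensed sketch of the flow, simplified from the diff below (field list trimmed, not a drop-in excerpt):

    // Condensed sketch only; see the actual diff below for the full change.
    pub struct Repository {
        pub name: String,
        pub description: Option<String>,
        pub foundation_id: String, // new: taken from the joined project row
    }

    // weight_b (the "B" weighted text of the issues search index) now leads
    // with the foundation id, so issues can also be found by foundation.
    pub fn weight_b(repo: &Repository) -> String {
        format!(
            "{} {} {}",
            repo.foundation_id,
            repo.name,
            repo.description.clone().unwrap_or_default(),
        )
    }

    fn main() {
        let repo = Repository {
            name: "repo".to_string(),
            description: Some("description".to_string()),
            foundation_id: "cncf".to_string(), // example value
        };
        assert_eq!(weight_b(&repo), "cncf repo description");
    }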
6 changes: 3 additions & 3 deletions clotributor-registrar/src/registrar.rs
@@ -1,5 +1,5 @@
use crate::db::DynDB;
use anyhow::{format_err, Context, Error, Result};
use anyhow::{bail, format_err, Context, Error, Result};
use config::Config;
use futures::stream::{self, StreamExt};
use reqwest::StatusCode;
@@ -158,10 +158,10 @@ async fn process_foundation(
// Fetch foundation data file
let resp = http_client.get(foundation.data_url).send().await?;
if resp.status() != StatusCode::OK {
return Err(format_err!(
bail!(
"unexpected status code getting data file: {}",
resp.status()
));
);
}
let data = resp.text().await?;

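Aside from the search-index work, the error paths touched here (and in github.rs below) swap return Err(format_err!(...)) for anyhow's bail! macro, which is shorthand for exactly that. A minimal standalone sketch of the equivalence, not taken from the diff:

    use anyhow::{bail, format_err, Result};

    fn with_format_err(status: u16) -> Result<()> {
        if status != 200 {
            return Err(format_err!("unexpected status code: {}", status));
        }
        Ok(())
    }

    fn with_bail(status: u16) -> Result<()> {
        if status != 200 {
            bail!("unexpected status code: {}", status);
        }
        Ok(())
    }

    fn main() {
        // Same error, less wrapping.
        assert_eq!(
            with_format_err(500).unwrap_err().to_string(),
            with_bail(500).unwrap_err().to_string(),
        );
    }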
4 changes: 3 additions & 1 deletion clotributor-tracker/src/db.rs
@@ -62,7 +62,8 @@ impl DB for PgDB {
r.languages,
r.stars,
r.digest,
p.name as project_name
p.name as project_name,
p.foundation_id
from repository r
join project p using (project_id)
where r.tracked_at is null
@@ -84,6 +85,7 @@ impl DB for PgDB {
stars: row.get("stars"),
digest: row.get("digest"),
project_name: row.get("project_name"),
foundation_id: row.get("foundation_id"),
})
.collect();
Ok(repositories)
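The select feeding get_repositories_to_track now also returns the project's foundation_id, which is what lets the tracker fold it into the search texts. How those texts become the actual index is not shown in this PR; presumably the database side combines them into a weighted tsvector, roughly along these lines (an assumption, with hypothetical column names):

    // Hypothetical sketch only: the migration that builds the issues search
    // index is not part of this diff. It assumes Postgres full-text search
    // over the three weight texts produced by the tracker.
    const TSDOC_EXPR: &str = "\
        setweight(to_tsvector('simple', weight_a), 'A') || \
        setweight(to_tsvector('simple', weight_b), 'B') || \
        setweight(to_tsvector('simple', weight_c), 'C')";

    fn main() {
        println!("{}", TSDOC_EXPR);
    }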
6 changes: 3 additions & 3 deletions clotributor-tracker/src/github.rs
@@ -1,5 +1,5 @@
use crate::tracker::Issue;
use anyhow::{format_err, Context, Result};
use anyhow::{bail, format_err, Context, Result};
use async_trait::async_trait;
use graphql_client::{GraphQLQuery, Response};
use lazy_static::lazy_static;
@@ -147,11 +147,11 @@ impl GH for GHGraphQL {
.await
.context("error querying graphql api")?;
if resp.status() != StatusCode::OK {
return Err(format_err!(
bail!(
"unexpected status code querying graphql api: {} - {}",
resp.status(),
resp.text().await?,
));
);
}

// Parse response body and extract repository data
279 changes: 140 additions & 139 deletions clotributor-tracker/src/tracker.rs
@@ -4,7 +4,7 @@ use crate::{
db::DynDB,
github::{repo_view, DynGH},
};
use anyhow::{format_err, Context, Error, Result};
use anyhow::{bail, format_err, Context, Error, Result};
use config::Config;
use deadpool::unmanaged::{Object, Pool};
use futures::stream::{self, StreamExt};
@@ -22,6 +22,140 @@ use uuid::Uuid;
/// Maximum time that can take tracking a single repository.
const REPOSITORY_TRACK_TIMEOUT: u64 = 300;

/// Track repositories that need to be tracked.
#[instrument(skip_all, err)]
pub(crate) async fn run(cfg: &Config, db: DynDB, gh: DynGH) -> Result<()> {
// Setup GitHub tokens pool
let gh_tokens = cfg.get::<Vec<String>>("creds.githubTokens")?;
if gh_tokens.is_empty() {
bail!("GitHub tokens not found in config file (creds.githubTokens)");
}
let gh_tokens_pool = Pool::from(gh_tokens.clone());

// Get repositories to track
debug!("getting repositories to track");
let repositories_to_track = db.get_repositories_to_track().await?;
if repositories_to_track.is_empty() {
info!("no repositories to track, finished");
return Ok(());
}

// Track repositories
info!("tracking repositories");
#[allow(clippy::manual_try_fold)]
let result = stream::iter(repositories_to_track)
.map(|repository| async {
let db = db.clone();
let gh = gh.clone();
let gh_token = gh_tokens_pool.get().await.expect("token -when available-");
let repo_url = repository.url.clone();

match timeout(
Duration::from_secs(REPOSITORY_TRACK_TIMEOUT),
track_repository(db, gh, gh_token, repository),
)
.await
{
Ok(result) => result,
Err(err) => Err(format_err!("{}", err)),
}
.context(format!("error tracking repository {repo_url}"))
})
.buffer_unordered(cfg.get("tracker.concurrency")?)
.collect::<Vec<Result<()>>>()
.await
.into_iter()
.fold(
Ok::<(), Error>(()),
|final_result, task_result| match task_result {
Ok(()) => final_result,
Err(task_err) => match final_result {
Ok(()) => Err(Into::into(task_err)),
Err(final_err) => Err(format_err!("{:#}\n{:#}", final_err, task_err)),
},
},
);

// Check Github API rate limit status for each token
#[cfg(not(test))]
for (i, gh_token) in gh_tokens.into_iter().enumerate() {
let gh_client = github::setup_http_client(&gh_token)?;
let response: Value = gh_client
.get("https://api.github.com/rate_limit")
.send()
.await?
.json()
.await?;
debug!(
token = i,
rate = %response["rate"],
graphql = %response["resources"]["graphql"],
"token github rate limit info"
);
}

info!("finished");
result
}

/// Track repository provided.
#[instrument(fields(url = %repo.url), skip_all, err)]
async fn track_repository(
db: DynDB,
gh: DynGH,
gh_token: Object<String>,
mut repo: Repository,
) -> Result<()> {
let start = Instant::now();
debug!("started");

// Fetch repository data from GitHub
let gh_repo = gh.repository(&gh_token, &repo.url).await?;

// Update repository's GitHub data in db if needed
let changed = repo.update_gh_data(&gh_repo)?;
if changed {
db.update_repository_gh_data(&repo).await?;
debug!("github data updated in database");
}

// Sync issues in GitHub with database
let mut issues_in_gh = gh_repo.issues();
let issues_in_db = db.get_repository_issues(repo.repository_id).await?;

// Register/update new or outdated issues
for issue in &mut issues_in_gh {
let digest_in_db = find_issue(issue.issue_id, &issues_in_db);
if issue.digest != digest_in_db {
db.register_issue(&repo, issue).await?;
debug!(issue.number, "registering issue");
}
}

// Unregister issues no longer available in GitHub
for issue in &issues_in_db {
if find_issue(issue.issue_id, &issues_in_gh).is_none() {
db.unregister_issue(issue.issue_id).await?;
debug!(issue.number, "unregistering issue");
}
}

// Update repository's last track timestamp in db
db.update_repository_last_track_ts(repo.repository_id)
.await?;

debug!(duration_ms = start.elapsed().as_millis(), "completed");
Ok(())
}

/// Find an issue in the provided collection, returning its digest if found.
fn find_issue(issue_id: i64, issues: &[Issue]) -> Option<String> {
issues
.iter()
.find(|i| i.issue_id == issue_id)
.map(|i| i.digest.clone().expect("to be present"))
}

/// Repository information.
#[derive(Debug, Clone, PartialEq, Default)]
#[allow(clippy::struct_field_names)]
@@ -36,6 +170,7 @@ pub(crate) struct Repository {
pub stars: Option<i32>,
pub digest: Option<String>,
pub project_name: String,
pub foundation_id: String,
}

impl Repository {
@@ -163,7 +298,8 @@ impl Issue {

// Weight B
let weight_b = format!(
"{} {} {} {}",
"{} {} {} {} {}",
&repo.foundation_id,
&repo.name,
&repo.description.clone().unwrap_or_default(),
&repo
@@ -259,142 +395,6 @@ pub(crate) struct IssueTsTexts {
pub weight_c: String,
}

/// Track repositories that need to be tracked.
#[instrument(skip_all, err)]
pub(crate) async fn run(cfg: &Config, db: DynDB, gh: DynGH) -> Result<()> {
// Setup GitHub tokens pool
let gh_tokens = cfg.get::<Vec<String>>("creds.githubTokens")?;
if gh_tokens.is_empty() {
return Err(format_err!(
"GitHub tokens not found in config file (creds.githubTokens)"
));
}
let gh_tokens_pool = Pool::from(gh_tokens.clone());

// Get repositories to track
debug!("getting repositories to track");
let repositories_to_track = db.get_repositories_to_track().await?;
if repositories_to_track.is_empty() {
info!("no repositories to track, finished");
return Ok(());
}

// Track repositories
info!("tracking repositories");
#[allow(clippy::manual_try_fold)]
let result = stream::iter(repositories_to_track)
.map(|repository| async {
let db = db.clone();
let gh = gh.clone();
let gh_token = gh_tokens_pool.get().await.expect("token -when available-");
let repo_url = repository.url.clone();

match timeout(
Duration::from_secs(REPOSITORY_TRACK_TIMEOUT),
track_repository(db, gh, gh_token, repository),
)
.await
{
Ok(result) => result,
Err(err) => Err(format_err!("{}", err)),
}
.context(format!("error tracking repository {repo_url}"))
})
.buffer_unordered(cfg.get("tracker.concurrency")?)
.collect::<Vec<Result<()>>>()
.await
.into_iter()
.fold(
Ok::<(), Error>(()),
|final_result, task_result| match task_result {
Ok(()) => final_result,
Err(task_err) => match final_result {
Ok(()) => Err(Into::into(task_err)),
Err(final_err) => Err(format_err!("{:#}\n{:#}", final_err, task_err)),
},
},
);

// Check Github API rate limit status for each token
#[cfg(not(test))]
for (i, gh_token) in gh_tokens.into_iter().enumerate() {
let gh_client = github::setup_http_client(&gh_token)?;
let response: Value = gh_client
.get("https://api.github.com/rate_limit")
.send()
.await?
.json()
.await?;
debug!(
token = i,
rate = %response["rate"],
graphql = %response["resources"]["graphql"],
"token github rate limit info"
);
}

info!("finished");
result
}

/// Track repository provided.
#[instrument(fields(url = %repo.url), skip_all, err)]
async fn track_repository(
db: DynDB,
gh: DynGH,
gh_token: Object<String>,
mut repo: Repository,
) -> Result<()> {
let start = Instant::now();
debug!("started");

// Fetch repository data from GitHub
let gh_repo = gh.repository(&gh_token, &repo.url).await?;

// Update repository's GitHub data in db if needed
let changed = repo.update_gh_data(&gh_repo)?;
if changed {
db.update_repository_gh_data(&repo).await?;
debug!("github data updated in database");
}

// Sync issues in GitHub with database
let mut issues_in_gh = gh_repo.issues();
let issues_in_db = db.get_repository_issues(repo.repository_id).await?;

// Register/update new or outdated issues
for issue in &mut issues_in_gh {
let digest_in_db = find_issue(issue.issue_id, &issues_in_db);
if issue.digest != digest_in_db {
db.register_issue(&repo, issue).await?;
debug!(issue.number, "registering issue");
}
}

// Unregister issues no longer available in GitHub
for issue in &issues_in_db {
if find_issue(issue.issue_id, &issues_in_gh).is_none() {
db.unregister_issue(issue.issue_id).await?;
debug!(issue.number, "unregistering issue");
}
}

// Update repository's last track timestamp in db
db.update_repository_last_track_ts(repo.repository_id)
.await?;

debug!(duration_ms = start.elapsed().as_millis(), "completed");
Ok(())
}

/// Find an issue in the provided collection, returning its digest if found.
fn find_issue(issue_id: i64, issues: &[Issue]) -> Option<String> {
issues
.iter()
.find(|i| i.issue_id == issue_id)
.map(|i| i.digest.clone().expect("to be present"))
}

#[cfg(test)]
mod tests {
use super::*;
@@ -518,6 +518,7 @@ mod tests {
topics: Some(vec!["topic1".to_string(), "topic2".to_string()]),
languages: Some(vec!["language1".to_string()]),
project_name: "project".to_string(),
foundation_id: "foundation".to_string(),
..Default::default()
};
let issue = Issue {
Expand All @@ -540,7 +541,7 @@ mod tests {
issue.prepare_ts_texts(&repo),
IssueTsTexts {
weight_a: "project".to_string(),
weight_b: "repo description topic1 topic2 language1".to_string(),
weight_b: "foundation repo description topic1 topic2 language1".to_string(),
weight_c: "issue1 label1 label2".to_string(),
},
);
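Not covered by the updated test, but worth noting: once the foundation id is part of weight_b, searching the issues index by foundation name should match as expected. A hypothetical usage sketch, complementing the index expression sketched after the db.rs changes; the table, column, and 'cncf' value are assumptions, not taken from this diff:

    // Hypothetical usage sketch: none of these identifiers come from the PR.
    // It only illustrates that a foundation id can now match the B-weighted
    // part of each issue's search document.
    const SEARCH_BY_FOUNDATION: &str = "\
        select issue_id \
        from issue \
        where tsdoc @@ websearch_to_tsquery('simple', 'cncf')";

    fn main() {
        println!("{}", SEARCH_BY_FOUNDATION);
    }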