Skip to content

Commit

Permalink
Merge pull request #6 from samply/feature/wait
Browse files Browse the repository at this point in the history
wait count, logging
  • Loading branch information
enola-dkfz authored Mar 14, 2024
2 parents ceebff3 + 58db4b9 commit c7fb70d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
6 changes: 0 additions & 6 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ struct CliArgs {
#[clap(long, env, value_parser)]
sites: String,

/// Wait for results count
#[clap(long, env, value_parser, default_value = "32")]
wait_count: usize,

/// Credentials to use on the Beam Proxy
#[clap(long, env, value_parser = parse_cors)]
pub cors_origin: AllowOrigin,
Expand All @@ -73,7 +69,6 @@ pub(crate) struct Config {
pub beam_app_id_long: AppId,
pub api_key: String,
pub sites: Vec<String>,
pub wait_count: usize,
pub cors_origin: AllowOrigin,
pub project: String,
pub bind_addr: SocketAddr,
Expand All @@ -90,7 +85,6 @@ impl Config {
beam_app_id_long: AppId::new_unchecked(cli_args.beam_app_id_long),
api_key: cli_args.api_key,
sites: cli_args.sites.split(';').map(|s| s.to_string()).collect(),
wait_count: cli_args.wait_count,
cors_origin: cli_args.cors_origin,
project: cli_args.project,
bind_addr: cli_args.bind_addr,
Expand Down
14 changes: 11 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ pub async fn main() {
exit(2);
}

info!("Beam ready");

spawn_site_querying(shared_state.clone());

let cors = CorsLayer::new()
Expand Down Expand Up @@ -185,14 +187,19 @@ async fn post_query(shared_state: SharedState, sites: Vec<String>) -> Result<(),
info!("No sites to query");
return Ok(());
}
let wait_count = sites.len();
let site_display = sites.join(", ");
let task = create_beam_task(sites);
info!("Querying sites {:?}", site_display);
BEAM_CLIENT
.post_task(&task)
.await
.map_err(|e| PrismError::BeamError(format!("Unable to post a query: {}", e)))?;

info!("Posted task {}", task.id);

tokio::spawn(async move {
if let Err(e) = get_results(shared_state, task.id).await {
if let Err(e) = get_results(shared_state, task.id, wait_count).await {
warn!("Failed to get results for {}: {e}", task.id);
}
});
Expand Down Expand Up @@ -224,13 +231,13 @@ async fn query_sites(
Ok(())
}

async fn get_results(shared_state: SharedState, task_id: MsgId) -> Result<(), PrismError> {
async fn get_results(shared_state: SharedState, task_id: MsgId, wait_count: usize) -> Result<(), PrismError> {
let resp = BEAM_CLIENT
.raw_beam_request(
Method::GET,
&format!(
"v1/tasks/{}/results?wait_count={}",
task_id, CONFIG.wait_count
task_id, wait_count
),
)
.header(
Expand Down Expand Up @@ -282,6 +289,7 @@ async fn get_results(shared_state: SharedState, task_id: MsgId) -> Result<(), Pr
from.as_ref().split('.').nth(1).unwrap().to_string(), // extracting site name from app long name
(criteria, std::time::SystemTime::now()),
);
info!("Cached results from site {} for task {}", from.as_ref().split('.').nth(1).unwrap().to_string(), task_id);
}
Ok(())
}
Expand Down

0 comments on commit c7fb70d

Please sign in to comment.