From 58c2813c8679a6ee0d2f7a584e6f3e5f4f0968ed Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Thu, 29 Aug 2024 17:01:31 +0200 Subject: [PATCH] upgrade http ecosystem to hyper 1 --- Cargo.toml | 10 ++++------ src/config.rs | 2 +- src/main.rs | 55 +++++++++++++++++++++++++++++---------------------- 3 files changed, 36 insertions(+), 31 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 116bf5f..e0cdeaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,11 +7,9 @@ license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -axum = "0.6" -axum-macros = "0.4.1" -base64 = { version = "0.21.0", default_features = false } -http = "0.2" -reqwest = { version = "0.11.20", default_features = false, features = ["json", "default-tls", "stream"] } +axum = "0.7" +base64 = "0.22.1" +reqwest = { version = "0.12", default_features = false, features = ["json", "default-tls", "stream"] } serde = { version = "1.0.152", features = ["serde_derive"] } serde_json = "1.0.96" thiserror = "1.0.38" @@ -19,7 +17,7 @@ rand = { default-features = false, version = "0.8.5" } chrono = "0.4.31" tokio = { version = "1.25.0", default_features = false, features = ["signal", "rt-multi-thread", "macros"] } beam-lib = { git = "https://github.com/samply/beam", branch = "develop", features = ["http-util"] } -tower-http = { version = "0.4.4", features = ["cors"] } +tower-http = { version = "0.5", features = ["cors"] } async-sse = "5.1.0" anyhow = "1" futures-util = { version = "0.3", features = ["io"] } diff --git a/src/config.rs b/src/config.rs index 1943c41..735671c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -116,7 +116,7 @@ fn get_query_unencoded() -> String { ) } -fn parse_cors(v: &str) -> Result { +fn parse_cors(v: &str) -> Result { if v == "*" || v.to_lowercase() == "any" { Ok(AllowOrigin::any()) } else { diff --git a/src/main.rs b/src/main.rs index 2c196ac..3a5b03c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,27 +7,25 @@ mod mr; use crate::errors::PrismError; use crate::{config::CONFIG, mr::MeasureReport}; -use base64::engine::general_purpose::STANDARD as BASE64; +use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; use futures_util::{StreamExt as _, TryStreamExt}; -use http::HeaderValue; use std::collections::HashSet; use std::io; use std::process::exit; use std::sync::Arc; use std::time::SystemTime; -use tokio::sync::Mutex; +use tokio::{net::TcpListener, sync::Mutex}; use axum::{ - extract::State, - http::{header, StatusCode}, + extract::{Json, State}, + http::StatusCode, response::{IntoResponse, Response}, routing::post, - Json, Router, + Router, }; +use reqwest::{header, header::HeaderValue, Method}; -use base64::Engine as _; use once_cell::sync::Lazy; -use reqwest::Method; use serde::{Deserialize, Serialize}; use beam::create_beam_task; @@ -35,7 +33,7 @@ use beam_lib::{AppId, BeamClient, MsgId}; use criteria::{combine_groups_of_criteria_groups, CriteriaGroups}; use std::{collections::HashMap, time::Duration}; use tower_http::cors::CorsLayer; -use tracing::{error, info, warn, debug}; +use tracing::{debug, error, info, warn}; use beam_lib::{RawString, TaskResult}; @@ -49,7 +47,7 @@ static BEAM_CLIENT: Lazy = Lazy::new(|| { #[derive(Serialize, Deserialize, Clone, Debug)] struct LensQuery { - sites: Vec, + sites: Vec, } type Site = String; @@ -111,7 +109,7 @@ pub async fn main() { spawn_site_querying(shared_state.clone()); let cors = CorsLayer::new() - .allow_methods([http::Method::GET, http::Method::POST]) + .allow_methods([Method::GET, Method::POST]) .allow_origin(CONFIG.cors_origin.clone()) .allow_headers([header::CONTENT_TYPE]); @@ -120,10 +118,12 @@ pub async fn main() { .with_state(shared_state) .layer(cors); - axum::Server::bind(&CONFIG.bind_addr) - .serve(app.into_make_service()) - .await - .unwrap() + axum::serve( + TcpListener::bind(CONFIG.bind_addr).await.unwrap(), + app.into_make_service(), + ) + .await + .unwrap() } fn spawn_site_querying(shared_state: SharedState) { @@ -164,9 +164,11 @@ async fn handle_get_criteria( if SystemTime::now().duration_since(cached.1).unwrap() < CRITERIACACHE_TTL { Some(cached.0.clone()) } else { - debug!("Results for site {} in cache sadly expired, will query again", &site); + debug!( + "Results for site {} in cache sadly expired, will query again", + &site + ); None - } } None => { @@ -246,14 +248,15 @@ async fn query_sites( Ok(()) } -async fn get_results(shared_state: SharedState, task_id: MsgId, wait_count: usize) -> Result<(), PrismError> { +async fn get_results( + shared_state: SharedState, + task_id: MsgId, + wait_count: usize, +) -> Result<(), PrismError> { let resp = BEAM_CLIENT .raw_beam_request( Method::GET, - &format!( - "v1/tasks/{}/results?wait_count={}", - task_id, wait_count - ), + &format!("v1/tasks/{}/results?wait_count={}", task_id, wait_count), ) .header( header::ACCEPT, @@ -304,7 +307,11 @@ async fn get_results(shared_state: SharedState, task_id: MsgId, wait_count: usiz from.as_ref().split('.').nth(1).unwrap().to_string(), // extracting site name from app long name (criteria, std::time::SystemTime::now()), ); - info!("Cached results from site {} for task {}", from.as_ref().split('.').nth(1).unwrap().to_string(), task_id); + info!( + "Cached results from site {} for task {}", + from.as_ref().split('.').nth(1).unwrap().to_string(), + task_id + ); } Ok(()) } @@ -334,7 +341,7 @@ async fn wait_for_beam_proxy() -> beam_lib::Result<()> { loop { match reqwest::get(format!("{}v1/health", CONFIG.beam_proxy_url)).await { //FIXME why doesn't it work with url from config - Ok(res) if res.status() == StatusCode::OK => return Ok(()), + Ok(res) if res.status() == reqwest::StatusCode::OK => return Ok(()), _ if tries <= MAX_RETRIES => tries += 1, Err(e) => return Err(e.into()), Ok(res) => {