diff --git a/Cargo.lock b/Cargo.lock index b7d90accd..0acadff88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,7 +48,7 @@ dependencies = [ "ahash", "base64 0.22.0", "bitflags 2.5.0", - "brotli", + "brotli 6.0.0", "bytes", "bytestring", "derive_more", @@ -681,6 +681,7 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07dbbf24db18d609b1462965249abdf49129ccad073ec257da372adc83259c60" dependencies = [ + "brotli 4.0.0", "bzip2", "flate2", "futures-core", @@ -892,6 +893,17 @@ dependencies = [ "generic-array", ] +[[package]] +name = "brotli" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "125740193d7fee5cc63ab9e16c2fdc4e07c74ba755cc53b327d6ea029e9fc569" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor 3.0.0", +] + [[package]] name = "brotli" version = "6.0.0" @@ -900,7 +912,17 @@ checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", - "brotli-decompressor", + "brotli-decompressor 4.0.1", +] + +[[package]] +name = "brotli-decompressor" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65622a320492e09b5e0ac436b14c54ff68199bac392d0e89a6832c4518eea525" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", ] [[package]] @@ -3122,7 +3144,7 @@ dependencies = [ "arrow-schema", "arrow-select", "base64 0.22.0", - "brotli", + "brotli 6.0.0", "bytes", "chrono", "flate2", @@ -3786,6 +3808,7 @@ version = "0.11.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" dependencies = [ + "async-compression", "base64 0.21.7", "bytes", "encoding_rs", @@ -3812,6 +3835,7 @@ dependencies = [ "system-configuration", "tokio", "tokio-rustls 0.24.1", + "tokio-util", "tower-service", "url", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index d054735a8..c93f90029 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,8 @@ relative-path = { version = "1.7", features = ["serde"] } reqwest = { version = "0.11.27", default-features = false, features = [ "rustls-tls", "json", + "gzip", + "brotli", ] } # cannot update cause rustls is not latest `see rustls` rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet rustls-pemfile = "2.1.2" diff --git a/src/analytics.rs b/src/analytics.rs index 3c4d05c3c..85b14b87a 100644 --- a/src/analytics.rs +++ b/src/analytics.rs @@ -22,8 +22,8 @@ use crate::handlers::http::cluster::utils::check_liveness; use crate::handlers::http::{base_path_without_preceding_slash, cluster}; use crate::handlers::STREAM_NAME_HEADER_KEY; use crate::option::{Mode, CONFIG}; -use crate::storage; use crate::{metadata, stats}; +use crate::{storage, HTTP_CLIENT}; use crate::stats::Stats; use actix_web::{web, HttpRequest, Responder}; @@ -132,9 +132,7 @@ impl Report { } pub async fn send(&self) { - let client = reqwest::Client::new(); - - let _ = client + let _ = HTTP_CLIENT .post(ANALYTICS_SERVER_URL) .header(STREAM_NAME_HEADER_KEY, "serverusageevent") .json(&self) @@ -240,7 +238,7 @@ async fn fetch_ingestors_metrics( )) .expect("Should be a valid URL"); - let resp = reqwest::Client::new() + let resp = HTTP_CLIENT .get(uri) .header(header::AUTHORIZATION, im.token.clone()) .header(header::CONTENT_TYPE, "application/json") diff --git a/src/audit.rs b/src/audit.rs index 0cddb602b..9016efc30 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -21,12 +21,11 @@ use std::{ fmt::{Debug, Display}, }; -use crate::{about::current, storage::StorageMetadata}; +use crate::{about::current, storage::StorageMetadata, HTTP_CLIENT}; use super::option::CONFIG; use chrono::{DateTime, Utc}; use once_cell::sync::Lazy; -use reqwest::Client; use serde::Serialize; use serde_json::{json, Value}; use tracing::error; @@ -38,7 +37,6 @@ static AUDIT_LOGGER: Lazy> = Lazy::new(AuditLogger::new); // AuditLogger handles sending audit logs to a remote logging system pub struct AuditLogger { - client: Client, log_endpoint: Url, } @@ -62,16 +60,12 @@ impl AuditLogger { } }; - Some(AuditLogger { - client: reqwest::Client::new(), - log_endpoint, - }) + Some(AuditLogger { log_endpoint }) } // Sends the audit log to the configured endpoint with proper authentication async fn send_log(&self, json: Value) { - let mut req = self - .client + let mut req = HTTP_CLIENT .post(self.log_endpoint.as_str()) .json(&json) .header("x-p-stream", "audit_log"); diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index ff520af9a..4e936c79d 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -33,6 +33,7 @@ use crate::stats::Stats; use crate::storage::object_storage::ingestor_metadata_path; use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY}; use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY}; +use crate::HTTP_CLIENT; use actix_web::http::header::{self, HeaderMap}; use actix_web::{HttpRequest, Responder}; use bytes::Bytes; @@ -76,8 +77,6 @@ pub async fn sync_streams_with_ingestors( StreamError::Anyhow(err) })?; - let client = reqwest::Client::new(); - for ingestor in ingestor_infos { if !utils::check_liveness(&ingestor.domain_name).await { warn!("Ingestor {} is not live", ingestor.domain_name); @@ -89,7 +88,7 @@ pub async fn sync_streams_with_ingestors( base_path_without_preceding_slash(), stream_name ); - let res = client + let res = HTTP_CLIENT .put(url) .headers(reqwest_headers.clone()) .header(header::AUTHORIZATION, &ingestor.token) @@ -126,7 +125,6 @@ pub async fn sync_users_with_roles_with_ingestors( RBACError::Anyhow(err) })?; - let client = reqwest::Client::new(); let role = to_vec(&role.clone()).map_err(|err| { error!("Fatal: failed to serialize role: {:?}", err); RBACError::SerdeError(err) @@ -143,7 +141,7 @@ pub async fn sync_users_with_roles_with_ingestors( username ); - let res = client + let res = HTTP_CLIENT .put(url) .header(header::AUTHORIZATION, &ingestor.token) .header(header::CONTENT_TYPE, "application/json") @@ -177,7 +175,6 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(), RBACError::Anyhow(err) })?; - let client = reqwest::Client::new(); for ingestor in ingestor_infos.iter() { if !utils::check_liveness(&ingestor.domain_name).await { warn!("Ingestor {} is not live", ingestor.domain_name); @@ -190,7 +187,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(), username ); - let res = client + let res = HTTP_CLIENT .delete(url) .header(header::AUTHORIZATION, &ingestor.token) .send() @@ -231,7 +228,6 @@ pub async fn sync_user_creation_with_ingestors( user.roles.clone_from(role); } let username = user.username(); - let client = reqwest::Client::new(); let user = to_vec(&user).map_err(|err| { error!("Fatal: failed to serialize user: {:?}", err); @@ -250,7 +246,7 @@ pub async fn sync_user_creation_with_ingestors( username ); - let res = client + let res = HTTP_CLIENT .post(url) .header(header::AUTHORIZATION, &ingestor.token) .header(header::CONTENT_TYPE, "application/json") @@ -283,7 +279,6 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), error!("Fatal: failed to get ingestor info: {:?}", err); RBACError::Anyhow(err) })?; - let client = reqwest::Client::new(); for ingestor in ingestor_infos.iter() { if !utils::check_liveness(&ingestor.domain_name).await { @@ -297,7 +292,7 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), username ); - let res = client + let res = HTTP_CLIENT .post(url) .header(header::AUTHORIZATION, &ingestor.token) .header(header::CONTENT_TYPE, "application/json") @@ -338,7 +333,6 @@ pub async fn sync_role_update_with_ingestors( RoleError::SerdeError(err) })?; let roles = Bytes::from(roles); - let client = reqwest::Client::new(); for ingestor in ingestor_infos.iter() { if !utils::check_liveness(&ingestor.domain_name).await { @@ -352,7 +346,7 @@ pub async fn sync_role_update_with_ingestors( name ); - let res = client + let res = HTTP_CLIENT .put(url) .header(header::AUTHORIZATION, &ingestor.token) .header(header::CONTENT_TYPE, "application/json") @@ -401,7 +395,7 @@ pub async fn fetch_daily_stats_from_ingestors( StreamError::Anyhow(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err)) })?; - let res = reqwest::Client::new() + let res = HTTP_CLIENT .get(uri) .header(header::AUTHORIZATION, &ingestor.token) .header(header::CONTENT_TYPE, "application/json") @@ -512,8 +506,7 @@ pub async fn send_stream_delete_request( if !utils::check_liveness(&ingestor.domain_name).await { return Ok(()); } - let client = reqwest::Client::new(); - let resp = client + let resp = HTTP_CLIENT .delete(url) .header(header::CONTENT_TYPE, "application/json") .header(header::AUTHORIZATION, ingestor.token) @@ -551,8 +544,7 @@ pub async fn send_retention_cleanup_request( if !utils::check_liveness(&ingestor.domain_name).await { return Ok(first_event_at); } - let client = reqwest::Client::new(); - let resp = client + let resp = HTTP_CLIENT .post(url) .header(header::CONTENT_TYPE, "application/json") .header(header::AUTHORIZATION, ingestor.token) @@ -603,7 +595,7 @@ pub async fn get_cluster_info() -> Result { )) .expect("should always be a valid url"); - let resp = reqwest::Client::new() + let resp = HTTP_CLIENT .get(uri) .header(header::AUTHORIZATION, ingestor.token.clone()) .header(header::CONTENT_TYPE, "application/json") @@ -752,7 +744,7 @@ async fn fetch_cluster_metrics() -> Result, PostError> { PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err)) })?; - let res = reqwest::Client::new() + let res = HTTP_CLIENT .get(uri) .header(header::AUTHORIZATION, &ingestor.token) .header(header::CONTENT_TYPE, "application/json") diff --git a/src/handlers/http/cluster/utils.rs b/src/handlers/http/cluster/utils.rs index 41f0e91eb..0e64cd283 100644 --- a/src/handlers/http/cluster/utils.rs +++ b/src/handlers/http/cluster/utils.rs @@ -16,8 +16,11 @@ * */ -use crate::handlers::http::{ - base_path_without_preceding_slash, logstream::error::StreamError, modal::IngestorMetadata, +use crate::{ + handlers::http::{ + base_path_without_preceding_slash, logstream::error::StreamError, modal::IngestorMetadata, + }, + HTTP_CLIENT, }; use actix_web::http::header; use chrono::{DateTime, Utc}; @@ -235,13 +238,13 @@ pub async fn check_liveness(domain_name: &str) -> bool { } }; - let reqw = reqwest::Client::new() + let req = HTTP_CLIENT .get(uri) .header(header::CONTENT_TYPE, "application/json") .send() .await; - reqw.is_ok() + req.is_ok() } /// send a request to the ingestor to fetch its stats @@ -255,8 +258,7 @@ pub async fn send_stats_request( return Ok(None); } - let client = reqwest::Client::new(); - let res = client + let res = HTTP_CLIENT .get(url) .header(header::CONTENT_TYPE, "application/json") .header(header::AUTHORIZATION, ingestor.token) diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 9a95aa26d..8d8db14c9 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -23,7 +23,7 @@ use http::StatusCode; use itertools::Itertools; use serde_json::Value; -use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY}; +use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT}; use self::{cluster::get_ingestor_info, query::Query}; @@ -109,7 +109,7 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result = Lazy::new(|| { + ClientBuilder::new() + .connect_timeout(Duration::from_secs(3)) // set a timeout of 3s for each connection setup + .timeout(Duration::from_secs(10)) // set a timeout of 10s for each request + .pool_idle_timeout(Duration::from_secs(90)) // set a timeout of 90s for each idle connection + .pool_max_idle_per_host(32) // max 32 idle connections per host + .gzip(true) // gzip compress for all requests + .brotli(true) // brotli compress for all requests + .use_rustls_tls() // use only the rustls backend + .http1_only() // use only http/1.1 + .build() + .expect("Construction of client shouldn't fail") +}); diff --git a/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs index 0aaaacf36..1de4da87a 100644 --- a/src/metrics/prom_utils.rs +++ b/src/metrics/prom_utils.rs @@ -20,6 +20,7 @@ use crate::handlers::http::base_path_without_preceding_slash; use crate::handlers::http::ingest::PostError; use crate::handlers::http::modal::IngestorMetadata; use crate::utils::get_url; +use crate::HTTP_CLIENT; use actix_web::http::header; use chrono::NaiveDateTime; use chrono::Utc; @@ -227,7 +228,7 @@ impl Metrics { .map_err(|err| { PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err)) })?; - let res = reqwest::Client::new() + let res = HTTP_CLIENT .get(uri) .header(header::CONTENT_TYPE, "application/json") .header(header::AUTHORIZATION, ingestor_metadata.token)