Skip to content

Commit

Permalink
refactor: single reqwest::Client for HTTP (#1071)
Browse files Browse the repository at this point in the history
Co-authored-by: Nikhil Sinha <[email protected]>
  • Loading branch information
de-sh and nikhilsinhaparseable authored Jan 7, 2025
1 parent b46be32 commit 9122158
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 46 deletions.
30 changes: 27 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
"rustls-tls",
"json",
"gzip",
"brotli",
] } # cannot update cause rustls is not latest `see rustls`
rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet
rustls-pemfile = "2.1.2"
Expand Down
8 changes: 3 additions & 5 deletions src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use crate::handlers::http::cluster::utils::check_liveness;
use crate::handlers::http::{base_path_without_preceding_slash, cluster};
use crate::handlers::STREAM_NAME_HEADER_KEY;
use crate::option::{Mode, CONFIG};
use crate::storage;
use crate::{metadata, stats};
use crate::{storage, HTTP_CLIENT};

use crate::stats::Stats;
use actix_web::{web, HttpRequest, Responder};
Expand Down Expand Up @@ -132,9 +132,7 @@ impl Report {
}

pub async fn send(&self) {
let client = reqwest::Client::new();

let _ = client
let _ = HTTP_CLIENT
.post(ANALYTICS_SERVER_URL)
.header(STREAM_NAME_HEADER_KEY, "serverusageevent")
.json(&self)
Expand Down Expand Up @@ -240,7 +238,7 @@ async fn fetch_ingestors_metrics(
))
.expect("Should be a valid URL");

let resp = reqwest::Client::new()
let resp = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, im.token.clone())
.header(header::CONTENT_TYPE, "application/json")
Expand Down
12 changes: 3 additions & 9 deletions src/audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ use std::{
fmt::{Debug, Display},
};

use crate::{about::current, storage::StorageMetadata};
use crate::{about::current, storage::StorageMetadata, HTTP_CLIENT};

use super::option::CONFIG;
use chrono::{DateTime, Utc};
use once_cell::sync::Lazy;
use reqwest::Client;
use serde::Serialize;
use serde_json::{json, Value};
use tracing::error;
Expand All @@ -38,7 +37,6 @@ static AUDIT_LOGGER: Lazy<Option<AuditLogger>> = Lazy::new(AuditLogger::new);

// AuditLogger handles sending audit logs to a remote logging system
pub struct AuditLogger {
client: Client,
log_endpoint: Url,
}

Expand All @@ -62,16 +60,12 @@ impl AuditLogger {
}
};

Some(AuditLogger {
client: reqwest::Client::new(),
log_endpoint,
})
Some(AuditLogger { log_endpoint })
}

// Sends the audit log to the configured endpoint with proper authentication
async fn send_log(&self, json: Value) {
let mut req = self
.client
let mut req = HTTP_CLIENT
.post(self.log_endpoint.as_str())
.json(&json)
.header("x-p-stream", "audit_log");
Expand Down
32 changes: 12 additions & 20 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::stats::Stats;
use crate::storage::object_storage::ingestor_metadata_path;
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
use crate::HTTP_CLIENT;
use actix_web::http::header::{self, HeaderMap};
use actix_web::{HttpRequest, Responder};
use bytes::Bytes;
Expand Down Expand Up @@ -76,8 +77,6 @@ pub async fn sync_streams_with_ingestors(
StreamError::Anyhow(err)
})?;

let client = reqwest::Client::new();

for ingestor in ingestor_infos {
if !utils::check_liveness(&ingestor.domain_name).await {
warn!("Ingestor {} is not live", ingestor.domain_name);
Expand All @@ -89,7 +88,7 @@ pub async fn sync_streams_with_ingestors(
base_path_without_preceding_slash(),
stream_name
);
let res = client
let res = HTTP_CLIENT
.put(url)
.headers(reqwest_headers.clone())
.header(header::AUTHORIZATION, &ingestor.token)
Expand Down Expand Up @@ -126,7 +125,6 @@ pub async fn sync_users_with_roles_with_ingestors(
RBACError::Anyhow(err)
})?;

let client = reqwest::Client::new();
let role = to_vec(&role.clone()).map_err(|err| {
error!("Fatal: failed to serialize role: {:?}", err);
RBACError::SerdeError(err)
Expand All @@ -143,7 +141,7 @@ pub async fn sync_users_with_roles_with_ingestors(
username
);

let res = client
let res = HTTP_CLIENT
.put(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -177,7 +175,6 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(),
RBACError::Anyhow(err)
})?;

let client = reqwest::Client::new();
for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
warn!("Ingestor {} is not live", ingestor.domain_name);
Expand All @@ -190,7 +187,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(),
username
);

let res = client
let res = HTTP_CLIENT
.delete(url)
.header(header::AUTHORIZATION, &ingestor.token)
.send()
Expand Down Expand Up @@ -231,7 +228,6 @@ pub async fn sync_user_creation_with_ingestors(
user.roles.clone_from(role);
}
let username = user.username();
let client = reqwest::Client::new();

let user = to_vec(&user).map_err(|err| {
error!("Fatal: failed to serialize user: {:?}", err);
Expand All @@ -250,7 +246,7 @@ pub async fn sync_user_creation_with_ingestors(
username
);

let res = client
let res = HTTP_CLIENT
.post(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -283,7 +279,6 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(),
error!("Fatal: failed to get ingestor info: {:?}", err);
RBACError::Anyhow(err)
})?;
let client = reqwest::Client::new();

for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
Expand All @@ -297,7 +292,7 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(),
username
);

let res = client
let res = HTTP_CLIENT
.post(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -338,7 +333,6 @@ pub async fn sync_role_update_with_ingestors(
RoleError::SerdeError(err)
})?;
let roles = Bytes::from(roles);
let client = reqwest::Client::new();

for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
Expand All @@ -352,7 +346,7 @@ pub async fn sync_role_update_with_ingestors(
name
);

let res = client
let res = HTTP_CLIENT
.put(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -401,7 +395,7 @@ pub async fn fetch_daily_stats_from_ingestors(
StreamError::Anyhow(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;

let res = reqwest::Client::new()
let res = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -512,8 +506,7 @@ pub async fn send_stream_delete_request(
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(());
}
let client = reqwest::Client::new();
let resp = client
let resp = HTTP_CLIENT
.delete(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
Expand Down Expand Up @@ -551,8 +544,7 @@ pub async fn send_retention_cleanup_request(
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(first_event_at);
}
let client = reqwest::Client::new();
let resp = client
let resp = HTTP_CLIENT
.post(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
Expand Down Expand Up @@ -603,7 +595,7 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
))
.expect("should always be a valid url");

let resp = reqwest::Client::new()
let resp = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, ingestor.token.clone())
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -752,7 +744,7 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;

let res = reqwest::Client::new()
let res = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down
14 changes: 8 additions & 6 deletions src/handlers/http/cluster/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*
*/

use crate::handlers::http::{
base_path_without_preceding_slash, logstream::error::StreamError, modal::IngestorMetadata,
use crate::{
handlers::http::{
base_path_without_preceding_slash, logstream::error::StreamError, modal::IngestorMetadata,
},
HTTP_CLIENT,
};
use actix_web::http::header;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -235,13 +238,13 @@ pub async fn check_liveness(domain_name: &str) -> bool {
}
};

let reqw = reqwest::Client::new()
let req = HTTP_CLIENT
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

reqw.is_ok()
req.is_ok()
}

/// send a request to the ingestor to fetch its stats
Expand All @@ -255,8 +258,7 @@ pub async fn send_stats_request(
return Ok(None);
}

let client = reqwest::Client::new();
let res = client
let res = HTTP_CLIENT
.get(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use http::StatusCode;
use itertools::Itertools;
use serde_json::Value;

use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY};
use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT};

use self::{cluster::get_ingestor_info, query::Query};

Expand Down Expand Up @@ -109,7 +109,7 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec
base_path_without_preceding_slash(),
"query"
);
let reqw = reqwest::Client::new()
let reqw = HTTP_CLIENT
.post(uri)
.json(query)
.header(http::header::AUTHORIZATION, im.token.clone())
Expand Down
Loading

0 comments on commit 9122158

Please sign in to comment.