Skip to content

Commit

Permalink
Random delay before sub-queries. Log query time. Always unbind.
Browse files Browse the repository at this point in the history
  • Loading branch information
elonen committed Feb 23, 2023
1 parent da5ca42 commit a58f1ad
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 20 deletions.
37 changes: 37 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ systemd-units = { enable = false }
name = "ldap_authz_proxy"
path = "src/main.rs"

[profile.release]
lto = true

[dependencies]
anyhow = "1.0.68"
async-recursion = "1.0.2"
docopt = "1.1.1"
hyper = { version = "0.14.23", features = ["full"] }
ldap3 = "0.11.1"
lru_time_cache = "0.11.11"
rand = "0.8.5"
regex = "1.7.1"
rust-ini = "0.18.0"
secrecy = "0.8.0"
Expand Down
56 changes: 36 additions & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use std::process::exit;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc};
use std::time::Duration;
use config::ConfigSection;
use hyper::header::HeaderName;
use hyper::http::HeaderValue;
use hyper::service::service_fn;
use hyper::{Request, Response, Body, StatusCode};
use rand::random;
use tokio::net::TcpListener;
use docopt::Docopt;

Expand Down Expand Up @@ -133,7 +135,7 @@ async fn ldap_query(
async fn do_query(conf: &ConfigSection, query: &str) ->
ldap3::result::Result<Vec<ResultEntry>>
{
let settings = ldap3::LdapConnSettings::new().set_conn_timeout(std::time::Duration::from_millis((conf.ldap_conn_timeout*1000.0) as u64));
let settings = ldap3::LdapConnSettings::new().set_conn_timeout(Duration::from_millis((conf.ldap_conn_timeout*1000.0) as u64));
let (conn, mut ldap) = LdapConnAsync::with_settings(settings, conf.ldap_server_url.as_str()).await?;
ldap3::drive!(conn);

Expand All @@ -149,10 +151,10 @@ async fn ldap_query(
&conf.ldap_attribs).await?.success()
{
Ok((rows, _)) => {
ldap.unbind().await?;
ldap.unbind().await.ok();
Ok(rows)
},
Err(e) => Err(e)
Err(e) => { ldap.unbind().await.ok(); Err(e) }
}
}

Expand All @@ -166,13 +168,17 @@ async fn ldap_query(
},
Err(e) => {
use ldap3::LdapError::*;
if i == MAX_RETRY-1 || !matches!(e, ResultRecv{..} | Io{..} | EndOfStream{..} | Timeout{..}) {
// Last attempt or non-temporary error
return Err(e);
if i < MAX_RETRY-1 && matches!(e, ResultRecv{..} | Io{..} | EndOfStream{..} | Timeout{..}) {
let wait_time = 1.0 + (i as f32) * 0.5 * random::<f32>();
span.in_scope(|| { tracing::warn!("Temporary LDAP error: {:?}. Retry ({}/{}) in {wait_time} second.", e, i+1, MAX_RETRY, wait_time=wait_time); });
tokio::time::sleep(Duration::from_secs_f32(wait_time)).await;
} else {
let wait_time = 1.0 + (i as f32) * 0.5;
span.in_scope(|| { tracing::info!("Temporary LDAP error: {:?}. Retrying in {wait_time} second.", e); });
tokio::time::sleep(std::time::Duration::from_secs_f32(wait_time)).await;
if i == MAX_RETRY-1 {
span.in_scope(|| { tracing::error!("Max retries ({MAX_RETRY}) exhausted, LDAP error persists: {:?}", e); });
} else {
span.in_scope(|| { tracing::error!("Non-temporary LDAP error, giving up: {:?}", e); });
}
return Err(e);
}
}
};
Expand Down Expand Up @@ -220,7 +226,12 @@ async fn ldap_query(
.filter(|s| !seen.contains(*s))
.map(|s| {
span.in_scope(|| { tracing::debug!("Recursing into [{}] (rule {:?})", s, conf.sub_query_join); });
tokio::spawn(ldap_query(req_id, s.clone(), username.clone(), confs.clone(), cache.clone(), seen_sections.clone()))
let (s, username, confs, cache, seen_sections) = (s.clone(), username.clone(), confs.clone(), cache.clone(), seen_sections.clone());
tokio::spawn(async move {
// Random delay to avoid thundering herd
tokio::time::sleep(Duration::from_secs_f32(random::<f32>() * 0.05)).await;
ldap_query(req_id, s, username, confs, cache, seen_sections).await
})
}).collect::<Vec<_>>()
};

Expand Down Expand Up @@ -277,6 +288,7 @@ async fn http_handler(req: Request<Body>, ctx: Arc<ReqContext>) -> Result<Respon
path = %req.uri().path(),
section = tracing::field::Empty,
user = tracing::field::Empty,
query_ms = tracing::field::Empty,
cached = tracing::field::Empty);

let conf = match ctx.config.iter().find(|c|
Expand Down Expand Up @@ -313,13 +325,16 @@ async fn http_handler(req: Request<Body>, ctx: Arc<ReqContext>) -> Result<Respon

// Check LDAP (and cache)
let cache = ctx.cache.get(conf.section.as_str()).unwrap().clone();
let ldap_res = span.in_scope(|| async { ldap_query(
req_id,
conf.section.clone(),
username.into(),
ctx.config.clone(),
cache,
Arc::new(RwLock::new(HashSet::new()))).await }).await;
let query_start = tokio::time::Instant::now();
let ldap_res = span.in_scope(|| async {
ldap_query(
req_id,
conf.section.clone(),
username.into(),
ctx.config.clone(),
cache,
Arc::new(RwLock::new(HashSet::new()))).await }).await;
let span = span.record("query_ms", query_start.elapsed().as_millis());

match ldap_res {
Err(e) => {
Expand All @@ -329,7 +344,7 @@ async fn http_handler(req: Request<Body>, ctx: Arc<ReqContext>) -> Result<Respon
Ok(la) => {
let span = span.record("cached", &la.cached);
if let Some(ldap_res) = la.ldap_res {
span.in_scope(|| { tracing::info!("User authorized Ok"); });
span.in_scope(|| { tracing::info!("Authorized Ok"); });
let mut resp = Response::new(Body::from("200 OK - LDAP result found"));

// Store LDAP result attributes to response HTTP headers
Expand All @@ -350,12 +365,13 @@ async fn http_handler(req: Request<Body>, ctx: Arc<ReqContext>) -> Result<Respon
return Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from("Invalid LDAP result value"))
}
};
span.in_scope(|| { tracing::debug!("Adding result HTTP header: {:?} = {:?}", hname, hval); });
span.in_scope(|| { tracing::debug!("Adding header: {:?} = {:?}", hname, hval); });
resp.headers_mut().insert(hname, hval);
}
resp.headers_mut().insert("X-LDAP-CACHED", HeaderValue::from_str(if la.cached { "1" } else { "0" }).unwrap());
Ok(resp)
} else {
span.in_scope(|| { tracing::info!("Authorization denied"); });
Response::builder().status(StatusCode::FORBIDDEN)
.header("X-LDAP-CACHED", HeaderValue::from_str(if la.cached { "1" } else { "0" }).unwrap())
.body(Body::from(format!("403 Forbidden - Empty LDAP result for user '{:?}'", username)))
Expand Down Expand Up @@ -445,7 +461,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>>
let mut caches = HashMap::new();
for sect in conf.as_ref() {
tracing::debug!("CONFIG DUMP: {:?}", sect);
let ttl = ::std::time::Duration::from_secs_f32(sect.cache_time);
let ttl = Duration::from_secs_f32(sect.cache_time);
let cache = LdapCache::with_expiry_duration_and_capacity(ttl, sect.cache_size);
caches.insert(sect.section.clone(), Arc::new(Mutex::new(cache)));
}
Expand Down

0 comments on commit a58f1ad

Please sign in to comment.