diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..2493a15 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +use nix; \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index e360bca..f2ae669 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,4 +75,4 @@ tracing-subscriber = "0.3" all = [ "diesel_pg" ] default = [] diesel_pg = ["dep:diesel", "dep:async-bb8-diesel"] -qtop = ["dep:dropshot", "dep:tokio-tungstenite", "serde", "dep:serde_json", "dep:schemars"] +qtop = ["dep:dropshot", "dep:tokio-tungstenite", "serde", "dep:serde_json", "schemars"] diff --git a/examples/tcp_echo_workload/main.rs b/examples/tcp_echo_workload/main.rs index cb350e9..e4d18ce 100644 --- a/examples/tcp_echo_workload/main.rs +++ b/examples/tcp_echo_workload/main.rs @@ -107,7 +107,7 @@ async fn main() { // This pool will try to lookup the echo server, using configuration // defined in "example/dns_server/test.com.zone". let resolver = Box::new(DnsResolver::new( - service::Name("_echo._tcp.test.com.".to_string()), + service::Name::from("_echo._tcp.test.com."), bootstrap_dns, DnsResolverConfig { query_interval: Duration::from_secs(5), @@ -135,7 +135,7 @@ async fn main() { let pool = Pool::new(resolver, backend_connector, policy); #[cfg(feature = "qtop")] - tokio::spawn(qtop(pool.stats().clone())); + serve_qtop(&pool); // In a loop: // @@ -167,26 +167,29 @@ async fn main() { } #[cfg(feature = "qtop")] -async fn qtop(stats: qorb::pool::Stats) { - // Build a description of the API. - let mut api = dropshot::ApiDescription::new(); - api.register(qorb::qtop::serve_stats).unwrap(); +fn serve_qtop(pool: &Pool) { + use qorb::qtop::Qtop; + let qtop = Qtop::new(); + qtop.add_pool(pool); + let log = dropshot::ConfigLogging::StderrTerminal { level: dropshot::ConfigLoggingLevel::Info, }; - // Set up the server. 
- let server = dropshot::HttpServerStarter::new( - &dropshot::ConfigDropshot { - bind_address: "127.0.0.1:42069".parse().unwrap(), - ..Default::default() - }, - api, - stats, - &log.to_logger("qtop").unwrap(), - ) - .map_err(|error| format!("failed to create server: {}", error)) - .unwrap() - .start() - .await - .unwrap(); + tokio::spawn(async move { + // Set up the server. + dropshot::HttpServerStarter::new( + &dropshot::ConfigDropshot { + bind_address: "127.0.0.1:42069".parse().unwrap(), + ..Default::default() + }, + Qtop::api(), + qtop, + &log.to_logger("qtop").unwrap(), + ) + .map_err(|error| format!("failed to create server: {}", error)) + .unwrap() + .start() + .await + .unwrap(); + }); } diff --git a/src/pool.rs b/src/pool.rs index 5430cdc..68fa290 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -7,10 +7,11 @@ use crate::policy::Policy; use crate::priority_list::PriorityList; use crate::rebalancer; use crate::resolver; +use crate::service; use crate::slot; use futures::StreamExt; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, Mutex, @@ -54,7 +55,7 @@ struct PoolInner { priority_list: PriorityList, policy: Policy, - stats_tx: watch::Sender>, + stats_tx: watch::Sender>, rx: mpsc::Receiver>, } @@ -65,7 +66,7 @@ impl PoolInner { backend_connector: backend::SharedConnector, policy: Policy, rx: mpsc::Receiver>, - stats_tx: watch::Sender>, + stats_tx: watch::Sender>, ) -> Self { Self { backend_connector, @@ -325,6 +326,7 @@ impl PoolInner { /// Manages a set of connections to a service pub struct Pool { + service_name: service::Name, handle: tokio::task::JoinHandle<()>, tx: mpsc::Sender>, stats: Stats, @@ -332,14 +334,24 @@ pub struct Pool { #[derive(Clone)] pub struct Stats { - pub(crate) rx: watch::Receiver>, + pub(crate) rx: watch::Receiver>, pub(crate) claims: Arc, } +impl Pool { + pub fn service_name(&self) -> &service::Name { + &self.service_name + } + + pub fn stats(&self) -> 
&Stats { + &self.stats + } +} + impl Pool { /// Creates a new connection pool. /// - /// - resolver: Describes how backends should be found for the service. + /// - resolver: Describes how backends should be found for the service. /// - backend_connector: Describes how the connections to a specific /// backend should be made. #[instrument(skip(resolver, backend_connector), name = "Pool::new")] @@ -348,8 +360,9 @@ impl Pool { backend_connector: backend::SharedConnector, policy: Policy, ) -> Self { + let service_name = resolver.service_name().clone(); let (tx, rx) = mpsc::channel(1); - let (stats_tx, stats_rx) = watch::channel(HashMap::default()); + let (stats_tx, stats_rx) = watch::channel(BTreeMap::default()); let handle = tokio::task::spawn(async move { let worker = PoolInner::new(resolver, backend_connector, policy, rx, stats_tx); worker.run().await; @@ -357,6 +370,7 @@ impl Pool { Self { handle, + service_name, tx, stats: Stats { rx: stats_rx, @@ -365,10 +379,6 @@ impl Pool { } } - pub fn stats(&self) -> &Stats { - &self.stats - } - /// Acquires a handle to a connection within the connection pool. 
#[instrument(level = "debug", skip(self), err, name = "Pool::claim")] pub async fn claim(&self) -> Result, Error> { @@ -389,3 +399,9 @@ impl Drop for Pool { self.handle.abort() } } + +impl Stats { + pub fn same_pool(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.claims, &other.claims) + } +} diff --git a/src/qtop.rs b/src/qtop.rs index 8073d58..55b039d 100644 --- a/src/qtop.rs +++ b/src/qtop.rs @@ -1,13 +1,21 @@ -use crate::pool; -use dropshot::channel; -use dropshot::Query; +use crate::{ + backend, + pool::{self, Pool}, + service, +}; use dropshot::RequestContext; use dropshot::WebsocketConnection; -use futures::SinkExt; +use dropshot::{channel, endpoint}; +use dropshot::{HttpError, HttpResponseOk}; +use dropshot::{Path, Query}; +use futures::{stream::FuturesUnordered, SinkExt, StreamExt}; use schemars::JsonSchema; -use serde::Deserialize; -use std::collections::HashMap; -use std::sync::{atomic::Ordering, Arc, Mutex}; +use serde::{Deserialize, Serialize}; +use std::collections::{btree_map::Entry, BTreeMap}; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; use tokio::{ sync::watch, time::{self, Duration}, @@ -15,6 +23,105 @@ use tokio::{ use tokio_tungstenite::tungstenite::protocol::Role; use tokio_tungstenite::tungstenite::Message; +#[derive(Clone)] +pub struct Qtop { + pools_tx: watch::Sender>, + pools: watch::Receiver>, +} + +impl Qtop { + pub fn api() -> dropshot::ApiDescription { + let mut api = dropshot::ApiDescription::new(); + api.register(get_pools) + .expect("registering `get_pools` should succeed"); + api.register(get_pool_stats) + .expect("registering `get_pool_stats` should succeed"); + api.register(get_all_stats) + .expect("registering `get_all_stats` should succeed"); + api + } + + #[must_use] + pub fn new() -> Self { + let (pools_tx, pools) = watch::channel(BTreeMap::new()); + Self { pools_tx, pools } + } + + pub fn add_pool(&self, pool: &Pool) { + let name = pool.service_name().clone(); + let stats = pool.stats(); + 
self.pools_tx.send_if_modified(|pools| { + match pools.entry(name) { + // Pool is already present, do nothing. + Entry::Occupied(entry) if stats.same_pool(entry.get()) => false, + Entry::Occupied(mut entry) => { + entry.insert(stats.clone()); + true + } + Entry::Vacant(entry) => { + entry.insert(stats.clone()); + true + } + } + }); + } + + pub async fn serve_ws( + mut self, + mut ws: tokio_tungstenite::WebSocketStream, + update_interval: Duration, + ) -> dropshot::WebsocketChannelResult { + let mut interval = time::interval(update_interval); + let mut cache = Cache::default(); + let mut changed_pools = FuturesUnordered::new(); + loop { + if self.pools.has_changed()? { + let new_pools = self.pools.borrow_and_update(); + cache.pools.retain(|name, _| new_pools.contains_key(name)); + for (name, stats) in new_pools.iter() { + cache.pools.entry(name.clone()).or_insert_with(|| { + let mut stats = stats.clone(); + let sets = stats.rx.borrow_and_update().clone(); + let claims = stats.claims.load(Ordering::Relaxed); + changed_pools.push({ + let name = name.clone(); + let changed = todo!(); + async move { + changed.await; + name + } + }); + PoolCache { + stats, + sets, + claims, + } + }); + } + } + + tokio::select! 
{ + next_changed_pool = changed_pools.next() => { + if let Some((name, stats)) = next_changed_pool { + cache.pools.entry(name).and_modify(|cache| { + cache.stats = stats.clone(); + cache.sets = stats.rx.borrow_and_update().clone(); + cache.claims = stats.claims.load(Ordering::Relaxed); + }); + } + } + _ = interval.tick() => {}, + _ = self.pools.changed() => {}, + } + } + } +} +impl Default for Qtop { + fn default() -> Self { + Self::new() + } +} + impl serde::Serialize for pool::SerializeStats { fn serialize(&self, serializer: S) -> Result { let inner = self.0.lock().unwrap(); @@ -22,24 +129,38 @@ impl serde::Serialize for pool::SerializeStats { } } +#[derive(Default)] +struct Cache { + pools: BTreeMap, +} + +#[derive(serde::Serialize)] +struct PoolCache { + #[serde(skip)] + stats: pool::Stats, + sets: BTreeMap, + claims: usize, +} + +fn serialize_atomic_usize( + atomic: &Arc, + serializer: S, +) -> Result { + atomic.load(Ordering::Relaxed).serialize(serializer) +} + impl pool::Stats { pub async fn serve_ws( mut self, - ws: WebsocketConnection, + mut ws: tokio_tungstenite::WebSocketStream, update_interval: Duration, ) -> dropshot::WebsocketChannelResult { - let mut ws = tokio_tungstenite::WebSocketStream::from_raw_socket( - ws.into_inner(), - Role::Server, - None, - ) - .await; let mut interval = time::interval(update_interval); - let mut cache = HashMap::new(); + let mut cache = BTreeMap::new(); let mut claims_last = self.claims.load(Ordering::Relaxed); loop { if self.rx.has_changed()? 
{ - cache = (*self.rx.borrow_and_update()).clone(); + cache.clone_from(&*self.rx.borrow_and_update()); } let claims = self.claims.load(Ordering::Relaxed); let claims_per_interval = claims - claims_last; @@ -60,26 +181,90 @@ impl pool::Stats { // HTTP API interface +#[derive(Serialize, JsonSchema)] +#[schemars(example = "PoolList::example")] +struct PoolList { + pools: Vec, +} + +impl PoolList { + fn example() -> Self { + PoolList { + pools: vec!["foo".into(), "bar".into()], + } + } +} + #[derive(Deserialize, JsonSchema)] struct QueryParams { update_secs: Option, } -/// An eternally-increasing sequence of bytes, wrapping on overflow, starting -/// from the value given for the query parameter "start." +#[endpoint{ + method = GET, + path = "/qtop/pools", +}] +async fn get_pools(rqctx: RequestContext) -> Result, HttpError> { + let pools = rqctx.context().pools.borrow().keys().cloned().collect(); + Ok(HttpResponseOk(PoolList { pools })) +} + +#[endpoint { + method = GET, + path = "/qtop/stats/{pool}", +}] +async fn get_pool_stats( + rqctx: RequestContext, + path: Path, + qp: Query, + websock: dropshot::WebsocketUpgrade, +) -> dropshot::WebsocketEndpointResult { + let update_interval = Duration::from_secs(u64::from(qp.into_inner().update_secs.unwrap_or(1))); + let pool = path.into_inner(); + match rqctx.context().pools.borrow().get(pool.as_str()) { + Some(stats) => { + let stats = stats.clone(); + websock.handle(move |upgraded| async move { + let ws = tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded.into_inner(), + Role::Server, + None, + ) + .await; + stats.serve_ws(ws, update_interval).await + }) + } + None => Err(HttpError::for_not_found( + None, + format!("no pool named {pool}"), + )), + } +} + #[channel { protocol = WEBSOCKETS, path = "/qtop/stats", }] -pub async fn serve_stats( - rqctx: RequestContext, +async fn get_all_stats( + rqctx: RequestContext, qp: Query, upgraded: WebsocketConnection, ) -> dropshot::WebsocketChannelResult { - let 
update_interval = Duration::from_secs(u64::from(qp.into_inner().update_secs.unwrap_or(1))); - rqctx - .context() - .clone() - .serve_ws(upgraded, update_interval) - .await + // let update_interval = Duration::from_secs(u64::from(qp.into_inner().update_secs.unwrap_or(1))); + // rqctx + // .context() + // .clone() + // .serve_ws(upgraded, update_interval) + // .await + todo!() +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn dropshot_api_is_reasonable() { + // just ensure that the API description doesn't panic... + let _api = Qtop::api(); + } } diff --git a/src/resolver.rs b/src/resolver.rs index 864a889..d51d51c 100644 --- a/src/resolver.rs +++ b/src/resolver.rs @@ -28,6 +28,7 @@ pub trait Resolver: Send + Sync { /// Monitors for new backends, and returns information about the health /// of the resolver. async fn step(&mut self) -> Vec; + fn service_name(&self) -> &crate::service::Name; } /// Helper type for anything that implements the Resolver interface. diff --git a/src/resolvers/dns.rs b/src/resolvers/dns.rs index 1d5bd2c..64259db 100644 --- a/src/resolvers/dns.rs +++ b/src/resolvers/dns.rs @@ -72,7 +72,7 @@ impl Client { name: &service::Name, ) -> Result, anyhow::Error> { // Look up all the SRV records for this particular name. - let srv = self.resolver.srv_lookup(&name.0).await?; + let srv = self.resolver.srv_lookup(name.0.as_ref()).await?; event!(Level::DEBUG, ?srv, "Successfully looked up SRV record"); let futures = std::iter::repeat(self.resolver.clone()) @@ -325,4 +325,8 @@ impl Resolver for DnsResolver { } } } + + fn service_name(&self) -> &service::Name { + &self.service + } } diff --git a/src/service.rs b/src/service.rs index 0f6afdc..39ad9c6 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,5 +1,41 @@ //! 
Interface for services +use std::sync::Arc; /// Describes the name of a service -#[derive(Clone, Debug)] -pub struct Name(pub String); +#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +pub struct Name(pub Arc); + +impl Name { + pub fn new(s: impl ToString) -> Self { + Self(Arc::from(s.to_string())) + } +} + +impl From for Name { + fn from(s: String) -> Self { + Self(Arc::from(s)) + } +} + +impl From<&'_ str> for Name { + fn from(s: &'_ str) -> Self { + Self(Arc::from(s)) + } +} + +impl std::borrow::Borrow for Name { + fn borrow(&self) -> &str { + &self.0 + } +} + +#[cfg(feature = "serde")] +impl serde::Serialize for Name { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.0.serialize(serializer) + } +}