1 change: 1 addition & 0 deletions .envrc
@@ -0,0 +1 @@
+use nix;
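Note: use nix is direnv's standard hook for Nix projects; entering the directory loads the development environment from the repository's Nix expression (typically shell.nix or default.nix, assuming the repo provides one).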
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -75,4 +75,4 @@ tracing-subscriber = "0.3"
 all = [ "diesel_pg" ]
 default = []
 diesel_pg = ["dep:diesel", "dep:async-bb8-diesel"]
-qtop = ["dep:dropshot", "dep:tokio-tungstenite", "serde", "dep:serde_json", "dep:schemars"]
+qtop = ["dep:dropshot", "dep:tokio-tungstenite", "serde", "dep:serde_json", "schemars"]
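Note: removing the dep: prefix changes how the optional schemars dependency is referenced. Assuming this was the only dep:schemars reference in the manifest, Cargo now keeps the implicit schemars feature, so qtop enables it and dependents can also turn it on directly, matching how "serde" is listed on the same line.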
45 changes: 24 additions & 21 deletions examples/tcp_echo_workload/main.rs
@@ -107,7 +107,7 @@ async fn main() {
     // This pool will try to lookup the echo server, using configuration
     // defined in "example/dns_server/test.com.zone".
     let resolver = Box::new(DnsResolver::new(
-        service::Name("_echo._tcp.test.com.".to_string()),
+        service::Name::from("_echo._tcp.test.com."),
         bootstrap_dns,
         DnsResolverConfig {
             query_interval: Duration::from_secs(5),
@@ -135,7 +135,7 @@ async fn main() {
     let pool = Pool::new(resolver, backend_connector, policy);
 
     #[cfg(feature = "qtop")]
-    tokio::spawn(qtop(pool.stats().clone()));
+    serve_qtop(&pool);
 
     // In a loop:
     //
@@ -167,26 +167,29 @@ async fn main() {
 }
 
 #[cfg(feature = "qtop")]
-async fn qtop(stats: qorb::pool::Stats) {
-    // Build a description of the API.
-    let mut api = dropshot::ApiDescription::new();
-    api.register(qorb::qtop::serve_stats).unwrap();
+fn serve_qtop(pool: &Pool<TcpStream>) {
+    use qorb::qtop::Qtop;
+    let qtop = Qtop::new();
+    qtop.add_pool(pool);
 
     let log = dropshot::ConfigLogging::StderrTerminal {
         level: dropshot::ConfigLoggingLevel::Info,
     };
-    // Set up the server.
-    let server = dropshot::HttpServerStarter::new(
-        &dropshot::ConfigDropshot {
-            bind_address: "127.0.0.1:42069".parse().unwrap(),
-            ..Default::default()
-        },
-        api,
-        stats,
-        &log.to_logger("qtop").unwrap(),
-    )
-    .map_err(|error| format!("failed to create server: {}", error))
-    .unwrap()
-    .start()
-    .await
-    .unwrap();
+    tokio::spawn(async move {
+        // Set up the server.
+        dropshot::HttpServerStarter::new(
+            &dropshot::ConfigDropshot {
+                bind_address: "127.0.0.1:42069".parse().unwrap(),
+                ..Default::default()
+            },
+            Qtop::api(),
+            qtop,
+            &log.to_logger("qtop").unwrap(),
+        )
+        .map_err(|error| format!("failed to create server: {}", error))
+        .unwrap()
+        .start()
+        .await
+        .unwrap();
+    });
 }
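Note: the rework turns qtop's server context from a single pool's cloned Stats into a Qtop registry, so one dropshot server can observe several pools. A minimal sketch of that shape, assuming add_pool accepts any &Pool<TcpStream>; the helper name and the idea of multiple pools are illustrative, not part of this change:

fn serve_qtop_multi(pools: &[&Pool<TcpStream>]) {
    use qorb::qtop::Qtop;
    // Register every pool with one shared registry; Qtop::new() and
    // add_pool() are taken from the diff above.
    let qtop = Qtop::new();
    for pool in pools {
        qtop.add_pool(pool);
    }
    // Then hand the registry to dropshot together with Qtop::api(),
    // exactly as serve_qtop does above.
}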
36 changes: 26 additions & 10 deletions src/pool.rs
@@ -7,10 +7,11 @@ use crate::policy::Policy;
 use crate::priority_list::PriorityList;
 use crate::rebalancer;
 use crate::resolver;
+use crate::service;
 use crate::slot;
 
 use futures::StreamExt;
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 use std::sync::{
     atomic::{AtomicUsize, Ordering},
     Arc, Mutex,
@@ -54,7 +55,7 @@ struct PoolInner<Conn: Connection> {
     priority_list: PriorityList<backend::Name>,
 
     policy: Policy,
-    stats_tx: watch::Sender<HashMap<backend::Name, SerializeStats>>,
+    stats_tx: watch::Sender<BTreeMap<backend::Name, SerializeStats>>,
 
     rx: mpsc::Receiver<Request<Conn>>,
 }
@@ -65,7 +66,7 @@ impl<Conn: Connection> PoolInner<Conn> {
         backend_connector: backend::SharedConnector<Conn>,
         policy: Policy,
         rx: mpsc::Receiver<Request<Conn>>,
-        stats_tx: watch::Sender<HashMap<backend::Name, SerializeStats>>,
+        stats_tx: watch::Sender<BTreeMap<backend::Name, SerializeStats>>,
     ) -> Self {
         Self {
             backend_connector,
@@ -325,21 +326,32 @@ impl<Conn: Connection> PoolInner<Conn> {
 
 /// Manages a set of connections to a service
 pub struct Pool<Conn: Connection> {
+    service_name: service::Name,
     handle: tokio::task::JoinHandle<()>,
     tx: mpsc::Sender<Request<Conn>>,
     stats: Stats,
 }
 
 #[derive(Clone)]
 pub struct Stats {
-    pub(crate) rx: watch::Receiver<HashMap<backend::Name, SerializeStats>>,
+    pub(crate) rx: watch::Receiver<BTreeMap<backend::Name, SerializeStats>>,
     pub(crate) claims: Arc<AtomicUsize>,
 }
 
+impl<Conn: Connection> Pool<Conn> {
+    pub fn service_name(&self) -> &service::Name {
+        &self.service_name
+    }
+
+    pub fn stats(&self) -> &Stats {
+        &self.stats
+    }
+}
+
 impl<Conn: Connection + Send + 'static> Pool<Conn> {
     /// Creates a new connection pool.
     ///
     /// - resolver: Describes how backends should be found for the service.
     /// - backend_connector: Describes how the connections to a specific
     ///   backend should be made.
     #[instrument(skip(resolver, backend_connector), name = "Pool::new")]
@@ -348,15 +360,17 @@ impl<Conn: Connection + Send + 'static> Pool<Conn> {
         backend_connector: backend::SharedConnector<Conn>,
         policy: Policy,
     ) -> Self {
+        let service_name = resolver.service_name().clone();
         let (tx, rx) = mpsc::channel(1);
-        let (stats_tx, stats_rx) = watch::channel(HashMap::default());
+        let (stats_tx, stats_rx) = watch::channel(BTreeMap::default());
         let handle = tokio::task::spawn(async move {
             let worker = PoolInner::new(resolver, backend_connector, policy, rx, stats_tx);
             worker.run().await;
         });
 
         Self {
             handle,
+            service_name,
             tx,
             stats: Stats {
                 rx: stats_rx,
@@ -365,10 +379,6 @@ impl<Conn: Connection + Send + 'static> Pool<Conn> {
         }
     }
 
-    pub fn stats(&self) -> &Stats {
-        &self.stats
-    }
-
     /// Acquires a handle to a connection within the connection pool.
     #[instrument(level = "debug", skip(self), err, name = "Pool::claim")]
     pub async fn claim(&self) -> Result<claim::Handle<Conn>, Error> {
@@ -389,3 +399,9 @@ impl<Conn: Connection> Drop for Pool<Conn> {
         self.handle.abort()
     }
 }
+
+impl Stats {
+    pub fn same_pool(&self, other: &Self) -> bool {
+        Arc::ptr_eq(&self.claims, &other.claims)
+    }
+}
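Note: the new accessors round out what a consumer like qtop needs: service_name() to label a pool, stats() to watch it, and Stats::same_pool() to detect that two Stats handles came from the same pool (it compares the Arc behind the shared claims counter, which all clones of a pool's Stats hold). A short sketch under those assumptions; the registry vector and helper name are illustrative:

use qorb::pool::{Pool, Stats};
use tokio::net::TcpStream;

// Avoid double-registering a pool by comparing Stats identity.
fn register_once(registry: &mut Vec<Stats>, pool: &Pool<TcpStream>) {
    let stats = pool.stats();
    // same_pool is Arc::ptr_eq on the claims counter, so it is true
    // exactly when both handles originate from the same Pool.
    if registry.iter().any(|existing| existing.same_pool(stats)) {
        return;
    }
    registry.push(stats.clone());
}

The switch from HashMap to BTreeMap on the stats channel also gives the per-backend map a stable iteration order, presumably so serialized stats render deterministically in a viewer like qtop.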