Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add first opentelemetry metric #393

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ url = "2.5.4"
urlencoding = "2.1.3"
uuid = { version = "1.11.0", features = ["v7", "serde"] }
derive_more = { version = "1.0.0", features = ["display", "error"] }
prometheus = "0.13.4"
opentelemetry = "0.27.1"
opentelemetry_sdk = "0.27.1"
opentelemetry-prometheus = "0.27.0"

[target.'cfg(not(target_family = "unix"))'.dependencies]
crossterm = { version = "0.28.1" }
Expand Down
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod config;
mod embedded_certificate;
pub mod metrics;
mod protocols;
mod restrictions;
#[cfg(test)]
Expand Down Expand Up @@ -371,7 +372,7 @@ pub async fn run_client(args: Client) -> anyhow::Result<()> {
Ok(())
}

pub async fn run_server(args: Server) -> anyhow::Result<()> {
pub async fn run_server(args: Server, unbounded_metrics: bool) -> anyhow::Result<()> {
let tls_config = if args.remote_addr.scheme() == "wss" {
let tls_certificate = if let Some(cert_path) = &args.tls_certificate {
tls::load_certificates_from_pem(cert_path).expect("Cannot load tls certificate")
Expand Down Expand Up @@ -449,7 +450,7 @@ pub async fn run_server(args: Server) -> anyhow::Result<()> {
restriction_config: args.restrict_config,
http_proxy,
};
let server = WsServer::new(server_config);
let server = WsServer::new(server_config, unbounded_metrics);

info!(
"Starting wstunnel server v{} with config {:?}",
Expand Down
84 changes: 84 additions & 0 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::net::SocketAddr;

use bytes::Bytes;
use http_body_util::Full;
use hyper::service::service_fn;
use hyper::{body, Request, Response, StatusCode, Version};
use hyper_util::rt::TokioExecutor;

use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{Encoder, TextEncoder};
use tokio::net::TcpListener;
use tracing::{error, info, warn};

pub async fn setup_metrics_provider(addr: &SocketAddr) -> anyhow::Result<SdkMeterProvider> {
let registry = prometheus::Registry::new();

// configure OpenTelemetry to use this registry
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build()?;

// set up a meter to create instruments
let provider = SdkMeterProvider::builder().with_reader(exporter).build();
let listener = TcpListener::bind(addr).await?;
info!("Started metrics server on {}", addr);

tokio::spawn(async move {
loop {
let (stream, _) = match listener.accept().await {
Ok(ret) => ret,
Err(err) => {
warn!("Error while accepting connection on metrics port {:?}", err);
continue;
}
};

let stream = hyper_util::rt::TokioIo::new(stream);
let conn = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let fut = conn
.serve_connection(
stream,
service_fn(|req: Request<body::Incoming>| {
Ongy marked this conversation as resolved.
Show resolved Hide resolved
if req.uri().path() != "/metrics" {
return std::future::ready(
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Default::default()),
);
}

if req.version() != Version::HTTP_11 {
return std::future::ready(
Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Full::<Bytes>::from("Failed to generate metrics")),
);
}

// Create handler local registry for ownership
let encoder = TextEncoder::new();
let metric_families = registry.gather();
let mut result = Vec::new();
if let Err(err) = encoder.encode(&metric_families, &mut result) {
error!("Failed to encode prometheus metrics: {:?}", err);
return std::future::ready(
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Full::<Bytes>::from("Failed to generate metrics")),
);
}

std::future::ready(Ok(Response::new(Full::<Bytes>::from(result))))
}),
)
.await;

if let Err(err) = fut {
warn!("Failed to handle metrics connection: {:?}", err)
}
}
});

return Ok(provider);
}
1 change: 0 additions & 1 deletion src/tunnel/server/handler_websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::tunnel::server::utils::{bad_request, inject_cookie, HttpResponse};
use crate::tunnel::server::WsServer;
use crate::tunnel::transport;
use crate::tunnel::transport::websocket::mk_websocket_tunnel;
use bytes::Bytes;
use fastwebsockets::Role;
use http_body_util::combinators::BoxBody;
use http_body_util::Either;
Expand Down
31 changes: 27 additions & 4 deletions src/tunnel/server/server.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use anyhow::anyhow;
use futures_util::FutureExt;
use http_body_util::Either;
use opentelemetry::metrics::Counter;
use opentelemetry::{global, KeyValue};
use std::fmt;
use std::fmt::{Debug, Formatter};

use crate::protocols;
use crate::tunnel::{try_to_sock_addr, LocalProtocol, RemoteAddr};
use arc_swap::ArcSwap;
use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use hyper::body::Incoming;
use hyper::server::conn::{http1, http2};
use hyper::service::service_fn;
use hyper::{http, Request, Response, StatusCode, Version};
use hyper::{http, Request, StatusCode, Version};
use hyper_util::rt::{TokioExecutor, TokioTimer};
use parking_lot::Mutex;
use socket2::SockRef;
Expand Down Expand Up @@ -65,15 +65,29 @@ pub struct WsServerConfig {
pub http_proxy: Option<Url>,
}

pub struct WsServerMetrics {
pub unbounded: bool,
pub connections: Counter<u64>,
}

#[derive(Clone)]
pub struct WsServer {
pub config: Arc<WsServerConfig>,
pub metrics: Arc<WsServerMetrics>,
}

impl WsServer {
pub fn new(config: WsServerConfig) -> Self {
pub fn new(config: WsServerConfig, unbounded_metrics: bool) -> Self {
let meter = global::meter_provider().meter("wstunnel");
Self {
config: Arc::new(config),
metrics: Arc::new(WsServerMetrics {
unbounded: unbounded_metrics,
connections: meter
.u64_counter("connections_created")
.with_description("Counts the connections created. Attributes allow to split by remote host")
.build(),
}),
}
}

Expand Down Expand Up @@ -127,6 +141,15 @@ impl WsServer {
})?;
info!("Tunnel accepted due to matched restriction: {}", restriction.name);

let attributes: &[KeyValue] = if self.metrics.unbounded {
&[
KeyValue::new("remote_host", format!("{}", remote.host)),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid setting remote_host and port as label as it will make too much cardinality for prometheus, and create a data leak for us.

If a user request random host / port our metrics are going to fill the available memory of wstunnel until it OOM as they are never cleaned.

metrics should not jeopardize the stability of the system
https://stackoverflow.com/a/69167162

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll have to think about this a bit.

I see why you say that, and agree with the point you make.
OTOH, I need the cardinality to get the information I need. So I'll have to think about some way to customize this.
In my case, k8s and multiple replicas make an OOM less serious.

KeyValue::new("remote_port", i64::from(remote.port)),
]
} else {
&[]
};
self.metrics.connections.add(1, attributes);
let req_protocol = remote.protocol.clone();
let inject_cookie = req_protocol.is_dynamic_reverse_tunnel();
let tunnel = self
Expand Down
1 change: 0 additions & 1 deletion src/tunnel/server/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use hyper::body::{Body, Incoming};
use hyper::header::{HeaderValue, COOKIE, SEC_WEBSOCKET_PROTOCOL};
use hyper::{http, Request, Response, StatusCode};
use jsonwebtoken::TokenData;
use std::cmp::min;
use std::net::IpAddr;
use tracing::{error, info, warn};
use url::Host;
Expand Down
3 changes: 2 additions & 1 deletion wstunnel-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ anyhow = "1.0.95"
clap = { version = "4.5.23", features = ["derive", "env"] }

fdlimit = "0.3.0"
opentelemetry = "0.27.1"

tokio = { version = "1.42.0", features = ["full"] }

Expand All @@ -19,4 +20,4 @@ wstunnel = { path = ".." , features = ["clap"] }

[[bin]]
name = "wstunnel"
path = "src/main.rs"
path = "src/main.rs"
36 changes: 34 additions & 2 deletions wstunnel-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use clap::Parser;
use std::io;
use std::net::SocketAddr;
use std::str::FromStr;
use opentelemetry::global;
use tracing::warn;
use tracing_subscriber::filter::Directive;
use tracing_subscriber::EnvFilter;
use wstunnel::config::{Client, Server};
use wstunnel::LocalProtocol;
use wstunnel::{run_client, run_server};
use wstunnel::metrics;

/// Use Websocket or HTTP2 protocol to tunnel {TCP,UDP} traffic
/// wsTunnelClient <---> wsTunnelServer <---> RemoteHost
Expand Down Expand Up @@ -43,6 +46,24 @@ pub struct Wstunnel {
default_value = "INFO"
)]
log_lvl: String,

/// Set the listen address for the prometheus metrics exporter.
#[arg(
long,
global = true,
verbatim_doc_comment,
default_value = None,
)]
metrics_provider_address: Option<SocketAddr>,

/// Allow metrics to take up unbounded space (OOM risk!).
#[arg(
long,
global = true,
verbatim_doc_comment,
default_value = "false",
)]
metrics_unbounded: bool,
}

#[derive(clap::Subcommand, Debug)]
Expand Down Expand Up @@ -84,12 +105,23 @@ async fn main() -> anyhow::Result<()> {
warn!("Failed to set soft filelimit to hard file limit: {}", err)
}

if let Some(addr) = args.metrics_provider_address {
match metrics::setup_metrics_provider(&addr).await {
Ok(provider) => {
let _ = global::set_meter_provider(provider);
}
Err(err) => {
panic!("Failed to setup metrics server: {err:?}")
}
}
}

match args.commands {
Commands::Client(args) => {
run_client(*args).await?;
}
Commands::Server(args) => {
run_server(*args).await?;
Commands::Server(server_args) => {
run_server(*server_args, args.metrics_unbounded).await?;
}
}

Expand Down