Skip to content

Commit

Permalink
feat: add retry when starting telemetry server in ten runtime (#678)
Browse files Browse the repository at this point in the history
  • Loading branch information
halajohn authored Feb 6, 2025
1 parent f28f976 commit 27842d9
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 44 deletions.
3 changes: 3 additions & 0 deletions core/src/ten_rust/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@

/// Default telemetry endpoint as a `host:port` string.
// NOTE(review): presumably used when the caller supplies no endpoint — confirm
// against `telemetry::ten_telemetry_system_create`.
pub const TELEMETRY_DEFAULT_ENDPOINT: &str = "0.0.0.0:49484";
/// Default HTTP path for the telemetry metrics route.
// NOTE(review): presumably used when the caller supplies no path — confirm.
pub const TELEMETRY_DEFAULT_PATH: &str = "/metrics";

/// Maximum number of bind attempts made when starting the telemetry server;
/// once this many attempts have failed, server creation gives up.
pub const TELEMETRY_SERVER_START_RETRY_MAX_ATTEMPTS: u32 = 3;
/// Pause between two consecutive bind attempts, in whole seconds (the value
/// is passed to `std::time::Duration::from_secs`).
pub const TELEMETRY_SERVER_START_RETRY_INTERVAL: u64 = 1; // seconds
110 changes: 67 additions & 43 deletions core/src/ten_rust/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ use prometheus::{
HistogramVec, Opts, Registry, TextEncoder,
};

use crate::constants::{TELEMETRY_DEFAULT_ENDPOINT, TELEMETRY_DEFAULT_PATH};
use crate::constants::{
TELEMETRY_DEFAULT_ENDPOINT, TELEMETRY_DEFAULT_PATH,
TELEMETRY_SERVER_START_RETRY_INTERVAL,
TELEMETRY_SERVER_START_RETRY_MAX_ATTEMPTS,
};

pub struct TelemetrySystem {
registry: Registry,
Expand Down Expand Up @@ -95,54 +99,74 @@ pub extern "C" fn ten_telemetry_system_create(
let registry = Registry::new();

// Start the actix-web server to provide metrics data at the specified path.
let registry_clone = registry.clone();
let path_clone = path_str.clone();

let server_builder = HttpServer::new(move || {
App::new().route(
&path_clone,
web::get().to({
let registry_handler = registry_clone.clone();

move || {
let registry_for_request = registry_handler.clone();

async move {
let metric_families = registry_for_request.gather();
let encoder = TextEncoder::new();
let mut buffer = Vec::new();

if encoder
.encode(&metric_families, &mut buffer)
.is_err()
{
return HttpResponse::InternalServerError()
.finish();
}

let response = match String::from_utf8(buffer) {
Ok(v) => v,
Err(_) => {
let mut attempts = 0;
let max_attempts = TELEMETRY_SERVER_START_RETRY_MAX_ATTEMPTS;
let wait_duration =
std::time::Duration::from_secs(TELEMETRY_SERVER_START_RETRY_INTERVAL);

let server_builder = loop {
let registry_clone = registry.clone();
let path_clone = path_str.clone();

let result = HttpServer::new(move || {
App::new().route(
&path_clone,
web::get().to({
let registry_handler = registry_clone.clone();

move || {
let registry_for_request = registry_handler.clone();

async move {
let metric_families = registry_for_request.gather();
let encoder = TextEncoder::new();
let mut buffer = Vec::new();

if encoder
.encode(&metric_families, &mut buffer)
.is_err()
{
return HttpResponse::InternalServerError()
.finish()
.finish();
}
};

HttpResponse::Ok().body(response)
let response = match String::from_utf8(buffer) {
Ok(v) => v,
Err(_) => {
return HttpResponse::InternalServerError()
.finish()
}
};

HttpResponse::Ok().body(response)
}
}
}),
)
})
// Make actix not linger on the socket.
.shutdown_timeout(0)
.bind(&endpoint_for_bind);

match result {
Ok(server) => break server,
Err(e) => {
attempts += 1;
if attempts >= max_attempts {
eprintln!(
"Error binding to address: {} after {} attempts: {:?}",
endpoint_str, attempts, e
);
return ptr::null_mut();
}
}),
)
})
// Make actix not linger on the socket.
.shutdown_timeout(0)
.bind(&endpoint_for_bind);

let server_builder = match server_builder {
Ok(s) => s,
Err(_) => {
eprintln!("Error binding to address: {}", endpoint_str);
return ptr::null_mut();

eprintln!(
"Failed to bind to address: {}. Attempt {} of {}. Retrying in {} second...",
endpoint_str, attempts, max_attempts, TELEMETRY_SERVER_START_RETRY_INTERVAL
);
std::thread::sleep(wait_duration);
}
}
};

Expand Down
2 changes: 1 addition & 1 deletion docs
Submodule docs updated 1 files
+1 −1 en/SUMMARY.md

0 comments on commit 27842d9

Please sign in to comment.