Skip to content

Commit

Permalink
refactor: init once for common_telemetry::init_global_logging
Browse files Browse the repository at this point in the history
  • Loading branch information
zyy17 committed May 18, 2024
1 parent ad86eab commit 74ae32c
Showing 1 changed file with 126 additions and 121 deletions.
247 changes: 126 additions & 121 deletions src/common/telemetry/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,139 +124,144 @@ pub fn init_global_logging(
tracing_opts: &TracingOptions,
node_id: Option<String>,
) -> Vec<WorkerGuard> {
static START: Once = Once::new();
let mut guards = vec![];
let dir = &opts.dir;
let level = &opts.level;
let enable_otlp_tracing = opts.enable_otlp_tracing;

// Enable log compatible layer to convert log record to tracing span.
LogTracer::init().expect("log tracer must be valid");

// stdout log layer.
let stdout_logging_layer = if opts.append_stdout {
let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
guards.push(stdout_guard);

Some(
Layer::new()
.with_writer(stdout_writer)
.with_ansi(atty::is(atty::Stream::Stdout)),
)
} else {
None
};

// file log layer.
let rolling_appender = RollingFileAppender::new(Rotation::HOURLY, dir, app_name);
let (rolling_writer, rolling_writer_guard) = tracing_appender::non_blocking(rolling_appender);
let file_logging_layer = Layer::new().with_writer(rolling_writer).with_ansi(false);
guards.push(rolling_writer_guard);

// error file log layer.
let err_rolling_appender =
RollingFileAppender::new(Rotation::HOURLY, dir, format!("{}-{}", app_name, "err"));
let (err_rolling_writer, err_rolling_writer_guard) =
tracing_appender::non_blocking(err_rolling_appender);
let err_file_logging_layer = Layer::new()
.with_writer(err_rolling_writer)
.with_ansi(false);
guards.push(err_rolling_writer_guard);

// resolve log level settings from:
// - options from command line or config files
// - environment variable: RUST_LOG
// - default settings
let rust_log_env = std::env::var(EnvFilter::DEFAULT_ENV).ok();
let targets_string = level
.as_deref()
.or(rust_log_env.as_deref())
.unwrap_or(DEFAULT_LOG_TARGETS);
let filter = targets_string
.parse::<filter::Targets>()
.expect("error parsing log level string");
let sampler = opts
.tracing_sample_ratio
.as_ref()
.map(create_sampler)
.map(Sampler::ParentBased)
.unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
// Must enable 'tokio_unstable' cfg to use this feature.
// For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
#[cfg(feature = "tokio-console")]
let subscriber = {
let tokio_console_layer = if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr
{
let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
});
println!("tokio-console listening on {addr}");

START.call_once(|| {
let dir = &opts.dir;
let level = &opts.level;
let enable_otlp_tracing = opts.enable_otlp_tracing;

// Enable log compatible layer to convert log record to tracing span.
LogTracer::init().expect("log tracer must be valid");

// stdout log layer.
let stdout_logging_layer = if opts.append_stdout {
let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
guards.push(stdout_guard);

Some(
console_subscriber::ConsoleLayer::builder()
.server_addr(addr)
.spawn(),
Layer::new()
.with_writer(stdout_writer)
.with_ansi(atty::is(atty::Stream::Stdout)),
)
} else {
None
};

let stdout_logging_layer = stdout_logging_layer.map(|x| x.with_filter(filter.clone()));
// file log layer.
let rolling_appender = RollingFileAppender::new(Rotation::HOURLY, dir, app_name);
let (rolling_writer, rolling_writer_guard) =
tracing_appender::non_blocking(rolling_appender);
let file_logging_layer = Layer::new().with_writer(rolling_writer).with_ansi(false);
guards.push(rolling_writer_guard);

// error file log layer.
let err_rolling_appender =
RollingFileAppender::new(Rotation::HOURLY, dir, format!("{}-{}", app_name, "err"));
let (err_rolling_writer, err_rolling_writer_guard) =
tracing_appender::non_blocking(err_rolling_appender);
let err_file_logging_layer = Layer::new()
.with_writer(err_rolling_writer)
.with_ansi(false);
guards.push(err_rolling_writer_guard);

// resolve log level settings from:
// - options from command line or config files
// - environment variable: RUST_LOG
// - default settings
let rust_log_env = std::env::var(EnvFilter::DEFAULT_ENV).ok();
let targets_string = level
.as_deref()
.or(rust_log_env.as_deref())
.unwrap_or(DEFAULT_LOG_TARGETS);
let filter = targets_string
.parse::<filter::Targets>()
.expect("error parsing log level string");
let sampler = opts
.tracing_sample_ratio
.as_ref()
.map(create_sampler)
.map(Sampler::ParentBased)
.unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
// Must enable 'tokio_unstable' cfg to use this feature.
// For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
#[cfg(feature = "tokio-console")]
let subscriber = {
let tokio_console_layer =
if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr {
let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
});
println!("tokio-console listening on {addr}");

Some(
console_subscriber::ConsoleLayer::builder()
.server_addr(addr)
.spawn(),
)
} else {
None
};

let stdout_logging_layer = stdout_logging_layer.map(|x| x.with_filter(filter.clone()));

let file_logging_layer = file_logging_layer.with_filter(filter);
let file_logging_layer = file_logging_layer.with_filter(filter);

Registry::default()
.with(tokio_console_layer)
Registry::default()
.with(tokio_console_layer)
.with(stdout_logging_layer)
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR))
};

// consume the `tracing_opts`, to avoid "unused" warnings
let _ = tracing_opts;

#[cfg(not(feature = "tokio-console"))]
let subscriber = Registry::default()
.with(filter)
.with(stdout_logging_layer)
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR))
};

// consume the `tracing_opts`, to avoid "unused" warnings
let _ = tracing_opts;

#[cfg(not(feature = "tokio-console"))]
let subscriber = Registry::default()
.with(filter)
.with(stdout_logging_layer)
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR));

if enable_otlp_tracing {
global::set_text_map_propagator(TraceContextPropagator::new());
// otlp exporter
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| format!("http://{}", e))
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
),
)
.with_trace_config(
opentelemetry_sdk::trace::config()
.with_sampler(sampler)
.with_resource(opentelemetry_sdk::Resource::new(vec![
KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
KeyValue::new(
resource::SERVICE_INSTANCE_ID,
node_id.unwrap_or("none".to_string()),
),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
])),
)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("otlp tracer install failed");
let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
let subscriber = subscriber.with(tracing_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
} else {
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
}
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR));

if enable_otlp_tracing {
global::set_text_map_propagator(TraceContextPropagator::new());
// otlp exporter
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| format!("http://{}", e))
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
),
)
.with_trace_config(
opentelemetry_sdk::trace::config()
.with_sampler(sampler)
.with_resource(opentelemetry_sdk::Resource::new(vec![
KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
KeyValue::new(
resource::SERVICE_INSTANCE_ID,
node_id.unwrap_or("none".to_string()),
),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
])),
)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("otlp tracer install failed");
let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
let subscriber = subscriber.with(tracing_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
} else {
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
}
});

guards
}

0 comments on commit 74ae32c

Please sign in to comment.