Skip to content

Commit

Permalink
refactor: init once for common_telemetry::init_global_logging and rem…
Browse files Browse the repository at this point in the history
…ove its return value which is unused
  • Loading branch information
zyy17 committed May 18, 2024
1 parent ad86eab commit 9cce7b9
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 141 deletions.
8 changes: 1 addition & 7 deletions src/cmd/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,7 @@ pub struct Command {

impl Command {
pub async fn build(&self, opts: LoggingOptions) -> Result<Instance> {
let _guard = common_telemetry::init_global_logging(
APP_NAME,
&opts,
&TracingOptions::default(),
None,
);

common_telemetry::init_global_logging(APP_NAME, &opts, &TracingOptions::default(), None);
self.cmd.build().await
}

Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl StartCommand {
}

async fn build(&self, mut opts: DatanodeOptions) -> Result<Instance> {
let _guard = common_telemetry::init_global_logging(
common_telemetry::init_global_logging(
APP_NAME,
&opts.logging,
&opts.tracing,
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl StartCommand {
}

async fn build(&self, mut opts: FrontendOptions) -> Result<Instance> {
let _guard = common_telemetry::init_global_logging(
common_telemetry::init_global_logging(
APP_NAME,
&opts.logging,
&opts.tracing,
Expand Down
3 changes: 1 addition & 2 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ impl StartCommand {
}

async fn build(&self, mut opts: MetasrvOptions) -> Result<Instance> {
let _guard =
common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None);
common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None);

let plugins = plugins::setup_metasrv_plugins(&mut opts)
.await
Expand Down
3 changes: 1 addition & 2 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,7 @@ impl StartCommand {
#[allow(unused_variables)]
#[allow(clippy::diverging_sub_expression)]
async fn build(&self, opts: StandaloneOptions) -> Result<Instance> {
let _guard =
common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None);
common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None);

info!("Standalone start command: {:#?}", self);
info!("Building standalone instance with {opts:#?}");
Expand Down
253 changes: 125 additions & 128 deletions src/common/telemetry/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub fn init_default_ut_logging() {
static START: Once = Once::new();

START.call_once(|| {
let mut g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap();
let _g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap();

// When running in Github's actions, env "UNITTEST_LOG_DIR" is set to a directory other
// than "/tmp".
Expand All @@ -101,12 +101,7 @@ pub fn init_default_ut_logging() {
level: Some(level),
..Default::default()
};
*g = Some(init_global_logging(
"unittest",
&opts,
&TracingOptions::default(),
None
));
init_global_logging("unittest", &opts, &TracingOptions::default(), None);

crate::info!("logs dir = {}", dir);
});
Expand All @@ -123,140 +118,142 @@ pub fn init_global_logging(
opts: &LoggingOptions,
tracing_opts: &TracingOptions,
node_id: Option<String>,
) -> Vec<WorkerGuard> {
let mut guards = vec![];
let dir = &opts.dir;
let level = &opts.level;
let enable_otlp_tracing = opts.enable_otlp_tracing;

// Enable log compatible layer to convert log record to tracing span.
LogTracer::init().expect("log tracer must be valid");

// stdout log layer.
let stdout_logging_layer = if opts.append_stdout {
let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
guards.push(stdout_guard);

Some(
Layer::new()
.with_writer(stdout_writer)
.with_ansi(atty::is(atty::Stream::Stdout)),
)
} else {
None
};

// file log layer.
let rolling_appender = RollingFileAppender::new(Rotation::HOURLY, dir, app_name);
let (rolling_writer, rolling_writer_guard) = tracing_appender::non_blocking(rolling_appender);
let file_logging_layer = Layer::new().with_writer(rolling_writer).with_ansi(false);
guards.push(rolling_writer_guard);
) {
static START: Once = Once::new();
START.call_once(|| {
let mut guards = vec![];
let dir = &opts.dir;
let level = &opts.level;
let enable_otlp_tracing = opts.enable_otlp_tracing;

// error file log layer.
let err_rolling_appender =
RollingFileAppender::new(Rotation::HOURLY, dir, format!("{}-{}", app_name, "err"));
let (err_rolling_writer, err_rolling_writer_guard) =
tracing_appender::non_blocking(err_rolling_appender);
let err_file_logging_layer = Layer::new()
.with_writer(err_rolling_writer)
.with_ansi(false);
guards.push(err_rolling_writer_guard);
// Enable log compatible layer to convert log record to tracing span.
LogTracer::init().expect("log tracer must be valid");

// resolve log level settings from:
// - options from command line or config files
// - environment variable: RUST_LOG
// - default settings
let rust_log_env = std::env::var(EnvFilter::DEFAULT_ENV).ok();
let targets_string = level
.as_deref()
.or(rust_log_env.as_deref())
.unwrap_or(DEFAULT_LOG_TARGETS);
let filter = targets_string
.parse::<filter::Targets>()
.expect("error parsing log level string");
let sampler = opts
.tracing_sample_ratio
.as_ref()
.map(create_sampler)
.map(Sampler::ParentBased)
.unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
// Must enable 'tokio_unstable' cfg to use this feature.
// For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
#[cfg(feature = "tokio-console")]
let subscriber = {
let tokio_console_layer = if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr
{
let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
});
println!("tokio-console listening on {addr}");
// stdout log layer.
let stdout_logging_layer = if opts.append_stdout {
let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
guards.push(stdout_guard);

Some(
console_subscriber::ConsoleLayer::builder()
.server_addr(addr)
.spawn(),
Layer::new()
.with_writer(stdout_writer)
.with_ansi(atty::is(atty::Stream::Stdout)),
)
} else {
None
};

let stdout_logging_layer = stdout_logging_layer.map(|x| x.with_filter(filter.clone()));
// file log layer.
let rolling_appender = RollingFileAppender::new(Rotation::HOURLY, dir, app_name);
let (rolling_writer, rolling_writer_guard) =
tracing_appender::non_blocking(rolling_appender);
let file_logging_layer = Layer::new().with_writer(rolling_writer).with_ansi(false);
guards.push(rolling_writer_guard);

let file_logging_layer = file_logging_layer.with_filter(filter);
// error file log layer.
let err_rolling_appender =
RollingFileAppender::new(Rotation::HOURLY, dir, format!("{}-{}", app_name, "err"));
let (err_rolling_writer, err_rolling_writer_guard) =
tracing_appender::non_blocking(err_rolling_appender);
let err_file_logging_layer = Layer::new()
.with_writer(err_rolling_writer)
.with_ansi(false);
guards.push(err_rolling_writer_guard);

Registry::default()
.with(tokio_console_layer)
.with(stdout_logging_layer)
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR))
};
// resolve log level settings from:
// - options from command line or config files
// - environment variable: RUST_LOG
// - default settings
let rust_log_env = std::env::var(EnvFilter::DEFAULT_ENV).ok();
let targets_string = level
.as_deref()
.or(rust_log_env.as_deref())
.unwrap_or(DEFAULT_LOG_TARGETS);
let filter = targets_string
.parse::<filter::Targets>()
.expect("error parsing log level string");
let sampler = opts
.tracing_sample_ratio
.as_ref()
.map(create_sampler)
.map(Sampler::ParentBased)
.unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
// Must enable 'tokio_unstable' cfg to use this feature.
// For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
#[cfg(feature = "tokio-console")]
let subscriber = {
let tokio_console_layer =
if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr {
let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
});
println!("tokio-console listening on {addr}");

// consume the `tracing_opts`, to avoid "unused" warnings
let _ = tracing_opts;
Some(
console_subscriber::ConsoleLayer::builder()
.server_addr(addr)
.spawn(),
)
} else {
None
};

#[cfg(not(feature = "tokio-console"))]
let subscriber = Registry::default()
.with(filter)
.with(stdout_logging_layer)
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR));
let stdout_logging_layer = stdout_logging_layer.map(|x| x.with_filter(filter.clone()));

if enable_otlp_tracing {
global::set_text_map_propagator(TraceContextPropagator::new());
// otlp exporter
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| format!("http://{}", e))
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
),
)
.with_trace_config(
opentelemetry_sdk::trace::config()
.with_sampler(sampler)
.with_resource(opentelemetry_sdk::Resource::new(vec![
KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
KeyValue::new(
resource::SERVICE_INSTANCE_ID,
node_id.unwrap_or("none".to_string()),
),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
])),
)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("otlp tracer install failed");
let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
let subscriber = subscriber.with(tracing_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
} else {
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
}
let file_logging_layer = file_logging_layer.with_filter(filter);

Registry::default()
.with(tokio_console_layer)
.with(stdout_logging_layer)
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR))
};

// consume the `tracing_opts`, to avoid "unused" warnings
let _ = tracing_opts;

guards
#[cfg(not(feature = "tokio-console"))]
let subscriber = Registry::default()
.with(filter)
.with(stdout_logging_layer)
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR));

if enable_otlp_tracing {
global::set_text_map_propagator(TraceContextPropagator::new());
// otlp exporter
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| format!("http://{}", e))
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
),
)
.with_trace_config(
opentelemetry_sdk::trace::config()
.with_sampler(sampler)
.with_resource(opentelemetry_sdk::Resource::new(vec![
KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
KeyValue::new(
resource::SERVICE_INSTANCE_ID,
node_id.unwrap_or("none".to_string()),
),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
])),
)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("otlp tracer install failed");
let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
let subscriber = subscriber.with(tracing_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
} else {
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
}
});
}

0 comments on commit 9cce7b9

Please sign in to comment.