Skip to content

Commit

Permalink
chore: use RUST_LOG env to control logging in rust binary (#2017)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Aug 30, 2024
1 parent a77c939 commit 39e6ddd
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 53 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ servesink = { path = "servesink" }
serving = { path = "serving" }
monovertex = { path = "monovertex" }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
5 changes: 2 additions & 3 deletions rust/monovertex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ tower = "0.4.13"
uuid = { version = "1.10.0", features = ["v4"] }
once_cell = "1.19.0"
serde_json = "1.0.122"
numaflow-models = { path = "../numaflow-models"}
numaflow-models = { path = "../numaflow-models" }
trait-variant = "0.1.2"
rcgen = "0.13.1"
rustls = { version = "0.23.12", features = ["aws_lc_rs"] }
Expand All @@ -35,9 +35,8 @@ parking_lot = "0.12.3"
prometheus-client = "0.22.3"

[dev-dependencies]
tower = "0.4.13"
tempfile = "3.11.0"
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch="main" }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" }

[build-dependencies]
tonic-build = "0.12.1"
10 changes: 0 additions & 10 deletions rust/monovertex/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::sync::OnceLock;

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use tracing::level_filters::LevelFilter;

use numaflow_models::models::MonoVertex;

Expand All @@ -14,7 +13,6 @@ const ENV_GRPC_MAX_MESSAGE_SIZE: &str = "NUMAFLOW_GRPC_MAX_MESSAGE_SIZE";
const ENV_POD_REPLICA: &str = "NUMAFLOW_REPLICA";
const DEFAULT_GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB
const DEFAULT_METRICS_PORT: u16 = 2469;
const ENV_LOG_LEVEL: &str = "NUMAFLOW_DEBUG";
const DEFAULT_LAG_CHECK_INTERVAL_IN_SECS: u16 = 5;
const DEFAULT_LAG_REFRESH_INTERVAL_IN_SECS: u16 = 3;
const DEFAULT_BATCH_SIZE: u64 = 500;
Expand All @@ -38,7 +36,6 @@ pub struct Settings {
pub batch_size: u64,
pub timeout_in_ms: u32,
pub metrics_server_listen_port: u16,
pub log_level: String,
pub grpc_max_message_size: usize,
pub is_transformer_enabled: bool,
pub is_fallback_enabled: bool,
Expand All @@ -56,7 +53,6 @@ impl Default for Settings {
batch_size: DEFAULT_BATCH_SIZE,
timeout_in_ms: DEFAULT_TIMEOUT_IN_MS,
metrics_server_listen_port: DEFAULT_METRICS_PORT,
log_level: LevelFilter::INFO.to_string(),
grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE,
is_transformer_enabled: false,
is_fallback_enabled: false,
Expand Down Expand Up @@ -122,9 +118,6 @@ impl Settings {
.is_some();
}

settings.log_level =
env::var(ENV_LOG_LEVEL).unwrap_or_else(|_| LevelFilter::INFO.to_string());

settings.grpc_max_message_size = env::var(ENV_GRPC_MAX_MESSAGE_SIZE)
.unwrap_or_else(|_| DEFAULT_GRPC_MAX_MESSAGE_SIZE.to_string())
.parse()
Expand Down Expand Up @@ -152,7 +145,6 @@ mod tests {
// Set up environment variables
unsafe {
env::set_var(ENV_MONO_VERTEX_OBJ, "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLW1vbm8tdmVydGV4IiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImNyZWF0aW9uVGltZXN0YW1wIjpudWxsfSwic3BlYyI6eyJyZXBsaWNhcyI6MCwic291cmNlIjp7InRyYW5zZm9ybWVyIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6InF1YXkuaW8vbnVtYWlvL251bWFmbG93LXJzL21hcHQtZXZlbnQtdGltZS1maWx0ZXI6c3RhYmxlIiwicmVzb3VyY2VzIjp7fX0sImJ1aWx0aW4iOm51bGx9LCJ1ZHNvdXJjZSI6eyJjb250YWluZXIiOnsiaW1hZ2UiOiJkb2NrZXIuaW50dWl0LmNvbS9wZXJzb25hbC95aGwwMS9zaW1wbGUtc291cmNlOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sInNpbmsiOnsidWRzaW5rIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImRvY2tlci5pbnR1aXQuY29tL3BlcnNvbmFsL3lobDAxL2JsYWNraG9sZS1zaW5rOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sImxpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjo1MDAsInJlYWRUaW1lb3V0IjoiMXMifSwic2NhbGUiOnt9fSwic3RhdHVzIjp7InJlcGxpY2FzIjowLCJsYXN0VXBkYXRlZCI6bnVsbCwibGFzdFNjYWxlZEF0IjpudWxsfX0=");
env::set_var(ENV_LOG_LEVEL, "debug");
env::set_var(ENV_GRPC_MAX_MESSAGE_SIZE, "128000000");
};

Expand All @@ -163,13 +155,11 @@ mod tests {
assert_eq!(settings.mono_vertex_name, "simple-mono-vertex");
assert_eq!(settings.batch_size, 500);
assert_eq!(settings.timeout_in_ms, 1000);
assert_eq!(settings.log_level, "debug");
assert_eq!(settings.grpc_max_message_size, 128000000);

// Clean up environment variables
unsafe {
env::remove_var(ENV_MONO_VERTEX_OBJ);
env::remove_var(ENV_LOG_LEVEL);
env::remove_var(ENV_GRPC_MAX_MESSAGE_SIZE);
};
}
Expand Down
2 changes: 1 addition & 1 deletion rust/monovertex/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl Forwarder {
match self.sink_client.sink_fn(messages_to_send.clone()).await {
Ok(response) => {
debug!(
attempts=attempts,
attempts = attempts,
"Sink latency - {}ms",
start_time.elapsed().as_millis()
);
Expand Down
14 changes: 1 addition & 13 deletions rust/monovertex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ use tokio::signal;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::level_filters::LevelFilter;
use tracing::{error, info, warn};
use tracing_subscriber::EnvFilter;

/// SourcerSinker orchestrates data movement from the Source to the Sink via the optional SourceTransformer.
/// The forward-a-chunk executes the following in an infinite loop till a shutdown signal is received:
Expand Down Expand Up @@ -43,17 +41,6 @@ mod server_info;
mod metrics;

pub async fn mono_vertex() {
// Initialize the logger
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.parse_lossy(&config().log_level),
)
.with_target(false)
.with_ansi(false)
.init();

// Initialize the source, sink and transformer configurations
// We are using the default configurations for now.
let source_config = SourceConfig {
Expand Down Expand Up @@ -83,6 +70,7 @@ pub async fn mono_vertex() {

let cln_token = CancellationToken::new();
let shutdown_cln_token = cln_token.clone();

// wait for SIG{INT,TERM} and invoke cancellation token.
let shutdown_handle: JoinHandle<Result<()>> = tokio::spawn(async move {
shutdown_signal().await;
Expand Down
6 changes: 1 addition & 5 deletions rust/monovertex/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ const TRANSFORM_TIME: &str = "monovtx_transformer_time";
const ACK_TIME: &str = "monovtx_ack_time";
const SINK_TIME: &str = "monovtx_sink_time";


#[derive(Clone)]
pub(crate) struct MetricsState {
pub source_client: SourceClient,
Expand Down Expand Up @@ -204,10 +203,7 @@ static MONOVTX_METRICS: OnceLock<MonoVtxMetrics> = OnceLock::new();
// forward_metrics is a helper function used to fetch the
// MonoVtxMetrics object
pub(crate) fn forward_metrics() -> &'static MonoVtxMetrics {
MONOVTX_METRICS.get_or_init(|| {
let metrics = MonoVtxMetrics::new();
metrics
})
MONOVTX_METRICS.get_or_init(MonoVtxMetrics::new)
}

/// MONOVTX_METRICS_LABELS are used to store the common labels used in the metrics
Expand Down
9 changes: 0 additions & 9 deletions rust/servesink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,12 @@ use std::error::Error;
use numaflow::sink::{self, Response, SinkRequest};
use reqwest::Client;
use tracing::{error, warn};
use tracing_subscriber::prelude::*;

const NUMAFLOW_CALLBACK_URL_HEADER: &str = "X-Numaflow-Callback-Url";
const NUMAFLOW_ID_HEADER: &str = "X-Numaflow-Id";

/// servesink is a Numaflow Sink which forwards the payload to the Numaflow serving URL.
pub async fn servesink() -> Result<(), Box<dyn Error + Send + Sync>> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "servesink=debug".into()),
)
.with(tracing_subscriber::fmt::layer().with_ansi(false))
.init();

sink::Server::new(ServeSink::new()).start().await
}

Expand Down
12 changes: 0 additions & 12 deletions rust/serving/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use crate::pipeline::min_pipeline_spec;
use axum_server::tls_rustls::RustlsConfig;
use std::net::SocketAddr;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

mod app;
mod config;
Expand All @@ -23,16 +21,6 @@ pub async fn serve() -> std::result::Result<(), Box<dyn std::error::Error + Send
.await
.map_err(|e| format!("Failed to create tls config {:?}", e))?;

tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
// axum logs rejections from built-in extractors with the `axum::rejection`
// target, at `TRACE` level. `axum::rejection=trace` enables showing those events
.unwrap_or_else(|_| "info,numaserve=debug,axum::rejection=trace".into()),
)
.with(tracing_subscriber::fmt::layer().with_ansi(false))
.init();

info!(config = ?config(), pipeline_spec = ? min_pipeline_spec(), "Starting server with config and pipeline spec");

// Start the metrics server, which serves the prometheus metrics.
Expand Down
15 changes: 15 additions & 0 deletions rust/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
use std::env;
use tracing::{error, info};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[tokio::main]
async fn main() {
let args: Vec<String> = env::args().collect();

// Set up the tracing subscriber. RUST_LOG can be used to set the log level.
// The default log level is `info`. The `axum::rejection=trace` enables showing
// rejections from built-in extractors at `TRACE` level.
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
// TODO: add a better default based on entry point invocation
// e.g., serving/monovertex might need a different default
.unwrap_or_else(|_| "info".into()),
)
.with(tracing_subscriber::fmt::layer().with_ansi(false))
.init();

// Based on the argument, run the appropriate component.
if args.contains(&"--serving".to_string()) {
if let Err(e) = serving::serve().await {
Expand Down

0 comments on commit 39e6ddd

Please sign in to comment.