diff --git a/rust/numaflow-core/src/config.rs b/rust/numaflow-core/src/config.rs index d7b497371..ae9071d8e 100644 --- a/rust/numaflow-core/src/config.rs +++ b/rust/numaflow-core/src/config.rs @@ -1,130 +1,23 @@ -use std::fmt::Display; +use std::env; use std::sync::OnceLock; -use std::{env, time::Duration}; - -use base64::prelude::BASE64_STANDARD; -use base64::Engine; -use bytes::Bytes; -use tracing::warn; - -use numaflow_models::models::{Backoff, MonoVertex, RetryStrategy}; +use crate::config::pipeline::PipelineConfig; use crate::Error; +use crate::Result; +use monovertex::MonovertexConfig; -// TODO move constants to a separate module, separate consts for different components -const DEFAULT_SOURCE_SOCKET: &str = "/var/run/numaflow/source.sock"; -const DEFAULT_SOURCE_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcer-server-info"; -const DEFAULT_SINK_SOCKET: &str = "/var/run/numaflow/sink.sock"; -const DEFAULT_FB_SINK_SOCKET: &str = "/var/run/numaflow/fb-sink.sock"; - -const DEFAULT_SINK_SERVER_INFO_FILE: &str = "/var/run/numaflow/sinker-server-info"; -const DEFAULT_FB_SINK_SERVER_INFO_FILE: &str = "/var/run/numaflow/fb-sinker-server-info"; -const DEFAULT_TRANSFORMER_SOCKET: &str = "/var/run/numaflow/sourcetransform.sock"; -const DEFAULT_TRANSFORMER_SERVER_INFO_FILE: &str = - "/var/run/numaflow/sourcetransformer-server-info"; const ENV_MONO_VERTEX_OBJ: &str = "NUMAFLOW_MONO_VERTEX_OBJECT"; -const ENV_POD_REPLICA: &str = "NUMAFLOW_REPLICA"; -const DEFAULT_GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB -const DEFAULT_METRICS_PORT: u16 = 2469; -const DEFAULT_LAG_CHECK_INTERVAL_IN_SECS: u16 = 5; -const DEFAULT_LAG_REFRESH_INTERVAL_IN_SECS: u16 = 3; -const DEFAULT_BATCH_SIZE: u64 = 500; -const DEFAULT_TIMEOUT_IN_MS: u32 = 1000; -const DEFAULT_MAX_SINK_RETRY_ATTEMPTS: u16 = u16::MAX; -const DEFAULT_SINK_RETRY_INTERVAL_IN_MS: u32 = 1; -const DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY: OnFailureStrategy = OnFailureStrategy::Retry; - -/// Jetstream ISB related configurations. -pub mod jetstream { - use std::fmt; - use std::time::Duration; - - // jetstream related constants - const DEFAULT_PARTITION_IDX: u16 = 0; - const DEFAULT_MAX_LENGTH: usize = 30000; - const DEFAULT_USAGE_LIMIT: f64 = 0.8; - const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 1; - const DEFAULT_BUFFER_FULL_STRATEGY: BufferFullStrategy = BufferFullStrategy::RetryUntilSuccess; - const DEFAULT_RETRY_INTERVAL_MILLIS: u64 = 10; - - #[derive(Debug, Clone)] - pub(crate) struct StreamWriterConfig { - pub name: String, - pub partition_idx: u16, - pub max_length: usize, - pub refresh_interval: Duration, - pub usage_limit: f64, - pub buffer_full_strategy: BufferFullStrategy, - pub retry_interval: Duration, - } - - impl Default for StreamWriterConfig { - fn default() -> Self { - StreamWriterConfig { - name: "default".to_string(), - partition_idx: DEFAULT_PARTITION_IDX, - max_length: DEFAULT_MAX_LENGTH, - usage_limit: DEFAULT_USAGE_LIMIT, - refresh_interval: Duration::from_secs(DEFAULT_REFRESH_INTERVAL_SECS), - buffer_full_strategy: DEFAULT_BUFFER_FULL_STRATEGY, - retry_interval: Duration::from_millis(DEFAULT_RETRY_INTERVAL_MILLIS), - } - } - } - - #[derive(Debug, Clone, Eq, PartialEq)] - pub(crate) enum BufferFullStrategy { - RetryUntilSuccess, - DiscardLatest, - } - - impl fmt::Display for BufferFullStrategy { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - BufferFullStrategy::RetryUntilSuccess => write!(f, "retryUntilSuccess"), - BufferFullStrategy::DiscardLatest => write!(f, "discardLatest"), - } - } - } -} - -#[derive(Debug, PartialEq, Clone)] -pub enum OnFailureStrategy { - Retry, - Fallback, - Drop, -} +const ENV_VERTEX_OBJ: &str = "NUMAFLOW_VERTEX_OBJECT"; -impl OnFailureStrategy { - /// Converts a string slice to an `OnFailureStrategy` enum variant. - /// Case insensitivity is considered to enhance usability. - /// - /// # Arguments - /// * `s` - A string slice representing the retry strategy. - /// - /// # Returns - /// An option containing the corresponding enum variant if successful, - /// or DefaultStrategy if the input does not match known variants. - fn from_str(s: &str) -> Option { - match s.to_lowercase().as_str() { - "retry" => Some(OnFailureStrategy::Retry), - "fallback" => Some(OnFailureStrategy::Fallback), - "drop" => Some(OnFailureStrategy::Drop), - _ => Some(DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY), - } - } -} - -impl Display for OnFailureStrategy { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match *self { - OnFailureStrategy::Retry => write!(f, "retry"), - OnFailureStrategy::Fallback => write!(f, "fallback"), - OnFailureStrategy::Drop => write!(f, "drop"), - } - } -} +/// Building blocks (Source, Sink, Transformer, FallBack, Metrics, etc.) to build a Pipeline or a +/// MonoVertex. +pub(crate) mod components; +/// MonoVertex specific configs. +pub(crate) mod monovertex; +/// Pipeline specific configs. +pub(crate) mod pipeline; +/// Exposes the [Settings] via lazy loading. pub fn config() -> &'static Settings { static CONF: OnceLock = OnceLock::new(); CONF.get_or_init(|| match Settings::load() { @@ -135,376 +28,48 @@ pub fn config() -> &'static Settings { }) } -pub struct Settings { - pub mono_vertex_name: String, - pub replica: u32, - pub batch_size: u64, - pub timeout_in_ms: u32, - pub metrics_server_listen_port: u16, - pub lag_check_interval_in_secs: u16, - pub lag_refresh_interval_in_secs: u16, - pub sink_max_retry_attempts: u16, - pub sink_retry_interval_in_ms: u32, - pub sink_retry_on_fail_strategy: OnFailureStrategy, - pub sink_default_retry_strategy: RetryStrategy, - pub transformer_config: Option, - pub udsource_config: Option, - pub udsink_config: Option, - pub logsink_config: Option, - pub blackhole_config: Option, - pub fallback_config: Option, - pub generator_config: Option, -} - #[derive(Debug, Clone)] -pub struct TransformerConfig { - pub grpc_max_message_size: usize, - pub socket_path: String, - pub server_info_path: String, -} - -impl Default for TransformerConfig { - fn default() -> Self { - Self { - grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE, - socket_path: DEFAULT_TRANSFORMER_SOCKET.to_string(), - server_info_path: DEFAULT_TRANSFORMER_SERVER_INFO_FILE.to_string(), - } - } +pub(crate) enum CustomResourceType { + MonoVertex(MonovertexConfig), + Pipeline(PipelineConfig), } +/// The CRD and other necessary setting to get the Numaflow pipeline/monovertex running. #[derive(Debug, Clone)] -pub struct UDSourceConfig { - pub grpc_max_message_size: usize, - pub socket_path: String, - pub server_info_path: String, -} - -impl Default for UDSourceConfig { - fn default() -> Self { - Self { - grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE, - socket_path: DEFAULT_SOURCE_SOCKET.to_string(), - server_info_path: DEFAULT_SOURCE_SERVER_INFO_FILE.to_string(), - } - } -} - -#[derive(Debug, Clone, Default)] -pub struct LogSinkConfig; - -#[derive(Debug, Clone)] -pub struct UDSinkConfig { - pub grpc_max_message_size: usize, - pub socket_path: String, - pub server_info_path: String, -} - -impl Default for UDSinkConfig { - fn default() -> Self { - Self { - grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE, - socket_path: DEFAULT_SINK_SOCKET.to_string(), - server_info_path: DEFAULT_SINK_SERVER_INFO_FILE.to_string(), - } - } -} - -impl UDSinkConfig { - fn fallback_default() -> Self { - Self { - grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE, - socket_path: DEFAULT_FB_SINK_SOCKET.to_string(), - server_info_path: DEFAULT_FB_SINK_SERVER_INFO_FILE.to_string(), - } - } -} - -#[derive(Debug, Clone)] -pub struct GeneratorConfig { - pub rpu: usize, - pub content: Bytes, - pub duration: usize, - pub value: Option, - pub key_count: u8, - pub msg_size_bytes: u32, - pub jitter: Duration, -} - -impl Default for GeneratorConfig { - fn default() -> Self { - Self { - rpu: 1, - content: Bytes::new(), - duration: 1000, - value: None, - key_count: 0, - msg_size_bytes: 8, - jitter: Duration::from_secs(0), - } - } -} - -/// Configuration for the [BlackholeSink](crate::sink::blackhole::BlackholeSink) -#[derive(Default, Debug, Clone)] -pub struct BlackholeConfig {} - -impl Default for Settings { - fn default() -> Self { - // Create a default retry strategy from defined constants - let default_retry_strategy = RetryStrategy { - backoff: Option::from(Box::from(Backoff { - interval: Option::from(kube::core::Duration::from( - std::time::Duration::from_millis(DEFAULT_SINK_RETRY_INTERVAL_IN_MS as u64), - )), - steps: Option::from(DEFAULT_MAX_SINK_RETRY_ATTEMPTS as i64), - })), - on_failure: Option::from(DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY.to_string()), - }; - Self { - mono_vertex_name: "default".to_string(), - replica: 0, - batch_size: DEFAULT_BATCH_SIZE, - timeout_in_ms: DEFAULT_TIMEOUT_IN_MS, - metrics_server_listen_port: DEFAULT_METRICS_PORT, - lag_check_interval_in_secs: DEFAULT_LAG_CHECK_INTERVAL_IN_SECS, - lag_refresh_interval_in_secs: DEFAULT_LAG_REFRESH_INTERVAL_IN_SECS, - sink_max_retry_attempts: DEFAULT_MAX_SINK_RETRY_ATTEMPTS, - sink_retry_interval_in_ms: DEFAULT_SINK_RETRY_INTERVAL_IN_MS, - sink_retry_on_fail_strategy: DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY, - sink_default_retry_strategy: default_retry_strategy, - transformer_config: None, - udsource_config: None, - udsink_config: Default::default(), - logsink_config: None, - blackhole_config: None, - fallback_config: None, - generator_config: None, - } - } +pub(crate) struct Settings { + pub(crate) custom_resource_type: CustomResourceType, } impl Settings { - fn load() -> Result { - let mut settings = Settings::default(); - if let Ok(mono_vertex_spec) = env::var(ENV_MONO_VERTEX_OBJ) { - // decode the spec it will be base64 encoded - let mono_vertex_spec = BASE64_STANDARD - .decode(mono_vertex_spec.as_bytes()) - .map_err(|e| { - Error::Config(format!("Failed to decode mono vertex spec: {:?}", e)) - })?; - - let mono_vertex_obj: MonoVertex = serde_json::from_slice(&mono_vertex_spec) - .map_err(|e| Error::Config(format!("Failed to parse mono vertex spec: {:?}", e)))?; - - settings.batch_size = mono_vertex_obj - .spec - .limits - .clone() - .unwrap() - .read_batch_size - .map(|x| x as u64) - .unwrap_or(DEFAULT_BATCH_SIZE); - - settings.timeout_in_ms = mono_vertex_obj - .spec - .limits - .clone() - .unwrap() - .read_timeout - .map(|x| std::time::Duration::from(x).as_millis() as u32) - .unwrap_or(DEFAULT_TIMEOUT_IN_MS); - - settings.mono_vertex_name = mono_vertex_obj - .metadata - .and_then(|metadata| metadata.name) - .ok_or_else(|| Error::Config("Mono vertex name not found".to_string()))?; - - settings.transformer_config = match mono_vertex_obj - .spec - .source - .as_deref() - .ok_or(Error::Config("Source not found".to_string()))? - .transformer - { - Some(_) => Some(TransformerConfig::default()), - _ => None, - }; - - settings.udsource_config = match mono_vertex_obj - .spec - .source - .as_deref() - .ok_or(Error::Config("Source not found".to_string()))? - .udsource - { - Some(_) => Some(UDSourceConfig::default()), - _ => None, - }; - - settings.udsink_config = match mono_vertex_obj - .spec - .sink - .as_deref() - .ok_or(Error::Config("Sink not found".to_string()))? - .udsink - { - Some(_) => Some(UDSinkConfig::default()), - _ => None, - }; - - settings.fallback_config = match mono_vertex_obj - .spec - .sink - .as_deref() - .ok_or(Error::Config("Sink not found".to_string()))? - .fallback - { - Some(_) => Some(UDSinkConfig::fallback_default()), - _ => None, - }; - - settings.blackhole_config = mono_vertex_obj - .spec - .sink - .as_deref() - .ok_or(Error::Config("Sink not found".to_string()))? - .blackhole - .as_deref() - .map(|_| BlackholeConfig::default()); - - settings.generator_config = match mono_vertex_obj - .spec - .source - .as_deref() - .ok_or(Error::Config("Source not found".to_string()))? - .generator - .as_deref() - { - Some(generator_source) => { - let mut config = GeneratorConfig::default(); - - if let Some(value_blob) = &generator_source.value_blob { - config.content = Bytes::from(value_blob.clone()); - } - match &generator_source.value_blob { - Some(value) => { - config.content = Bytes::from(value.clone()); - } - None => { - if let Some(msg_size) = generator_source.msg_size { - if msg_size < 0 { - warn!("'msgSize' can not be negative, using default value (8 bytes)"); - } else { - config.msg_size_bytes = msg_size as u32; - } - } - - config.value = generator_source.value; - } - } - - if let Some(rpu) = generator_source.rpu { - config.rpu = rpu as usize; - } - - if let Some(d) = generator_source.duration { - config.duration = std::time::Duration::from(d).as_millis() as usize; - } - - if let Some(key_count) = generator_source.key_count { - if key_count > u8::MAX as i32 { - warn!( - "Capping the key count to {}, provided value is {key_count}", - u8::MAX - ); - } - config.key_count = std::cmp::min(key_count, u8::MAX as i32) as u8; - } - - if let Some(jitter) = generator_source.jitter { - config.jitter = std::time::Duration::from(jitter); - } - - Some(config) - } - None => None, - }; - - settings.logsink_config = mono_vertex_obj - .spec - .sink - .as_deref() - .ok_or(Error::Config("Sink not found".to_string()))? - .log - .as_deref() - .map(|_| LogSinkConfig); - - if let Some(retry_strategy) = mono_vertex_obj - .spec - .sink - .expect("sink should not be empty") - .retry_strategy - { - if let Some(sink_backoff) = retry_strategy.clone().backoff { - // Set the max retry attempts and retry interval using direct reference - settings.sink_retry_interval_in_ms = sink_backoff - .clone() - .interval - .map(|x| std::time::Duration::from(x).as_millis() as u32) - .unwrap_or(DEFAULT_SINK_RETRY_INTERVAL_IN_MS); - - settings.sink_max_retry_attempts = sink_backoff - .clone() - .steps - .map(|x| x as u16) - .unwrap_or(DEFAULT_MAX_SINK_RETRY_ATTEMPTS); - - // We do not allow 0 attempts to write to sink - if settings.sink_max_retry_attempts == 0 { - return Err(Error::Config( - "Retry Strategy given with 0 retry attempts".to_string(), - )); - } - } - - // Set the retry strategy from the spec or use the default - settings.sink_retry_on_fail_strategy = retry_strategy - .on_failure - .clone() - .and_then(|s| OnFailureStrategy::from_str(&s)) - .unwrap_or(DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY); - - // check if the sink retry strategy is set to fallback and there is no fallback sink configured - // then we should return an error - if settings.sink_retry_on_fail_strategy == OnFailureStrategy::Fallback - && settings.fallback_config.is_none() - { - return Err(Error::Config( - "Retry Strategy given as fallback but Fallback sink not configured" - .to_string(), - )); - } - } + /// load based on the CRD type, either a pipeline or a monovertex. + /// Settings are populated through reading the env vars set via the controller. The main + /// CRD is the base64 spec of the CR. + fn load() -> Result { + if let Ok(obj) = env::var(ENV_MONO_VERTEX_OBJ) { + let cfg = MonovertexConfig::load(obj)?; + return Ok(Settings { + custom_resource_type: CustomResourceType::MonoVertex(cfg), + }); } - settings.replica = env::var(ENV_POD_REPLICA) - .unwrap_or_else(|_| "0".to_string()) - .parse() - .map_err(|e| Error::Config(format!("Failed to parse pod replica: {:?}", e)))?; - - Ok(settings) + if let Ok(obj) = env::var(ENV_VERTEX_OBJ) { + let cfg = PipelineConfig::load(obj)?; + return Ok(Settings { + custom_resource_type: CustomResourceType::Pipeline(cfg), + }); + } + Err(Error::Config("No configuration found".to_string())) } } #[cfg(test)] mod tests { - use std::env; - + use crate::config::components::sink::OnFailureStrategy; + use crate::config::{CustomResourceType, Settings, ENV_MONO_VERTEX_OBJ}; + use base64::prelude::BASE64_STANDARD; + use base64::Engine; use serde_json::json; - - use super::*; + use std::env; #[test] fn test_settings_load_combined() { @@ -559,7 +124,10 @@ mod tests { // Execute and verify let settings = Settings::load().unwrap(); - assert_eq!(settings.mono_vertex_name, "simple-mono-vertex"); + assert!(matches!( + settings.custom_resource_type, + CustomResourceType::MonoVertex(_) + )); env::remove_var(ENV_MONO_VERTEX_OBJ); } @@ -607,12 +175,28 @@ mod tests { // Execute and verify let settings = Settings::load().unwrap(); + let mvtx_cfg = match settings.custom_resource_type { + CustomResourceType::MonoVertex(cfg) => cfg, + _ => panic!("Invalid configuration type"), + }; + + assert_eq!( + mvtx_cfg + .sink_config + .retry_config + .clone() + .unwrap() + .sink_max_retry_attempts, + 5 + ); assert_eq!( - settings.sink_retry_on_fail_strategy, - DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY + mvtx_cfg + .sink_config + .retry_config + .unwrap() + .sink_retry_interval_in_ms, + 1000 ); - assert_eq!(settings.sink_max_retry_attempts, 5); - assert_eq!(settings.sink_retry_interval_in_ms, 1000); env::remove_var(ENV_MONO_VERTEX_OBJ); } @@ -661,15 +245,40 @@ mod tests { // Execute and verify let settings = Settings::load().unwrap(); + let mvtx_cfg = match settings.custom_resource_type { + CustomResourceType::MonoVertex(cfg) => cfg, + _ => panic!("Invalid configuration type"), + }; + assert_eq!( - settings.sink_retry_on_fail_strategy, + mvtx_cfg + .sink_config + .retry_config + .clone() + .unwrap() + .sink_retry_on_fail_strategy, OnFailureStrategy::Drop ); - assert_eq!(settings.sink_max_retry_attempts, 5); - assert_eq!(settings.sink_retry_interval_in_ms, 1000); + assert_eq!( + mvtx_cfg + .sink_config + .retry_config + .clone() + .unwrap() + .sink_max_retry_attempts, + 5 + ); + assert_eq!( + mvtx_cfg + .sink_config + .retry_config + .clone() + .unwrap() + .sink_retry_interval_in_ms, + 1000 + ); env::remove_var(ENV_MONO_VERTEX_OBJ); } - { // Test Invalid on failure strategy to use default let json_data = json!({ @@ -715,159 +324,39 @@ mod tests { // Execute and verify let settings = Settings::load().unwrap(); + let mvtx_config = match settings.custom_resource_type { + CustomResourceType::MonoVertex(cfg) => cfg, + _ => panic!("Invalid configuration type"), + }; + assert_eq!( - settings.sink_retry_on_fail_strategy, - DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY + mvtx_config + .sink_config + .retry_config + .clone() + .unwrap() + .sink_retry_on_fail_strategy, + OnFailureStrategy::Retry + ); + assert_eq!( + mvtx_config + .sink_config + .retry_config + .clone() + .unwrap() + .sink_max_retry_attempts, + 5 + ); + assert_eq!( + mvtx_config + .sink_config + .retry_config + .clone() + .unwrap() + .sink_retry_interval_in_ms, + 1000 ); - assert_eq!(settings.sink_max_retry_attempts, 5); - assert_eq!(settings.sink_retry_interval_in_ms, 1000); - env::remove_var(ENV_MONO_VERTEX_OBJ); - } - - { - // Test Error Case: Retry Strategy Fallback without Fallback Sink - let json_data = json!({ - "metadata": { - "name": "simple-mono-vertex", - "namespace": "default", - "creationTimestamp": null - }, - "spec": { - "replicas": 0, - "source": { - "udsource": { - "container": { - "image": "xxxxxxx", - "resources": {} - } - } - }, - "sink": { - "udsink": { - "container": { - "image": "xxxxxx", - "resources": {} - } - }, - "retryStrategy": { - "backoff": { - "interval": "1s", - "steps": 5 - }, - "onFailure": "fallback" - }, - }, - "limits": { - "readBatchSize": 500, - "readTimeout": "1s" - }, - } - }); - let json_str = json_data.to_string(); - let encoded_json = BASE64_STANDARD.encode(json_str); - env::set_var(ENV_MONO_VERTEX_OBJ, encoded_json); - - // Execute and verify - assert!(Settings::load().is_err()); - env::remove_var(ENV_MONO_VERTEX_OBJ); - } - - { - // Test Error Case: Retry Strategy with 0 Retry Attempts - let json_data = json!({ - "metadata": { - "name": "simple-mono-vertex", - "namespace": "default", - "creationTimestamp": null - }, - "spec": { - "replicas": 0, - "source": { - "udsource": { - "container": { - "image": "xxxxxxx", - "resources": {} - } - } - }, - "sink": { - "udsink": { - "container": { - "image": "xxxxxx", - "resources": {} - } - }, - "retryStrategy": { - "backoff": { - "interval": "1s", - "steps": 0 - }, - "onFailure": "retry" - }, - }, - "limits": { - "readBatchSize": 500, - "readTimeout": "1s" - }, - } - }); - let json_str = json_data.to_string(); - let encoded_json = BASE64_STANDARD.encode(json_str); - env::set_var(ENV_MONO_VERTEX_OBJ, encoded_json); - - // Execute and verify - assert!(Settings::load().is_err()); env::remove_var(ENV_MONO_VERTEX_OBJ); } } - - #[test] - fn test_on_failure_enum_from_str_valid_inputs() { - assert_eq!( - OnFailureStrategy::from_str("retry"), - Some(OnFailureStrategy::Retry) - ); - assert_eq!( - OnFailureStrategy::from_str("fallback"), - Some(OnFailureStrategy::Fallback) - ); - assert_eq!( - OnFailureStrategy::from_str("drop"), - Some(OnFailureStrategy::Drop) - ); - - // Testing case insensitivity - assert_eq!( - OnFailureStrategy::from_str("ReTry"), - Some(OnFailureStrategy::Retry) - ); - assert_eq!( - OnFailureStrategy::from_str("FALLBACK"), - Some(OnFailureStrategy::Fallback) - ); - assert_eq!( - OnFailureStrategy::from_str("Drop"), - Some(OnFailureStrategy::Drop) - ); - } - - #[test] - fn test_on_failure_enum_from_str_invalid_input() { - assert_eq!( - OnFailureStrategy::from_str("unknown"), - Some(DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY) - ); // should return None for undefined inputs - } - - #[test] - fn test_on_failure_enum_to_string() { - let retry = OnFailureStrategy::Retry; - assert_eq!(retry.to_string(), "retry"); - - let fallback = OnFailureStrategy::Fallback; - assert_eq!(fallback.to_string(), "fallback"); - - let drop = OnFailureStrategy::Drop; - assert_eq!(drop.to_string(), "drop"); - } } diff --git a/rust/numaflow-core/src/config/components.rs b/rust/numaflow-core/src/config/components.rs new file mode 100644 index 000000000..87ca81adc --- /dev/null +++ b/rust/numaflow-core/src/config/components.rs @@ -0,0 +1,452 @@ +pub(crate) mod source { + const DEFAULT_GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB + const DEFAULT_SOURCE_SOCKET: &str = "/var/run/numaflow/source.sock"; + const DEFAULT_SOURCE_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcer-server-info"; + + use bytes::Bytes; + use std::time::Duration; + + #[derive(Debug, Clone, PartialEq)] + pub(crate) struct SourceConfig { + pub(crate) source_type: SourceType, + } + + #[derive(Debug, Clone, PartialEq)] + pub(crate) enum SourceType { + Generator(GeneratorConfig), + UserDefined(UserDefinedConfig), + } + + #[derive(Debug, Clone, PartialEq)] + pub(crate) struct GeneratorConfig { + pub rpu: usize, + pub content: Bytes, + pub duration: usize, + pub value: Option, + pub key_count: u8, + pub msg_size_bytes: u32, + pub jitter: Duration, + } + + impl Default for GeneratorConfig { + fn default() -> Self { + Self { + rpu: 1, + content: Bytes::new(), + duration: 1000, + value: None, + key_count: 0, + msg_size_bytes: 8, + jitter: Duration::from_secs(0), + } + } + } + + #[derive(Debug, Clone, PartialEq)] + pub(crate) struct UserDefinedConfig { + pub grpc_max_message_size: usize, + pub socket_path: String, + pub server_info_path: String, + } + + impl Default for UserDefinedConfig { + fn default() -> Self { + Self { + grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE, + socket_path: DEFAULT_SOURCE_SOCKET.to_string(), + server_info_path: DEFAULT_SOURCE_SERVER_INFO_FILE.to_string(), + } + } + } +} + +pub(crate) mod sink { + const DEFAULT_GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB + const DEFAULT_SINK_SOCKET: &str = "/var/run/numaflow/sink.sock"; + const DEFAULT_SINK_SERVER_INFO_FILE: &str = "/var/run/numaflow/sinker-server-info"; + const DEFAULT_FB_SINK_SOCKET: &str = "/var/run/numaflow/fb-sink.sock"; + const DEFAULT_FB_SINK_SERVER_INFO_FILE: &str = "/var/run/numaflow/fb-sinker-server-info"; + const DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY: OnFailureStrategy = OnFailureStrategy::Retry; + const DEFAULT_MAX_SINK_RETRY_ATTEMPTS: u16 = u16::MAX; + const DEFAULT_SINK_RETRY_INTERVAL_IN_MS: u32 = 1; + + use crate::error::Error; + use crate::Result; + use numaflow_models::models::{Backoff, RetryStrategy}; + use std::fmt::Display; + + #[derive(Debug, Clone, PartialEq)] + pub(crate) struct SinkConfig { + pub(crate) sink_type: SinkType, + pub(crate) retry_config: Option, + } + + #[derive(Debug, Clone, PartialEq)] + pub(crate) enum SinkType { + Log(LogConfig), + Blackhole(BlackholeConfig), + UserDefined(UserDefinedConfig), + } + + #[derive(Debug, Clone, PartialEq, Default)] + pub(crate) struct LogConfig {} + + #[derive(Debug, Clone, PartialEq, Default)] + pub(crate) struct BlackholeConfig {} + + #[derive(Debug, Clone, PartialEq)] + pub(crate) struct UserDefinedConfig { + pub grpc_max_message_size: usize, + pub socket_path: String, + pub server_info_path: String, + } + + #[derive(Debug, Clone, PartialEq)] + pub(crate) enum OnFailureStrategy { + Retry, + Fallback, + Drop, + } + + impl OnFailureStrategy { + /// Converts a string slice to an `OnFailureStrategy` enum variant. + /// Case insensitivity is considered to enhance usability. + /// + /// # Arguments + /// * `s` - A string slice representing the retry strategy. + /// + /// # Returns + /// An option containing the corresponding enum variant if successful, + /// or DefaultStrategy if the input does not match known variants. + pub(crate) fn from_str(s: &str) -> Self { + match s.to_lowercase().as_str() { + "retry" => OnFailureStrategy::Retry, + "fallback" => OnFailureStrategy::Fallback, + "drop" => OnFailureStrategy::Drop, + _ => DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY, + } + } + } + + impl Display for OnFailureStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + OnFailureStrategy::Retry => write!(f, "retry"), + OnFailureStrategy::Fallback => write!(f, "fallback"), + OnFailureStrategy::Drop => write!(f, "drop"), + } + } + } + + #[derive(Debug, Clone, PartialEq)] + pub(crate) struct RetryConfig { + pub sink_max_retry_attempts: u16, + pub sink_retry_interval_in_ms: u32, + pub sink_retry_on_fail_strategy: OnFailureStrategy, + pub sink_default_retry_strategy: RetryStrategy, + } + + impl Default for RetryConfig { + fn default() -> Self { + let default_retry_strategy = RetryStrategy { + backoff: Option::from(Box::from(Backoff { + interval: Option::from(kube::core::Duration::from( + std::time::Duration::from_millis(DEFAULT_SINK_RETRY_INTERVAL_IN_MS as u64), + )), + steps: Option::from(DEFAULT_MAX_SINK_RETRY_ATTEMPTS as i64), + })), + on_failure: Option::from(DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY.to_string()), + }; + Self { + sink_max_retry_attempts: DEFAULT_MAX_SINK_RETRY_ATTEMPTS, + sink_retry_interval_in_ms: DEFAULT_SINK_RETRY_INTERVAL_IN_MS, + sink_retry_on_fail_strategy: DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY, + sink_default_retry_strategy: default_retry_strategy, + } + } + } + + impl Default for UserDefinedConfig { + fn default() -> Self { + Self { + grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE, + socket_path: DEFAULT_SINK_SOCKET.to_string(), + server_info_path: DEFAULT_SINK_SERVER_INFO_FILE.to_string(), + } + } + } + + impl UserDefinedConfig { + pub(crate) fn fallback_default() -> Self { + Self { + grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE, + socket_path: DEFAULT_FB_SINK_SOCKET.to_string(), + server_info_path: DEFAULT_FB_SINK_SERVER_INFO_FILE.to_string(), + } + } + } +} + +pub(crate) mod transformer { + const DEFAULT_GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB + const DEFAULT_TRANSFORMER_SOCKET: &str = "/var/run/numaflow/sourcetransform.sock"; + const DEFAULT_TRANSFORMER_SERVER_INFO_FILE: &str = + "/var/run/numaflow/sourcetransformer-server-info"; + + #[derive(Debug, Clone, PartialEq)] + pub(crate) struct TransformerConfig { + pub(crate) transformer_type: TransformerType, + } + + #[derive(Debug, Clone, PartialEq)] + pub(crate) enum TransformerType { + Noop(NoopConfig), // will add built-in transformers + UserDefined(UserDefinedConfig), + } + + #[derive(Debug, Clone, PartialEq)] + pub(crate) struct NoopConfig {} + + #[derive(Debug, Clone, PartialEq)] + pub(crate) struct UserDefinedConfig { + pub grpc_max_message_size: usize, + pub socket_path: String, + pub server_info_path: String, + } + + impl Default for UserDefinedConfig { + fn default() -> Self { + Self { + grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE, + socket_path: DEFAULT_TRANSFORMER_SOCKET.to_string(), + server_info_path: DEFAULT_TRANSFORMER_SERVER_INFO_FILE.to_string(), + } + } + } +} + +pub(crate) mod metrics { + const DEFAULT_METRICS_PORT: u16 = 2469; + const DEFAULT_LAG_CHECK_INTERVAL_IN_SECS: u16 = 5; + const DEFAULT_LAG_REFRESH_INTERVAL_IN_SECS: u16 = 3; + + #[derive(Debug, Clone, PartialEq)] + pub(crate) struct MetricsConfig { + pub metrics_server_listen_port: u16, + pub lag_check_interval_in_secs: u16, + pub lag_refresh_interval_in_secs: u16, + } + + impl Default for MetricsConfig { + fn default() -> Self { + Self { + metrics_server_listen_port: DEFAULT_METRICS_PORT, + lag_check_interval_in_secs: DEFAULT_LAG_CHECK_INTERVAL_IN_SECS, + lag_refresh_interval_in_secs: DEFAULT_LAG_REFRESH_INTERVAL_IN_SECS, + } + } + } +} + +#[cfg(test)] +mod source_tests { + use super::source::{GeneratorConfig, SourceConfig, SourceType, UserDefinedConfig}; + use bytes::Bytes; + use std::time::Duration; + + #[test] + fn test_default_generator_config() { + let default_config = GeneratorConfig::default(); + assert_eq!(default_config.rpu, 1); + assert_eq!(default_config.content, Bytes::new()); + assert_eq!(default_config.duration, 1000); + assert_eq!(default_config.value, None); + assert_eq!(default_config.key_count, 0); + assert_eq!(default_config.msg_size_bytes, 8); + assert_eq!(default_config.jitter, Duration::from_secs(0)); + } + + #[test] + fn test_default_user_defined_config() { + let default_config = UserDefinedConfig::default(); + assert_eq!(default_config.grpc_max_message_size, 64 * 1024 * 1024); + assert_eq!(default_config.socket_path, "/var/run/numaflow/source.sock"); + assert_eq!( + default_config.server_info_path, + "/var/run/numaflow/sourcer-server-info" + ); + } + + #[test] + fn test_source_config_generator() { + let generator_config = GeneratorConfig::default(); + let source_config = SourceConfig { + source_type: SourceType::Generator(generator_config.clone()), + }; + if let SourceType::Generator(config) = source_config.source_type { + assert_eq!(config, generator_config); + } else { + panic!("Expected SourceType::Generator"); + } + } + + #[test] + fn test_source_config_user_defined() { + let user_defined_config = UserDefinedConfig::default(); + let source_config = SourceConfig { + source_type: SourceType::UserDefined(user_defined_config.clone()), + }; + if let SourceType::UserDefined(config) = source_config.source_type { + assert_eq!(config, user_defined_config); + } else { + panic!("Expected SourceType::UserDefined"); + } + } +} + +#[cfg(test)] +mod sink_tests { + use super::sink::{ + BlackholeConfig, LogConfig, OnFailureStrategy, RetryConfig, SinkConfig, SinkType, + UserDefinedConfig, + }; + use numaflow_models::models::{Backoff, RetryStrategy}; + + #[test] + fn test_default_log_config() { + let default_config = LogConfig::default(); + assert_eq!(default_config, LogConfig {}); + } + + #[test] + fn test_default_blackhole_config() { + let default_config = BlackholeConfig::default(); + assert_eq!(default_config, BlackholeConfig {}); + } + + #[test] + fn test_default_user_defined_config() { + let default_config = UserDefinedConfig::default(); + assert_eq!(default_config.grpc_max_message_size, 64 * 1024 * 1024); + assert_eq!(default_config.socket_path, "/var/run/numaflow/sink.sock"); + assert_eq!( + default_config.server_info_path, + "/var/run/numaflow/sinker-server-info" + ); + } + + #[test] + fn test_default_retry_config() { + let default_retry_strategy = RetryStrategy { + backoff: Option::from(Box::from(Backoff { + interval: Option::from(kube::core::Duration::from( + std::time::Duration::from_millis(1u64), + )), + steps: Option::from(u16::MAX as i64), + })), + on_failure: Option::from(OnFailureStrategy::Retry.to_string()), + }; + let default_config = RetryConfig::default(); + assert_eq!(default_config.sink_max_retry_attempts, u16::MAX); + assert_eq!(default_config.sink_retry_interval_in_ms, 1); + assert_eq!( + default_config.sink_retry_on_fail_strategy, + OnFailureStrategy::Retry + ); + assert_eq!( + default_config.sink_default_retry_strategy, + default_retry_strategy + ); + } + + #[test] + fn test_on_failure_strategy_from_str() { + assert_eq!( + OnFailureStrategy::from_str("retry"), + OnFailureStrategy::Retry + ); + assert_eq!( + OnFailureStrategy::from_str("fallback"), + OnFailureStrategy::Fallback + ); + assert_eq!(OnFailureStrategy::from_str("drop"), OnFailureStrategy::Drop); + assert_eq!( + OnFailureStrategy::from_str("unknown"), + OnFailureStrategy::Retry + ); + } + + #[test] + fn test_sink_config_log() { + let log_config = LogConfig::default(); + let sink_config = SinkConfig { + sink_type: SinkType::Log(log_config.clone()), + retry_config: None, + }; + if let SinkType::Log(config) = sink_config.sink_type { + assert_eq!(config, log_config); + } else { + panic!("Expected SinkType::Log"); + } + } + + #[test] + fn test_sink_config_blackhole() { + let blackhole_config = BlackholeConfig::default(); + let sink_config = SinkConfig { + sink_type: SinkType::Blackhole(blackhole_config.clone()), + retry_config: None, + }; + if let SinkType::Blackhole(config) = sink_config.sink_type { + assert_eq!(config, blackhole_config); + } else { + panic!("Expected SinkType::Blackhole"); + } + } + + #[test] + fn test_sink_config_user_defined() { + let user_defined_config = UserDefinedConfig::default(); + let sink_config = SinkConfig { + sink_type: SinkType::UserDefined(user_defined_config.clone()), + retry_config: None, + }; + if let SinkType::UserDefined(config) = sink_config.sink_type { + assert_eq!(config, user_defined_config); + } else { + panic!("Expected SinkType::UserDefined"); + } + } +} + +#[cfg(test)] +mod transformer_tests { + use super::transformer::{TransformerConfig, TransformerType, UserDefinedConfig}; + + #[test] + fn test_default_user_defined_config() { + let default_config = UserDefinedConfig::default(); + assert_eq!(default_config.grpc_max_message_size, 64 * 1024 * 1024); + assert_eq!( + default_config.socket_path, + "/var/run/numaflow/sourcetransform.sock" + ); + assert_eq!( + default_config.server_info_path, + "/var/run/numaflow/sourcetransformer-server-info" + ); + } + + #[test] + fn test_transformer_config_user_defined() { + let user_defined_config = UserDefinedConfig::default(); + let transformer_config = TransformerConfig { + transformer_type: TransformerType::UserDefined(user_defined_config.clone()), + }; + if let TransformerType::UserDefined(config) = transformer_config.transformer_type { + assert_eq!(config, user_defined_config); + } else { + panic!("Expected TransformerType::UserDefined"); + } + } +} diff --git a/rust/numaflow-core/src/config/monovertex.rs b/rust/numaflow-core/src/config/monovertex.rs new file mode 100644 index 000000000..a9d568553 --- /dev/null +++ b/rust/numaflow-core/src/config/monovertex.rs @@ -0,0 +1,376 @@ +use crate::config::components::metrics::MetricsConfig; +use crate::config::components::sink::{OnFailureStrategy, RetryConfig, SinkConfig}; +use crate::config::components::source::{GeneratorConfig, SourceConfig}; +use crate::config::components::transformer::{ + TransformerConfig, TransformerType, UserDefinedConfig, +}; +use crate::config::components::{sink, source}; +use crate::error::Error; +use crate::message::get_vertex_replica; +use crate::Result; +use base64::prelude::BASE64_STANDARD; +use base64::Engine; +use bytes::Bytes; +use numaflow_models::models::MonoVertex; +use serde_json::from_slice; +use std::time::Duration; +use tracing::warn; + +const DEFAULT_BATCH_SIZE: u64 = 500; +const DEFAULT_TIMEOUT_IN_MS: u32 = 1000; + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct MonovertexConfig { + pub(crate) name: String, + pub(crate) batch_size: usize, + pub(crate) timeout_in_ms: u64, + pub(crate) replica: u16, + pub(crate) source_config: SourceConfig, + pub(crate) sink_config: SinkConfig, + pub(crate) transformer_config: Option, + pub(crate) fb_sink_config: Option, + pub(crate) metrics_config: MetricsConfig, +} + +impl Default for MonovertexConfig { + fn default() -> Self { + MonovertexConfig { + name: "".to_string(), + batch_size: DEFAULT_BATCH_SIZE as usize, + timeout_in_ms: DEFAULT_TIMEOUT_IN_MS as u64, + replica: 0, + source_config: SourceConfig { + source_type: source::SourceType::Generator(GeneratorConfig::default()), + }, + sink_config: SinkConfig { + sink_type: sink::SinkType::Log(sink::LogConfig::default()), + retry_config: None, + }, + transformer_config: None, + fb_sink_config: None, + metrics_config: MetricsConfig::default(), + } + } +} + +impl MonovertexConfig { + /// Load the MonoVertex Settings. + pub(crate) fn load(mono_vertex_spec: String) -> Result { + // controller sets this env var. + let decoded_spec = BASE64_STANDARD + .decode(mono_vertex_spec.as_bytes()) + .map_err(|e| Error::Config(format!("Failed to decode mono vertex spec: {:?}", e)))?; + + let mono_vertex_obj: MonoVertex = from_slice(&decoded_spec) + .map_err(|e| Error::Config(format!("Failed to parse mono vertex spec: {:?}", e)))?; + + let batch_size = mono_vertex_obj + .spec + .limits + .as_ref() + .and_then(|limits| limits.read_batch_size.map(|x| x as u64)) + .unwrap_or(DEFAULT_BATCH_SIZE); + + let timeout_in_ms = mono_vertex_obj + .spec + .limits + .as_ref() + .and_then(|limits| { + limits + .read_timeout + .map(|x| Duration::from(x).as_millis() as u32) + }) + .unwrap_or(DEFAULT_TIMEOUT_IN_MS); + + let mono_vertex_name = mono_vertex_obj + .metadata + .as_ref() + .and_then(|metadata| metadata.name.clone()) + .ok_or_else(|| Error::Config("Mono vertex name not found".to_string()))?; + + let transformer_config = mono_vertex_obj + .spec + .source + .as_ref() + .and_then(|source| source.transformer.as_ref()) + .map(|_| TransformerConfig { + transformer_type: TransformerType::UserDefined(UserDefinedConfig::default()), + }); + + let source_config = mono_vertex_obj + .spec + .source + .as_ref() + .ok_or_else(|| Error::Config("Source not found".to_string())) + .and_then(|source| { + source.udsource.as_ref().map(|_| SourceConfig { + source_type: source::SourceType::UserDefined(source::UserDefinedConfig::default()), + }).or_else(|| { + source.generator.as_ref().map(|generator| { + let mut generator_config = GeneratorConfig::default(); + + if let Some(value_blob) = &generator.value_blob { + generator_config.content = Bytes::from(value_blob.clone()); + } + + if let Some(msg_size) = generator.msg_size { + if msg_size >= 0 { + generator_config.msg_size_bytes = msg_size as u32; + } else { + warn!("'msgSize' cannot be negative, using default value (8 bytes)"); + } + } + + generator_config.value = generator.value; + generator_config.rpu = generator.rpu.unwrap_or(1) as usize; + generator_config.duration = generator.duration.map_or(1000, |d| std::time::Duration::from(d).as_millis() as usize); + generator_config.key_count = generator.key_count.map_or(0, |kc| std::cmp::min(kc, u8::MAX as i32) as u8); + generator_config.jitter = generator.jitter.map_or(Duration::from_secs(0), std::time::Duration::from); + + SourceConfig { + source_type: source::SourceType::Generator(generator_config), + } + }) + }).ok_or_else(|| Error::Config("Source type not found".to_string())) + })?; + + let sink_config = mono_vertex_obj + .spec + .sink + .as_ref() + .ok_or_else(|| Error::Config("Sink not found".to_string())) + .and_then(|sink| { + let retry_config = sink.retry_strategy.as_ref().map(|retry| { + let mut retry_config = RetryConfig::default(); + + if let Some(backoff) = &retry.backoff { + if let Some(interval) = backoff.interval { + retry_config.sink_retry_interval_in_ms = + std::time::Duration::from(interval).as_millis() as u32; + } + + if let Some(steps) = backoff.steps { + retry_config.sink_max_retry_attempts = steps as u16; + } + } + + if let Some(strategy) = &retry.on_failure { + retry_config.sink_retry_on_fail_strategy = + OnFailureStrategy::from_str(strategy); + } + + retry_config + }); + + sink.udsink + .as_ref() + .map(|_| SinkConfig { + sink_type: sink::SinkType::UserDefined(sink::UserDefinedConfig::default()), + retry_config: retry_config.clone(), + }) + .or_else(|| { + sink.log.as_ref().map(|_| SinkConfig { + sink_type: sink::SinkType::Log(sink::LogConfig::default()), + retry_config: retry_config.clone(), + }) + }) + .or_else(|| { + sink.blackhole.as_ref().map(|_| SinkConfig { + sink_type: sink::SinkType::Blackhole(sink::BlackholeConfig::default()), + retry_config: retry_config.clone(), + }) + }) + .ok_or_else(|| Error::Config("Sink type not found".to_string())) + })?; + + let fb_sink_config = mono_vertex_obj + .spec + .sink + .as_ref() + .and_then(|sink| sink.fallback.as_ref()) + .map(|fallback| { + fallback + .udsink + .as_ref() + .map(|_| SinkConfig { + sink_type: sink::SinkType::UserDefined( + sink::UserDefinedConfig::fallback_default(), + ), + retry_config: None, + }) + .or_else(|| { + fallback.log.as_ref().map(|_| SinkConfig { + sink_type: sink::SinkType::Log(sink::LogConfig::default()), + retry_config: None, + }) + }) + .or_else(|| { + fallback.blackhole.as_ref().map(|_| SinkConfig { + sink_type: sink::SinkType::Blackhole(sink::BlackholeConfig::default()), + retry_config: None, + }) + }) + .ok_or_else(|| Error::Config("Fallback sink type not found".to_string())) + }) + .transpose()?; + + Ok(MonovertexConfig { + name: mono_vertex_name, + replica: *get_vertex_replica(), + batch_size: batch_size as usize, + timeout_in_ms: timeout_in_ms as u64, + metrics_config: MetricsConfig::default(), + source_config, + sink_config, + transformer_config, + fb_sink_config, + }) + } +} + +#[cfg(test)] +mod tests { + use crate::config::components::sink::SinkType; + use crate::config::components::source::SourceType; + use crate::config::components::transformer::TransformerType; + use crate::config::monovertex::MonovertexConfig; + use crate::error::Error; + use base64::prelude::BASE64_STANDARD; + use base64::Engine; + #[test] + fn test_load_valid_config() { + let valid_config = r#" + { + "metadata": { + "name": "test_vertex" + }, + "spec": { + "limits": { + "readBatchSize": 1000, + "readTimeout": "2s" + }, + "source": { + "udsource": { + "container": { + "image": "xxxxxxx", + "resources": {} + } + } + }, + "sink": { + "log": {} + } + } + } + "#; + + let encoded_valid_config = BASE64_STANDARD.encode(valid_config); + let spec = encoded_valid_config.as_str(); + + let config = MonovertexConfig::load(spec.to_string()).unwrap(); + + assert_eq!(config.name, "test_vertex"); + assert_eq!(config.batch_size, 1000); + assert_eq!(config.timeout_in_ms, 2000); + assert!(matches!( + config.source_config.source_type, + SourceType::UserDefined(_) + )); + assert!(matches!(config.sink_config.sink_type, SinkType::Log(_))); + } + + #[test] + fn test_load_missing_source() { + let invalid_config = r#" + { + "metadata": { + "name": "test_vertex" + }, + "spec": { + "limits": { + "readBatchSize": 1000, + "readTimeout": "2s" + }, + "sink": { + "log": {} + } + } + } + "#; + let encoded_invalid_config = BASE64_STANDARD.encode(invalid_config); + let spec = encoded_invalid_config.as_str(); + + let result = MonovertexConfig::load(spec.to_string()); + assert!(matches!(result, Err(Error::Config(_)))); + } + + #[test] + fn test_load_missing_sink() { + let invalid_config = r#" + { + "metadata": { + "name": "test_vertex" + }, + "spec": { + "limits": { + "readBatchSize": 1000, + "readTimeout": "2s" + }, + "source": { + "udsource": { + "container": { + "image": "xxxxxxx", + "resources": {} + } + } + } + } + } + "#; + let encoded_invalid_config = BASE64_STANDARD.encode(invalid_config); + let spec = encoded_invalid_config.as_str(); + + let result = MonovertexConfig::load(spec.to_string()); + assert!(matches!(result, Err(Error::Config(_)))); + } + + #[test] + fn test_load_with_transformer() { + let valid_config = r#" + { + "metadata": { + "name": "test_vertex" + }, + "spec": { + "limits": { + "readBatchSize": 1000, + "readTimeout": "2s" + }, + "source": { + "udsource": { + "container": { + "image": "xxxxxxx", + "resources": {} + } + }, + "transformer": {} + }, + "sink": { + "log": {} + } + } + } + "#; + let encoded_invalid_config = BASE64_STANDARD.encode(valid_config); + let spec = encoded_invalid_config.as_str(); + + let config = MonovertexConfig::load(spec.to_string()).unwrap(); + + assert_eq!(config.name, "test_vertex"); + assert!(config.transformer_config.is_some()); + assert!(matches!( + config.transformer_config.unwrap().transformer_type, + TransformerType::UserDefined(_) + )); + } +} diff --git a/rust/numaflow-core/src/config/pipeline.rs b/rust/numaflow-core/src/config/pipeline.rs new file mode 100644 index 000000000..03753bcfd --- /dev/null +++ b/rust/numaflow-core/src/config/pipeline.rs @@ -0,0 +1,52 @@ +use crate::config::components::sink::SinkConfig; +use crate::config::components::source::{SourceConfig, SourceType}; +use crate::config::components::transformer::TransformerConfig; +use crate::Result; + +pub(crate) mod isb; + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct PipelineConfig { + pub(crate) buffer_reader_config: BufferReaderConfig, + pub(crate) buffer_writer_config: BufferWriterConfig, + pub(crate) vertex_config: VertexConfig, +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct SourceVtxConfig { + pub(crate) source_config: SourceConfig, + pub(crate) transformer_config: Option, +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct SinkVtxConfig { + pub(crate) sink_config: SinkConfig, + pub(crate) fb_sink_config: Option, +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum VertexConfig { + Source(SourceVtxConfig), + Sink(SinkVtxConfig), +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct BufferReaderConfig {} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct BufferWriterConfig {} + +impl PipelineConfig { + pub fn load(_pipeline_spec_obj: String) -> Result { + Ok(PipelineConfig { + buffer_reader_config: BufferReaderConfig {}, + buffer_writer_config: BufferWriterConfig {}, + vertex_config: VertexConfig::Source(SourceVtxConfig { + source_config: SourceConfig { + source_type: SourceType::Generator(Default::default()), + }, + transformer_config: None, + }), + }) + } +} diff --git a/rust/numaflow-core/src/config/pipeline/isb.rs b/rust/numaflow-core/src/config/pipeline/isb.rs new file mode 100644 index 000000000..89723cc9b --- /dev/null +++ b/rust/numaflow-core/src/config/pipeline/isb.rs @@ -0,0 +1,53 @@ +/// Jetstream ISB related configurations. +pub mod jetstream { + use std::fmt; + use std::time::Duration; + + // jetstream related constants + const DEFAULT_PARTITION_IDX: u16 = 0; + const DEFAULT_MAX_LENGTH: usize = 30000; + const DEFAULT_USAGE_LIMIT: f64 = 0.8; + const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 1; + const DEFAULT_BUFFER_FULL_STRATEGY: BufferFullStrategy = BufferFullStrategy::RetryUntilSuccess; + const DEFAULT_RETRY_INTERVAL_MILLIS: u64 = 10; + + #[derive(Debug, Clone)] + pub(crate) struct StreamWriterConfig { + pub name: String, + pub partition_idx: u16, + pub max_length: usize, + pub refresh_interval: Duration, + pub usage_limit: f64, + pub buffer_full_strategy: BufferFullStrategy, + pub retry_interval: Duration, + } + + impl Default for StreamWriterConfig { + fn default() -> Self { + StreamWriterConfig { + name: "default".to_string(), + partition_idx: DEFAULT_PARTITION_IDX, + max_length: DEFAULT_MAX_LENGTH, + usage_limit: DEFAULT_USAGE_LIMIT, + refresh_interval: Duration::from_secs(DEFAULT_REFRESH_INTERVAL_SECS), + buffer_full_strategy: DEFAULT_BUFFER_FULL_STRATEGY, + retry_interval: Duration::from_millis(DEFAULT_RETRY_INTERVAL_MILLIS), + } + } + } + + #[derive(Debug, Clone, Eq, PartialEq)] + pub(crate) enum BufferFullStrategy { + RetryUntilSuccess, + DiscardLatest, + } + + impl fmt::Display for BufferFullStrategy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + BufferFullStrategy::RetryUntilSuccess => write!(f, "retryUntilSuccess"), + BufferFullStrategy::DiscardLatest => write!(f, "discardLatest"), + } + } + } +} diff --git a/rust/numaflow-core/src/error.rs b/rust/numaflow-core/src/error.rs index 56e470d5e..4fd78d8a1 100644 --- a/rust/numaflow-core/src/error.rs +++ b/rust/numaflow-core/src/error.rs @@ -4,7 +4,7 @@ pub type Result = std::result::Result; #[derive(Error, Debug, Clone)] pub enum Error { - #[error("Metrics Error - {0}")] + #[error("metrics Error - {0}")] Metrics(String), #[error("Source Error - {0}")] diff --git a/rust/numaflow-core/src/monovertex.rs b/rust/numaflow-core/src/monovertex.rs index e78b81608..64fe47c67 100644 --- a/rust/numaflow-core/src/monovertex.rs +++ b/rust/numaflow-core/src/monovertex.rs @@ -10,8 +10,11 @@ use tokio_util::sync::CancellationToken; use tonic::transport::Channel; use tracing::{error, info}; -use crate::config::{config, Settings}; +use crate::config::components::{sink, source, transformer}; +use crate::config::monovertex::MonovertexConfig; +use crate::config::{config, CustomResourceType, Settings}; use crate::error::{self, Error}; +use crate::shared::server_info::check_for_server_compatibility; use crate::shared::utils; use crate::shared::utils::create_rpc_channel; use crate::sink::{SinkClientType, SinkHandle}; @@ -42,13 +45,21 @@ pub async fn mono_vertex() -> error::Result<()> { Ok(()) }); - // Run the forwarder with cancellation token. - if let Err(e) = start_forwarder(cln_token, config()).await { - error!("Application error: {:?}", e); - - // abort the signal handler task since we have an error and we are shutting down - if !shutdown_handle.is_finished() { - shutdown_handle.abort(); + let crd_type = config().custom_resource_type.clone(); + match crd_type { + CustomResourceType::MonoVertex(config) => { + // Run the forwarder with cancellation token. + if let Err(e) = start_forwarder(cln_token, &config).await { + error!("Application error: {:?}", e); + + // abort the signal handler task since we have an error and we are shutting down + if !shutdown_handle.is_finished() { + shutdown_handle.abort(); + } + } + } + CustomResourceType::Pipeline(_) => { + panic!("Pipeline not supported") } } @@ -87,30 +98,20 @@ pub(crate) enum SourceType { Generator(GeneratorRead, GeneratorAck, GeneratorLagReader), } -async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> error::Result<()> { - // make sure that we have compatibility with the server - utils::check_compatibility( - &cln_token, - config - .udsource_config - .as_ref() - .map(|source_config| source_config.server_info_path.clone().into()), - config - .udsink_config - .as_ref() - .map(|sink_config| sink_config.server_info_path.clone().into()), - config - .transformer_config - .as_ref() - .map(|transformer_config| transformer_config.server_info_path.clone().into()), - config - .fallback_config - .as_ref() - .map(|fallback_config| fallback_config.server_info_path.clone().into()), - ) - .await?; +async fn start_forwarder( + cln_token: CancellationToken, + config: &MonovertexConfig, +) -> error::Result<()> { + let mut source_grpc_client = if let source::SourceType::UserDefined(source_config) = + &config.source_config.source_type + { + // do server compatibility check + check_for_server_compatibility( + source_config.server_info_path.clone().into(), + cln_token.clone(), + ) + .await?; - let mut source_grpc_client = if let Some(source_config) = &config.udsource_config { Some( SourceClient::new(create_rpc_channel(source_config.socket_path.clone().into()).await?) .max_encoding_message_size(source_config.grpc_max_message_size) @@ -120,7 +121,16 @@ async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> err None }; - let mut sink_grpc_client = if let Some(udsink_config) = &config.udsink_config { + let mut sink_grpc_client = if let sink::SinkType::UserDefined(udsink_config) = + &config.sink_config.sink_type + { + // do server compatibility check + check_for_server_compatibility( + udsink_config.server_info_path.clone().into(), + cln_token.clone(), + ) + .await?; + Some( SinkClient::new(create_rpc_channel(udsink_config.socket_path.clone().into()).await?) .max_encoding_message_size(udsink_config.grpc_max_message_size) @@ -130,25 +140,50 @@ async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> err None }; - let mut fb_sink_grpc_client = if let Some(fb_sink_config) = &config.fallback_config { - let fb_sink_grpc_client = - SinkClient::new(create_rpc_channel(fb_sink_config.socket_path.clone().into()).await?) + let mut fb_sink_grpc_client = if let Some(fb_sink) = &config.fb_sink_config { + if let sink::SinkType::UserDefined(fb_sink_config) = &fb_sink.sink_type { + // do server compatibility check + check_for_server_compatibility( + fb_sink_config.server_info_path.clone().into(), + cln_token.clone(), + ) + .await?; + + Some( + SinkClient::new( + create_rpc_channel(fb_sink_config.socket_path.clone().into()).await?, + ) .max_encoding_message_size(fb_sink_config.grpc_max_message_size) - .max_encoding_message_size(fb_sink_config.grpc_max_message_size); - - Some(fb_sink_grpc_client.clone()) + .max_encoding_message_size(fb_sink_config.grpc_max_message_size), + ) + } else { + None + } } else { None }; - let mut transformer_grpc_client = if let Some(transformer_config) = &config.transformer_config { - let transformer_grpc_client = SourceTransformClient::new( - create_rpc_channel(transformer_config.socket_path.clone().into()).await?, - ) - .max_encoding_message_size(transformer_config.grpc_max_message_size) - .max_encoding_message_size(transformer_config.grpc_max_message_size); - - Some(transformer_grpc_client.clone()) + let mut transformer_grpc_client = if let Some(transformer) = &config.transformer_config { + if let transformer::TransformerType::UserDefined(transformer_config) = + &transformer.transformer_type + { + // do server compatibility check + check_for_server_compatibility( + transformer_config.server_info_path.clone().into(), + cln_token.clone(), + ) + .await?; + + let transformer_grpc_client = SourceTransformClient::new( + create_rpc_channel(transformer_config.socket_path.clone().into()).await?, + ) + .max_encoding_message_size(transformer_config.grpc_max_message_size) + .max_encoding_message_size(transformer_config.grpc_max_message_size); + + Some(transformer_grpc_client.clone()) + } else { + None + } } else { None }; @@ -183,10 +218,18 @@ async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> err // start the metrics server // FIXME: what to do with the handle - utils::start_metrics_server(metrics_state).await; - - let source = SourceHandle::new(source_type); - start_forwarder_with_source(source, sink, transformer_grpc_client, fb_sink, cln_token).await?; + utils::start_metrics_server(config.metrics_config.clone(), metrics_state).await; + + let source = SourceHandle::new(source_type, config.batch_size); + start_forwarder_with_source( + config.clone(), + source, + sink, + transformer_grpc_client, + fb_sink, + cln_token, + ) + .await?; info!("Forwarder stopped gracefully"); Ok(()) @@ -195,7 +238,7 @@ async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> err // fetch right the source. // source_grpc_client can be optional because it is valid only for user-defined source. async fn fetch_source( - config: &Settings, + config: &MonovertexConfig, source_grpc_client: &mut Option>, ) -> crate::Result { // check whether the source grpc client is provided, this happens only of the source is a @@ -203,7 +246,7 @@ async fn fetch_source( if let Some(source_grpc_client) = source_grpc_client.clone() { let (source_read, source_ack, lag_reader) = new_source( source_grpc_client, - config.batch_size as usize, + config.batch_size, config.timeout_in_ms as u16, ) .await?; @@ -215,7 +258,7 @@ async fn fetch_source( } // now that we know it is not a user-defined source, it has to be a built-in - if let Some(generator_config) = &config.generator_config { + if let source::SourceType::Generator(generator_config) = &config.source_config.source_type { let (source_read, source_ack, lag_reader) = new_generator(generator_config.clone(), config.batch_size as usize)?; Ok(SourceType::Generator(source_read, source_ack, lag_reader)) @@ -227,27 +270,35 @@ async fn fetch_source( // fetch the actor handle for the sink. // sink_grpc_client can be optional because it is valid only for user-defined sink. async fn fetch_sink( - settings: &Settings, + settings: &MonovertexConfig, sink_grpc_client: Option>, fallback_sink_grpc_client: Option>, ) -> crate::Result<(SinkHandle, Option)> { let fb_sink = match fallback_sink_grpc_client { - Some(fallback_sink) => { - Some(SinkHandle::new(SinkClientType::UserDefined(fallback_sink)).await?) - } + Some(fallback_sink) => Some( + SinkHandle::new( + SinkClientType::UserDefined(fallback_sink), + settings.batch_size, + ) + .await?, + ), None => None, }; if let Some(sink_client) = sink_grpc_client { - let sink = SinkHandle::new(SinkClientType::UserDefined(sink_client)).await?; + let sink = SinkHandle::new( + SinkClientType::UserDefined(sink_client), + settings.batch_size, + ) + .await?; return Ok((sink, fb_sink)); } - if settings.logsink_config.is_some() { - let log = SinkHandle::new(SinkClientType::Log).await?; + if let sink::SinkType::Log(_) = &settings.sink_config.sink_type { + let log = SinkHandle::new(SinkClientType::Log, settings.batch_size).await?; return Ok((log, fb_sink)); } - if settings.blackhole_config.is_some() { - let blackhole = SinkHandle::new(SinkClientType::Blackhole).await?; + if let sink::SinkType::Blackhole(_) = &settings.sink_config.sink_type { + let blackhole = SinkHandle::new(SinkClientType::Blackhole, settings.batch_size).await?; return Ok((blackhole, fb_sink)); } Err(Error::Config( @@ -256,17 +307,18 @@ async fn fetch_sink( } async fn start_forwarder_with_source( + mvtx_config: MonovertexConfig, source: SourceHandle, sink: SinkHandle, - transformer_client: Option>, + transformer_client: Option>, fallback_sink: Option, cln_token: CancellationToken, ) -> error::Result<()> { // start the pending reader to publish pending metrics - let pending_reader = utils::create_pending_reader(source.clone()).await; + let pending_reader = utils::create_pending_reader(&mvtx_config, source.clone()).await; let _pending_reader_handle = pending_reader.start().await; - let mut forwarder_builder = ForwarderBuilder::new(source, sink, cln_token); + let mut forwarder_builder = ForwarderBuilder::new(source, sink, mvtx_config, cln_token); // add transformer if exists if let Some(transformer_client) = transformer_client { @@ -293,16 +345,16 @@ mod tests { use std::fs::File; use std::io::Write; + use crate::config::monovertex::MonovertexConfig; + use crate::config::{components, Settings}; + use crate::error; + use crate::monovertex::start_forwarder; + use crate::shared::server_info::ServerInfo; use numaflow::source::{Message, Offset, SourceReadRequest}; use numaflow::{sink, source}; use tokio::sync::mpsc::Sender; use tokio_util::sync::CancellationToken; - use crate::config::{Settings, UDSinkConfig, UDSourceConfig}; - use crate::error; - use crate::monovertex::start_forwarder; - use crate::shared::server_info::ServerInfo; - struct SimpleSource; #[tonic::async_trait] impl source::Sourcer for SimpleSource { @@ -401,17 +453,26 @@ mod tests { token_clone.cancel(); }); - let config = Settings { - udsink_config: Some(UDSinkConfig { - socket_path: sink_sock_file.to_str().unwrap().to_string(), - server_info_path: sink_server_info.to_str().unwrap().to_string(), - grpc_max_message_size: 1024, - }), - udsource_config: Some(UDSourceConfig { - socket_path: src_sock_file.to_str().unwrap().to_string(), - server_info_path: src_info_file.to_str().unwrap().to_string(), - grpc_max_message_size: 1024, - }), + let config = MonovertexConfig { + source_config: components::source::SourceConfig { + source_type: components::source::SourceType::UserDefined( + components::source::UserDefinedConfig { + socket_path: src_sock_file.to_str().unwrap().to_string(), + grpc_max_message_size: 1024, + server_info_path: src_info_file.to_str().unwrap().to_string(), + }, + ), + }, + sink_config: components::sink::SinkConfig { + sink_type: components::sink::SinkType::UserDefined( + components::sink::UserDefinedConfig { + socket_path: sink_sock_file.to_str().unwrap().to_string(), + grpc_max_message_size: 1024, + server_info_path: sink_server_info.to_str().unwrap().to_string(), + }, + ), + retry_config: Default::default(), + }, ..Default::default() }; diff --git a/rust/numaflow-core/src/monovertex/forwarder.rs b/rust/numaflow-core/src/monovertex/forwarder.rs index bd3253fea..0b83a0dfe 100644 --- a/rust/numaflow-core/src/monovertex/forwarder.rs +++ b/rust/numaflow-core/src/monovertex/forwarder.rs @@ -6,7 +6,8 @@ use tokio::time::sleep; use tokio_util::sync::CancellationToken; use tracing::{debug, info}; -use crate::config::{config, OnFailureStrategy}; +use crate::config::components::sink::{OnFailureStrategy, RetryConfig}; +use crate::config::monovertex::MonovertexConfig; use crate::error; use crate::message::{Message, Offset, ResponseStatusFromSink}; use crate::monovertex::metrics; @@ -25,6 +26,7 @@ pub(crate) struct Forwarder { fb_sink_writer: Option, cln_token: CancellationToken, common_labels: Vec<(String, String)>, + mvtx_config: MonovertexConfig, } /// ForwarderBuilder is used to build a Forwarder instance with optional fields. @@ -34,6 +36,7 @@ pub(crate) struct ForwarderBuilder { cln_token: CancellationToken, source_transformer: Option, fb_sink_writer: Option, + mvtx_config: MonovertexConfig, } impl ForwarderBuilder { @@ -41,6 +44,7 @@ impl ForwarderBuilder { pub(crate) fn new( source_reader: SourceHandle, sink_writer: SinkHandle, + mvtx_config: MonovertexConfig, cln_token: CancellationToken, ) -> Self { Self { @@ -49,6 +53,7 @@ impl ForwarderBuilder { cln_token, source_transformer: None, fb_sink_writer: None, + mvtx_config, } } @@ -67,13 +72,18 @@ impl ForwarderBuilder { /// Build the Forwarder instance #[must_use] pub(crate) fn build(self) -> Forwarder { - let common_labels = metrics::forward_metrics_labels().clone(); + let common_labels = metrics::mvtx_forward_metric_labels( + self.mvtx_config.name.clone(), + self.mvtx_config.replica, + ) + .clone(); Forwarder { source_reader: self.source_reader, sink_writer: self.sink_writer, source_transformer: self.source_transformer, fb_sink_writer: self.fb_sink_writer, cln_token: self.cln_token, + mvtx_config: self.mvtx_config, common_labels, } } @@ -231,10 +241,22 @@ impl Forwarder { // only breaks out of this loop based on the retry strategy unless all the messages have been written to sink // successfully. + let retry_config = &self + .mvtx_config + .sink_config + .retry_config + .clone() + .unwrap_or_default(); + loop { - while attempts < config().sink_max_retry_attempts { + while attempts < retry_config.sink_max_retry_attempts { let status = self - .write_to_sink_once(&mut error_map, &mut fallback_msgs, &mut messages_to_send) + .write_to_sink_once( + &mut error_map, + &mut fallback_msgs, + &mut messages_to_send, + &retry_config, + ) .await; match status { Ok(true) => break, @@ -262,7 +284,9 @@ impl Forwarder { &mut error_map, &mut fallback_msgs, &mut messages_to_send, + &retry_config, ); + match need_retry { // if we are done with the messages, break the loop Ok(false) => break, @@ -277,7 +301,8 @@ impl Forwarder { // If there are fallback messages, write them to the fallback sink if !fallback_msgs.is_empty() { - self.handle_fallback_messages(fallback_msgs).await?; + self.handle_fallback_messages(fallback_msgs, &retry_config) + .await?; } forward_metrics() @@ -302,13 +327,14 @@ impl Forwarder { error_map: &mut HashMap, fallback_msgs: &mut Vec, messages_to_send: &mut Vec, + retry_config: &RetryConfig, ) -> error::Result { // if we are done with the messages, break the loop if messages_to_send.is_empty() { return Ok(false); } // check what is the failure strategy in the config - let strategy = config().sink_retry_on_fail_strategy.clone(); + let strategy = retry_config.sink_retry_on_fail_strategy.clone(); match strategy { // if we need to retry, return true OnFailureStrategy::Retry => { @@ -353,6 +379,7 @@ impl Forwarder { error_map: &mut HashMap, fallback_msgs: &mut Vec, messages_to_send: &mut Vec, + retry_config: &RetryConfig, ) -> error::Result { let start_time = tokio::time::Instant::now(); match self.sink_writer.sink(messages_to_send.clone()).await { @@ -393,7 +420,7 @@ impl Forwarder { } sleep(tokio::time::Duration::from_millis( - config().sink_retry_interval_in_ms as u64, + retry_config.sink_retry_interval_in_ms as u64, )) .await; @@ -405,7 +432,11 @@ impl Forwarder { } // Writes the fallback messages to the fallback sink - async fn handle_fallback_messages(&mut self, fallback_msgs: Vec) -> error::Result<()> { + async fn handle_fallback_messages( + &mut self, + fallback_msgs: Vec, + retry_config: &RetryConfig, + ) -> error::Result<()> { if self.fb_sink_writer.is_none() { return Err(Error::Sink( "Response contains fallback messages but no fallback sink is configured" @@ -421,7 +452,7 @@ impl Forwarder { let mut messages_to_send = fallback_msgs; let fb_msg_count = messages_to_send.len() as u64; - let default_retry = config() + let default_retry = retry_config .sink_default_retry_strategy .clone() .backoff @@ -669,6 +700,9 @@ mod tests { #[tokio::test] async fn test_forwarder_source_sink() { + let batch_size = 100; + let timeout_in_ms = 1000; + let (sink_tx, mut sink_rx) = mpsc::channel(10); // Start the source server @@ -729,22 +763,22 @@ mod tests { let (source_read, source_ack, source_lag_reader) = new_source( SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()), - config().batch_size as usize, - config().timeout_in_ms as u16, + batch_size, + timeout_in_ms, ) .await .expect("failed to connect to source server"); - let src_reader = SourceHandle::new(SourceType::UserDefinedSource( - source_read, - source_ack, - source_lag_reader, - )); + let src_reader = SourceHandle::new( + SourceType::UserDefinedSource(source_read, source_ack, source_lag_reader), + batch_size, + ); let sink_grpc_client = SinkClient::new(create_rpc_channel(sink_sock_file).await.unwrap()); - let sink_writer = SinkHandle::new(SinkClientType::UserDefined(sink_grpc_client)) - .await - .expect("failed to connect to sink server"); + let sink_writer = + SinkHandle::new(SinkClientType::UserDefined(sink_grpc_client), batch_size) + .await + .expect("failed to connect to sink server"); let transformer_client = SourceTransformHandle::new(SourceTransformClient::new( create_rpc_channel(transformer_sock_file).await.unwrap(), @@ -752,9 +786,14 @@ mod tests { .await .expect("failed to connect to transformer server"); - let mut forwarder = ForwarderBuilder::new(src_reader, sink_writer, cln_token.clone()) - .source_transformer(transformer_client) - .build(); + let mut forwarder = ForwarderBuilder::new( + src_reader, + sink_writer, + Default::default(), + cln_token.clone(), + ) + .source_transformer(transformer_client) + .build(); // Assert the received message in a different task let assert_handle = tokio::spawn(async move { @@ -817,6 +856,9 @@ mod tests { #[tokio::test] async fn test_forwarder_sink_error() { + let batch_size = 100; + let timeout_in_ms = 1000; + // Start the source server let (source_shutdown_tx, source_shutdown_rx) = tokio::sync::oneshot::channel(); let tmp_dir = tempfile::TempDir::new().unwrap(); @@ -858,25 +900,29 @@ mod tests { let (source_read, source_ack, lag_reader) = new_source( SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()), - 500, - 100, + batch_size, + timeout_in_ms, ) .await .expect("failed to connect to source server"); - let source_reader = SourceHandle::new(SourceType::UserDefinedSource( - source_read, - source_ack, - lag_reader, - )); + let source_reader = SourceHandle::new( + SourceType::UserDefinedSource(source_read, source_ack, lag_reader), + batch_size, + ); let sink_client = SinkClient::new(create_rpc_channel(sink_sock_file).await.unwrap()); - let sink_writer = SinkHandle::new(SinkClientType::UserDefined(sink_client)) + let sink_writer = SinkHandle::new(SinkClientType::UserDefined(sink_client), batch_size) .await .expect("failed to connect to sink server"); - let mut forwarder = - ForwarderBuilder::new(source_reader, sink_writer, cln_token.clone()).build(); + let mut forwarder = ForwarderBuilder::new( + source_reader, + sink_writer, + Default::default(), + cln_token.clone(), + ) + .build(); let cancel_handle = tokio::spawn(async move { tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; @@ -920,6 +966,9 @@ mod tests { #[tokio::test] async fn test_fb_sink() { + let batch_size = 100; + let timeout_in_ms = 1000; + let (sink_tx, mut sink_rx) = mpsc::channel(10); // Start the source server @@ -986,25 +1035,26 @@ mod tests { .await .expect("failed to connect to source server"); - let source = SourceHandle::new(SourceType::UserDefinedSource( - source_read, - source_ack, - source_lag_reader, - )); + let source = SourceHandle::new( + SourceType::UserDefinedSource(source_read, source_ack, source_lag_reader), + batch_size, + ); let sink_client = SinkClient::new(create_rpc_channel(sink_sock_file).await.unwrap()); - let sink_writer = SinkHandle::new(SinkClientType::UserDefined(sink_client)) + let sink_writer = SinkHandle::new(SinkClientType::UserDefined(sink_client), batch_size) .await .expect("failed to connect to sink server"); let fb_sink_writer = SinkClient::new(create_rpc_channel(fb_sink_sock_file).await.unwrap()); - let fb_sink_writer = SinkHandle::new(SinkClientType::UserDefined(fb_sink_writer)) - .await - .expect("failed to connect to fb sink server"); + let fb_sink_writer = + SinkHandle::new(SinkClientType::UserDefined(fb_sink_writer), batch_size) + .await + .expect("failed to connect to fb sink server"); - let mut forwarder = ForwarderBuilder::new(source, sink_writer, cln_token.clone()) - .fallback_sink_writer(fb_sink_writer) - .build(); + let mut forwarder = + ForwarderBuilder::new(source, sink_writer, Default::default(), cln_token.clone()) + .fallback_sink_writer(fb_sink_writer) + .build(); let assert_handle = tokio::spawn(async move { let received_message = sink_rx.recv().await.unwrap(); diff --git a/rust/numaflow-core/src/monovertex/metrics.rs b/rust/numaflow-core/src/monovertex/metrics.rs index 0818c26f9..2fe3336b2 100644 --- a/rust/numaflow-core/src/monovertex/metrics.rs +++ b/rust/numaflow-core/src/monovertex/metrics.rs @@ -263,14 +263,14 @@ static MONOVTX_METRICS_LABELS: OnceLock> = OnceLock::new() // forward_metrics_labels is a helper function used to fetch the // MONOVTX_METRICS_LABELS object -pub(crate) fn forward_metrics_labels() -> &'static Vec<(String, String)> { +pub(crate) fn mvtx_forward_metric_labels( + mvtx_name: String, + replica: u16, +) -> &'static Vec<(String, String)> { MONOVTX_METRICS_LABELS.get_or_init(|| { let common_labels = vec![ - ( - MVTX_NAME_LABEL.to_string(), - config().mono_vertex_name.clone(), - ), - (REPLICA_LABEL.to_string(), config().replica.to_string()), + (MVTX_NAME_LABEL.to_string(), mvtx_name), + (REPLICA_LABEL.to_string(), replica.to_string()), ]; common_labels }) @@ -282,7 +282,7 @@ pub async fn metrics_handler() -> impl IntoResponse { let state = global_registry().registry.lock(); let mut buffer = String::new(); encode(&mut buffer, &state).unwrap(); - debug!("Exposing Metrics: {:?}", buffer); + debug!("Exposing metrics: {:?}", buffer); Response::builder() .status(StatusCode::OK) .body(Body::from(buffer)) @@ -367,6 +367,8 @@ struct TimestampedPending { /// and exposing the metrics. It maintains a list of pending stats and ensures that /// only the most recent entries are kept. pub(crate) struct PendingReader { + mvtx_name: String, + replica: u16, lag_reader: SourceHandle, lag_checking_interval: Duration, refresh_interval: Duration, @@ -380,14 +382,18 @@ pub(crate) struct PendingReaderTasks { /// PendingReaderBuilder is used to build a [LagReader] instance. pub(crate) struct PendingReaderBuilder { + mvtx_name: String, + replica: u16, lag_reader: SourceHandle, lag_checking_interval: Option, refresh_interval: Option, } impl PendingReaderBuilder { - pub(crate) fn new(lag_reader: SourceHandle) -> Self { + pub(crate) fn new(mvtx_name: String, replica: u16, lag_reader: SourceHandle) -> Self { Self { + mvtx_name, + replica, lag_reader, lag_checking_interval: None, refresh_interval: None, @@ -406,6 +412,8 @@ impl PendingReaderBuilder { pub(crate) fn build(self) -> PendingReader { PendingReader { + mvtx_name: self.mvtx_name, + replica: self.replica, lag_reader: self.lag_reader, lag_checking_interval: self .lag_checking_interval @@ -437,8 +445,10 @@ impl PendingReader { }); let pending_stats = self.pending_stats.clone(); + let mvtx_name = self.mvtx_name.clone(); + let replica = self.replica; let expose_handle = tokio::spawn(async move { - expose_pending_metrics(refresh_interval, pending_stats).await; + expose_pending_metrics(mvtx_name, replica, refresh_interval, pending_stats).await; }); PendingReaderTasks { buildup_handle, @@ -497,6 +507,8 @@ const LOOKBACK_SECONDS_MAP: [(&str, i64); 4] = // Periodically exposes the pending metrics by calculating the average pending messages over different intervals. async fn expose_pending_metrics( + mvtx_name: String, + replica: u16, refresh_interval: Duration, pending_stats: Arc>>, ) { @@ -511,7 +523,8 @@ async fn expose_pending_metrics( for (label, seconds) in LOOKBACK_SECONDS_MAP { let pending = calculate_pending(seconds, &pending_stats).await; if pending != -1 { - let mut metric_labels = forward_metrics_labels().clone(); + let mut metric_labels = + mvtx_forward_metric_labels(mvtx_name.clone(), replica).clone(); metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); pending_info.insert(label, pending); forward_metrics() @@ -751,7 +764,8 @@ mod tests { tokio::spawn({ let pending_stats = pending_stats.clone(); async move { - expose_pending_metrics(refresh_interval, pending_stats).await; + expose_pending_metrics("test".to_string(), 0, refresh_interval, pending_stats) + .await; } }); // We use tokio::time::interval() as the ticker in the expose_pending_metrics() function. @@ -763,7 +777,7 @@ mod tests { let mut stored_values: [i64; 4] = [0; 4]; { for (i, (label, _)) in LOOKBACK_SECONDS_MAP.iter().enumerate() { - let mut metric_labels = forward_metrics_labels().clone(); + let mut metric_labels = mvtx_forward_metric_labels("test".to_string(), 0).clone(); metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); let guage = forward_metrics() .source_pending diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream.rs b/rust/numaflow-core/src/pipeline/isb/jetstream.rs index 5e7890a97..7e3e7e378 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream.rs @@ -3,7 +3,7 @@ use tokio::sync::mpsc::Receiver; use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; -use crate::config::jetstream::StreamWriterConfig; +use crate::config::pipeline::isb::jetstream::StreamWriterConfig; use crate::error::Error; use crate::message::{Message, Offset}; use crate::pipeline::isb::jetstream::writer::JetstreamWriter; diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs index 0e3324ac4..431958b12 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs @@ -15,7 +15,7 @@ use tokio_util::sync::CancellationToken; use tracing::error; use tracing::{debug, warn}; -use crate::config::jetstream::StreamWriterConfig; +use crate::config::pipeline::isb::jetstream::StreamWriterConfig; use crate::error::Error; use crate::message::{IntOffset, Offset}; use crate::Result; diff --git a/rust/numaflow-core/src/shared/utils.rs b/rust/numaflow-core/src/shared/utils.rs index 9ec87490c..f55e6b87a 100644 --- a/rust/numaflow-core/src/shared/utils.rs +++ b/rust/numaflow-core/src/shared/utils.rs @@ -2,6 +2,15 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::time::Duration; +use crate::config::components::metrics::MetricsConfig; +use crate::config::monovertex::MonovertexConfig; +use crate::error; +use crate::monovertex::metrics::{ + start_metrics_https_server, PendingReader, PendingReaderBuilder, UserDefinedContainerState, +}; +use crate::shared::server_info; +use crate::source::SourceHandle; +use crate::Error; use axum::http::Uri; use backoff::retry::Retry; use backoff::strategy::fixed; @@ -19,15 +28,6 @@ use tonic::Request; use tower::service_fn; use tracing::{info, warn}; -use crate::config::config; -use crate::error; -use crate::monovertex::metrics::{ - start_metrics_https_server, PendingReader, PendingReaderBuilder, UserDefinedContainerState, -}; -use crate::shared::server_info; -use crate::source::SourceHandle; -use crate::Error; - pub(crate) async fn check_compatibility( cln_token: &CancellationToken, source_file_path: Option, @@ -74,29 +74,41 @@ pub(crate) async fn check_compatibility( } pub(crate) async fn start_metrics_server( + metrics_config: MetricsConfig, metrics_state: UserDefinedContainerState, ) -> JoinHandle<()> { - tokio::spawn(async { + let metrics_port = metrics_config.metrics_server_listen_port.clone(); + tokio::spawn(async move { // Start the metrics server, which server the prometheus metrics. - let metrics_addr: SocketAddr = format!("0.0.0.0:{}", &config().metrics_server_listen_port) + let metrics_addr: SocketAddr = format!("0.0.0.0:{}", metrics_port) .parse() .expect("Invalid address"); if let Err(e) = start_metrics_https_server(metrics_addr, metrics_state).await { - error!("Metrics server error: {:?}", e); + error!("metrics server error: {:?}", e); } }) } -pub(crate) async fn create_pending_reader(lag_reader_grpc_client: SourceHandle) -> PendingReader { - PendingReaderBuilder::new(lag_reader_grpc_client) - .lag_checking_interval(Duration::from_secs( - config().lag_check_interval_in_secs.into(), - )) - .refresh_interval(Duration::from_secs( - config().lag_refresh_interval_in_secs.into(), - )) - .build() +pub(crate) async fn create_pending_reader( + mvtx_config: &MonovertexConfig, + lag_reader_grpc_client: SourceHandle, +) -> PendingReader { + PendingReaderBuilder::new( + mvtx_config.name.clone(), + mvtx_config.replica, + lag_reader_grpc_client, + ) + .lag_checking_interval(Duration::from_secs( + mvtx_config.metrics_config.lag_check_interval_in_secs.into(), + )) + .refresh_interval(Duration::from_secs( + mvtx_config + .metrics_config + .lag_refresh_interval_in_secs + .into(), + )) + .build() } pub(crate) async fn wait_until_ready( diff --git a/rust/numaflow-core/src/sink.rs b/rust/numaflow-core/src/sink.rs index 1c109a36f..88e41dd10 100644 --- a/rust/numaflow-core/src/sink.rs +++ b/rust/numaflow-core/src/sink.rs @@ -3,7 +3,6 @@ use tokio::sync::{mpsc, oneshot}; use tonic::transport::Channel; use user_defined::UserDefinedSink; -use crate::config::config; use crate::message::{Message, ResponseFromSink}; mod blackhole; @@ -70,8 +69,8 @@ pub(crate) enum SinkClientType { } impl SinkHandle { - pub(crate) async fn new(sink_client: SinkClientType) -> crate::Result { - let (sender, receiver) = mpsc::channel(config().batch_size as usize); + pub(crate) async fn new(sink_client: SinkClientType, batch_size: usize) -> crate::Result { + let (sender, receiver) = mpsc::channel(batch_size); match sink_client { SinkClientType::Log => { let log_sink = log::LogSink; diff --git a/rust/numaflow-core/src/source.rs b/rust/numaflow-core/src/source.rs index 5e2eba5da..1816e9fa0 100644 --- a/rust/numaflow-core/src/source.rs +++ b/rust/numaflow-core/src/source.rs @@ -106,8 +106,8 @@ pub(crate) struct SourceHandle { } impl SourceHandle { - pub(crate) fn new(src_type: SourceType) -> Self { - let (sender, receiver) = mpsc::channel(config().batch_size as usize); + pub(crate) fn new(src_type: SourceType, batch_size: usize) -> Self { + let (sender, receiver) = mpsc::channel(batch_size); match src_type { SourceType::UserDefinedSource(reader, acker, lag_reader) => { tokio::spawn(async move { diff --git a/rust/numaflow-core/src/source/generator.rs b/rust/numaflow-core/src/source/generator.rs index e8d80c431..7d42a62a8 100644 --- a/rust/numaflow-core/src/source/generator.rs +++ b/rust/numaflow-core/src/source/generator.rs @@ -1,6 +1,6 @@ use futures::StreamExt; -use crate::config; +use crate::config::components::source::GeneratorConfig; use crate::message::{Message, Offset}; use crate::reader; use crate::source; @@ -33,6 +33,7 @@ mod stream_generator { use tracing::warn; use crate::config; + use crate::config::components::source::GeneratorConfig; use crate::message::{ get_vertex_name, get_vertex_replica, Message, MessageID, Offset, StringOffset, }; @@ -67,7 +68,7 @@ mod stream_generator { } impl StreamGenerator { - pub(super) fn new(cfg: config::GeneratorConfig, batch_size: usize) -> Self { + pub(super) fn new(cfg: GeneratorConfig, batch_size: usize) -> Self { let mut tick = tokio::time::interval(Duration::from_millis(cfg.duration as u64)); tick.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -89,9 +90,7 @@ mod stream_generator { } // Generate all possible keys - let keys = (0..key_count) - .map(|i| format!("key-{}-{}", config::config().replica, i)) - .collect(); + let keys = (0..key_count).map(|i| format!("key-{}", i)).collect(); Self { content: cfg.content, @@ -249,7 +248,7 @@ mod stream_generator { // Define requests per unit (rpu), batch size, and time unit let batch = 6; let rpu = 10; - let cfg = config::GeneratorConfig { + let cfg = GeneratorConfig { content: content.clone(), rpu, jitter: Duration::from_millis(0), @@ -293,7 +292,7 @@ mod stream_generator { #[tokio::test] async fn test_stream_generator_config() { - let cfg = config::GeneratorConfig { + let cfg = GeneratorConfig { rpu: 33, key_count: 7, ..Default::default() @@ -302,7 +301,7 @@ mod stream_generator { let stream_generator = StreamGenerator::new(cfg, 50); assert_eq!(stream_generator.rpu, 28); - let cfg = config::GeneratorConfig { + let cfg = GeneratorConfig { rpu: 3, key_count: 7, ..Default::default() @@ -318,7 +317,7 @@ mod stream_generator { /// source to generate some messages. We mainly use generator for load testing and integration /// testing of Numaflow. The load generated is per replica. pub(crate) fn new_generator( - cfg: config::GeneratorConfig, + cfg: GeneratorConfig, batch_size: usize, ) -> crate::Result<(GeneratorRead, GeneratorAck, GeneratorLagReader)> { let gen_read = GeneratorRead::new(cfg, batch_size); @@ -335,7 +334,7 @@ pub(crate) struct GeneratorRead { impl GeneratorRead { /// A new [GeneratorRead] is returned. It takes a static content, requests per unit-time, batch size /// to return per [source::SourceReader::read], and the unit-time as duration. - fn new(cfg: config::GeneratorConfig, batch_size: usize) -> Self { + fn new(cfg: GeneratorConfig, batch_size: usize) -> Self { let stream_generator = stream_generator::StreamGenerator::new(cfg.clone(), batch_size); Self { stream_generator } } @@ -405,7 +404,7 @@ mod tests { // Define requests per unit (rpu), batch size, and time unit let rpu = 10; let batch = 5; - let cfg = config::GeneratorConfig { + let cfg = GeneratorConfig { content: content.clone(), rpu, jitter: Duration::from_millis(0), @@ -435,7 +434,7 @@ mod tests { // Define requests per unit (rpu), batch size, and time unit let rpu = 10; let batch = 5; - let cfg = config::GeneratorConfig { + let cfg = GeneratorConfig { content: Bytes::new(), rpu, jitter: Duration::from_millis(0), @@ -456,11 +455,11 @@ mod tests { .collect::>(); let expected_keys = vec![ - "key-0-0".to_string(), - "key-0-1".to_string(), - "key-0-2".to_string(), - "key-0-0".to_string(), - "key-0-1".to_string(), + "key-0".to_string(), + "key-1".to_string(), + "key-2".to_string(), + "key-0".to_string(), + "key-1".to_string(), ]; assert_eq!(keys, expected_keys); diff --git a/rust/numaflow-core/src/source/user_defined.rs b/rust/numaflow-core/src/source/user_defined.rs index 69be3d9a3..89a455189 100644 --- a/rust/numaflow-core/src/source/user_defined.rs +++ b/rust/numaflow-core/src/source/user_defined.rs @@ -8,7 +8,6 @@ use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; use tonic::{Request, Streaming}; -use crate::config::config; use crate::message::{Message, Offset}; use crate::reader::LagReader; use crate::source::{SourceAcker, SourceReader}; @@ -41,7 +40,7 @@ pub(crate) async fn new_source( UserDefinedSourceLagReader, )> { let src_read = UserDefinedSourceRead::new(client.clone(), num_records, timeout_in_ms).await?; - let src_ack = UserDefinedSourceAck::new(client.clone()).await?; + let src_ack = UserDefinedSourceAck::new(client.clone(), num_records).await?; let lag_reader = UserDefinedSourceLagReader::new(client); Ok((src_read, src_ack, lag_reader)) @@ -50,23 +49,24 @@ pub(crate) async fn new_source( impl UserDefinedSourceRead { async fn new( mut client: SourceClient, - num_records: usize, + batch_size: usize, timeout_in_ms: u16, ) -> Result { - let (read_tx, resp_stream) = Self::create_reader(&mut client).await?; + let (read_tx, resp_stream) = Self::create_reader(batch_size, &mut client).await?; Ok(Self { read_tx, resp_stream, - num_records, + num_records: batch_size, timeout_in_ms, }) } async fn create_reader( + batch_size: usize, client: &mut SourceClient, ) -> Result<(mpsc::Sender, Streaming)> { - let (read_tx, read_rx) = mpsc::channel(config().batch_size as usize); + let (read_tx, read_rx) = mpsc::channel(batch_size); let read_stream = ReceiverStream::new(read_rx); // do a handshake for read with the server before we start sending read requests @@ -139,8 +139,8 @@ impl SourceReader for UserDefinedSourceRead { } impl UserDefinedSourceAck { - async fn new(mut client: SourceClient) -> Result { - let (ack_tx, ack_resp_stream) = Self::create_acker(&mut client).await?; + async fn new(mut client: SourceClient, batch_size: usize) -> Result { + let (ack_tx, ack_resp_stream) = Self::create_acker(batch_size, &mut client).await?; Ok(Self { ack_tx, @@ -149,9 +149,10 @@ impl UserDefinedSourceAck { } async fn create_acker( + batch_size: usize, client: &mut SourceClient, ) -> Result<(mpsc::Sender, Streaming)> { - let (ack_tx, ack_rx) = mpsc::channel(config().batch_size as usize); + let (ack_tx, ack_rx) = mpsc::channel(batch_size); let ack_stream = ReceiverStream::new(ack_rx); // do a handshake for ack with the server before we start sending ack requests diff --git a/rust/numaflow-core/src/transformer/user_defined.rs b/rust/numaflow-core/src/transformer/user_defined.rs index 035addcbb..5b6c478f4 100644 --- a/rust/numaflow-core/src/transformer/user_defined.rs +++ b/rust/numaflow-core/src/transformer/user_defined.rs @@ -12,7 +12,6 @@ use tonic::transport::Channel; use tonic::{Request, Streaming}; use tracing::warn; -use crate::config::config; use crate::error::{Error, Result}; use crate::message::{get_vertex_name, Message, MessageID, Offset}; use crate::shared::utils::utc_from_timestamp; @@ -28,10 +27,11 @@ struct SourceTransformer { impl SourceTransformer { async fn new( + batch_size: usize, mut client: SourceTransformClient, actor_messages: mpsc::Receiver, ) -> Result { - let (read_tx, read_rx) = mpsc::channel(config().batch_size as usize); + let (read_tx, read_rx) = mpsc::channel(batch_size); let read_stream = ReceiverStream::new(read_rx); // do a handshake for read with the server before we start sending read requests @@ -207,9 +207,10 @@ pub(crate) struct SourceTransformHandle { } impl SourceTransformHandle { - pub(crate) async fn new(client: SourceTransformClient) -> crate::Result { - let (sender, receiver) = mpsc::channel(config().batch_size as usize); - let mut client = SourceTransformer::new(client, receiver).await?; + pub(crate) async fn new(client: SourceTransformClient) -> Result { + let batch_size = 500; + let (sender, receiver) = mpsc::channel(batch_size); + let mut client = SourceTransformer::new(batch_size, client, receiver).await?; tokio::spawn(async move { while let Some(msg) = client.actor_messages.recv().await { client.handle_message(msg).await; @@ -276,7 +277,7 @@ mod tests { }); // wait for the server to start - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let client = SourceTransformHandle::new(SourceTransformClient::new( create_rpc_channel(sock_file).await?, @@ -299,11 +300,8 @@ mod tests { headers: Default::default(), }; - let resp = tokio::time::timeout( - tokio::time::Duration::from_secs(2), - client.transform(vec![message]), - ) - .await??; + let resp = + tokio::time::timeout(Duration::from_secs(2), client.transform(vec![message])).await??; assert_eq!(resp.len(), 1); // we need to drop the client, because if there are any in-flight requests @@ -355,7 +353,7 @@ mod tests { }); // wait for the server to start - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let client = SourceTransformHandle::new(SourceTransformClient::new( create_rpc_channel(sock_file).await?,