diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index a7a4df363..75fd03612 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -40,8 +40,8 @@ verbose_file_reads = "warn"
 # This profile optimizes for runtime performance and small binary size at the expense of longer build times.
 # Compared to default release profile, this profile reduced binary size from 29MB to 21MB
 # and increased build time (with only one line change in code) from 12 seconds to 133 seconds (tested on Mac M2 Max).
-# [profile.release]
-# lto = "fat"
+[profile.release]
+lto = "fat"
 
 # This profile optimizes for short build times at the expense of larger binary size and slower runtime performance.
 # If you have to rebuild image often, in Dockerfile you may replace `--release` passed to cargo command with `--profile quick-release`
diff --git a/rust/numaflow-core/src/config/components.rs b/rust/numaflow-core/src/config/components.rs
index 833ad8950..3dc0bf2a6 100644
--- a/rust/numaflow-core/src/config/components.rs
+++ b/rust/numaflow-core/src/config/components.rs
@@ -38,6 +38,8 @@ pub(crate) mod source {
         Generator(GeneratorConfig),
         UserDefined(UserDefinedConfig),
         Pulsar(PulsarSourceConfig),
+        // Serving source starts an Axum HTTP server in the background.
+        // The settings will be used as application state, which gets cloned in each handler on each request.
         Serving(Arc<serving::Settings>),
     }
 
diff --git a/rust/numaflow-core/src/lib.rs b/rust/numaflow-core/src/lib.rs
index 79ce4348b..d65380f8d 100644
--- a/rust/numaflow-core/src/lib.rs
+++ b/rust/numaflow-core/src/lib.rs
@@ -55,7 +55,6 @@ mod tracker;
 
 mod mapper;
 
 pub async fn run() -> Result<()> {
-    let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
     let cln_token = CancellationToken::new();
     let shutdown_cln_token = cln_token.clone();
diff --git a/rust/numaflow-core/src/shared/create_components.rs b/rust/numaflow-core/src/shared/create_components.rs
index c077a1f44..b28f4caee 100644
--- a/rust/numaflow-core/src/shared/create_components.rs
+++ b/rust/numaflow-core/src/shared/create_components.rs
@@ -12,6 +12,7 @@ use tonic::transport::Channel;
 use crate::config::components::sink::{SinkConfig, SinkType};
 use crate::config::components::source::{SourceConfig, SourceType};
 use crate::config::components::transformer::TransformerConfig;
+use crate::config::get_vertex_replica;
 use crate::config::pipeline::map::{MapMode, MapType, MapVtxConfig};
 use crate::config::pipeline::{DEFAULT_BATCH_MAP_SOCKET, DEFAULT_STREAM_MAP_SOCKET};
 use crate::error::Error;
@@ -337,7 +338,13 @@ pub async fn create_source(
             ))
         }
         SourceType::Serving(config) => {
-            let serving = ServingSource::new(Arc::clone(config), batch_size, read_timeout).await?;
+            let serving = ServingSource::new(
+                Arc::clone(config),
+                batch_size,
+                read_timeout,
+                *get_vertex_replica(),
+            )
+            .await?;
             Ok((
                 Source::new(
                     batch_size,
diff --git a/rust/numaflow-core/src/source/serving.rs b/rust/numaflow-core/src/source/serving.rs
index 8e9794b51..b9fb6c72e 100644
--- a/rust/numaflow-core/src/source/serving.rs
+++ b/rust/numaflow-core/src/source/serving.rs
@@ -2,6 +2,7 @@ use std::sync::Arc;
 
 pub(crate) use serving::ServingSource;
 
+use crate::config::get_vertex_replica;
 use crate::message::{MessageID, StringOffset};
 use crate::Error;
 use crate::Result;
@@ -12,9 +13,10 @@ impl TryFrom<serving::Message> for Message {
     type Error = Error;
 
     fn try_from(message: serving::Message) -> Result<Self> {
-        let offset = Offset::String(StringOffset::new(message.id.clone(), 0));
+        let offset = Offset::String(StringOffset::new(message.id.clone(), *get_vertex_replica()));
 
         Ok(Message {
+            // we do not support keys from HTTP client
             keys: Arc::from(vec![]),
             tags: None,
             value: message.value,
@@ -50,11 +52,14 @@ impl super::SourceReader for ServingSource {
     }
 
     fn partitions(&self) -> Vec<u16> {
-        vec![]
+        vec![*get_vertex_replica()]
     }
 }
 
 impl super::SourceAcker for ServingSource {
+    /// The HTTP response is sent only once we have confirmation that the message has been written to the ISB.
+    // TODO: Current implementation only works for `/v1/process/async` endpoint.
+    // For `/v1/process/{sync,sync_serve}` endpoints: https://github.com/numaproj/numaflow/issues/2308
     async fn ack(&mut self, offsets: Vec<Offset>) -> Result<()> {
         let mut serving_offsets = vec![];
         for offset in offsets {
@@ -87,6 +92,8 @@ mod tests {
     use bytes::Bytes;
     use serving::{ServingSource, Settings};
 
+    use super::get_vertex_replica;
+
     type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
 
     #[test]
@@ -139,8 +146,13 @@
             ..Default::default()
         };
         let settings = Arc::new(settings);
-        let mut serving_source =
-            ServingSource::new(Arc::clone(&settings), 10, Duration::from_millis(1)).await?;
+        let mut serving_source = ServingSource::new(
+            Arc::clone(&settings),
+            10,
+            Duration::from_millis(1),
+            *get_vertex_replica(),
+        )
+        .await?;
 
         let client = reqwest::Client::builder()
             .timeout(Duration::from_secs(2))
diff --git a/rust/serving/src/app/jetstream_proxy.rs b/rust/serving/src/app/jetstream_proxy.rs
index 41d0c0df3..6f61a0530 100644
--- a/rust/serving/src/app/jetstream_proxy.rs
+++ b/rust/serving/src/app/jetstream_proxy.rs
@@ -92,7 +92,11 @@ async fn sync_publish_serve(
         },
     };
 
-    proxy_state.message.send(message).await.unwrap(); // FIXME:
+    proxy_state
+        .message
+        .send(message)
+        .await
+        .expect("Failed to send request payload to Serving channel");
 
     if let Err(e) = rx.await {
         // Deregister the ID in the callback proxy state if writing to Jetstream fails
diff --git a/rust/serving/src/error.rs b/rust/serving/src/error.rs
index cfa252daa..8d03c4823 100644
--- a/rust/serving/src/error.rs
+++ b/rust/serving/src/error.rs
@@ -48,6 +48,9 @@ pub enum Error {
     #[error("Failed to receive message from channel. Actor task is terminated: {0:?}")]
     ActorTaskTerminated(oneshot::error::RecvError),
 
+    #[error("Serving source error - {0}")]
+    Source(String),
+
     #[error("Other Error - {0}")] // catch-all variant for now
     Other(String),
 }
diff --git a/rust/serving/src/lib.rs b/rust/serving/src/lib.rs
index b5579fae7..001065ddf 100644
--- a/rust/serving/src/lib.rs
+++ b/rust/serving/src/lib.rs
@@ -23,8 +23,8 @@ mod metrics;
 mod pipeline;
 pub mod source;
 
-pub use source::{Message, ServingSource};
 use crate::source::MessageWrapper;
+pub use source::{Message, ServingSource};
 
 #[derive(Clone)]
 pub(crate) struct AppState<T> {
@@ -39,7 +39,9 @@ pub(crate) async fn serve<T>(
 where
     T: Clone + Send + Sync + Store + 'static,
 {
+    // Setup the CryptoProvider (controls core cryptography used by rustls) for the process
     let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
+
     let (cert, key) = generate_certs()?;
 
     let tls_config = RustlsConfig::from_pem(cert.pem().into(), key.serialize_pem().into())
diff --git a/rust/serving/src/metrics.rs b/rust/serving/src/metrics.rs
index ff9651377..a605cc998 100644
--- a/rust/serving/src/metrics.rs
+++ b/rust/serving/src/metrics.rs
@@ -175,9 +175,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_start_metrics_server() -> Result<()> {
-        rustls::crypto::aws_lc_rs::default_provider()
-            .install_default()
-            .unwrap();
+        // Setup the CryptoProvider (controls core cryptography used by rustls) for the process
+        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
         let (cert, key) = generate_certs()?;
 
         let tls_config = RustlsConfig::from_pem(cert.pem().into(), key.serialize_pem().into())
diff --git a/rust/serving/src/source.rs b/rust/serving/src/source.rs
index d6f7f9c28..4366338a2 100644
--- a/rust/serving/src/source.rs
+++ b/rust/serving/src/source.rs
@@ -31,26 +31,36 @@ enum ActorMessage {
     Read {
         batch_size: usize,
         timeout_at: Instant,
-        reply_to: oneshot::Sender<Vec<Message>>,
+        reply_to: oneshot::Sender<Result<Vec<Message>>>,
     },
     Ack {
         offsets: Vec<String>,
-        reply_to: oneshot::Sender<()>,
+        reply_to: oneshot::Sender<Result<()>>,
     },
 }
 
+/// Background actor that starts an Axum server for accepting HTTP requests.
 struct ServingSourceActor {
+    /// The HTTP handlers will put the message received from the payload on this channel
     messages: mpsc::Receiver<MessageWrapper>,
+    /// Channel for the actor handle to communicate with this actor
    handler_rx: mpsc::Receiver<ActorMessage>,
+    /// Mapping from request's ID header (usually `X-Numaflow-Id` header) to a channel.
+    /// Sending a message on this channel notifies the HTTP handler function that the message
+    /// has been successfully processed.
     tracker: HashMap<String, oneshot::Sender<()>>,
+    vertex_replica_id: u16,
 }
 
 impl ServingSourceActor {
     async fn start(
         settings: Arc<Settings>,
         handler_rx: mpsc::Receiver<ActorMessage>,
+        request_channel_buffer_size: usize,
+        vertex_replica_id: u16,
     ) -> Result<()> {
-        let (messages_tx, messages_rx) = mpsc::channel(10000);
+        // Channel to which HTTP handlers will send the request payload
+        let (messages_tx, messages_rx) = mpsc::channel(request_channel_buffer_size);
         // Create a redis store to store the callbacks and the custom responses
         let redis_store = RedisConnection::new(settings.redis.clone()).await?;
         // Create the message graph from the pipeline spec and the redis store
@@ -67,6 +77,7 @@ impl ServingSourceActor {
                 messages: messages_rx,
                 handler_rx,
                 tracker: HashMap::new(),
+                vertex_replica_id,
             };
             serving_actor.run().await;
         });
@@ -98,24 +109,32 @@ impl ServingSourceActor {
                 let _ = reply_to.send(messages);
             }
             ActorMessage::Ack { offsets, reply_to } => {
-                self.ack(offsets).await;
-                let _ = reply_to.send(());
+                let status = self.ack(offsets).await;
+                let _ = reply_to.send(status);
             }
         }
     }
 
-    async fn read(&mut self, count: usize, timeout_at: Instant) -> Vec<Message> {
+    async fn read(&mut self, count: usize, timeout_at: Instant) -> Result<Vec<Message>> {
        let mut messages = vec![];
         loop {
+            // Stop if the read timeout has been reached or if we have collected the requested number of messages
             if messages.len() >= count || Instant::now() >= timeout_at {
                 break;
             }
             let message = match self.messages.try_recv() {
                 Ok(msg) => msg,
                 Err(mpsc::error::TryRecvError::Empty) => break,
-                Err(e) => {
-                    tracing::error!(?e, "Receiving messages from the serving channel"); // FIXME:
-                    return messages;
+                Err(mpsc::error::TryRecvError::Disconnected) => {
+                    // If we have collected at least one message, we return those messages.
+                    // The error will happen on all the subsequent read attempts too.
+                    if messages.is_empty() {
+                        return Err(Error::Other(
+                            "Sending half of the Serving channel has disconnected".into(),
+                        ));
+                    }
+                    tracing::error!("Sending half of the Serving channel has disconnected");
+                    return Ok(messages);
                 }
             };
             let MessageWrapper {
@@ -126,22 +145,24 @@ impl ServingSourceActor {
             } = message;
             self.tracker.insert(message.id.clone(), confirm_save);
             messages.push(message);
         }
-        messages
+        Ok(messages)
     }
 
-    async fn ack(&mut self, offsets: Vec<String>) {
+    async fn ack(&mut self, offsets: Vec<String>) -> Result<()> {
+        let offset_suffix = format!("-{}", self.vertex_replica_id);
         for offset in offsets {
-            let offset = offset
-                .strip_suffix("-0")
-                .expect("offset does not end with '-0'"); // FIXME: we hardcode 0 as the partition index when constructing offset
+            let offset = offset.strip_suffix(&offset_suffix).ok_or_else(|| {
+                Error::Source(format!("offset does not end with '{}'", &offset_suffix))
+            })?;
             let confirm_save_tx = self
                 .tracker
                 .remove(offset)
-                .expect("offset was not found in the tracker");
+                .ok_or_else(|| Error::Source("offset was not found in the tracker".into()))?;
             confirm_save_tx
                 .send(())
-                .expect("Sending on confirm_save channel");
+                .map_err(|e| Error::Source(format!("Sending on confirm_save channel: {e:?}")))?;
         }
+        Ok(())
     }
 }
@@ -158,9 +179,10 @@ impl ServingSource {
         settings: Arc<Settings>,
         batch_size: usize,
         timeout: Duration,
+        vertex_replica_id: u16,
     ) -> Result<Self> {
-        let (actor_tx, actor_rx) = mpsc::channel(1000);
-        ServingSourceActor::start(settings, actor_rx).await?;
+        let (actor_tx, actor_rx) = mpsc::channel(2 * batch_size);
+        ServingSourceActor::start(settings, actor_rx, 2 * batch_size, vertex_replica_id).await?;
         Ok(Self {
             batch_size,
             timeout,
@@ -177,7 +199,7 @@ impl ServingSource {
             timeout_at: Instant::now() + self.timeout,
         };
         let _ = self.actor_tx.send(actor_msg).await;
-        let messages = rx.await.map_err(Error::ActorTaskTerminated)?;
+        let messages = rx.await.map_err(Error::ActorTaskTerminated)??;
         tracing::debug!(
             count = messages.len(),
             requested_count = self.batch_size,
@@ -194,7 +216,7 @@ impl ServingSource {
             reply_to: tx,
         };
         let _ = self.actor_tx.send(actor_msg).await;
-        rx.await.map_err(Error::ActorTaskTerminated)?;
+        rx.await.map_err(Error::ActorTaskTerminated)??;
         Ok(())
     }
 }
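
The ack() changes above depend on the offset format produced at read time: the request ID with a "-<replica>" suffix appended (StringOffset::new(message.id.clone(), *get_vertex_replica())), which ack() strips again before looking the request up in the tracker. The standalone sketch below illustrates that round-trip under that assumption; the helpers make_offset and request_id_from_offset are hypothetical names for illustration only and are not part of this patch.

// Minimal sketch (not from the patch): round-trip of the replica-suffixed offset.
// `make_offset` and `request_id_from_offset` are hypothetical helpers.

fn make_offset(request_id: &str, replica: u16) -> String {
    // Mirrors building the offset as "<request-id>-<replica>" at read time
    format!("{request_id}-{replica}")
}

fn request_id_from_offset(offset: &str, replica: u16) -> Option<&str> {
    // Mirrors stripping the "-<replica>" suffix in ack()
    let suffix = format!("-{replica}");
    offset.strip_suffix(&suffix)
}

fn main() {
    let replica = 2;
    let offset = make_offset("abc-123", replica);
    assert_eq!(offset, "abc-123-2");
    // Stripping with the matching replica id recovers the request ID...
    assert_eq!(request_id_from_offset(&offset, replica), Some("abc-123"));
    // ...while a mismatch now surfaces as an Error::Source in the patched ack()
    // instead of a panic via expect().
    assert_eq!(request_id_from_offset(&offset, 0), None);
}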