diff --git a/rust/numaflow-core/src/message.rs b/rust/numaflow-core/src/message.rs index b3d1eab84..f0275df07 100644 --- a/rust/numaflow-core/src/message.rs +++ b/rust/numaflow-core/src/message.rs @@ -1,3 +1,4 @@ +use std::cmp::PartialEq; use std::collections::HashMap; use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; @@ -7,7 +8,8 @@ use chrono::{DateTime, Utc}; use crate::error::Error; use crate::shared::utils::{prost_timestamp_from_utc, utc_from_timestamp}; use numaflow_grpc::clients::sink::sink_request::Request; -use numaflow_grpc::clients::sink::SinkRequest; +use numaflow_grpc::clients::sink::Status::{Failure, Fallback, Success}; +use numaflow_grpc::clients::sink::{sink_response, SinkRequest, SinkResponse}; use numaflow_grpc::clients::source::{read_response, AckRequest}; use numaflow_grpc::clients::sourcetransformer::SourceTransformRequest; @@ -113,3 +115,62 @@ impl From for SinkRequest { } } } + +/// Sink's status for each [Message] written to Sink. +#[derive(PartialEq)] +pub(crate) enum ResponseStatusFromSink { + /// Successfully wrote to the Sink. + Success, + /// Failed with error message. + Failed(String), + /// Write to FallBack Sink. + Fallback, +} + +/// Sink will give a response per [Message]. +pub(crate) struct ResponseFromSink { + /// Unique id per [Message]. We need to track per [Message] status. + pub(crate) id: String, + /// Status of the "sink" operation per [Message]. 
+ pub(crate) status: ResponseStatusFromSink, +} + +impl From for SinkResponse { + fn from(value: ResponseFromSink) -> Self { + let (status, err_msg) = match value.status { + ResponseStatusFromSink::Success => (Success, "".to_string()), + ResponseStatusFromSink::Failed(err) => (Failure, err.to_string()), + ResponseStatusFromSink::Fallback => (Fallback, "".to_string()), + }; + + Self { + result: Some(sink_response::Result { + id: value.id, + status: status as i32, + err_msg, + }), + handshake: None, + } + } +} + +impl TryFrom for ResponseFromSink { + type Error = crate::Error; + + fn try_from(value: SinkResponse) -> Result { + let value = value + .result + .ok_or(Error::SinkError("result is empty".to_string()))?; + + let status = match value.status() { + Success => ResponseStatusFromSink::Success, + Failure => ResponseStatusFromSink::Failed(value.err_msg), + Fallback => ResponseStatusFromSink::Fallback, + }; + + Ok(Self { + id: value.id, + status, + }) + } +} diff --git a/rust/numaflow-core/src/monovertex.rs b/rust/numaflow-core/src/monovertex.rs index 374a10e52..eee02bd30 100644 --- a/rust/numaflow-core/src/monovertex.rs +++ b/rust/numaflow-core/src/monovertex.rs @@ -1,26 +1,28 @@ +use std::time::Duration; + +use tokio::signal; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use tonic::transport::Channel; +use tracing::info; + +use numaflow_grpc::clients::sink::sink_client::SinkClient; +use numaflow_grpc::clients::source::source_client::SourceClient; +use numaflow_grpc::clients::sourcetransformer::source_transform_client::SourceTransformClient; + use crate::config::{config, Settings}; use crate::error; -use crate::reader::LagReader; use crate::shared::utils; use crate::shared::utils::create_rpc_channel; -use crate::sink::user_defined::SinkWriter; +use crate::sink::SinkHandle; use crate::source::generator::{new_generator, GeneratorAck, GeneratorLagReader, GeneratorRead}; use crate::source::user_defined::{ new_source, UserDefinedSourceAck, 
UserDefinedSourceLagReader, UserDefinedSourceRead, }; -use crate::source::{SourceAcker, SourceReader}; -use crate::transformer::user_defined::SourceTransformer; +use crate::source::SourceHandle; +use crate::transformer::user_defined::SourceTransformHandle; use forwarder::ForwarderBuilder; use metrics::UserDefinedContainerState; -use numaflow_grpc::clients::sink::sink_client::SinkClient; -use numaflow_grpc::clients::source::source_client::SourceClient; -use numaflow_grpc::clients::sourcetransformer::source_transform_client::SourceTransformClient; -use std::time::Duration; -use tokio::signal; -use tokio::task::JoinHandle; -use tokio_util::sync::CancellationToken; -use tonic::transport::Channel; -use tracing::info; /// [forwarder] orchestrates data movement from the Source to the Sink via the optional SourceTransformer. /// The forward-a-chunk executes the following in an infinite loop till a shutdown signal is received: @@ -78,8 +80,8 @@ async fn shutdown_signal() { } } -enum SourceType { - UdSource( +pub(crate) enum SourceType { + UserDefinedSource( UserDefinedSourceRead, UserDefinedSourceAck, UserDefinedSourceLagReader, @@ -155,7 +157,7 @@ async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> err ) .await?; - let source_type = fetch_source(&config, &mut source_grpc_client).await?; + let source_type = fetch_source(config, &mut source_grpc_client).await?; // Start the metrics server in a separate background async spawn, // This should be running throughout the lifetime of the application, hence the handle is not @@ -171,38 +173,21 @@ async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> err // FIXME: what to do with the handle utils::start_metrics_server(metrics_state).await; - match source_type { - SourceType::UdSource(udsource_reader, udsource_acker, udsource_lag_reader) => { - start_forwarder_with_source( - udsource_reader, - udsource_acker, - udsource_lag_reader, - sink_grpc_client, - transformer_grpc_client, - 
fb_sink_grpc_client, - cln_token, - ) - .await?; - } - SourceType::Generator(generator_reader, generator_acker, generator_lag_reader) => { - start_forwarder_with_source( - generator_reader, - generator_acker, - generator_lag_reader, - sink_grpc_client, - transformer_grpc_client, - fb_sink_grpc_client, - cln_token, - ) - .await?; - } - } + let source = SourceHandle::new(source_type); + start_forwarder_with_source( + source, + sink_grpc_client, + transformer_grpc_client, + fb_sink_grpc_client, + cln_token, + ) + .await?; info!("Forwarder stopped gracefully"); Ok(()) } -async fn fetch_source( +pub(crate) async fn fetch_source( config: &Settings, source_grpc_client: &mut Option>, ) -> crate::Result { @@ -213,7 +198,7 @@ async fn fetch_source( config.timeout_in_ms as u16, ) .await?; - SourceType::UdSource(source_read, source_ack, lag_reader) + SourceType::UserDefinedSource(source_read, source_ack, lag_reader) } else if let Some(generator_config) = &config.generator_config { let (source_read, source_ack, lag_reader) = new_generator( generator_config.content.clone(), @@ -230,39 +215,31 @@ async fn fetch_source( Ok(source_type) } -async fn start_forwarder_with_source( - source_reader: R, - source_acker: A, - source_lag_reader: L, +async fn start_forwarder_with_source( + source: SourceHandle, sink_grpc_client: SinkClient, transformer_client: Option>, fallback_sink_client: Option>, cln_token: CancellationToken, -) -> error::Result<()> -where - R: SourceReader, - A: SourceAcker, - L: LagReader + Clone + 'static, -{ +) -> error::Result<()> { // start the pending reader to publish pending metrics - let mut pending_reader = utils::create_pending_reader(source_lag_reader).await; + let mut pending_reader = utils::create_pending_reader(source.clone()).await; pending_reader.start().await; // build the forwarder - let sink_writer = SinkWriter::new(sink_grpc_client).await?; + let sink_writer = SinkHandle::new(sink_grpc_client).await?; - let mut forwarder_builder = - 
ForwarderBuilder::new(source_reader, source_acker, sink_writer, cln_token); + let mut forwarder_builder = ForwarderBuilder::new(source, sink_writer, cln_token); // add transformer if exists if let Some(transformer_client) = transformer_client { - let transformer = SourceTransformer::new(transformer_client).await?; + let transformer = SourceTransformHandle::new(transformer_client).await?; forwarder_builder = forwarder_builder.source_transformer(transformer); } // add fallback sink if exists if let Some(fallback_sink_client) = fallback_sink_client { - let fallback_writer = SinkWriter::new(fallback_sink_client).await?; + let fallback_writer = SinkHandle::new(fallback_sink_client).await?; forwarder_builder = forwarder_builder.fallback_sink_writer(fallback_writer); } // build the final forwarder diff --git a/rust/numaflow-core/src/monovertex/forwarder.rs b/rust/numaflow-core/src/monovertex/forwarder.rs index 164864d18..6ac44b763 100644 --- a/rust/numaflow-core/src/monovertex/forwarder.rs +++ b/rust/numaflow-core/src/monovertex/forwarder.rs @@ -1,54 +1,50 @@ -use chrono::Utc; -use log::warn; use std::collections::HashMap; -use tokio::time::sleep; -use tokio_util::sync::CancellationToken; -use tracing::{debug, info}; use crate::config::{config, OnFailureStrategy}; +use crate::error; use crate::error::Error; -use crate::message::{Message, Offset}; +use crate::message::{Message, Offset, ResponseStatusFromSink}; use crate::monovertex::metrics; use crate::monovertex::metrics::forward_metrics; -use crate::sink::user_defined::SinkWriter; -use crate::transformer::user_defined::SourceTransformer; -use crate::{error, source}; -use numaflow_grpc::clients::sink::Status::{Failure, Fallback, Success}; +use crate::sink::SinkHandle; +use crate::{source::SourceHandle, transformer::user_defined::SourceTransformHandle}; + +use chrono::Utc; +use log::warn; +use tokio::time::sleep; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info}; /// Forwarder is responsible for reading 
messages from the source, applying transformation if /// transformer is present, writing the messages to the sink, and then acknowledging the messages /// back to the source. -pub(crate) struct Forwarder { - source_read: R, - source_ack: A, - sink_writer: SinkWriter, - source_transformer: Option, - fb_sink_writer: Option, +pub(crate) struct Forwarder { + source_reader: SourceHandle, + sink_writer: SinkHandle, + source_transformer: Option, + fb_sink_writer: Option, cln_token: CancellationToken, common_labels: Vec<(String, String)>, } /// ForwarderBuilder is used to build a Forwarder instance with optional fields. -pub(crate) struct ForwarderBuilder { - source_read: R, - source_ack: A, - sink_writer: SinkWriter, +pub(crate) struct ForwarderBuilder { + source_reader: SourceHandle, + sink_writer: SinkHandle, cln_token: CancellationToken, - source_transformer: Option, - fb_sink_writer: Option, + source_transformer: Option, + fb_sink_writer: Option, } -impl ForwarderBuilder { +impl ForwarderBuilder { /// Create a new builder with mandatory fields pub(crate) fn new( - source_read: R, - source_ack: A, - sink_writer: SinkWriter, + source_reader: SourceHandle, + sink_writer: SinkHandle, cln_token: CancellationToken, ) -> Self { Self { - source_read, - source_ack, + source_reader, sink_writer, cln_token, source_transformer: None, @@ -57,24 +53,23 @@ impl ForwarderBuilder { } /// Set the optional transformer client - pub(crate) fn source_transformer(mut self, transformer_client: SourceTransformer) -> Self { + pub(crate) fn source_transformer(mut self, transformer_client: SourceTransformHandle) -> Self { self.source_transformer = Some(transformer_client); self } /// Set the optional fallback client - pub(crate) fn fallback_sink_writer(mut self, fallback_client: SinkWriter) -> Self { + pub(crate) fn fallback_sink_writer(mut self, fallback_client: SinkHandle) -> Self { self.fb_sink_writer = Some(fallback_client); self } /// Build the Forwarder instance #[must_use] - pub(crate) fn 
build(self) -> Forwarder { + pub(crate) fn build(self) -> Forwarder { let common_labels = metrics::forward_metrics_labels().clone(); Forwarder { - source_read: self.source_read, - source_ack: self.source_ack, + source_reader: self.source_reader, sink_writer: self.sink_writer, source_transformer: self.source_transformer, fb_sink_writer: self.fb_sink_writer, @@ -84,11 +79,7 @@ impl ForwarderBuilder { } } -impl Forwarder -where - A: source::SourceAcker, - R: source::SourceReader, -{ +impl Forwarder { /// start starts the forward-a-chunk loop and exits only after a chunk has been forwarded and ack'ed. /// this means that, in the happy path scenario a block is always completely processed. /// this function will return on any error and will cause end up in a non-0 exit code. @@ -129,7 +120,7 @@ where /// and then acknowledge the messages back to the source. async fn read_and_process_messages(&mut self) -> error::Result { let start_time = tokio::time::Instant::now(); - let messages = self.source_read.read().await.map_err(|e| { + let messages = self.source_reader.read().await.map_err(|e| { Error::ForwarderError(format!("Failed to read messages from source {:?}", e)) })?; @@ -198,13 +189,13 @@ where // Applies transformation to the messages if transformer is present // we concurrently apply transformation to all the messages. 
async fn apply_transformer(&mut self, messages: Vec) -> error::Result> { - let Some(transformer_client) = &mut self.source_transformer else { + let Some(client) = &mut self.source_transformer else { // return early if there is no transformer return Ok(messages); }; let start_time = tokio::time::Instant::now(); - let results = transformer_client.transform_fn(messages).await?; + let results = client.transform(messages).await?; debug!( "Transformer latency - {}ms", @@ -362,7 +353,7 @@ where messages_to_send: &mut Vec, ) -> error::Result { let start_time = tokio::time::Instant::now(); - match self.sink_writer.sink_fn(messages_to_send.clone()).await { + match self.sink_writer.sink(messages_to_send.clone()).await { Ok(response) => { debug!("Sink latency - {}ms", start_time.elapsed().as_millis()); @@ -370,13 +361,8 @@ where // for the udsink to return the results in the same order as the requests let result_map = response .into_iter() - .map(|resp| match resp.result { - Some(result) => Ok((result.id.clone(), result)), - None => Err(Error::SinkError( - "Response does not contain a result".to_string(), - )), - }) - .collect::>>()?; + .map(|resp| (resp.id, resp.status)) + .collect::>(); error_map.clear(); // drain all the messages that were successfully written @@ -384,14 +370,16 @@ where // construct the error map for the failed messages messages_to_send.retain(|msg| { if let Some(result) = result_map.get(&msg.id) { - return if result.status == Success as i32 { - false - } else if result.status == Fallback as i32 { - fallback_msgs.push(msg.clone()); // add to fallback messages - false - } else { - *error_map.entry(result.err_msg.clone()).or_insert(0) += 1; - true + return match result { + ResponseStatusFromSink::Success => false, + ResponseStatusFromSink::Failed(err_msg) => { + *error_map.entry(err_msg.clone()).or_insert(0) += 1; + true + } + ResponseStatusFromSink::Fallback => { + fallback_msgs.push(msg.clone()); + false + } }; } false @@ -441,7 +429,7 @@ where while 
attempts < max_attempts { let start_time = tokio::time::Instant::now(); - match fallback_client.sink_fn(messages_to_send.clone()).await { + match fallback_client.sink(messages_to_send.clone()).await { Ok(fb_response) => { debug!( "Fallback sink latency - {}ms", @@ -451,14 +439,9 @@ where // create a map of id to result, since there is no strict requirement // for the udsink to return the results in the same order as the requests let result_map = fb_response - .iter() - .map(|resp| match &resp.result { - Some(result) => Ok((result.id.clone(), result)), - None => Err(Error::SinkError( - "Response does not contain a result".to_string(), - )), - }) - .collect::>>()?; + .into_iter() + .map(|resp| (resp.id, resp.status)) + .collect::>(); let mut contains_fallback_status = false; @@ -468,17 +451,17 @@ where // construct the error map for the failed messages messages_to_send.retain(|msg| { if let Some(result) = result_map.get(&msg.id) { - if result.status == Failure as i32 { - *fallback_error_map - .entry(result.err_msg.clone()) - .or_insert(0) += 1; - true - } else if result.status == Fallback as i32 { - contains_fallback_status = true; - false - } else { - false - } + return match result { + ResponseStatusFromSink::Success => false, + ResponseStatusFromSink::Failed(err_msg) => { + *fallback_error_map.entry(err_msg.clone()).or_insert(0) += 1; + true + } + ResponseStatusFromSink::Fallback => { + contains_fallback_status = true; + false + } + }; } else { false } @@ -525,7 +508,7 @@ where let n = offsets.len(); let start_time = tokio::time::Instant::now(); - self.source_ack.ack(offsets).await?; + self.source_reader.ack(offsets).await?; debug!("Ack latency - {}ms", start_time.elapsed().as_millis()); @@ -548,10 +531,12 @@ mod tests { use crate::config::config; use crate::monovertex::forwarder::ForwarderBuilder; + use crate::monovertex::SourceType; use crate::shared::utils::create_rpc_channel; - use crate::sink::user_defined::SinkWriter; + use crate::sink::SinkHandle; use 
crate::source::user_defined::new_source; - use crate::transformer::user_defined::SourceTransformer; + use crate::source::SourceHandle; + use crate::transformer::user_defined::SourceTransformHandle; use chrono::Utc; use numaflow::source::{Message, Offset, SourceReadRequest}; use numaflow::{sink, source, sourcetransform}; @@ -739,7 +724,7 @@ mod tests { let cln_token = CancellationToken::new(); - let (source_read, source_ack, _) = new_source( + let (source_read, source_ack, source_lag_reader) = new_source( SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()), config().batch_size as usize, config().timeout_in_ms as u16, @@ -747,22 +732,27 @@ mod tests { .await .expect("failed to connect to source server"); - let sink_writer = SinkWriter::new(SinkClient::new( + let src_reader = SourceHandle::new(SourceType::UserDefinedSource( + source_read, + source_ack, + source_lag_reader, + )); + + let sink_writer = SinkHandle::new(SinkClient::new( create_rpc_channel(sink_sock_file).await.unwrap(), )) .await .expect("failed to connect to sink server"); - let transformer_client = SourceTransformer::new(SourceTransformClient::new( + let transformer_client = SourceTransformHandle::new(SourceTransformClient::new( create_rpc_channel(transformer_sock_file).await.unwrap(), )) .await .expect("failed to connect to transformer server"); - let mut forwarder = - ForwarderBuilder::new(source_read, source_ack, sink_writer, cln_token.clone()) - .source_transformer(transformer_client) - .build(); + let mut forwarder = ForwarderBuilder::new(src_reader, sink_writer, cln_token.clone()) + .source_transformer(transformer_client) + .build(); // Assert the received message in a different task let assert_handle = tokio::spawn(async move { @@ -864,7 +854,7 @@ mod tests { let cln_token = CancellationToken::new(); - let (source_read, source_ack, _) = new_source( + let (source_read, source_ack, lag_reader) = new_source( 
SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()), 500, 100, @@ -872,14 +862,20 @@ mod tests { .await .expect("failed to connect to source server"); - let sink_writer = SinkWriter::new(SinkClient::new( + let source_reader = SourceHandle::new(SourceType::UserDefinedSource( + source_read, + source_ack, + lag_reader, + )); + + let sink_writer = SinkHandle::new(SinkClient::new( create_rpc_channel(sink_sock_file).await.unwrap(), )) .await .expect("failed to connect to sink server"); let mut forwarder = - ForwarderBuilder::new(source_read, source_ack, sink_writer, cln_token.clone()).build(); + ForwarderBuilder::new(source_reader, sink_writer, cln_token.clone()).build(); let cancel_handle = tokio::spawn(async move { tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; @@ -981,7 +977,7 @@ mod tests { let cln_token = CancellationToken::new(); - let (source_read, source_ack, _) = new_source( + let (source_read, source_ack, source_lag_reader) = new_source( SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()), 500, 100, @@ -989,22 +985,27 @@ mod tests { .await .expect("failed to connect to source server"); - let sink_writer = SinkWriter::new(SinkClient::new( + let source = SourceHandle::new(SourceType::UserDefinedSource( + source_read, + source_ack, + source_lag_reader, + )); + + let sink_writer = SinkHandle::new(SinkClient::new( create_rpc_channel(sink_sock_file).await.unwrap(), )) .await .expect("failed to connect to sink server"); - let fb_sink_writer = SinkWriter::new(SinkClient::new( + let fb_sink_writer = SinkHandle::new(SinkClient::new( create_rpc_channel(fb_sink_sock_file).await.unwrap(), )) .await .expect("failed to connect to fb sink server"); - let mut forwarder = - ForwarderBuilder::new(source_read, source_ack, sink_writer, cln_token.clone()) - .fallback_sink_writer(fb_sink_writer) - .build(); + let mut forwarder = ForwarderBuilder::new(source, sink_writer, cln_token.clone()) + 
.fallback_sink_writer(fb_sink_writer) + .build(); let assert_handle = tokio::spawn(async move { let received_message = sink_rx.recv().await.unwrap(); diff --git a/rust/numaflow-core/src/monovertex/metrics.rs b/rust/numaflow-core/src/monovertex/metrics.rs index 9ee6f5c65..f5d432c76 100644 --- a/rust/numaflow-core/src/monovertex/metrics.rs +++ b/rust/numaflow-core/src/monovertex/metrics.rs @@ -23,13 +23,14 @@ use tonic::transport::Channel; use tonic::Request; use tracing::{debug, error, info}; -use crate::config::config; -use crate::error::Error; -use crate::reader; use numaflow_grpc::clients::sink::sink_client::SinkClient; use numaflow_grpc::clients::source::source_client::SourceClient; use numaflow_grpc::clients::sourcetransformer::source_transform_client::SourceTransformClient; +use crate::config::config; +use crate::error::Error; +use crate::source::SourceHandle; + // Define the labels for the metrics // Note: Please keep consistent with the definitions in MonoVertex daemon const MVTX_NAME_LABEL: &str = "mvtx_name"; @@ -341,8 +342,8 @@ struct TimestampedPending { /// PendingReader is responsible for periodically checking the lag of the reader /// and exposing the metrics. It maintains a list of pending stats and ensures that /// only the most recent entries are kept. -pub(crate) struct PendingReader { - lag_reader: T, +pub(crate) struct PendingReader { + lag_reader: SourceHandle, lag_checking_interval: Duration, refresh_interval: Duration, buildup_handle: Option>, @@ -351,14 +352,14 @@ pub(crate) struct PendingReader { } /// PendingReaderBuilder is used to build a [LagReader] instance. 
-pub(crate) struct PendingReaderBuilder { - lag_reader: T, +pub(crate) struct PendingReaderBuilder { + lag_reader: SourceHandle, lag_checking_interval: Option, refresh_interval: Option, } -impl PendingReaderBuilder { - pub(crate) fn new(lag_reader: T) -> Self { +impl PendingReaderBuilder { + pub(crate) fn new(lag_reader: SourceHandle) -> Self { Self { lag_reader, lag_checking_interval: None, @@ -376,7 +377,7 @@ impl PendingReaderBuilder { self } - pub(crate) fn build(self) -> PendingReader { + pub(crate) fn build(self) -> PendingReader { PendingReader { lag_reader: self.lag_reader, lag_checking_interval: self @@ -392,7 +393,7 @@ impl PendingReaderBuilder { } } -impl PendingReader { +impl PendingReader { /// Starts the lag reader by spawning tasks to build up pending info and expose pending metrics. /// /// This method spawns two asynchronous tasks: @@ -416,7 +417,7 @@ impl PendingReader { } /// When the PendingReader is dropped, we need to clean up the pending exposer and the pending builder tasks. -impl Drop for PendingReader { +impl Drop for PendingReader { fn drop(&mut self) { if let Some(handle) = self.expose_handle.take() { handle.abort(); @@ -430,15 +431,15 @@ impl Drop for PendingReader { } /// Periodically checks the pending messages from the source client and build the pending stats. 
-async fn build_pending_info( - mut lag_reader: T, +async fn build_pending_info( + source: SourceHandle, lag_checking_interval: Duration, pending_stats: Arc>>, ) { let mut ticker = time::interval(lag_checking_interval); loop { ticker.tick().await; - match fetch_pending(&mut lag_reader).await { + match fetch_pending(&source).await { Ok(pending) => { if pending != -1 { let mut stats = pending_stats.lock().await; @@ -460,7 +461,7 @@ async fn build_pending_info( } } -async fn fetch_pending(lag_reader: &mut T) -> crate::error::Result { +async fn fetch_pending(lag_reader: &SourceHandle) -> crate::error::Result { let response: i64 = lag_reader.pending().await?.map_or(-1, |p| p as i64); // default to -1(unavailable) Ok(response) } diff --git a/rust/numaflow-core/src/shared/utils.rs b/rust/numaflow-core/src/shared/utils.rs index de4c79187..7dd1d51ac 100644 --- a/rust/numaflow-core/src/shared/utils.rs +++ b/rust/numaflow-core/src/shared/utils.rs @@ -3,12 +3,13 @@ use std::path::PathBuf; use std::time::Duration; use crate::config::config; +use crate::error; use crate::error::Error; use crate::monovertex::metrics::{ start_metrics_https_server, PendingReader, PendingReaderBuilder, UserDefinedContainerState, }; use crate::shared::server_info; -use crate::{error, reader}; +use crate::source::SourceHandle; use numaflow_grpc::clients::sink::sink_client::SinkClient; use numaflow_grpc::clients::source::source_client::SourceClient; use numaflow_grpc::clients::sourcetransformer::source_transform_client::SourceTransformClient; @@ -85,9 +86,7 @@ pub(crate) async fn start_metrics_server( }) } -pub(crate) async fn create_pending_reader( - lag_reader_grpc_client: T, -) -> PendingReader { +pub(crate) async fn create_pending_reader(lag_reader_grpc_client: SourceHandle) -> PendingReader { PendingReaderBuilder::new(lag_reader_grpc_client) .lag_checking_interval(Duration::from_secs( config().lag_check_interval_in_secs.into(), diff --git a/rust/numaflow-core/src/sink.rs 
b/rust/numaflow-core/src/sink.rs index ccd6fb8fb..e39892ddd 100644 --- a/rust/numaflow-core/src/sink.rs +++ b/rust/numaflow-core/src/sink.rs @@ -1,4 +1,89 @@ +use tokio::sync::{mpsc, oneshot}; +use tonic::transport::Channel; + +use crate::config::config; +use crate::message::{Message, ResponseFromSink}; +use numaflow_grpc::clients::sink::sink_client::SinkClient; +use user_defined::UserDefinedSink; + /// [User-Defined Sink] extends Numaflow to add custom sources supported outside the builtins. /// /// [User-Defined Sink]: https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/ -pub(crate) mod user_defined; +mod user_defined; + +/// Set of items to be implemented by a Numaflow [Sink]. +/// +/// [Sink]: https://numaflow.numaproj.io/user-guide/sinks/overview/ +#[trait_variant::make(Sink: Send)] +#[allow(unused)] +pub(crate) trait LocalSink { + /// Write the messages to the Sink. + async fn sink(&mut self, messages: Vec) -> crate::Result>; +} + +enum ActorMessage { + Sink { + messages: Vec, + respond_to: oneshot::Sender>>, + }, +} + +struct SinkActor { + actor_messages: mpsc::Receiver, + sink: T, +} + +impl SinkActor +where + T: Sink, +{ + fn new(actor_messages: mpsc::Receiver, sink: T) -> Self { + Self { + actor_messages, + sink, + } + } + + async fn handle_message(&mut self, msg: ActorMessage) { + match msg { + ActorMessage::Sink { + messages, + respond_to, + } => { + let response = self.sink.sink(messages).await; + let _ = respond_to.send(response); + } + } + } +} + +pub(crate) struct SinkHandle { + sender: mpsc::Sender, +} + +impl SinkHandle { + pub(crate) async fn new(sink_client: SinkClient) -> crate::Result { + let (sender, receiver) = mpsc::channel(config().batch_size as usize); + let sink = UserDefinedSink::new(sink_client).await?; + tokio::spawn(async move { + let mut actor = SinkActor::new(receiver, sink); + while let Some(msg) = actor.actor_messages.recv().await { + actor.handle_message(msg).await; + } + }); + Ok(Self { sender }) + } + + pub(crate) 
async fn sink( + &self, + messages: Vec, + ) -> crate::Result> { + let (tx, rx) = oneshot::channel(); + let msg = ActorMessage::Sink { + messages, + respond_to: tx, + }; + let _ = self.sender.send(msg).await; + rx.await.unwrap() + } +} diff --git a/rust/numaflow-core/src/sink/user_defined.rs b/rust/numaflow-core/src/sink/user_defined.rs index 54ec4bc52..529abfed5 100644 --- a/rust/numaflow-core/src/sink/user_defined.rs +++ b/rust/numaflow-core/src/sink/user_defined.rs @@ -1,23 +1,26 @@ -use crate::error; -use crate::error::Error; -use crate::message::Message; -use numaflow_grpc::clients::sink::sink_client::SinkClient; -use numaflow_grpc::clients::sink::sink_request::Status; -use numaflow_grpc::clients::sink::{Handshake, SinkRequest, SinkResponse}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; use tonic::{Request, Streaming}; +use numaflow_grpc::clients::sink::sink_client::SinkClient; +use numaflow_grpc::clients::sink::sink_request::Status; +use numaflow_grpc::clients::sink::{Handshake, SinkRequest, SinkResponse}; + +use crate::error; +use crate::error::Error; +use crate::message::{Message, ResponseFromSink}; +use crate::sink::Sink; + const DEFAULT_CHANNEL_SIZE: usize = 1000; -/// SinkWriter writes messages to a sink. -pub struct SinkWriter { +/// User-Defined Sink code writes messages to a custom [Sink]. +pub struct UserDefinedSink { sink_tx: mpsc::Sender, resp_stream: Streaming, } -impl SinkWriter { +impl UserDefinedSink { pub(crate) async fn new(mut client: SinkClient) -> error::Result { let (sink_tx, sink_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); let sink_stream = ReceiverStream::new(sink_rx); @@ -54,12 +57,11 @@ impl SinkWriter { resp_stream, }) } +} +impl Sink for UserDefinedSink { /// writes a set of messages to the sink. 
- pub(crate) async fn sink_fn( - &mut self, - messages: Vec, - ) -> error::Result> { + async fn sink(&mut self, messages: Vec) -> error::Result> { let requests: Vec = messages.into_iter().map(|message| message.into()).collect(); let num_requests = requests.len(); @@ -93,7 +95,7 @@ impl SinkWriter { .message() .await? .ok_or(Error::SinkError("failed to receive response".to_string()))?; - responses.push(response); + responses.push(response.try_into()?); } Ok(responses) @@ -102,6 +104,8 @@ impl SinkWriter { #[cfg(test)] mod tests { + use super::*; + use chrono::offset::Utc; use numaflow::sink; use tokio::sync::mpsc; @@ -110,7 +114,7 @@ mod tests { use crate::error::Result; use crate::message::{Message, Offset}; use crate::shared::utils::create_rpc_channel; - use crate::sink::user_defined::SinkWriter; + use crate::sink::user_defined::UserDefinedSink; use numaflow_grpc::clients::sink::sink_client::SinkClient; struct Logger; @@ -157,7 +161,7 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; let mut sink_client = - SinkWriter::new(SinkClient::new(create_rpc_channel(sock_file).await?)) + UserDefinedSink::new(SinkClient::new(create_rpc_channel(sock_file).await?)) .await .expect("failed to connect to sink server"); @@ -186,10 +190,10 @@ mod tests { }, ]; - let response = sink_client.sink_fn(messages.clone()).await?; + let response = sink_client.sink(messages.clone()).await?; assert_eq!(response.len(), 2); - let response = sink_client.sink_fn(messages.clone()).await?; + let response = sink_client.sink(messages.clone()).await?; assert_eq!(response.len(), 2); drop(sink_client); diff --git a/rust/numaflow-core/src/source.rs b/rust/numaflow-core/src/source.rs index 32fea1c0a..5a825b0c7 100644 --- a/rust/numaflow-core/src/source.rs +++ b/rust/numaflow-core/src/source.rs @@ -1,10 +1,20 @@ -use crate::message::{Message, Offset}; +use tokio::sync::{mpsc, oneshot}; + +use crate::config::config; +use crate::{ + message::{Message, Offset}, + 
monovertex::SourceType, + reader::LagReader, +}; /// [User-Defined Source] extends Numaflow to add custom sources supported outside the builtins. /// /// [User-Defined Source]: https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/ pub(crate) mod user_defined; +/// [Generator] is a builtin to generate data for load testing and other internal use-cases. +/// +/// [Generator]: https://numaflow.numaproj.io/user-guide/sources/generator/ pub(crate) mod generator; /// Set of Read related items that has to be implemented to become a Source. @@ -25,3 +35,121 @@ pub(crate) trait SourceAcker { /// acknowledge an offset. The implementor might choose to do it in an asynchronous way. async fn ack(&mut self, _: Vec) -> crate::Result<()>; } + +enum ActorMessage { + #[allow(dead_code)] + Name { + respond_to: oneshot::Sender<&'static str>, + }, + Read { + respond_to: oneshot::Sender>>, + }, + Ack { + respond_to: oneshot::Sender>, + offsets: Vec, + }, + Pending { + respond_to: oneshot::Sender>>, + }, +} + +struct SourceActor { + receiver: mpsc::Receiver, + reader: R, + acker: A, + lag_reader: L, +} + +impl SourceActor +where + R: SourceReader, + A: SourceAcker, + L: LagReader, +{ + fn new(receiver: mpsc::Receiver, reader: R, acker: A, lag_reader: L) -> Self { + Self { + receiver, + reader, + acker, + lag_reader, + } + } + + async fn handle_message(&mut self, msg: ActorMessage) { + match msg { + ActorMessage::Name { respond_to } => { + let name = self.reader.name(); + let _ = respond_to.send(name); + } + ActorMessage::Read { respond_to } => { + let msgs = self.reader.read().await; + let _ = respond_to.send(msgs); + } + ActorMessage::Ack { + respond_to, + offsets, + } => { + let ack = self.acker.ack(offsets).await; + let _ = respond_to.send(ack); + } + ActorMessage::Pending { respond_to } => { + let pending = self.lag_reader.pending().await; + let _ = respond_to.send(pending); + } + } + } +} + +#[derive(Clone)] +pub(crate) struct SourceHandle { + sender: mpsc::Sender, 
+} + +impl SourceHandle { + pub(crate) fn new(src_type: SourceType) -> Self { + let (sender, receiver) = mpsc::channel(config().batch_size as usize); + match src_type { + SourceType::UserDefinedSource(reader, acker, lag_reader) => { + tokio::spawn(async move { + let mut actor = SourceActor::new(receiver, reader, acker, lag_reader); + while let Some(msg) = actor.receiver.recv().await { + actor.handle_message(msg).await; + } + }); + } + SourceType::Generator(reader, acker, lag_reader) => { + tokio::spawn(async move { + let mut actor = SourceActor::new(receiver, reader, acker, lag_reader); + while let Some(msg) = actor.receiver.recv().await { + actor.handle_message(msg).await; + } + }); + } + }; + Self { sender } + } + + pub(crate) async fn read(&self) -> crate::Result> { + let (sender, receiver) = oneshot::channel(); + let msg = ActorMessage::Read { respond_to: sender }; + let _ = self.sender.send(msg).await; + receiver.await.unwrap() + } + + pub(crate) async fn ack(&self, offsets: Vec) -> crate::Result<()> { + let (sender, receiver) = oneshot::channel(); + let msg = ActorMessage::Ack { + respond_to: sender, + offsets, + }; + let _ = self.sender.send(msg).await; + receiver.await.unwrap() + } + + pub(crate) async fn pending(&self) -> crate::error::Result> { + let (sender, receiver) = oneshot::channel(); + let msg = ActorMessage::Pending { respond_to: sender }; + let _ = self.sender.send(msg).await; + receiver.await.unwrap() + } +} diff --git a/rust/numaflow-core/src/source/generator.rs b/rust/numaflow-core/src/source/generator.rs index 611969c74..08f20e25e 100644 --- a/rust/numaflow-core/src/source/generator.rs +++ b/rust/numaflow-core/src/source/generator.rs @@ -1,9 +1,11 @@ +use std::time::Duration; + +use bytes::Bytes; +use futures::StreamExt; + use crate::message::{Message, Offset}; use crate::reader; use crate::source; -use bytes::Bytes; -use futures::StreamExt; -use std::time::Duration; /// Stream Generator returns a set of messages for every `.next` call. 
It will throttle itself if /// the call exceeds the RPU. It will return a max (batch size, RPU) till the quota for that unit of diff --git a/rust/numaflow-core/src/source/user_defined.rs b/rust/numaflow-core/src/source/user_defined.rs index 3eb084666..4ced50203 100644 --- a/rust/numaflow-core/src/source/user_defined.rs +++ b/rust/numaflow-core/src/source/user_defined.rs @@ -1,18 +1,20 @@ +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::transport::Channel; +use tonic::{Request, Streaming}; + +use numaflow_grpc::clients::source; +use numaflow_grpc::clients::source::source_client::SourceClient; +use numaflow_grpc::clients::source::{ + read_request, AckRequest, AckResponse, ReadRequest, ReadResponse, +}; + use crate::config::config; use crate::error; use crate::error::Error::SourceError; use crate::message::{Message, Offset}; use crate::reader::LagReader; use crate::source::{SourceAcker, SourceReader}; -use numaflow_grpc::clients::source; -use numaflow_grpc::clients::source::source_client::SourceClient; -use numaflow_grpc::clients::source::{ - read_request, AckRequest, AckResponse, ReadRequest, ReadResponse, -}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tonic::transport::Channel; -use tonic::{Request, Streaming}; /// User-Defined Source to operative on custom sources. 
#[derive(Debug)] diff --git a/rust/numaflow-core/src/transformer/user_defined.rs b/rust/numaflow-core/src/transformer/user_defined.rs index f06346053..42131622b 100644 --- a/rust/numaflow-core/src/transformer/user_defined.rs +++ b/rust/numaflow-core/src/transformer/user_defined.rs @@ -1,14 +1,6 @@ use std::collections::HashMap; -use crate::config::config; -use crate::error::{Error, Result}; -use crate::message::{Message, Offset}; -use crate::shared::utils::utc_from_timestamp; -use numaflow_grpc::clients::sourcetransformer::{ - self, source_transform_client::SourceTransformClient, SourceTransformRequest, - SourceTransformResponse, -}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; @@ -16,16 +8,30 @@ use tonic::transport::Channel; use tonic::{Request, Streaming}; use tracing::warn; +use numaflow_grpc::clients::sourcetransformer::{ + self, source_transform_client::SourceTransformClient, SourceTransformRequest, + SourceTransformResponse, +}; + +use crate::config::config; +use crate::error::{Error, Result}; +use crate::message::{Message, Offset}; +use crate::shared::utils::utc_from_timestamp; + const DROP: &str = "U+005C__DROP__"; /// TransformerClient is a client to interact with the transformer server. 
-pub struct SourceTransformer { +struct SourceTransformer { + actor_messages: mpsc::Receiver, read_tx: mpsc::Sender, resp_stream: Streaming, } impl SourceTransformer { - pub(crate) async fn new(mut client: SourceTransformClient) -> Result { + async fn new( + mut client: SourceTransformClient, + actor_messages: mpsc::Receiver, + ) -> Result { let (read_tx, read_rx) = mpsc::channel(config().batch_size as usize); let read_stream = ReceiverStream::new(read_rx); @@ -56,12 +62,25 @@ impl SourceTransformer { } Ok(Self { + actor_messages, read_tx, resp_stream, }) } - pub(crate) async fn transform_fn(&mut self, messages: Vec) -> Result> { + async fn handle_message(&mut self, message: ActorMessage) { + match message { + ActorMessage::Transform { + messages, + respond_to, + } => { + let result = self.transform_fn(messages).await; + let _ = respond_to.send(result); + } + } + } + + async fn transform_fn(&mut self, messages: Vec) -> Result> { // fields which will not be changed struct MessageInfo { offset: Offset, @@ -169,13 +188,48 @@ impl SourceTransformer { } } +enum ActorMessage { + Transform { + messages: Vec, + respond_to: oneshot::Sender>>, + }, +} + +#[derive(Clone)] +pub(crate) struct SourceTransformHandle { + sender: mpsc::Sender, +} + +impl SourceTransformHandle { + pub(crate) async fn new(client: SourceTransformClient) -> crate::Result { + let (sender, receiver) = mpsc::channel(config().batch_size as usize); + let mut client = SourceTransformer::new(client, receiver).await?; + tokio::spawn(async move { + while let Some(msg) = client.actor_messages.recv().await { + client.handle_message(msg).await; + } + }); + Ok(Self { sender }) + } + + pub(crate) async fn transform(&self, messages: Vec) -> Result> { + let (sender, receiver) = oneshot::channel(); + let msg = ActorMessage::Transform { + messages, + respond_to: sender, + }; + let _ = self.sender.send(msg).await; + receiver.await.unwrap() + } +} + #[cfg(test)] mod tests { use std::error::Error; use std::time::Duration; 
use crate::shared::utils::create_rpc_channel; - use crate::transformer::user_defined::SourceTransformer; + use crate::transformer::user_defined::SourceTransformHandle; use numaflow::sourcetransform; use numaflow_grpc::clients::sourcetransformer::source_transform_client::SourceTransformClient; use tempfile::TempDir; @@ -216,7 +270,7 @@ mod tests { // wait for the server to start tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - let mut client = SourceTransformer::new(SourceTransformClient::new( + let client = SourceTransformHandle::new(SourceTransformClient::new( create_rpc_channel(sock_file).await?, )) .await?; @@ -235,7 +289,7 @@ mod tests { let resp = tokio::time::timeout( tokio::time::Duration::from_secs(2), - client.transform_fn(vec![message]), + client.transform(vec![message]), ) .await??; assert_eq!(resp.len(), 1); @@ -291,7 +345,7 @@ mod tests { // wait for the server to start tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - let mut client = SourceTransformer::new(SourceTransformClient::new( + let client = SourceTransformHandle::new(SourceTransformClient::new( create_rpc_channel(sock_file).await?, )) .await?; @@ -308,7 +362,7 @@ mod tests { headers: Default::default(), }; - let resp = client.transform_fn(vec![message]).await?; + let resp = client.transform(vec![message]).await?; assert!(resp.is_empty()); // we need to drop the client, because if there are any in-flight requests