Skip to content

Commit

Permalink
chore: separate ack (#2119)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith authored Oct 3, 2024
1 parent 7586ffb commit 6b46875
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 84 deletions.
5 changes: 3 additions & 2 deletions rust/numaflow-core/src/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async fn start_forwarder(cln_token: CancellationToken, sdk_config: SDKConfig) ->
)
.await?;

let (source_reader, lag_reader) = new_source(
let (source_read, source_ack, lag_reader) = new_source(
source_grpc_client.clone(),
config().batch_size as usize,
config().timeout_in_ms as u16,
Expand Down Expand Up @@ -174,7 +174,8 @@ async fn start_forwarder(cln_token: CancellationToken, sdk_config: SDKConfig) ->

let sink_writer = SinkWriter::new(sink_grpc_client.clone()).await?;

let mut forwarder_builder = ForwarderBuilder::new(source_reader, sink_writer, cln_token);
let mut forwarder_builder =
ForwarderBuilder::new(source_read, source_ack, sink_writer, cln_token);

// add transformer if exists
if let Some(transformer_grpc_client) = transformer_grpc_client {
Expand Down
61 changes: 37 additions & 24 deletions rust/numaflow-core/src/monovertex/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use crate::{error, source};
/// Forwarder is responsible for reading messages from the source, applying transformation if
/// transformer is present, writing the messages to the sink, and then acknowledging the messages
/// back to the source.
pub(crate) struct Forwarder<T> {
source: T,
pub(crate) struct Forwarder<A, R> {
source_read: R,
source_ack: A,
sink_writer: SinkWriter,
source_transformer: Option<SourceTransformer>,
fb_sink_writer: Option<SinkWriter>,
Expand All @@ -28,19 +29,26 @@ pub(crate) struct Forwarder<T> {
}

/// ForwarderBuilder is used to build a Forwarder instance with optional fields.
pub(crate) struct ForwarderBuilder<T> {
source: T,
pub(crate) struct ForwarderBuilder<A, R> {
source_read: R,
source_ack: A,
sink_writer: SinkWriter,
cln_token: CancellationToken,
source_transformer: Option<SourceTransformer>,
fb_sink_writer: Option<SinkWriter>,
}

impl<T> ForwarderBuilder<T> {
impl<A, R> ForwarderBuilder<A, R> {
/// Create a new builder with mandatory fields
pub(crate) fn new(source: T, sink_writer: SinkWriter, cln_token: CancellationToken) -> Self {
pub(crate) fn new(
source_read: R,
source_ack: A,
sink_writer: SinkWriter,
cln_token: CancellationToken,
) -> Self {
Self {
source,
source_read,
source_ack,
sink_writer,
cln_token,
source_transformer: None,
Expand All @@ -62,10 +70,11 @@ impl<T> ForwarderBuilder<T> {

/// Build the Forwarder instance
#[must_use]
pub(crate) fn build(self) -> Forwarder<T> {
pub(crate) fn build(self) -> Forwarder<A, R> {
let common_labels = metrics::forward_metrics_labels().clone();
Forwarder {
source: self.source,
source_read: self.source_read,
source_ack: self.source_ack,
sink_writer: self.sink_writer,
source_transformer: self.source_transformer,
fb_sink_writer: self.fb_sink_writer,
Expand All @@ -75,9 +84,10 @@ impl<T> ForwarderBuilder<T> {
}
}

impl<T> Forwarder<T>
impl<A, R> Forwarder<A, R>
where
T: source::Source,
A: source::SourceAcker,
R: source::SourceReader,
{
/// start starts the forward-a-chunk loop and exits only after a chunk has been forwarded and ack'ed.
/// this means that, in the happy path scenario a block is always completely processed.
Expand Down Expand Up @@ -119,7 +129,7 @@ where
/// and then acknowledge the messages back to the source.
async fn read_and_process_messages(&mut self) -> error::Result<usize> {
let start_time = tokio::time::Instant::now();
let messages = self.source.read().await.map_err(|e| {
let messages = self.source_read.read().await.map_err(|e| {
Error::ForwarderError(format!("Failed to read messages from source {:?}", e))
})?;

Expand Down Expand Up @@ -515,7 +525,7 @@ where
let n = offsets.len();
let start_time = tokio::time::Instant::now();

self.source.ack(offsets).await?;
self.source_ack.ack(offsets).await?;

debug!("Ack latency - {}ms", start_time.elapsed().as_millis());

Expand Down Expand Up @@ -543,7 +553,7 @@ mod tests {
use crate::monovertex::sourcetransform_pb::source_transform_client::SourceTransformClient;
use crate::shared::utils::create_rpc_channel;
use crate::sink::user_defined::SinkWriter;
use crate::source::user_defined::UserDefinedSource;
use crate::source::user_defined::new_source;
use crate::transformer::user_defined::SourceTransformer;
use chrono::Utc;
use numaflow::source::{Message, Offset, SourceReadRequest};
Expand Down Expand Up @@ -729,7 +739,7 @@ mod tests {

let cln_token = CancellationToken::new();

let source = UserDefinedSource::new(
let (source_read, source_ack, _) = new_source(
SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()),
config().batch_size as usize,
config().timeout_in_ms as u16,
Expand All @@ -749,9 +759,10 @@ mod tests {
.await
.expect("failed to connect to transformer server");

let mut forwarder = ForwarderBuilder::new(source, sink_writer, cln_token.clone())
.source_transformer(transformer_client)
.build();
let mut forwarder =
ForwarderBuilder::new(source_read, source_ack, sink_writer, cln_token.clone())
.source_transformer(transformer_client)
.build();

// Assert the received message in a different task
let assert_handle = tokio::spawn(async move {
Expand Down Expand Up @@ -853,7 +864,7 @@ mod tests {

let cln_token = CancellationToken::new();

let source = UserDefinedSource::new(
let (source_read, source_ack, _) = new_source(
SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()),
500,
100,
Expand All @@ -867,7 +878,8 @@ mod tests {
.await
.expect("failed to connect to sink server");

let mut forwarder = ForwarderBuilder::new(source, sink_writer, cln_token.clone()).build();
let mut forwarder =
ForwarderBuilder::new(source_read, source_ack, sink_writer, cln_token.clone()).build();

let cancel_handle = tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Expand Down Expand Up @@ -969,7 +981,7 @@ mod tests {

let cln_token = CancellationToken::new();

let source = UserDefinedSource::new(
let (source_read, source_ack, _) = new_source(
SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()),
500,
100,
Expand All @@ -989,9 +1001,10 @@ mod tests {
.await
.expect("failed to connect to fb sink server");

let mut forwarder = ForwarderBuilder::new(source, sink_writer, cln_token.clone())
.fallback_sink_writer(fb_sink_writer)
.build();
let mut forwarder =
ForwarderBuilder::new(source_read, source_ack, sink_writer, cln_token.clone())
.fallback_sink_writer(fb_sink_writer)
.build();

let assert_handle = tokio::spawn(async move {
let received_message = sink_rx.recv().await.unwrap();
Expand Down
13 changes: 8 additions & 5 deletions rust/numaflow-core/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@ use crate::message::{Message, Offset};
/// [User-Defined Source]: https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/
pub(crate) mod user_defined;

/// Set of items that has to be implemented to become a Source.
pub(crate) trait Source {
/// Set of Read related items that has to be implemented to become a Source.
pub(crate) trait SourceReader {
#[allow(dead_code)]
/// Name of the source.
fn name(&self) -> &'static str;

async fn read(&mut self) -> crate::Result<Vec<Message>>;

/// acknowledge an offset. The implementor might choose to do it in an asynchronous way.
async fn ack(&mut self, _: Vec<Offset>) -> crate::Result<()>;

#[allow(dead_code)]
/// number of partitions processed by this source.
fn partitions(&self) -> Vec<u16>;
}

/// Set of Ack related items that has to be implemented to become a Source.
pub(crate) trait SourceAcker {
/// acknowledge an offset. The implementor might choose to do it in an asynchronous way.
async fn ack(&mut self, _: Vec<Offset>) -> crate::Result<()>;
}
Loading

0 comments on commit 6b46875

Please sign in to comment.